1use std::any::Any;
2use std::fmt::Debug;
3
4use alien_error::{AlienError, Context, IntoAlienError};
5use async_trait::async_trait;
6
7use crate::{error::Result, types::Envelope};
8
9#[async_trait]
11pub trait CommandDispatcher: Send + Sync + Debug {
12 async fn dispatch(&self, envelope: &Envelope) -> Result<()>;
14
15 fn as_any(&self) -> &dyn Any;
17}
18
19#[derive(Debug)]
21pub struct NullCommandDispatcher;
22
23#[async_trait]
24impl CommandDispatcher for NullCommandDispatcher {
25 async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
26 tracing::debug!(
27 command_id = %envelope.command_id,
28 command = %envelope.command,
29 "NullCommandDispatcher: no-op dispatch"
30 );
31 Ok(())
32 }
33
34 fn as_any(&self) -> &dyn Any {
35 self
36 }
37}
38
39#[cfg(feature = "server")]
40mod platform_dispatchers {
41 use super::*;
42 use alien_aws_clients::aws::{
43 lambda::{InvocationType, InvokeRequest, LambdaApi, LambdaClient},
44 AwsClientConfig,
45 };
46 use alien_azure_clients::azure::{
47 service_bus::{
48 AzureServiceBusDataPlaneClient, SendMessageParameters, ServiceBusDataPlaneApi,
49 },
50 AzureClientConfig,
51 };
52 use alien_gcp_clients::gcp::{
53 pubsub::{PubSubApi, PubSubClient, PublishRequest, PubsubMessage},
54 GcpClientConfig,
55 };
56 use base64::prelude::*;
57 use reqwest::Client;
58 use std::collections::HashMap;
59
60 #[derive(Debug)]
62 pub struct LambdaCommandDispatcher {
63 lambda_client: LambdaClient,
64 }
65
66 impl LambdaCommandDispatcher {
67 pub fn new(client: Client, config: AwsClientConfig) -> Self {
68 Self {
69 lambda_client: LambdaClient::new(client, config),
70 }
71 }
72 }
73
74 #[async_trait]
75 impl CommandDispatcher for LambdaCommandDispatcher {
76 async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
77 let payload = serde_json::to_vec(envelope).into_alien_error().context(
79 crate::ErrorData::TransportDispatchFailed {
80 message: "Failed to serialize ARC envelope".to_string(),
81 transport_type: Some("lambda".to_string()),
82 target: Some(envelope.command_id.clone()),
83 },
84 )?;
85
86 let function_name = envelope.command_id.clone();
90
91 let invoke_request = InvokeRequest::builder()
94 .function_name(function_name.clone())
95 .invocation_type(InvocationType::Event) .payload(payload)
97 .build();
98
99 self.lambda_client.invoke(invoke_request).await.context(
100 crate::ErrorData::TransportDispatchFailed {
101 message: format!("Failed to invoke Lambda function {}", function_name),
102 transport_type: Some("lambda".to_string()),
103 target: Some(envelope.command_id.clone()),
104 },
105 )?;
106
107 tracing::debug!(
108 command_id = %envelope.command_id,
109 command = %envelope.command,
110 function_name = %function_name,
111 "Successfully dispatched ARC envelope to Lambda function"
112 );
113
114 Ok(())
115 }
116
117 fn as_any(&self) -> &dyn Any {
118 self
119 }
120 }
121
122 #[derive(Debug)]
124 pub struct PubSubCommandDispatcher {
125 pubsub_client: PubSubClient,
126 #[allow(dead_code)]
127 project_id: String,
128 }
129
130 impl PubSubCommandDispatcher {
131 pub fn new(client: Client, config: GcpClientConfig) -> Self {
132 let project_id = config.project_id.clone();
133 Self {
134 pubsub_client: PubSubClient::new(client, config),
135 project_id,
136 }
137 }
138 }
139
140 #[async_trait]
141 impl CommandDispatcher for PubSubCommandDispatcher {
142 async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
143 let envelope_json = serde_json::to_string(envelope).into_alien_error().context(
145 crate::ErrorData::TransportDispatchFailed {
146 message: "Failed to serialize ARC envelope".to_string(),
147 transport_type: Some("pubsub".to_string()),
148 target: Some(envelope.command_id.clone()),
149 },
150 )?;
151
152 let data = BASE64_STANDARD.encode(envelope_json.as_bytes());
154
155 let topic_id = envelope.command_id.clone();
158
159 let mut attributes = HashMap::new();
161 attributes.insert("arc-protocol".to_string(), envelope.protocol.clone());
162 attributes.insert("arc-command-id".to_string(), envelope.command_id.clone());
163 attributes.insert("arc-command".to_string(), envelope.command.clone());
164
165 let message = PubsubMessage::builder()
166 .data(data)
167 .attributes(attributes)
168 .build();
169
170 let publish_request = PublishRequest::builder().messages(vec![message]).build();
171
172 self.pubsub_client
173 .publish(topic_id.clone(), publish_request)
174 .await
175 .context(crate::ErrorData::TransportDispatchFailed {
176 message: format!("Failed to publish to Pub/Sub topic {}", topic_id),
177 transport_type: Some("pubsub".to_string()),
178 target: Some(envelope.command_id.clone()),
179 })?;
180
181 tracing::debug!(
182 command_id = %envelope.command_id,
183 command = %envelope.command,
184 topic_id = %topic_id,
185 "Successfully dispatched ARC envelope to Pub/Sub topic"
186 );
187
188 Ok(())
189 }
190
191 fn as_any(&self) -> &dyn Any {
192 self
193 }
194 }
195
196 #[derive(Debug)]
198 pub struct ServiceBusCommandDispatcher {
199 servicebus_client: AzureServiceBusDataPlaneClient,
200 }
201
202 impl ServiceBusCommandDispatcher {
203 pub fn new(client: Client, config: AzureClientConfig) -> Self {
204 Self {
205 servicebus_client: AzureServiceBusDataPlaneClient::new(client, config),
206 }
207 }
208 }
209
210 #[async_trait]
211 impl CommandDispatcher for ServiceBusCommandDispatcher {
212 async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
213 let envelope_json = serde_json::to_string(envelope).into_alien_error().context(
215 crate::ErrorData::TransportDispatchFailed {
216 message: "Failed to serialize ARC envelope".to_string(),
217 transport_type: Some("servicebus".to_string()),
218 target: Some(envelope.command_id.clone()),
219 },
220 )?;
221
222 let command_id = &envelope.command_id;
225 let (namespace_name, queue_name) = if command_id.contains('/') {
226 let parts: Vec<&str> = command_id.splitn(2, '/').collect();
227 (parts[0].to_string(), parts[1].to_string())
228 } else {
229 return Err(AlienError::new(crate::ErrorData::TransportDispatchFailed {
230 message: format!(
231 "Service Bus target must include namespace: expected 'namespace/queue', got '{}'",
232 command_id
233 ),
234 transport_type: Some("servicebus".to_string()),
235 target: Some(envelope.command_id.clone()),
236 }));
237 };
238
239 let mut custom_properties = HashMap::new();
241 custom_properties.insert("arc-protocol".to_string(), envelope.protocol.clone());
242 custom_properties.insert("arc-command-id".to_string(), envelope.command_id.clone());
243 custom_properties.insert("arc-command".to_string(), envelope.command.clone());
244
245 let message = SendMessageParameters {
246 body: envelope_json,
247 broker_properties: None,
248 custom_properties,
249 };
250
251 self.servicebus_client
252 .send_message(namespace_name.clone(), queue_name.clone(), message)
253 .await
254 .context(crate::ErrorData::TransportDispatchFailed {
255 message: format!(
256 "Failed to send message to Service Bus queue {}/{}",
257 namespace_name, queue_name
258 ),
259 transport_type: Some("servicebus".to_string()),
260 target: Some(envelope.command_id.clone()),
261 })?;
262
263 tracing::debug!(
264 command_id = %envelope.command_id,
265 command = %envelope.command,
266 namespace = %namespace_name,
267 queue = %queue_name,
268 "Successfully dispatched ARC envelope to Service Bus queue"
269 );
270
271 Ok(())
272 }
273
274 fn as_any(&self) -> &dyn Any {
275 self
276 }
277 }
278}
279
280#[cfg(feature = "server")]
281pub use platform_dispatchers::*;