1use std::any::Any;
2use std::fmt::Debug;
3
4use alien_error::{Context, IntoAlienError};
5use async_trait::async_trait;
6
7use crate::{error::Result, types::Envelope};
8
9#[async_trait]
11pub trait CommandDispatcher: Send + Sync + Debug {
12 async fn dispatch(&self, envelope: &Envelope) -> Result<()>;
14
15 fn as_any(&self) -> &dyn Any;
17}
18
19#[derive(Debug)]
21pub struct NullCommandDispatcher;
22
23#[async_trait]
24impl CommandDispatcher for NullCommandDispatcher {
25 async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
26 tracing::debug!(
27 command_id = %envelope.command_id,
28 command = %envelope.command,
29 "NullCommandDispatcher: no-op dispatch"
30 );
31 Ok(())
32 }
33
34 fn as_any(&self) -> &dyn Any {
35 self
36 }
37}
38
39#[cfg(any(feature = "server", feature = "dispatchers"))]
40mod platform_dispatchers {
41 use super::*;
42 use alien_aws_clients::aws::{
43 lambda::{InvocationType, InvokeRequest, LambdaApi, LambdaClient},
44 AwsClientConfig,
45 };
46 use alien_aws_clients::AwsCredentialProvider;
47 use alien_azure_clients::azure::{
48 service_bus::{
49 AzureServiceBusDataPlaneClient, SendMessageParameters, ServiceBusDataPlaneApi,
50 },
51 token_cache::AzureTokenCache,
52 AzureClientConfig,
53 };
54 use alien_gcp_clients::gcp::{
55 pubsub::{PubSubApi, PubSubClient, PublishRequest, PubsubMessage},
56 GcpClientConfig,
57 };
58 use base64::prelude::*;
59 use reqwest::Client;
60 use std::collections::HashMap;
61
62 #[derive(Debug)]
64 pub struct LambdaCommandDispatcher {
65 lambda_client: LambdaClient,
66 function_name: String,
67 }
68
69 impl LambdaCommandDispatcher {
70 pub async fn new(
71 client: Client,
72 config: AwsClientConfig,
73 function_name: String,
74 ) -> Result<Self> {
75 let credentials = AwsCredentialProvider::from_config(config)
76 .await
77 .into_alien_error()
78 .context(crate::ErrorData::TransportDispatchFailed {
79 message: "Failed to create AWS credential provider".to_string(),
80 transport_type: Some("lambda".to_string()),
81 target: None,
82 })?;
83 Ok(Self {
84 lambda_client: LambdaClient::new(client, credentials),
85 function_name,
86 })
87 }
88 }
89
90 #[async_trait]
91 impl CommandDispatcher for LambdaCommandDispatcher {
92 async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
93 let payload = serde_json::to_vec(envelope).into_alien_error().context(
95 crate::ErrorData::TransportDispatchFailed {
96 message: "Failed to serialize command envelope".to_string(),
97 transport_type: Some("lambda".to_string()),
98 target: Some(envelope.command_id.clone()),
99 },
100 )?;
101
102 let function_name = self.function_name.clone();
103
104 let invoke_request = InvokeRequest::builder()
107 .function_name(function_name.clone())
108 .invocation_type(InvocationType::Event) .payload(payload)
110 .build();
111
112 self.lambda_client.invoke(invoke_request).await.context(
113 crate::ErrorData::TransportDispatchFailed {
114 message: format!("Failed to invoke Lambda function {}", function_name),
115 transport_type: Some("lambda".to_string()),
116 target: Some(envelope.command_id.clone()),
117 },
118 )?;
119
120 tracing::debug!(
121 command_id = %envelope.command_id,
122 command = %envelope.command,
123 function_name = %function_name,
124 "Successfully dispatched command envelope to Lambda function"
125 );
126
127 Ok(())
128 }
129
130 fn as_any(&self) -> &dyn Any {
131 self
132 }
133 }
134
135 #[derive(Debug)]
137 pub struct PubSubCommandDispatcher {
138 pubsub_client: PubSubClient,
139 #[allow(dead_code)]
140 project_id: String,
141 topic_id: String,
142 }
143
144 impl PubSubCommandDispatcher {
145 pub fn new(client: Client, config: GcpClientConfig, topic_id: String) -> Self {
146 let project_id = config.project_id.clone();
147 Self {
148 pubsub_client: PubSubClient::new(client, config),
149 project_id,
150 topic_id,
151 }
152 }
153 }
154
155 #[async_trait]
156 impl CommandDispatcher for PubSubCommandDispatcher {
157 async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
158 let envelope_json = serde_json::to_string(envelope).into_alien_error().context(
160 crate::ErrorData::TransportDispatchFailed {
161 message: "Failed to serialize command envelope".to_string(),
162 transport_type: Some("pubsub".to_string()),
163 target: Some(envelope.command_id.clone()),
164 },
165 )?;
166
167 let data = BASE64_STANDARD.encode(envelope_json.as_bytes());
169
170 let topic_id = self.topic_id.clone();
171
172 let mut attributes = HashMap::new();
174 attributes.insert("cmd-protocol".to_string(), envelope.protocol.clone());
175 attributes.insert("cmd-command-id".to_string(), envelope.command_id.clone());
176 attributes.insert("cmd-command".to_string(), envelope.command.clone());
177
178 let message = PubsubMessage::builder()
179 .data(data)
180 .attributes(attributes)
181 .build();
182
183 let publish_request = PublishRequest::builder().messages(vec![message]).build();
184
185 self.pubsub_client
186 .publish(topic_id.clone(), publish_request)
187 .await
188 .context(crate::ErrorData::TransportDispatchFailed {
189 message: format!("Failed to publish to Pub/Sub topic {}", topic_id),
190 transport_type: Some("pubsub".to_string()),
191 target: Some(envelope.command_id.clone()),
192 })?;
193
194 tracing::debug!(
195 command_id = %envelope.command_id,
196 command = %envelope.command,
197 topic_id = %topic_id,
198 "Successfully dispatched command envelope to Pub/Sub topic"
199 );
200
201 Ok(())
202 }
203
204 fn as_any(&self) -> &dyn Any {
205 self
206 }
207 }
208
209 #[derive(Debug)]
211 pub struct ServiceBusCommandDispatcher {
212 servicebus_client: AzureServiceBusDataPlaneClient,
213 namespace_name: String,
214 queue_name: String,
215 }
216
217 impl ServiceBusCommandDispatcher {
218 pub fn new(
219 client: Client,
220 config: AzureClientConfig,
221 namespace_name: String,
222 queue_name: String,
223 ) -> Self {
224 Self {
225 servicebus_client: AzureServiceBusDataPlaneClient::new(
226 client,
227 AzureTokenCache::new(config),
228 ),
229 namespace_name,
230 queue_name,
231 }
232 }
233 }
234
235 #[async_trait]
236 impl CommandDispatcher for ServiceBusCommandDispatcher {
237 async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
238 let envelope_json = serde_json::to_string(envelope).into_alien_error().context(
240 crate::ErrorData::TransportDispatchFailed {
241 message: "Failed to serialize command envelope".to_string(),
242 transport_type: Some("servicebus".to_string()),
243 target: Some(envelope.command_id.clone()),
244 },
245 )?;
246
247 let namespace_name = self.namespace_name.clone();
248 let queue_name = self.queue_name.clone();
249
250 let mut custom_properties = HashMap::new();
252 custom_properties.insert("cmd-protocol".to_string(), envelope.protocol.clone());
253 custom_properties.insert("cmd-command-id".to_string(), envelope.command_id.clone());
254 custom_properties.insert("cmd-command".to_string(), envelope.command.clone());
255
256 let message = SendMessageParameters {
257 body: envelope_json,
258 broker_properties: None,
259 custom_properties,
260 };
261
262 self.servicebus_client
263 .send_message(namespace_name.clone(), queue_name.clone(), message)
264 .await
265 .context(crate::ErrorData::TransportDispatchFailed {
266 message: format!(
267 "Failed to send message to Service Bus queue {}/{}",
268 namespace_name, queue_name
269 ),
270 transport_type: Some("servicebus".to_string()),
271 target: Some(envelope.command_id.clone()),
272 })?;
273
274 tracing::debug!(
275 command_id = %envelope.command_id,
276 command = %envelope.command,
277 namespace = %namespace_name,
278 queue = %queue_name,
279 "Successfully dispatched command envelope to Service Bus queue"
280 );
281
282 Ok(())
283 }
284
285 fn as_any(&self) -> &dyn Any {
286 self
287 }
288 }
289}
290
291#[cfg(any(feature = "server", feature = "dispatchers"))]
292pub use platform_dispatchers::*;