Skip to main content

alien_commands/
dispatchers.rs

1use std::any::Any;
2use std::fmt::Debug;
3
4use alien_error::{Context, IntoAlienError};
5use async_trait::async_trait;
6
7use crate::{error::Result, types::Envelope};
8
9/// Trait for dispatching command envelopes to agents via platform-specific transport
10#[async_trait]
11pub trait CommandDispatcher: Send + Sync + Debug {
12    /// Dispatch an envelope to the target agent
13    async fn dispatch(&self, envelope: &Envelope) -> Result<()>;
14
15    /// Helper method for downcasting to concrete types in tests
16    fn as_any(&self) -> &dyn Any;
17}
18
19/// No-op command dispatcher that succeeds without doing anything
20#[derive(Debug)]
21pub struct NullCommandDispatcher;
22
23#[async_trait]
24impl CommandDispatcher for NullCommandDispatcher {
25    async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
26        tracing::debug!(
27            command_id = %envelope.command_id,
28            command = %envelope.command,
29            "NullCommandDispatcher: no-op dispatch"
30        );
31        Ok(())
32    }
33
34    fn as_any(&self) -> &dyn Any {
35        self
36    }
37}
38
39#[cfg(any(feature = "server", feature = "dispatchers"))]
40mod platform_dispatchers {
41    use super::*;
42    use alien_aws_clients::aws::{
43        lambda::{InvocationType, InvokeRequest, LambdaApi, LambdaClient},
44        AwsClientConfig,
45    };
46    use alien_aws_clients::AwsCredentialProvider;
47    use alien_azure_clients::azure::{
48        service_bus::{
49            AzureServiceBusDataPlaneClient, SendMessageParameters, ServiceBusDataPlaneApi,
50        },
51        token_cache::AzureTokenCache,
52        AzureClientConfig,
53    };
54    use alien_gcp_clients::gcp::{
55        pubsub::{PubSubApi, PubSubClient, PublishRequest, PubsubMessage},
56        GcpClientConfig,
57    };
58    use base64::prelude::*;
59    use reqwest::Client;
60    use std::collections::HashMap;
61
62    /// AWS Lambda command dispatcher using InvokeFunction API
63    #[derive(Debug)]
64    pub struct LambdaCommandDispatcher {
65        lambda_client: LambdaClient,
66        function_name: String,
67    }
68
69    impl LambdaCommandDispatcher {
70        pub async fn new(
71            client: Client,
72            config: AwsClientConfig,
73            function_name: String,
74        ) -> Result<Self> {
75            let credentials = AwsCredentialProvider::from_config(config)
76                .await
77                .into_alien_error()
78                .context(crate::ErrorData::TransportDispatchFailed {
79                    message: "Failed to create AWS credential provider".to_string(),
80                    transport_type: Some("lambda".to_string()),
81                    target: None,
82                })?;
83            Ok(Self {
84                lambda_client: LambdaClient::new(client, credentials),
85                function_name,
86            })
87        }
88    }
89
90    #[async_trait]
91    impl CommandDispatcher for LambdaCommandDispatcher {
92        async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
93            // Serialize the command envelope as JSON payload
94            let payload = serde_json::to_vec(envelope).into_alien_error().context(
95                crate::ErrorData::TransportDispatchFailed {
96                    message: "Failed to serialize command envelope".to_string(),
97                    transport_type: Some("lambda".to_string()),
98                    target: Some(envelope.command_id.clone()),
99                },
100            )?;
101
102            let function_name = self.function_name.clone();
103
104            // Use async invocation to send the envelope to the Lambda function
105            // The Lambda function should have alien-runtime configured to handle command envelopes
106            let invoke_request = InvokeRequest::builder()
107                .function_name(function_name.clone())
108                .invocation_type(InvocationType::Event) // Async invocation
109                .payload(payload)
110                .build();
111
112            self.lambda_client.invoke(invoke_request).await.context(
113                crate::ErrorData::TransportDispatchFailed {
114                    message: format!("Failed to invoke Lambda function {}", function_name),
115                    transport_type: Some("lambda".to_string()),
116                    target: Some(envelope.command_id.clone()),
117                },
118            )?;
119
120            tracing::debug!(
121                command_id = %envelope.command_id,
122                command = %envelope.command,
123                function_name = %function_name,
124                "Successfully dispatched command envelope to Lambda function"
125            );
126
127            Ok(())
128        }
129
130        fn as_any(&self) -> &dyn Any {
131            self
132        }
133    }
134
135    /// GCP Pub/Sub command dispatcher
136    #[derive(Debug)]
137    pub struct PubSubCommandDispatcher {
138        pubsub_client: PubSubClient,
139        #[allow(dead_code)]
140        project_id: String,
141        topic_id: String,
142    }
143
144    impl PubSubCommandDispatcher {
145        pub fn new(client: Client, config: GcpClientConfig, topic_id: String) -> Self {
146            let project_id = config.project_id.clone();
147            Self {
148                pubsub_client: PubSubClient::new(client, config),
149                project_id,
150                topic_id,
151            }
152        }
153    }
154
155    #[async_trait]
156    impl CommandDispatcher for PubSubCommandDispatcher {
157        async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
158            // Serialize the command envelope as JSON
159            let envelope_json = serde_json::to_string(envelope).into_alien_error().context(
160                crate::ErrorData::TransportDispatchFailed {
161                    message: "Failed to serialize command envelope".to_string(),
162                    transport_type: Some("pubsub".to_string()),
163                    target: Some(envelope.command_id.clone()),
164                },
165            )?;
166
167            // Base64 encode the JSON payload as required by Pub/Sub
168            let data = BASE64_STANDARD.encode(envelope_json.as_bytes());
169
170            let topic_id = self.topic_id.clone();
171
172            // Create the Pub/Sub message with command envelope metadata
173            let mut attributes = HashMap::new();
174            attributes.insert("cmd-protocol".to_string(), envelope.protocol.clone());
175            attributes.insert("cmd-command-id".to_string(), envelope.command_id.clone());
176            attributes.insert("cmd-command".to_string(), envelope.command.clone());
177
178            let message = PubsubMessage::builder()
179                .data(data)
180                .attributes(attributes)
181                .build();
182
183            let publish_request = PublishRequest::builder().messages(vec![message]).build();
184
185            self.pubsub_client
186                .publish(topic_id.clone(), publish_request)
187                .await
188                .context(crate::ErrorData::TransportDispatchFailed {
189                    message: format!("Failed to publish to Pub/Sub topic {}", topic_id),
190                    transport_type: Some("pubsub".to_string()),
191                    target: Some(envelope.command_id.clone()),
192                })?;
193
194            tracing::debug!(
195                command_id = %envelope.command_id,
196                command = %envelope.command,
197                topic_id = %topic_id,
198                "Successfully dispatched command envelope to Pub/Sub topic"
199            );
200
201            Ok(())
202        }
203
204        fn as_any(&self) -> &dyn Any {
205            self
206        }
207    }
208
209    /// Azure Service Bus command dispatcher
210    #[derive(Debug)]
211    pub struct ServiceBusCommandDispatcher {
212        servicebus_client: AzureServiceBusDataPlaneClient,
213        namespace_name: String,
214        queue_name: String,
215    }
216
217    impl ServiceBusCommandDispatcher {
218        pub fn new(
219            client: Client,
220            config: AzureClientConfig,
221            namespace_name: String,
222            queue_name: String,
223        ) -> Self {
224            Self {
225                servicebus_client: AzureServiceBusDataPlaneClient::new(
226                    client,
227                    AzureTokenCache::new(config),
228                ),
229                namespace_name,
230                queue_name,
231            }
232        }
233    }
234
235    #[async_trait]
236    impl CommandDispatcher for ServiceBusCommandDispatcher {
237        async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
238            // Serialize the command envelope as JSON
239            let envelope_json = serde_json::to_string(envelope).into_alien_error().context(
240                crate::ErrorData::TransportDispatchFailed {
241                    message: "Failed to serialize command envelope".to_string(),
242                    transport_type: Some("servicebus".to_string()),
243                    target: Some(envelope.command_id.clone()),
244                },
245            )?;
246
247            let namespace_name = self.namespace_name.clone();
248            let queue_name = self.queue_name.clone();
249
250            // Create custom properties for command metadata
251            let mut custom_properties = HashMap::new();
252            custom_properties.insert("cmd-protocol".to_string(), envelope.protocol.clone());
253            custom_properties.insert("cmd-command-id".to_string(), envelope.command_id.clone());
254            custom_properties.insert("cmd-command".to_string(), envelope.command.clone());
255
256            let message = SendMessageParameters {
257                body: envelope_json,
258                broker_properties: None,
259                custom_properties,
260            };
261
262            self.servicebus_client
263                .send_message(namespace_name.clone(), queue_name.clone(), message)
264                .await
265                .context(crate::ErrorData::TransportDispatchFailed {
266                    message: format!(
267                        "Failed to send message to Service Bus queue {}/{}",
268                        namespace_name, queue_name
269                    ),
270                    transport_type: Some("servicebus".to_string()),
271                    target: Some(envelope.command_id.clone()),
272                })?;
273
274            tracing::debug!(
275                command_id = %envelope.command_id,
276                command = %envelope.command,
277                namespace = %namespace_name,
278                queue = %queue_name,
279                "Successfully dispatched command envelope to Service Bus queue"
280            );
281
282            Ok(())
283        }
284
285        fn as_any(&self) -> &dyn Any {
286            self
287        }
288    }
289}
290
291#[cfg(any(feature = "server", feature = "dispatchers"))]
292pub use platform_dispatchers::*;