Skip to main content

alien_commands/server/
dispatchers.rs

1use std::any::Any;
2use std::fmt::Debug;
3
4use alien_error::{AlienError, Context, IntoAlienError};
5use async_trait::async_trait;
6
7use crate::{error::Result, types::Envelope};
8
9/// Trait for dispatching ARC envelopes to agents via platform-specific transport
10#[async_trait]
11pub trait CommandDispatcher: Send + Sync + Debug {
12    /// Dispatch an envelope to the target agent
13    async fn dispatch(&self, envelope: &Envelope) -> Result<()>;
14
15    /// Helper method for downcasting to concrete types in tests
16    fn as_any(&self) -> &dyn Any;
17}
18
19/// No-op command dispatcher that succeeds without doing anything
20#[derive(Debug)]
21pub struct NullCommandDispatcher;
22
23#[async_trait]
24impl CommandDispatcher for NullCommandDispatcher {
25    async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
26        tracing::debug!(
27            command_id = %envelope.command_id,
28            command = %envelope.command,
29            "NullCommandDispatcher: no-op dispatch"
30        );
31        Ok(())
32    }
33
34    fn as_any(&self) -> &dyn Any {
35        self
36    }
37}
38
39#[cfg(feature = "server")]
40mod platform_dispatchers {
41    use super::*;
42    use alien_aws_clients::aws::{
43        lambda::{InvocationType, InvokeRequest, LambdaApi, LambdaClient},
44        AwsClientConfig,
45    };
46    use alien_azure_clients::azure::{
47        service_bus::{
48            AzureServiceBusDataPlaneClient, SendMessageParameters, ServiceBusDataPlaneApi,
49        },
50        AzureClientConfig,
51    };
52    use alien_gcp_clients::gcp::{
53        pubsub::{PubSubApi, PubSubClient, PublishRequest, PubsubMessage},
54        GcpClientConfig,
55    };
56    use base64::prelude::*;
57    use reqwest::Client;
58    use std::collections::HashMap;
59
60    /// AWS Lambda command dispatcher using InvokeFunction API
61    #[derive(Debug)]
62    pub struct LambdaCommandDispatcher {
63        lambda_client: LambdaClient,
64    }
65
66    impl LambdaCommandDispatcher {
67        pub fn new(client: Client, config: AwsClientConfig) -> Self {
68            Self {
69                lambda_client: LambdaClient::new(client, config),
70            }
71        }
72    }
73
74    #[async_trait]
75    impl CommandDispatcher for LambdaCommandDispatcher {
76        async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
77            // Serialize the ARC envelope as JSON payload
78            let payload = serde_json::to_vec(envelope).into_alien_error().context(
79                crate::ErrorData::TransportDispatchFailed {
80                    message: "Failed to serialize ARC envelope".to_string(),
81                    transport_type: Some("lambda".to_string()),
82                    target: Some(envelope.command_id.clone()),
83                },
84            )?;
85
86            // The function name should be provided via configuration or extracted from context
87            // For now, we use the command_id as a placeholder - in practice this would come from
88            // agent configuration
89            let function_name = envelope.command_id.clone();
90
91            // Use async invocation to send the envelope to the Lambda function
92            // The Lambda function should have alien-runtime configured to handle ARC envelopes
93            let invoke_request = InvokeRequest::builder()
94                .function_name(function_name.clone())
95                .invocation_type(InvocationType::Event) // Async invocation
96                .payload(payload)
97                .build();
98
99            self.lambda_client.invoke(invoke_request).await.context(
100                crate::ErrorData::TransportDispatchFailed {
101                    message: format!("Failed to invoke Lambda function {}", function_name),
102                    transport_type: Some("lambda".to_string()),
103                    target: Some(envelope.command_id.clone()),
104                },
105            )?;
106
107            tracing::debug!(
108                command_id = %envelope.command_id,
109                command = %envelope.command,
110                function_name = %function_name,
111                "Successfully dispatched ARC envelope to Lambda function"
112            );
113
114            Ok(())
115        }
116
117        fn as_any(&self) -> &dyn Any {
118            self
119        }
120    }
121
122    /// GCP Pub/Sub command dispatcher
123    #[derive(Debug)]
124    pub struct PubSubCommandDispatcher {
125        pubsub_client: PubSubClient,
126        #[allow(dead_code)]
127        project_id: String,
128    }
129
130    impl PubSubCommandDispatcher {
131        pub fn new(client: Client, config: GcpClientConfig) -> Self {
132            let project_id = config.project_id.clone();
133            Self {
134                pubsub_client: PubSubClient::new(client, config),
135                project_id,
136            }
137        }
138    }
139
140    #[async_trait]
141    impl CommandDispatcher for PubSubCommandDispatcher {
142        async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
143            // Serialize the ARC envelope as JSON
144            let envelope_json = serde_json::to_string(envelope).into_alien_error().context(
145                crate::ErrorData::TransportDispatchFailed {
146                    message: "Failed to serialize ARC envelope".to_string(),
147                    transport_type: Some("pubsub".to_string()),
148                    target: Some(envelope.command_id.clone()),
149                },
150            )?;
151
152            // Base64 encode the JSON payload as required by Pub/Sub
153            let data = BASE64_STANDARD.encode(envelope_json.as_bytes());
154
155            // The topic_id should come from agent configuration
156            // For now, we use the command_id as a placeholder
157            let topic_id = envelope.command_id.clone();
158
159            // Create the Pub/Sub message with ARC envelope metadata
160            let mut attributes = HashMap::new();
161            attributes.insert("arc-protocol".to_string(), envelope.protocol.clone());
162            attributes.insert("arc-command-id".to_string(), envelope.command_id.clone());
163            attributes.insert("arc-command".to_string(), envelope.command.clone());
164
165            let message = PubsubMessage::builder()
166                .data(data)
167                .attributes(attributes)
168                .build();
169
170            let publish_request = PublishRequest::builder().messages(vec![message]).build();
171
172            self.pubsub_client
173                .publish(topic_id.clone(), publish_request)
174                .await
175                .context(crate::ErrorData::TransportDispatchFailed {
176                    message: format!("Failed to publish to Pub/Sub topic {}", topic_id),
177                    transport_type: Some("pubsub".to_string()),
178                    target: Some(envelope.command_id.clone()),
179                })?;
180
181            tracing::debug!(
182                command_id = %envelope.command_id,
183                command = %envelope.command,
184                topic_id = %topic_id,
185                "Successfully dispatched ARC envelope to Pub/Sub topic"
186            );
187
188            Ok(())
189        }
190
191        fn as_any(&self) -> &dyn Any {
192            self
193        }
194    }
195
196    /// Azure Service Bus command dispatcher
197    #[derive(Debug)]
198    pub struct ServiceBusCommandDispatcher {
199        servicebus_client: AzureServiceBusDataPlaneClient,
200    }
201
202    impl ServiceBusCommandDispatcher {
203        pub fn new(client: Client, config: AzureClientConfig) -> Self {
204            Self {
205                servicebus_client: AzureServiceBusDataPlaneClient::new(client, config),
206            }
207        }
208    }
209
210    #[async_trait]
211    impl CommandDispatcher for ServiceBusCommandDispatcher {
212        async fn dispatch(&self, envelope: &Envelope) -> Result<()> {
213            // Serialize the ARC envelope as JSON
214            let envelope_json = serde_json::to_string(envelope).into_alien_error().context(
215                crate::ErrorData::TransportDispatchFailed {
216                    message: "Failed to serialize ARC envelope".to_string(),
217                    transport_type: Some("servicebus".to_string()),
218                    target: Some(envelope.command_id.clone()),
219                },
220            )?;
221
222            // Parse namespace and queue from command_id (placeholder)
223            // In practice, this would come from agent configuration
224            let command_id = &envelope.command_id;
225            let (namespace_name, queue_name) = if command_id.contains('/') {
226                let parts: Vec<&str> = command_id.splitn(2, '/').collect();
227                (parts[0].to_string(), parts[1].to_string())
228            } else {
229                return Err(AlienError::new(crate::ErrorData::TransportDispatchFailed {
230                    message: format!(
231                        "Service Bus target must include namespace: expected 'namespace/queue', got '{}'",
232                        command_id
233                    ),
234                    transport_type: Some("servicebus".to_string()),
235                    target: Some(envelope.command_id.clone()),
236                }));
237            };
238
239            // Create custom properties for ARC metadata
240            let mut custom_properties = HashMap::new();
241            custom_properties.insert("arc-protocol".to_string(), envelope.protocol.clone());
242            custom_properties.insert("arc-command-id".to_string(), envelope.command_id.clone());
243            custom_properties.insert("arc-command".to_string(), envelope.command.clone());
244
245            let message = SendMessageParameters {
246                body: envelope_json,
247                broker_properties: None,
248                custom_properties,
249            };
250
251            self.servicebus_client
252                .send_message(namespace_name.clone(), queue_name.clone(), message)
253                .await
254                .context(crate::ErrorData::TransportDispatchFailed {
255                    message: format!(
256                        "Failed to send message to Service Bus queue {}/{}",
257                        namespace_name, queue_name
258                    ),
259                    transport_type: Some("servicebus".to_string()),
260                    target: Some(envelope.command_id.clone()),
261                })?;
262
263            tracing::debug!(
264                command_id = %envelope.command_id,
265                command = %envelope.command,
266                namespace = %namespace_name,
267                queue = %queue_name,
268                "Successfully dispatched ARC envelope to Service Bus queue"
269            );
270
271            Ok(())
272        }
273
274        fn as_any(&self) -> &dyn Any {
275            self
276        }
277    }
278}
279
280#[cfg(feature = "server")]
281pub use platform_dispatchers::*;