Skip to main content

alien_commands/runtime/
mod.rs

1use std::time::Instant;
2
3use alien_core::{MessagePayload, QueueMessage};
4use alien_error::{AlienError, Context, IntoAlienError};
5use tracing::{debug, info};
6
7use crate::{
8    error::{ErrorData, Result},
9    types::{BodySpec, CommandResponse, Envelope},
10    PROTOCOL_VERSION,
11};
12
13/// Parse a QueueMessage to extract an ARC envelope if present
14pub fn parse_envelope(message: &QueueMessage) -> Result<Option<Envelope>> {
15    let envelope_data = match &message.payload {
16        MessagePayload::Json(value) => {
17            // Try to parse as ARC envelope
18            match serde_json::from_value::<Envelope>(value.clone()) {
19                Ok(envelope) => envelope,
20                Err(_) => return Ok(None), // Not an ARC envelope
21            }
22        }
23        MessagePayload::Text(text) => {
24            // Try to parse JSON text as ARC envelope
25            match serde_json::from_str::<Envelope>(text) {
26                Ok(envelope) => envelope,
27                Err(_) => return Ok(None), // Not an ARC envelope
28            }
29        }
30    };
31
32    // Validate it's a valid ARC envelope
33    if envelope_data.protocol != PROTOCOL_VERSION {
34        return Ok(None);
35    }
36
37    envelope_data
38        .validate()
39        .context(ErrorData::InvalidEnvelope {
40            message: "Envelope validation failed".to_string(),
41            field: None,
42        })?;
43    Ok(Some(envelope_data))
44}
45
46/// Decode params from an envelope to JSON
47///
48/// For inline params, decodes the base64 and parses as JSON.
49/// For storage params, fetches from storage using the presigned request.
50pub async fn decode_params(envelope: &Envelope) -> Result<serde_json::Value> {
51    match &envelope.params {
52        BodySpec::Inline { inline_base64 } => {
53            use base64::{engine::general_purpose, Engine as _};
54
55            let bytes = general_purpose::STANDARD
56                .decode(inline_base64)
57                .into_alien_error()
58                .context(ErrorData::InvalidEnvelope {
59                    message: "Failed to decode base64 params".to_string(),
60                    field: Some("params.inlineBase64".to_string()),
61                })?;
62
63            serde_json::from_slice(&bytes)
64                .into_alien_error()
65                .context(ErrorData::InvalidEnvelope {
66                    message: "Failed to parse params JSON".to_string(),
67                    field: Some("params".to_string()),
68                })
69        }
70        BodySpec::Storage {
71            storage_get_request,
72            ..
73        } => {
74            let presigned_request = storage_get_request.as_ref().ok_or_else(|| {
75                AlienError::new(ErrorData::InvalidEnvelope {
76                    message: "Storage params missing storage_get_request".to_string(),
77                    field: Some("params.storageGetRequest".to_string()),
78                })
79            })?;
80
81            let response = presigned_request.execute(None).await.context(
82                ErrorData::StorageOperationFailed {
83                    message: "Failed to fetch params from storage".to_string(),
84                    operation: Some("get".to_string()),
85                    path: Some(presigned_request.path.clone()),
86                },
87            )?;
88
89            let body = response.body.ok_or_else(|| {
90                AlienError::new(ErrorData::StorageOperationFailed {
91                    message: "Storage response has no body".to_string(),
92                    operation: Some("get".to_string()),
93                    path: Some(presigned_request.path.clone()),
94                })
95            })?;
96
97            serde_json::from_slice(&body)
98                .into_alien_error()
99                .context(ErrorData::InvalidEnvelope {
100                    message: "Failed to parse params JSON from storage".to_string(),
101                    field: Some("params".to_string()),
102                })
103        }
104    }
105}
106
107/// Decode params from an envelope to raw bytes
108///
109/// For inline params, decodes the base64.
110/// For storage params, fetches from storage using the presigned request.
111pub async fn decode_params_bytes(envelope: &Envelope) -> Result<Vec<u8>> {
112    match &envelope.params {
113        BodySpec::Inline { inline_base64 } => {
114            use base64::{engine::general_purpose, Engine as _};
115
116            general_purpose::STANDARD
117                .decode(inline_base64)
118                .into_alien_error()
119                .context(ErrorData::InvalidEnvelope {
120                    message: "Failed to decode base64 params".to_string(),
121                    field: Some("params.inlineBase64".to_string()),
122                })
123        }
124        BodySpec::Storage {
125            storage_get_request,
126            ..
127        } => {
128            let presigned_request = storage_get_request.as_ref().ok_or_else(|| {
129                AlienError::new(ErrorData::InvalidEnvelope {
130                    message: "Storage params missing storage_get_request".to_string(),
131                    field: Some("params.storageGetRequest".to_string()),
132                })
133            })?;
134
135            let response = presigned_request.execute(None).await.context(
136                ErrorData::StorageOperationFailed {
137                    message: "Failed to fetch params from storage".to_string(),
138                    operation: Some("get".to_string()),
139                    path: Some(presigned_request.path.clone()),
140                },
141            )?;
142
143            response.body.map(|b| b.to_vec()).ok_or_else(|| {
144                AlienError::new(ErrorData::StorageOperationFailed {
145                    message: "Storage response has no body".to_string(),
146                    operation: Some("get".to_string()),
147                    path: Some(presigned_request.path.clone()),
148                })
149            })
150        }
151    }
152}
153
154/// Submit a command response back to the ARC server
155///
156/// This function implements the complete ARC response submission protocol:
157/// - Small responses (≤ maxInlineBytes) are submitted inline as base64
158/// - Large responses are uploaded to storage first, then submitted with storage reference
159#[cfg(feature = "runtime")]
160pub async fn submit_response(envelope: &Envelope, response: CommandResponse) -> Result<()> {
161    use reqwest::Client;
162    use std::time::Duration;
163
164    let start_time = Instant::now();
165
166    // Create client with connection pooling to prevent FD exhaustion
167    let client = Client::builder()
168        .timeout(Duration::from_secs(30))
169        .pool_max_idle_per_host(2)
170        .pool_idle_timeout(Some(Duration::from_secs(60)))
171        .build()
172        .map_err(|e| {
173            AlienError::new(ErrorData::Other {
174                message: format!("Failed to create HTTP client: {}", e),
175            })
176        })?;
177
178    // Check if response body needs storage upload
179    let final_response = match &response {
180        CommandResponse::Success { response: body } => {
181            let body_size = body.size().unwrap_or(0);
182
183            if body_size > envelope.response_handling.max_inline_bytes {
184                // Large response: upload to storage first
185                debug!(
186                    command_id = %envelope.command_id,
187                    body_size = body_size,
188                    max_inline = envelope.response_handling.max_inline_bytes,
189                    "Uploading large response body to storage"
190                );
191
192                // Get the bytes from the body
193                let body_bytes = body.decode_inline().ok_or_else(|| {
194                    AlienError::new(ErrorData::Other {
195                        message: "Cannot upload storage body - expected inline body".to_string(),
196                    })
197                })?;
198
199                // Upload to storage using the presigned request
200                let upload_response = envelope
201                    .response_handling
202                    .storage_upload_request
203                    .execute(Some(bytes::Bytes::from(body_bytes.clone())))
204                    .await
205                    .into_alien_error()
206                    .context(ErrorData::StorageOperationFailed {
207                        message: "Failed to upload response to storage".to_string(),
208                        operation: Some("put".to_string()),
209                        path: Some(
210                            envelope
211                                .response_handling
212                                .storage_upload_request
213                                .path
214                                .clone(),
215                        ),
216                    })?;
217
218                if upload_response.status_code < 200 || upload_response.status_code >= 300 {
219                    return Err(AlienError::new(ErrorData::StorageOperationFailed {
220                        message: format!(
221                            "Storage upload failed with status {}",
222                            upload_response.status_code
223                        ),
224                        operation: Some("put".to_string()),
225                        path: Some(
226                            envelope
227                                .response_handling
228                                .storage_upload_request
229                                .path
230                                .clone(),
231                        ),
232                    }));
233                }
234
235                debug!(
236                    command_id = %envelope.command_id,
237                    "Response body uploaded to storage successfully"
238                );
239
240                // Create storage body spec
241                CommandResponse::Success {
242                    response: BodySpec::Storage {
243                        size: Some(body_bytes.len() as u64),
244                        storage_get_request: None, // Server will fill this in
245                        storage_put_used: Some(true),
246                    },
247                }
248            } else {
249                response.clone()
250            }
251        }
252        CommandResponse::Error { .. } => response.clone(),
253    };
254
255    // Submit response to ARC server using the URL from the envelope
256    let submit_url = &envelope.response_handling.submit_response_url;
257
258    debug!(
259        command_id = %envelope.command_id,
260        url = %submit_url,
261        "Submitting ARC response"
262    );
263
264    let http_response = client
265        .put(submit_url)
266        .json(&crate::types::SubmitResponseRequest {
267            response: final_response.clone(),
268        })
269        .send()
270        .await
271        .into_alien_error()
272        .context(ErrorData::HttpOperationFailed {
273            message: "Failed to submit response".to_string(),
274            method: Some("PUT".to_string()),
275            url: Some(submit_url.clone()),
276        })?;
277
278    if !http_response.status().is_success() {
279        let status = http_response.status();
280        let error_body = http_response.text().await.unwrap_or_default();
281        return Err(AlienError::new(ErrorData::HttpOperationFailed {
282            message: format!(
283                "Response submission failed with status {}: {}",
284                status, error_body
285            ),
286            method: Some("PUT".to_string()),
287            url: Some(submit_url.clone()),
288        }));
289    }
290
291    info!(
292        command_id = %envelope.command_id,
293        processing_ms = start_time.elapsed().as_millis(),
294        response_type = if final_response.is_success() { "success" } else { "error" },
295        "ARC response submitted successfully"
296    );
297
298    Ok(())
299}
300
301/// Create a simple success response for testing
302pub fn create_test_response(data: &[u8]) -> CommandResponse {
303    CommandResponse::success(data)
304}
305
306/// Create a simple error response for testing
307pub fn create_test_error(code: &str, message: &str) -> CommandResponse {
308    CommandResponse::error(code, message)
309}
310
311#[cfg(test)]
312mod tests {
313    use super::*;
314    use alien_bindings::presigned::PresignedRequest;
315    use chrono::Utc;
316
317    fn create_test_envelope() -> Envelope {
318        Envelope {
319            protocol: PROTOCOL_VERSION.to_string(),
320            command_id: "cmd_123".to_string(),
321            attempt: 1,
322            deadline: None,
323            command: "test-command".to_string(),
324            params: BodySpec::inline(b"{}"),
325            response_handling: crate::types::ResponseHandling {
326                max_inline_bytes: 150000,
327                submit_response_url: "https://arc.example.com/commands/cmd_123/response"
328                    .to_string(),
329                storage_upload_request: PresignedRequest::new_http(
330                    "https://storage.example.com/upload".to_string(),
331                    "PUT".to_string(),
332                    std::collections::HashMap::new(),
333                    alien_bindings::presigned::PresignedOperation::Put,
334                    "test-path".to_string(),
335                    Utc::now() + chrono::Duration::hours(1),
336                ),
337            },
338            deployment_id: "ag_123".to_string(),
339        }
340    }
341
342    #[test]
343    fn test_parse_envelope_json() {
344        let envelope = create_test_envelope();
345        let envelope_json = serde_json::to_value(&envelope).unwrap();
346
347        let queue_message = QueueMessage {
348            id: "msg_123".to_string(),
349            payload: MessagePayload::Json(envelope_json),
350            receipt_handle: "handle_123".to_string(),
351            timestamp: Utc::now(),
352            source: "test-queue".to_string(),
353            attributes: std::collections::HashMap::new(),
354            attempt_count: Some(1),
355        };
356
357        let parsed = parse_envelope(&queue_message).unwrap();
358        assert!(parsed.is_some());
359
360        let parsed_envelope = parsed.unwrap();
361        assert_eq!(parsed_envelope.command_id, "cmd_123");
362        assert_eq!(parsed_envelope.command, "test-command");
363        assert_eq!(parsed_envelope.protocol, PROTOCOL_VERSION);
364    }
365
366    #[test]
367    fn test_parse_envelope_text() {
368        let envelope = create_test_envelope();
369        let envelope_text = serde_json::to_string(&envelope).unwrap();
370
371        let queue_message = QueueMessage {
372            id: "msg_456".to_string(),
373            payload: MessagePayload::Text(envelope_text),
374            receipt_handle: "handle_456".to_string(),
375            timestamp: Utc::now(),
376            source: "test-queue".to_string(),
377            attributes: std::collections::HashMap::new(),
378            attempt_count: Some(1),
379        };
380
381        let parsed = parse_envelope(&queue_message).unwrap();
382        assert!(parsed.is_some());
383
384        let parsed_envelope = parsed.unwrap();
385        assert_eq!(parsed_envelope.command_id, "cmd_123");
386    }
387
388    #[test]
389    fn test_parse_non_arc_message() {
390        let queue_message = QueueMessage {
391            id: "msg_789".to_string(),
392            payload: MessagePayload::Json(serde_json::json!({"regular": "message"})),
393            receipt_handle: "handle_789".to_string(),
394            timestamp: Utc::now(),
395            source: "test-queue".to_string(),
396            attributes: std::collections::HashMap::new(),
397            attempt_count: Some(1),
398        };
399
400        let parsed = parse_envelope(&queue_message).unwrap();
401        assert!(parsed.is_none());
402    }
403
404    #[test]
405    fn test_parse_invalid_protocol() {
406        let mut envelope = create_test_envelope();
407        envelope.protocol = "invalid.v1".to_string();
408
409        let envelope_json = serde_json::to_value(&envelope).unwrap();
410        let queue_message = QueueMessage {
411            id: "msg_invalid".to_string(),
412            payload: MessagePayload::Json(envelope_json),
413            receipt_handle: "handle_invalid".to_string(),
414            timestamp: Utc::now(),
415            source: "test-queue".to_string(),
416            attributes: std::collections::HashMap::new(),
417            attempt_count: Some(1),
418        };
419
420        let parsed = parse_envelope(&queue_message).unwrap();
421        assert!(parsed.is_none());
422    }
423
424    #[test]
425    fn test_create_test_response() {
426        let response = create_test_response(b"Hello World");
427        assert!(response.is_success());
428
429        if let CommandResponse::Success { response: body } = response {
430            assert_eq!(body.decode_inline().unwrap(), b"Hello World");
431        } else {
432            panic!("Expected success response");
433        }
434    }
435
436    #[test]
437    fn test_create_test_error() {
438        let response = create_test_error("TEST_ERROR", "Something went wrong");
439        assert!(response.is_error());
440
441        if let CommandResponse::Error { code, message, .. } = response {
442            assert_eq!(code, "TEST_ERROR");
443            assert_eq!(message, "Something went wrong");
444        } else {
445            panic!("Expected error response");
446        }
447    }
448
449    #[tokio::test]
450    async fn test_decode_params_inline() {
451        let params_json = serde_json::json!({"key": "value", "num": 42});
452        let params_bytes = serde_json::to_vec(&params_json).unwrap();
453
454        let envelope = Envelope {
455            protocol: PROTOCOL_VERSION.to_string(),
456            command_id: "cmd_decode".to_string(),
457            attempt: 1,
458            deadline: None,
459            command: "test".to_string(),
460            params: BodySpec::inline(&params_bytes),
461            response_handling: crate::types::ResponseHandling {
462                max_inline_bytes: 150000,
463                submit_response_url: "https://arc.example.com/response".to_string(),
464                storage_upload_request: PresignedRequest::new_http(
465                    "https://storage.example.com/upload".to_string(),
466                    "PUT".to_string(),
467                    std::collections::HashMap::new(),
468                    alien_bindings::presigned::PresignedOperation::Put,
469                    "test-path".to_string(),
470                    Utc::now() + chrono::Duration::hours(1),
471                ),
472            },
473            deployment_id: "ag_123".to_string(),
474        };
475
476        let decoded = decode_params(&envelope).await.unwrap();
477        assert_eq!(decoded["key"], "value");
478        assert_eq!(decoded["num"], 42);
479    }
480}