1use std::time::Instant;
2
3use alien_core::{MessagePayload, QueueMessage};
4use alien_error::{AlienError, Context, IntoAlienError};
5use tracing::{debug, info};
6
7use crate::{
8 error::{ErrorData, Result},
9 types::{BodySpec, CommandResponse, Envelope},
10 PROTOCOL_VERSION,
11};
12
13pub fn parse_envelope(message: &QueueMessage) -> Result<Option<Envelope>> {
15 let envelope_data = match &message.payload {
16 MessagePayload::Json(value) => {
17 match serde_json::from_value::<Envelope>(value.clone()) {
19 Ok(envelope) => envelope,
20 Err(_) => return Ok(None), }
22 }
23 MessagePayload::Text(text) => {
24 match serde_json::from_str::<Envelope>(text) {
26 Ok(envelope) => envelope,
27 Err(_) => return Ok(None), }
29 }
30 };
31
32 if envelope_data.protocol != PROTOCOL_VERSION {
34 return Ok(None);
35 }
36
37 envelope_data
38 .validate()
39 .context(ErrorData::InvalidEnvelope {
40 message: "Envelope validation failed".to_string(),
41 field: None,
42 })?;
43 Ok(Some(envelope_data))
44}
45
46pub async fn decode_params(envelope: &Envelope) -> Result<serde_json::Value> {
51 match &envelope.params {
52 BodySpec::Inline { inline_base64 } => {
53 use base64::{engine::general_purpose, Engine as _};
54
55 let bytes = general_purpose::STANDARD
56 .decode(inline_base64)
57 .into_alien_error()
58 .context(ErrorData::InvalidEnvelope {
59 message: "Failed to decode base64 params".to_string(),
60 field: Some("params.inlineBase64".to_string()),
61 })?;
62
63 serde_json::from_slice(&bytes)
64 .into_alien_error()
65 .context(ErrorData::InvalidEnvelope {
66 message: "Failed to parse params JSON".to_string(),
67 field: Some("params".to_string()),
68 })
69 }
70 BodySpec::Storage {
71 storage_get_request,
72 ..
73 } => {
74 let presigned_request = storage_get_request.as_ref().ok_or_else(|| {
75 AlienError::new(ErrorData::InvalidEnvelope {
76 message: "Storage params missing storage_get_request".to_string(),
77 field: Some("params.storageGetRequest".to_string()),
78 })
79 })?;
80
81 let response = presigned_request.execute(None).await.context(
82 ErrorData::StorageOperationFailed {
83 message: "Failed to fetch params from storage".to_string(),
84 operation: Some("get".to_string()),
85 path: Some(presigned_request.path.clone()),
86 },
87 )?;
88
89 let body = response.body.ok_or_else(|| {
90 AlienError::new(ErrorData::StorageOperationFailed {
91 message: "Storage response has no body".to_string(),
92 operation: Some("get".to_string()),
93 path: Some(presigned_request.path.clone()),
94 })
95 })?;
96
97 serde_json::from_slice(&body)
98 .into_alien_error()
99 .context(ErrorData::InvalidEnvelope {
100 message: "Failed to parse params JSON from storage".to_string(),
101 field: Some("params".to_string()),
102 })
103 }
104 }
105}
106
107pub async fn decode_params_bytes(envelope: &Envelope) -> Result<Vec<u8>> {
112 match &envelope.params {
113 BodySpec::Inline { inline_base64 } => {
114 use base64::{engine::general_purpose, Engine as _};
115
116 general_purpose::STANDARD
117 .decode(inline_base64)
118 .into_alien_error()
119 .context(ErrorData::InvalidEnvelope {
120 message: "Failed to decode base64 params".to_string(),
121 field: Some("params.inlineBase64".to_string()),
122 })
123 }
124 BodySpec::Storage {
125 storage_get_request,
126 ..
127 } => {
128 let presigned_request = storage_get_request.as_ref().ok_or_else(|| {
129 AlienError::new(ErrorData::InvalidEnvelope {
130 message: "Storage params missing storage_get_request".to_string(),
131 field: Some("params.storageGetRequest".to_string()),
132 })
133 })?;
134
135 let response = presigned_request.execute(None).await.context(
136 ErrorData::StorageOperationFailed {
137 message: "Failed to fetch params from storage".to_string(),
138 operation: Some("get".to_string()),
139 path: Some(presigned_request.path.clone()),
140 },
141 )?;
142
143 response.body.map(|b| b.to_vec()).ok_or_else(|| {
144 AlienError::new(ErrorData::StorageOperationFailed {
145 message: "Storage response has no body".to_string(),
146 operation: Some("get".to_string()),
147 path: Some(presigned_request.path.clone()),
148 })
149 })
150 }
151 }
152}
153
/// Submits a command response to the URL named in the envelope.
///
/// A successful response whose body size exceeds
/// `response_handling.max_inline_bytes` is first uploaded through the
/// envelope's presigned `storage_upload_request`; the submitted response then
/// carries a storage reference (`storage_put_used: Some(true)`) instead of the
/// inline body. Error responses and small bodies are submitted unchanged.
///
/// # Errors
///
/// * [`ErrorData::Other`] when the HTTP client cannot be built, or when an
///   oversized body is not an inline body and therefore cannot be uploaded.
/// * [`ErrorData::StorageOperationFailed`] when the upload fails or answers
///   with a non-2xx status.
/// * [`ErrorData::HttpOperationFailed`] when the submission request fails or
///   answers with a non-success status.
#[cfg(feature = "runtime")]
pub async fn submit_response(envelope: &Envelope, response: CommandResponse) -> Result<()> {
    use reqwest::Client;
    use std::time::Duration;

    let start_time = Instant::now();

    // NOTE(review): a new client is built on every call, so the idle-pool
    // settings only take effect within this single call — consider sharing a
    // client between calls.
    let client = Client::builder()
        .timeout(Duration::from_secs(30))
        .pool_max_idle_per_host(2)
        .pool_idle_timeout(Some(Duration::from_secs(60)))
        .build()
        .map_err(|e| {
            AlienError::new(ErrorData::Other {
                message: format!("Failed to create HTTP client: {}", e),
            })
        })?;

    let handling = &envelope.response_handling;

    // `response` is owned and not needed afterwards, so it is matched by
    // value: the small-body and error paths move it through without cloning.
    let final_response = match response {
        CommandResponse::Success { response: body } => {
            let body_size = body.size().unwrap_or(0);

            if body_size > handling.max_inline_bytes {
                debug!(
                    command_id = %envelope.command_id,
                    body_size = body_size,
                    max_inline = handling.max_inline_bytes,
                    "Uploading large response body to storage"
                );

                let body_bytes = body.decode_inline().ok_or_else(|| {
                    AlienError::new(ErrorData::Other {
                        message: "Cannot upload storage body - expected inline body".to_string(),
                    })
                })?;

                // Record the length before the buffer is moved into the
                // upload, so the (large) body is never copied.
                let uploaded_len = body_bytes.len() as u64;
                let upload_request = &handling.storage_upload_request;

                let upload_response = upload_request
                    .execute(Some(bytes::Bytes::from(body_bytes)))
                    .await
                    .into_alien_error()
                    .context(ErrorData::StorageOperationFailed {
                        message: "Failed to upload response to storage".to_string(),
                        operation: Some("put".to_string()),
                        path: Some(upload_request.path.clone()),
                    })?;

                if upload_response.status_code < 200 || upload_response.status_code >= 300 {
                    return Err(AlienError::new(ErrorData::StorageOperationFailed {
                        message: format!(
                            "Storage upload failed with status {}",
                            upload_response.status_code
                        ),
                        operation: Some("put".to_string()),
                        path: Some(upload_request.path.clone()),
                    }));
                }

                debug!(
                    command_id = %envelope.command_id,
                    "Response body uploaded to storage successfully"
                );

                // Submit a storage reference in place of the inline body.
                CommandResponse::Success {
                    response: BodySpec::Storage {
                        size: Some(uploaded_len),
                        storage_get_request: None,
                        storage_put_used: Some(true),
                    },
                }
            } else {
                CommandResponse::Success { response: body }
            }
        }
        error_response @ CommandResponse::Error { .. } => error_response,
    };

    let submit_url = &handling.submit_response_url;

    debug!(
        command_id = %envelope.command_id,
        url = %submit_url,
        "Submitting ARC response"
    );

    // Captured before `final_response` is moved into the request payload.
    let response_type = if final_response.is_success() {
        "success"
    } else {
        "error"
    };

    let http_response = client
        .put(submit_url)
        .json(&crate::types::SubmitResponseRequest {
            response: final_response,
        })
        .send()
        .await
        .into_alien_error()
        .context(ErrorData::HttpOperationFailed {
            message: "Failed to submit response".to_string(),
            method: Some("PUT".to_string()),
            url: Some(submit_url.clone()),
        })?;

    if !http_response.status().is_success() {
        let status = http_response.status();
        // Best effort: the error body is only used to enrich the message.
        let error_body = http_response.text().await.unwrap_or_default();
        return Err(AlienError::new(ErrorData::HttpOperationFailed {
            message: format!(
                "Response submission failed with status {}: {}",
                status, error_body
            ),
            method: Some("PUT".to_string()),
            url: Some(submit_url.clone()),
        }));
    }

    info!(
        command_id = %envelope.command_id,
        processing_ms = start_time.elapsed().as_millis(),
        response_type = response_type,
        "ARC response submitted successfully"
    );

    Ok(())
}
300
301pub fn create_test_response(data: &[u8]) -> CommandResponse {
303 CommandResponse::success(data)
304}
305
306pub fn create_test_error(code: &str, message: &str) -> CommandResponse {
308 CommandResponse::error(code, message)
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use alien_bindings::presigned::{PresignedOperation, PresignedRequest};
    use chrono::Utc;
    use std::collections::HashMap;

    /// Builds a valid envelope with empty inline JSON params (`{}`).
    fn create_test_envelope() -> Envelope {
        Envelope {
            protocol: PROTOCOL_VERSION.to_string(),
            command_id: "cmd_123".to_string(),
            attempt: 1,
            deadline: None,
            command: "test-command".to_string(),
            params: BodySpec::inline(b"{}"),
            response_handling: crate::types::ResponseHandling {
                max_inline_bytes: 150000,
                submit_response_url: "https://arc.example.com/commands/cmd_123/response"
                    .to_string(),
                storage_upload_request: PresignedRequest::new_http(
                    "https://storage.example.com/upload".to_string(),
                    "PUT".to_string(),
                    HashMap::new(),
                    PresignedOperation::Put,
                    "test-path".to_string(),
                    Utc::now() + chrono::Duration::hours(1),
                ),
            },
            deployment_id: "ag_123".to_string(),
        }
    }

    /// Wraps `payload` in a queue message whose id and receipt handle are
    /// derived from `suffix` (`msg_<suffix>` / `handle_<suffix>`).
    fn create_test_message(suffix: &str, payload: MessagePayload) -> QueueMessage {
        QueueMessage {
            id: format!("msg_{}", suffix),
            payload,
            receipt_handle: format!("handle_{}", suffix),
            timestamp: Utc::now(),
            source: "test-queue".to_string(),
            attributes: HashMap::new(),
            attempt_count: Some(1),
        }
    }

    #[test]
    fn test_parse_envelope_json() {
        let envelope_json = serde_json::to_value(create_test_envelope()).unwrap();
        let queue_message = create_test_message("123", MessagePayload::Json(envelope_json));

        let parsed_envelope = parse_envelope(&queue_message)
            .unwrap()
            .expect("JSON envelope should be recognised");

        assert_eq!(parsed_envelope.command_id, "cmd_123");
        assert_eq!(parsed_envelope.command, "test-command");
        assert_eq!(parsed_envelope.protocol, PROTOCOL_VERSION);
    }

    #[test]
    fn test_parse_envelope_text() {
        let envelope_text = serde_json::to_string(&create_test_envelope()).unwrap();
        let queue_message = create_test_message("456", MessagePayload::Text(envelope_text));

        let parsed_envelope = parse_envelope(&queue_message)
            .unwrap()
            .expect("text envelope should be recognised");

        assert_eq!(parsed_envelope.command_id, "cmd_123");
    }

    #[test]
    fn test_parse_non_arc_message() {
        let queue_message = create_test_message(
            "789",
            MessagePayload::Json(serde_json::json!({"regular": "message"})),
        );

        let parsed = parse_envelope(&queue_message).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_non_json_text_message() {
        // Text that is not JSON at all must be ignored, not reported as an error.
        let queue_message = create_test_message(
            "plain",
            MessagePayload::Text("not an envelope".to_string()),
        );

        let parsed = parse_envelope(&queue_message).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_invalid_protocol() {
        let mut envelope = create_test_envelope();
        envelope.protocol = "invalid.v1".to_string();

        let envelope_json = serde_json::to_value(&envelope).unwrap();
        let queue_message = create_test_message("invalid", MessagePayload::Json(envelope_json));

        let parsed = parse_envelope(&queue_message).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn test_create_test_response() {
        let response = create_test_response(b"Hello World");
        assert!(response.is_success());

        if let CommandResponse::Success { response: body } = response {
            assert_eq!(body.decode_inline().unwrap(), b"Hello World");
        } else {
            panic!("Expected success response");
        }
    }

    #[test]
    fn test_create_test_error() {
        let response = create_test_error("TEST_ERROR", "Something went wrong");
        assert!(response.is_error());

        if let CommandResponse::Error { code, message, .. } = response {
            assert_eq!(code, "TEST_ERROR");
            assert_eq!(message, "Something went wrong");
        } else {
            panic!("Expected error response");
        }
    }

    #[tokio::test]
    async fn test_decode_params_inline() {
        let params_json = serde_json::json!({"key": "value", "num": 42});
        let params_bytes = serde_json::to_vec(&params_json).unwrap();

        let envelope = Envelope {
            command_id: "cmd_decode".to_string(),
            command: "test".to_string(),
            params: BodySpec::inline(&params_bytes),
            ..create_test_envelope()
        };

        let decoded = decode_params(&envelope).await.unwrap();
        assert_eq!(decoded["key"], "value");
        assert_eq!(decoded["num"], 42);
    }

    #[tokio::test]
    async fn test_decode_params_bytes_inline() {
        let envelope = Envelope {
            params: BodySpec::inline(b"raw params"),
            ..create_test_envelope()
        };

        let decoded = decode_params_bytes(&envelope).await.unwrap();
        assert_eq!(decoded, b"raw params");
    }
}