1use std::sync::Arc;
8use std::time::Duration;
9
10use alien_bindings::presigned::PresignedRequest;
11use alien_bindings::traits::{Kv, PutOptions, Storage};
12use alien_core::DeploymentModel;
13use alien_error::{AlienError, Context, ContextError, IntoAlienError};
14use chrono::{DateTime, Utc};
15use hex;
16use hmac::{Hmac, Mac};
17use serde::{Deserialize, Serialize};
18use sha2::Sha256;
19use tracing::{debug, info};
20use uuid::Uuid;
21
22use base64::{engine::general_purpose, Engine as _};
23use bytes::Bytes;
24use object_store::path::Path as StoragePath;
25
26use crate::error::{ErrorData, Result};
27use crate::types::*;
28use crate::INLINE_MAX_BYTES;
29
/// Largest serialized record, in bytes, that `store_params` / `store_response` write to
/// the KV store without further processing. When a record exceeds this size and its body
/// is inline base64, the decoded bytes are written to blob storage and only a reference
/// record is kept in KV.
const KV_VALUE_THRESHOLD: usize = 20_000;
33
34pub mod axum_handlers;
35pub mod command_registry;
36pub mod dispatchers;
37pub mod storage;
38
39pub use axum_handlers::{
40 create_axum_router, CommandPayloadResponse, HasCommandServer, StorePayloadRequest,
41};
42pub use command_registry::{
43 CommandEnvelopeData, CommandMetadata, CommandRegistry, CommandStatus, InMemoryCommandRegistry,
44};
45pub use dispatchers::{CommandDispatcher, NullCommandDispatcher};
46
/// JSON wrapper for the parameters record of a command, stored in KV under
/// `cmd:{command_id}:params`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandParamsData {
    /// Body specification of the command parameters (inline or storage-backed).
    pub params: BodySpec,
}
56
/// JSON wrapper for the response record of a command, stored in KV under
/// `cmd:{command_id}:response`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandResponseData {
    /// The response submitted for the command.
    pub response: CommandResponse,
}
62
/// Lease record stored in KV under `cmd:{command_id}:lease`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LeaseData {
    /// Identifier handed to the lease holder (`lease_{uuid}` when created by `acquire_lease`).
    pub lease_id: String,
    /// Time at which the lease was taken.
    pub acquired_at: DateTime<Utc>,
    /// Time at which the lease stops being valid.
    pub expires_at: DateTime<Utc>,
    /// Holder of the lease; `acquire_lease` stores the deployment ID here.
    pub owner: String,
}
71
/// Value stored in the deadline index under `deadline:{deadline_nanos}:{command_id}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DeadlineIndexData {
    /// Command the deadline belongs to.
    pub command_id: String,
    /// Deadline of that command.
    pub deadline: DateTime<Utc>,
}
78
/// Server-side coordinator for the command lifecycle: creation, parameter upload,
/// dispatch (push) or leasing (pull), and response collection.
pub struct CommandServer {
    /// Key-value store holding params, responses, leases, idempotency keys and indexes.
    kv: Arc<dyn Kv>,
    /// Blob storage used for large params/responses and for presigned upload/download requests.
    storage: Arc<dyn Storage>,
    /// Transport used to deliver envelopes to push-model deployments.
    command_dispatcher: Arc<dyn CommandDispatcher>,
    /// Registry that owns command metadata and state transitions.
    command_registry: Arc<dyn CommandRegistry>,
    /// Inline size limit, in bytes, reported to clients and used to decide whether
    /// storage-backed params need an upload or can be re-inlined into the envelope.
    inline_max_bytes: usize,
    /// Base URL used to build the `submit_response_url` placed in envelopes.
    base_url: String,
    /// HMAC-SHA256 key used to sign and verify response tokens.
    response_signing_key: Vec<u8>,
}
96
97impl CommandServer {
98 pub fn new(
100 kv: Arc<dyn Kv>,
101 storage: Arc<dyn Storage>,
102 command_dispatcher: Arc<dyn CommandDispatcher>,
103 command_registry: Arc<dyn CommandRegistry>,
104 base_url: String,
105 response_signing_key: Vec<u8>,
106 ) -> Self {
107 Self {
108 kv,
109 storage,
110 command_dispatcher,
111 command_registry,
112 inline_max_bytes: INLINE_MAX_BYTES,
113 base_url,
114 response_signing_key,
115 }
116 }
117
118 pub fn with_inline_limit(
120 kv: Arc<dyn Kv>,
121 storage: Arc<dyn Storage>,
122 command_dispatcher: Arc<dyn CommandDispatcher>,
123 command_registry: Arc<dyn CommandRegistry>,
124 base_url: String,
125 inline_max_bytes: usize,
126 response_signing_key: Vec<u8>,
127 ) -> Self {
128 Self {
129 kv,
130 storage,
131 command_dispatcher,
132 command_registry,
133 inline_max_bytes,
134 base_url,
135 response_signing_key,
136 }
137 }
138
    /// Upper bound, in seconds, on how far in the future a response token's `expires`
    /// timestamp may lie for `verify_response_token` to accept it.
    const MAX_RESPONSE_TOKEN_LIFETIME_SECS: i64 = 7200;
143
144 fn sign_response_url(&self, command_id: &str) -> (String, i64) {
149 let expires = Utc::now().timestamp() + 3600; let message = format!("commands.v1:{}:{}", command_id, expires);
151
152 type HmacSha256 = Hmac<Sha256>;
153 let mut mac =
154 HmacSha256::new_from_slice(&self.response_signing_key).expect("HMAC accepts any key");
155 mac.update(message.as_bytes());
156 let result = mac.finalize();
157 let token = hex::encode(result.into_bytes());
158
159 (token, expires)
160 }
161
162 pub fn verify_response_token(&self, command_id: &str, token: &str, expires: i64) -> bool {
167 if token.len() != 64 {
169 return false;
170 }
171
172 let message = format!("commands.v1:{}:{}", command_id, expires);
173
174 type HmacSha256 = Hmac<Sha256>;
175 let mut mac =
176 HmacSha256::new_from_slice(&self.response_signing_key).expect("HMAC accepts any key");
177 mac.update(message.as_bytes());
178
179 let Ok(token_bytes) = hex::decode(token) else {
181 return false;
182 };
183 let hmac_valid = mac.verify_slice(&token_bytes).is_ok();
184
185 let now = Utc::now().timestamp();
187 let not_expired = now <= expires;
188 let within_max_lifetime = expires <= now + Self::MAX_RESPONSE_TOKEN_LIFETIME_SECS;
189
190 hmac_valid && not_expired && within_max_lifetime
191 }
192
193 pub async fn create_command(
205 &self,
206 request: CreateCommandRequest,
207 ) -> Result<CreateCommandResponse> {
208 self.validate_create_command(&request).await?;
210
211 if let Some(ref idem_key) = request.idempotency_key {
213 if let Some(existing_id) = self.check_idempotency(idem_key).await? {
214 let status = self
216 .command_registry
217 .get_command_status(&existing_id)
218 .await?;
219 if let Some(s) = status {
220 return Ok(CreateCommandResponse {
221 command_id: existing_id,
222 state: s.state,
223 storage_upload: None,
224 inline_allowed_up_to: self.inline_max_bytes as u64,
225 next: "poll".to_string(),
226 });
227 }
228 }
229 }
230
231 let (initial_state, request_size_bytes) = match &request.params {
233 BodySpec::Inline { inline_base64 } => {
234 let size = inline_base64.len() as u64;
235 (CommandState::Pending, Some(size))
236 }
237 BodySpec::Storage { size, .. } => {
238 if size.unwrap_or(0) > self.inline_max_bytes as u64 {
239 (CommandState::PendingUpload, *size)
240 } else {
241 (CommandState::Pending, *size)
242 }
243 }
244 };
245
246 let metadata = self
248 .command_registry
249 .create_command(
250 &request.deployment_id,
251 &request.command,
252 initial_state,
253 request.deadline,
254 request_size_bytes,
255 )
256 .await?;
257
258 let command_id = metadata.command_id;
259 let deployment_model = metadata.deployment_model;
260
261 if let Some(ref idem_key) = request.idempotency_key {
263 self.store_idempotency(idem_key, &command_id).await?;
264 }
265
266 self.store_params(&command_id, &request.params).await?;
268
269 let storage_upload = if initial_state == CommandState::PendingUpload {
271 Some(self.generate_params_upload(&command_id).await?)
272 } else {
273 None
274 };
275
276 let (final_state, next_action) = if initial_state == CommandState::Pending {
278 match deployment_model {
279 DeploymentModel::Push => {
280 self.dispatch_command_push(&command_id, &request.deployment_id)
282 .await?;
283 (CommandState::Dispatched, "poll")
284 }
285 DeploymentModel::Pull => {
286 self.create_pending_index(&request.deployment_id, &command_id)
288 .await?;
289 debug!(
290 "Command {} ready for pull (deployment will poll)",
291 command_id
292 );
293 (CommandState::Pending, "poll")
294 }
295 }
296 } else {
297 (initial_state, "upload")
299 };
300
301 if let Some(deadline) = request.deadline {
303 self.create_deadline_index(&command_id, deadline).await?;
304 }
305
306 Ok(CreateCommandResponse {
307 command_id,
308 state: final_state,
309 storage_upload,
310 inline_allowed_up_to: self.inline_max_bytes as u64,
311 next: next_action.to_string(),
312 })
313 }
314
315 pub async fn upload_complete(
317 &self,
318 command_id: &str,
319 upload_request: UploadCompleteRequest,
320 ) -> Result<UploadCompleteResponse> {
321 let status = self
323 .command_registry
324 .get_command_status(command_id)
325 .await?
326 .ok_or_else(|| {
327 AlienError::new(ErrorData::CommandNotFound {
328 command_id: command_id.to_string(),
329 })
330 })?;
331
332 if status.state != CommandState::PendingUpload {
334 return Err(AlienError::new(ErrorData::InvalidStateTransition {
335 from: status.state.as_ref().to_string(),
336 to: CommandState::Pending.as_ref().to_string(),
337 }));
338 }
339
340 let storage_get_request = self.generate_storage_get_request(command_id).await?;
342 let params = BodySpec::Storage {
343 size: Some(upload_request.size),
344 storage_get_request: Some(storage_get_request),
345 storage_put_used: None,
346 };
347 self.store_params(command_id, ¶ms).await?;
348
349 self.command_registry
351 .update_command_state(command_id, CommandState::Pending, None, None, None, None)
352 .await?;
353
354 let metadata = self
356 .command_registry
357 .get_command_metadata(command_id)
358 .await?
359 .ok_or_else(|| {
360 AlienError::new(ErrorData::CommandNotFound {
361 command_id: command_id.to_string(),
362 })
363 })?;
364
365 let final_state = match metadata.deployment_model {
366 DeploymentModel::Push => {
367 self.dispatch_command_push(command_id, &status.deployment_id)
368 .await?;
369 CommandState::Dispatched
370 }
371 DeploymentModel::Pull => {
372 self.create_pending_index(&status.deployment_id, command_id)
373 .await?;
374 debug!(
375 "Command {} ready for pull after upload (deployment will poll)",
376 command_id
377 );
378 CommandState::Pending
379 }
380 };
381
382 Ok(UploadCompleteResponse {
383 command_id: command_id.to_string(),
384 state: final_state,
385 })
386 }
387
388 pub async fn get_command_status(&self, command_id: &str) -> Result<CommandStatusResponse> {
392 let status = self
394 .command_registry
395 .get_command_status(command_id)
396 .await?
397 .ok_or_else(|| {
398 AlienError::new(ErrorData::CommandNotFound {
399 command_id: command_id.to_string(),
400 })
401 })?;
402
403 if let Some(deadline) = status.deadline {
405 if Utc::now() > deadline && !status.state.is_terminal() {
406 self.command_registry
408 .update_command_state(
409 command_id,
410 CommandState::Expired,
411 None,
412 Some(Utc::now()),
413 None,
414 None,
415 )
416 .await?;
417
418 self.delete_pending_index(&status.deployment_id, command_id)
420 .await?;
421
422 return Ok(CommandStatusResponse {
424 command_id: command_id.to_string(),
425 state: CommandState::Expired,
426 attempt: status.attempt,
427 response: None,
428 });
429 }
430 }
431
432 let response = if status.state.is_terminal() {
434 self.get_response(command_id).await?
435 } else {
436 None
437 };
438
439 Ok(CommandStatusResponse {
440 command_id: command_id.to_string(),
441 state: status.state,
442 attempt: status.attempt,
443 response,
444 })
445 }
446
447 pub async fn submit_command_response(
451 &self,
452 command_id: &str,
453 mut response: CommandResponse,
454 ) -> Result<()> {
455 let status = self
457 .command_registry
458 .get_command_status(command_id)
459 .await?
460 .ok_or_else(|| {
461 AlienError::new(ErrorData::CommandNotFound {
462 command_id: command_id.to_string(),
463 })
464 })?;
465
466 if status.state.is_terminal() {
468 debug!(
469 "Ignoring duplicate response for terminal command {}",
470 command_id
471 );
472 return Ok(());
473 }
474
475 if status.state != CommandState::Dispatched {
477 return Err(AlienError::new(ErrorData::InvalidStateTransition {
478 from: status.state.as_ref().to_string(),
479 to: CommandState::Succeeded.as_ref().to_string(),
480 }));
481 }
482
483 if let CommandResponse::Success {
485 response: ref mut body,
486 } = response
487 {
488 if let BodySpec::Storage {
489 size,
490 storage_get_request,
491 storage_put_used,
492 } = body
493 {
494 if storage_get_request.is_none() && storage_put_used.unwrap_or(false) {
495 let get_request = self
496 .generate_response_storage_get_request(command_id)
497 .await?;
498 *body = BodySpec::Storage {
499 size: *size,
500 storage_get_request: Some(get_request),
501 storage_put_used: *storage_put_used,
502 };
503 }
504 }
505 }
506
507 self.store_response(command_id, &response).await?;
509
510 self.delete_lease(command_id).await?;
512
513 self.delete_pending_index(&status.deployment_id, command_id)
515 .await?;
516
517 let (new_state, error) = if response.is_success() {
519 (CommandState::Succeeded, None)
520 } else if let CommandResponse::Error { code, message, .. } = &response {
521 (
522 CommandState::Failed,
523 Some(serde_json::json!({ "code": code, "message": message })),
524 )
525 } else {
526 (CommandState::Failed, None)
527 };
528
529 let response_size = match &response {
530 CommandResponse::Success {
531 response: BodySpec::Inline { inline_base64 },
532 } => Some(inline_base64.len() as u64),
533 CommandResponse::Success {
534 response: BodySpec::Storage { size, .. },
535 } => *size,
536 _ => None,
537 };
538
539 self.command_registry
540 .update_command_state(
541 command_id,
542 new_state,
543 None, Some(Utc::now()),
545 response_size,
546 error,
547 )
548 .await?;
549
550 info!(
551 "Command {} completed with state {:?}",
552 command_id, new_state
553 );
554 Ok(())
555 }
556
557 pub async fn acquire_lease(
561 &self,
562 deployment_id: &str,
563 lease_request: &LeaseRequest,
564 ) -> Result<LeaseResponse> {
565 let mut leases = Vec::new();
566
567 let target_prefix = format!("target:{}:pending:", deployment_id);
569 let scan_result = self
570 .kv
571 .scan_prefix(&target_prefix, Some(lease_request.max_leases * 2), None)
572 .await
573 .into_alien_error()
574 .context(ErrorData::KvOperationFailed {
575 operation: "scan_prefix".to_string(),
576 key: target_prefix.clone(),
577 message: "Failed to scan for pending commands".to_string(),
578 })?;
579
580 for (index_key, _) in scan_result.items {
581 if leases.len() >= lease_request.max_leases {
582 break;
583 }
584
585 let command_id = self.extract_command_id_from_index_key(&index_key)?;
586
587 let lease_id = format!("lease_{}", Uuid::new_v4());
589 let lease_duration = Duration::from_secs(lease_request.lease_seconds);
590 let expires_at =
591 Utc::now() + chrono::Duration::seconds(lease_request.lease_seconds as i64);
592
593 let lease_data = LeaseData {
594 lease_id: lease_id.clone(),
595 acquired_at: Utc::now(),
596 expires_at,
597 owner: deployment_id.to_string(),
598 };
599
600 let lease_key = format!("cmd:{}:lease", command_id);
601 let lease_value = serde_json::to_vec(&lease_data).into_alien_error().context(
602 ErrorData::SerializationFailed {
603 message: "Failed to serialize lease data".to_string(),
604 data_type: Some("LeaseData".to_string()),
605 },
606 )?;
607
608 let options = Some(PutOptions {
609 ttl: Some(lease_duration),
610 if_not_exists: true,
611 });
612
613 let success = self
614 .kv
615 .put(&lease_key, lease_value, options)
616 .await
617 .context(ErrorData::KvOperationFailed {
618 operation: "put".to_string(),
619 key: lease_key.clone(),
620 message: "Failed to create lease".to_string(),
621 })?;
622
623 if !success {
624 continue;
626 }
627
628 let metadata = match self
630 .command_registry
631 .get_command_metadata(&command_id)
632 .await?
633 {
634 Some(m) => m,
635 None => {
636 self.delete_lease(&command_id).await?;
638 let _ = self.kv.delete(&index_key).await;
639 continue;
640 }
641 };
642
643 if metadata.state.is_terminal() {
645 self.delete_lease(&command_id).await?;
647 let _ = self.kv.delete(&index_key).await;
648 continue;
649 }
650
651 if let Some(deadline) = metadata.deadline {
653 if Utc::now() > deadline {
654 self.command_registry
656 .update_command_state(
657 &command_id,
658 CommandState::Expired,
659 None,
660 Some(Utc::now()),
661 None,
662 None,
663 )
664 .await?;
665 self.delete_lease(&command_id).await?;
666 let _ = self.kv.delete(&index_key).await;
667 continue;
668 }
669 }
670
671 let params = match self.get_params(&command_id).await? {
673 Some(p) => p,
674 None => {
675 self.delete_lease(&command_id).await?;
677 continue;
678 }
679 };
680
681 self.command_registry
683 .update_command_state(
684 &command_id,
685 CommandState::Dispatched,
686 Some(Utc::now()),
687 None,
688 None,
689 None,
690 )
691 .await?;
692
693 let envelope = self.build_envelope(&command_id, &metadata, params).await?;
695
696 leases.push(LeaseInfo {
697 lease_id,
698 lease_expires_at: expires_at,
699 command_id: command_id.clone(),
700 attempt: metadata.attempt,
701 envelope,
702 });
703 }
704
705 Ok(LeaseResponse { leases })
706 }
707
708 pub async fn release_lease(&self, command_id: &str, lease_id: &str) -> Result<()> {
712 let lease_key = format!("cmd:{}:lease", command_id);
713
714 if let Ok(Some(lease_data)) = self.kv.get(&lease_key).await {
716 let lease: LeaseData = serde_json::from_slice(&lease_data)
717 .into_alien_error()
718 .context(ErrorData::SerializationFailed {
719 message: "Failed to deserialize lease data".to_string(),
720 data_type: Some("LeaseData".to_string()),
721 })?;
722
723 if lease.lease_id != lease_id {
724 return Err(AlienError::new(ErrorData::LeaseNotFound {
725 lease_id: lease_id.to_string(),
726 }));
727 }
728
729 self.delete_lease(command_id).await?;
731
732 self.command_registry.increment_attempt(command_id).await?;
734
735 self.command_registry
737 .update_command_state(command_id, CommandState::Pending, None, None, None, None)
738 .await?;
739
740 debug!("Lease {} released for command {}", lease_id, command_id);
742 }
743
744 Ok(())
745 }
746
747 pub async fn get_command_deployment_id(&self, command_id: &str) -> Result<Option<String>> {
752 let status = self.command_registry.get_command_status(command_id).await?;
753 Ok(status.map(|s| s.deployment_id))
754 }
755
756 pub async fn release_lease_by_id(&self, lease_id: &str) -> Result<()> {
758 let lease_prefix = "cmd:";
760 let scan_result = self
761 .kv
762 .scan_prefix(lease_prefix, None, None)
763 .await
764 .into_alien_error()
765 .context(ErrorData::KvOperationFailed {
766 operation: "scan_prefix".to_string(),
767 key: lease_prefix.to_string(),
768 message: "Failed to scan for lease keys".to_string(),
769 })?;
770
771 for (key, value) in scan_result.items {
772 if key.ends_with(":lease") {
773 if let Ok(lease) = serde_json::from_slice::<LeaseData>(&value) {
774 if lease.lease_id == lease_id {
775 let command_id = key
776 .strip_prefix("cmd:")
777 .and_then(|s| s.strip_suffix(":lease"))
778 .ok_or_else(|| {
779 AlienError::new(ErrorData::Other {
780 message: format!("Invalid lease key format: {}", key),
781 })
782 })?;
783
784 return self.release_lease(command_id, lease_id).await;
785 }
786 }
787 }
788 }
789
790 Err(AlienError::new(ErrorData::LeaseNotFound {
791 lease_id: lease_id.to_string(),
792 }))
793 }
794
795 async fn validate_create_command(&self, request: &CreateCommandRequest) -> Result<()> {
800 if request.command.is_empty() {
801 return Err(AlienError::new(ErrorData::InvalidCommand {
802 message: "Command name cannot be empty".to_string(),
803 }));
804 }
805
806 if request.deployment_id.is_empty() {
807 return Err(AlienError::new(ErrorData::InvalidCommand {
808 message: "Deployment ID cannot be empty".to_string(),
809 }));
810 }
811
812 if let Some(deadline) = request.deadline {
813 if deadline <= Utc::now() {
814 return Err(AlienError::new(ErrorData::InvalidCommand {
815 message: "Deadline must be in the future".to_string(),
816 }));
817 }
818 }
819
820 Ok(())
821 }
822
823 async fn check_idempotency(&self, idem_key: &str) -> Result<Option<String>> {
826 let key = format!("idem:{}", idem_key);
827 if let Some(data) = self
828 .kv
829 .get(&key)
830 .await
831 .context(ErrorData::KvOperationFailed {
832 operation: "get".to_string(),
833 key: key.clone(),
834 message: "Failed to check idempotency".to_string(),
835 })?
836 {
837 let command_id = String::from_utf8(data).into_alien_error().context(
838 ErrorData::SerializationFailed {
839 message: "Invalid idempotency data".to_string(),
840 data_type: Some("String".to_string()),
841 },
842 )?;
843 return Ok(Some(command_id));
844 }
845 Ok(None)
846 }
847
848 async fn store_idempotency(&self, idem_key: &str, command_id: &str) -> Result<()> {
849 let key = format!("idem:{}", idem_key);
850 let ttl = Duration::from_secs(24 * 60 * 60); self.kv
852 .put(
853 &key,
854 command_id.as_bytes().to_vec(),
855 Some(PutOptions {
856 ttl: Some(ttl),
857 if_not_exists: true,
858 }),
859 )
860 .await
861 .context(ErrorData::KvOperationFailed {
862 operation: "put".to_string(),
863 key: key.clone(),
864 message: "Failed to store idempotency".to_string(),
865 })?;
866 Ok(())
867 }
868
869 pub async fn store_params(&self, command_id: &str, params: &BodySpec) -> Result<()> {
872 let key = format!("cmd:{}:params", command_id);
873
874 let data = CommandParamsData {
876 params: params.clone(),
877 };
878 let value = serde_json::to_vec(&data).into_alien_error().context(
879 ErrorData::SerializationFailed {
880 message: "Failed to serialize params".to_string(),
881 data_type: Some("CommandParamsData".to_string()),
882 },
883 )?;
884
885 if value.len() <= KV_VALUE_THRESHOLD {
887 self.kv
888 .put(&key, value, None)
889 .await
890 .context(ErrorData::KvOperationFailed {
891 operation: "put".to_string(),
892 key: key.clone(),
893 message: "Failed to store params".to_string(),
894 })?;
895 return Ok(());
896 }
897
898 if let BodySpec::Inline { inline_base64 } = params {
900 let raw_bytes = general_purpose::STANDARD
901 .decode(inline_base64)
902 .into_alien_error()
903 .context(ErrorData::SerializationFailed {
904 message: "Failed to decode inline base64 params for auto-promotion".to_string(),
905 data_type: Some("base64".to_string()),
906 })?;
907
908 let raw_len = raw_bytes.len() as u64;
909 let blob_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
910
911 self.storage
912 .put(&blob_path, Bytes::from(raw_bytes).into())
913 .await
914 .into_alien_error()
915 .context(ErrorData::StorageOperationFailed {
916 message: "Failed to auto-promote params to blob storage".to_string(),
917 operation: Some("put".to_string()),
918 path: Some(blob_path.to_string()),
919 })?;
920
921 debug!(
922 "Auto-promoted params for command {} to blob ({} bytes raw)",
923 command_id, raw_len
924 );
925
926 let promoted = CommandParamsData {
928 params: BodySpec::Storage {
929 size: Some(raw_len),
930 storage_get_request: None,
931 storage_put_used: Some(true),
932 },
933 };
934 let promoted_value = serde_json::to_vec(&promoted).into_alien_error().context(
935 ErrorData::SerializationFailed {
936 message: "Failed to serialize promoted params reference".to_string(),
937 data_type: Some("CommandParamsData".to_string()),
938 },
939 )?;
940 self.kv.put(&key, promoted_value, None).await.context(
941 ErrorData::KvOperationFailed {
942 operation: "put".to_string(),
943 key: key.clone(),
944 message: "Failed to store promoted params reference".to_string(),
945 },
946 )?;
947 return Ok(());
948 }
949
950 self.kv
952 .put(&key, value, None)
953 .await
954 .context(ErrorData::KvOperationFailed {
955 operation: "put".to_string(),
956 key: key.clone(),
957 message: "Failed to store params".to_string(),
958 })?;
959 Ok(())
960 }
961
962 pub async fn get_params(&self, command_id: &str) -> Result<Option<BodySpec>> {
963 let key = format!("cmd:{}:params", command_id);
964 if let Some(value) = self
965 .kv
966 .get(&key)
967 .await
968 .context(ErrorData::KvOperationFailed {
969 operation: "get".to_string(),
970 key: key.clone(),
971 message: "Failed to get params".to_string(),
972 })?
973 {
974 let data: CommandParamsData = serde_json::from_slice(&value)
975 .into_alien_error()
976 .context(ErrorData::SerializationFailed {
977 message: "Failed to deserialize params".to_string(),
978 data_type: Some("CommandParamsData".to_string()),
979 })?;
980 return Ok(Some(data.params));
981 }
982 Ok(None)
983 }
984
985 pub async fn store_response(&self, command_id: &str, response: &CommandResponse) -> Result<()> {
988 let key = format!("cmd:{}:response", command_id);
989 let data = CommandResponseData {
990 response: response.clone(),
991 };
992 let value = serde_json::to_vec(&data).into_alien_error().context(
993 ErrorData::SerializationFailed {
994 message: "Failed to serialize response".to_string(),
995 data_type: Some("CommandResponseData".to_string()),
996 },
997 )?;
998
999 if value.len() <= KV_VALUE_THRESHOLD {
1001 self.kv
1002 .put(&key, value, None)
1003 .await
1004 .context(ErrorData::KvOperationFailed {
1005 operation: "put".to_string(),
1006 key: key.clone(),
1007 message: "Failed to store response".to_string(),
1008 })?;
1009 return Ok(());
1010 }
1011
1012 if let CommandResponse::Success {
1014 response: BodySpec::Inline { inline_base64 },
1015 } = response
1016 {
1017 let raw_bytes = general_purpose::STANDARD
1018 .decode(inline_base64)
1019 .into_alien_error()
1020 .context(ErrorData::SerializationFailed {
1021 message: "Failed to decode inline base64 response for auto-promotion"
1022 .to_string(),
1023 data_type: Some("base64".to_string()),
1024 })?;
1025
1026 let raw_len = raw_bytes.len() as u64;
1027 let blob_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1028
1029 self.storage
1030 .put(&blob_path, Bytes::from(raw_bytes).into())
1031 .await
1032 .into_alien_error()
1033 .context(ErrorData::StorageOperationFailed {
1034 message: "Failed to auto-promote response to blob storage".to_string(),
1035 operation: Some("put".to_string()),
1036 path: Some(blob_path.to_string()),
1037 })?;
1038
1039 let get_request = self
1041 .generate_response_storage_get_request(command_id)
1042 .await?;
1043
1044 debug!(
1045 "Auto-promoted response for command {} to blob ({} bytes raw)",
1046 command_id, raw_len
1047 );
1048
1049 let promoted = CommandResponseData {
1051 response: CommandResponse::Success {
1052 response: BodySpec::Storage {
1053 size: Some(raw_len),
1054 storage_get_request: Some(get_request),
1055 storage_put_used: Some(true),
1056 },
1057 },
1058 };
1059 let promoted_value = serde_json::to_vec(&promoted).into_alien_error().context(
1060 ErrorData::SerializationFailed {
1061 message: "Failed to serialize promoted response reference".to_string(),
1062 data_type: Some("CommandResponseData".to_string()),
1063 },
1064 )?;
1065 self.kv.put(&key, promoted_value, None).await.context(
1066 ErrorData::KvOperationFailed {
1067 operation: "put".to_string(),
1068 key: key.clone(),
1069 message: "Failed to store promoted response reference".to_string(),
1070 },
1071 )?;
1072 return Ok(());
1073 }
1074
1075 self.kv
1077 .put(&key, value, None)
1078 .await
1079 .context(ErrorData::KvOperationFailed {
1080 operation: "put".to_string(),
1081 key: key.clone(),
1082 message: "Failed to store response".to_string(),
1083 })?;
1084 Ok(())
1085 }
1086
1087 pub async fn get_response(&self, command_id: &str) -> Result<Option<CommandResponse>> {
1088 let key = format!("cmd:{}:response", command_id);
1089 if let Some(value) = self
1090 .kv
1091 .get(&key)
1092 .await
1093 .context(ErrorData::KvOperationFailed {
1094 operation: "get".to_string(),
1095 key: key.clone(),
1096 message: "Failed to get response".to_string(),
1097 })?
1098 {
1099 let data: CommandResponseData = serde_json::from_slice(&value)
1100 .into_alien_error()
1101 .context(ErrorData::SerializationFailed {
1102 message: "Failed to deserialize response".to_string(),
1103 data_type: Some("CommandResponseData".to_string()),
1104 })?;
1105 return Ok(Some(data.response));
1106 }
1107 Ok(None)
1108 }
1109
1110 async fn create_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
1113 let timestamp = Utc::now().timestamp_nanos_opt().unwrap_or(0);
1114 let key = format!(
1115 "target:{}:pending:{}:{}",
1116 deployment_id, timestamp, command_id
1117 );
1118
1119 self.kv
1121 .put(&key, vec![], None)
1122 .await
1123 .context(ErrorData::KvOperationFailed {
1124 operation: "put".to_string(),
1125 key: key.clone(),
1126 message: "Failed to create pending index".to_string(),
1127 })?;
1128 Ok(())
1129 }
1130
1131 async fn delete_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
1132 let prefix = format!("target:{}:pending:", deployment_id);
1134 let scan_result = self
1135 .kv
1136 .scan_prefix(&prefix, Some(100), None)
1137 .await
1138 .into_alien_error()
1139 .context(ErrorData::KvOperationFailed {
1140 operation: "scan_prefix".to_string(),
1141 key: prefix.clone(),
1142 message: "Failed to scan pending index".to_string(),
1143 })?;
1144
1145 for (key, _) in scan_result.items {
1146 if key.ends_with(&format!(":{}", command_id)) {
1147 let _ = self.kv.delete(&key).await;
1148 break;
1149 }
1150 }
1151 Ok(())
1152 }
1153
1154 async fn delete_lease(&self, command_id: &str) -> Result<()> {
1157 let key = format!("cmd:{}:lease", command_id);
1158 let _ = self.kv.delete(&key).await;
1159 Ok(())
1160 }
1161
1162 async fn create_deadline_index(&self, command_id: &str, deadline: DateTime<Utc>) -> Result<()> {
1165 let key = format!(
1166 "deadline:{}:{}",
1167 deadline.timestamp_nanos_opt().unwrap_or(0),
1168 command_id
1169 );
1170
1171 let data = DeadlineIndexData {
1172 command_id: command_id.to_string(),
1173 deadline,
1174 };
1175 let value = serde_json::to_vec(&data).into_alien_error().context(
1176 ErrorData::SerializationFailed {
1177 message: "Failed to serialize deadline index".to_string(),
1178 data_type: Some("DeadlineIndexData".to_string()),
1179 },
1180 )?;
1181
1182 let ttl = deadline.signed_duration_since(Utc::now());
1183 let ttl_duration = if ttl.num_seconds() > 0 {
1184 Some(Duration::from_secs(ttl.num_seconds() as u64))
1185 } else {
1186 None
1187 };
1188
1189 let options = ttl_duration.map(|ttl| PutOptions {
1190 ttl: Some(ttl),
1191 if_not_exists: false,
1192 });
1193
1194 self.kv
1195 .put(&key, value, options)
1196 .await
1197 .context(ErrorData::KvOperationFailed {
1198 operation: "put".to_string(),
1199 key: key.clone(),
1200 message: "Failed to create deadline index".to_string(),
1201 })?;
1202 Ok(())
1203 }
1204
1205 async fn dispatch_command_push(&self, command_id: &str, deployment_id: &str) -> Result<()> {
1208 let metadata = self
1210 .command_registry
1211 .get_command_metadata(command_id)
1212 .await?
1213 .ok_or_else(|| {
1214 AlienError::new(ErrorData::CommandNotFound {
1215 command_id: command_id.to_string(),
1216 })
1217 })?;
1218
1219 let params = self.get_params(command_id).await?.ok_or_else(|| {
1221 AlienError::new(ErrorData::CommandNotFound {
1222 command_id: command_id.to_string(),
1223 })
1224 })?;
1225
1226 let envelope = self.build_envelope(command_id, &metadata, params).await?;
1228
1229 self.command_dispatcher
1231 .dispatch(&envelope)
1232 .await
1233 .map_err(|e| {
1234 e.context(ErrorData::TransportDispatchFailed {
1235 message: "Failed to dispatch command".to_string(),
1236 transport_type: None,
1237 target: Some(deployment_id.to_string()),
1238 })
1239 })?;
1240
1241 self.command_registry
1243 .update_command_state(
1244 command_id,
1245 CommandState::Dispatched,
1246 Some(Utc::now()),
1247 None,
1248 None,
1249 None,
1250 )
1251 .await?;
1252
1253 info!("Command {} dispatched via push", command_id);
1254 Ok(())
1255 }
1256
1257 async fn build_envelope(
1258 &self,
1259 command_id: &str,
1260 metadata: &CommandEnvelopeData,
1261 mut params: BodySpec,
1262 ) -> Result<Envelope> {
1263 let response_handling = self.create_response_handling(command_id).await?;
1264
1265 if let BodySpec::Storage { size, .. } = ¶ms {
1268 let raw_size = size.unwrap_or(0) as usize;
1269 if raw_size > 0 && raw_size <= self.inline_max_bytes {
1270 let blob_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1271 match self.storage.get(&blob_path).await {
1272 Ok(get_result) => match get_result.bytes().await {
1273 Ok(raw_bytes) => {
1274 params = BodySpec::inline(&raw_bytes);
1275 debug!(
1276 "Re-inlined params for command {} ({} bytes) into envelope",
1277 command_id, raw_size
1278 );
1279 }
1280 Err(e) => {
1281 debug!(
1282 "Failed to read blob bytes for re-inline (command {}), falling back to presigned URL: {}",
1283 command_id, e
1284 );
1285 }
1286 },
1287 Err(e) => {
1288 debug!(
1289 "Failed to read blob for re-inline (command {}), falling back to presigned URL: {}",
1290 command_id, e
1291 );
1292 }
1293 }
1294 }
1295 }
1296
1297 if let BodySpec::Storage {
1300 size,
1301 storage_get_request,
1302 storage_put_used,
1303 } = ¶ms
1304 {
1305 if storage_get_request.is_none() {
1306 let get_request = self.generate_storage_get_request(command_id).await?;
1307 params = BodySpec::Storage {
1308 size: *size,
1309 storage_get_request: Some(get_request),
1310 storage_put_used: *storage_put_used,
1311 };
1312 }
1313 }
1314
1315 Ok(Envelope::new(
1316 metadata.deployment_id.clone(),
1317 command_id.to_string(),
1318 metadata.attempt,
1319 metadata.deadline,
1320 metadata.command.clone(),
1321 params,
1322 response_handling,
1323 ))
1324 }
1325
1326 async fn create_response_handling(&self, command_id: &str) -> Result<ResponseHandling> {
1327 let upload_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1328 let expires_in = Duration::from_secs(3600);
1329 let presigned = self
1330 .storage
1331 .presigned_put(&upload_path, expires_in)
1332 .await
1333 .context(ErrorData::StorageOperationFailed {
1334 message: "Failed to create response upload URL".to_string(),
1335 operation: Some("presigned_put".to_string()),
1336 path: Some(upload_path.to_string()),
1337 })?;
1338
1339 let (response_token, expires) = self.sign_response_url(command_id);
1340
1341 Ok(ResponseHandling {
1342 max_inline_bytes: self.inline_max_bytes as u64,
1343 submit_response_url: format!(
1344 "{}/commands/{}/response?response_token={}&expires={}",
1345 self.base_url.trim_end_matches('/'),
1346 command_id,
1347 response_token,
1348 expires,
1349 ),
1350 storage_upload_request: presigned,
1351 })
1352 }
1353
1354 async fn generate_params_upload(&self, command_id: &str) -> Result<StorageUpload> {
1355 let upload_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1356 let expires_in = Duration::from_secs(3600);
1357 let presigned = self
1358 .storage
1359 .presigned_put(&upload_path, expires_in)
1360 .await
1361 .into_alien_error()
1362 .context(ErrorData::StorageOperationFailed {
1363 message: "Failed to create presigned URL".to_string(),
1364 operation: Some("presigned_put".to_string()),
1365 path: Some(upload_path.to_string()),
1366 })?;
1367
1368 Ok(StorageUpload {
1369 put_request: presigned.clone(),
1370 expires_at: presigned.expiration,
1371 })
1372 }
1373
1374 async fn generate_storage_get_request(&self, command_id: &str) -> Result<PresignedRequest> {
1375 let path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1376 let expires_in = Duration::from_secs(3600);
1377 self.storage.presigned_get(&path, expires_in).await.context(
1378 ErrorData::StorageOperationFailed {
1379 message: "Failed to create storage get request".to_string(),
1380 operation: Some("presigned_get".to_string()),
1381 path: Some(path.to_string()),
1382 },
1383 )
1384 }
1385
1386 async fn generate_response_storage_get_request(
1387 &self,
1388 command_id: &str,
1389 ) -> Result<PresignedRequest> {
1390 let path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1391 let expires_in = Duration::from_secs(3600);
1392 self.storage.presigned_get(&path, expires_in).await.context(
1393 ErrorData::StorageOperationFailed {
1394 message: "Failed to create response storage get request".to_string(),
1395 operation: Some("presigned_get".to_string()),
1396 path: Some(path.to_string()),
1397 },
1398 )
1399 }
1400
1401 fn extract_command_id_from_index_key(&self, index_key: &str) -> Result<String> {
1402 index_key
1403 .split(':')
1404 .last()
1405 .ok_or_else(|| {
1406 AlienError::new(ErrorData::Other {
1407 message: format!("Invalid index key format: {}", index_key),
1408 })
1409 })
1410 .map(|s| s.to_string())
1411 }
1412}