1use std::sync::Arc;
8use std::time::Duration;
9
10use alien_bindings::presigned::PresignedRequest;
11use alien_bindings::traits::{Kv, PutOptions, Storage};
12use alien_core::DeploymentModel;
13use alien_error::{AlienError, Context, ContextError, IntoAlienError};
14use chrono::{DateTime, Utc};
15use hex;
16use hmac::{Hmac, Mac};
17use serde::{Deserialize, Serialize};
18use sha2::Sha256;
19use tracing::{debug, info};
20use uuid::Uuid;
21
22use base64::{engine::general_purpose, Engine as _};
23use bytes::Bytes;
24use object_store::path::Path as StoragePath;
25
26use crate::error::{ErrorData, Result};
27use crate::types::*;
28use crate::INLINE_MAX_BYTES;
29
/// Largest serialized value, in bytes, that `store_params` and `store_response`
/// write to the KV store unchanged. Larger inline (base64) payloads are moved to
/// blob storage; other oversized values are still written to KV as-is.
const KV_VALUE_THRESHOLD: usize = 20_000;
33
34pub mod axum_handlers;
35pub mod command_registry;
36pub mod dispatchers;
37pub mod storage;
38
39pub use axum_handlers::{
40 create_axum_router, CommandPayloadResponse, HasCommandServer, StorePayloadRequest,
41};
42pub use command_registry::{
43 CommandEnvelopeData, CommandMetadata, CommandRegistry, CommandStatus, InMemoryCommandRegistry,
44};
45pub use dispatchers::{CommandDispatcher, NullCommandDispatcher};
46
/// JSON wrapper for a command's params as stored under the `cmd:{id}:params` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandParamsData {
    /// The command parameters, either inline or a reference to blob storage.
    pub params: BodySpec,
}
56
/// JSON wrapper for a command's response as stored under the `cmd:{id}:response` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandResponseData {
    /// The response submitted for the command.
    pub response: CommandResponse,
}
62
/// Lease record stored under the `cmd:{id}:lease` key while a command is leased.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LeaseData {
    /// Lease identifier; `acquire_lease` generates it as `lease_{uuid}`.
    pub lease_id: String,
    /// When the lease was created.
    pub acquired_at: DateTime<Utc>,
    /// When the lease stops being valid.
    pub expires_at: DateTime<Utc>,
    /// Holder of the lease; `acquire_lease` stores the deployment id here.
    pub owner: String,
}
71
/// Value stored under a `deadline:{nanos}:{command_id}` index key.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DeadlineIndexData {
    /// Command the deadline belongs to.
    pub command_id: String,
    /// The command's deadline.
    pub deadline: DateTime<Utc>,
}
78
/// Server-side command coordinator: creates commands, stores their params and
/// responses, dispatches or leases them to deployments, and tracks their state.
pub struct CommandServer {
    /// Key-value store holding params, responses, leases, and index keys.
    kv: Arc<dyn Kv>,
    /// Blob storage for payloads that are uploaded or promoted out of KV.
    storage: Arc<dyn Storage>,
    /// Transport used to send envelopes to push-model deployments.
    command_dispatcher: Arc<dyn CommandDispatcher>,
    /// Source of truth for command metadata and state.
    command_registry: Arc<dyn CommandRegistry>,
    /// Payload size limit reported to clients and used to decide whether params are re-inlined.
    inline_max_bytes: usize,
    /// Base URL used to build the response submission URL placed in envelopes.
    base_url: String,
    /// HMAC-SHA256 key used to sign and verify response tokens.
    response_signing_key: Vec<u8>,
}
96
97impl CommandServer {
98 pub fn new(
100 kv: Arc<dyn Kv>,
101 storage: Arc<dyn Storage>,
102 command_dispatcher: Arc<dyn CommandDispatcher>,
103 command_registry: Arc<dyn CommandRegistry>,
104 base_url: String,
105 response_signing_key: Vec<u8>,
106 ) -> Self {
107 Self {
108 kv,
109 storage,
110 command_dispatcher,
111 command_registry,
112 inline_max_bytes: INLINE_MAX_BYTES,
113 base_url,
114 response_signing_key,
115 }
116 }
117
118 pub fn with_inline_limit(
120 kv: Arc<dyn Kv>,
121 storage: Arc<dyn Storage>,
122 command_dispatcher: Arc<dyn CommandDispatcher>,
123 command_registry: Arc<dyn CommandRegistry>,
124 base_url: String,
125 inline_max_bytes: usize,
126 response_signing_key: Vec<u8>,
127 ) -> Self {
128 Self {
129 kv,
130 storage,
131 command_dispatcher,
132 command_registry,
133 inline_max_bytes,
134 base_url,
135 response_signing_key,
136 }
137 }
138
/// Upper bound, in seconds, on how far in the future a response token's
/// `expires` timestamp may lie for `verify_response_token` to accept it.
const MAX_RESPONSE_TOKEN_LIFETIME_SECS: i64 = 7200;
143
144 fn sign_response_url(&self, command_id: &str) -> (String, i64) {
149 let expires = Utc::now().timestamp() + 3600; let message = format!("commands.v1:{}:{}", command_id, expires);
151
152 type HmacSha256 = Hmac<Sha256>;
153 let mut mac =
154 HmacSha256::new_from_slice(&self.response_signing_key).expect("HMAC accepts any key");
155 mac.update(message.as_bytes());
156 let result = mac.finalize();
157 let token = hex::encode(result.into_bytes());
158
159 (token, expires)
160 }
161
162 pub fn verify_response_token(
167 &self,
168 command_id: &str,
169 token: &str,
170 expires: i64,
171 ) -> bool {
172 if token.len() != 64 {
174 return false;
175 }
176
177 let message = format!("commands.v1:{}:{}", command_id, expires);
178
179 type HmacSha256 = Hmac<Sha256>;
180 let mut mac =
181 HmacSha256::new_from_slice(&self.response_signing_key).expect("HMAC accepts any key");
182 mac.update(message.as_bytes());
183
184 let Ok(token_bytes) = hex::decode(token) else {
186 return false;
187 };
188 let hmac_valid = mac.verify_slice(&token_bytes).is_ok();
189
190 let now = Utc::now().timestamp();
192 let not_expired = now <= expires;
193 let within_max_lifetime = expires <= now + Self::MAX_RESPONSE_TOKEN_LIFETIME_SECS;
194
195 hmac_valid && not_expired && within_max_lifetime
196 }
197
198 pub async fn create_command(
210 &self,
211 request: CreateCommandRequest,
212 ) -> Result<CreateCommandResponse> {
213 self.validate_create_command(&request).await?;
215
216 if let Some(ref idem_key) = request.idempotency_key {
218 if let Some(existing_id) = self.check_idempotency(idem_key).await? {
219 let status = self
221 .command_registry
222 .get_command_status(&existing_id)
223 .await?;
224 if let Some(s) = status {
225 return Ok(CreateCommandResponse {
226 command_id: existing_id,
227 state: s.state,
228 storage_upload: None,
229 inline_allowed_up_to: self.inline_max_bytes as u64,
230 next: "poll".to_string(),
231 });
232 }
233 }
234 }
235
236 let (initial_state, request_size_bytes) = match &request.params {
238 BodySpec::Inline { inline_base64 } => {
239 let size = inline_base64.len() as u64;
240 (CommandState::Pending, Some(size))
241 }
242 BodySpec::Storage { size, .. } => {
243 if size.unwrap_or(0) > self.inline_max_bytes as u64 {
244 (CommandState::PendingUpload, *size)
245 } else {
246 (CommandState::Pending, *size)
247 }
248 }
249 };
250
251 let metadata = self
253 .command_registry
254 .create_command(
255 &request.deployment_id,
256 &request.command,
257 initial_state,
258 request.deadline,
259 request_size_bytes,
260 )
261 .await?;
262
263 let command_id = metadata.command_id;
264 let deployment_model = metadata.deployment_model;
265
266 if let Some(ref idem_key) = request.idempotency_key {
268 self.store_idempotency(idem_key, &command_id).await?;
269 }
270
271 self.store_params(&command_id, &request.params).await?;
273
274 let storage_upload = if initial_state == CommandState::PendingUpload {
276 Some(self.generate_params_upload(&command_id).await?)
277 } else {
278 None
279 };
280
281 let (final_state, next_action) = if initial_state == CommandState::Pending {
283 match deployment_model {
284 DeploymentModel::Push => {
285 self.dispatch_command_push(&command_id, &request.deployment_id)
287 .await?;
288 (CommandState::Dispatched, "poll")
289 }
290 DeploymentModel::Pull => {
291 self.create_pending_index(&request.deployment_id, &command_id)
293 .await?;
294 debug!(
295 "Command {} ready for pull (deployment will poll)",
296 command_id
297 );
298 (CommandState::Pending, "poll")
299 }
300 }
301 } else {
302 (initial_state, "upload")
304 };
305
306 if let Some(deadline) = request.deadline {
308 self.create_deadline_index(&command_id, deadline).await?;
309 }
310
311 Ok(CreateCommandResponse {
312 command_id,
313 state: final_state,
314 storage_upload,
315 inline_allowed_up_to: self.inline_max_bytes as u64,
316 next: next_action.to_string(),
317 })
318 }
319
320 pub async fn upload_complete(
322 &self,
323 command_id: &str,
324 upload_request: UploadCompleteRequest,
325 ) -> Result<UploadCompleteResponse> {
326 let status = self
328 .command_registry
329 .get_command_status(command_id)
330 .await?
331 .ok_or_else(|| {
332 AlienError::new(ErrorData::CommandNotFound {
333 command_id: command_id.to_string(),
334 })
335 })?;
336
337 if status.state != CommandState::PendingUpload {
339 return Err(AlienError::new(ErrorData::InvalidStateTransition {
340 from: status.state.as_ref().to_string(),
341 to: CommandState::Pending.as_ref().to_string(),
342 }));
343 }
344
345 let storage_get_request = self.generate_storage_get_request(command_id).await?;
347 let params = BodySpec::Storage {
348 size: Some(upload_request.size),
349 storage_get_request: Some(storage_get_request),
350 storage_put_used: None,
351 };
352 self.store_params(command_id, ¶ms).await?;
353
354 self.command_registry
356 .update_command_state(command_id, CommandState::Pending, None, None, None, None)
357 .await?;
358
359 let metadata = self
361 .command_registry
362 .get_command_metadata(command_id)
363 .await?
364 .ok_or_else(|| {
365 AlienError::new(ErrorData::CommandNotFound {
366 command_id: command_id.to_string(),
367 })
368 })?;
369
370 let final_state = match metadata.deployment_model {
371 DeploymentModel::Push => {
372 self.dispatch_command_push(command_id, &status.deployment_id)
373 .await?;
374 CommandState::Dispatched
375 }
376 DeploymentModel::Pull => {
377 self.create_pending_index(&status.deployment_id, command_id)
378 .await?;
379 debug!(
380 "Command {} ready for pull after upload (deployment will poll)",
381 command_id
382 );
383 CommandState::Pending
384 }
385 };
386
387 Ok(UploadCompleteResponse {
388 command_id: command_id.to_string(),
389 state: final_state,
390 })
391 }
392
393 pub async fn get_command_status(&self, command_id: &str) -> Result<CommandStatusResponse> {
397 let status = self
399 .command_registry
400 .get_command_status(command_id)
401 .await?
402 .ok_or_else(|| {
403 AlienError::new(ErrorData::CommandNotFound {
404 command_id: command_id.to_string(),
405 })
406 })?;
407
408 if let Some(deadline) = status.deadline {
410 if Utc::now() > deadline && !status.state.is_terminal() {
411 self.command_registry
413 .update_command_state(
414 command_id,
415 CommandState::Expired,
416 None,
417 Some(Utc::now()),
418 None,
419 None,
420 )
421 .await?;
422
423 self.delete_pending_index(&status.deployment_id, command_id)
425 .await?;
426
427 return Ok(CommandStatusResponse {
429 command_id: command_id.to_string(),
430 state: CommandState::Expired,
431 attempt: status.attempt,
432 response: None,
433 });
434 }
435 }
436
437 let response = if status.state.is_terminal() {
439 self.get_response(command_id).await?
440 } else {
441 None
442 };
443
444 Ok(CommandStatusResponse {
445 command_id: command_id.to_string(),
446 state: status.state,
447 attempt: status.attempt,
448 response,
449 })
450 }
451
452 pub async fn submit_command_response(
456 &self,
457 command_id: &str,
458 mut response: CommandResponse,
459 ) -> Result<()> {
460 let status = self
462 .command_registry
463 .get_command_status(command_id)
464 .await?
465 .ok_or_else(|| {
466 AlienError::new(ErrorData::CommandNotFound {
467 command_id: command_id.to_string(),
468 })
469 })?;
470
471 if status.state.is_terminal() {
473 debug!(
474 "Ignoring duplicate response for terminal command {}",
475 command_id
476 );
477 return Ok(());
478 }
479
480 if status.state != CommandState::Dispatched {
482 return Err(AlienError::new(ErrorData::InvalidStateTransition {
483 from: status.state.as_ref().to_string(),
484 to: CommandState::Succeeded.as_ref().to_string(),
485 }));
486 }
487
488 if let CommandResponse::Success {
490 response: ref mut body,
491 } = response
492 {
493 if let BodySpec::Storage {
494 size,
495 storage_get_request,
496 storage_put_used,
497 } = body
498 {
499 if storage_get_request.is_none() && storage_put_used.unwrap_or(false) {
500 let get_request = self
501 .generate_response_storage_get_request(command_id)
502 .await?;
503 *body = BodySpec::Storage {
504 size: *size,
505 storage_get_request: Some(get_request),
506 storage_put_used: *storage_put_used,
507 };
508 }
509 }
510 }
511
512 self.store_response(command_id, &response).await?;
514
515 self.delete_lease(command_id).await?;
517
518 self.delete_pending_index(&status.deployment_id, command_id)
520 .await?;
521
522 let (new_state, error) = if response.is_success() {
524 (CommandState::Succeeded, None)
525 } else if let CommandResponse::Error { code, message, .. } = &response {
526 (
527 CommandState::Failed,
528 Some(serde_json::json!({ "code": code, "message": message })),
529 )
530 } else {
531 (CommandState::Failed, None)
532 };
533
534 let response_size = match &response {
535 CommandResponse::Success {
536 response: BodySpec::Inline { inline_base64 },
537 } => Some(inline_base64.len() as u64),
538 CommandResponse::Success {
539 response: BodySpec::Storage { size, .. },
540 } => *size,
541 _ => None,
542 };
543
544 self.command_registry
545 .update_command_state(
546 command_id,
547 new_state,
548 None, Some(Utc::now()),
550 response_size,
551 error,
552 )
553 .await?;
554
555 info!(
556 "Command {} completed with state {:?}",
557 command_id, new_state
558 );
559 Ok(())
560 }
561
562 pub async fn acquire_lease(
566 &self,
567 deployment_id: &str,
568 lease_request: &LeaseRequest,
569 ) -> Result<LeaseResponse> {
570 let mut leases = Vec::new();
571
572 let target_prefix = format!("target:{}:pending:", deployment_id);
574 let scan_result = self
575 .kv
576 .scan_prefix(&target_prefix, Some(lease_request.max_leases * 2), None)
577 .await
578 .into_alien_error()
579 .context(ErrorData::KvOperationFailed {
580 operation: "scan_prefix".to_string(),
581 key: target_prefix.clone(),
582 message: "Failed to scan for pending commands".to_string(),
583 })?;
584
585 for (index_key, _) in scan_result.items {
586 if leases.len() >= lease_request.max_leases {
587 break;
588 }
589
590 let command_id = self.extract_command_id_from_index_key(&index_key)?;
591
592 let lease_id = format!("lease_{}", Uuid::new_v4());
594 let lease_duration = Duration::from_secs(lease_request.lease_seconds);
595 let expires_at =
596 Utc::now() + chrono::Duration::seconds(lease_request.lease_seconds as i64);
597
598 let lease_data = LeaseData {
599 lease_id: lease_id.clone(),
600 acquired_at: Utc::now(),
601 expires_at,
602 owner: deployment_id.to_string(),
603 };
604
605 let lease_key = format!("cmd:{}:lease", command_id);
606 let lease_value = serde_json::to_vec(&lease_data).into_alien_error().context(
607 ErrorData::SerializationFailed {
608 message: "Failed to serialize lease data".to_string(),
609 data_type: Some("LeaseData".to_string()),
610 },
611 )?;
612
613 let options = Some(PutOptions {
614 ttl: Some(lease_duration),
615 if_not_exists: true,
616 });
617
618 let success = self
619 .kv
620 .put(&lease_key, lease_value, options)
621 .await
622 .context(ErrorData::KvOperationFailed {
623 operation: "put".to_string(),
624 key: lease_key.clone(),
625 message: "Failed to create lease".to_string(),
626 })?;
627
628 if !success {
629 continue;
631 }
632
633 let metadata = match self
635 .command_registry
636 .get_command_metadata(&command_id)
637 .await?
638 {
639 Some(m) => m,
640 None => {
641 self.delete_lease(&command_id).await?;
643 let _ = self.kv.delete(&index_key).await;
644 continue;
645 }
646 };
647
648 if metadata.state.is_terminal() {
650 self.delete_lease(&command_id).await?;
652 let _ = self.kv.delete(&index_key).await;
653 continue;
654 }
655
656 if let Some(deadline) = metadata.deadline {
658 if Utc::now() > deadline {
659 self.command_registry
661 .update_command_state(
662 &command_id,
663 CommandState::Expired,
664 None,
665 Some(Utc::now()),
666 None,
667 None,
668 )
669 .await?;
670 self.delete_lease(&command_id).await?;
671 let _ = self.kv.delete(&index_key).await;
672 continue;
673 }
674 }
675
676 let params = match self.get_params(&command_id).await? {
678 Some(p) => p,
679 None => {
680 self.delete_lease(&command_id).await?;
682 continue;
683 }
684 };
685
686 self.command_registry
688 .update_command_state(
689 &command_id,
690 CommandState::Dispatched,
691 Some(Utc::now()),
692 None,
693 None,
694 None,
695 )
696 .await?;
697
698 let envelope = self.build_envelope(&command_id, &metadata, params).await?;
700
701 leases.push(LeaseInfo {
702 lease_id,
703 lease_expires_at: expires_at,
704 command_id: command_id.clone(),
705 attempt: metadata.attempt,
706 envelope,
707 });
708 }
709
710 Ok(LeaseResponse { leases })
711 }
712
713 pub async fn release_lease(&self, command_id: &str, lease_id: &str) -> Result<()> {
717 let lease_key = format!("cmd:{}:lease", command_id);
718
719 if let Ok(Some(lease_data)) = self.kv.get(&lease_key).await {
721 let lease: LeaseData = serde_json::from_slice(&lease_data)
722 .into_alien_error()
723 .context(ErrorData::SerializationFailed {
724 message: "Failed to deserialize lease data".to_string(),
725 data_type: Some("LeaseData".to_string()),
726 })?;
727
728 if lease.lease_id != lease_id {
729 return Err(AlienError::new(ErrorData::LeaseNotFound {
730 lease_id: lease_id.to_string(),
731 }));
732 }
733
734 self.delete_lease(command_id).await?;
736
737 self.command_registry.increment_attempt(command_id).await?;
739
740 self.command_registry
742 .update_command_state(command_id, CommandState::Pending, None, None, None, None)
743 .await?;
744
745 debug!("Lease {} released for command {}", lease_id, command_id);
747 }
748
749 Ok(())
750 }
751
752 pub async fn get_command_deployment_id(&self, command_id: &str) -> Result<Option<String>> {
757 let status = self.command_registry.get_command_status(command_id).await?;
758 Ok(status.map(|s| s.deployment_id))
759 }
760
761 pub async fn release_lease_by_id(&self, lease_id: &str) -> Result<()> {
763 let lease_prefix = "cmd:";
765 let scan_result = self
766 .kv
767 .scan_prefix(lease_prefix, None, None)
768 .await
769 .into_alien_error()
770 .context(ErrorData::KvOperationFailed {
771 operation: "scan_prefix".to_string(),
772 key: lease_prefix.to_string(),
773 message: "Failed to scan for lease keys".to_string(),
774 })?;
775
776 for (key, value) in scan_result.items {
777 if key.ends_with(":lease") {
778 if let Ok(lease) = serde_json::from_slice::<LeaseData>(&value) {
779 if lease.lease_id == lease_id {
780 let command_id = key
781 .strip_prefix("cmd:")
782 .and_then(|s| s.strip_suffix(":lease"))
783 .ok_or_else(|| {
784 AlienError::new(ErrorData::Other {
785 message: format!("Invalid lease key format: {}", key),
786 })
787 })?;
788
789 return self.release_lease(command_id, lease_id).await;
790 }
791 }
792 }
793 }
794
795 Err(AlienError::new(ErrorData::LeaseNotFound {
796 lease_id: lease_id.to_string(),
797 }))
798 }
799
800 async fn validate_create_command(&self, request: &CreateCommandRequest) -> Result<()> {
805 if request.command.is_empty() {
806 return Err(AlienError::new(ErrorData::InvalidCommand {
807 message: "Command name cannot be empty".to_string(),
808 }));
809 }
810
811 if request.deployment_id.is_empty() {
812 return Err(AlienError::new(ErrorData::InvalidCommand {
813 message: "Deployment ID cannot be empty".to_string(),
814 }));
815 }
816
817 if let Some(deadline) = request.deadline {
818 if deadline <= Utc::now() {
819 return Err(AlienError::new(ErrorData::InvalidCommand {
820 message: "Deadline must be in the future".to_string(),
821 }));
822 }
823 }
824
825 Ok(())
826 }
827
828 async fn check_idempotency(&self, idem_key: &str) -> Result<Option<String>> {
831 let key = format!("idem:{}", idem_key);
832 if let Some(data) = self
833 .kv
834 .get(&key)
835 .await
836 .context(ErrorData::KvOperationFailed {
837 operation: "get".to_string(),
838 key: key.clone(),
839 message: "Failed to check idempotency".to_string(),
840 })?
841 {
842 let command_id = String::from_utf8(data).into_alien_error().context(
843 ErrorData::SerializationFailed {
844 message: "Invalid idempotency data".to_string(),
845 data_type: Some("String".to_string()),
846 },
847 )?;
848 return Ok(Some(command_id));
849 }
850 Ok(None)
851 }
852
853 async fn store_idempotency(&self, idem_key: &str, command_id: &str) -> Result<()> {
854 let key = format!("idem:{}", idem_key);
855 let ttl = Duration::from_secs(24 * 60 * 60); self.kv
857 .put(
858 &key,
859 command_id.as_bytes().to_vec(),
860 Some(PutOptions {
861 ttl: Some(ttl),
862 if_not_exists: true,
863 }),
864 )
865 .await
866 .context(ErrorData::KvOperationFailed {
867 operation: "put".to_string(),
868 key: key.clone(),
869 message: "Failed to store idempotency".to_string(),
870 })?;
871 Ok(())
872 }
873
874 pub async fn store_params(&self, command_id: &str, params: &BodySpec) -> Result<()> {
877 let key = format!("cmd:{}:params", command_id);
878
879 let data = CommandParamsData {
881 params: params.clone(),
882 };
883 let value = serde_json::to_vec(&data).into_alien_error().context(
884 ErrorData::SerializationFailed {
885 message: "Failed to serialize params".to_string(),
886 data_type: Some("CommandParamsData".to_string()),
887 },
888 )?;
889
890 if value.len() <= KV_VALUE_THRESHOLD {
892 self.kv
893 .put(&key, value, None)
894 .await
895 .context(ErrorData::KvOperationFailed {
896 operation: "put".to_string(),
897 key: key.clone(),
898 message: "Failed to store params".to_string(),
899 })?;
900 return Ok(());
901 }
902
903 if let BodySpec::Inline { inline_base64 } = params {
905 let raw_bytes = general_purpose::STANDARD
906 .decode(inline_base64)
907 .into_alien_error()
908 .context(ErrorData::SerializationFailed {
909 message: "Failed to decode inline base64 params for auto-promotion".to_string(),
910 data_type: Some("base64".to_string()),
911 })?;
912
913 let raw_len = raw_bytes.len() as u64;
914 let blob_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
915
916 self.storage
917 .put(&blob_path, Bytes::from(raw_bytes).into())
918 .await
919 .into_alien_error()
920 .context(ErrorData::StorageOperationFailed {
921 message: "Failed to auto-promote params to blob storage".to_string(),
922 operation: Some("put".to_string()),
923 path: Some(blob_path.to_string()),
924 })?;
925
926 debug!(
927 "Auto-promoted params for command {} to blob ({} bytes raw)",
928 command_id, raw_len
929 );
930
931 let promoted = CommandParamsData {
933 params: BodySpec::Storage {
934 size: Some(raw_len),
935 storage_get_request: None,
936 storage_put_used: Some(true),
937 },
938 };
939 let promoted_value = serde_json::to_vec(&promoted).into_alien_error().context(
940 ErrorData::SerializationFailed {
941 message: "Failed to serialize promoted params reference".to_string(),
942 data_type: Some("CommandParamsData".to_string()),
943 },
944 )?;
945 self.kv.put(&key, promoted_value, None).await.context(
946 ErrorData::KvOperationFailed {
947 operation: "put".to_string(),
948 key: key.clone(),
949 message: "Failed to store promoted params reference".to_string(),
950 },
951 )?;
952 return Ok(());
953 }
954
955 self.kv
957 .put(&key, value, None)
958 .await
959 .context(ErrorData::KvOperationFailed {
960 operation: "put".to_string(),
961 key: key.clone(),
962 message: "Failed to store params".to_string(),
963 })?;
964 Ok(())
965 }
966
967 pub async fn get_params(&self, command_id: &str) -> Result<Option<BodySpec>> {
968 let key = format!("cmd:{}:params", command_id);
969 if let Some(value) = self
970 .kv
971 .get(&key)
972 .await
973 .context(ErrorData::KvOperationFailed {
974 operation: "get".to_string(),
975 key: key.clone(),
976 message: "Failed to get params".to_string(),
977 })?
978 {
979 let data: CommandParamsData = serde_json::from_slice(&value)
980 .into_alien_error()
981 .context(ErrorData::SerializationFailed {
982 message: "Failed to deserialize params".to_string(),
983 data_type: Some("CommandParamsData".to_string()),
984 })?;
985 return Ok(Some(data.params));
986 }
987 Ok(None)
988 }
989
990 pub async fn store_response(&self, command_id: &str, response: &CommandResponse) -> Result<()> {
993 let key = format!("cmd:{}:response", command_id);
994 let data = CommandResponseData {
995 response: response.clone(),
996 };
997 let value = serde_json::to_vec(&data).into_alien_error().context(
998 ErrorData::SerializationFailed {
999 message: "Failed to serialize response".to_string(),
1000 data_type: Some("CommandResponseData".to_string()),
1001 },
1002 )?;
1003
1004 if value.len() <= KV_VALUE_THRESHOLD {
1006 self.kv
1007 .put(&key, value, None)
1008 .await
1009 .context(ErrorData::KvOperationFailed {
1010 operation: "put".to_string(),
1011 key: key.clone(),
1012 message: "Failed to store response".to_string(),
1013 })?;
1014 return Ok(());
1015 }
1016
1017 if let CommandResponse::Success {
1019 response: BodySpec::Inline { inline_base64 },
1020 } = response
1021 {
1022 let raw_bytes = general_purpose::STANDARD
1023 .decode(inline_base64)
1024 .into_alien_error()
1025 .context(ErrorData::SerializationFailed {
1026 message: "Failed to decode inline base64 response for auto-promotion"
1027 .to_string(),
1028 data_type: Some("base64".to_string()),
1029 })?;
1030
1031 let raw_len = raw_bytes.len() as u64;
1032 let blob_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1033
1034 self.storage
1035 .put(&blob_path, Bytes::from(raw_bytes).into())
1036 .await
1037 .into_alien_error()
1038 .context(ErrorData::StorageOperationFailed {
1039 message: "Failed to auto-promote response to blob storage".to_string(),
1040 operation: Some("put".to_string()),
1041 path: Some(blob_path.to_string()),
1042 })?;
1043
1044 let get_request = self
1046 .generate_response_storage_get_request(command_id)
1047 .await?;
1048
1049 debug!(
1050 "Auto-promoted response for command {} to blob ({} bytes raw)",
1051 command_id, raw_len
1052 );
1053
1054 let promoted = CommandResponseData {
1056 response: CommandResponse::Success {
1057 response: BodySpec::Storage {
1058 size: Some(raw_len),
1059 storage_get_request: Some(get_request),
1060 storage_put_used: Some(true),
1061 },
1062 },
1063 };
1064 let promoted_value = serde_json::to_vec(&promoted).into_alien_error().context(
1065 ErrorData::SerializationFailed {
1066 message: "Failed to serialize promoted response reference".to_string(),
1067 data_type: Some("CommandResponseData".to_string()),
1068 },
1069 )?;
1070 self.kv.put(&key, promoted_value, None).await.context(
1071 ErrorData::KvOperationFailed {
1072 operation: "put".to_string(),
1073 key: key.clone(),
1074 message: "Failed to store promoted response reference".to_string(),
1075 },
1076 )?;
1077 return Ok(());
1078 }
1079
1080 self.kv
1082 .put(&key, value, None)
1083 .await
1084 .context(ErrorData::KvOperationFailed {
1085 operation: "put".to_string(),
1086 key: key.clone(),
1087 message: "Failed to store response".to_string(),
1088 })?;
1089 Ok(())
1090 }
1091
1092 pub async fn get_response(&self, command_id: &str) -> Result<Option<CommandResponse>> {
1093 let key = format!("cmd:{}:response", command_id);
1094 if let Some(value) = self
1095 .kv
1096 .get(&key)
1097 .await
1098 .context(ErrorData::KvOperationFailed {
1099 operation: "get".to_string(),
1100 key: key.clone(),
1101 message: "Failed to get response".to_string(),
1102 })?
1103 {
1104 let data: CommandResponseData = serde_json::from_slice(&value)
1105 .into_alien_error()
1106 .context(ErrorData::SerializationFailed {
1107 message: "Failed to deserialize response".to_string(),
1108 data_type: Some("CommandResponseData".to_string()),
1109 })?;
1110 return Ok(Some(data.response));
1111 }
1112 Ok(None)
1113 }
1114
1115 async fn create_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
1118 let timestamp = Utc::now().timestamp_nanos_opt().unwrap_or(0);
1119 let key = format!(
1120 "target:{}:pending:{}:{}",
1121 deployment_id, timestamp, command_id
1122 );
1123
1124 self.kv
1126 .put(&key, vec![], None)
1127 .await
1128 .context(ErrorData::KvOperationFailed {
1129 operation: "put".to_string(),
1130 key: key.clone(),
1131 message: "Failed to create pending index".to_string(),
1132 })?;
1133 Ok(())
1134 }
1135
1136 async fn delete_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
1137 let prefix = format!("target:{}:pending:", deployment_id);
1139 let scan_result = self
1140 .kv
1141 .scan_prefix(&prefix, Some(100), None)
1142 .await
1143 .into_alien_error()
1144 .context(ErrorData::KvOperationFailed {
1145 operation: "scan_prefix".to_string(),
1146 key: prefix.clone(),
1147 message: "Failed to scan pending index".to_string(),
1148 })?;
1149
1150 for (key, _) in scan_result.items {
1151 if key.ends_with(&format!(":{}", command_id)) {
1152 let _ = self.kv.delete(&key).await;
1153 break;
1154 }
1155 }
1156 Ok(())
1157 }
1158
1159 async fn delete_lease(&self, command_id: &str) -> Result<()> {
1162 let key = format!("cmd:{}:lease", command_id);
1163 let _ = self.kv.delete(&key).await;
1164 Ok(())
1165 }
1166
1167 async fn create_deadline_index(&self, command_id: &str, deadline: DateTime<Utc>) -> Result<()> {
1170 let key = format!(
1171 "deadline:{}:{}",
1172 deadline.timestamp_nanos_opt().unwrap_or(0),
1173 command_id
1174 );
1175
1176 let data = DeadlineIndexData {
1177 command_id: command_id.to_string(),
1178 deadline,
1179 };
1180 let value = serde_json::to_vec(&data).into_alien_error().context(
1181 ErrorData::SerializationFailed {
1182 message: "Failed to serialize deadline index".to_string(),
1183 data_type: Some("DeadlineIndexData".to_string()),
1184 },
1185 )?;
1186
1187 let ttl = deadline.signed_duration_since(Utc::now());
1188 let ttl_duration = if ttl.num_seconds() > 0 {
1189 Some(Duration::from_secs(ttl.num_seconds() as u64))
1190 } else {
1191 None
1192 };
1193
1194 let options = ttl_duration.map(|ttl| PutOptions {
1195 ttl: Some(ttl),
1196 if_not_exists: false,
1197 });
1198
1199 self.kv
1200 .put(&key, value, options)
1201 .await
1202 .context(ErrorData::KvOperationFailed {
1203 operation: "put".to_string(),
1204 key: key.clone(),
1205 message: "Failed to create deadline index".to_string(),
1206 })?;
1207 Ok(())
1208 }
1209
1210 async fn dispatch_command_push(&self, command_id: &str, deployment_id: &str) -> Result<()> {
1213 let metadata = self
1215 .command_registry
1216 .get_command_metadata(command_id)
1217 .await?
1218 .ok_or_else(|| {
1219 AlienError::new(ErrorData::CommandNotFound {
1220 command_id: command_id.to_string(),
1221 })
1222 })?;
1223
1224 let params = self.get_params(command_id).await?.ok_or_else(|| {
1226 AlienError::new(ErrorData::CommandNotFound {
1227 command_id: command_id.to_string(),
1228 })
1229 })?;
1230
1231 let envelope = self.build_envelope(command_id, &metadata, params).await?;
1233
1234 self.command_dispatcher
1236 .dispatch(&envelope)
1237 .await
1238 .map_err(|e| {
1239 e.context(ErrorData::TransportDispatchFailed {
1240 message: "Failed to dispatch command".to_string(),
1241 transport_type: None,
1242 target: Some(deployment_id.to_string()),
1243 })
1244 })?;
1245
1246 self.command_registry
1248 .update_command_state(
1249 command_id,
1250 CommandState::Dispatched,
1251 Some(Utc::now()),
1252 None,
1253 None,
1254 None,
1255 )
1256 .await?;
1257
1258 info!("Command {} dispatched via push", command_id);
1259 Ok(())
1260 }
1261
1262 async fn build_envelope(
1263 &self,
1264 command_id: &str,
1265 metadata: &CommandEnvelopeData,
1266 mut params: BodySpec,
1267 ) -> Result<Envelope> {
1268 let response_handling = self.create_response_handling(command_id).await?;
1269
1270 if let BodySpec::Storage { size, .. } = ¶ms {
1273 let raw_size = size.unwrap_or(0) as usize;
1274 if raw_size > 0 && raw_size <= self.inline_max_bytes {
1275 let blob_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1276 match self.storage.get(&blob_path).await {
1277 Ok(get_result) => match get_result.bytes().await {
1278 Ok(raw_bytes) => {
1279 params = BodySpec::inline(&raw_bytes);
1280 debug!(
1281 "Re-inlined params for command {} ({} bytes) into envelope",
1282 command_id, raw_size
1283 );
1284 }
1285 Err(e) => {
1286 debug!(
1287 "Failed to read blob bytes for re-inline (command {}), falling back to presigned URL: {}",
1288 command_id, e
1289 );
1290 }
1291 },
1292 Err(e) => {
1293 debug!(
1294 "Failed to read blob for re-inline (command {}), falling back to presigned URL: {}",
1295 command_id, e
1296 );
1297 }
1298 }
1299 }
1300 }
1301
1302 if let BodySpec::Storage {
1305 size,
1306 storage_get_request,
1307 storage_put_used,
1308 } = ¶ms
1309 {
1310 if storage_get_request.is_none() {
1311 let get_request = self.generate_storage_get_request(command_id).await?;
1312 params = BodySpec::Storage {
1313 size: *size,
1314 storage_get_request: Some(get_request),
1315 storage_put_used: *storage_put_used,
1316 };
1317 }
1318 }
1319
1320 Ok(Envelope::new(
1321 metadata.deployment_id.clone(),
1322 command_id.to_string(),
1323 metadata.attempt,
1324 metadata.deadline,
1325 metadata.command.clone(),
1326 params,
1327 response_handling,
1328 ))
1329 }
1330
1331 async fn create_response_handling(&self, command_id: &str) -> Result<ResponseHandling> {
1332 let upload_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1333 let expires_in = Duration::from_secs(3600);
1334 let presigned = self
1335 .storage
1336 .presigned_put(&upload_path, expires_in)
1337 .await
1338 .context(ErrorData::StorageOperationFailed {
1339 message: "Failed to create response upload URL".to_string(),
1340 operation: Some("presigned_put".to_string()),
1341 path: Some(upload_path.to_string()),
1342 })?;
1343
1344 let (response_token, expires) = self.sign_response_url(command_id);
1345
1346 Ok(ResponseHandling {
1347 max_inline_bytes: self.inline_max_bytes as u64,
1348 submit_response_url: format!(
1349 "{}/commands/{}/response?response_token={}&expires={}",
1350 self.base_url.trim_end_matches('/'),
1351 command_id,
1352 response_token,
1353 expires,
1354 ),
1355 storage_upload_request: presigned,
1356 })
1357 }
1358
1359 async fn generate_params_upload(&self, command_id: &str) -> Result<StorageUpload> {
1360 let upload_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1361 let expires_in = Duration::from_secs(3600);
1362 let presigned = self
1363 .storage
1364 .presigned_put(&upload_path, expires_in)
1365 .await
1366 .into_alien_error()
1367 .context(ErrorData::StorageOperationFailed {
1368 message: "Failed to create presigned URL".to_string(),
1369 operation: Some("presigned_put".to_string()),
1370 path: Some(upload_path.to_string()),
1371 })?;
1372
1373 Ok(StorageUpload {
1374 put_request: presigned.clone(),
1375 expires_at: presigned.expiration,
1376 })
1377 }
1378
1379 async fn generate_storage_get_request(&self, command_id: &str) -> Result<PresignedRequest> {
1380 let path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1381 let expires_in = Duration::from_secs(3600);
1382 self.storage.presigned_get(&path, expires_in).await.context(
1383 ErrorData::StorageOperationFailed {
1384 message: "Failed to create storage get request".to_string(),
1385 operation: Some("presigned_get".to_string()),
1386 path: Some(path.to_string()),
1387 },
1388 )
1389 }
1390
1391 async fn generate_response_storage_get_request(
1392 &self,
1393 command_id: &str,
1394 ) -> Result<PresignedRequest> {
1395 let path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1396 let expires_in = Duration::from_secs(3600);
1397 self.storage.presigned_get(&path, expires_in).await.context(
1398 ErrorData::StorageOperationFailed {
1399 message: "Failed to create response storage get request".to_string(),
1400 operation: Some("presigned_get".to_string()),
1401 path: Some(path.to_string()),
1402 },
1403 )
1404 }
1405
1406 fn extract_command_id_from_index_key(&self, index_key: &str) -> Result<String> {
1407 index_key
1408 .split(':')
1409 .last()
1410 .ok_or_else(|| {
1411 AlienError::new(ErrorData::Other {
1412 message: format!("Invalid index key format: {}", index_key),
1413 })
1414 })
1415 .map(|s| s.to_string())
1416 }
1417}