1use std::sync::Arc;
8use std::time::Duration;
9
10use alien_bindings::presigned::PresignedRequest;
11use alien_bindings::traits::{Kv, PutOptions, Storage};
12use alien_core::DeploymentModel;
13use alien_error::{AlienError, Context, ContextError, IntoAlienError};
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use tracing::{debug, info};
17use uuid::Uuid;
18
19#[cfg(feature = "server")]
20use object_store::path::Path as StoragePath;
21
22use crate::error::{ErrorData, Result};
23use crate::types::*;
24use crate::INLINE_MAX_BYTES;
25
26pub mod axum_handlers;
27pub mod command_registry;
28pub mod dispatchers;
29pub mod storage;
30
31pub use axum_handlers::{create_axum_router, HasCommandServer};
32pub use command_registry::{
33 CommandEnvelopeData, CommandMetadata, CommandRegistry, CommandStatus, InMemoryCommandRegistry,
34};
35pub use dispatchers::{CommandDispatcher, NullCommandDispatcher};
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
43struct CommandParamsData {
44 pub params: BodySpec,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49struct CommandResponseData {
50 pub response: CommandResponse,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55struct LeaseData {
56 pub lease_id: String,
57 pub acquired_at: DateTime<Utc>,
58 pub expires_at: DateTime<Utc>,
59 pub owner: String,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64struct DeadlineIndexData {
65 pub command_id: String,
66 pub deadline: DateTime<Utc>,
67}
68
69pub struct CommandServer {
78 kv: Arc<dyn Kv>,
79 storage: Arc<dyn Storage>,
80 command_dispatcher: Arc<dyn CommandDispatcher>,
81 command_registry: Arc<dyn CommandRegistry>,
82 inline_max_bytes: usize,
83 base_url: String,
84}
85
86impl CommandServer {
87 pub fn new(
89 kv: Arc<dyn Kv>,
90 storage: Arc<dyn Storage>,
91 command_dispatcher: Arc<dyn CommandDispatcher>,
92 command_registry: Arc<dyn CommandRegistry>,
93 base_url: String,
94 ) -> Self {
95 Self {
96 kv,
97 storage,
98 command_dispatcher,
99 command_registry,
100 inline_max_bytes: INLINE_MAX_BYTES,
101 base_url,
102 }
103 }
104
105 pub fn with_inline_limit(
107 kv: Arc<dyn Kv>,
108 storage: Arc<dyn Storage>,
109 command_dispatcher: Arc<dyn CommandDispatcher>,
110 command_registry: Arc<dyn CommandRegistry>,
111 base_url: String,
112 inline_max_bytes: usize,
113 ) -> Self {
114 Self {
115 kv,
116 storage,
117 command_dispatcher,
118 command_registry,
119 inline_max_bytes,
120 base_url,
121 }
122 }
123
124 pub async fn create_command(
136 &self,
137 request: CreateCommandRequest,
138 ) -> Result<CreateCommandResponse> {
139 self.validate_create_command(&request).await?;
141
142 if let Some(ref idem_key) = request.idempotency_key {
144 if let Some(existing_id) = self.check_idempotency(idem_key).await? {
145 let status = self
147 .command_registry
148 .get_command_status(&existing_id)
149 .await?;
150 if let Some(s) = status {
151 return Ok(CreateCommandResponse {
152 command_id: existing_id,
153 state: s.state,
154 storage_upload: None,
155 inline_allowed_up_to: self.inline_max_bytes as u64,
156 next: "poll".to_string(),
157 });
158 }
159 }
160 }
161
162 let (initial_state, request_size_bytes) = match &request.params {
164 BodySpec::Inline { inline_base64 } => {
165 let size = inline_base64.len() as u64;
166 (CommandState::Pending, Some(size))
167 }
168 BodySpec::Storage { size, .. } => {
169 if size.unwrap_or(0) > self.inline_max_bytes as u64 {
170 (CommandState::PendingUpload, *size)
171 } else {
172 (CommandState::Pending, *size)
173 }
174 }
175 };
176
177 let metadata = self
179 .command_registry
180 .create_command(
181 &request.deployment_id,
182 &request.command,
183 initial_state,
184 request.deadline,
185 request_size_bytes,
186 )
187 .await?;
188
189 let command_id = metadata.command_id;
190 let deployment_model = metadata.deployment_model;
191
192 if let Some(ref idem_key) = request.idempotency_key {
194 self.store_idempotency(idem_key, &command_id).await?;
195 }
196
197 self.store_params(&command_id, &request.params).await?;
199
200 let storage_upload = if initial_state == CommandState::PendingUpload {
202 Some(self.generate_params_upload(&command_id).await?)
203 } else {
204 None
205 };
206
207 let (final_state, next_action) = if initial_state == CommandState::Pending {
209 match deployment_model {
210 DeploymentModel::Push => {
211 self.dispatch_command_push(&command_id, &request.deployment_id)
213 .await?;
214 (CommandState::Dispatched, "poll")
215 }
216 DeploymentModel::Pull => {
217 self.create_pending_index(&request.deployment_id, &command_id)
219 .await?;
220 debug!(
221 "Command {} ready for pull (deployment will poll)",
222 command_id
223 );
224 (CommandState::Pending, "poll")
225 }
226 }
227 } else {
228 (initial_state, "upload")
230 };
231
232 if let Some(deadline) = request.deadline {
234 self.create_deadline_index(&command_id, deadline).await?;
235 }
236
237 Ok(CreateCommandResponse {
238 command_id,
239 state: final_state,
240 storage_upload,
241 inline_allowed_up_to: self.inline_max_bytes as u64,
242 next: next_action.to_string(),
243 })
244 }
245
246 pub async fn upload_complete(
248 &self,
249 command_id: &str,
250 upload_request: UploadCompleteRequest,
251 ) -> Result<UploadCompleteResponse> {
252 let status = self
254 .command_registry
255 .get_command_status(command_id)
256 .await?
257 .ok_or_else(|| {
258 AlienError::new(ErrorData::CommandNotFound {
259 command_id: command_id.to_string(),
260 })
261 })?;
262
263 if status.state != CommandState::PendingUpload {
265 return Err(AlienError::new(ErrorData::InvalidStateTransition {
266 from: status.state.as_ref().to_string(),
267 to: CommandState::Pending.as_ref().to_string(),
268 }));
269 }
270
271 let storage_get_request = self.generate_storage_get_request(command_id).await?;
273 let params = BodySpec::Storage {
274 size: Some(upload_request.size),
275 storage_get_request: Some(storage_get_request),
276 storage_put_used: None,
277 };
278 self.store_params(command_id, ¶ms).await?;
279
280 self.command_registry
282 .update_command_state(command_id, CommandState::Pending, None, None, None, None)
283 .await?;
284
285 let metadata = self
287 .command_registry
288 .get_command_metadata(command_id)
289 .await?
290 .ok_or_else(|| {
291 AlienError::new(ErrorData::CommandNotFound {
292 command_id: command_id.to_string(),
293 })
294 })?;
295
296 let final_state = match metadata.deployment_model {
297 DeploymentModel::Push => {
298 self.dispatch_command_push(command_id, &status.deployment_id)
299 .await?;
300 CommandState::Dispatched
301 }
302 DeploymentModel::Pull => {
303 self.create_pending_index(&status.deployment_id, command_id)
304 .await?;
305 debug!(
306 "Command {} ready for pull after upload (deployment will poll)",
307 command_id
308 );
309 CommandState::Pending
310 }
311 };
312
313 Ok(UploadCompleteResponse {
314 command_id: command_id.to_string(),
315 state: final_state,
316 })
317 }
318
319 pub async fn get_command_status(&self, command_id: &str) -> Result<CommandStatusResponse> {
323 let status = self
325 .command_registry
326 .get_command_status(command_id)
327 .await?
328 .ok_or_else(|| {
329 AlienError::new(ErrorData::CommandNotFound {
330 command_id: command_id.to_string(),
331 })
332 })?;
333
334 if let Some(deadline) = status.deadline {
336 if Utc::now() > deadline && !status.state.is_terminal() {
337 self.command_registry
339 .update_command_state(
340 command_id,
341 CommandState::Expired,
342 None,
343 Some(Utc::now()),
344 None,
345 None,
346 )
347 .await?;
348
349 self.delete_pending_index(&status.deployment_id, command_id)
351 .await?;
352
353 return Ok(CommandStatusResponse {
355 command_id: command_id.to_string(),
356 state: CommandState::Expired,
357 attempt: status.attempt,
358 response: None,
359 });
360 }
361 }
362
363 let response = if status.state.is_terminal() {
365 self.get_response(command_id).await?
366 } else {
367 None
368 };
369
370 Ok(CommandStatusResponse {
371 command_id: command_id.to_string(),
372 state: status.state,
373 attempt: status.attempt,
374 response,
375 })
376 }
377
378 pub async fn submit_command_response(
382 &self,
383 command_id: &str,
384 mut response: CommandResponse,
385 ) -> Result<()> {
386 let status = self
388 .command_registry
389 .get_command_status(command_id)
390 .await?
391 .ok_or_else(|| {
392 AlienError::new(ErrorData::CommandNotFound {
393 command_id: command_id.to_string(),
394 })
395 })?;
396
397 if status.state.is_terminal() {
399 debug!(
400 "Ignoring duplicate response for terminal command {}",
401 command_id
402 );
403 return Ok(());
404 }
405
406 if status.state != CommandState::Dispatched {
408 return Err(AlienError::new(ErrorData::InvalidStateTransition {
409 from: status.state.as_ref().to_string(),
410 to: CommandState::Succeeded.as_ref().to_string(),
411 }));
412 }
413
414 if let CommandResponse::Success {
416 response: ref mut body,
417 } = response
418 {
419 if let BodySpec::Storage {
420 size,
421 storage_get_request,
422 storage_put_used,
423 } = body
424 {
425 if storage_get_request.is_none() && storage_put_used.unwrap_or(false) {
426 let get_request = self
427 .generate_response_storage_get_request(command_id)
428 .await?;
429 *body = BodySpec::Storage {
430 size: *size,
431 storage_get_request: Some(get_request),
432 storage_put_used: *storage_put_used,
433 };
434 }
435 }
436 }
437
438 self.store_response(command_id, &response).await?;
440
441 self.delete_lease(command_id).await?;
443
444 self.delete_pending_index(&status.deployment_id, command_id)
446 .await?;
447
448 let (new_state, error) = if response.is_success() {
450 (CommandState::Succeeded, None)
451 } else if let CommandResponse::Error { code, message, .. } = &response {
452 (
453 CommandState::Failed,
454 Some(serde_json::json!({ "code": code, "message": message })),
455 )
456 } else {
457 (CommandState::Failed, None)
458 };
459
460 let response_size = match &response {
461 CommandResponse::Success {
462 response: BodySpec::Inline { inline_base64 },
463 } => Some(inline_base64.len() as u64),
464 CommandResponse::Success {
465 response: BodySpec::Storage { size, .. },
466 } => *size,
467 _ => None,
468 };
469
470 self.command_registry
471 .update_command_state(
472 command_id,
473 new_state,
474 None, Some(Utc::now()),
476 response_size,
477 error,
478 )
479 .await?;
480
481 info!(
482 "Command {} completed with state {:?}",
483 command_id, new_state
484 );
485 Ok(())
486 }
487
488 pub async fn acquire_lease(
492 &self,
493 deployment_id: &str,
494 lease_request: &LeaseRequest,
495 ) -> Result<LeaseResponse> {
496 let mut leases = Vec::new();
497
498 let target_prefix = format!("target:{}:pending:", deployment_id);
500 let scan_result = self
501 .kv
502 .scan_prefix(&target_prefix, Some(lease_request.max_leases * 2), None)
503 .await
504 .into_alien_error()
505 .context(ErrorData::KvOperationFailed {
506 operation: "scan_prefix".to_string(),
507 key: target_prefix.clone(),
508 message: "Failed to scan for pending commands".to_string(),
509 })?;
510
511 for (index_key, _) in scan_result.items {
512 if leases.len() >= lease_request.max_leases {
513 break;
514 }
515
516 let command_id = self.extract_command_id_from_index_key(&index_key)?;
517
518 let lease_id = format!("lease_{}", Uuid::new_v4());
520 let lease_duration = Duration::from_secs(lease_request.lease_seconds);
521 let expires_at =
522 Utc::now() + chrono::Duration::seconds(lease_request.lease_seconds as i64);
523
524 let lease_data = LeaseData {
525 lease_id: lease_id.clone(),
526 acquired_at: Utc::now(),
527 expires_at,
528 owner: deployment_id.to_string(),
529 };
530
531 let lease_key = format!("cmd:{}:lease", command_id);
532 let lease_value = serde_json::to_vec(&lease_data).into_alien_error().context(
533 ErrorData::SerializationFailed {
534 message: "Failed to serialize lease data".to_string(),
535 data_type: Some("LeaseData".to_string()),
536 },
537 )?;
538
539 let options = Some(PutOptions {
540 ttl: Some(lease_duration),
541 if_not_exists: true,
542 });
543
544 let success = self
545 .kv
546 .put(&lease_key, lease_value, options)
547 .await
548 .context(ErrorData::KvOperationFailed {
549 operation: "put".to_string(),
550 key: lease_key.clone(),
551 message: "Failed to create lease".to_string(),
552 })?;
553
554 if !success {
555 continue;
557 }
558
559 let metadata = match self
561 .command_registry
562 .get_command_metadata(&command_id)
563 .await?
564 {
565 Some(m) => m,
566 None => {
567 self.delete_lease(&command_id).await?;
569 let _ = self.kv.delete(&index_key).await;
570 continue;
571 }
572 };
573
574 if metadata.state.is_terminal() {
576 self.delete_lease(&command_id).await?;
578 let _ = self.kv.delete(&index_key).await;
579 continue;
580 }
581
582 if let Some(deadline) = metadata.deadline {
584 if Utc::now() > deadline {
585 self.command_registry
587 .update_command_state(
588 &command_id,
589 CommandState::Expired,
590 None,
591 Some(Utc::now()),
592 None,
593 None,
594 )
595 .await?;
596 self.delete_lease(&command_id).await?;
597 let _ = self.kv.delete(&index_key).await;
598 continue;
599 }
600 }
601
602 let params = match self.get_params(&command_id).await? {
604 Some(p) => p,
605 None => {
606 self.delete_lease(&command_id).await?;
608 continue;
609 }
610 };
611
612 self.command_registry
614 .update_command_state(
615 &command_id,
616 CommandState::Dispatched,
617 Some(Utc::now()),
618 None,
619 None,
620 None,
621 )
622 .await?;
623
624 let envelope = self.build_envelope(&command_id, &metadata, params).await?;
626
627 leases.push(LeaseInfo {
628 lease_id,
629 lease_expires_at: expires_at,
630 command_id: command_id.clone(),
631 attempt: metadata.attempt,
632 envelope,
633 });
634 }
635
636 Ok(LeaseResponse { leases })
637 }
638
639 pub async fn release_lease(&self, command_id: &str, lease_id: &str) -> Result<()> {
643 let lease_key = format!("cmd:{}:lease", command_id);
644
645 if let Ok(Some(lease_data)) = self.kv.get(&lease_key).await {
647 let lease: LeaseData = serde_json::from_slice(&lease_data)
648 .into_alien_error()
649 .context(ErrorData::SerializationFailed {
650 message: "Failed to deserialize lease data".to_string(),
651 data_type: Some("LeaseData".to_string()),
652 })?;
653
654 if lease.lease_id != lease_id {
655 return Err(AlienError::new(ErrorData::LeaseNotFound {
656 lease_id: lease_id.to_string(),
657 }));
658 }
659
660 self.delete_lease(command_id).await?;
662
663 self.command_registry.increment_attempt(command_id).await?;
665
666 self.command_registry
668 .update_command_state(command_id, CommandState::Pending, None, None, None, None)
669 .await?;
670
671 debug!("Lease {} released for command {}", lease_id, command_id);
673 }
674
675 Ok(())
676 }
677
678 pub async fn release_lease_by_id(&self, lease_id: &str) -> Result<()> {
680 let lease_prefix = "cmd:";
682 let scan_result = self
683 .kv
684 .scan_prefix(lease_prefix, None, None)
685 .await
686 .into_alien_error()
687 .context(ErrorData::KvOperationFailed {
688 operation: "scan_prefix".to_string(),
689 key: lease_prefix.to_string(),
690 message: "Failed to scan for lease keys".to_string(),
691 })?;
692
693 for (key, value) in scan_result.items {
694 if key.ends_with(":lease") {
695 if let Ok(lease) = serde_json::from_slice::<LeaseData>(&value) {
696 if lease.lease_id == lease_id {
697 let command_id = key
698 .strip_prefix("cmd:")
699 .and_then(|s| s.strip_suffix(":lease"))
700 .ok_or_else(|| {
701 AlienError::new(ErrorData::Other {
702 message: format!("Invalid lease key format: {}", key),
703 })
704 })?;
705
706 return self.release_lease(command_id, lease_id).await;
707 }
708 }
709 }
710 }
711
712 Err(AlienError::new(ErrorData::LeaseNotFound {
713 lease_id: lease_id.to_string(),
714 }))
715 }
716
717 async fn validate_create_command(&self, request: &CreateCommandRequest) -> Result<()> {
722 if request.command.is_empty() {
723 return Err(AlienError::new(ErrorData::InvalidCommand {
724 message: "Command name cannot be empty".to_string(),
725 }));
726 }
727
728 if request.deployment_id.is_empty() {
729 return Err(AlienError::new(ErrorData::InvalidCommand {
730 message: "Deployment ID cannot be empty".to_string(),
731 }));
732 }
733
734 if let Some(deadline) = request.deadline {
735 if deadline <= Utc::now() {
736 return Err(AlienError::new(ErrorData::InvalidCommand {
737 message: "Deadline must be in the future".to_string(),
738 }));
739 }
740 }
741
742 Ok(())
743 }
744
745 async fn check_idempotency(&self, idem_key: &str) -> Result<Option<String>> {
748 let key = format!("idem:{}", idem_key);
749 if let Some(data) = self
750 .kv
751 .get(&key)
752 .await
753 .context(ErrorData::KvOperationFailed {
754 operation: "get".to_string(),
755 key: key.clone(),
756 message: "Failed to check idempotency".to_string(),
757 })?
758 {
759 let command_id = String::from_utf8(data).into_alien_error().context(
760 ErrorData::SerializationFailed {
761 message: "Invalid idempotency data".to_string(),
762 data_type: Some("String".to_string()),
763 },
764 )?;
765 return Ok(Some(command_id));
766 }
767 Ok(None)
768 }
769
770 async fn store_idempotency(&self, idem_key: &str, command_id: &str) -> Result<()> {
771 let key = format!("idem:{}", idem_key);
772 let ttl = Duration::from_secs(24 * 60 * 60); self.kv
774 .put(
775 &key,
776 command_id.as_bytes().to_vec(),
777 Some(PutOptions {
778 ttl: Some(ttl),
779 if_not_exists: true,
780 }),
781 )
782 .await
783 .context(ErrorData::KvOperationFailed {
784 operation: "put".to_string(),
785 key: key.clone(),
786 message: "Failed to store idempotency".to_string(),
787 })?;
788 Ok(())
789 }
790
791 pub async fn store_params(&self, command_id: &str, params: &BodySpec) -> Result<()> {
794 let key = format!("cmd:{}:params", command_id);
795 let data = CommandParamsData {
796 params: params.clone(),
797 };
798 let value = serde_json::to_vec(&data).into_alien_error().context(
799 ErrorData::SerializationFailed {
800 message: "Failed to serialize params".to_string(),
801 data_type: Some("CommandParamsData".to_string()),
802 },
803 )?;
804 self.kv
805 .put(&key, value, None)
806 .await
807 .context(ErrorData::KvOperationFailed {
808 operation: "put".to_string(),
809 key: key.clone(),
810 message: "Failed to store params".to_string(),
811 })?;
812 Ok(())
813 }
814
815 pub async fn get_params(&self, command_id: &str) -> Result<Option<BodySpec>> {
816 let key = format!("cmd:{}:params", command_id);
817 if let Some(value) = self
818 .kv
819 .get(&key)
820 .await
821 .context(ErrorData::KvOperationFailed {
822 operation: "get".to_string(),
823 key: key.clone(),
824 message: "Failed to get params".to_string(),
825 })?
826 {
827 let data: CommandParamsData = serde_json::from_slice(&value)
828 .into_alien_error()
829 .context(ErrorData::SerializationFailed {
830 message: "Failed to deserialize params".to_string(),
831 data_type: Some("CommandParamsData".to_string()),
832 })?;
833 return Ok(Some(data.params));
834 }
835 Ok(None)
836 }
837
838 pub async fn store_response(&self, command_id: &str, response: &CommandResponse) -> Result<()> {
841 let key = format!("cmd:{}:response", command_id);
842 let data = CommandResponseData {
843 response: response.clone(),
844 };
845 let value = serde_json::to_vec(&data).into_alien_error().context(
846 ErrorData::SerializationFailed {
847 message: "Failed to serialize response".to_string(),
848 data_type: Some("CommandResponseData".to_string()),
849 },
850 )?;
851 self.kv
852 .put(&key, value, None)
853 .await
854 .context(ErrorData::KvOperationFailed {
855 operation: "put".to_string(),
856 key: key.clone(),
857 message: "Failed to store response".to_string(),
858 })?;
859 Ok(())
860 }
861
862 pub async fn get_response(&self, command_id: &str) -> Result<Option<CommandResponse>> {
863 let key = format!("cmd:{}:response", command_id);
864 if let Some(value) = self
865 .kv
866 .get(&key)
867 .await
868 .context(ErrorData::KvOperationFailed {
869 operation: "get".to_string(),
870 key: key.clone(),
871 message: "Failed to get response".to_string(),
872 })?
873 {
874 let data: CommandResponseData = serde_json::from_slice(&value)
875 .into_alien_error()
876 .context(ErrorData::SerializationFailed {
877 message: "Failed to deserialize response".to_string(),
878 data_type: Some("CommandResponseData".to_string()),
879 })?;
880 return Ok(Some(data.response));
881 }
882 Ok(None)
883 }
884
885 async fn create_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
888 let timestamp = Utc::now().timestamp_nanos_opt().unwrap_or(0);
889 let key = format!(
890 "target:{}:pending:{}:{}",
891 deployment_id, timestamp, command_id
892 );
893
894 self.kv
896 .put(&key, vec![], None)
897 .await
898 .context(ErrorData::KvOperationFailed {
899 operation: "put".to_string(),
900 key: key.clone(),
901 message: "Failed to create pending index".to_string(),
902 })?;
903 Ok(())
904 }
905
906 async fn delete_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
907 let prefix = format!("target:{}:pending:", deployment_id);
909 let scan_result = self
910 .kv
911 .scan_prefix(&prefix, Some(100), None)
912 .await
913 .into_alien_error()
914 .context(ErrorData::KvOperationFailed {
915 operation: "scan_prefix".to_string(),
916 key: prefix.clone(),
917 message: "Failed to scan pending index".to_string(),
918 })?;
919
920 for (key, _) in scan_result.items {
921 if key.ends_with(&format!(":{}", command_id)) {
922 let _ = self.kv.delete(&key).await;
923 break;
924 }
925 }
926 Ok(())
927 }
928
929 async fn delete_lease(&self, command_id: &str) -> Result<()> {
932 let key = format!("cmd:{}:lease", command_id);
933 let _ = self.kv.delete(&key).await;
934 Ok(())
935 }
936
937 async fn create_deadline_index(&self, command_id: &str, deadline: DateTime<Utc>) -> Result<()> {
940 let key = format!(
941 "deadline:{}:{}",
942 deadline.timestamp_nanos_opt().unwrap_or(0),
943 command_id
944 );
945
946 let data = DeadlineIndexData {
947 command_id: command_id.to_string(),
948 deadline,
949 };
950 let value = serde_json::to_vec(&data).into_alien_error().context(
951 ErrorData::SerializationFailed {
952 message: "Failed to serialize deadline index".to_string(),
953 data_type: Some("DeadlineIndexData".to_string()),
954 },
955 )?;
956
957 let ttl = deadline.signed_duration_since(Utc::now());
958 let ttl_duration = if ttl.num_seconds() > 0 {
959 Some(Duration::from_secs(ttl.num_seconds() as u64))
960 } else {
961 None
962 };
963
964 let options = ttl_duration.map(|ttl| PutOptions {
965 ttl: Some(ttl),
966 if_not_exists: false,
967 });
968
969 self.kv
970 .put(&key, value, options)
971 .await
972 .context(ErrorData::KvOperationFailed {
973 operation: "put".to_string(),
974 key: key.clone(),
975 message: "Failed to create deadline index".to_string(),
976 })?;
977 Ok(())
978 }
979
980 async fn dispatch_command_push(&self, command_id: &str, deployment_id: &str) -> Result<()> {
983 let metadata = self
985 .command_registry
986 .get_command_metadata(command_id)
987 .await?
988 .ok_or_else(|| {
989 AlienError::new(ErrorData::CommandNotFound {
990 command_id: command_id.to_string(),
991 })
992 })?;
993
994 let params = self.get_params(command_id).await?.ok_or_else(|| {
996 AlienError::new(ErrorData::CommandNotFound {
997 command_id: command_id.to_string(),
998 })
999 })?;
1000
1001 let envelope = self.build_envelope(command_id, &metadata, params).await?;
1003
1004 self.command_dispatcher
1006 .dispatch(&envelope)
1007 .await
1008 .map_err(|e| {
1009 e.context(ErrorData::TransportDispatchFailed {
1010 message: "Failed to dispatch command".to_string(),
1011 transport_type: None,
1012 target: Some(deployment_id.to_string()),
1013 })
1014 })?;
1015
1016 self.command_registry
1018 .update_command_state(
1019 command_id,
1020 CommandState::Dispatched,
1021 Some(Utc::now()),
1022 None,
1023 None,
1024 None,
1025 )
1026 .await?;
1027
1028 info!("Command {} dispatched via push", command_id);
1029 Ok(())
1030 }
1031
1032 async fn build_envelope(
1033 &self,
1034 command_id: &str,
1035 metadata: &CommandEnvelopeData,
1036 mut params: BodySpec,
1037 ) -> Result<Envelope> {
1038 let response_handling = self.create_response_handling(command_id).await?;
1039
1040 if let BodySpec::Storage {
1042 size,
1043 storage_get_request,
1044 storage_put_used,
1045 } = ¶ms
1046 {
1047 if storage_get_request.is_none() {
1048 let get_request = self.generate_storage_get_request(command_id).await?;
1049 params = BodySpec::Storage {
1050 size: *size,
1051 storage_get_request: Some(get_request),
1052 storage_put_used: *storage_put_used,
1053 };
1054 }
1055 }
1056
1057 Ok(Envelope::new(
1058 metadata.deployment_id.clone(),
1059 command_id.to_string(),
1060 metadata.attempt,
1061 metadata.deadline,
1062 metadata.command.clone(),
1063 params,
1064 response_handling,
1065 ))
1066 }
1067
1068 async fn create_response_handling(&self, command_id: &str) -> Result<ResponseHandling> {
1069 let upload_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1070 let expires_in = Duration::from_secs(3600);
1071 let presigned = self
1072 .storage
1073 .presigned_put(&upload_path, expires_in)
1074 .await
1075 .context(ErrorData::StorageOperationFailed {
1076 message: "Failed to create response upload URL".to_string(),
1077 operation: Some("presigned_put".to_string()),
1078 path: Some(upload_path.to_string()),
1079 })?;
1080
1081 Ok(ResponseHandling {
1082 max_inline_bytes: self.inline_max_bytes as u64,
1083 submit_response_url: format!(
1084 "{}/commands/{}/response",
1085 self.base_url.trim_end_matches('/'),
1086 command_id
1087 ),
1088 storage_upload_request: presigned,
1089 })
1090 }
1091
1092 async fn generate_params_upload(&self, command_id: &str) -> Result<StorageUpload> {
1093 let upload_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1094 let expires_in = Duration::from_secs(3600);
1095 let presigned = self
1096 .storage
1097 .presigned_put(&upload_path, expires_in)
1098 .await
1099 .into_alien_error()
1100 .context(ErrorData::StorageOperationFailed {
1101 message: "Failed to create presigned URL".to_string(),
1102 operation: Some("presigned_put".to_string()),
1103 path: Some(upload_path.to_string()),
1104 })?;
1105
1106 Ok(StorageUpload {
1107 put_request: presigned.clone(),
1108 expires_at: presigned.expiration,
1109 })
1110 }
1111
1112 async fn generate_storage_get_request(&self, command_id: &str) -> Result<PresignedRequest> {
1113 let path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1114 let expires_in = Duration::from_secs(3600);
1115 self.storage.presigned_get(&path, expires_in).await.context(
1116 ErrorData::StorageOperationFailed {
1117 message: "Failed to create storage get request".to_string(),
1118 operation: Some("presigned_get".to_string()),
1119 path: Some(path.to_string()),
1120 },
1121 )
1122 }
1123
1124 async fn generate_response_storage_get_request(
1125 &self,
1126 command_id: &str,
1127 ) -> Result<PresignedRequest> {
1128 let path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1129 let expires_in = Duration::from_secs(3600);
1130 self.storage.presigned_get(&path, expires_in).await.context(
1131 ErrorData::StorageOperationFailed {
1132 message: "Failed to create response storage get request".to_string(),
1133 operation: Some("presigned_get".to_string()),
1134 path: Some(path.to_string()),
1135 },
1136 )
1137 }
1138
1139 fn extract_command_id_from_index_key(&self, index_key: &str) -> Result<String> {
1140 index_key
1141 .split(':')
1142 .last()
1143 .ok_or_else(|| {
1144 AlienError::new(ErrorData::Other {
1145 message: format!("Invalid index key format: {}", index_key),
1146 })
1147 })
1148 .map(|s| s.to_string())
1149 }
1150}