Skip to main content

alien_commands/server/
mod.rs

1//! Command Server Implementation
2//!
3//! The Command server implements the command lifecycle:
4//! - CommandRegistry is the SOURCE OF TRUTH for all metadata (state, timestamps, etc.)
5//! - KV stores ONLY operational data (params/response blobs, indices, leases)
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use alien_bindings::presigned::PresignedRequest;
11use alien_bindings::traits::{Kv, PutOptions, Storage};
12use alien_core::DeploymentModel;
13use alien_error::{AlienError, Context, ContextError, IntoAlienError};
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use tracing::{debug, info};
17use uuid::Uuid;
18
19#[cfg(feature = "server")]
20use object_store::path::Path as StoragePath;
21
22use crate::error::{ErrorData, Result};
23use crate::types::*;
24use crate::INLINE_MAX_BYTES;
25
26pub mod axum_handlers;
27pub mod command_registry;
28pub mod dispatchers;
29pub mod storage;
30
31pub use axum_handlers::{create_axum_router, HasCommandServer};
32pub use command_registry::{
33    CommandEnvelopeData, CommandMetadata, CommandRegistry, CommandStatus, InMemoryCommandRegistry,
34};
35pub use dispatchers::{CommandDispatcher, NullCommandDispatcher};
36
37// =============================================================================
38// KV Data Structures (Operational Data Only)
39// =============================================================================
40
41/// Params stored in KV (just the blob)
42#[derive(Debug, Clone, Serialize, Deserialize)]
43struct CommandParamsData {
44    pub params: BodySpec,
45}
46
47/// Response stored in KV (just the blob)
48#[derive(Debug, Clone, Serialize, Deserialize)]
49struct CommandResponseData {
50    pub response: CommandResponse,
51}
52
53/// Lease record with TTL
54#[derive(Debug, Clone, Serialize, Deserialize)]
55struct LeaseData {
56    pub lease_id: String,
57    pub acquired_at: DateTime<Utc>,
58    pub expires_at: DateTime<Utc>,
59    pub owner: String,
60}
61
62/// Deadline index data
63#[derive(Debug, Clone, Serialize, Deserialize)]
64struct DeadlineIndexData {
65    pub command_id: String,
66    pub deadline: DateTime<Utc>,
67}
68
69// =============================================================================
70// ARC Server
71// =============================================================================
72
73/// Core ARC server implementation.
74///
75/// Uses CommandRegistry as source of truth for metadata.
76/// Uses KV for operational data (params, responses, indices, leases).
77pub struct CommandServer {
78    kv: Arc<dyn Kv>,
79    storage: Arc<dyn Storage>,
80    command_dispatcher: Arc<dyn CommandDispatcher>,
81    command_registry: Arc<dyn CommandRegistry>,
82    inline_max_bytes: usize,
83    base_url: String,
84}
85
86impl CommandServer {
87    /// Create a new ARC server instance
88    pub fn new(
89        kv: Arc<dyn Kv>,
90        storage: Arc<dyn Storage>,
91        command_dispatcher: Arc<dyn CommandDispatcher>,
92        command_registry: Arc<dyn CommandRegistry>,
93        base_url: String,
94    ) -> Self {
95        Self {
96            kv,
97            storage,
98            command_dispatcher,
99            command_registry,
100            inline_max_bytes: INLINE_MAX_BYTES,
101            base_url,
102        }
103    }
104
105    /// Create a new ARC server with custom inline size limit
106    pub fn with_inline_limit(
107        kv: Arc<dyn Kv>,
108        storage: Arc<dyn Storage>,
109        command_dispatcher: Arc<dyn CommandDispatcher>,
110        command_registry: Arc<dyn CommandRegistry>,
111        base_url: String,
112        inline_max_bytes: usize,
113    ) -> Self {
114        Self {
115            kv,
116            storage,
117            command_dispatcher,
118            command_registry,
119            inline_max_bytes,
120            base_url,
121        }
122    }
123
124    // =========================================================================
125    // Public API Methods
126    // =========================================================================
127
128    /// Create a new command.
129    ///
130    /// Flow:
131    /// 1. Validate request
132    /// 2. Registry creates command metadata (source of truth)
133    /// 3. KV stores params blob
134    /// 4. KV creates pending index (for Pull) or dispatch (for Push)
135    pub async fn create_command(
136        &self,
137        request: CreateCommandRequest,
138    ) -> Result<CreateCommandResponse> {
139        // Validate the request
140        self.validate_create_command(&request).await?;
141
142        // Check idempotency if key provided
143        if let Some(ref idem_key) = request.idempotency_key {
144            if let Some(existing_id) = self.check_idempotency(idem_key).await? {
145                // Return existing command status
146                let status = self
147                    .command_registry
148                    .get_command_status(&existing_id)
149                    .await?;
150                if let Some(s) = status {
151                    return Ok(CreateCommandResponse {
152                        command_id: existing_id,
153                        state: s.state,
154                        storage_upload: None,
155                        inline_allowed_up_to: self.inline_max_bytes as u64,
156                        next: "poll".to_string(),
157                    });
158                }
159            }
160        }
161
162        // Determine initial state and request size
163        let (initial_state, request_size_bytes) = match &request.params {
164            BodySpec::Inline { inline_base64 } => {
165                let size = inline_base64.len() as u64;
166                (CommandState::Pending, Some(size))
167            }
168            BodySpec::Storage { size, .. } => {
169                if size.unwrap_or(0) > self.inline_max_bytes as u64 {
170                    (CommandState::PendingUpload, *size)
171                } else {
172                    (CommandState::Pending, *size)
173                }
174            }
175        };
176
177        // 1. Registry creates command metadata (SOURCE OF TRUTH)
178        let metadata = self
179            .command_registry
180            .create_command(
181                &request.deployment_id,
182                &request.command,
183                initial_state,
184                request.deadline,
185                request_size_bytes,
186            )
187            .await?;
188
189        let command_id = metadata.command_id;
190        let deployment_model = metadata.deployment_model;
191
192        // 2. Store idempotency mapping in KV
193        if let Some(ref idem_key) = request.idempotency_key {
194            self.store_idempotency(idem_key, &command_id).await?;
195        }
196
197        // 3. Store params in KV
198        self.store_params(&command_id, &request.params).await?;
199
200        // 4. Generate storage upload URL if needed
201        let storage_upload = if initial_state == CommandState::PendingUpload {
202            Some(self.generate_params_upload(&command_id).await?)
203        } else {
204            None
205        };
206
207        // 5. Handle dispatch based on state and deployment model
208        let (final_state, next_action) = if initial_state == CommandState::Pending {
209            match deployment_model {
210                DeploymentModel::Push => {
211                    // Push model: dispatch immediately
212                    self.dispatch_command_push(&command_id, &request.deployment_id)
213                        .await?;
214                    (CommandState::Dispatched, "poll")
215                }
216                DeploymentModel::Pull => {
217                    // Pull model: create pending index, deployment will poll
218                    self.create_pending_index(&request.deployment_id, &command_id)
219                        .await?;
220                    debug!(
221                        "Command {} ready for pull (deployment will poll)",
222                        command_id
223                    );
224                    (CommandState::Pending, "poll")
225                }
226            }
227        } else {
228            // PendingUpload - need upload first
229            (initial_state, "upload")
230        };
231
232        // 6. Create deadline index if deadline provided
233        if let Some(deadline) = request.deadline {
234            self.create_deadline_index(&command_id, deadline).await?;
235        }
236
237        Ok(CreateCommandResponse {
238            command_id,
239            state: final_state,
240            storage_upload,
241            inline_allowed_up_to: self.inline_max_bytes as u64,
242            next: next_action.to_string(),
243        })
244    }
245
246    /// Mark upload as complete and dispatch command.
247    pub async fn upload_complete(
248        &self,
249        command_id: &str,
250        upload_request: UploadCompleteRequest,
251    ) -> Result<UploadCompleteResponse> {
252        // 1. Get current status from registry (source of truth)
253        let status = self
254            .command_registry
255            .get_command_status(command_id)
256            .await?
257            .ok_or_else(|| {
258                AlienError::new(ErrorData::CommandNotFound {
259                    command_id: command_id.to_string(),
260                })
261            })?;
262
263        // 2. Validate current state
264        if status.state != CommandState::PendingUpload {
265            return Err(AlienError::new(ErrorData::InvalidStateTransition {
266                from: status.state.as_ref().to_string(),
267                to: CommandState::Pending.as_ref().to_string(),
268            }));
269        }
270
271        // 3. Update params in KV with storage reference
272        let storage_get_request = self.generate_storage_get_request(command_id).await?;
273        let params = BodySpec::Storage {
274            size: Some(upload_request.size),
275            storage_get_request: Some(storage_get_request),
276            storage_put_used: None,
277        };
278        self.store_params(command_id, &params).await?;
279
280        // 4. Update registry to Pending state
281        self.command_registry
282            .update_command_state(command_id, CommandState::Pending, None, None, None, None)
283            .await?;
284
285        // 5. Get deployment model from registry and handle dispatch
286        let metadata = self
287            .command_registry
288            .get_command_metadata(command_id)
289            .await?
290            .ok_or_else(|| {
291                AlienError::new(ErrorData::CommandNotFound {
292                    command_id: command_id.to_string(),
293                })
294            })?;
295
296        let final_state = match metadata.deployment_model {
297            DeploymentModel::Push => {
298                self.dispatch_command_push(command_id, &status.deployment_id)
299                    .await?;
300                CommandState::Dispatched
301            }
302            DeploymentModel::Pull => {
303                self.create_pending_index(&status.deployment_id, command_id)
304                    .await?;
305                debug!(
306                    "Command {} ready for pull after upload (deployment will poll)",
307                    command_id
308                );
309                CommandState::Pending
310            }
311        };
312
313        Ok(UploadCompleteResponse {
314            command_id: command_id.to_string(),
315            state: final_state,
316        })
317    }
318
319    /// Get command status.
320    ///
321    /// Queries registry for metadata (source of truth), KV for response blob.
322    pub async fn get_command_status(&self, command_id: &str) -> Result<CommandStatusResponse> {
323        // 1. Get status from registry (SOURCE OF TRUTH)
324        let status = self
325            .command_registry
326            .get_command_status(command_id)
327            .await?
328            .ok_or_else(|| {
329                AlienError::new(ErrorData::CommandNotFound {
330                    command_id: command_id.to_string(),
331                })
332            })?;
333
334        // 2. Check deadline expiry inline
335        if let Some(deadline) = status.deadline {
336            if Utc::now() > deadline && !status.state.is_terminal() {
337                // Expire the command
338                self.command_registry
339                    .update_command_state(
340                        command_id,
341                        CommandState::Expired,
342                        None,
343                        Some(Utc::now()),
344                        None,
345                        None,
346                    )
347                    .await?;
348
349                // Clean up pending index
350                self.delete_pending_index(&status.deployment_id, command_id)
351                    .await?;
352
353                // Return expired status directly (avoid recursion)
354                return Ok(CommandStatusResponse {
355                    command_id: command_id.to_string(),
356                    state: CommandState::Expired,
357                    attempt: status.attempt,
358                    response: None,
359                });
360            }
361        }
362
363        // 3. Get response blob from KV if terminal state
364        let response = if status.state.is_terminal() {
365            self.get_response(command_id).await?
366        } else {
367            None
368        };
369
370        Ok(CommandStatusResponse {
371            command_id: command_id.to_string(),
372            state: status.state,
373            attempt: status.attempt,
374            response,
375        })
376    }
377
378    /// Submit response from deployment.
379    ///
380    /// Stores response blob in KV, updates state in registry.
381    pub async fn submit_command_response(
382        &self,
383        command_id: &str,
384        mut response: CommandResponse,
385    ) -> Result<()> {
386        // 1. Get current status from registry
387        let status = self
388            .command_registry
389            .get_command_status(command_id)
390            .await?
391            .ok_or_else(|| {
392                AlienError::new(ErrorData::CommandNotFound {
393                    command_id: command_id.to_string(),
394                })
395            })?;
396
397        // 2. Handle duplicate responses gracefully
398        if status.state.is_terminal() {
399            debug!(
400                "Ignoring duplicate response for terminal command {}",
401                command_id
402            );
403            return Ok(());
404        }
405
406        // 3. Validate state transition
407        if status.state != CommandState::Dispatched {
408            return Err(AlienError::new(ErrorData::InvalidStateTransition {
409                from: status.state.as_ref().to_string(),
410                to: CommandState::Succeeded.as_ref().to_string(),
411            }));
412        }
413
414        // 4. If response was uploaded to storage, generate download URL
415        if let CommandResponse::Success {
416            response: ref mut body,
417        } = response
418        {
419            if let BodySpec::Storage {
420                size,
421                storage_get_request,
422                storage_put_used,
423            } = body
424            {
425                if storage_get_request.is_none() && storage_put_used.unwrap_or(false) {
426                    let get_request = self
427                        .generate_response_storage_get_request(command_id)
428                        .await?;
429                    *body = BodySpec::Storage {
430                        size: *size,
431                        storage_get_request: Some(get_request),
432                        storage_put_used: *storage_put_used,
433                    };
434                }
435            }
436        }
437
438        // 5. Store response blob in KV
439        self.store_response(command_id, &response).await?;
440
441        // 6. Clean up lease from KV
442        self.delete_lease(command_id).await?;
443
444        // 7. Clean up pending index from KV (terminal state)
445        self.delete_pending_index(&status.deployment_id, command_id)
446            .await?;
447
448        // 8. Update registry state (SOURCE OF TRUTH)
449        let (new_state, error) = if response.is_success() {
450            (CommandState::Succeeded, None)
451        } else if let CommandResponse::Error { code, message, .. } = &response {
452            (
453                CommandState::Failed,
454                Some(serde_json::json!({ "code": code, "message": message })),
455            )
456        } else {
457            (CommandState::Failed, None)
458        };
459
460        let response_size = match &response {
461            CommandResponse::Success {
462                response: BodySpec::Inline { inline_base64 },
463            } => Some(inline_base64.len() as u64),
464            CommandResponse::Success {
465                response: BodySpec::Storage { size, .. },
466            } => *size,
467            _ => None,
468        };
469
470        self.command_registry
471            .update_command_state(
472                command_id,
473                new_state,
474                None, // dispatched_at already set
475                Some(Utc::now()),
476                response_size,
477                error,
478            )
479            .await?;
480
481        info!(
482            "Command {} completed with state {:?}",
483            command_id, new_state
484        );
485        Ok(())
486    }
487
488    /// Acquire leases for polling deployments.
489    ///
490    /// Scans KV pending index, queries registry for metadata, creates leases in KV.
491    pub async fn acquire_lease(
492        &self,
493        deployment_id: &str,
494        lease_request: &LeaseRequest,
495    ) -> Result<LeaseResponse> {
496        let mut leases = Vec::new();
497
498        // 1. Scan KV pending index
499        let target_prefix = format!("target:{}:pending:", deployment_id);
500        let scan_result = self
501            .kv
502            .scan_prefix(&target_prefix, Some(lease_request.max_leases * 2), None)
503            .await
504            .into_alien_error()
505            .context(ErrorData::KvOperationFailed {
506                operation: "scan_prefix".to_string(),
507                key: target_prefix.clone(),
508                message: "Failed to scan for pending commands".to_string(),
509            })?;
510
511        for (index_key, _) in scan_result.items {
512            if leases.len() >= lease_request.max_leases {
513                break;
514            }
515
516            let command_id = self.extract_command_id_from_index_key(&index_key)?;
517
518            // 2. Try to acquire lease atomically in KV
519            let lease_id = format!("lease_{}", Uuid::new_v4());
520            let lease_duration = Duration::from_secs(lease_request.lease_seconds);
521            let expires_at =
522                Utc::now() + chrono::Duration::seconds(lease_request.lease_seconds as i64);
523
524            let lease_data = LeaseData {
525                lease_id: lease_id.clone(),
526                acquired_at: Utc::now(),
527                expires_at,
528                owner: deployment_id.to_string(),
529            };
530
531            let lease_key = format!("cmd:{}:lease", command_id);
532            let lease_value = serde_json::to_vec(&lease_data).into_alien_error().context(
533                ErrorData::SerializationFailed {
534                    message: "Failed to serialize lease data".to_string(),
535                    data_type: Some("LeaseData".to_string()),
536                },
537            )?;
538
539            let options = Some(PutOptions {
540                ttl: Some(lease_duration),
541                if_not_exists: true,
542            });
543
544            let success = self
545                .kv
546                .put(&lease_key, lease_value, options)
547                .await
548                .context(ErrorData::KvOperationFailed {
549                    operation: "put".to_string(),
550                    key: lease_key.clone(),
551                    message: "Failed to create lease".to_string(),
552                })?;
553
554            if !success {
555                // Lease already exists, skip
556                continue;
557            }
558
559            // 3. Get metadata from registry
560            let metadata = match self
561                .command_registry
562                .get_command_metadata(&command_id)
563                .await?
564            {
565                Some(m) => m,
566                None => {
567                    // Command doesn't exist in registry, clean up
568                    self.delete_lease(&command_id).await?;
569                    let _ = self.kv.delete(&index_key).await;
570                    continue;
571                }
572            };
573
574            // 4. Check if command is in terminal state (stale index)
575            if metadata.state.is_terminal() {
576                // Clean up stale data
577                self.delete_lease(&command_id).await?;
578                let _ = self.kv.delete(&index_key).await;
579                continue;
580            }
581
582            // 5. Check deadline expiry
583            if let Some(deadline) = metadata.deadline {
584                if Utc::now() > deadline {
585                    // Expire the command
586                    self.command_registry
587                        .update_command_state(
588                            &command_id,
589                            CommandState::Expired,
590                            None,
591                            Some(Utc::now()),
592                            None,
593                            None,
594                        )
595                        .await?;
596                    self.delete_lease(&command_id).await?;
597                    let _ = self.kv.delete(&index_key).await;
598                    continue;
599                }
600            }
601
602            // 6. Get params from KV
603            let params = match self.get_params(&command_id).await? {
604                Some(p) => p,
605                None => {
606                    // No params, something went wrong
607                    self.delete_lease(&command_id).await?;
608                    continue;
609                }
610            };
611
612            // 7. Update registry state to Dispatched
613            self.command_registry
614                .update_command_state(
615                    &command_id,
616                    CommandState::Dispatched,
617                    Some(Utc::now()),
618                    None,
619                    None,
620                    None,
621                )
622                .await?;
623
624            // 8. Build envelope
625            let envelope = self.build_envelope(&command_id, &metadata, params).await?;
626
627            leases.push(LeaseInfo {
628                lease_id,
629                lease_expires_at: expires_at,
630                command_id: command_id.clone(),
631                attempt: metadata.attempt,
632                envelope,
633            });
634        }
635
636        Ok(LeaseResponse { leases })
637    }
638
639    /// Release a lease manually.
640    ///
641    /// Increments attempt count in registry, returns command to Pending state.
642    pub async fn release_lease(&self, command_id: &str, lease_id: &str) -> Result<()> {
643        let lease_key = format!("cmd:{}:lease", command_id);
644
645        // 1. Verify lease ownership
646        if let Ok(Some(lease_data)) = self.kv.get(&lease_key).await {
647            let lease: LeaseData = serde_json::from_slice(&lease_data)
648                .into_alien_error()
649                .context(ErrorData::SerializationFailed {
650                    message: "Failed to deserialize lease data".to_string(),
651                    data_type: Some("LeaseData".to_string()),
652                })?;
653
654            if lease.lease_id != lease_id {
655                return Err(AlienError::new(ErrorData::LeaseNotFound {
656                    lease_id: lease_id.to_string(),
657                }));
658            }
659
660            // 2. Delete lease from KV
661            self.delete_lease(command_id).await?;
662
663            // 3. Increment attempt in registry
664            self.command_registry.increment_attempt(command_id).await?;
665
666            // 4. Update registry state back to Pending
667            self.command_registry
668                .update_command_state(command_id, CommandState::Pending, None, None, None, None)
669                .await?;
670
671            // Note: Pending index is NOT removed on lease, so command is still there
672            debug!("Lease {} released for command {}", lease_id, command_id);
673        }
674
675        Ok(())
676    }
677
678    /// Release a lease by lease_id only (for the API).
679    pub async fn release_lease_by_id(&self, lease_id: &str) -> Result<()> {
680        // Scan for leases to find the one with this lease_id
681        let lease_prefix = "cmd:";
682        let scan_result = self
683            .kv
684            .scan_prefix(lease_prefix, None, None)
685            .await
686            .into_alien_error()
687            .context(ErrorData::KvOperationFailed {
688                operation: "scan_prefix".to_string(),
689                key: lease_prefix.to_string(),
690                message: "Failed to scan for lease keys".to_string(),
691            })?;
692
693        for (key, value) in scan_result.items {
694            if key.ends_with(":lease") {
695                if let Ok(lease) = serde_json::from_slice::<LeaseData>(&value) {
696                    if lease.lease_id == lease_id {
697                        let command_id = key
698                            .strip_prefix("cmd:")
699                            .and_then(|s| s.strip_suffix(":lease"))
700                            .ok_or_else(|| {
701                                AlienError::new(ErrorData::Other {
702                                    message: format!("Invalid lease key format: {}", key),
703                                })
704                            })?;
705
706                        return self.release_lease(command_id, lease_id).await;
707                    }
708                }
709            }
710        }
711
712        Err(AlienError::new(ErrorData::LeaseNotFound {
713            lease_id: lease_id.to_string(),
714        }))
715    }
716
717    // =========================================================================
718    // Internal Helper Methods
719    // =========================================================================
720
721    async fn validate_create_command(&self, request: &CreateCommandRequest) -> Result<()> {
722        if request.command.is_empty() {
723            return Err(AlienError::new(ErrorData::InvalidCommand {
724                message: "Command name cannot be empty".to_string(),
725            }));
726        }
727
728        if request.deployment_id.is_empty() {
729            return Err(AlienError::new(ErrorData::InvalidCommand {
730                message: "Deployment ID cannot be empty".to_string(),
731            }));
732        }
733
734        if let Some(deadline) = request.deadline {
735            if deadline <= Utc::now() {
736                return Err(AlienError::new(ErrorData::InvalidCommand {
737                    message: "Deadline must be in the future".to_string(),
738                }));
739            }
740        }
741
742        Ok(())
743    }
744
745    // --- Idempotency ---
746
747    async fn check_idempotency(&self, idem_key: &str) -> Result<Option<String>> {
748        let key = format!("idem:{}", idem_key);
749        if let Some(data) = self
750            .kv
751            .get(&key)
752            .await
753            .context(ErrorData::KvOperationFailed {
754                operation: "get".to_string(),
755                key: key.clone(),
756                message: "Failed to check idempotency".to_string(),
757            })?
758        {
759            let command_id = String::from_utf8(data).into_alien_error().context(
760                ErrorData::SerializationFailed {
761                    message: "Invalid idempotency data".to_string(),
762                    data_type: Some("String".to_string()),
763                },
764            )?;
765            return Ok(Some(command_id));
766        }
767        Ok(None)
768    }
769
770    async fn store_idempotency(&self, idem_key: &str, command_id: &str) -> Result<()> {
771        let key = format!("idem:{}", idem_key);
772        let ttl = Duration::from_secs(24 * 60 * 60); // 24 hours
773        self.kv
774            .put(
775                &key,
776                command_id.as_bytes().to_vec(),
777                Some(PutOptions {
778                    ttl: Some(ttl),
779                    if_not_exists: true,
780                }),
781            )
782            .await
783            .context(ErrorData::KvOperationFailed {
784                operation: "put".to_string(),
785                key: key.clone(),
786                message: "Failed to store idempotency".to_string(),
787            })?;
788        Ok(())
789    }
790
791    // --- Params ---
792
793    pub async fn store_params(&self, command_id: &str, params: &BodySpec) -> Result<()> {
794        let key = format!("cmd:{}:params", command_id);
795        let data = CommandParamsData {
796            params: params.clone(),
797        };
798        let value = serde_json::to_vec(&data).into_alien_error().context(
799            ErrorData::SerializationFailed {
800                message: "Failed to serialize params".to_string(),
801                data_type: Some("CommandParamsData".to_string()),
802            },
803        )?;
804        self.kv
805            .put(&key, value, None)
806            .await
807            .context(ErrorData::KvOperationFailed {
808                operation: "put".to_string(),
809                key: key.clone(),
810                message: "Failed to store params".to_string(),
811            })?;
812        Ok(())
813    }
814
815    pub async fn get_params(&self, command_id: &str) -> Result<Option<BodySpec>> {
816        let key = format!("cmd:{}:params", command_id);
817        if let Some(value) = self
818            .kv
819            .get(&key)
820            .await
821            .context(ErrorData::KvOperationFailed {
822                operation: "get".to_string(),
823                key: key.clone(),
824                message: "Failed to get params".to_string(),
825            })?
826        {
827            let data: CommandParamsData = serde_json::from_slice(&value)
828                .into_alien_error()
829                .context(ErrorData::SerializationFailed {
830                    message: "Failed to deserialize params".to_string(),
831                    data_type: Some("CommandParamsData".to_string()),
832                })?;
833            return Ok(Some(data.params));
834        }
835        Ok(None)
836    }
837
838    // --- Response ---
839
840    pub async fn store_response(&self, command_id: &str, response: &CommandResponse) -> Result<()> {
841        let key = format!("cmd:{}:response", command_id);
842        let data = CommandResponseData {
843            response: response.clone(),
844        };
845        let value = serde_json::to_vec(&data).into_alien_error().context(
846            ErrorData::SerializationFailed {
847                message: "Failed to serialize response".to_string(),
848                data_type: Some("CommandResponseData".to_string()),
849            },
850        )?;
851        self.kv
852            .put(&key, value, None)
853            .await
854            .context(ErrorData::KvOperationFailed {
855                operation: "put".to_string(),
856                key: key.clone(),
857                message: "Failed to store response".to_string(),
858            })?;
859        Ok(())
860    }
861
862    pub async fn get_response(&self, command_id: &str) -> Result<Option<CommandResponse>> {
863        let key = format!("cmd:{}:response", command_id);
864        if let Some(value) = self
865            .kv
866            .get(&key)
867            .await
868            .context(ErrorData::KvOperationFailed {
869                operation: "get".to_string(),
870                key: key.clone(),
871                message: "Failed to get response".to_string(),
872            })?
873        {
874            let data: CommandResponseData = serde_json::from_slice(&value)
875                .into_alien_error()
876                .context(ErrorData::SerializationFailed {
877                    message: "Failed to deserialize response".to_string(),
878                    data_type: Some("CommandResponseData".to_string()),
879                })?;
880            return Ok(Some(data.response));
881        }
882        Ok(None)
883    }
884
885    // --- Pending Index ---
886
887    async fn create_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
888        let timestamp = Utc::now().timestamp_nanos_opt().unwrap_or(0);
889        let key = format!(
890            "target:{}:pending:{}:{}",
891            deployment_id, timestamp, command_id
892        );
893
894        // Store empty value - just for ordering
895        self.kv
896            .put(&key, vec![], None)
897            .await
898            .context(ErrorData::KvOperationFailed {
899                operation: "put".to_string(),
900                key: key.clone(),
901                message: "Failed to create pending index".to_string(),
902            })?;
903        Ok(())
904    }
905
906    async fn delete_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
907        // We need to scan to find the exact key since we don't know the timestamp
908        let prefix = format!("target:{}:pending:", deployment_id);
909        let scan_result = self
910            .kv
911            .scan_prefix(&prefix, Some(100), None)
912            .await
913            .into_alien_error()
914            .context(ErrorData::KvOperationFailed {
915                operation: "scan_prefix".to_string(),
916                key: prefix.clone(),
917                message: "Failed to scan pending index".to_string(),
918            })?;
919
920        for (key, _) in scan_result.items {
921            if key.ends_with(&format!(":{}", command_id)) {
922                let _ = self.kv.delete(&key).await;
923                break;
924            }
925        }
926        Ok(())
927    }
928
929    // --- Lease ---
930
931    async fn delete_lease(&self, command_id: &str) -> Result<()> {
932        let key = format!("cmd:{}:lease", command_id);
933        let _ = self.kv.delete(&key).await;
934        Ok(())
935    }
936
937    // --- Deadline Index ---
938
939    async fn create_deadline_index(&self, command_id: &str, deadline: DateTime<Utc>) -> Result<()> {
940        let key = format!(
941            "deadline:{}:{}",
942            deadline.timestamp_nanos_opt().unwrap_or(0),
943            command_id
944        );
945
946        let data = DeadlineIndexData {
947            command_id: command_id.to_string(),
948            deadline,
949        };
950        let value = serde_json::to_vec(&data).into_alien_error().context(
951            ErrorData::SerializationFailed {
952                message: "Failed to serialize deadline index".to_string(),
953                data_type: Some("DeadlineIndexData".to_string()),
954            },
955        )?;
956
957        let ttl = deadline.signed_duration_since(Utc::now());
958        let ttl_duration = if ttl.num_seconds() > 0 {
959            Some(Duration::from_secs(ttl.num_seconds() as u64))
960        } else {
961            None
962        };
963
964        let options = ttl_duration.map(|ttl| PutOptions {
965            ttl: Some(ttl),
966            if_not_exists: false,
967        });
968
969        self.kv
970            .put(&key, value, options)
971            .await
972            .context(ErrorData::KvOperationFailed {
973                operation: "put".to_string(),
974                key: key.clone(),
975                message: "Failed to create deadline index".to_string(),
976            })?;
977        Ok(())
978    }
979
980    // --- Dispatch ---
981
982    async fn dispatch_command_push(&self, command_id: &str, deployment_id: &str) -> Result<()> {
983        // Get metadata from registry
984        let metadata = self
985            .command_registry
986            .get_command_metadata(command_id)
987            .await?
988            .ok_or_else(|| {
989                AlienError::new(ErrorData::CommandNotFound {
990                    command_id: command_id.to_string(),
991                })
992            })?;
993
994        // Get params from KV
995        let params = self.get_params(command_id).await?.ok_or_else(|| {
996            AlienError::new(ErrorData::CommandNotFound {
997                command_id: command_id.to_string(),
998            })
999        })?;
1000
1001        // Build envelope
1002        let envelope = self.build_envelope(command_id, &metadata, params).await?;
1003
1004        // Dispatch via transport
1005        self.command_dispatcher
1006            .dispatch(&envelope)
1007            .await
1008            .map_err(|e| {
1009                e.context(ErrorData::TransportDispatchFailed {
1010                    message: "Failed to dispatch command".to_string(),
1011                    transport_type: None,
1012                    target: Some(deployment_id.to_string()),
1013                })
1014            })?;
1015
1016        // Update registry state
1017        self.command_registry
1018            .update_command_state(
1019                command_id,
1020                CommandState::Dispatched,
1021                Some(Utc::now()),
1022                None,
1023                None,
1024                None,
1025            )
1026            .await?;
1027
1028        info!("Command {} dispatched via push", command_id);
1029        Ok(())
1030    }
1031
1032    async fn build_envelope(
1033        &self,
1034        command_id: &str,
1035        metadata: &CommandEnvelopeData,
1036        mut params: BodySpec,
1037    ) -> Result<Envelope> {
1038        let response_handling = self.create_response_handling(command_id).await?;
1039
1040        // Ensure storage params have proper storage_get_request
1041        if let BodySpec::Storage {
1042            size,
1043            storage_get_request,
1044            storage_put_used,
1045        } = &params
1046        {
1047            if storage_get_request.is_none() {
1048                let get_request = self.generate_storage_get_request(command_id).await?;
1049                params = BodySpec::Storage {
1050                    size: *size,
1051                    storage_get_request: Some(get_request),
1052                    storage_put_used: *storage_put_used,
1053                };
1054            }
1055        }
1056
1057        Ok(Envelope::new(
1058            metadata.deployment_id.clone(),
1059            command_id.to_string(),
1060            metadata.attempt,
1061            metadata.deadline,
1062            metadata.command.clone(),
1063            params,
1064            response_handling,
1065        ))
1066    }
1067
1068    async fn create_response_handling(&self, command_id: &str) -> Result<ResponseHandling> {
1069        let upload_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1070        let expires_in = Duration::from_secs(3600);
1071        let presigned = self
1072            .storage
1073            .presigned_put(&upload_path, expires_in)
1074            .await
1075            .context(ErrorData::StorageOperationFailed {
1076                message: "Failed to create response upload URL".to_string(),
1077                operation: Some("presigned_put".to_string()),
1078                path: Some(upload_path.to_string()),
1079            })?;
1080
1081        Ok(ResponseHandling {
1082            max_inline_bytes: self.inline_max_bytes as u64,
1083            submit_response_url: format!(
1084                "{}/commands/{}/response",
1085                self.base_url.trim_end_matches('/'),
1086                command_id
1087            ),
1088            storage_upload_request: presigned,
1089        })
1090    }
1091
1092    async fn generate_params_upload(&self, command_id: &str) -> Result<StorageUpload> {
1093        let upload_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1094        let expires_in = Duration::from_secs(3600);
1095        let presigned = self
1096            .storage
1097            .presigned_put(&upload_path, expires_in)
1098            .await
1099            .into_alien_error()
1100            .context(ErrorData::StorageOperationFailed {
1101                message: "Failed to create presigned URL".to_string(),
1102                operation: Some("presigned_put".to_string()),
1103                path: Some(upload_path.to_string()),
1104            })?;
1105
1106        Ok(StorageUpload {
1107            put_request: presigned.clone(),
1108            expires_at: presigned.expiration,
1109        })
1110    }
1111
1112    async fn generate_storage_get_request(&self, command_id: &str) -> Result<PresignedRequest> {
1113        let path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1114        let expires_in = Duration::from_secs(3600);
1115        self.storage.presigned_get(&path, expires_in).await.context(
1116            ErrorData::StorageOperationFailed {
1117                message: "Failed to create storage get request".to_string(),
1118                operation: Some("presigned_get".to_string()),
1119                path: Some(path.to_string()),
1120            },
1121        )
1122    }
1123
1124    async fn generate_response_storage_get_request(
1125        &self,
1126        command_id: &str,
1127    ) -> Result<PresignedRequest> {
1128        let path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1129        let expires_in = Duration::from_secs(3600);
1130        self.storage.presigned_get(&path, expires_in).await.context(
1131            ErrorData::StorageOperationFailed {
1132                message: "Failed to create response storage get request".to_string(),
1133                operation: Some("presigned_get".to_string()),
1134                path: Some(path.to_string()),
1135            },
1136        )
1137    }
1138
1139    fn extract_command_id_from_index_key(&self, index_key: &str) -> Result<String> {
1140        index_key
1141            .split(':')
1142            .last()
1143            .ok_or_else(|| {
1144                AlienError::new(ErrorData::Other {
1145                    message: format!("Invalid index key format: {}", index_key),
1146                })
1147            })
1148            .map(|s| s.to_string())
1149    }
1150}