Skip to main content

alien_commands/server/
mod.rs

//! Command Server Implementation
//!
//! The command server drives the command lifecycle and splits its state across two stores:
//! - CommandRegistry is the SOURCE OF TRUTH for all metadata (state, timestamps, etc.)
//! - KV stores ONLY operational data (params/response blobs, indices, leases)
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use alien_bindings::presigned::PresignedRequest;
11use alien_bindings::traits::{Kv, PutOptions, Storage};
12use alien_core::DeploymentModel;
13use alien_error::{AlienError, Context, ContextError, IntoAlienError};
14use chrono::{DateTime, Utc};
15use hex;
16use hmac::{Hmac, Mac};
17use serde::{Deserialize, Serialize};
18use sha2::Sha256;
19use tracing::{debug, info};
20use uuid::Uuid;
21
22use base64::{engine::general_purpose, Engine as _};
23use bytes::Bytes;
24use object_store::path::Path as StoragePath;
25
26use crate::error::{ErrorData, Result};
27use crate::types::*;
28use crate::INLINE_MAX_BYTES;
29
/// Largest serialized value (in bytes) that is written to KV directly.
///
/// Kept deliberately below the hard 24KB boundary of Azure Table Storage so that
/// JSON wrapping overhead still fits underneath it.
const KV_VALUE_THRESHOLD: usize = 20 * 1_000;

34pub mod axum_handlers;
35pub mod command_registry;
36pub mod dispatchers;
37pub mod storage;
38
39pub use axum_handlers::{
40    create_axum_router, CommandPayloadResponse, HasCommandServer, StorePayloadRequest,
41};
42pub use command_registry::{
43    CommandEnvelopeData, CommandMetadata, CommandRegistry, CommandStatus, InMemoryCommandRegistry,
44};
45pub use dispatchers::{CommandDispatcher, NullCommandDispatcher};
46
47// =============================================================================
48// KV Data Structures (Operational Data Only)
49// =============================================================================
50
51/// Params stored in KV (just the blob)
52#[derive(Debug, Clone, Serialize, Deserialize)]
53struct CommandParamsData {
54    pub params: BodySpec,
55}
56
57/// Response stored in KV (just the blob)
58#[derive(Debug, Clone, Serialize, Deserialize)]
59struct CommandResponseData {
60    pub response: CommandResponse,
61}
62
63/// Lease record with TTL
64#[derive(Debug, Clone, Serialize, Deserialize)]
65struct LeaseData {
66    pub lease_id: String,
67    pub acquired_at: DateTime<Utc>,
68    pub expires_at: DateTime<Utc>,
69    pub owner: String,
70}
71
72/// Deadline index data
73#[derive(Debug, Clone, Serialize, Deserialize)]
74struct DeadlineIndexData {
75    pub command_id: String,
76    pub deadline: DateTime<Utc>,
77}
78
79// =============================================================================
80// Command Server
81// =============================================================================
82
83/// Core command server implementation.
84///
85/// Uses CommandRegistry as source of truth for metadata.
86/// Uses KV for operational data (params, responses, indices, leases).
87pub struct CommandServer {
88    kv: Arc<dyn Kv>,
89    storage: Arc<dyn Storage>,
90    command_dispatcher: Arc<dyn CommandDispatcher>,
91    command_registry: Arc<dyn CommandRegistry>,
92    inline_max_bytes: usize,
93    base_url: String,
94    response_signing_key: Vec<u8>,
95}
96
97impl CommandServer {
98    /// Create a new command server instance
99    pub fn new(
100        kv: Arc<dyn Kv>,
101        storage: Arc<dyn Storage>,
102        command_dispatcher: Arc<dyn CommandDispatcher>,
103        command_registry: Arc<dyn CommandRegistry>,
104        base_url: String,
105        response_signing_key: Vec<u8>,
106    ) -> Self {
107        Self {
108            kv,
109            storage,
110            command_dispatcher,
111            command_registry,
112            inline_max_bytes: INLINE_MAX_BYTES,
113            base_url,
114            response_signing_key,
115        }
116    }
117
118    /// Create a new command server with custom inline size limit
119    pub fn with_inline_limit(
120        kv: Arc<dyn Kv>,
121        storage: Arc<dyn Storage>,
122        command_dispatcher: Arc<dyn CommandDispatcher>,
123        command_registry: Arc<dyn CommandRegistry>,
124        base_url: String,
125        inline_max_bytes: usize,
126        response_signing_key: Vec<u8>,
127    ) -> Self {
128        Self {
129            kv,
130            storage,
131            command_dispatcher,
132            command_registry,
133            inline_max_bytes,
134            base_url,
135            response_signing_key,
136        }
137    }
138
139    /// Maximum allowed response token lifetime (2 hours).
140    /// Tokens are issued with 1-hour expiry; this cap provides headroom for clock skew
141    /// while rejecting tokens with absurdly far-future expiration.
142    const MAX_RESPONSE_TOKEN_LIFETIME_SECS: i64 = 7200;
143
144    /// Sign a response URL for a specific command.
145    ///
146    /// Returns `(hmac_hex, expires_epoch)`. The HMAC is computed over
147    /// `"arc.v1:{command_id}:{expires}"` using the server's signing key.
148    fn sign_response_url(&self, command_id: &str) -> (String, i64) {
149        let expires = Utc::now().timestamp() + 3600; // 1 hour
150        let message = format!("commands.v1:{}:{}", command_id, expires);
151
152        type HmacSha256 = Hmac<Sha256>;
153        let mut mac =
154            HmacSha256::new_from_slice(&self.response_signing_key).expect("HMAC accepts any key");
155        mac.update(message.as_bytes());
156        let result = mac.finalize();
157        let token = hex::encode(result.into_bytes());
158
159        (token, expires)
160    }
161
162    /// Verify a response token for a specific command.
163    ///
164    /// Performs HMAC verification first (constant-time), then checks expiration,
165    /// to avoid leaking timing information about token validity windows.
166    pub fn verify_response_token(&self, command_id: &str, token: &str, expires: i64) -> bool {
167        // Validate token format: SHA-256 HMAC = 32 bytes = 64 hex chars.
168        if token.len() != 64 {
169            return false;
170        }
171
172        let message = format!("commands.v1:{}:{}", command_id, expires);
173
174        type HmacSha256 = Hmac<Sha256>;
175        let mut mac =
176            HmacSha256::new_from_slice(&self.response_signing_key).expect("HMAC accepts any key");
177        mac.update(message.as_bytes());
178
179        // Decode hex and verify HMAC (constant-time comparison).
180        let Ok(token_bytes) = hex::decode(token) else {
181            return false;
182        };
183        let hmac_valid = mac.verify_slice(&token_bytes).is_ok();
184
185        // Check expiration and max lifetime AFTER HMAC to avoid timing leaks.
186        let now = Utc::now().timestamp();
187        let not_expired = now <= expires;
188        let within_max_lifetime = expires <= now + Self::MAX_RESPONSE_TOKEN_LIFETIME_SECS;
189
190        hmac_valid && not_expired && within_max_lifetime
191    }
192
193    // =========================================================================
194    // Public API Methods
195    // =========================================================================
196
197    /// Create a new command.
198    ///
199    /// Flow:
200    /// 1. Validate request
201    /// 2. Registry creates command metadata (source of truth)
202    /// 3. KV stores params blob
203    /// 4. KV creates pending index (for Pull) or dispatch (for Push)
204    pub async fn create_command(
205        &self,
206        request: CreateCommandRequest,
207    ) -> Result<CreateCommandResponse> {
208        // Validate the request
209        self.validate_create_command(&request).await?;
210
211        // Check idempotency if key provided
212        if let Some(ref idem_key) = request.idempotency_key {
213            if let Some(existing_id) = self.check_idempotency(idem_key).await? {
214                // Return existing command status
215                let status = self
216                    .command_registry
217                    .get_command_status(&existing_id)
218                    .await?;
219                if let Some(s) = status {
220                    return Ok(CreateCommandResponse {
221                        command_id: existing_id,
222                        state: s.state,
223                        storage_upload: None,
224                        inline_allowed_up_to: self.inline_max_bytes as u64,
225                        next: "poll".to_string(),
226                    });
227                }
228            }
229        }
230
231        // Determine initial state and request size
232        let (initial_state, request_size_bytes) = match &request.params {
233            BodySpec::Inline { inline_base64 } => {
234                let size = inline_base64.len() as u64;
235                (CommandState::Pending, Some(size))
236            }
237            BodySpec::Storage { size, .. } => {
238                if size.unwrap_or(0) > self.inline_max_bytes as u64 {
239                    (CommandState::PendingUpload, *size)
240                } else {
241                    (CommandState::Pending, *size)
242                }
243            }
244        };
245
246        // 1. Registry creates command metadata (SOURCE OF TRUTH)
247        let metadata = self
248            .command_registry
249            .create_command(
250                &request.deployment_id,
251                &request.command,
252                initial_state,
253                request.deadline,
254                request_size_bytes,
255            )
256            .await?;
257
258        let command_id = metadata.command_id;
259        let deployment_model = metadata.deployment_model;
260
261        // 2. Store idempotency mapping in KV
262        if let Some(ref idem_key) = request.idempotency_key {
263            self.store_idempotency(idem_key, &command_id).await?;
264        }
265
266        // 3. Store params in KV
267        self.store_params(&command_id, &request.params).await?;
268
269        // 4. Generate storage upload URL if needed
270        let storage_upload = if initial_state == CommandState::PendingUpload {
271            Some(self.generate_params_upload(&command_id).await?)
272        } else {
273            None
274        };
275
276        // 5. Handle dispatch based on state and deployment model
277        let (final_state, next_action) = if initial_state == CommandState::Pending {
278            match deployment_model {
279                DeploymentModel::Push => {
280                    // Push model: dispatch immediately
281                    self.dispatch_command_push(&command_id, &request.deployment_id)
282                        .await?;
283                    (CommandState::Dispatched, "poll")
284                }
285                DeploymentModel::Pull => {
286                    // Pull model: create pending index, deployment will poll
287                    self.create_pending_index(&request.deployment_id, &command_id)
288                        .await?;
289                    debug!(
290                        "Command {} ready for pull (deployment will poll)",
291                        command_id
292                    );
293                    (CommandState::Pending, "poll")
294                }
295            }
296        } else {
297            // PendingUpload - need upload first
298            (initial_state, "upload")
299        };
300
301        // 6. Create deadline index if deadline provided
302        if let Some(deadline) = request.deadline {
303            self.create_deadline_index(&command_id, deadline).await?;
304        }
305
306        Ok(CreateCommandResponse {
307            command_id,
308            state: final_state,
309            storage_upload,
310            inline_allowed_up_to: self.inline_max_bytes as u64,
311            next: next_action.to_string(),
312        })
313    }
314
315    /// Mark upload as complete and dispatch command.
316    pub async fn upload_complete(
317        &self,
318        command_id: &str,
319        upload_request: UploadCompleteRequest,
320    ) -> Result<UploadCompleteResponse> {
321        // 1. Get current status from registry (source of truth)
322        let status = self
323            .command_registry
324            .get_command_status(command_id)
325            .await?
326            .ok_or_else(|| {
327                AlienError::new(ErrorData::CommandNotFound {
328                    command_id: command_id.to_string(),
329                })
330            })?;
331
332        // 2. Validate current state
333        if status.state != CommandState::PendingUpload {
334            return Err(AlienError::new(ErrorData::InvalidStateTransition {
335                from: status.state.as_ref().to_string(),
336                to: CommandState::Pending.as_ref().to_string(),
337            }));
338        }
339
340        // 3. Update params in KV with storage reference
341        let storage_get_request = self.generate_storage_get_request(command_id).await?;
342        let params = BodySpec::Storage {
343            size: Some(upload_request.size),
344            storage_get_request: Some(storage_get_request),
345            storage_put_used: None,
346        };
347        self.store_params(command_id, &params).await?;
348
349        // 4. Update registry to Pending state
350        self.command_registry
351            .update_command_state(command_id, CommandState::Pending, None, None, None, None)
352            .await?;
353
354        // 5. Get deployment model from registry and handle dispatch
355        let metadata = self
356            .command_registry
357            .get_command_metadata(command_id)
358            .await?
359            .ok_or_else(|| {
360                AlienError::new(ErrorData::CommandNotFound {
361                    command_id: command_id.to_string(),
362                })
363            })?;
364
365        let final_state = match metadata.deployment_model {
366            DeploymentModel::Push => {
367                self.dispatch_command_push(command_id, &status.deployment_id)
368                    .await?;
369                CommandState::Dispatched
370            }
371            DeploymentModel::Pull => {
372                self.create_pending_index(&status.deployment_id, command_id)
373                    .await?;
374                debug!(
375                    "Command {} ready for pull after upload (deployment will poll)",
376                    command_id
377                );
378                CommandState::Pending
379            }
380        };
381
382        Ok(UploadCompleteResponse {
383            command_id: command_id.to_string(),
384            state: final_state,
385        })
386    }
387
388    /// Get command status.
389    ///
390    /// Queries registry for metadata (source of truth), KV for response blob.
391    pub async fn get_command_status(&self, command_id: &str) -> Result<CommandStatusResponse> {
392        // 1. Get status from registry (SOURCE OF TRUTH)
393        let status = self
394            .command_registry
395            .get_command_status(command_id)
396            .await?
397            .ok_or_else(|| {
398                AlienError::new(ErrorData::CommandNotFound {
399                    command_id: command_id.to_string(),
400                })
401            })?;
402
403        // 2. Check deadline expiry inline
404        if let Some(deadline) = status.deadline {
405            if Utc::now() > deadline && !status.state.is_terminal() {
406                // Expire the command
407                self.command_registry
408                    .update_command_state(
409                        command_id,
410                        CommandState::Expired,
411                        None,
412                        Some(Utc::now()),
413                        None,
414                        None,
415                    )
416                    .await?;
417
418                // Clean up pending index
419                self.delete_pending_index(&status.deployment_id, command_id)
420                    .await?;
421
422                // Return expired status directly (avoid recursion)
423                return Ok(CommandStatusResponse {
424                    command_id: command_id.to_string(),
425                    state: CommandState::Expired,
426                    attempt: status.attempt,
427                    response: None,
428                });
429            }
430        }
431
432        // 3. Get response blob from KV if terminal state
433        let response = if status.state.is_terminal() {
434            self.get_response(command_id).await?
435        } else {
436            None
437        };
438
439        Ok(CommandStatusResponse {
440            command_id: command_id.to_string(),
441            state: status.state,
442            attempt: status.attempt,
443            response,
444        })
445    }
446
447    /// Submit response from deployment.
448    ///
449    /// Stores response blob in KV, updates state in registry.
450    pub async fn submit_command_response(
451        &self,
452        command_id: &str,
453        mut response: CommandResponse,
454    ) -> Result<()> {
455        // 1. Get current status from registry
456        let status = self
457            .command_registry
458            .get_command_status(command_id)
459            .await?
460            .ok_or_else(|| {
461                AlienError::new(ErrorData::CommandNotFound {
462                    command_id: command_id.to_string(),
463                })
464            })?;
465
466        // 2. Handle duplicate responses gracefully
467        if status.state.is_terminal() {
468            debug!(
469                "Ignoring duplicate response for terminal command {}",
470                command_id
471            );
472            return Ok(());
473        }
474
475        // 3. Validate state transition
476        if status.state != CommandState::Dispatched {
477            return Err(AlienError::new(ErrorData::InvalidStateTransition {
478                from: status.state.as_ref().to_string(),
479                to: CommandState::Succeeded.as_ref().to_string(),
480            }));
481        }
482
483        // 4. If response was uploaded to storage, generate download URL
484        if let CommandResponse::Success {
485            response: ref mut body,
486        } = response
487        {
488            if let BodySpec::Storage {
489                size,
490                storage_get_request,
491                storage_put_used,
492            } = body
493            {
494                if storage_get_request.is_none() && storage_put_used.unwrap_or(false) {
495                    let get_request = self
496                        .generate_response_storage_get_request(command_id)
497                        .await?;
498                    *body = BodySpec::Storage {
499                        size: *size,
500                        storage_get_request: Some(get_request),
501                        storage_put_used: *storage_put_used,
502                    };
503                }
504            }
505        }
506
507        // 5. Store response blob in KV
508        self.store_response(command_id, &response).await?;
509
510        // 6. Clean up lease from KV
511        self.delete_lease(command_id).await?;
512
513        // 7. Clean up pending index from KV (terminal state)
514        self.delete_pending_index(&status.deployment_id, command_id)
515            .await?;
516
517        // 8. Update registry state (SOURCE OF TRUTH)
518        let (new_state, error) = if response.is_success() {
519            (CommandState::Succeeded, None)
520        } else if let CommandResponse::Error { code, message, .. } = &response {
521            (
522                CommandState::Failed,
523                Some(serde_json::json!({ "code": code, "message": message })),
524            )
525        } else {
526            (CommandState::Failed, None)
527        };
528
529        let response_size = match &response {
530            CommandResponse::Success {
531                response: BodySpec::Inline { inline_base64 },
532            } => Some(inline_base64.len() as u64),
533            CommandResponse::Success {
534                response: BodySpec::Storage { size, .. },
535            } => *size,
536            _ => None,
537        };
538
539        self.command_registry
540            .update_command_state(
541                command_id,
542                new_state,
543                None, // dispatched_at already set
544                Some(Utc::now()),
545                response_size,
546                error,
547            )
548            .await?;
549
550        info!(
551            "Command {} completed with state {:?}",
552            command_id, new_state
553        );
554        Ok(())
555    }
556
557    /// Acquire leases for polling deployments.
558    ///
559    /// Scans KV pending index, queries registry for metadata, creates leases in KV.
560    pub async fn acquire_lease(
561        &self,
562        deployment_id: &str,
563        lease_request: &LeaseRequest,
564    ) -> Result<LeaseResponse> {
565        let mut leases = Vec::new();
566
567        // 1. Scan KV pending index
568        let target_prefix = format!("target:{}:pending:", deployment_id);
569        let scan_result = self
570            .kv
571            .scan_prefix(&target_prefix, Some(lease_request.max_leases * 2), None)
572            .await
573            .into_alien_error()
574            .context(ErrorData::KvOperationFailed {
575                operation: "scan_prefix".to_string(),
576                key: target_prefix.clone(),
577                message: "Failed to scan for pending commands".to_string(),
578            })?;
579
580        for (index_key, _) in scan_result.items {
581            if leases.len() >= lease_request.max_leases {
582                break;
583            }
584
585            let command_id = self.extract_command_id_from_index_key(&index_key)?;
586
587            // 2. Try to acquire lease atomically in KV
588            let lease_id = format!("lease_{}", Uuid::new_v4());
589            let lease_duration = Duration::from_secs(lease_request.lease_seconds);
590            let expires_at =
591                Utc::now() + chrono::Duration::seconds(lease_request.lease_seconds as i64);
592
593            let lease_data = LeaseData {
594                lease_id: lease_id.clone(),
595                acquired_at: Utc::now(),
596                expires_at,
597                owner: deployment_id.to_string(),
598            };
599
600            let lease_key = format!("cmd:{}:lease", command_id);
601            let lease_value = serde_json::to_vec(&lease_data).into_alien_error().context(
602                ErrorData::SerializationFailed {
603                    message: "Failed to serialize lease data".to_string(),
604                    data_type: Some("LeaseData".to_string()),
605                },
606            )?;
607
608            let options = Some(PutOptions {
609                ttl: Some(lease_duration),
610                if_not_exists: true,
611            });
612
613            let success = self
614                .kv
615                .put(&lease_key, lease_value, options)
616                .await
617                .context(ErrorData::KvOperationFailed {
618                    operation: "put".to_string(),
619                    key: lease_key.clone(),
620                    message: "Failed to create lease".to_string(),
621                })?;
622
623            if !success {
624                // Lease already exists, skip
625                continue;
626            }
627
628            // 3. Get metadata from registry
629            let metadata = match self
630                .command_registry
631                .get_command_metadata(&command_id)
632                .await?
633            {
634                Some(m) => m,
635                None => {
636                    // Command doesn't exist in registry, clean up
637                    self.delete_lease(&command_id).await?;
638                    let _ = self.kv.delete(&index_key).await;
639                    continue;
640                }
641            };
642
643            // 4. Check if command is in terminal state (stale index)
644            if metadata.state.is_terminal() {
645                // Clean up stale data
646                self.delete_lease(&command_id).await?;
647                let _ = self.kv.delete(&index_key).await;
648                continue;
649            }
650
651            // 5. Check deadline expiry
652            if let Some(deadline) = metadata.deadline {
653                if Utc::now() > deadline {
654                    // Expire the command
655                    self.command_registry
656                        .update_command_state(
657                            &command_id,
658                            CommandState::Expired,
659                            None,
660                            Some(Utc::now()),
661                            None,
662                            None,
663                        )
664                        .await?;
665                    self.delete_lease(&command_id).await?;
666                    let _ = self.kv.delete(&index_key).await;
667                    continue;
668                }
669            }
670
671            // 6. Get params from KV
672            let params = match self.get_params(&command_id).await? {
673                Some(p) => p,
674                None => {
675                    // No params, something went wrong
676                    self.delete_lease(&command_id).await?;
677                    continue;
678                }
679            };
680
681            // 7. Update registry state to Dispatched
682            self.command_registry
683                .update_command_state(
684                    &command_id,
685                    CommandState::Dispatched,
686                    Some(Utc::now()),
687                    None,
688                    None,
689                    None,
690                )
691                .await?;
692
693            // 8. Build envelope
694            let envelope = self.build_envelope(&command_id, &metadata, params).await?;
695
696            leases.push(LeaseInfo {
697                lease_id,
698                lease_expires_at: expires_at,
699                command_id: command_id.clone(),
700                attempt: metadata.attempt,
701                envelope,
702            });
703        }
704
705        Ok(LeaseResponse { leases })
706    }
707
708    /// Release a lease manually.
709    ///
710    /// Increments attempt count in registry, returns command to Pending state.
711    pub async fn release_lease(&self, command_id: &str, lease_id: &str) -> Result<()> {
712        let lease_key = format!("cmd:{}:lease", command_id);
713
714        // 1. Verify lease ownership
715        if let Ok(Some(lease_data)) = self.kv.get(&lease_key).await {
716            let lease: LeaseData = serde_json::from_slice(&lease_data)
717                .into_alien_error()
718                .context(ErrorData::SerializationFailed {
719                    message: "Failed to deserialize lease data".to_string(),
720                    data_type: Some("LeaseData".to_string()),
721                })?;
722
723            if lease.lease_id != lease_id {
724                return Err(AlienError::new(ErrorData::LeaseNotFound {
725                    lease_id: lease_id.to_string(),
726                }));
727            }
728
729            // 2. Delete lease from KV
730            self.delete_lease(command_id).await?;
731
732            // 3. Increment attempt in registry
733            self.command_registry.increment_attempt(command_id).await?;
734
735            // 4. Update registry state back to Pending
736            self.command_registry
737                .update_command_state(command_id, CommandState::Pending, None, None, None, None)
738                .await?;
739
740            // Note: Pending index is NOT removed on lease, so command is still there
741            debug!("Lease {} released for command {}", lease_id, command_id);
742        }
743
744        Ok(())
745    }
746
747    /// Get the deployment_id that owns a command.
748    ///
749    /// Used by the manager's auth layer to check whether the caller has access
750    /// to a specific command without fetching the full status.
751    pub async fn get_command_deployment_id(&self, command_id: &str) -> Result<Option<String>> {
752        let status = self.command_registry.get_command_status(command_id).await?;
753        Ok(status.map(|s| s.deployment_id))
754    }
755
756    /// Release a lease by lease_id only (for the API).
757    pub async fn release_lease_by_id(&self, lease_id: &str) -> Result<()> {
758        // Scan for leases to find the one with this lease_id
759        let lease_prefix = "cmd:";
760        let scan_result = self
761            .kv
762            .scan_prefix(lease_prefix, None, None)
763            .await
764            .into_alien_error()
765            .context(ErrorData::KvOperationFailed {
766                operation: "scan_prefix".to_string(),
767                key: lease_prefix.to_string(),
768                message: "Failed to scan for lease keys".to_string(),
769            })?;
770
771        for (key, value) in scan_result.items {
772            if key.ends_with(":lease") {
773                if let Ok(lease) = serde_json::from_slice::<LeaseData>(&value) {
774                    if lease.lease_id == lease_id {
775                        let command_id = key
776                            .strip_prefix("cmd:")
777                            .and_then(|s| s.strip_suffix(":lease"))
778                            .ok_or_else(|| {
779                                AlienError::new(ErrorData::Other {
780                                    message: format!("Invalid lease key format: {}", key),
781                                })
782                            })?;
783
784                        return self.release_lease(command_id, lease_id).await;
785                    }
786                }
787            }
788        }
789
790        Err(AlienError::new(ErrorData::LeaseNotFound {
791            lease_id: lease_id.to_string(),
792        }))
793    }
794
795    // =========================================================================
796    // Internal Helper Methods
797    // =========================================================================
798
799    async fn validate_create_command(&self, request: &CreateCommandRequest) -> Result<()> {
800        if request.command.is_empty() {
801            return Err(AlienError::new(ErrorData::InvalidCommand {
802                message: "Command name cannot be empty".to_string(),
803            }));
804        }
805
806        if request.deployment_id.is_empty() {
807            return Err(AlienError::new(ErrorData::InvalidCommand {
808                message: "Deployment ID cannot be empty".to_string(),
809            }));
810        }
811
812        if let Some(deadline) = request.deadline {
813            if deadline <= Utc::now() {
814                return Err(AlienError::new(ErrorData::InvalidCommand {
815                    message: "Deadline must be in the future".to_string(),
816                }));
817            }
818        }
819
820        Ok(())
821    }
822
823    // --- Idempotency ---
824
825    async fn check_idempotency(&self, idem_key: &str) -> Result<Option<String>> {
826        let key = format!("idem:{}", idem_key);
827        if let Some(data) = self
828            .kv
829            .get(&key)
830            .await
831            .context(ErrorData::KvOperationFailed {
832                operation: "get".to_string(),
833                key: key.clone(),
834                message: "Failed to check idempotency".to_string(),
835            })?
836        {
837            let command_id = String::from_utf8(data).into_alien_error().context(
838                ErrorData::SerializationFailed {
839                    message: "Invalid idempotency data".to_string(),
840                    data_type: Some("String".to_string()),
841                },
842            )?;
843            return Ok(Some(command_id));
844        }
845        Ok(None)
846    }
847
848    async fn store_idempotency(&self, idem_key: &str, command_id: &str) -> Result<()> {
849        let key = format!("idem:{}", idem_key);
850        let ttl = Duration::from_secs(24 * 60 * 60); // 24 hours
851        self.kv
852            .put(
853                &key,
854                command_id.as_bytes().to_vec(),
855                Some(PutOptions {
856                    ttl: Some(ttl),
857                    if_not_exists: true,
858                }),
859            )
860            .await
861            .context(ErrorData::KvOperationFailed {
862                operation: "put".to_string(),
863                key: key.clone(),
864                message: "Failed to store idempotency".to_string(),
865            })?;
866        Ok(())
867    }
868
869    // --- Params ---
870
871    pub async fn store_params(&self, command_id: &str, params: &BodySpec) -> Result<()> {
872        let key = format!("cmd:{}:params", command_id);
873
874        // Try serializing as-is first
875        let data = CommandParamsData {
876            params: params.clone(),
877        };
878        let value = serde_json::to_vec(&data).into_alien_error().context(
879            ErrorData::SerializationFailed {
880                message: "Failed to serialize params".to_string(),
881                data_type: Some("CommandParamsData".to_string()),
882            },
883        )?;
884
885        // If it fits in KV, store directly (fast path)
886        if value.len() <= KV_VALUE_THRESHOLD {
887            self.kv
888                .put(&key, value, None)
889                .await
890                .context(ErrorData::KvOperationFailed {
891                    operation: "put".to_string(),
892                    key: key.clone(),
893                    message: "Failed to store params".to_string(),
894                })?;
895            return Ok(());
896        }
897
898        // Auto-promote: inline data exceeds KV limit, store raw bytes in blob
899        if let BodySpec::Inline { inline_base64 } = params {
900            let raw_bytes = general_purpose::STANDARD
901                .decode(inline_base64)
902                .into_alien_error()
903                .context(ErrorData::SerializationFailed {
904                    message: "Failed to decode inline base64 params for auto-promotion".to_string(),
905                    data_type: Some("base64".to_string()),
906                })?;
907
908            let raw_len = raw_bytes.len() as u64;
909            let blob_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
910
911            self.storage
912                .put(&blob_path, Bytes::from(raw_bytes).into())
913                .await
914                .into_alien_error()
915                .context(ErrorData::StorageOperationFailed {
916                    message: "Failed to auto-promote params to blob storage".to_string(),
917                    operation: Some("put".to_string()),
918                    path: Some(blob_path.to_string()),
919                })?;
920
921            debug!(
922                "Auto-promoted params for command {} to blob ({} bytes raw)",
923                command_id, raw_len
924            );
925
926            // Store tiny reference in KV instead
927            let promoted = CommandParamsData {
928                params: BodySpec::Storage {
929                    size: Some(raw_len),
930                    storage_get_request: None,
931                    storage_put_used: Some(true),
932                },
933            };
934            let promoted_value = serde_json::to_vec(&promoted).into_alien_error().context(
935                ErrorData::SerializationFailed {
936                    message: "Failed to serialize promoted params reference".to_string(),
937                    data_type: Some("CommandParamsData".to_string()),
938                },
939            )?;
940            self.kv.put(&key, promoted_value, None).await.context(
941                ErrorData::KvOperationFailed {
942                    operation: "put".to_string(),
943                    key: key.clone(),
944                    message: "Failed to store promoted params reference".to_string(),
945                },
946            )?;
947            return Ok(());
948        }
949
950        // Storage references are always tiny, store as-is
951        self.kv
952            .put(&key, value, None)
953            .await
954            .context(ErrorData::KvOperationFailed {
955                operation: "put".to_string(),
956                key: key.clone(),
957                message: "Failed to store params".to_string(),
958            })?;
959        Ok(())
960    }
961
962    pub async fn get_params(&self, command_id: &str) -> Result<Option<BodySpec>> {
963        let key = format!("cmd:{}:params", command_id);
964        if let Some(value) = self
965            .kv
966            .get(&key)
967            .await
968            .context(ErrorData::KvOperationFailed {
969                operation: "get".to_string(),
970                key: key.clone(),
971                message: "Failed to get params".to_string(),
972            })?
973        {
974            let data: CommandParamsData = serde_json::from_slice(&value)
975                .into_alien_error()
976                .context(ErrorData::SerializationFailed {
977                    message: "Failed to deserialize params".to_string(),
978                    data_type: Some("CommandParamsData".to_string()),
979                })?;
980            return Ok(Some(data.params));
981        }
982        Ok(None)
983    }
984
985    // --- Response ---
986
987    pub async fn store_response(&self, command_id: &str, response: &CommandResponse) -> Result<()> {
988        let key = format!("cmd:{}:response", command_id);
989        let data = CommandResponseData {
990            response: response.clone(),
991        };
992        let value = serde_json::to_vec(&data).into_alien_error().context(
993            ErrorData::SerializationFailed {
994                message: "Failed to serialize response".to_string(),
995                data_type: Some("CommandResponseData".to_string()),
996            },
997        )?;
998
999        // If it fits in KV, store directly (fast path)
1000        if value.len() <= KV_VALUE_THRESHOLD {
1001            self.kv
1002                .put(&key, value, None)
1003                .await
1004                .context(ErrorData::KvOperationFailed {
1005                    operation: "put".to_string(),
1006                    key: key.clone(),
1007                    message: "Failed to store response".to_string(),
1008                })?;
1009            return Ok(());
1010        }
1011
1012        // Auto-promote: inline response exceeds KV limit
1013        if let CommandResponse::Success {
1014            response: BodySpec::Inline { inline_base64 },
1015        } = response
1016        {
1017            let raw_bytes = general_purpose::STANDARD
1018                .decode(inline_base64)
1019                .into_alien_error()
1020                .context(ErrorData::SerializationFailed {
1021                    message: "Failed to decode inline base64 response for auto-promotion"
1022                        .to_string(),
1023                    data_type: Some("base64".to_string()),
1024                })?;
1025
1026            let raw_len = raw_bytes.len() as u64;
1027            let blob_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1028
1029            self.storage
1030                .put(&blob_path, Bytes::from(raw_bytes).into())
1031                .await
1032                .into_alien_error()
1033                .context(ErrorData::StorageOperationFailed {
1034                    message: "Failed to auto-promote response to blob storage".to_string(),
1035                    operation: Some("put".to_string()),
1036                    path: Some(blob_path.to_string()),
1037                })?;
1038
1039            // Generate presigned GET URL for the caller
1040            let get_request = self
1041                .generate_response_storage_get_request(command_id)
1042                .await?;
1043
1044            debug!(
1045                "Auto-promoted response for command {} to blob ({} bytes raw)",
1046                command_id, raw_len
1047            );
1048
1049            // Store tiny reference in KV
1050            let promoted = CommandResponseData {
1051                response: CommandResponse::Success {
1052                    response: BodySpec::Storage {
1053                        size: Some(raw_len),
1054                        storage_get_request: Some(get_request),
1055                        storage_put_used: Some(true),
1056                    },
1057                },
1058            };
1059            let promoted_value = serde_json::to_vec(&promoted).into_alien_error().context(
1060                ErrorData::SerializationFailed {
1061                    message: "Failed to serialize promoted response reference".to_string(),
1062                    data_type: Some("CommandResponseData".to_string()),
1063                },
1064            )?;
1065            self.kv.put(&key, promoted_value, None).await.context(
1066                ErrorData::KvOperationFailed {
1067                    operation: "put".to_string(),
1068                    key: key.clone(),
1069                    message: "Failed to store promoted response reference".to_string(),
1070                },
1071            )?;
1072            return Ok(());
1073        }
1074
1075        // Error responses or storage references are always small, store as-is
1076        self.kv
1077            .put(&key, value, None)
1078            .await
1079            .context(ErrorData::KvOperationFailed {
1080                operation: "put".to_string(),
1081                key: key.clone(),
1082                message: "Failed to store response".to_string(),
1083            })?;
1084        Ok(())
1085    }
1086
1087    pub async fn get_response(&self, command_id: &str) -> Result<Option<CommandResponse>> {
1088        let key = format!("cmd:{}:response", command_id);
1089        if let Some(value) = self
1090            .kv
1091            .get(&key)
1092            .await
1093            .context(ErrorData::KvOperationFailed {
1094                operation: "get".to_string(),
1095                key: key.clone(),
1096                message: "Failed to get response".to_string(),
1097            })?
1098        {
1099            let data: CommandResponseData = serde_json::from_slice(&value)
1100                .into_alien_error()
1101                .context(ErrorData::SerializationFailed {
1102                    message: "Failed to deserialize response".to_string(),
1103                    data_type: Some("CommandResponseData".to_string()),
1104                })?;
1105            return Ok(Some(data.response));
1106        }
1107        Ok(None)
1108    }
1109
1110    // --- Pending Index ---
1111
1112    async fn create_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
1113        let timestamp = Utc::now().timestamp_nanos_opt().unwrap_or(0);
1114        let key = format!(
1115            "target:{}:pending:{}:{}",
1116            deployment_id, timestamp, command_id
1117        );
1118
1119        // Store empty value - just for ordering
1120        self.kv
1121            .put(&key, vec![], None)
1122            .await
1123            .context(ErrorData::KvOperationFailed {
1124                operation: "put".to_string(),
1125                key: key.clone(),
1126                message: "Failed to create pending index".to_string(),
1127            })?;
1128        Ok(())
1129    }
1130
1131    async fn delete_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
1132        // We need to scan to find the exact key since we don't know the timestamp
1133        let prefix = format!("target:{}:pending:", deployment_id);
1134        let scan_result = self
1135            .kv
1136            .scan_prefix(&prefix, Some(100), None)
1137            .await
1138            .into_alien_error()
1139            .context(ErrorData::KvOperationFailed {
1140                operation: "scan_prefix".to_string(),
1141                key: prefix.clone(),
1142                message: "Failed to scan pending index".to_string(),
1143            })?;
1144
1145        for (key, _) in scan_result.items {
1146            if key.ends_with(&format!(":{}", command_id)) {
1147                let _ = self.kv.delete(&key).await;
1148                break;
1149            }
1150        }
1151        Ok(())
1152    }
1153
1154    // --- Lease ---
1155
1156    async fn delete_lease(&self, command_id: &str) -> Result<()> {
1157        let key = format!("cmd:{}:lease", command_id);
1158        let _ = self.kv.delete(&key).await;
1159        Ok(())
1160    }
1161
1162    // --- Deadline Index ---
1163
1164    async fn create_deadline_index(&self, command_id: &str, deadline: DateTime<Utc>) -> Result<()> {
1165        let key = format!(
1166            "deadline:{}:{}",
1167            deadline.timestamp_nanos_opt().unwrap_or(0),
1168            command_id
1169        );
1170
1171        let data = DeadlineIndexData {
1172            command_id: command_id.to_string(),
1173            deadline,
1174        };
1175        let value = serde_json::to_vec(&data).into_alien_error().context(
1176            ErrorData::SerializationFailed {
1177                message: "Failed to serialize deadline index".to_string(),
1178                data_type: Some("DeadlineIndexData".to_string()),
1179            },
1180        )?;
1181
1182        let ttl = deadline.signed_duration_since(Utc::now());
1183        let ttl_duration = if ttl.num_seconds() > 0 {
1184            Some(Duration::from_secs(ttl.num_seconds() as u64))
1185        } else {
1186            None
1187        };
1188
1189        let options = ttl_duration.map(|ttl| PutOptions {
1190            ttl: Some(ttl),
1191            if_not_exists: false,
1192        });
1193
1194        self.kv
1195            .put(&key, value, options)
1196            .await
1197            .context(ErrorData::KvOperationFailed {
1198                operation: "put".to_string(),
1199                key: key.clone(),
1200                message: "Failed to create deadline index".to_string(),
1201            })?;
1202        Ok(())
1203    }
1204
1205    // --- Dispatch ---
1206
1207    async fn dispatch_command_push(&self, command_id: &str, deployment_id: &str) -> Result<()> {
1208        // Get metadata from registry
1209        let metadata = self
1210            .command_registry
1211            .get_command_metadata(command_id)
1212            .await?
1213            .ok_or_else(|| {
1214                AlienError::new(ErrorData::CommandNotFound {
1215                    command_id: command_id.to_string(),
1216                })
1217            })?;
1218
1219        // Get params from KV
1220        let params = self.get_params(command_id).await?.ok_or_else(|| {
1221            AlienError::new(ErrorData::CommandNotFound {
1222                command_id: command_id.to_string(),
1223            })
1224        })?;
1225
1226        // Build envelope
1227        let envelope = self.build_envelope(command_id, &metadata, params).await?;
1228
1229        // Dispatch via transport
1230        self.command_dispatcher
1231            .dispatch(&envelope)
1232            .await
1233            .map_err(|e| {
1234                e.context(ErrorData::TransportDispatchFailed {
1235                    message: "Failed to dispatch command".to_string(),
1236                    transport_type: None,
1237                    target: Some(deployment_id.to_string()),
1238                })
1239            })?;
1240
1241        // Update registry state
1242        self.command_registry
1243            .update_command_state(
1244                command_id,
1245                CommandState::Dispatched,
1246                Some(Utc::now()),
1247                None,
1248                None,
1249                None,
1250            )
1251            .await?;
1252
1253        info!("Command {} dispatched via push", command_id);
1254        Ok(())
1255    }
1256
1257    async fn build_envelope(
1258        &self,
1259        command_id: &str,
1260        metadata: &CommandEnvelopeData,
1261        mut params: BodySpec,
1262    ) -> Result<Envelope> {
1263        let response_handling = self.create_response_handling(command_id).await?;
1264
1265        // Re-inline: if params are in blob but fit in transport limit, read and embed inline.
1266        // This avoids unnecessary storage downloads for medium-sized params (18KB–150KB).
1267        if let BodySpec::Storage { size, .. } = &params {
1268            let raw_size = size.unwrap_or(0) as usize;
1269            if raw_size > 0 && raw_size <= self.inline_max_bytes {
1270                let blob_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1271                match self.storage.get(&blob_path).await {
1272                    Ok(get_result) => match get_result.bytes().await {
1273                        Ok(raw_bytes) => {
1274                            params = BodySpec::inline(&raw_bytes);
1275                            debug!(
1276                                "Re-inlined params for command {} ({} bytes) into envelope",
1277                                command_id, raw_size
1278                            );
1279                        }
1280                        Err(e) => {
1281                            debug!(
1282                                    "Failed to read blob bytes for re-inline (command {}), falling back to presigned URL: {}",
1283                                    command_id, e
1284                                );
1285                        }
1286                    },
1287                    Err(e) => {
1288                        debug!(
1289                            "Failed to read blob for re-inline (command {}), falling back to presigned URL: {}",
1290                            command_id, e
1291                        );
1292                    }
1293                }
1294            }
1295        }
1296
1297        // If params are still Storage (either too large or re-inline failed),
1298        // ensure they have a presigned GET request
1299        if let BodySpec::Storage {
1300            size,
1301            storage_get_request,
1302            storage_put_used,
1303        } = &params
1304        {
1305            if storage_get_request.is_none() {
1306                let get_request = self.generate_storage_get_request(command_id).await?;
1307                params = BodySpec::Storage {
1308                    size: *size,
1309                    storage_get_request: Some(get_request),
1310                    storage_put_used: *storage_put_used,
1311                };
1312            }
1313        }
1314
1315        Ok(Envelope::new(
1316            metadata.deployment_id.clone(),
1317            command_id.to_string(),
1318            metadata.attempt,
1319            metadata.deadline,
1320            metadata.command.clone(),
1321            params,
1322            response_handling,
1323        ))
1324    }
1325
1326    async fn create_response_handling(&self, command_id: &str) -> Result<ResponseHandling> {
1327        let upload_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1328        let expires_in = Duration::from_secs(3600);
1329        let presigned = self
1330            .storage
1331            .presigned_put(&upload_path, expires_in)
1332            .await
1333            .context(ErrorData::StorageOperationFailed {
1334                message: "Failed to create response upload URL".to_string(),
1335                operation: Some("presigned_put".to_string()),
1336                path: Some(upload_path.to_string()),
1337            })?;
1338
1339        let (response_token, expires) = self.sign_response_url(command_id);
1340
1341        Ok(ResponseHandling {
1342            max_inline_bytes: self.inline_max_bytes as u64,
1343            submit_response_url: format!(
1344                "{}/commands/{}/response?response_token={}&expires={}",
1345                self.base_url.trim_end_matches('/'),
1346                command_id,
1347                response_token,
1348                expires,
1349            ),
1350            storage_upload_request: presigned,
1351        })
1352    }
1353
1354    async fn generate_params_upload(&self, command_id: &str) -> Result<StorageUpload> {
1355        let upload_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1356        let expires_in = Duration::from_secs(3600);
1357        let presigned = self
1358            .storage
1359            .presigned_put(&upload_path, expires_in)
1360            .await
1361            .into_alien_error()
1362            .context(ErrorData::StorageOperationFailed {
1363                message: "Failed to create presigned URL".to_string(),
1364                operation: Some("presigned_put".to_string()),
1365                path: Some(upload_path.to_string()),
1366            })?;
1367
1368        Ok(StorageUpload {
1369            put_request: presigned.clone(),
1370            expires_at: presigned.expiration,
1371        })
1372    }
1373
1374    async fn generate_storage_get_request(&self, command_id: &str) -> Result<PresignedRequest> {
1375        let path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1376        let expires_in = Duration::from_secs(3600);
1377        self.storage.presigned_get(&path, expires_in).await.context(
1378            ErrorData::StorageOperationFailed {
1379                message: "Failed to create storage get request".to_string(),
1380                operation: Some("presigned_get".to_string()),
1381                path: Some(path.to_string()),
1382            },
1383        )
1384    }
1385
1386    async fn generate_response_storage_get_request(
1387        &self,
1388        command_id: &str,
1389    ) -> Result<PresignedRequest> {
1390        let path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1391        let expires_in = Duration::from_secs(3600);
1392        self.storage.presigned_get(&path, expires_in).await.context(
1393            ErrorData::StorageOperationFailed {
1394                message: "Failed to create response storage get request".to_string(),
1395                operation: Some("presigned_get".to_string()),
1396                path: Some(path.to_string()),
1397            },
1398        )
1399    }
1400
1401    fn extract_command_id_from_index_key(&self, index_key: &str) -> Result<String> {
1402        index_key
1403            .split(':')
1404            .last()
1405            .ok_or_else(|| {
1406                AlienError::new(ErrorData::Other {
1407                    message: format!("Invalid index key format: {}", index_key),
1408                })
1409            })
1410            .map(|s| s.to_string())
1411    }
1412}