Skip to main content

alien_commands/server/
mod.rs

1//! Command Server Implementation
2//!
3//! The Command server implements the command lifecycle:
4//! - CommandRegistry is the SOURCE OF TRUTH for all metadata (state, timestamps, etc.)
//! - KV stores ONLY operational data (params/response blobs, idempotency keys, indices, leases)
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use alien_bindings::presigned::PresignedRequest;
11use alien_bindings::traits::{Kv, PutOptions, Storage};
12use alien_core::DeploymentModel;
13use alien_error::{AlienError, Context, ContextError, IntoAlienError};
14use chrono::{DateTime, Utc};
15use hex;
16use hmac::{Hmac, Mac};
17use serde::{Deserialize, Serialize};
18use sha2::Sha256;
19use tracing::{debug, info};
20use uuid::Uuid;
21
22use base64::{engine::general_purpose, Engine as _};
23use bytes::Bytes;
24use object_store::path::Path as StoragePath;
25
26use crate::error::{ErrorData, Result};
27use crate::types::*;
28use crate::INLINE_MAX_BYTES;
29
/// Largest serialized value, in bytes, that is written to KV directly.
///
/// Deliberately kept below the hard 24KB boundary (Azure Table Storage) so
/// that JSON wrapping overhead cannot push a stored value past that limit.
const KV_VALUE_THRESHOLD: usize = 20 * 1_000;
33
34pub mod axum_handlers;
35pub mod command_registry;
36pub mod dispatchers;
37pub mod storage;
38
39pub use axum_handlers::{
40    create_axum_router, CommandPayloadResponse, HasCommandServer, StorePayloadRequest,
41};
42pub use command_registry::{
43    CommandEnvelopeData, CommandMetadata, CommandRegistry, CommandStatus, InMemoryCommandRegistry,
44};
45pub use dispatchers::{CommandDispatcher, NullCommandDispatcher};
46
47// =============================================================================
48// KV Data Structures (Operational Data Only)
49// =============================================================================
50
/// Params stored in KV under `cmd:{command_id}:params` (just the blob).
///
/// All other command metadata lives in the `CommandRegistry`. The field name
/// is part of the persisted JSON layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandParamsData {
    /// Inline or storage-backed params body.
    pub params: BodySpec,
}
56
/// Response stored in KV (just the blob).
///
/// NOTE(review): presumably written by `store_response` and read back by
/// `get_response` (neither body is visible here). Confirm the key layout.
/// The field name is part of the persisted JSON layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CommandResponseData {
    /// The deployment's response as submitted.
    pub response: CommandResponse,
}
62
/// Lease record stored in KV under `cmd:{command_id}:lease`.
///
/// It is created with `if_not_exists` and a TTL equal to the lease length, so
/// only one lease per command can be created at a time. An abandoned lease
/// lapses with the TTL.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LeaseData {
    /// Random `lease_{uuid}` identifier handed to the deployment.
    pub lease_id: String,
    /// When the lease was taken.
    pub acquired_at: DateTime<Utc>,
    /// When the lease lapses (acquisition time plus the requested lease length).
    pub expires_at: DateTime<Utc>,
    /// Deployment id holding the lease.
    pub owner: String,
}
71
/// Deadline index data.
///
/// NOTE(review): presumably written by `create_deadline_index` (not visible
/// here). Confirm the key layout and which component reads it.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DeadlineIndexData {
    /// Command the deadline applies to.
    pub command_id: String,
    /// Point in time after which the command should be expired.
    pub deadline: DateTime<Utc>,
}
78
79// =============================================================================
80// Command Server
81// =============================================================================
82
/// Core command server implementation.
///
/// Uses CommandRegistry as source of truth for metadata.
/// Uses KV for operational data (params, responses, idempotency keys, indices, leases).
pub struct CommandServer {
    /// Operational store: params/response blobs, idempotency keys, indices, leases.
    kv: Arc<dyn Kv>,
    /// Object storage for large payloads. Presumably used by the upload and
    /// download request helpers; confirm.
    storage: Arc<dyn Storage>,
    /// Dispatcher for Push-model deployments (presumably via `dispatch_command_push`).
    command_dispatcher: Arc<dyn CommandDispatcher>,
    /// Source of truth for command metadata (state, deadline, attempts, deployment).
    command_registry: Arc<dyn CommandRegistry>,
    /// Storage-backed params with a declared size above this must be uploaded
    /// first. The value is also advertised to clients as `inline_allowed_up_to`.
    inline_max_bytes: usize,
    /// Public base URL of this server. Presumably used when building URLs
    /// handed to deployments; confirm.
    base_url: String,
    /// HMAC-SHA256 key used to sign and verify response-URL tokens.
    response_signing_key: Vec<u8>,
}
96
97impl CommandServer {
98    /// Create a new command server instance
99    pub fn new(
100        kv: Arc<dyn Kv>,
101        storage: Arc<dyn Storage>,
102        command_dispatcher: Arc<dyn CommandDispatcher>,
103        command_registry: Arc<dyn CommandRegistry>,
104        base_url: String,
105        response_signing_key: Vec<u8>,
106    ) -> Self {
107        Self {
108            kv,
109            storage,
110            command_dispatcher,
111            command_registry,
112            inline_max_bytes: INLINE_MAX_BYTES,
113            base_url,
114            response_signing_key,
115        }
116    }
117
118    /// Create a new command server with custom inline size limit
119    pub fn with_inline_limit(
120        kv: Arc<dyn Kv>,
121        storage: Arc<dyn Storage>,
122        command_dispatcher: Arc<dyn CommandDispatcher>,
123        command_registry: Arc<dyn CommandRegistry>,
124        base_url: String,
125        inline_max_bytes: usize,
126        response_signing_key: Vec<u8>,
127    ) -> Self {
128        Self {
129            kv,
130            storage,
131            command_dispatcher,
132            command_registry,
133            inline_max_bytes,
134            base_url,
135            response_signing_key,
136        }
137    }
138
139    /// Maximum allowed response token lifetime (2 hours).
140    /// Tokens are issued with 1-hour expiry; this cap provides headroom for clock skew
141    /// while rejecting tokens with absurdly far-future expiration.
142    const MAX_RESPONSE_TOKEN_LIFETIME_SECS: i64 = 7200;
143
144    /// Sign a response URL for a specific command.
145    ///
146    /// Returns `(hmac_hex, expires_epoch)`. The HMAC is computed over
147    /// `"arc.v1:{command_id}:{expires}"` using the server's signing key.
148    fn sign_response_url(&self, command_id: &str) -> (String, i64) {
149        let expires = Utc::now().timestamp() + 3600; // 1 hour
150        let message = format!("commands.v1:{}:{}", command_id, expires);
151
152        type HmacSha256 = Hmac<Sha256>;
153        let mut mac =
154            HmacSha256::new_from_slice(&self.response_signing_key).expect("HMAC accepts any key");
155        mac.update(message.as_bytes());
156        let result = mac.finalize();
157        let token = hex::encode(result.into_bytes());
158
159        (token, expires)
160    }
161
162    /// Verify a response token for a specific command.
163    ///
164    /// Performs HMAC verification first (constant-time), then checks expiration,
165    /// to avoid leaking timing information about token validity windows.
166    pub fn verify_response_token(
167        &self,
168        command_id: &str,
169        token: &str,
170        expires: i64,
171    ) -> bool {
172        // Validate token format: SHA-256 HMAC = 32 bytes = 64 hex chars.
173        if token.len() != 64 {
174            return false;
175        }
176
177        let message = format!("commands.v1:{}:{}", command_id, expires);
178
179        type HmacSha256 = Hmac<Sha256>;
180        let mut mac =
181            HmacSha256::new_from_slice(&self.response_signing_key).expect("HMAC accepts any key");
182        mac.update(message.as_bytes());
183
184        // Decode hex and verify HMAC (constant-time comparison).
185        let Ok(token_bytes) = hex::decode(token) else {
186            return false;
187        };
188        let hmac_valid = mac.verify_slice(&token_bytes).is_ok();
189
190        // Check expiration and max lifetime AFTER HMAC to avoid timing leaks.
191        let now = Utc::now().timestamp();
192        let not_expired = now <= expires;
193        let within_max_lifetime = expires <= now + Self::MAX_RESPONSE_TOKEN_LIFETIME_SECS;
194
195        hmac_valid && not_expired && within_max_lifetime
196    }
197
198    // =========================================================================
199    // Public API Methods
200    // =========================================================================
201
    /// Create a new command.
    ///
    /// Flow:
    /// 1. Validate request (see `validate_create_command`)
    /// 2. If an idempotency key maps to a command the registry still knows,
    ///    return that command's current state instead of creating a new one
    /// 3. Registry creates command metadata (source of truth)
    /// 4. KV stores the idempotency mapping (if any) and the params blob
    /// 5. For storage params whose declared size exceeds `inline_max_bytes`, the
    ///    command starts as `PendingUpload` and an upload target is returned
    /// 6. Otherwise Push deployments are dispatched immediately and Pull
    ///    deployments get a pending-index entry to poll
    /// 7. A deadline index entry is created when a deadline was given
    ///
    /// # Errors
    /// `InvalidCommand` for invalid requests. Registry, KV, storage and dispatch
    /// errors are propagated.
    pub async fn create_command(
        &self,
        request: CreateCommandRequest,
    ) -> Result<CreateCommandResponse> {
        // Validate the request
        self.validate_create_command(&request).await?;

        // Check idempotency if key provided
        // NOTE(review): check-then-store is not atomic, so two concurrent requests
        // with the same key can both get past this check and create two commands.
        if let Some(ref idem_key) = request.idempotency_key {
            if let Some(existing_id) = self.check_idempotency(idem_key).await? {
                // Return existing command status
                // NOTE(review): always answers "poll" with no upload target, even if
                // the existing command is still PendingUpload.
                let status = self
                    .command_registry
                    .get_command_status(&existing_id)
                    .await?;
                if let Some(s) = status {
                    return Ok(CreateCommandResponse {
                        command_id: existing_id,
                        state: s.state,
                        storage_upload: None,
                        inline_allowed_up_to: self.inline_max_bytes as u64,
                        next: "poll".to_string(),
                    });
                }
                // The mapped command is unknown to the registry: fall through and
                // create a fresh command.
            }
        }

        // Determine initial state and request size
        let (initial_state, request_size_bytes) = match &request.params {
            BodySpec::Inline { inline_base64 } => {
                // Size recorded is the base64-encoded length, not the decoded byte count.
                let size = inline_base64.len() as u64;
                (CommandState::Pending, Some(size))
            }
            BodySpec::Storage { size, .. } => {
                // A missing size is treated as 0, i.e. no upload step required.
                if size.unwrap_or(0) > self.inline_max_bytes as u64 {
                    (CommandState::PendingUpload, *size)
                } else {
                    (CommandState::Pending, *size)
                }
            }
        };

        // 1. Registry creates command metadata (SOURCE OF TRUTH)
        let metadata = self
            .command_registry
            .create_command(
                &request.deployment_id,
                &request.command,
                initial_state,
                request.deadline,
                request_size_bytes,
            )
            .await?;

        let command_id = metadata.command_id;
        let deployment_model = metadata.deployment_model;

        // 2. Store idempotency mapping in KV
        if let Some(ref idem_key) = request.idempotency_key {
            self.store_idempotency(idem_key, &command_id).await?;
        }

        // 3. Store params in KV
        self.store_params(&command_id, &request.params).await?;

        // 4. Generate storage upload URL if needed
        let storage_upload = if initial_state == CommandState::PendingUpload {
            Some(self.generate_params_upload(&command_id).await?)
        } else {
            None
        };

        // 5. Handle dispatch based on state and deployment model
        let (final_state, next_action) = if initial_state == CommandState::Pending {
            match deployment_model {
                DeploymentModel::Push => {
                    // Push model: dispatch immediately
                    self.dispatch_command_push(&command_id, &request.deployment_id)
                        .await?;
                    (CommandState::Dispatched, "poll")
                }
                DeploymentModel::Pull => {
                    // Pull model: create pending index, deployment will poll
                    self.create_pending_index(&request.deployment_id, &command_id)
                        .await?;
                    debug!(
                        "Command {} ready for pull (deployment will poll)",
                        command_id
                    );
                    (CommandState::Pending, "poll")
                }
            }
        } else {
            // PendingUpload - need upload first; dispatch happens in `upload_complete`
            (initial_state, "upload")
        };

        // 6. Create deadline index if deadline provided
        if let Some(deadline) = request.deadline {
            self.create_deadline_index(&command_id, deadline).await?;
        }

        Ok(CreateCommandResponse {
            command_id,
            state: final_state,
            storage_upload,
            inline_allowed_up_to: self.inline_max_bytes as u64,
            next: next_action.to_string(),
        })
    }
319
320    /// Mark upload as complete and dispatch command.
321    pub async fn upload_complete(
322        &self,
323        command_id: &str,
324        upload_request: UploadCompleteRequest,
325    ) -> Result<UploadCompleteResponse> {
326        // 1. Get current status from registry (source of truth)
327        let status = self
328            .command_registry
329            .get_command_status(command_id)
330            .await?
331            .ok_or_else(|| {
332                AlienError::new(ErrorData::CommandNotFound {
333                    command_id: command_id.to_string(),
334                })
335            })?;
336
337        // 2. Validate current state
338        if status.state != CommandState::PendingUpload {
339            return Err(AlienError::new(ErrorData::InvalidStateTransition {
340                from: status.state.as_ref().to_string(),
341                to: CommandState::Pending.as_ref().to_string(),
342            }));
343        }
344
345        // 3. Update params in KV with storage reference
346        let storage_get_request = self.generate_storage_get_request(command_id).await?;
347        let params = BodySpec::Storage {
348            size: Some(upload_request.size),
349            storage_get_request: Some(storage_get_request),
350            storage_put_used: None,
351        };
352        self.store_params(command_id, &params).await?;
353
354        // 4. Update registry to Pending state
355        self.command_registry
356            .update_command_state(command_id, CommandState::Pending, None, None, None, None)
357            .await?;
358
359        // 5. Get deployment model from registry and handle dispatch
360        let metadata = self
361            .command_registry
362            .get_command_metadata(command_id)
363            .await?
364            .ok_or_else(|| {
365                AlienError::new(ErrorData::CommandNotFound {
366                    command_id: command_id.to_string(),
367                })
368            })?;
369
370        let final_state = match metadata.deployment_model {
371            DeploymentModel::Push => {
372                self.dispatch_command_push(command_id, &status.deployment_id)
373                    .await?;
374                CommandState::Dispatched
375            }
376            DeploymentModel::Pull => {
377                self.create_pending_index(&status.deployment_id, command_id)
378                    .await?;
379                debug!(
380                    "Command {} ready for pull after upload (deployment will poll)",
381                    command_id
382                );
383                CommandState::Pending
384            }
385        };
386
387        Ok(UploadCompleteResponse {
388            command_id: command_id.to_string(),
389            state: final_state,
390        })
391    }
392
393    /// Get command status.
394    ///
395    /// Queries registry for metadata (source of truth), KV for response blob.
396    pub async fn get_command_status(&self, command_id: &str) -> Result<CommandStatusResponse> {
397        // 1. Get status from registry (SOURCE OF TRUTH)
398        let status = self
399            .command_registry
400            .get_command_status(command_id)
401            .await?
402            .ok_or_else(|| {
403                AlienError::new(ErrorData::CommandNotFound {
404                    command_id: command_id.to_string(),
405                })
406            })?;
407
408        // 2. Check deadline expiry inline
409        if let Some(deadline) = status.deadline {
410            if Utc::now() > deadline && !status.state.is_terminal() {
411                // Expire the command
412                self.command_registry
413                    .update_command_state(
414                        command_id,
415                        CommandState::Expired,
416                        None,
417                        Some(Utc::now()),
418                        None,
419                        None,
420                    )
421                    .await?;
422
423                // Clean up pending index
424                self.delete_pending_index(&status.deployment_id, command_id)
425                    .await?;
426
427                // Return expired status directly (avoid recursion)
428                return Ok(CommandStatusResponse {
429                    command_id: command_id.to_string(),
430                    state: CommandState::Expired,
431                    attempt: status.attempt,
432                    response: None,
433                });
434            }
435        }
436
437        // 3. Get response blob from KV if terminal state
438        let response = if status.state.is_terminal() {
439            self.get_response(command_id).await?
440        } else {
441            None
442        };
443
444        Ok(CommandStatusResponse {
445            command_id: command_id.to_string(),
446            state: status.state,
447            attempt: status.attempt,
448            response,
449        })
450    }
451
452    /// Submit response from deployment.
453    ///
454    /// Stores response blob in KV, updates state in registry.
455    pub async fn submit_command_response(
456        &self,
457        command_id: &str,
458        mut response: CommandResponse,
459    ) -> Result<()> {
460        // 1. Get current status from registry
461        let status = self
462            .command_registry
463            .get_command_status(command_id)
464            .await?
465            .ok_or_else(|| {
466                AlienError::new(ErrorData::CommandNotFound {
467                    command_id: command_id.to_string(),
468                })
469            })?;
470
471        // 2. Handle duplicate responses gracefully
472        if status.state.is_terminal() {
473            debug!(
474                "Ignoring duplicate response for terminal command {}",
475                command_id
476            );
477            return Ok(());
478        }
479
480        // 3. Validate state transition
481        if status.state != CommandState::Dispatched {
482            return Err(AlienError::new(ErrorData::InvalidStateTransition {
483                from: status.state.as_ref().to_string(),
484                to: CommandState::Succeeded.as_ref().to_string(),
485            }));
486        }
487
488        // 4. If response was uploaded to storage, generate download URL
489        if let CommandResponse::Success {
490            response: ref mut body,
491        } = response
492        {
493            if let BodySpec::Storage {
494                size,
495                storage_get_request,
496                storage_put_used,
497            } = body
498            {
499                if storage_get_request.is_none() && storage_put_used.unwrap_or(false) {
500                    let get_request = self
501                        .generate_response_storage_get_request(command_id)
502                        .await?;
503                    *body = BodySpec::Storage {
504                        size: *size,
505                        storage_get_request: Some(get_request),
506                        storage_put_used: *storage_put_used,
507                    };
508                }
509            }
510        }
511
512        // 5. Store response blob in KV
513        self.store_response(command_id, &response).await?;
514
515        // 6. Clean up lease from KV
516        self.delete_lease(command_id).await?;
517
518        // 7. Clean up pending index from KV (terminal state)
519        self.delete_pending_index(&status.deployment_id, command_id)
520            .await?;
521
522        // 8. Update registry state (SOURCE OF TRUTH)
523        let (new_state, error) = if response.is_success() {
524            (CommandState::Succeeded, None)
525        } else if let CommandResponse::Error { code, message, .. } = &response {
526            (
527                CommandState::Failed,
528                Some(serde_json::json!({ "code": code, "message": message })),
529            )
530        } else {
531            (CommandState::Failed, None)
532        };
533
534        let response_size = match &response {
535            CommandResponse::Success {
536                response: BodySpec::Inline { inline_base64 },
537            } => Some(inline_base64.len() as u64),
538            CommandResponse::Success {
539                response: BodySpec::Storage { size, .. },
540            } => *size,
541            _ => None,
542        };
543
544        self.command_registry
545            .update_command_state(
546                command_id,
547                new_state,
548                None, // dispatched_at already set
549                Some(Utc::now()),
550                response_size,
551                error,
552            )
553            .await?;
554
555        info!(
556            "Command {} completed with state {:?}",
557            command_id, new_state
558        );
559        Ok(())
560    }
561
562    /// Acquire leases for polling deployments.
563    ///
564    /// Scans KV pending index, queries registry for metadata, creates leases in KV.
565    pub async fn acquire_lease(
566        &self,
567        deployment_id: &str,
568        lease_request: &LeaseRequest,
569    ) -> Result<LeaseResponse> {
570        let mut leases = Vec::new();
571
572        // 1. Scan KV pending index
573        let target_prefix = format!("target:{}:pending:", deployment_id);
574        let scan_result = self
575            .kv
576            .scan_prefix(&target_prefix, Some(lease_request.max_leases * 2), None)
577            .await
578            .into_alien_error()
579            .context(ErrorData::KvOperationFailed {
580                operation: "scan_prefix".to_string(),
581                key: target_prefix.clone(),
582                message: "Failed to scan for pending commands".to_string(),
583            })?;
584
585        for (index_key, _) in scan_result.items {
586            if leases.len() >= lease_request.max_leases {
587                break;
588            }
589
590            let command_id = self.extract_command_id_from_index_key(&index_key)?;
591
592            // 2. Try to acquire lease atomically in KV
593            let lease_id = format!("lease_{}", Uuid::new_v4());
594            let lease_duration = Duration::from_secs(lease_request.lease_seconds);
595            let expires_at =
596                Utc::now() + chrono::Duration::seconds(lease_request.lease_seconds as i64);
597
598            let lease_data = LeaseData {
599                lease_id: lease_id.clone(),
600                acquired_at: Utc::now(),
601                expires_at,
602                owner: deployment_id.to_string(),
603            };
604
605            let lease_key = format!("cmd:{}:lease", command_id);
606            let lease_value = serde_json::to_vec(&lease_data).into_alien_error().context(
607                ErrorData::SerializationFailed {
608                    message: "Failed to serialize lease data".to_string(),
609                    data_type: Some("LeaseData".to_string()),
610                },
611            )?;
612
613            let options = Some(PutOptions {
614                ttl: Some(lease_duration),
615                if_not_exists: true,
616            });
617
618            let success = self
619                .kv
620                .put(&lease_key, lease_value, options)
621                .await
622                .context(ErrorData::KvOperationFailed {
623                    operation: "put".to_string(),
624                    key: lease_key.clone(),
625                    message: "Failed to create lease".to_string(),
626                })?;
627
628            if !success {
629                // Lease already exists, skip
630                continue;
631            }
632
633            // 3. Get metadata from registry
634            let metadata = match self
635                .command_registry
636                .get_command_metadata(&command_id)
637                .await?
638            {
639                Some(m) => m,
640                None => {
641                    // Command doesn't exist in registry, clean up
642                    self.delete_lease(&command_id).await?;
643                    let _ = self.kv.delete(&index_key).await;
644                    continue;
645                }
646            };
647
648            // 4. Check if command is in terminal state (stale index)
649            if metadata.state.is_terminal() {
650                // Clean up stale data
651                self.delete_lease(&command_id).await?;
652                let _ = self.kv.delete(&index_key).await;
653                continue;
654            }
655
656            // 5. Check deadline expiry
657            if let Some(deadline) = metadata.deadline {
658                if Utc::now() > deadline {
659                    // Expire the command
660                    self.command_registry
661                        .update_command_state(
662                            &command_id,
663                            CommandState::Expired,
664                            None,
665                            Some(Utc::now()),
666                            None,
667                            None,
668                        )
669                        .await?;
670                    self.delete_lease(&command_id).await?;
671                    let _ = self.kv.delete(&index_key).await;
672                    continue;
673                }
674            }
675
676            // 6. Get params from KV
677            let params = match self.get_params(&command_id).await? {
678                Some(p) => p,
679                None => {
680                    // No params, something went wrong
681                    self.delete_lease(&command_id).await?;
682                    continue;
683                }
684            };
685
686            // 7. Update registry state to Dispatched
687            self.command_registry
688                .update_command_state(
689                    &command_id,
690                    CommandState::Dispatched,
691                    Some(Utc::now()),
692                    None,
693                    None,
694                    None,
695                )
696                .await?;
697
698            // 8. Build envelope
699            let envelope = self.build_envelope(&command_id, &metadata, params).await?;
700
701            leases.push(LeaseInfo {
702                lease_id,
703                lease_expires_at: expires_at,
704                command_id: command_id.clone(),
705                attempt: metadata.attempt,
706                envelope,
707            });
708        }
709
710        Ok(LeaseResponse { leases })
711    }
712
713    /// Release a lease manually.
714    ///
715    /// Increments attempt count in registry, returns command to Pending state.
716    pub async fn release_lease(&self, command_id: &str, lease_id: &str) -> Result<()> {
717        let lease_key = format!("cmd:{}:lease", command_id);
718
719        // 1. Verify lease ownership
720        if let Ok(Some(lease_data)) = self.kv.get(&lease_key).await {
721            let lease: LeaseData = serde_json::from_slice(&lease_data)
722                .into_alien_error()
723                .context(ErrorData::SerializationFailed {
724                    message: "Failed to deserialize lease data".to_string(),
725                    data_type: Some("LeaseData".to_string()),
726                })?;
727
728            if lease.lease_id != lease_id {
729                return Err(AlienError::new(ErrorData::LeaseNotFound {
730                    lease_id: lease_id.to_string(),
731                }));
732            }
733
734            // 2. Delete lease from KV
735            self.delete_lease(command_id).await?;
736
737            // 3. Increment attempt in registry
738            self.command_registry.increment_attempt(command_id).await?;
739
740            // 4. Update registry state back to Pending
741            self.command_registry
742                .update_command_state(command_id, CommandState::Pending, None, None, None, None)
743                .await?;
744
745            // Note: Pending index is NOT removed on lease, so command is still there
746            debug!("Lease {} released for command {}", lease_id, command_id);
747        }
748
749        Ok(())
750    }
751
752    /// Get the deployment_id that owns a command.
753    ///
754    /// Used by the manager's auth layer to check whether the caller has access
755    /// to a specific command without fetching the full status.
756    pub async fn get_command_deployment_id(&self, command_id: &str) -> Result<Option<String>> {
757        let status = self.command_registry.get_command_status(command_id).await?;
758        Ok(status.map(|s| s.deployment_id))
759    }
760
761    /// Release a lease by lease_id only (for the API).
762    pub async fn release_lease_by_id(&self, lease_id: &str) -> Result<()> {
763        // Scan for leases to find the one with this lease_id
764        let lease_prefix = "cmd:";
765        let scan_result = self
766            .kv
767            .scan_prefix(lease_prefix, None, None)
768            .await
769            .into_alien_error()
770            .context(ErrorData::KvOperationFailed {
771                operation: "scan_prefix".to_string(),
772                key: lease_prefix.to_string(),
773                message: "Failed to scan for lease keys".to_string(),
774            })?;
775
776        for (key, value) in scan_result.items {
777            if key.ends_with(":lease") {
778                if let Ok(lease) = serde_json::from_slice::<LeaseData>(&value) {
779                    if lease.lease_id == lease_id {
780                        let command_id = key
781                            .strip_prefix("cmd:")
782                            .and_then(|s| s.strip_suffix(":lease"))
783                            .ok_or_else(|| {
784                                AlienError::new(ErrorData::Other {
785                                    message: format!("Invalid lease key format: {}", key),
786                                })
787                            })?;
788
789                        return self.release_lease(command_id, lease_id).await;
790                    }
791                }
792            }
793        }
794
795        Err(AlienError::new(ErrorData::LeaseNotFound {
796            lease_id: lease_id.to_string(),
797        }))
798    }
799
800    // =========================================================================
801    // Internal Helper Methods
802    // =========================================================================
803
804    async fn validate_create_command(&self, request: &CreateCommandRequest) -> Result<()> {
805        if request.command.is_empty() {
806            return Err(AlienError::new(ErrorData::InvalidCommand {
807                message: "Command name cannot be empty".to_string(),
808            }));
809        }
810
811        if request.deployment_id.is_empty() {
812            return Err(AlienError::new(ErrorData::InvalidCommand {
813                message: "Deployment ID cannot be empty".to_string(),
814            }));
815        }
816
817        if let Some(deadline) = request.deadline {
818            if deadline <= Utc::now() {
819                return Err(AlienError::new(ErrorData::InvalidCommand {
820                    message: "Deadline must be in the future".to_string(),
821                }));
822            }
823        }
824
825        Ok(())
826    }
827
828    // --- Idempotency ---
829
830    async fn check_idempotency(&self, idem_key: &str) -> Result<Option<String>> {
831        let key = format!("idem:{}", idem_key);
832        if let Some(data) = self
833            .kv
834            .get(&key)
835            .await
836            .context(ErrorData::KvOperationFailed {
837                operation: "get".to_string(),
838                key: key.clone(),
839                message: "Failed to check idempotency".to_string(),
840            })?
841        {
842            let command_id = String::from_utf8(data).into_alien_error().context(
843                ErrorData::SerializationFailed {
844                    message: "Invalid idempotency data".to_string(),
845                    data_type: Some("String".to_string()),
846                },
847            )?;
848            return Ok(Some(command_id));
849        }
850        Ok(None)
851    }
852
853    async fn store_idempotency(&self, idem_key: &str, command_id: &str) -> Result<()> {
854        let key = format!("idem:{}", idem_key);
855        let ttl = Duration::from_secs(24 * 60 * 60); // 24 hours
856        self.kv
857            .put(
858                &key,
859                command_id.as_bytes().to_vec(),
860                Some(PutOptions {
861                    ttl: Some(ttl),
862                    if_not_exists: true,
863                }),
864            )
865            .await
866            .context(ErrorData::KvOperationFailed {
867                operation: "put".to_string(),
868                key: key.clone(),
869                message: "Failed to store idempotency".to_string(),
870            })?;
871        Ok(())
872    }
873
874    // --- Params ---
875
876    pub async fn store_params(&self, command_id: &str, params: &BodySpec) -> Result<()> {
877        let key = format!("cmd:{}:params", command_id);
878
879        // Try serializing as-is first
880        let data = CommandParamsData {
881            params: params.clone(),
882        };
883        let value = serde_json::to_vec(&data).into_alien_error().context(
884            ErrorData::SerializationFailed {
885                message: "Failed to serialize params".to_string(),
886                data_type: Some("CommandParamsData".to_string()),
887            },
888        )?;
889
890        // If it fits in KV, store directly (fast path)
891        if value.len() <= KV_VALUE_THRESHOLD {
892            self.kv
893                .put(&key, value, None)
894                .await
895                .context(ErrorData::KvOperationFailed {
896                    operation: "put".to_string(),
897                    key: key.clone(),
898                    message: "Failed to store params".to_string(),
899                })?;
900            return Ok(());
901        }
902
903        // Auto-promote: inline data exceeds KV limit, store raw bytes in blob
904        if let BodySpec::Inline { inline_base64 } = params {
905            let raw_bytes = general_purpose::STANDARD
906                .decode(inline_base64)
907                .into_alien_error()
908                .context(ErrorData::SerializationFailed {
909                    message: "Failed to decode inline base64 params for auto-promotion".to_string(),
910                    data_type: Some("base64".to_string()),
911                })?;
912
913            let raw_len = raw_bytes.len() as u64;
914            let blob_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
915
916            self.storage
917                .put(&blob_path, Bytes::from(raw_bytes).into())
918                .await
919                .into_alien_error()
920                .context(ErrorData::StorageOperationFailed {
921                    message: "Failed to auto-promote params to blob storage".to_string(),
922                    operation: Some("put".to_string()),
923                    path: Some(blob_path.to_string()),
924                })?;
925
926            debug!(
927                "Auto-promoted params for command {} to blob ({} bytes raw)",
928                command_id, raw_len
929            );
930
931            // Store tiny reference in KV instead
932            let promoted = CommandParamsData {
933                params: BodySpec::Storage {
934                    size: Some(raw_len),
935                    storage_get_request: None,
936                    storage_put_used: Some(true),
937                },
938            };
939            let promoted_value = serde_json::to_vec(&promoted).into_alien_error().context(
940                ErrorData::SerializationFailed {
941                    message: "Failed to serialize promoted params reference".to_string(),
942                    data_type: Some("CommandParamsData".to_string()),
943                },
944            )?;
945            self.kv.put(&key, promoted_value, None).await.context(
946                ErrorData::KvOperationFailed {
947                    operation: "put".to_string(),
948                    key: key.clone(),
949                    message: "Failed to store promoted params reference".to_string(),
950                },
951            )?;
952            return Ok(());
953        }
954
955        // Storage references are always tiny, store as-is
956        self.kv
957            .put(&key, value, None)
958            .await
959            .context(ErrorData::KvOperationFailed {
960                operation: "put".to_string(),
961                key: key.clone(),
962                message: "Failed to store params".to_string(),
963            })?;
964        Ok(())
965    }
966
967    pub async fn get_params(&self, command_id: &str) -> Result<Option<BodySpec>> {
968        let key = format!("cmd:{}:params", command_id);
969        if let Some(value) = self
970            .kv
971            .get(&key)
972            .await
973            .context(ErrorData::KvOperationFailed {
974                operation: "get".to_string(),
975                key: key.clone(),
976                message: "Failed to get params".to_string(),
977            })?
978        {
979            let data: CommandParamsData = serde_json::from_slice(&value)
980                .into_alien_error()
981                .context(ErrorData::SerializationFailed {
982                    message: "Failed to deserialize params".to_string(),
983                    data_type: Some("CommandParamsData".to_string()),
984                })?;
985            return Ok(Some(data.params));
986        }
987        Ok(None)
988    }
989
990    // --- Response ---
991
992    pub async fn store_response(&self, command_id: &str, response: &CommandResponse) -> Result<()> {
993        let key = format!("cmd:{}:response", command_id);
994        let data = CommandResponseData {
995            response: response.clone(),
996        };
997        let value = serde_json::to_vec(&data).into_alien_error().context(
998            ErrorData::SerializationFailed {
999                message: "Failed to serialize response".to_string(),
1000                data_type: Some("CommandResponseData".to_string()),
1001            },
1002        )?;
1003
1004        // If it fits in KV, store directly (fast path)
1005        if value.len() <= KV_VALUE_THRESHOLD {
1006            self.kv
1007                .put(&key, value, None)
1008                .await
1009                .context(ErrorData::KvOperationFailed {
1010                    operation: "put".to_string(),
1011                    key: key.clone(),
1012                    message: "Failed to store response".to_string(),
1013                })?;
1014            return Ok(());
1015        }
1016
1017        // Auto-promote: inline response exceeds KV limit
1018        if let CommandResponse::Success {
1019            response: BodySpec::Inline { inline_base64 },
1020        } = response
1021        {
1022            let raw_bytes = general_purpose::STANDARD
1023                .decode(inline_base64)
1024                .into_alien_error()
1025                .context(ErrorData::SerializationFailed {
1026                    message: "Failed to decode inline base64 response for auto-promotion"
1027                        .to_string(),
1028                    data_type: Some("base64".to_string()),
1029                })?;
1030
1031            let raw_len = raw_bytes.len() as u64;
1032            let blob_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1033
1034            self.storage
1035                .put(&blob_path, Bytes::from(raw_bytes).into())
1036                .await
1037                .into_alien_error()
1038                .context(ErrorData::StorageOperationFailed {
1039                    message: "Failed to auto-promote response to blob storage".to_string(),
1040                    operation: Some("put".to_string()),
1041                    path: Some(blob_path.to_string()),
1042                })?;
1043
1044            // Generate presigned GET URL for the caller
1045            let get_request = self
1046                .generate_response_storage_get_request(command_id)
1047                .await?;
1048
1049            debug!(
1050                "Auto-promoted response for command {} to blob ({} bytes raw)",
1051                command_id, raw_len
1052            );
1053
1054            // Store tiny reference in KV
1055            let promoted = CommandResponseData {
1056                response: CommandResponse::Success {
1057                    response: BodySpec::Storage {
1058                        size: Some(raw_len),
1059                        storage_get_request: Some(get_request),
1060                        storage_put_used: Some(true),
1061                    },
1062                },
1063            };
1064            let promoted_value = serde_json::to_vec(&promoted).into_alien_error().context(
1065                ErrorData::SerializationFailed {
1066                    message: "Failed to serialize promoted response reference".to_string(),
1067                    data_type: Some("CommandResponseData".to_string()),
1068                },
1069            )?;
1070            self.kv.put(&key, promoted_value, None).await.context(
1071                ErrorData::KvOperationFailed {
1072                    operation: "put".to_string(),
1073                    key: key.clone(),
1074                    message: "Failed to store promoted response reference".to_string(),
1075                },
1076            )?;
1077            return Ok(());
1078        }
1079
1080        // Error responses or storage references are always small, store as-is
1081        self.kv
1082            .put(&key, value, None)
1083            .await
1084            .context(ErrorData::KvOperationFailed {
1085                operation: "put".to_string(),
1086                key: key.clone(),
1087                message: "Failed to store response".to_string(),
1088            })?;
1089        Ok(())
1090    }
1091
1092    pub async fn get_response(&self, command_id: &str) -> Result<Option<CommandResponse>> {
1093        let key = format!("cmd:{}:response", command_id);
1094        if let Some(value) = self
1095            .kv
1096            .get(&key)
1097            .await
1098            .context(ErrorData::KvOperationFailed {
1099                operation: "get".to_string(),
1100                key: key.clone(),
1101                message: "Failed to get response".to_string(),
1102            })?
1103        {
1104            let data: CommandResponseData = serde_json::from_slice(&value)
1105                .into_alien_error()
1106                .context(ErrorData::SerializationFailed {
1107                    message: "Failed to deserialize response".to_string(),
1108                    data_type: Some("CommandResponseData".to_string()),
1109                })?;
1110            return Ok(Some(data.response));
1111        }
1112        Ok(None)
1113    }
1114
1115    // --- Pending Index ---
1116
1117    async fn create_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
1118        let timestamp = Utc::now().timestamp_nanos_opt().unwrap_or(0);
1119        let key = format!(
1120            "target:{}:pending:{}:{}",
1121            deployment_id, timestamp, command_id
1122        );
1123
1124        // Store empty value - just for ordering
1125        self.kv
1126            .put(&key, vec![], None)
1127            .await
1128            .context(ErrorData::KvOperationFailed {
1129                operation: "put".to_string(),
1130                key: key.clone(),
1131                message: "Failed to create pending index".to_string(),
1132            })?;
1133        Ok(())
1134    }
1135
1136    async fn delete_pending_index(&self, deployment_id: &str, command_id: &str) -> Result<()> {
1137        // We need to scan to find the exact key since we don't know the timestamp
1138        let prefix = format!("target:{}:pending:", deployment_id);
1139        let scan_result = self
1140            .kv
1141            .scan_prefix(&prefix, Some(100), None)
1142            .await
1143            .into_alien_error()
1144            .context(ErrorData::KvOperationFailed {
1145                operation: "scan_prefix".to_string(),
1146                key: prefix.clone(),
1147                message: "Failed to scan pending index".to_string(),
1148            })?;
1149
1150        for (key, _) in scan_result.items {
1151            if key.ends_with(&format!(":{}", command_id)) {
1152                let _ = self.kv.delete(&key).await;
1153                break;
1154            }
1155        }
1156        Ok(())
1157    }
1158
1159    // --- Lease ---
1160
1161    async fn delete_lease(&self, command_id: &str) -> Result<()> {
1162        let key = format!("cmd:{}:lease", command_id);
1163        let _ = self.kv.delete(&key).await;
1164        Ok(())
1165    }
1166
1167    // --- Deadline Index ---
1168
1169    async fn create_deadline_index(&self, command_id: &str, deadline: DateTime<Utc>) -> Result<()> {
1170        let key = format!(
1171            "deadline:{}:{}",
1172            deadline.timestamp_nanos_opt().unwrap_or(0),
1173            command_id
1174        );
1175
1176        let data = DeadlineIndexData {
1177            command_id: command_id.to_string(),
1178            deadline,
1179        };
1180        let value = serde_json::to_vec(&data).into_alien_error().context(
1181            ErrorData::SerializationFailed {
1182                message: "Failed to serialize deadline index".to_string(),
1183                data_type: Some("DeadlineIndexData".to_string()),
1184            },
1185        )?;
1186
1187        let ttl = deadline.signed_duration_since(Utc::now());
1188        let ttl_duration = if ttl.num_seconds() > 0 {
1189            Some(Duration::from_secs(ttl.num_seconds() as u64))
1190        } else {
1191            None
1192        };
1193
1194        let options = ttl_duration.map(|ttl| PutOptions {
1195            ttl: Some(ttl),
1196            if_not_exists: false,
1197        });
1198
1199        self.kv
1200            .put(&key, value, options)
1201            .await
1202            .context(ErrorData::KvOperationFailed {
1203                operation: "put".to_string(),
1204                key: key.clone(),
1205                message: "Failed to create deadline index".to_string(),
1206            })?;
1207        Ok(())
1208    }
1209
1210    // --- Dispatch ---
1211
1212    async fn dispatch_command_push(&self, command_id: &str, deployment_id: &str) -> Result<()> {
1213        // Get metadata from registry
1214        let metadata = self
1215            .command_registry
1216            .get_command_metadata(command_id)
1217            .await?
1218            .ok_or_else(|| {
1219                AlienError::new(ErrorData::CommandNotFound {
1220                    command_id: command_id.to_string(),
1221                })
1222            })?;
1223
1224        // Get params from KV
1225        let params = self.get_params(command_id).await?.ok_or_else(|| {
1226            AlienError::new(ErrorData::CommandNotFound {
1227                command_id: command_id.to_string(),
1228            })
1229        })?;
1230
1231        // Build envelope
1232        let envelope = self.build_envelope(command_id, &metadata, params).await?;
1233
1234        // Dispatch via transport
1235        self.command_dispatcher
1236            .dispatch(&envelope)
1237            .await
1238            .map_err(|e| {
1239                e.context(ErrorData::TransportDispatchFailed {
1240                    message: "Failed to dispatch command".to_string(),
1241                    transport_type: None,
1242                    target: Some(deployment_id.to_string()),
1243                })
1244            })?;
1245
1246        // Update registry state
1247        self.command_registry
1248            .update_command_state(
1249                command_id,
1250                CommandState::Dispatched,
1251                Some(Utc::now()),
1252                None,
1253                None,
1254                None,
1255            )
1256            .await?;
1257
1258        info!("Command {} dispatched via push", command_id);
1259        Ok(())
1260    }
1261
    /// Assembles the `Envelope` delivered to a deployment for `command_id`.
    ///
    /// Steps, in order:
    /// 1. Builds the response-handling block via `create_response_handling`.
    /// 2. Tries to re-inline params. This applies when `params` is a storage
    ///    reference whose recorded size is non-zero and at most
    ///    `self.inline_max_bytes`. The blob at `arc/commands/<command_id>/params`
    ///    is read and `params` is replaced with an inline body. If the read
    ///    fails, the failure is logged at debug level and the storage reference
    ///    is kept.
    /// 3. If `params` is still a storage reference without a presigned GET
    ///    request, attaches one from `generate_storage_get_request`.
    ///
    /// # Errors
    /// Propagates failures from `create_response_handling` and
    /// `generate_storage_get_request`. Blob read failures during re-inlining
    /// are not errors.
    async fn build_envelope(
        &self,
        command_id: &str,
        metadata: &CommandEnvelopeData,
        mut params: BodySpec,
    ) -> Result<Envelope> {
        let response_handling = self.create_response_handling(command_id).await?;

        // Re-inline: if params are in blob but fit in transport limit, read and embed inline.
        // This spares the deployment a separate storage download for params that were too
        // large for a KV value but still fit within the transport's inline limit.
        if let BodySpec::Storage { size, .. } = &params {
            // A missing size is treated as 0, which skips re-inlining.
            // NOTE(review): `u64 as usize` truncates on 32-bit targets.
            let raw_size = size.unwrap_or(0) as usize;
            if raw_size > 0 && raw_size <= self.inline_max_bytes {
                let blob_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
                match self.storage.get(&blob_path).await {
                    Ok(get_result) => match get_result.bytes().await {
                        Ok(raw_bytes) => {
                            params = BodySpec::inline(&raw_bytes);
                            debug!(
                                "Re-inlined params for command {} ({} bytes) into envelope",
                                command_id, raw_size
                            );
                        }
                        Err(e) => {
                            // Non-fatal: keep the storage reference; it gets a presigned GET below.
                            debug!(
                                    "Failed to read blob bytes for re-inline (command {}), falling back to presigned URL: {}",
                                    command_id, e
                                );
                        }
                    },
                    Err(e) => {
                        // Non-fatal: keep the storage reference; it gets a presigned GET below.
                        debug!(
                            "Failed to read blob for re-inline (command {}), falling back to presigned URL: {}",
                            command_id, e
                        );
                    }
                }
            }
        }

        // If params are still Storage (either too large or re-inline failed),
        // ensure they have a presigned GET request
        if let BodySpec::Storage {
            size,
            storage_get_request,
            storage_put_used,
        } = &params
        {
            // An existing presigned request is kept as-is; only a missing one is generated.
            if storage_get_request.is_none() {
                let get_request = self.generate_storage_get_request(command_id).await?;
                params = BodySpec::Storage {
                    size: *size,
                    storage_get_request: Some(get_request),
                    storage_put_used: *storage_put_used,
                };
            }
        }

        Ok(Envelope::new(
            metadata.deployment_id.clone(),
            command_id.to_string(),
            metadata.attempt,
            metadata.deadline,
            metadata.command.clone(),
            params,
            response_handling,
        ))
    }
1330
1331    async fn create_response_handling(&self, command_id: &str) -> Result<ResponseHandling> {
1332        let upload_path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1333        let expires_in = Duration::from_secs(3600);
1334        let presigned = self
1335            .storage
1336            .presigned_put(&upload_path, expires_in)
1337            .await
1338            .context(ErrorData::StorageOperationFailed {
1339                message: "Failed to create response upload URL".to_string(),
1340                operation: Some("presigned_put".to_string()),
1341                path: Some(upload_path.to_string()),
1342            })?;
1343
1344        let (response_token, expires) = self.sign_response_url(command_id);
1345
1346        Ok(ResponseHandling {
1347            max_inline_bytes: self.inline_max_bytes as u64,
1348            submit_response_url: format!(
1349                "{}/commands/{}/response?response_token={}&expires={}",
1350                self.base_url.trim_end_matches('/'),
1351                command_id,
1352                response_token,
1353                expires,
1354            ),
1355            storage_upload_request: presigned,
1356        })
1357    }
1358
1359    async fn generate_params_upload(&self, command_id: &str) -> Result<StorageUpload> {
1360        let upload_path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1361        let expires_in = Duration::from_secs(3600);
1362        let presigned = self
1363            .storage
1364            .presigned_put(&upload_path, expires_in)
1365            .await
1366            .into_alien_error()
1367            .context(ErrorData::StorageOperationFailed {
1368                message: "Failed to create presigned URL".to_string(),
1369                operation: Some("presigned_put".to_string()),
1370                path: Some(upload_path.to_string()),
1371            })?;
1372
1373        Ok(StorageUpload {
1374            put_request: presigned.clone(),
1375            expires_at: presigned.expiration,
1376        })
1377    }
1378
1379    async fn generate_storage_get_request(&self, command_id: &str) -> Result<PresignedRequest> {
1380        let path = StoragePath::from(format!("arc/commands/{}/params", command_id));
1381        let expires_in = Duration::from_secs(3600);
1382        self.storage.presigned_get(&path, expires_in).await.context(
1383            ErrorData::StorageOperationFailed {
1384                message: "Failed to create storage get request".to_string(),
1385                operation: Some("presigned_get".to_string()),
1386                path: Some(path.to_string()),
1387            },
1388        )
1389    }
1390
1391    async fn generate_response_storage_get_request(
1392        &self,
1393        command_id: &str,
1394    ) -> Result<PresignedRequest> {
1395        let path = StoragePath::from(format!("arc/commands/{}/response", command_id));
1396        let expires_in = Duration::from_secs(3600);
1397        self.storage.presigned_get(&path, expires_in).await.context(
1398            ErrorData::StorageOperationFailed {
1399                message: "Failed to create response storage get request".to_string(),
1400                operation: Some("presigned_get".to_string()),
1401                path: Some(path.to_string()),
1402            },
1403        )
1404    }
1405
1406    fn extract_command_id_from_index_key(&self, index_key: &str) -> Result<String> {
1407        index_key
1408            .split(':')
1409            .last()
1410            .ok_or_else(|| {
1411                AlienError::new(ErrorData::Other {
1412                    message: format!("Invalid index key format: {}", index_key),
1413                })
1414            })
1415            .map(|s| s.to_string())
1416    }
1417}