Skip to main content

alien_commands/server/
command_registry.rs

//! Command registry abstraction for the ARC server.
//!
//! The CommandRegistry is the **source of truth** for all command metadata.
//! It tracks command state, timestamps, sizes, and errors.
//!
//! Implementations:
//! - `InMemoryCommandRegistry`: In-memory implementation for tests and local dev (in this crate)
//! - `PlatformCommandRegistry`: Platform API integration (in alien-manager)
//!
//! The ARC KV store holds only operational data: params/response blobs, pending indices, leases.
11
12use crate::error::Result;
13use alien_core::{CommandState, DeploymentModel};
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tokio::sync::RwLock;
20use uuid::Uuid;
21
22/// Metadata returned when creating a command
23#[derive(Debug, Clone)]
24pub struct CommandMetadata {
25    /// Unique command ID
26    pub command_id: String,
27    /// How to dispatch the command (Push or Pull)
28    pub deployment_model: DeploymentModel,
29    /// Project ID for routing/authorization
30    pub project_id: String,
31}
32
33/// Data needed to build an envelope during lease acquisition
34#[derive(Debug, Clone)]
35pub struct CommandEnvelopeData {
36    pub command_id: String,
37    pub deployment_id: String,
38    pub command: String, // command name
39    pub attempt: u32,
40    pub deadline: Option<DateTime<Utc>>,
41    pub state: CommandState,
42    pub deployment_model: DeploymentModel,
43}
44
45/// Full status for GET /commands/{id}
46#[derive(Debug, Clone)]
47pub struct CommandStatus {
48    pub command_id: String,
49    pub deployment_id: String,
50    pub command: String, // command name
51    pub state: CommandState,
52    pub attempt: u32,
53    pub deadline: Option<DateTime<Utc>>,
54    pub created_at: DateTime<Utc>,
55    pub dispatched_at: Option<DateTime<Utc>>,
56    pub completed_at: Option<DateTime<Utc>>,
57    pub error: Option<serde_json::Value>,
58    pub request_size_bytes: Option<u64>,
59    pub response_size_bytes: Option<u64>,
60}
61
62/// Internal command record stored in memory
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "camelCase")]
65struct CommandRecord {
66    id: String,
67    deployment_id: String,
68    command: String,
69    state: CommandState,
70    attempt: u32,
71    deadline: Option<DateTime<Utc>>,
72    created_at: DateTime<Utc>,
73    dispatched_at: Option<DateTime<Utc>>,
74    completed_at: Option<DateTime<Utc>>,
75    request_size_bytes: Option<u64>,
76    response_size_bytes: Option<u64>,
77    error: Option<serde_json::Value>,
78    deployment_model: DeploymentModel,
79    project_id: String,
80}
81
82/// Abstraction for command metadata storage and lifecycle tracking.
83///
84/// The CommandRegistry is the source of truth for all command metadata.
85/// Implementations store command state, timestamps, and result information.
86#[async_trait]
87pub trait CommandRegistry: Send + Sync {
88    /// Create a new command and return metadata for routing.
89    ///
90    /// The registry generates the command_id, determines the deployment_model,
91    /// and stores all metadata (state, timestamps, etc.).
92    async fn create_command(
93        &self,
94        deployment_id: &str,
95        command_name: &str,
96        initial_state: CommandState,
97        deadline: Option<DateTime<Utc>>,
98        request_size_bytes: Option<u64>,
99    ) -> Result<CommandMetadata>;
100
101    /// Get metadata needed to build an envelope during lease acquisition.
102    ///
103    /// Returns None if command doesn't exist.
104    async fn get_command_metadata(&self, command_id: &str) -> Result<Option<CommandEnvelopeData>>;
105
106    /// Get full command status for status endpoint.
107    ///
108    /// Returns None if command doesn't exist.
109    async fn get_command_status(&self, command_id: &str) -> Result<Option<CommandStatus>>;
110
111    /// Update command state during lifecycle (dispatched, completed, failed).
112    async fn update_command_state(
113        &self,
114        command_id: &str,
115        state: CommandState,
116        dispatched_at: Option<DateTime<Utc>>,
117        completed_at: Option<DateTime<Utc>>,
118        response_size_bytes: Option<u64>,
119        error: Option<serde_json::Value>,
120    ) -> Result<()>;
121
122    /// Increment attempt count (on lease release/expiry).
123    ///
124    /// Returns the new attempt number.
125    async fn increment_attempt(&self, command_id: &str) -> Result<u32>;
126}
127
128/// In-memory implementation for tests and local development.
129///
130/// Tracks command metadata in memory. Configurable deployment model (defaults to Pull).
131pub struct InMemoryCommandRegistry {
132    commands: Arc<RwLock<HashMap<String, CommandRecord>>>,
133    deployment_model: DeploymentModel,
134}
135
136impl InMemoryCommandRegistry {
137    /// Create a new in-memory registry with Pull deployment model (default).
138    pub fn new() -> Self {
139        Self {
140            commands: Arc::new(RwLock::new(HashMap::new())),
141            deployment_model: DeploymentModel::Pull,
142        }
143    }
144
145    /// Create a new in-memory registry with the specified deployment model.
146    pub fn with_deployment_model(deployment_model: DeploymentModel) -> Self {
147        Self {
148            commands: Arc::new(RwLock::new(HashMap::new())),
149            deployment_model,
150        }
151    }
152
153    /// List all command IDs (useful for debugging/testing)
154    #[allow(dead_code)]
155    pub async fn list_command_ids(&self) -> Vec<String> {
156        let commands = self.commands.read().await;
157        commands.keys().cloned().collect()
158    }
159}
160
161impl Default for InMemoryCommandRegistry {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167#[async_trait]
168impl CommandRegistry for InMemoryCommandRegistry {
169    async fn create_command(
170        &self,
171        deployment_id: &str,
172        command_name: &str,
173        initial_state: CommandState,
174        deadline: Option<DateTime<Utc>>,
175        request_size_bytes: Option<u64>,
176    ) -> Result<CommandMetadata> {
177        let command_id = format!("cmd_{}", Uuid::new_v4());
178
179        let record = CommandRecord {
180            id: command_id.clone(),
181            deployment_id: deployment_id.to_string(),
182            command: command_name.to_string(),
183            state: initial_state,
184            attempt: 1,
185            deadline,
186            created_at: Utc::now(),
187            dispatched_at: None,
188            completed_at: None,
189            request_size_bytes,
190            response_size_bytes: None,
191            error: None,
192            deployment_model: self.deployment_model,
193            project_id: "local-dev".to_string(),
194        };
195
196        self.commands
197            .write()
198            .await
199            .insert(command_id.clone(), record);
200
201        Ok(CommandMetadata {
202            command_id,
203            deployment_model: self.deployment_model,
204            project_id: "local-dev".to_string(),
205        })
206    }
207
208    async fn get_command_metadata(&self, command_id: &str) -> Result<Option<CommandEnvelopeData>> {
209        let commands = self.commands.read().await;
210
211        Ok(commands.get(command_id).map(|r| CommandEnvelopeData {
212            command_id: r.id.clone(),
213            deployment_id: r.deployment_id.clone(),
214            command: r.command.clone(),
215            attempt: r.attempt,
216            deadline: r.deadline,
217            state: r.state,
218            deployment_model: r.deployment_model,
219        }))
220    }
221
222    async fn get_command_status(&self, command_id: &str) -> Result<Option<CommandStatus>> {
223        let commands = self.commands.read().await;
224
225        Ok(commands.get(command_id).map(|r| CommandStatus {
226            command_id: r.id.clone(),
227            deployment_id: r.deployment_id.clone(),
228            command: r.command.clone(),
229            state: r.state,
230            attempt: r.attempt,
231            deadline: r.deadline,
232            created_at: r.created_at,
233            dispatched_at: r.dispatched_at,
234            completed_at: r.completed_at,
235            error: r.error.clone(),
236            request_size_bytes: r.request_size_bytes,
237            response_size_bytes: r.response_size_bytes,
238        }))
239    }
240
241    async fn update_command_state(
242        &self,
243        command_id: &str,
244        state: CommandState,
245        dispatched_at: Option<DateTime<Utc>>,
246        completed_at: Option<DateTime<Utc>>,
247        response_size_bytes: Option<u64>,
248        error: Option<serde_json::Value>,
249    ) -> Result<()> {
250        let mut commands = self.commands.write().await;
251
252        if let Some(record) = commands.get_mut(command_id) {
253            record.state = state;
254
255            if let Some(ts) = dispatched_at {
256                record.dispatched_at = Some(ts);
257            }
258
259            if let Some(ts) = completed_at {
260                record.completed_at = Some(ts);
261            }
262
263            if let Some(size) = response_size_bytes {
264                record.response_size_bytes = Some(size);
265            }
266
267            if let Some(err) = error {
268                record.error = Some(err);
269            }
270        }
271
272        Ok(())
273    }
274
275    async fn increment_attempt(&self, command_id: &str) -> Result<u32> {
276        let mut commands = self.commands.write().await;
277
278        if let Some(record) = commands.get_mut(command_id) {
279            record.attempt += 1;
280            Ok(record.attempt)
281        } else {
282            Ok(1) // Default if not found
283        }
284    }
285}