1use crate::error::Result;
13use alien_core::{CommandState, DeploymentModel};
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tokio::sync::RwLock;
20use uuid::Uuid;
21
22#[derive(Debug, Clone)]
24pub struct CommandMetadata {
25 pub command_id: String,
27 pub deployment_model: DeploymentModel,
29 pub project_id: String,
31}
32
33#[derive(Debug, Clone)]
35pub struct CommandEnvelopeData {
36 pub command_id: String,
37 pub deployment_id: String,
38 pub command: String, pub attempt: u32,
40 pub deadline: Option<DateTime<Utc>>,
41 pub state: CommandState,
42 pub deployment_model: DeploymentModel,
43}
44
45#[derive(Debug, Clone)]
47pub struct CommandStatus {
48 pub command_id: String,
49 pub deployment_id: String,
50 pub command: String, pub state: CommandState,
52 pub attempt: u32,
53 pub deadline: Option<DateTime<Utc>>,
54 pub created_at: DateTime<Utc>,
55 pub dispatched_at: Option<DateTime<Utc>>,
56 pub completed_at: Option<DateTime<Utc>>,
57 pub error: Option<serde_json::Value>,
58 pub request_size_bytes: Option<u64>,
59 pub response_size_bytes: Option<u64>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "camelCase")]
65struct CommandRecord {
66 id: String,
67 deployment_id: String,
68 command: String,
69 state: CommandState,
70 attempt: u32,
71 deadline: Option<DateTime<Utc>>,
72 created_at: DateTime<Utc>,
73 dispatched_at: Option<DateTime<Utc>>,
74 completed_at: Option<DateTime<Utc>>,
75 request_size_bytes: Option<u64>,
76 response_size_bytes: Option<u64>,
77 error: Option<serde_json::Value>,
78 deployment_model: DeploymentModel,
79 project_id: String,
80}
81
82#[async_trait]
87pub trait CommandRegistry: Send + Sync {
88 async fn create_command(
93 &self,
94 deployment_id: &str,
95 command_name: &str,
96 initial_state: CommandState,
97 deadline: Option<DateTime<Utc>>,
98 request_size_bytes: Option<u64>,
99 ) -> Result<CommandMetadata>;
100
101 async fn get_command_metadata(&self, command_id: &str) -> Result<Option<CommandEnvelopeData>>;
105
106 async fn get_command_status(&self, command_id: &str) -> Result<Option<CommandStatus>>;
110
111 async fn update_command_state(
113 &self,
114 command_id: &str,
115 state: CommandState,
116 dispatched_at: Option<DateTime<Utc>>,
117 completed_at: Option<DateTime<Utc>>,
118 response_size_bytes: Option<u64>,
119 error: Option<serde_json::Value>,
120 ) -> Result<()>;
121
122 async fn increment_attempt(&self, command_id: &str) -> Result<u32>;
126}
127
128pub struct InMemoryCommandRegistry {
132 commands: Arc<RwLock<HashMap<String, CommandRecord>>>,
133 deployment_model: DeploymentModel,
134}
135
136impl InMemoryCommandRegistry {
137 pub fn new() -> Self {
139 Self {
140 commands: Arc::new(RwLock::new(HashMap::new())),
141 deployment_model: DeploymentModel::Pull,
142 }
143 }
144
145 pub fn with_deployment_model(deployment_model: DeploymentModel) -> Self {
147 Self {
148 commands: Arc::new(RwLock::new(HashMap::new())),
149 deployment_model,
150 }
151 }
152
153 #[allow(dead_code)]
155 pub async fn list_command_ids(&self) -> Vec<String> {
156 let commands = self.commands.read().await;
157 commands.keys().cloned().collect()
158 }
159}
160
161impl Default for InMemoryCommandRegistry {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167#[async_trait]
168impl CommandRegistry for InMemoryCommandRegistry {
169 async fn create_command(
170 &self,
171 deployment_id: &str,
172 command_name: &str,
173 initial_state: CommandState,
174 deadline: Option<DateTime<Utc>>,
175 request_size_bytes: Option<u64>,
176 ) -> Result<CommandMetadata> {
177 let command_id = format!("cmd_{}", Uuid::new_v4());
178
179 let record = CommandRecord {
180 id: command_id.clone(),
181 deployment_id: deployment_id.to_string(),
182 command: command_name.to_string(),
183 state: initial_state,
184 attempt: 1,
185 deadline,
186 created_at: Utc::now(),
187 dispatched_at: None,
188 completed_at: None,
189 request_size_bytes,
190 response_size_bytes: None,
191 error: None,
192 deployment_model: self.deployment_model,
193 project_id: "local-dev".to_string(),
194 };
195
196 self.commands
197 .write()
198 .await
199 .insert(command_id.clone(), record);
200
201 Ok(CommandMetadata {
202 command_id,
203 deployment_model: self.deployment_model,
204 project_id: "local-dev".to_string(),
205 })
206 }
207
208 async fn get_command_metadata(&self, command_id: &str) -> Result<Option<CommandEnvelopeData>> {
209 let commands = self.commands.read().await;
210
211 Ok(commands.get(command_id).map(|r| CommandEnvelopeData {
212 command_id: r.id.clone(),
213 deployment_id: r.deployment_id.clone(),
214 command: r.command.clone(),
215 attempt: r.attempt,
216 deadline: r.deadline,
217 state: r.state,
218 deployment_model: r.deployment_model,
219 }))
220 }
221
222 async fn get_command_status(&self, command_id: &str) -> Result<Option<CommandStatus>> {
223 let commands = self.commands.read().await;
224
225 Ok(commands.get(command_id).map(|r| CommandStatus {
226 command_id: r.id.clone(),
227 deployment_id: r.deployment_id.clone(),
228 command: r.command.clone(),
229 state: r.state,
230 attempt: r.attempt,
231 deadline: r.deadline,
232 created_at: r.created_at,
233 dispatched_at: r.dispatched_at,
234 completed_at: r.completed_at,
235 error: r.error.clone(),
236 request_size_bytes: r.request_size_bytes,
237 response_size_bytes: r.response_size_bytes,
238 }))
239 }
240
241 async fn update_command_state(
242 &self,
243 command_id: &str,
244 state: CommandState,
245 dispatched_at: Option<DateTime<Utc>>,
246 completed_at: Option<DateTime<Utc>>,
247 response_size_bytes: Option<u64>,
248 error: Option<serde_json::Value>,
249 ) -> Result<()> {
250 let mut commands = self.commands.write().await;
251
252 if let Some(record) = commands.get_mut(command_id) {
253 record.state = state;
254
255 if let Some(ts) = dispatched_at {
256 record.dispatched_at = Some(ts);
257 }
258
259 if let Some(ts) = completed_at {
260 record.completed_at = Some(ts);
261 }
262
263 if let Some(size) = response_size_bytes {
264 record.response_size_bytes = Some(size);
265 }
266
267 if let Some(err) = error {
268 record.error = Some(err);
269 }
270 }
271
272 Ok(())
273 }
274
275 async fn increment_attempt(&self, command_id: &str) -> Result<u32> {
276 let mut commands = self.commands.write().await;
277
278 if let Some(record) = commands.get_mut(command_id) {
279 record.attempt += 1;
280 Ok(record.attempt)
281 } else {
282 Ok(1) }
284 }
285}