1use std::collections::HashMap;
7use std::sync::Mutex;
8
9use async_trait::async_trait;
10use chrono::Utc;
11
12use crate::error::StorageError;
13use crate::storage_traits::*;
14
15#[derive(Debug, Default)]
21pub struct MemoryCasStore {
22 store: Mutex<HashMap<String, Vec<u8>>>,
23}
24
25impl MemoryCasStore {
26 pub fn new() -> Self {
27 Self::default()
28 }
29}
30
31#[async_trait]
32impl CasStore for MemoryCasStore {
33 async fn put(&self, data: &[u8]) -> StorageResult<ContentDigest> {
34 let digest = ContentDigest::from_bytes(data);
35 let mut store = self.store.lock().unwrap();
36 store.insert(digest.as_str().to_string(), data.to_vec());
37 Ok(digest)
38 }
39
40 async fn get(&self, digest: &ContentDigest) -> StorageResult<Vec<u8>> {
41 let store = self.store.lock().unwrap();
42 store
43 .get(digest.as_str())
44 .cloned()
45 .ok_or_else(|| StorageError::NotFound {
46 digest: digest.as_str().to_string(),
47 })
48 }
49
50 async fn contains(&self, digest: &ContentDigest) -> StorageResult<bool> {
51 let store = self.store.lock().unwrap();
52 Ok(store.contains_key(digest.as_str()))
53 }
54
55 async fn delete(&self, digest: &ContentDigest) -> StorageResult<()> {
56 let mut store = self.store.lock().unwrap();
57 store.remove(digest.as_str());
58 Ok(())
59 }
60}
61
62#[derive(Debug)]
67struct RunState {
68 record: RunRecord,
69 events: Vec<RunEvent>,
70}
71
72#[derive(Debug, Default)]
74pub struct MemoryRunLedger {
75 runs: Mutex<HashMap<String, RunState>>,
76}
77
78impl MemoryRunLedger {
79 pub fn new() -> Self {
80 Self::default()
81 }
82}
83
84#[async_trait]
85impl RunLedger for MemoryRunLedger {
86 async fn create_run(
87 &self,
88 spec_digest: &ContentDigest,
89 metadata: RunMetadata,
90 ) -> StorageResult<RunId> {
91 let run_id = RunId::new();
92 let record = RunRecord {
93 run_id: run_id.clone(),
94 spec_digest: spec_digest.clone(),
95 metadata,
96 status: RunStatus::Running,
97 summary: None,
98 created_at: Utc::now(),
99 completed_at: None,
100 };
101 let mut runs = self.runs.lock().unwrap();
102 runs.insert(
103 run_id.0.clone(),
104 RunState {
105 record,
106 events: Vec::new(),
107 },
108 );
109 Ok(run_id)
110 }
111
112 async fn append_event(&self, run_id: &RunId, event: RunEvent) -> StorageResult<()> {
113 let mut runs = self.runs.lock().unwrap();
114 let state = runs
115 .get_mut(&run_id.0)
116 .ok_or_else(|| StorageError::RunNotFound {
117 run_id: run_id.0.clone(),
118 })?;
119 if state.record.status != RunStatus::Running {
120 return Err(StorageError::InvalidRunState {
121 run_id: run_id.0.clone(),
122 status: format!("{:?}", state.record.status),
123 expected: "Running".to_string(),
124 });
125 }
126 state.events.push(event);
127 Ok(())
128 }
129
130 async fn complete_run(&self, run_id: &RunId, summary: RunSummary) -> StorageResult<()> {
131 let mut runs = self.runs.lock().unwrap();
132 let state = runs
133 .get_mut(&run_id.0)
134 .ok_or_else(|| StorageError::RunNotFound {
135 run_id: run_id.0.clone(),
136 })?;
137 if state.record.status != RunStatus::Running {
138 return Err(StorageError::InvalidRunState {
139 run_id: run_id.0.clone(),
140 status: format!("{:?}", state.record.status),
141 expected: "Running".to_string(),
142 });
143 }
144 state.record.status = RunStatus::Completed;
145 state.record.summary = Some(summary);
146 state.record.completed_at = Some(Utc::now());
147 Ok(())
148 }
149
150 async fn fail_run(&self, run_id: &RunId, summary: RunSummary) -> StorageResult<()> {
151 let mut runs = self.runs.lock().unwrap();
152 let state = runs
153 .get_mut(&run_id.0)
154 .ok_or_else(|| StorageError::RunNotFound {
155 run_id: run_id.0.clone(),
156 })?;
157 if state.record.status != RunStatus::Running {
158 return Err(StorageError::InvalidRunState {
159 run_id: run_id.0.clone(),
160 status: format!("{:?}", state.record.status),
161 expected: "Running".to_string(),
162 });
163 }
164 state.record.status = RunStatus::Failed;
165 state.record.summary = Some(summary);
166 state.record.completed_at = Some(Utc::now());
167 Ok(())
168 }
169
170 async fn cancel_run(&self, run_id: &RunId, summary: RunSummary) -> StorageResult<()> {
171 let mut runs = self.runs.lock().unwrap();
172 let state = runs
173 .get_mut(&run_id.0)
174 .ok_or_else(|| StorageError::RunNotFound {
175 run_id: run_id.0.clone(),
176 })?;
177 if state.record.status != RunStatus::Running {
178 return Err(StorageError::InvalidRunState {
179 run_id: run_id.0.clone(),
180 status: format!("{:?}", state.record.status),
181 expected: "Running".to_string(),
182 });
183 }
184 state.record.status = RunStatus::Cancelled;
185 state.record.summary = Some(summary);
186 state.record.completed_at = Some(Utc::now());
187 Ok(())
188 }
189
190 async fn get_run(&self, run_id: &RunId) -> StorageResult<RunRecord> {
191 let runs = self.runs.lock().unwrap();
192 runs.get(&run_id.0)
193 .map(|s| s.record.clone())
194 .ok_or_else(|| StorageError::RunNotFound {
195 run_id: run_id.0.clone(),
196 })
197 }
198
199 async fn get_events(&self, run_id: &RunId) -> StorageResult<Vec<RunEvent>> {
200 let runs = self.runs.lock().unwrap();
201 let state = runs
202 .get(&run_id.0)
203 .ok_or_else(|| StorageError::RunNotFound {
204 run_id: run_id.0.clone(),
205 })?;
206 let mut events = state.events.clone();
207 events.sort_by_key(|e| e.seq);
208 Ok(events)
209 }
210
211 async fn list_runs(
212 &self,
213 spec_digest: Option<&ContentDigest>,
214 ) -> StorageResult<Vec<RunRecord>> {
215 let runs = self.runs.lock().unwrap();
216 let records: Vec<RunRecord> = runs
217 .values()
218 .filter(|s| {
219 spec_digest
220 .map(|d| s.record.spec_digest == *d)
221 .unwrap_or(true)
222 })
223 .map(|s| s.record.clone())
224 .collect();
225 Ok(records)
226 }
227}
228
229#[derive(Debug, Default)]
237pub struct MemoryReleaseRegistry {
238 releases: Mutex<HashMap<String, Vec<ReleaseRecord>>>,
239}
240
241impl MemoryReleaseRegistry {
242 pub fn new() -> Self {
243 Self::default()
244 }
245}
246
247#[async_trait]
248impl ReleaseRegistry for MemoryReleaseRegistry {
249 async fn promote(
250 &self,
251 name: &str,
252 spec_digest: &ContentDigest,
253 metadata: ReleaseMetadata,
254 ) -> StorageResult<ReleaseRecord> {
255 let record = ReleaseRecord {
256 name: name.to_string(),
257 spec_digest: spec_digest.clone(),
258 metadata,
259 created_at: Utc::now(),
260 };
261 let mut releases = self.releases.lock().unwrap();
262 releases
263 .entry(name.to_string())
264 .or_default()
265 .push(record.clone());
266 Ok(record)
267 }
268
269 async fn rollback(&self, name: &str) -> StorageResult<ReleaseRecord> {
270 let mut releases = self.releases.lock().unwrap();
271 let history = releases
272 .get_mut(name)
273 .ok_or_else(|| StorageError::ReleaseNotFound {
274 name: name.to_string(),
275 })?;
276 if history.len() < 2 {
277 return Err(StorageError::NoPreviousRelease {
278 name: name.to_string(),
279 });
280 }
281 let previous = history[history.len() - 2].clone();
284 history.push(previous.clone());
285 Ok(previous)
286 }
287
288 async fn current(&self, name: &str) -> StorageResult<Option<ReleaseRecord>> {
289 let releases = self.releases.lock().unwrap();
290 Ok(releases.get(name).and_then(|h| h.last().cloned()))
291 }
292
293 async fn history(&self, name: &str) -> StorageResult<Vec<ReleaseRecord>> {
294 let releases = self.releases.lock().unwrap();
295 let mut history = releases.get(name).cloned().unwrap_or_default();
296 history.reverse(); Ok(history)
298 }
299}