Skip to main content

oxidized_state/
fakes.rs

1//! In-memory fakes for storage traits (testing only)
2//!
3//! Provides `MemoryCasStore`, `MemoryRunLedger`, and `MemoryReleaseRegistry`
4//! that satisfy the trait contracts without any external dependencies.
5
6use std::collections::HashMap;
7use std::sync::Mutex;
8
9use async_trait::async_trait;
10use chrono::Utc;
11
12use crate::error::StorageError;
13use crate::storage_traits::*;
14
15// ---------------------------------------------------------------------------
16// MemoryCasStore
17// ---------------------------------------------------------------------------
18
/// Content-addressed blob store that lives entirely in process memory.
///
/// Blobs are kept in a `HashMap` keyed by the digest's string form. The map
/// sits behind a `Mutex` so it can be mutated through `&self`.
#[derive(Debug, Default)]
pub struct MemoryCasStore {
    /// Digest string -> stored bytes.
    store: Mutex<HashMap<String, Vec<u8>>>,
}

impl MemoryCasStore {
    /// Creates a store that holds no blobs.
    pub fn new() -> Self {
        Self {
            store: Mutex::new(HashMap::new()),
        }
    }
}
30
31#[async_trait]
32impl CasStore for MemoryCasStore {
33    async fn put(&self, data: &[u8]) -> StorageResult<ContentDigest> {
34        let digest = ContentDigest::from_bytes(data);
35        let mut store = self.store.lock().unwrap();
36        store.insert(digest.as_str().to_string(), data.to_vec());
37        Ok(digest)
38    }
39
40    async fn get(&self, digest: &ContentDigest) -> StorageResult<Vec<u8>> {
41        let store = self.store.lock().unwrap();
42        store
43            .get(digest.as_str())
44            .cloned()
45            .ok_or_else(|| StorageError::NotFound {
46                digest: digest.as_str().to_string(),
47            })
48    }
49
50    async fn contains(&self, digest: &ContentDigest) -> StorageResult<bool> {
51        let store = self.store.lock().unwrap();
52        Ok(store.contains_key(digest.as_str()))
53    }
54
55    async fn delete(&self, digest: &ContentDigest) -> StorageResult<()> {
56        let mut store = self.store.lock().unwrap();
57        store.remove(digest.as_str());
58        Ok(())
59    }
60}
61
62// ---------------------------------------------------------------------------
63// MemoryRunLedger
64// ---------------------------------------------------------------------------
65
66#[derive(Debug)]
67struct RunState {
68    record: RunRecord,
69    events: Vec<RunEvent>,
70}
71
72/// In-memory run ledger backed by a `HashMap<RunId, RunState>`.
73#[derive(Debug, Default)]
74pub struct MemoryRunLedger {
75    runs: Mutex<HashMap<String, RunState>>,
76}
77
78impl MemoryRunLedger {
79    pub fn new() -> Self {
80        Self::default()
81    }
82}
83
84#[async_trait]
85impl RunLedger for MemoryRunLedger {
86    async fn create_run(
87        &self,
88        spec_digest: &ContentDigest,
89        metadata: RunMetadata,
90    ) -> StorageResult<RunId> {
91        let run_id = RunId::new();
92        let record = RunRecord {
93            run_id: run_id.clone(),
94            spec_digest: spec_digest.clone(),
95            metadata,
96            status: RunStatus::Running,
97            summary: None,
98            created_at: Utc::now(),
99            completed_at: None,
100        };
101        let mut runs = self.runs.lock().unwrap();
102        runs.insert(
103            run_id.0.clone(),
104            RunState {
105                record,
106                events: Vec::new(),
107            },
108        );
109        Ok(run_id)
110    }
111
112    async fn append_event(&self, run_id: &RunId, event: RunEvent) -> StorageResult<()> {
113        let mut runs = self.runs.lock().unwrap();
114        let state = runs
115            .get_mut(&run_id.0)
116            .ok_or_else(|| StorageError::RunNotFound {
117                run_id: run_id.0.clone(),
118            })?;
119        if state.record.status != RunStatus::Running {
120            return Err(StorageError::InvalidRunState {
121                run_id: run_id.0.clone(),
122                status: format!("{:?}", state.record.status),
123                expected: "Running".to_string(),
124            });
125        }
126        state.events.push(event);
127        Ok(())
128    }
129
130    async fn complete_run(&self, run_id: &RunId, summary: RunSummary) -> StorageResult<()> {
131        let mut runs = self.runs.lock().unwrap();
132        let state = runs
133            .get_mut(&run_id.0)
134            .ok_or_else(|| StorageError::RunNotFound {
135                run_id: run_id.0.clone(),
136            })?;
137        if state.record.status != RunStatus::Running {
138            return Err(StorageError::InvalidRunState {
139                run_id: run_id.0.clone(),
140                status: format!("{:?}", state.record.status),
141                expected: "Running".to_string(),
142            });
143        }
144        state.record.status = RunStatus::Completed;
145        state.record.summary = Some(summary);
146        state.record.completed_at = Some(Utc::now());
147        Ok(())
148    }
149
150    async fn fail_run(&self, run_id: &RunId, summary: RunSummary) -> StorageResult<()> {
151        let mut runs = self.runs.lock().unwrap();
152        let state = runs
153            .get_mut(&run_id.0)
154            .ok_or_else(|| StorageError::RunNotFound {
155                run_id: run_id.0.clone(),
156            })?;
157        if state.record.status != RunStatus::Running {
158            return Err(StorageError::InvalidRunState {
159                run_id: run_id.0.clone(),
160                status: format!("{:?}", state.record.status),
161                expected: "Running".to_string(),
162            });
163        }
164        state.record.status = RunStatus::Failed;
165        state.record.summary = Some(summary);
166        state.record.completed_at = Some(Utc::now());
167        Ok(())
168    }
169
170    async fn cancel_run(&self, run_id: &RunId, summary: RunSummary) -> StorageResult<()> {
171        let mut runs = self.runs.lock().unwrap();
172        let state = runs
173            .get_mut(&run_id.0)
174            .ok_or_else(|| StorageError::RunNotFound {
175                run_id: run_id.0.clone(),
176            })?;
177        if state.record.status != RunStatus::Running {
178            return Err(StorageError::InvalidRunState {
179                run_id: run_id.0.clone(),
180                status: format!("{:?}", state.record.status),
181                expected: "Running".to_string(),
182            });
183        }
184        state.record.status = RunStatus::Cancelled;
185        state.record.summary = Some(summary);
186        state.record.completed_at = Some(Utc::now());
187        Ok(())
188    }
189
190    async fn get_run(&self, run_id: &RunId) -> StorageResult<RunRecord> {
191        let runs = self.runs.lock().unwrap();
192        runs.get(&run_id.0)
193            .map(|s| s.record.clone())
194            .ok_or_else(|| StorageError::RunNotFound {
195                run_id: run_id.0.clone(),
196            })
197    }
198
199    async fn get_events(&self, run_id: &RunId) -> StorageResult<Vec<RunEvent>> {
200        let runs = self.runs.lock().unwrap();
201        let state = runs
202            .get(&run_id.0)
203            .ok_or_else(|| StorageError::RunNotFound {
204                run_id: run_id.0.clone(),
205            })?;
206        let mut events = state.events.clone();
207        events.sort_by_key(|e| e.seq);
208        Ok(events)
209    }
210
211    async fn list_runs(
212        &self,
213        spec_digest: Option<&ContentDigest>,
214    ) -> StorageResult<Vec<RunRecord>> {
215        let runs = self.runs.lock().unwrap();
216        let records: Vec<RunRecord> = runs
217            .values()
218            .filter(|s| {
219                spec_digest
220                    .map(|d| s.record.spec_digest == *d)
221                    .unwrap_or(true)
222            })
223            .map(|s| s.record.clone())
224            .collect();
225        Ok(records)
226    }
227}
228
229// ---------------------------------------------------------------------------
230// MemoryReleaseRegistry
231// ---------------------------------------------------------------------------
232
233/// In-memory release registry backed by a `HashMap<name, Vec<ReleaseRecord>>`.
234///
235/// Each agent name maps to its full release history (newest last internally).
236#[derive(Debug, Default)]
237pub struct MemoryReleaseRegistry {
238    releases: Mutex<HashMap<String, Vec<ReleaseRecord>>>,
239}
240
241impl MemoryReleaseRegistry {
242    pub fn new() -> Self {
243        Self::default()
244    }
245}
246
247#[async_trait]
248impl ReleaseRegistry for MemoryReleaseRegistry {
249    async fn promote(
250        &self,
251        name: &str,
252        spec_digest: &ContentDigest,
253        metadata: ReleaseMetadata,
254    ) -> StorageResult<ReleaseRecord> {
255        let record = ReleaseRecord {
256            name: name.to_string(),
257            spec_digest: spec_digest.clone(),
258            metadata,
259            created_at: Utc::now(),
260        };
261        let mut releases = self.releases.lock().unwrap();
262        releases
263            .entry(name.to_string())
264            .or_default()
265            .push(record.clone());
266        Ok(record)
267    }
268
269    async fn rollback(&self, name: &str) -> StorageResult<ReleaseRecord> {
270        let mut releases = self.releases.lock().unwrap();
271        let history = releases
272            .get_mut(name)
273            .ok_or_else(|| StorageError::ReleaseNotFound {
274                name: name.to_string(),
275            })?;
276        if history.len() < 2 {
277            return Err(StorageError::NoPreviousRelease {
278                name: name.to_string(),
279            });
280        }
281        // Append-only: clone the previous release as a new entry
282        // instead of destroying the current one.
283        let previous = history[history.len() - 2].clone();
284        history.push(previous.clone());
285        Ok(previous)
286    }
287
288    async fn current(&self, name: &str) -> StorageResult<Option<ReleaseRecord>> {
289        let releases = self.releases.lock().unwrap();
290        Ok(releases.get(name).and_then(|h| h.last().cloned()))
291    }
292
293    async fn history(&self, name: &str) -> StorageResult<Vec<ReleaseRecord>> {
294        let releases = self.releases.lock().unwrap();
295        let mut history = releases.get(name).cloned().unwrap_or_default();
296        history.reverse(); // newest first
297        Ok(history)
298    }
299}