// mlua_swarm/store/issue/inmemory.rs
use super::{Inner, IssueId, IssuePayload, IssueStatus, IssueStore, IssueStoreError, SharedInner};
use async_trait::async_trait;
use std::sync::Mutex;
7
8#[derive(Default)]
12pub struct InMemoryIssueStore {
13 inner: SharedInner,
14}
15
16impl InMemoryIssueStore {
17 pub fn new() -> Self {
19 Self {
20 inner: Mutex::new(Inner::default()),
21 }
22 }
23}
24
25#[async_trait]
26impl IssueStore for InMemoryIssueStore {
27 fn name(&self) -> &str {
28 "in-memory"
29 }
30
31 async fn create(&self, payload: IssuePayload) -> Result<(), IssueStoreError> {
32 let mut inner = self.inner.lock().unwrap();
33 let id = payload.issue_id.clone();
34 if inner.payloads.contains_key(&id) {
35 return Err(IssueStoreError::Duplicate(id));
36 }
37 inner.order.push(id.clone());
38 inner.statuses.insert(id.clone(), IssueStatus::Pending);
39 inner.pending.push_back(id.clone());
40 inner.payloads.insert(id, payload);
41 Ok(())
42 }
43
44 async fn get(&self, id: &IssueId) -> Result<IssuePayload, IssueStoreError> {
45 let inner = self.inner.lock().unwrap();
46 inner
47 .payloads
48 .get(id)
49 .cloned()
50 .ok_or_else(|| IssueStoreError::NotFound(id.clone()))
51 }
52
53 async fn status(&self, id: &IssueId) -> Result<IssueStatus, IssueStoreError> {
54 let inner = self.inner.lock().unwrap();
55 inner
56 .statuses
57 .get(id)
58 .cloned()
59 .ok_or_else(|| IssueStoreError::NotFound(id.clone()))
60 }
61
62 async fn list(&self) -> Result<Vec<(IssueId, IssueStatus)>, IssueStoreError> {
63 let inner = self.inner.lock().unwrap();
64 Ok(inner
65 .order
66 .iter()
67 .map(|id| {
68 let st = inner
69 .statuses
70 .get(id)
71 .cloned()
72 .unwrap_or(IssueStatus::Pending);
73 (id.clone(), st)
74 })
75 .collect())
76 }
77
78 async fn pop_pending(&self) -> Result<Option<IssuePayload>, IssueStoreError> {
79 let mut inner = self.inner.lock().unwrap();
80 let Some(id) = inner.pending.pop_front() else {
81 return Ok(None);
82 };
83 let payload = inner
84 .payloads
85 .get(&id)
86 .cloned()
87 .ok_or_else(|| IssueStoreError::NotFound(id.clone()))?;
88 inner.statuses.insert(id, IssueStatus::InFlight);
89 Ok(Some(payload))
90 }
91
92 async fn update_status(
93 &self,
94 id: &IssueId,
95 status: IssueStatus,
96 ) -> Result<(), IssueStoreError> {
97 let mut inner = self.inner.lock().unwrap();
98 if !inner.payloads.contains_key(id) {
99 return Err(IssueStoreError::NotFound(id.clone()));
100 }
101 inner.statuses.insert(id.clone(), status);
102 Ok(())
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blueprint::store::BlueprintId;

    /// Builds a payload for `id` that targets the "main" blueprint.
    fn payload_for(id: &str) -> IssuePayload {
        let issue_id = IssueId::new(id);
        let blueprint_id = BlueprintId::new("main");
        let intent = format!("intent for {id}");
        IssuePayload {
            issue_id,
            blueprint_id,
            intent,
        }
    }

    /// A created issue can be fetched back and starts out `Pending`.
    #[tokio::test]
    async fn create_then_get() {
        let store = InMemoryIssueStore::new();
        let id = IssueId::new("i1");
        store.create(payload_for("i1")).await.unwrap();

        let fetched = store.get(&id).await.unwrap();
        assert_eq!(fetched.issue_id, id);

        let status = store.status(&id).await.unwrap();
        assert_eq!(status, IssueStatus::Pending);
    }

    /// Creating the same id twice yields `Duplicate`.
    #[tokio::test]
    async fn duplicate_create_rejected() {
        let store = InMemoryIssueStore::new();
        store.create(payload_for("i1")).await.unwrap();

        let second = store.create(payload_for("i1")).await;
        assert!(matches!(second, Err(IssueStoreError::Duplicate(_))));
    }

    /// Issues are popped in creation order and become `InFlight` when popped.
    #[tokio::test]
    async fn pop_pending_fifo_and_transitions_inflight() {
        let store = InMemoryIssueStore::new();
        let first_id = IssueId::new("a");
        let second_id = IssueId::new("b");
        store.create(payload_for("a")).await.unwrap();
        store.create(payload_for("b")).await.unwrap();

        let first = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(first.issue_id, first_id);
        let first_status = store.status(&first_id).await.unwrap();
        assert_eq!(first_status, IssueStatus::InFlight);

        let second = store.pop_pending().await.unwrap().unwrap();
        assert_eq!(second.issue_id, second_id);

        // The queue is drained after two pops.
        let third = store.pop_pending().await.unwrap();
        assert!(third.is_none());
    }

    /// A status written through `update_status` is what `status` reports.
    #[tokio::test]
    async fn update_status_to_applied() {
        let store = InMemoryIssueStore::new();
        let id = IssueId::new("x");
        store.create(payload_for("x")).await.unwrap();
        store.pop_pending().await.unwrap();

        let applied = IssueStatus::Applied {
            new_version: "abc123".into(),
        };
        store.update_status(&id, applied).await.unwrap();

        let current = store.status(&id).await.unwrap();
        match current {
            IssueStatus::Applied { new_version } => assert_eq!(new_version, "abc123"),
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// `list` reports ids in the order they were created.
    #[tokio::test]
    async fn list_returns_insertion_order() {
        let store = InMemoryIssueStore::new();
        for name in ["a", "b", "c"] {
            store.create(payload_for(name)).await.unwrap();
        }

        let listed = store.list().await.unwrap();
        let mut ids = Vec::new();
        for (id, _status) in &listed {
            ids.push(id.0.clone());
        }
        assert_eq!(ids, vec!["a", "b", "c"]);
    }

    /// Updating an id that was never created yields `NotFound`.
    #[tokio::test]
    async fn update_status_unknown_fails() {
        let store = InMemoryIssueStore::new();
        let missing = IssueId::new("nope");

        let outcome = store.update_status(&missing, IssueStatus::Pending).await;
        assert!(matches!(outcome, Err(IssueStoreError::NotFound(_))));
    }
}