1use async_trait::async_trait;
4use std::collections::BTreeMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7
8use super::{BaselineStore, StorageHealth};
9use crate::error::StoreError;
10use crate::models::{
11 BaselineRecord, BaselineSummary, BaselineVersion, ListBaselinesQuery, ListBaselinesResponse,
12 ListVerdictsQuery, ListVerdictsResponse, PaginationInfo, VerdictRecord,
13};
14
/// In-memory implementation of [`BaselineStore`], suitable for tests and
/// ephemeral deployments — nothing survives a process restart.
#[derive(Debug, Default)]
pub struct InMemoryStore {
    // Baselines keyed by (project, benchmark, version). BTreeMap gives a
    // deterministic iteration order. Soft-deleted records remain in the map
    // as tombstones (with `deleted = true`) until hard-deleted.
    #[allow(clippy::type_complexity)]
    baselines: Arc<RwLock<BTreeMap<(String, String, String), BaselineRecord>>>,
    // Append-only log of verdict records.
    verdicts: Arc<RwLock<Vec<VerdictRecord>>>,
}
22
23impl InMemoryStore {
24 pub fn new() -> Self {
26 Self {
27 baselines: Arc::new(RwLock::new(BTreeMap::new())),
28 verdicts: Arc::new(RwLock::new(Vec::new())),
29 }
30 }
31
32 fn key(project: &str, benchmark: &str, version: &str) -> (String, String, String) {
33 (
34 project.to_string(),
35 benchmark.to_string(),
36 version.to_string(),
37 )
38 }
39}
40
41#[async_trait]
42impl BaselineStore for InMemoryStore {
43 async fn create(&self, record: &BaselineRecord) -> Result<(), StoreError> {
44 let key = Self::key(&record.project, &record.benchmark, &record.version);
45 let mut baselines = self.baselines.write().await;
46
47 if baselines.contains_key(&key) {
48 return Err(StoreError::AlreadyExists(format!(
49 "project={}, benchmark={}, version={}",
50 record.project, record.benchmark, record.version
51 )));
52 }
53
54 baselines.insert(key, record.clone());
55 Ok(())
56 }
57
58 async fn get(
59 &self,
60 project: &str,
61 benchmark: &str,
62 version: &str,
63 ) -> Result<Option<BaselineRecord>, StoreError> {
64 let key = Self::key(project, benchmark, version);
65 let baselines = self.baselines.read().await;
66 Ok(baselines.get(&key).filter(|r| !r.deleted).cloned())
67 }
68
69 async fn get_latest(
70 &self,
71 project: &str,
72 benchmark: &str,
73 ) -> Result<Option<BaselineRecord>, StoreError> {
74 let baselines = self.baselines.read().await;
75 let latest = baselines
76 .values()
77 .filter(|r| r.project == project && r.benchmark == benchmark && !r.deleted)
78 .max_by_key(|r| r.created_at);
79 Ok(latest.cloned())
80 }
81
82 #[allow(clippy::collapsible_if)]
83 async fn list(
84 &self,
85 project: &str,
86 query: &ListBaselinesQuery,
87 ) -> Result<ListBaselinesResponse, StoreError> {
88 let baselines = self.baselines.read().await;
89 let parsed_tags = query.parsed_tags();
90
91 let mut filtered: Vec<_> = baselines
92 .values()
93 .filter(|r| {
94 if r.project != project || r.deleted {
96 return false;
97 }
98
99 if let Some(ref b) = query.benchmark {
101 if &r.benchmark != b {
102 return false;
103 }
104 }
105
106 if let Some(ref p) = query.benchmark_prefix {
108 if !r.benchmark.starts_with(p) {
109 return false;
110 }
111 }
112
113 if let Some(ref gr) = query.git_ref {
115 if r.git_ref.as_deref() != Some(gr) {
116 return false;
117 }
118 }
119
120 if let Some(ref gs) = query.git_sha {
122 if r.git_sha.as_deref() != Some(gs) {
123 return false;
124 }
125 }
126
127 if let Some(since) = query.since {
129 if r.created_at < since {
130 return false;
131 }
132 }
133
134 if let Some(until) = query.until {
136 if r.created_at > until {
137 return false;
138 }
139 }
140
141 if !parsed_tags.is_empty() {
143 for tag in &parsed_tags {
144 if !r.tags.contains(tag) {
145 return false;
146 }
147 }
148 }
149
150 true
151 })
152 .collect();
153
154 filtered.sort_by(|a, b| b.created_at.cmp(&a.created_at));
155
156 let total = filtered.len() as u64;
157 let offset = query.offset as usize;
158 let limit = query.limit as usize;
159
160 let paginated: Vec<_> = filtered
161 .into_iter()
162 .skip(offset)
163 .take(limit)
164 .map(|r| {
165 let mut summary: BaselineSummary = r.clone().into();
166 if query.include_receipt {
167 summary.receipt = Some(r.receipt.clone());
168 }
169 summary
170 })
171 .collect();
172
173 let has_more = (offset + paginated.len()) < total as usize;
174
175 Ok(ListBaselinesResponse {
176 baselines: paginated,
177 pagination: PaginationInfo {
178 total,
179 limit: query.limit,
180 offset: query.offset,
181 has_more,
182 },
183 })
184 }
185
186 async fn update(&self, record: &BaselineRecord) -> Result<(), StoreError> {
187 let key = Self::key(&record.project, &record.benchmark, &record.version);
188 let mut baselines = self.baselines.write().await;
189
190 if !baselines.contains_key(&key) {
191 return Err(StoreError::NotFound(format!(
192 "project={}, benchmark={}, version={}",
193 record.project, record.benchmark, record.version
194 )));
195 }
196
197 baselines.insert(key, record.clone());
198 Ok(())
199 }
200
201 async fn delete(
202 &self,
203 project: &str,
204 benchmark: &str,
205 version: &str,
206 ) -> Result<bool, StoreError> {
207 let key = Self::key(project, benchmark, version);
208 let mut baselines = self.baselines.write().await;
209
210 if let Some(record) = baselines.get_mut(&key) {
211 if record.deleted {
212 return Ok(false);
213 }
214 record.deleted = true;
215 return Ok(true);
216 }
217
218 Ok(false)
219 }
220
221 async fn hard_delete(
222 &self,
223 project: &str,
224 benchmark: &str,
225 version: &str,
226 ) -> Result<bool, StoreError> {
227 let key = Self::key(project, benchmark, version);
228 let mut baselines = self.baselines.write().await;
229 Ok(baselines.remove(&key).is_some())
230 }
231
232 async fn list_versions(
233 &self,
234 project: &str,
235 benchmark: &str,
236 ) -> Result<Vec<BaselineVersion>, StoreError> {
237 let baselines = self.baselines.read().await;
238
239 let mut versions: Vec<_> = baselines
240 .values()
241 .filter(|r| r.project == project && r.benchmark == benchmark && !r.deleted)
242 .map(|r| BaselineVersion {
243 version: r.version.clone(),
244 git_ref: r.git_ref.clone(),
245 git_sha: r.git_sha.clone(),
246 created_at: r.created_at,
247 created_by: None,
248 is_current: false,
249 source: r.source.clone(),
250 })
251 .collect();
252
253 versions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
254
255 if let Some(first) = versions.first_mut() {
256 first.is_current = true;
257 }
258
259 Ok(versions)
260 }
261
262 async fn health_check(&self) -> Result<StorageHealth, StoreError> {
263 Ok(StorageHealth::Healthy)
264 }
265
266 fn backend_type(&self) -> &'static str {
267 "memory"
268 }
269
270 async fn create_verdict(&self, record: &VerdictRecord) -> Result<(), StoreError> {
271 let mut verdicts = self.verdicts.write().await;
272 verdicts.push(record.clone());
273 Ok(())
274 }
275
276 async fn list_verdicts(
277 &self,
278 project: &str,
279 query: &ListVerdictsQuery,
280 ) -> Result<ListVerdictsResponse, StoreError> {
281 let verdicts = self.verdicts.read().await;
282
283 let mut filtered: Vec<_> = verdicts
284 .iter()
285 .filter(|r| {
286 if r.project != project {
287 return false;
288 }
289
290 if let Some(ref b) = query.benchmark
291 && &r.benchmark != b
292 {
293 return false;
294 }
295
296 if let Some(ref s) = query.status
297 && &r.status != s
298 {
299 return false;
300 }
301
302 if let Some(since) = query.since
303 && r.created_at < since
304 {
305 return false;
306 }
307
308 if let Some(until) = query.until
309 && r.created_at > until
310 {
311 return false;
312 }
313
314 true
315 })
316 .cloned()
317 .collect();
318
319 filtered.sort_by(|a, b| b.created_at.cmp(&a.created_at));
320
321 let total = filtered.len() as u64;
322 let offset = query.offset as usize;
323 let limit = query.limit as usize;
324
325 let paginated: Vec<_> = filtered.into_iter().skip(offset).take(limit).collect();
326
327 let has_more = (offset + paginated.len()) < total as usize;
328
329 Ok(ListVerdictsResponse {
330 verdicts: paginated,
331 pagination: PaginationInfo {
332 total,
333 limit: query.limit,
334 offset: query.offset,
335 has_more,
336 },
337 })
338 }
339}