1use async_trait::async_trait;
4use chrono::Utc;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7
8use crate::error::StoreError;
9use crate::models::{
10 AffectedProject, DEPENDENCY_EVENT_SCHEMA_V1, DependencyEvent, DependencyImpactQuery,
11 DependencyImpactResponse, FLEET_ALERT_SCHEMA_V1, FleetAlert, ListFleetAlertsQuery,
12 ListFleetAlertsResponse, RecordDependencyEventRequest, RecordDependencyEventResponse,
13 generate_ulid,
14};
15
16#[async_trait]
18pub trait FleetStore: Send + Sync {
19 async fn record_dependency_events(
21 &self,
22 request: &RecordDependencyEventRequest,
23 ) -> Result<RecordDependencyEventResponse, StoreError>;
24
25 async fn list_fleet_alerts(
27 &self,
28 query: &ListFleetAlertsQuery,
29 ) -> Result<ListFleetAlertsResponse, StoreError>;
30
31 async fn dependency_impact(
33 &self,
34 dep_name: &str,
35 query: &DependencyImpactQuery,
36 ) -> Result<DependencyImpactResponse, StoreError>;
37}
38
39#[derive(Debug, Default)]
41pub struct InMemoryFleetStore {
42 events: Arc<RwLock<Vec<DependencyEvent>>>,
43}
44
45impl InMemoryFleetStore {
46 pub fn new() -> Self {
48 Self {
49 events: Arc::new(RwLock::new(Vec::new())),
50 }
51 }
52}
53
54#[async_trait]
55impl FleetStore for InMemoryFleetStore {
56 async fn record_dependency_events(
57 &self,
58 request: &RecordDependencyEventRequest,
59 ) -> Result<RecordDependencyEventResponse, StoreError> {
60 let mut events = self.events.write().await;
61 let now = Utc::now();
62 let mut recorded = 0;
63
64 for dep in &request.dependency_changes {
65 let event = DependencyEvent {
66 schema: DEPENDENCY_EVENT_SCHEMA_V1.to_string(),
67 id: generate_ulid(),
68 project: request.project.clone(),
69 benchmark: request.benchmark.clone(),
70 dep_name: dep.name.clone(),
71 old_version: dep.old_version.clone(),
72 new_version: dep.new_version.clone(),
73 metric: request.metric.clone(),
74 delta_pct: request.delta_pct,
75 created_at: now,
76 };
77 events.push(event);
78 recorded += 1;
79 }
80
81 Ok(RecordDependencyEventResponse { recorded })
82 }
83
84 async fn list_fleet_alerts(
85 &self,
86 query: &ListFleetAlertsQuery,
87 ) -> Result<ListFleetAlertsResponse, StoreError> {
88 let events = self.events.read().await;
89 let min_affected = if query.min_affected == 0 {
90 2
91 } else {
92 query.min_affected
93 };
94
95 let mut dep_groups: std::collections::BTreeMap<
97 (String, Option<String>, Option<String>),
98 Vec<&DependencyEvent>,
99 > = std::collections::BTreeMap::new();
100
101 for event in events.iter() {
102 if let Some(since) = query.since
104 && event.created_at < since
105 {
106 continue;
107 }
108 if event.delta_pct <= 0.0 {
110 continue;
111 }
112
113 let key = (
114 event.dep_name.clone(),
115 event.old_version.clone(),
116 event.new_version.clone(),
117 );
118 dep_groups.entry(key).or_default().push(event);
119 }
120
121 let mut alerts = Vec::new();
124 for ((dep_name, old_ver, new_ver), group_events) in &dep_groups {
125 let distinct_projects: std::collections::HashSet<&str> =
127 group_events.iter().map(|e| e.project.as_str()).collect();
128
129 if distinct_projects.len() < min_affected {
130 continue;
131 }
132
133 let affected: Vec<AffectedProject> = group_events
134 .iter()
135 .map(|e| AffectedProject {
136 project: e.project.clone(),
137 benchmark: e.benchmark.clone(),
138 metric: e.metric.clone(),
139 delta_pct: e.delta_pct,
140 })
141 .collect();
142
143 let avg_delta =
144 affected.iter().map(|a| a.delta_pct).sum::<f64>() / affected.len() as f64;
145
146 let confidence = (distinct_projects.len() as f64 / 5.0).min(1.0);
148
149 let first_seen = group_events
150 .iter()
151 .map(|e| e.created_at)
152 .min()
153 .unwrap_or_else(Utc::now);
154
155 alerts.push(FleetAlert {
156 schema: FLEET_ALERT_SCHEMA_V1.to_string(),
157 id: generate_ulid(),
158 dependency: dep_name.clone(),
159 old_version: old_ver.clone(),
160 new_version: new_ver.clone(),
161 affected_projects: affected,
162 confidence,
163 avg_delta_pct: avg_delta,
164 first_seen,
165 });
166 }
167
168 alerts.sort_by(|a, b| {
170 b.confidence
171 .partial_cmp(&a.confidence)
172 .unwrap_or(std::cmp::Ordering::Equal)
173 .then(
174 b.avg_delta_pct
175 .partial_cmp(&a.avg_delta_pct)
176 .unwrap_or(std::cmp::Ordering::Equal),
177 )
178 });
179
180 let limit = query.limit as usize;
181 if limit > 0 {
182 alerts.truncate(limit);
183 }
184
185 Ok(ListFleetAlertsResponse { alerts })
186 }
187
188 async fn dependency_impact(
189 &self,
190 dep_name: &str,
191 query: &DependencyImpactQuery,
192 ) -> Result<DependencyImpactResponse, StoreError> {
193 let events = self.events.read().await;
194
195 let filtered: Vec<DependencyEvent> = events
196 .iter()
197 .filter(|e| {
198 if e.dep_name != dep_name {
199 return false;
200 }
201 if let Some(since) = query.since
202 && e.created_at < since
203 {
204 return false;
205 }
206 true
207 })
208 .cloned()
209 .collect();
210
211 let project_count = filtered
212 .iter()
213 .map(|e| e.project.as_str())
214 .collect::<std::collections::HashSet<_>>()
215 .len();
216
217 let avg_delta = if filtered.is_empty() {
218 0.0
219 } else {
220 filtered.iter().map(|e| e.delta_pct).sum::<f64>() / filtered.len() as f64
221 };
222
223 let limit = query.limit as usize;
224 let events_out = if limit > 0 {
225 filtered.into_iter().take(limit).collect()
226 } else {
227 filtered
228 };
229
230 Ok(DependencyImpactResponse {
231 dependency: dep_name.to_string(),
232 events: events_out,
233 project_count,
234 avg_delta_pct: avg_delta,
235 })
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::DependencyChange;

    /// Builds a request reporting `delta` percent on the `wall_ms` metric for
    /// `project`/`benchmark`, with one dependency change per `(name, old, new)` tuple.
    fn make_request(
        project: &str,
        benchmark: &str,
        deps: Vec<(&str, Option<&str>, Option<&str>)>,
        delta: f64,
    ) -> RecordDependencyEventRequest {
        let dependency_changes = deps
            .into_iter()
            .map(|(name, old, new)| DependencyChange {
                name: String::from(name),
                old_version: old.map(String::from),
                new_version: new.map(String::from),
            })
            .collect();

        RecordDependencyEventRequest {
            project: String::from(project),
            benchmark: String::from(benchmark),
            dependency_changes,
            metric: String::from("wall_ms"),
            delta_pct: delta,
        }
    }

    /// Alert query shared by the tests: at least two projects, no time filter, limit 50.
    fn alert_query() -> ListFleetAlertsQuery {
        ListFleetAlertsQuery {
            min_affected: 2,
            since: None,
            limit: 50,
        }
    }

    /// Impact query shared by the tests: no time filter, limit 50.
    fn impact_query() -> DependencyImpactQuery {
        DependencyImpactQuery {
            since: None,
            limit: 50,
        }
    }

    #[tokio::test]
    async fn test_record_and_list_events() {
        let store = InMemoryFleetStore::new();

        let request = make_request(
            "proj-a",
            "bench-1",
            vec![("serde", Some("1.0.0"), Some("1.1.0"))],
            5.0,
        );
        let response = store.record_dependency_events(&request).await.unwrap();
        assert_eq!(response.recorded, 1);

        let impact = store
            .dependency_impact("serde", &impact_query())
            .await
            .unwrap();
        assert_eq!(impact.events.len(), 1);
        assert_eq!(impact.project_count, 1);
    }

    #[tokio::test]
    async fn test_fleet_alerts_require_multiple_projects() {
        let store = InMemoryFleetStore::new();
        let tokio_bump = || vec![("tokio", Some("1.0.0"), Some("1.1.0"))];

        // A single affected project is below the threshold.
        let first = make_request("proj-a", "bench-1", tokio_bump(), 10.0);
        store.record_dependency_events(&first).await.unwrap();

        let before = store.list_fleet_alerts(&alert_query()).await.unwrap();
        assert!(before.alerts.is_empty());

        // A second project with the same bump crosses it.
        let second = make_request("proj-b", "bench-2", tokio_bump(), 8.0);
        store.record_dependency_events(&second).await.unwrap();

        let after = store.list_fleet_alerts(&alert_query()).await.unwrap();
        assert_eq!(after.alerts.len(), 1);

        let alert = &after.alerts[0];
        assert_eq!(alert.dependency, "tokio");
        assert_eq!(alert.affected_projects.len(), 2);
        assert!((alert.avg_delta_pct - 9.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_fleet_alerts_ignore_improvements() {
        let store = InMemoryFleetStore::new();

        // Negative deltas are improvements and must never raise an alert.
        for (project, benchmark, delta) in [("proj-a", "bench-1", -5.0), ("proj-b", "bench-2", -3.0)]
        {
            let request = make_request(
                project,
                benchmark,
                vec![("serde", Some("1.0.0"), Some("1.1.0"))],
                delta,
            );
            store.record_dependency_events(&request).await.unwrap();
        }

        let listed = store.list_fleet_alerts(&alert_query()).await.unwrap();
        assert!(listed.alerts.is_empty());
    }

    #[tokio::test]
    async fn test_dependency_impact_filters_by_name() {
        let store = InMemoryFleetStore::new();

        let request = make_request(
            "proj-a",
            "bench-1",
            vec![
                ("serde", Some("1.0.0"), Some("1.1.0")),
                ("tokio", Some("1.0.0"), Some("1.1.0")),
            ],
            5.0,
        );
        store.record_dependency_events(&request).await.unwrap();

        let impact = store
            .dependency_impact("serde", &impact_query())
            .await
            .unwrap();
        assert_eq!(impact.events.len(), 1);
        assert_eq!(impact.dependency, "serde");
    }

    #[tokio::test]
    async fn test_confidence_scaling() {
        let store = InMemoryFleetStore::new();

        // Five distinct projects should saturate confidence at 1.0.
        for index in 0..5 {
            let project = format!("proj-{}", index);
            let request = make_request(
                &project,
                "bench",
                vec![("hyper", Some("0.14.0"), Some("1.0.0"))],
                10.0,
            );
            store.record_dependency_events(&request).await.unwrap();
        }

        let listed = store.list_fleet_alerts(&alert_query()).await.unwrap();
        assert_eq!(listed.alerts.len(), 1);
        assert!((listed.alerts[0].confidence - 1.0).abs() < 0.01);
    }
}