Skip to main content

perfgate_server/storage/
fleet.rs

//! Fleet-wide dependency event storage.
2
3use async_trait::async_trait;
4use chrono::Utc;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7
8use crate::error::StoreError;
9use crate::models::{
10    AffectedProject, DEPENDENCY_EVENT_SCHEMA_V1, DependencyEvent, DependencyImpactQuery,
11    DependencyImpactResponse, FLEET_ALERT_SCHEMA_V1, FleetAlert, ListFleetAlertsQuery,
12    ListFleetAlertsResponse, RecordDependencyEventRequest, RecordDependencyEventResponse,
13    generate_ulid,
14};
15
16/// Trait for fleet-wide dependency event storage operations.
17#[async_trait]
18pub trait FleetStore: Send + Sync {
19    /// Records dependency change events.
20    async fn record_dependency_events(
21        &self,
22        request: &RecordDependencyEventRequest,
23    ) -> Result<RecordDependencyEventResponse, StoreError>;
24
25    /// Lists fleet-wide alerts (correlated regressions across projects).
26    async fn list_fleet_alerts(
27        &self,
28        query: &ListFleetAlertsQuery,
29    ) -> Result<ListFleetAlertsResponse, StoreError>;
30
31    /// Gets the impact of a specific dependency across projects.
32    async fn dependency_impact(
33        &self,
34        dep_name: &str,
35        query: &DependencyImpactQuery,
36    ) -> Result<DependencyImpactResponse, StoreError>;
37}
38
39/// In-memory implementation of fleet storage for testing and development.
40#[derive(Debug, Default)]
41pub struct InMemoryFleetStore {
42    events: Arc<RwLock<Vec<DependencyEvent>>>,
43}
44
45impl InMemoryFleetStore {
46    /// Creates a new empty in-memory fleet store.
47    pub fn new() -> Self {
48        Self {
49            events: Arc::new(RwLock::new(Vec::new())),
50        }
51    }
52}
53
54#[async_trait]
55impl FleetStore for InMemoryFleetStore {
56    async fn record_dependency_events(
57        &self,
58        request: &RecordDependencyEventRequest,
59    ) -> Result<RecordDependencyEventResponse, StoreError> {
60        let mut events = self.events.write().await;
61        let now = Utc::now();
62        let mut recorded = 0;
63
64        for dep in &request.dependency_changes {
65            let event = DependencyEvent {
66                schema: DEPENDENCY_EVENT_SCHEMA_V1.to_string(),
67                id: generate_ulid(),
68                project: request.project.clone(),
69                benchmark: request.benchmark.clone(),
70                dep_name: dep.name.clone(),
71                old_version: dep.old_version.clone(),
72                new_version: dep.new_version.clone(),
73                metric: request.metric.clone(),
74                delta_pct: request.delta_pct,
75                created_at: now,
76            };
77            events.push(event);
78            recorded += 1;
79        }
80
81        Ok(RecordDependencyEventResponse { recorded })
82    }
83
84    async fn list_fleet_alerts(
85        &self,
86        query: &ListFleetAlertsQuery,
87    ) -> Result<ListFleetAlertsResponse, StoreError> {
88        let events = self.events.read().await;
89        let min_affected = if query.min_affected == 0 {
90            2
91        } else {
92            query.min_affected
93        };
94
95        // Group events by (dep_name, old_version, new_version) to find correlated regressions
96        let mut dep_groups: std::collections::BTreeMap<
97            (String, Option<String>, Option<String>),
98            Vec<&DependencyEvent>,
99        > = std::collections::BTreeMap::new();
100
101        for event in events.iter() {
102            // Filter by since if specified
103            if let Some(since) = query.since
104                && event.created_at < since
105            {
106                continue;
107            }
108            // Only include regressions (positive delta)
109            if event.delta_pct <= 0.0 {
110                continue;
111            }
112
113            let key = (
114                event.dep_name.clone(),
115                event.old_version.clone(),
116                event.new_version.clone(),
117            );
118            dep_groups.entry(key).or_default().push(event);
119        }
120
121        // NOTE: Alert IDs are regenerated on each query since alerts are computed
122        // on-the-fly from events. A persistent store should cache alert records.
123        let mut alerts = Vec::new();
124        for ((dep_name, old_ver, new_ver), group_events) in &dep_groups {
125            // Count distinct projects
126            let distinct_projects: std::collections::HashSet<&str> =
127                group_events.iter().map(|e| e.project.as_str()).collect();
128
129            if distinct_projects.len() < min_affected {
130                continue;
131            }
132
133            let affected: Vec<AffectedProject> = group_events
134                .iter()
135                .map(|e| AffectedProject {
136                    project: e.project.clone(),
137                    benchmark: e.benchmark.clone(),
138                    metric: e.metric.clone(),
139                    delta_pct: e.delta_pct,
140                })
141                .collect();
142
143            let avg_delta =
144                affected.iter().map(|a| a.delta_pct).sum::<f64>() / affected.len() as f64;
145
146            // Confidence: scale by number of distinct projects (cap at 1.0)
147            let confidence = (distinct_projects.len() as f64 / 5.0).min(1.0);
148
149            let first_seen = group_events
150                .iter()
151                .map(|e| e.created_at)
152                .min()
153                .unwrap_or_else(Utc::now);
154
155            alerts.push(FleetAlert {
156                schema: FLEET_ALERT_SCHEMA_V1.to_string(),
157                id: generate_ulid(),
158                dependency: dep_name.clone(),
159                old_version: old_ver.clone(),
160                new_version: new_ver.clone(),
161                affected_projects: affected,
162                confidence,
163                avg_delta_pct: avg_delta,
164                first_seen,
165            });
166        }
167
168        // Sort by confidence descending, then by avg_delta descending
169        alerts.sort_by(|a, b| {
170            b.confidence
171                .partial_cmp(&a.confidence)
172                .unwrap_or(std::cmp::Ordering::Equal)
173                .then(
174                    b.avg_delta_pct
175                        .partial_cmp(&a.avg_delta_pct)
176                        .unwrap_or(std::cmp::Ordering::Equal),
177                )
178        });
179
180        let limit = query.limit as usize;
181        if limit > 0 {
182            alerts.truncate(limit);
183        }
184
185        Ok(ListFleetAlertsResponse { alerts })
186    }
187
188    async fn dependency_impact(
189        &self,
190        dep_name: &str,
191        query: &DependencyImpactQuery,
192    ) -> Result<DependencyImpactResponse, StoreError> {
193        let events = self.events.read().await;
194
195        let filtered: Vec<DependencyEvent> = events
196            .iter()
197            .filter(|e| {
198                if e.dep_name != dep_name {
199                    return false;
200                }
201                if let Some(since) = query.since
202                    && e.created_at < since
203                {
204                    return false;
205                }
206                true
207            })
208            .cloned()
209            .collect();
210
211        let project_count = filtered
212            .iter()
213            .map(|e| e.project.as_str())
214            .collect::<std::collections::HashSet<_>>()
215            .len();
216
217        let avg_delta = if filtered.is_empty() {
218            0.0
219        } else {
220            filtered.iter().map(|e| e.delta_pct).sum::<f64>() / filtered.len() as f64
221        };
222
223        let limit = query.limit as usize;
224        let events_out = if limit > 0 {
225            filtered.into_iter().take(limit).collect()
226        } else {
227            filtered
228        };
229
230        Ok(DependencyImpactResponse {
231            dependency: dep_name.to_string(),
232            events: events_out,
233            project_count,
234            avg_delta_pct: avg_delta,
235        })
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::DependencyChange;

    /// Builds a request for `project`/`benchmark` that reports `delta` percent
    /// on the `wall_ms` metric, with one dependency change per
    /// `(name, old_version, new_version)` tuple in `deps`.
    fn make_request(
        project: &str,
        benchmark: &str,
        deps: Vec<(&str, Option<&str>, Option<&str>)>,
        delta: f64,
    ) -> RecordDependencyEventRequest {
        let dependency_changes = deps
            .into_iter()
            .map(|(name, old, new)| DependencyChange {
                name: name.to_string(),
                old_version: old.map(str::to_string),
                new_version: new.map(str::to_string),
            })
            .collect();

        RecordDependencyEventRequest {
            project: project.to_string(),
            benchmark: benchmark.to_string(),
            dependency_changes,
            metric: "wall_ms".to_string(),
            delta_pct: delta,
        }
    }

    /// Alert query shared by the tests: at least two projects, no time
    /// window, up to 50 results.
    fn alert_query() -> ListFleetAlertsQuery {
        ListFleetAlertsQuery {
            min_affected: 2,
            since: None,
            limit: 50,
        }
    }

    /// Impact query shared by the tests: no time window, up to 50 events.
    fn impact_query() -> DependencyImpactQuery {
        DependencyImpactQuery {
            since: None,
            limit: 50,
        }
    }

    #[tokio::test]
    async fn test_record_and_list_events() {
        let store = InMemoryFleetStore::new();
        let request = make_request(
            "proj-a",
            "bench-1",
            vec![("serde", Some("1.0.0"), Some("1.1.0"))],
            5.0,
        );

        let response = store.record_dependency_events(&request).await.unwrap();
        assert_eq!(response.recorded, 1);

        let impact = store
            .dependency_impact("serde", &impact_query())
            .await
            .unwrap();
        assert_eq!(impact.events.len(), 1);
        assert_eq!(impact.project_count, 1);
    }

    #[tokio::test]
    async fn test_fleet_alerts_require_multiple_projects() {
        let store = InMemoryFleetStore::new();

        // One project on its own must not raise an alert.
        let first = make_request(
            "proj-a",
            "bench-1",
            vec![("tokio", Some("1.0.0"), Some("1.1.0"))],
            10.0,
        );
        store.record_dependency_events(&first).await.unwrap();

        let before = store.list_fleet_alerts(&alert_query()).await.unwrap();
        assert!(before.alerts.is_empty());

        // A second project hitting the same dependency change raises it.
        let second = make_request(
            "proj-b",
            "bench-2",
            vec![("tokio", Some("1.0.0"), Some("1.1.0"))],
            8.0,
        );
        store.record_dependency_events(&second).await.unwrap();

        let after = store.list_fleet_alerts(&alert_query()).await.unwrap();
        assert_eq!(after.alerts.len(), 1);

        let alert = &after.alerts[0];
        assert_eq!(alert.dependency, "tokio");
        assert_eq!(alert.affected_projects.len(), 2);
        assert!((alert.avg_delta_pct - 9.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_fleet_alerts_ignore_improvements() {
        let store = InMemoryFleetStore::new();

        // Negative deltas are improvements and must never produce alerts,
        // even when two projects report the same dependency change.
        for (project, benchmark, delta) in [("proj-a", "bench-1", -5.0), ("proj-b", "bench-2", -3.0)]
        {
            let request = make_request(
                project,
                benchmark,
                vec![("serde", Some("1.0.0"), Some("1.1.0"))],
                delta,
            );
            store.record_dependency_events(&request).await.unwrap();
        }

        let result = store.list_fleet_alerts(&alert_query()).await.unwrap();
        assert!(result.alerts.is_empty());
    }

    #[tokio::test]
    async fn test_dependency_impact_filters_by_name() {
        let store = InMemoryFleetStore::new();

        // One request touching two dependencies; only `serde` is queried.
        let request = make_request(
            "proj-a",
            "bench-1",
            vec![
                ("serde", Some("1.0.0"), Some("1.1.0")),
                ("tokio", Some("1.0.0"), Some("1.1.0")),
            ],
            5.0,
        );
        store.record_dependency_events(&request).await.unwrap();

        let impact = store
            .dependency_impact("serde", &impact_query())
            .await
            .unwrap();
        assert_eq!(impact.events.len(), 1);
        assert_eq!(impact.dependency, "serde");
    }

    #[tokio::test]
    async fn test_confidence_scaling() {
        let store = InMemoryFleetStore::new();

        // Five distinct projects should saturate confidence at 1.0.
        for index in 0..5 {
            let project = format!("proj-{}", index);
            let request = make_request(
                &project,
                "bench",
                vec![("hyper", Some("0.14.0"), Some("1.0.0"))],
                10.0,
            );
            store.record_dependency_events(&request).await.unwrap();
        }

        let result = store.list_fleet_alerts(&alert_query()).await.unwrap();
        assert_eq!(result.alerts.len(), 1);
        assert!((result.alerts[0].confidence - 1.0).abs() < 0.01);
    }
}