Skip to main content

stakpak_server/
event_log.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use stakpak_agent_core::AgentEvent;
4use std::{
5    collections::{HashMap, VecDeque},
6    sync::{
7        Arc, Mutex,
8        atomic::{AtomicU64, Ordering},
9    },
10};
11use tokio::sync::{RwLock, broadcast};
12use uuid::Uuid;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct EventEnvelope {
16    pub id: u64,
17    pub session_id: Uuid,
18    pub run_id: Option<Uuid>,
19    pub timestamp: DateTime<Utc>,
20    pub event: AgentEvent,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24pub struct GapDetected {
25    pub requested_after_id: u64,
26    pub oldest_available_id: u64,
27    pub newest_available_id: u64,
28    pub resume_hint: String,
29}
30
31pub struct EventSubscription {
32    pub replay: Vec<EventEnvelope>,
33    pub live: broadcast::Receiver<EventEnvelope>,
34    pub gap_detected: Option<GapDetected>,
35}
36
37struct SessionEventBuffer {
38    next_id: AtomicU64,
39    ring: Mutex<VecDeque<EventEnvelope>>,
40    tx: broadcast::Sender<EventEnvelope>,
41}
42
43impl SessionEventBuffer {
44    fn new(capacity: usize) -> Self {
45        let (tx, _rx) = broadcast::channel(capacity.max(1) * 2);
46        Self {
47            next_id: AtomicU64::new(1),
48            ring: Mutex::new(VecDeque::with_capacity(capacity.max(1))),
49            tx,
50        }
51    }
52
53    fn lock_ring(&self) -> std::sync::MutexGuard<'_, VecDeque<EventEnvelope>> {
54        match self.ring.lock() {
55            Ok(guard) => guard,
56            Err(poisoned) => poisoned.into_inner(),
57        }
58    }
59}
60
61#[derive(Clone)]
62pub struct EventLog {
63    capacity: usize,
64    sessions: Arc<RwLock<HashMap<Uuid, Arc<SessionEventBuffer>>>>,
65}
66
67impl EventLog {
68    pub fn new(capacity: usize) -> Self {
69        Self {
70            capacity: capacity.max(1),
71            sessions: Arc::new(RwLock::new(HashMap::new())),
72        }
73    }
74
75    pub async fn publish(
76        &self,
77        session_id: Uuid,
78        run_id: Option<Uuid>,
79        event: AgentEvent,
80    ) -> EventEnvelope {
81        let buffer = self.session_buffer(session_id).await;
82        let event_id = buffer.next_id.fetch_add(1, Ordering::SeqCst);
83
84        let envelope = EventEnvelope {
85            id: event_id,
86            session_id,
87            run_id,
88            timestamp: Utc::now(),
89            event,
90        };
91
92        {
93            let mut ring = buffer.lock_ring();
94            ring.push_back(envelope.clone());
95
96            while ring.len() > self.capacity {
97                let _ = ring.pop_front();
98            }
99
100            let _ = buffer.tx.send(envelope.clone());
101        }
102
103        envelope
104    }
105
106    pub async fn subscribe(&self, session_id: Uuid, after_id: Option<u64>) -> EventSubscription {
107        let buffer = self.session_buffer(session_id).await;
108
109        let (replay, gap_detected, live) = {
110            let ring = buffer.lock_ring();
111            let live = buffer.tx.subscribe();
112
113            match after_id {
114                None => (Vec::new(), None, live),
115                Some(requested_after_id) => {
116                    let oldest = ring.front().map(|event| event.id);
117                    let newest = ring.back().map(|event| event.id);
118
119                    let (gap, replay) = match (oldest, newest) {
120                        (Some(oldest_available_id), Some(newest_available_id))
121                            if requested_after_id.saturating_add(1) < oldest_available_id =>
122                        {
123                            (
124                                Some(GapDetected {
125                                    requested_after_id,
126                                    oldest_available_id,
127                                    newest_available_id,
128                                    resume_hint: "refresh_snapshot_then_resume".to_string(),
129                                }),
130                                Vec::new(),
131                            )
132                        }
133                        _ => {
134                            let replay = ring
135                                .iter()
136                                .filter(|event| event.id > requested_after_id)
137                                .cloned()
138                                .collect();
139                            (None, replay)
140                        }
141                    };
142
143                    (replay, gap, live)
144                }
145            }
146        };
147
148        EventSubscription {
149            replay,
150            live,
151            gap_detected,
152        }
153    }
154
155    pub async fn snapshot_bounds(&self, session_id: Uuid) -> Option<(u64, u64)> {
156        let buffer = self.session_buffer(session_id).await;
157        let ring = buffer.lock_ring();
158
159        let oldest = ring.front().map(|event| event.id)?;
160        let newest = ring.back().map(|event| event.id)?;
161
162        Some((oldest, newest))
163    }
164
165    async fn session_buffer(&self, session_id: Uuid) -> Arc<SessionEventBuffer> {
166        {
167            let guard = self.sessions.read().await;
168            if let Some(existing) = guard.get(&session_id) {
169                return existing.clone();
170            }
171        }
172
173        let mut guard = self.sessions.write().await;
174        guard
175            .entry(session_id)
176            .or_insert_with(|| Arc::new(SessionEventBuffer::new(self.capacity)))
177            .clone()
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184    use stakpak_agent_core::TurnFinishReason;
185
186    fn run_started(run_id: Uuid) -> AgentEvent {
187        AgentEvent::RunStarted { run_id }
188    }
189
190    fn turn_completed(run_id: Uuid, turn: usize) -> AgentEvent {
191        AgentEvent::TurnCompleted {
192            run_id,
193            turn,
194            finish_reason: TurnFinishReason::Stop,
195        }
196    }
197
198    #[tokio::test]
199    async fn publish_assigns_monotonic_event_ids_per_session() {
200        let log = EventLog::new(16);
201        let session_id = Uuid::new_v4();
202        let run_id = Uuid::new_v4();
203
204        let first = log
205            .publish(session_id, Some(run_id), run_started(run_id))
206            .await;
207        let second = log
208            .publish(session_id, Some(run_id), turn_completed(run_id, 1))
209            .await;
210
211        assert_eq!(first.id, 1);
212        assert_eq!(second.id, 2);
213    }
214
215    #[tokio::test]
216    async fn replay_returns_events_newer_than_last_event_id() {
217        let log = EventLog::new(16);
218        let session_id = Uuid::new_v4();
219        let run_id = Uuid::new_v4();
220
221        let _ = log
222            .publish(session_id, Some(run_id), run_started(run_id))
223            .await;
224        let second = log
225            .publish(session_id, Some(run_id), turn_completed(run_id, 1))
226            .await;
227        let third = log
228            .publish(session_id, Some(run_id), turn_completed(run_id, 2))
229            .await;
230
231        let subscription = log.subscribe(session_id, Some(second.id)).await;
232
233        assert!(subscription.gap_detected.is_none());
234        assert_eq!(subscription.replay.len(), 1);
235        assert_eq!(subscription.replay[0].id, third.id);
236
237        match &subscription.replay[0].event {
238            AgentEvent::TurnCompleted { turn, .. } => assert_eq!(*turn, 2),
239            other => panic!("expected turn_completed event, got {other:?}"),
240        }
241    }
242
243    #[tokio::test]
244    async fn subscribe_reports_gap_when_cursor_falls_outside_ring() {
245        let log = EventLog::new(3);
246        let session_id = Uuid::new_v4();
247        let run_id = Uuid::new_v4();
248
249        for turn in 0..5 {
250            let _ = log
251                .publish(session_id, Some(run_id), turn_completed(run_id, turn))
252                .await;
253        }
254
255        let subscription = log.subscribe(session_id, Some(1)).await;
256
257        assert!(subscription.replay.is_empty());
258
259        let gap = match subscription.gap_detected {
260            Some(gap) => gap,
261            None => panic!("expected gap_detected payload"),
262        };
263
264        assert_eq!(gap.requested_after_id, 1);
265        assert_eq!(gap.resume_hint, "refresh_snapshot_then_resume".to_string());
266
267        let bounds = log.snapshot_bounds(session_id).await;
268        let (oldest, newest) = match bounds {
269            Some(bounds) => bounds,
270            None => panic!("expected replay bounds for populated session"),
271        };
272
273        assert_eq!(gap.oldest_available_id, oldest);
274        assert_eq!(gap.newest_available_id, newest);
275    }
276
277    #[tokio::test]
278    async fn publish_is_durable_without_subscribers() {
279        let log = EventLog::new(8);
280        let session_id = Uuid::new_v4();
281        let run_id = Uuid::new_v4();
282
283        for turn in 0..4 {
284            let _ = log
285                .publish(session_id, Some(run_id), turn_completed(run_id, turn))
286                .await;
287        }
288
289        let subscription = log.subscribe(session_id, Some(0)).await;
290        assert_eq!(subscription.replay.len(), 4);
291    }
292
293    #[tokio::test]
294    async fn replay_is_session_scoped() {
295        let log = EventLog::new(8);
296        let session_a = Uuid::new_v4();
297        let session_b = Uuid::new_v4();
298        let run_a = Uuid::new_v4();
299        let run_b = Uuid::new_v4();
300
301        let _ = log
302            .publish(session_a, Some(run_a), run_started(run_a))
303            .await;
304        let _ = log
305            .publish(session_b, Some(run_b), run_started(run_b))
306            .await;
307
308        let sub_a = log.subscribe(session_a, Some(0)).await;
309        let sub_b = log.subscribe(session_b, Some(0)).await;
310
311        assert_eq!(sub_a.replay.len(), 1);
312        assert_eq!(sub_b.replay.len(), 1);
313        assert_eq!(sub_a.replay[0].session_id, session_a);
314        assert_eq!(sub_b.replay[0].session_id, session_b);
315    }
316}