1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use stakpak_agent_core::AgentEvent;
4use std::{
5 collections::{HashMap, VecDeque},
6 sync::{
7 Arc, Mutex,
8 atomic::{AtomicU64, Ordering},
9 },
10};
11use tokio::sync::{RwLock, broadcast};
12use uuid::Uuid;
13
/// A single published [`AgentEvent`] together with the metadata needed to replay it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventEnvelope {
    /// Per-session event id assigned by [`EventLog::publish`]; the first event of a
    /// session gets id 1.
    pub id: u64,
    /// Session the event was published to.
    pub session_id: Uuid,
    /// Run the event belongs to, if the publisher supplied one.
    pub run_id: Option<Uuid>,
    /// UTC wall-clock time captured when the event was published.
    pub timestamp: DateTime<Utc>,
    /// The wrapped agent event.
    pub event: AgentEvent,
}
22
/// Reported by [`EventLog::subscribe`] when some events after the requested cursor
/// have already been evicted from the session's replay window, so a replay would be
/// incomplete.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GapDetected {
    /// The `after_id` cursor the subscriber asked to resume from.
    pub requested_after_id: u64,
    /// Id of the oldest event still retained for the session.
    pub oldest_available_id: u64,
    /// Id of the newest event retained for the session when the gap was detected.
    pub newest_available_id: u64,
    /// Recovery hint for the client; `EventLog::subscribe` sets this to
    /// `"refresh_snapshot_then_resume"`.
    pub resume_hint: String,
}
30
/// Result of [`EventLog::subscribe`]: buffered events to replay plus a live receiver.
pub struct EventSubscription {
    /// Retained events with ids greater than the requested cursor, in the order they
    /// were stored. Empty when no cursor was given or when a gap was detected.
    pub replay: Vec<EventEnvelope>,
    /// Receiver for events published after the subscription was created.
    pub live: broadcast::Receiver<EventEnvelope>,
    /// Set when the requested cursor is older than the retained replay window.
    pub gap_detected: Option<GapDetected>,
}
36
/// Per-session state: id counter, bounded replay ring and live broadcast channel.
struct SessionEventBuffer {
    /// Next event id to hand out; starts at 1.
    next_id: AtomicU64,
    /// Most recently published events, earliest-inserted at the front; trimmed to
    /// the log's capacity on every publish.
    ring: Mutex<VecDeque<EventEnvelope>>,
    /// Sender used to push newly published events to live subscribers.
    tx: broadcast::Sender<EventEnvelope>,
}
42
43impl SessionEventBuffer {
44 fn new(capacity: usize) -> Self {
45 let (tx, _rx) = broadcast::channel(capacity.max(1) * 2);
46 Self {
47 next_id: AtomicU64::new(1),
48 ring: Mutex::new(VecDeque::with_capacity(capacity.max(1))),
49 tx,
50 }
51 }
52
53 fn lock_ring(&self) -> std::sync::MutexGuard<'_, VecDeque<EventEnvelope>> {
54 match self.ring.lock() {
55 Ok(guard) => guard,
56 Err(poisoned) => poisoned.into_inner(),
57 }
58 }
59}
60
/// Session-scoped, bounded event log supporting replay-then-live subscriptions.
///
/// Clones share the same underlying sessions map.
#[derive(Clone)]
pub struct EventLog {
    /// Maximum number of events retained per session (clamped to at least 1 by `new`).
    capacity: usize,
    /// Per-session buffers keyed by session id, created on first use.
    sessions: Arc<RwLock<HashMap<Uuid, Arc<SessionEventBuffer>>>>,
}
66
67impl EventLog {
68 pub fn new(capacity: usize) -> Self {
69 Self {
70 capacity: capacity.max(1),
71 sessions: Arc::new(RwLock::new(HashMap::new())),
72 }
73 }
74
75 pub async fn publish(
76 &self,
77 session_id: Uuid,
78 run_id: Option<Uuid>,
79 event: AgentEvent,
80 ) -> EventEnvelope {
81 let buffer = self.session_buffer(session_id).await;
82 let event_id = buffer.next_id.fetch_add(1, Ordering::SeqCst);
83
84 let envelope = EventEnvelope {
85 id: event_id,
86 session_id,
87 run_id,
88 timestamp: Utc::now(),
89 event,
90 };
91
92 {
93 let mut ring = buffer.lock_ring();
94 ring.push_back(envelope.clone());
95
96 while ring.len() > self.capacity {
97 let _ = ring.pop_front();
98 }
99
100 let _ = buffer.tx.send(envelope.clone());
101 }
102
103 envelope
104 }
105
106 pub async fn subscribe(&self, session_id: Uuid, after_id: Option<u64>) -> EventSubscription {
107 let buffer = self.session_buffer(session_id).await;
108
109 let (replay, gap_detected, live) = {
110 let ring = buffer.lock_ring();
111 let live = buffer.tx.subscribe();
112
113 match after_id {
114 None => (Vec::new(), None, live),
115 Some(requested_after_id) => {
116 let oldest = ring.front().map(|event| event.id);
117 let newest = ring.back().map(|event| event.id);
118
119 let (gap, replay) = match (oldest, newest) {
120 (Some(oldest_available_id), Some(newest_available_id))
121 if requested_after_id.saturating_add(1) < oldest_available_id =>
122 {
123 (
124 Some(GapDetected {
125 requested_after_id,
126 oldest_available_id,
127 newest_available_id,
128 resume_hint: "refresh_snapshot_then_resume".to_string(),
129 }),
130 Vec::new(),
131 )
132 }
133 _ => {
134 let replay = ring
135 .iter()
136 .filter(|event| event.id > requested_after_id)
137 .cloned()
138 .collect();
139 (None, replay)
140 }
141 };
142
143 (replay, gap, live)
144 }
145 }
146 };
147
148 EventSubscription {
149 replay,
150 live,
151 gap_detected,
152 }
153 }
154
155 pub async fn snapshot_bounds(&self, session_id: Uuid) -> Option<(u64, u64)> {
156 let buffer = self.session_buffer(session_id).await;
157 let ring = buffer.lock_ring();
158
159 let oldest = ring.front().map(|event| event.id)?;
160 let newest = ring.back().map(|event| event.id)?;
161
162 Some((oldest, newest))
163 }
164
165 async fn session_buffer(&self, session_id: Uuid) -> Arc<SessionEventBuffer> {
166 {
167 let guard = self.sessions.read().await;
168 if let Some(existing) = guard.get(&session_id) {
169 return existing.clone();
170 }
171 }
172
173 let mut guard = self.sessions.write().await;
174 guard
175 .entry(session_id)
176 .or_insert_with(|| Arc::new(SessionEventBuffer::new(self.capacity)))
177 .clone()
178 }
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use stakpak_agent_core::TurnFinishReason;

    /// Builds a `RunStarted` event for `run_id`.
    fn run_started(run_id: Uuid) -> AgentEvent {
        AgentEvent::RunStarted { run_id }
    }

    /// Builds a `TurnCompleted` event for `run_id` that stopped normally.
    fn turn_completed(run_id: Uuid, turn: usize) -> AgentEvent {
        let finish_reason = TurnFinishReason::Stop;
        AgentEvent::TurnCompleted {
            run_id,
            turn,
            finish_reason,
        }
    }

    #[tokio::test]
    async fn publish_assigns_monotonic_event_ids_per_session() {
        let log = EventLog::new(16);
        let (session, run) = (Uuid::new_v4(), Uuid::new_v4());

        let first_id = log.publish(session, Some(run), run_started(run)).await.id;
        let second_id = log
            .publish(session, Some(run), turn_completed(run, 1))
            .await
            .id;

        assert_eq!((first_id, second_id), (1, 2));
    }

    #[tokio::test]
    async fn replay_returns_events_newer_than_last_event_id() {
        let log = EventLog::new(16);
        let (session, run) = (Uuid::new_v4(), Uuid::new_v4());

        log.publish(session, Some(run), run_started(run)).await;
        let cursor = log
            .publish(session, Some(run), turn_completed(run, 1))
            .await
            .id;
        let expected_id = log
            .publish(session, Some(run), turn_completed(run, 2))
            .await
            .id;

        let EventSubscription {
            replay,
            gap_detected,
            ..
        } = log.subscribe(session, Some(cursor)).await;

        assert!(gap_detected.is_none());
        assert_eq!(replay.len(), 1);
        assert_eq!(replay[0].id, expected_id);

        let AgentEvent::TurnCompleted { turn, .. } = &replay[0].event else {
            panic!("expected turn_completed event, got {:?}", replay[0].event);
        };
        assert_eq!(*turn, 2);
    }

    #[tokio::test]
    async fn subscribe_reports_gap_when_cursor_falls_outside_ring() {
        let log = EventLog::new(3);
        let (session, run) = (Uuid::new_v4(), Uuid::new_v4());

        for turn in 0..5 {
            log.publish(session, Some(run), turn_completed(run, turn))
                .await;
        }

        let subscription = log.subscribe(session, Some(1)).await;
        assert!(subscription.replay.is_empty());

        let Some(gap) = subscription.gap_detected else {
            panic!("expected gap_detected payload");
        };
        assert_eq!(gap.requested_after_id, 1);
        assert_eq!(gap.resume_hint, "refresh_snapshot_then_resume");

        let Some((oldest, newest)) = log.snapshot_bounds(session).await else {
            panic!("expected replay bounds for populated session");
        };
        assert_eq!(
            (gap.oldest_available_id, gap.newest_available_id),
            (oldest, newest)
        );
    }

    #[tokio::test]
    async fn publish_is_durable_without_subscribers() {
        let log = EventLog::new(8);
        let (session, run) = (Uuid::new_v4(), Uuid::new_v4());

        for turn in 0..4 {
            log.publish(session, Some(run), turn_completed(run, turn))
                .await;
        }

        let replayed = log.subscribe(session, Some(0)).await.replay;
        assert_eq!(replayed.len(), 4);
    }

    #[tokio::test]
    async fn replay_is_session_scoped() {
        let log = EventLog::new(8);
        let sessions = [Uuid::new_v4(), Uuid::new_v4()];

        for session in sessions {
            let run = Uuid::new_v4();
            log.publish(session, Some(run), run_started(run)).await;
        }

        for session in sessions {
            let replay = log.subscribe(session, Some(0)).await.replay;
            assert_eq!(replay.len(), 1);
            assert_eq!(replay[0].session_id, session);
        }
    }
}