1use std::{
6 collections::VecDeque,
7 sync::{
8 Arc, Mutex,
9 atomic::{AtomicU64, Ordering},
10 },
11};
12
13use crate::{
14 domain::{AgentError, AgentId, RunId},
15 event::{
16 AgentEvent, ArchiveCursor, CompiledEventFilter, EventCursor, EventFilter, EventFrame,
17 EventKind, EventOverflowNotice, EventOverflowReason, EventStreamScope,
18 SubscriberOverflowPolicy, SubscriberQueueConfig, SubscriptionOptions, cursor_compatible,
19 },
20};
21
22pub trait AgentEventBus: Send + Sync {
27 fn publish(&self, frame: EventFrame) -> Result<(), AgentError>;
31
32 fn subscribe_all(&self, cursor: Option<EventCursor>) -> Result<AgentEventStream, AgentError>;
36
37 fn subscribe_all_with_options(
41 &self,
42 cursor: Option<EventCursor>,
43 options: SubscriptionOptions,
44 ) -> Result<AgentEventStream, AgentError>;
45
46 fn subscribe_run(
50 &self,
51 run_id: RunId,
52 cursor: Option<EventCursor>,
53 ) -> Result<AgentEventStream, AgentError>;
54
55 fn subscribe_run_with_options(
59 &self,
60 run_id: RunId,
61 cursor: Option<EventCursor>,
62 options: SubscriptionOptions,
63 ) -> Result<AgentEventStream, AgentError>;
64
65 fn subscribe_agent(
69 &self,
70 agent_id: AgentId,
71 cursor: Option<EventCursor>,
72 ) -> Result<AgentEventStream, AgentError>;
73
74 fn subscribe_agent_with_options(
78 &self,
79 agent_id: AgentId,
80 cursor: Option<EventCursor>,
81 options: SubscriptionOptions,
82 ) -> Result<AgentEventStream, AgentError>;
83
84 fn subscribe_filtered(
88 &self,
89 filter: CompiledEventFilter,
90 cursor: Option<EventCursor>,
91 ) -> Result<AgentEventStream, AgentError>;
92}
93
94pub trait EventArchive: Send + Sync {
99 fn replay_filtered_from_cursor(
104 &self,
105 filter: CompiledEventFilter,
106 cursor: ArchiveCursor,
107 ) -> Result<AgentEventStream, AgentError>;
108}
109
110#[derive(Clone, Debug)]
111pub struct AgentEventStream {
114 frames: VecDeque<EventFrame>,
115}
116
117impl AgentEventStream {
118 pub fn new(frames: impl IntoIterator<Item = EventFrame>) -> Self {
122 Self {
123 frames: frames.into_iter().collect(),
124 }
125 }
126}
127
128impl Iterator for AgentEventStream {
129 type Item = EventFrame;
130
131 fn next(&mut self) -> Option<Self::Item> {
132 self.frames.pop_front()
133 }
134}
135
136#[derive(Clone, Debug, Default)]
137pub struct InMemoryAgentEventBus {
140 frames: Arc<Mutex<Vec<EventFrame>>>,
141 next_event_seq: Arc<AtomicU64>,
142}
143
144impl InMemoryAgentEventBus {
145 pub fn publish(&self, frame: EventFrame) -> Result<(), AgentError> {
149 let frame = self.assign_live_sequence(frame);
150 self.frames
151 .lock()
152 .map_err(|_| AgentError::contract_violation("event bus lock poisoned"))?
153 .push(frame);
154 Ok(())
155 }
156
157 pub fn publish_all(
161 &self,
162 frames: impl IntoIterator<Item = EventFrame>,
163 ) -> Result<(), AgentError> {
164 let frames = frames
165 .into_iter()
166 .map(|frame| self.assign_live_sequence(frame))
167 .collect::<Vec<_>>();
168 let mut locked = self
169 .frames
170 .lock()
171 .map_err(|_| AgentError::contract_violation("event bus lock poisoned"))?;
172 locked.extend(frames);
173 Ok(())
174 }
175
176 fn filtered_stream(
177 &self,
178 requested_scope: EventStreamScope,
179 filter: CompiledEventFilter,
180 cursor: Option<EventCursor>,
181 queue: SubscriberQueueConfig,
182 ) -> Result<AgentEventStream, AgentError> {
183 cursor_compatible(&requested_scope, cursor.as_ref())?;
184 reject_live_overflow_policy(&queue)?;
185 let start_after = cursor.as_ref().map(|cursor| cursor.event_seq);
186 let frames = self
187 .frames
188 .lock()
189 .map_err(|_| AgentError::contract_violation("event bus lock poisoned"))?
190 .iter()
191 .filter(|frame| start_after.is_none_or(|seq| frame.cursor.event_seq > seq))
192 .filter(|frame| filter.matches_envelope(&frame.event.envelope))
193 .map(|frame| {
194 let mut frame = frame.clone();
195 frame.cursor = frame.event.envelope.cursor(requested_scope.clone());
196 frame
197 })
198 .collect::<Vec<_>>();
199 let frames = apply_queue_bounds(frames, &queue);
200 Ok(AgentEventStream::new(frames))
201 }
202
203 fn assign_live_sequence(&self, mut frame: EventFrame) -> EventFrame {
204 let event_seq = self.next_event_seq.fetch_add(1, Ordering::SeqCst) + 1;
205 frame.event.envelope.event_seq = event_seq;
206 frame.cursor = frame.event.envelope.cursor(frame.cursor.scope.clone());
207 frame
208 }
209}
210
211impl AgentEventBus for InMemoryAgentEventBus {
212 fn publish(&self, frame: EventFrame) -> Result<(), AgentError> {
213 InMemoryAgentEventBus::publish(self, frame)
214 }
215
216 fn subscribe_all(&self, cursor: Option<EventCursor>) -> Result<AgentEventStream, AgentError> {
217 self.subscribe_all_with_options(cursor, SubscriptionOptions::default())
218 }
219
220 fn subscribe_all_with_options(
221 &self,
222 cursor: Option<EventCursor>,
223 options: SubscriptionOptions,
224 ) -> Result<AgentEventStream, AgentError> {
225 self.filtered_stream(
226 EventStreamScope::All,
227 EventFilter::default().compile()?,
228 cursor,
229 options.queue,
230 )
231 }
232
233 fn subscribe_run(
234 &self,
235 run_id: RunId,
236 cursor: Option<EventCursor>,
237 ) -> Result<AgentEventStream, AgentError> {
238 self.subscribe_run_with_options(run_id, cursor, SubscriptionOptions::default())
239 }
240
241 fn subscribe_run_with_options(
242 &self,
243 run_id: RunId,
244 cursor: Option<EventCursor>,
245 options: SubscriptionOptions,
246 ) -> Result<AgentEventStream, AgentError> {
247 self.filtered_stream(
248 EventStreamScope::Run(run_id.clone()),
249 EventFilter::run(run_id).compile()?,
250 cursor,
251 options.queue,
252 )
253 }
254
255 fn subscribe_agent(
256 &self,
257 agent_id: AgentId,
258 cursor: Option<EventCursor>,
259 ) -> Result<AgentEventStream, AgentError> {
260 self.subscribe_agent_with_options(agent_id, cursor, SubscriptionOptions::default())
261 }
262
263 fn subscribe_agent_with_options(
264 &self,
265 agent_id: AgentId,
266 cursor: Option<EventCursor>,
267 options: SubscriptionOptions,
268 ) -> Result<AgentEventStream, AgentError> {
269 self.filtered_stream(
270 EventStreamScope::Agent(agent_id.clone()),
271 EventFilter::agent(agent_id).compile()?,
272 cursor,
273 options.queue,
274 )
275 }
276
277 fn subscribe_filtered(
278 &self,
279 filter: CompiledEventFilter,
280 cursor: Option<EventCursor>,
281 ) -> Result<AgentEventStream, AgentError> {
282 let queue = filter.queue.clone();
283 self.filtered_stream(filter.cursor_scope(), filter, cursor, queue)
284 }
285}
286
287fn apply_queue_bounds(
288 frames: impl IntoIterator<Item = EventFrame>,
289 queue: &SubscriberQueueConfig,
290) -> Vec<EventFrame> {
291 let capacity = queue.capacity.get();
292 let normal_capacity = capacity.saturating_sub(queue.terminal_reserve.get().min(capacity));
293 let mut bounded = VecDeque::new();
294 let mut overflow = OverflowAccumulator::default();
295 let mut summary = ProgressSummaryAccumulator::default();
296
297 for frame in frames {
298 if frame.event.envelope.event_kind.is_terminal() {
299 flush_progress_summary(
300 &mut bounded,
301 queue,
302 normal_capacity,
303 &mut summary,
304 &mut overflow,
305 );
306 while bounded.len() >= capacity {
307 if !drop_oldest_nonterminal(
308 &mut bounded,
309 &mut overflow,
310 EventOverflowReason::PolicyDroppedNonTerminal,
311 ) {
312 if let Some(dropped) = bounded.pop_front() {
313 overflow.record_drop(&dropped, EventOverflowReason::SubscriberQueueFull);
314 } else {
315 break;
316 }
317 }
318 }
319 push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
320 continue;
321 }
322
323 match queue.overflow {
324 SubscriberOverflowPolicy::DropNonTerminal => {
325 if can_accept_nonterminal(&bounded, capacity, normal_capacity) {
326 push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
327 } else {
328 overflow.record_drop(&frame, EventOverflowReason::PolicyDroppedNonTerminal);
329 }
330 }
331 SubscriberOverflowPolicy::DropProgress => {
332 if can_accept_nonterminal(&bounded, capacity, normal_capacity) {
333 push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
334 } else if is_progress_event(&frame.event.envelope.event_kind) {
335 overflow.record_drop(&frame, EventOverflowReason::PolicyDroppedProgress);
336 } else if drop_oldest_progress(&mut bounded, &mut overflow) {
337 push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
338 } else {
339 overflow.record_drop(&frame, EventOverflowReason::SubscriberQueueFull);
340 }
341 }
342 SubscriberOverflowPolicy::SummarizeAndContinue => {
343 if is_progress_event(&frame.event.envelope.event_kind) {
344 summary.record_progress(frame);
345 } else {
346 flush_progress_summary(
347 &mut bounded,
348 queue,
349 normal_capacity,
350 &mut summary,
351 &mut overflow,
352 );
353 if can_accept_nonterminal(&bounded, capacity, normal_capacity) {
354 push_with_notice(
355 &mut bounded,
356 frame,
357 queue.overflow.clone(),
358 &mut overflow,
359 );
360 } else {
361 overflow.record_drop(&frame, EventOverflowReason::SubscriberQueueFull);
362 }
363 }
364 }
365 SubscriberOverflowPolicy::FailSubscriber => {
366 if can_accept_nonterminal(&bounded, capacity, normal_capacity) {
367 push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
368 } else {
369 overflow.record_drop(&frame, EventOverflowReason::SubscriberQueueFull);
370 if let Some(last) = bounded.back_mut() {
371 last.overflow = Some(overflow.take_notice(queue.overflow.clone()));
372 }
373 break;
374 }
375 }
376 SubscriberOverflowPolicy::BackpressureCaller => unreachable!(
377 "live event bus rejects backpressure overflow policy before queue bounding"
378 ),
379 }
380 }
381
382 flush_progress_summary(
383 &mut bounded,
384 queue,
385 normal_capacity,
386 &mut summary,
387 &mut overflow,
388 );
389
390 if overflow.has_drop() {
391 if let Some(last) = bounded.back_mut() {
392 last.overflow = Some(overflow.notice(queue.overflow.clone()));
393 }
394 }
395
396 bounded.into_iter().collect()
397}
398
399fn reject_live_overflow_policy(queue: &SubscriberQueueConfig) -> Result<(), AgentError> {
400 if queue.overflow == SubscriberOverflowPolicy::BackpressureCaller {
401 return Err(AgentError::contract_violation(
402 "InvalidOverflowPolicy: backpressure_caller is rejected for live event bus subscriptions",
403 ));
404 }
405 Ok(())
406}
407
408fn can_accept_nonterminal(
409 frames: &VecDeque<EventFrame>,
410 capacity: usize,
411 normal_capacity: usize,
412) -> bool {
413 frames.len() < capacity && nonterminal_count(frames) < normal_capacity
414}
415
416fn push_with_notice(
417 frames: &mut VecDeque<EventFrame>,
418 mut frame: EventFrame,
419 policy: SubscriberOverflowPolicy,
420 overflow: &mut OverflowAccumulator,
421) {
422 if overflow.has_drop() {
423 frame.overflow = Some(overflow.take_notice(policy));
424 }
425 frames.push_back(frame);
426}
427
428fn drop_oldest_nonterminal(
429 frames: &mut VecDeque<EventFrame>,
430 overflow: &mut OverflowAccumulator,
431 reason: EventOverflowReason,
432) -> bool {
433 let Some(index) = frames
434 .iter()
435 .position(|frame| !frame.event.envelope.event_kind.is_terminal())
436 else {
437 return false;
438 };
439 if let Some(dropped) = frames.remove(index) {
440 overflow.record_drop(&dropped, reason);
441 true
442 } else {
443 false
444 }
445}
446
447fn drop_oldest_progress(
448 frames: &mut VecDeque<EventFrame>,
449 overflow: &mut OverflowAccumulator,
450) -> bool {
451 let Some(index) = frames
452 .iter()
453 .position(|frame| is_progress_event(&frame.event.envelope.event_kind))
454 else {
455 return false;
456 };
457 if let Some(dropped) = frames.remove(index) {
458 overflow.record_drop(&dropped, EventOverflowReason::PolicyDroppedProgress);
459 true
460 } else {
461 false
462 }
463}
464
465fn nonterminal_count(frames: &VecDeque<EventFrame>) -> usize {
466 frames
467 .iter()
468 .filter(|frame| !frame.event.envelope.event_kind.is_terminal())
469 .count()
470}
471
472fn is_progress_event(kind: &EventKind) -> bool {
473 matches!(
474 kind,
475 EventKind::ModelStreamDelta
476 | EventKind::StreamRuleRepeatStateRecorded
477 | EventKind::RealtimeInputSent
478 | EventKind::RealtimeOutputReceived
479 | EventKind::RealtimeBackpressureApplied
480 | EventKind::IsolationProcessIoCaptured
481 | EventKind::IsolationProcessStatsRecorded
482 | EventKind::UsageRecorded
483 | EventKind::CostEstimated
484 | EventKind::CostCorrected
485 )
486}
487
488fn flush_progress_summary(
489 frames: &mut VecDeque<EventFrame>,
490 queue: &SubscriberQueueConfig,
491 normal_capacity: usize,
492 summary: &mut ProgressSummaryAccumulator,
493 overflow: &mut OverflowAccumulator,
494) {
495 let Some(frame) = summary.take_summary_frame() else {
496 return;
497 };
498 if can_accept_nonterminal(frames, queue.capacity.get(), normal_capacity) {
499 push_with_notice(frames, frame, queue.overflow.clone(), overflow);
500 } else {
501 overflow.record_drop(&frame, EventOverflowReason::PolicyDroppedProgress);
502 }
503}
504
505#[derive(Default)]
506struct ProgressSummaryAccumulator {
507 dropped_count: u64,
508 gap_start: Option<EventCursor>,
509 gap_end: Option<EventCursor>,
510 repair_from: Option<crate::domain::JournalCursor>,
511 summary_frame: Option<EventFrame>,
512}
513
514impl ProgressSummaryAccumulator {
515 fn record_progress(&mut self, frame: EventFrame) {
516 self.dropped_count += 1;
517 self.gap_start.get_or_insert_with(|| frame.cursor.clone());
518 self.gap_end = Some(frame.cursor.clone());
519 if self.repair_from.is_none() {
520 self.repair_from = frame.cursor.journal_cursor.clone();
521 }
522 self.summary_frame = Some(frame);
523 }
524
525 fn take_summary_frame(&mut self) -> Option<EventFrame> {
526 if self.dropped_count == 0 {
527 return None;
528 }
529 let mut frame = self.summary_frame.take()?;
530 let notice = EventOverflowNotice {
531 policy: SubscriberOverflowPolicy::SummarizeAndContinue,
532 dropped_count: self.dropped_count,
533 gap_start: self.gap_start.clone(),
534 gap_end: self
535 .gap_end
536 .clone()
537 .unwrap_or_else(|| self.gap_start.clone().expect("summary gap start")),
538 repair_from: self.repair_from.clone(),
539 terminal_preserved: true,
540 reason: EventOverflowReason::PolicyDroppedProgress,
541 };
542 frame.event = AgentEvent::with_redacted_summary(
543 frame.event.envelope.clone(),
544 format!(
545 "redacted progress summary for {} dropped progress frames",
546 self.dropped_count
547 ),
548 );
549 frame.overflow = Some(notice);
550 *self = Self::default();
551 Some(frame)
552 }
553}
554
555#[derive(Default)]
556struct OverflowAccumulator {
557 dropped_count: u64,
558 gap_start: Option<EventCursor>,
559 gap_end: Option<EventCursor>,
560 repair_from: Option<crate::domain::JournalCursor>,
561 terminal_dropped: bool,
562 reason: Option<EventOverflowReason>,
563}
564
565impl OverflowAccumulator {
566 fn record_drop(&mut self, frame: &EventFrame, reason: EventOverflowReason) {
567 self.dropped_count += 1;
568 self.gap_start.get_or_insert_with(|| frame.cursor.clone());
569 self.gap_end = Some(frame.cursor.clone());
570 if self.repair_from.is_none() {
571 self.repair_from = frame.cursor.journal_cursor.clone();
572 }
573 self.terminal_dropped |= frame.event.envelope.event_kind.is_terminal();
574 self.reason.get_or_insert(reason);
575 }
576
577 fn has_drop(&self) -> bool {
578 self.dropped_count > 0
579 }
580
581 fn take_notice(&mut self, policy: SubscriberOverflowPolicy) -> EventOverflowNotice {
582 let notice = self.notice(policy);
583 *self = Self::default();
584 notice
585 }
586
587 fn notice(&self, policy: SubscriberOverflowPolicy) -> EventOverflowNotice {
588 EventOverflowNotice {
589 policy,
590 dropped_count: self.dropped_count,
591 gap_start: self.gap_start.clone(),
592 gap_end: self
593 .gap_end
594 .clone()
595 .unwrap_or_else(|| self.gap_start.clone().expect("overflow gap start")),
596 repair_from: self.repair_from.clone(),
597 terminal_preserved: !self.terminal_dropped,
598 reason: if self.terminal_dropped {
599 EventOverflowReason::SubscriberQueueFull
600 } else {
601 self.reason
602 .clone()
603 .unwrap_or(EventOverflowReason::SubscriberQueueFull)
604 },
605 }
606 }
607}