Skip to main content

agent_sdk_core/ports/
event_bus.rs

1//! Event bus port and in-memory event bus helpers. Use this module for live
2//! observation separate from durable journal truth. Publishing mutates the bus state
3//! and may notify subscribers.
4//!
5use std::{
6    collections::VecDeque,
7    sync::{
8        Arc, Mutex,
9        atomic::{AtomicU64, Ordering},
10    },
11};
12
13use crate::{
14    domain::{AgentError, AgentId, RunId},
15    event::{
16        AgentEvent, ArchiveCursor, CompiledEventFilter, EventCursor, EventFilter, EventFrame,
17        EventKind, EventOverflowNotice, EventOverflowReason, EventStreamScope,
18        SubscriberOverflowPolicy, SubscriberQueueConfig, SubscriptionOptions, cursor_compatible,
19    },
20};
21
22/// Port or behavior contract for agent event bus. Implementors should
23/// preserve policy, redaction, idempotency, and replay expectations
24/// from the surrounding module. Implementations may perform side
25/// effects only as described by the trait methods.
26pub trait AgentEventBus: Send + Sync {
27    /// Mutates the in-memory event/subscription state and may wake local
28    /// subscribers. It does not persist durable journal truth or call network
29    /// sinks.
30    fn publish(&self, frame: EventFrame) -> Result<(), AgentError>;
31
32    /// Creates a live stream for all visible event frames from the cursor.
33    /// Implementations may register subscriber state or read buffered frames,
34    /// but must not publish new events or append journal records.
35    fn subscribe_all(&self, cursor: Option<EventCursor>) -> Result<AgentEventStream, AgentError>;
36
37    /// Creates a live stream for all visible event frames with queue options.
38    /// Implementations create a live subscription stream from bus state; subscribing must not
39    /// publish new events or append journal records.
40    fn subscribe_all_with_options(
41        &self,
42        cursor: Option<EventCursor>,
43        options: SubscriptionOptions,
44    ) -> Result<AgentEventStream, AgentError>;
45
46    /// Creates a live stream scoped to one run from the cursor.
47    /// Implementations may register subscriber state or read buffered frames,
48    /// but must not publish new events or append journal records.
49    fn subscribe_run(
50        &self,
51        run_id: RunId,
52        cursor: Option<EventCursor>,
53    ) -> Result<AgentEventStream, AgentError>;
54
55    /// Creates a live stream scoped to one run with queue options.
56    /// Implementations create a live subscription stream from bus state; subscribing must not
57    /// publish new events or append journal records.
58    fn subscribe_run_with_options(
59        &self,
60        run_id: RunId,
61        cursor: Option<EventCursor>,
62        options: SubscriptionOptions,
63    ) -> Result<AgentEventStream, AgentError>;
64
65    /// Creates a live stream scoped to one agent from the cursor.
66    /// Implementations may register subscriber state or read buffered frames,
67    /// but must not publish new events or append journal records.
68    fn subscribe_agent(
69        &self,
70        agent_id: AgentId,
71        cursor: Option<EventCursor>,
72    ) -> Result<AgentEventStream, AgentError>;
73
74    /// Creates a live stream scoped to one agent with queue options.
75    /// Implementations create a live subscription stream from bus state; subscribing must not
76    /// publish new events or append journal records.
77    fn subscribe_agent_with_options(
78        &self,
79        agent_id: AgentId,
80        cursor: Option<EventCursor>,
81        options: SubscriptionOptions,
82    ) -> Result<AgentEventStream, AgentError>;
83
84    /// Creates a live stream for frames matching a compiled envelope filter.
85    /// Implementations create a live subscription stream from bus state; subscribing must not
86    /// publish new events or append journal records.
87    fn subscribe_filtered(
88        &self,
89        filter: CompiledEventFilter,
90        cursor: Option<EventCursor>,
91    ) -> Result<AgentEventStream, AgentError>;
92}
93
94/// Port or behavior contract for event archive. Implementors should
95/// preserve policy, redaction, idempotency, and replay expectations
96/// from the surrounding module. Implementations may perform side
97/// effects only as described by the trait methods.
98pub trait EventArchive: Send + Sync {
99    /// Replays archived frames matching a compiled envelope filter from the
100    /// archive cursor.
101    /// Implementations read archived event frames from the requested cursor and return a
102    /// stream; replaying must not publish new events or append journal records.
103    fn replay_filtered_from_cursor(
104        &self,
105        filter: CompiledEventFilter,
106        cursor: ArchiveCursor,
107    ) -> Result<AgentEventStream, AgentError>;
108}
109
110#[derive(Clone, Debug)]
111/// Carries agent event stream data across a host-port boundary.
112/// Constructing the value does not call the host; the port method that receives it documents any adapter, network, or storage effect.
113pub struct AgentEventStream {
114    frames: VecDeque<EventFrame>,
115}
116
117impl AgentEventStream {
118    /// Creates a new ports::event_bus value with explicit
119    /// caller-provided inputs. This constructor is data-only and
120    /// performs no I/O or external side effects.
121    pub fn new(frames: impl IntoIterator<Item = EventFrame>) -> Self {
122        Self {
123            frames: frames.into_iter().collect(),
124        }
125    }
126}
127
128impl Iterator for AgentEventStream {
129    type Item = EventFrame;
130
131    fn next(&mut self) -> Option<Self::Item> {
132        self.frames.pop_front()
133    }
134}
135
136#[derive(Clone, Debug, Default)]
137/// Carries in memory agent event bus data across a host-port boundary.
138/// Constructing the value does not call the host; the port method that receives it documents any adapter, network, or storage effect.
139pub struct InMemoryAgentEventBus {
140    frames: Arc<Mutex<Vec<EventFrame>>>,
141    next_event_seq: Arc<AtomicU64>,
142}
143
144impl InMemoryAgentEventBus {
145    /// Mutates the in-memory event/subscription state and may wake local
146    /// subscribers. It does not persist durable journal truth or call network
147    /// sinks.
148    pub fn publish(&self, frame: EventFrame) -> Result<(), AgentError> {
149        let frame = self.assign_live_sequence(frame);
150        self.frames
151            .lock()
152            .map_err(|_| AgentError::contract_violation("event bus lock poisoned"))?
153            .push(frame);
154        Ok(())
155    }
156
157    /// Mutates the in-memory event/subscription state and may wake local
158    /// subscribers. It does not persist durable journal truth or call network
159    /// sinks.
160    pub fn publish_all(
161        &self,
162        frames: impl IntoIterator<Item = EventFrame>,
163    ) -> Result<(), AgentError> {
164        let frames = frames
165            .into_iter()
166            .map(|frame| self.assign_live_sequence(frame))
167            .collect::<Vec<_>>();
168        let mut locked = self
169            .frames
170            .lock()
171            .map_err(|_| AgentError::contract_violation("event bus lock poisoned"))?;
172        locked.extend(frames);
173        Ok(())
174    }
175
176    fn filtered_stream(
177        &self,
178        requested_scope: EventStreamScope,
179        filter: CompiledEventFilter,
180        cursor: Option<EventCursor>,
181        queue: SubscriberQueueConfig,
182    ) -> Result<AgentEventStream, AgentError> {
183        cursor_compatible(&requested_scope, cursor.as_ref())?;
184        reject_live_overflow_policy(&queue)?;
185        let start_after = cursor.as_ref().map(|cursor| cursor.event_seq);
186        let frames = self
187            .frames
188            .lock()
189            .map_err(|_| AgentError::contract_violation("event bus lock poisoned"))?
190            .iter()
191            .filter(|frame| start_after.is_none_or(|seq| frame.cursor.event_seq > seq))
192            .filter(|frame| filter.matches_envelope(&frame.event.envelope))
193            .map(|frame| {
194                let mut frame = frame.clone();
195                frame.cursor = frame.event.envelope.cursor(requested_scope.clone());
196                frame
197            })
198            .collect::<Vec<_>>();
199        let frames = apply_queue_bounds(frames, &queue);
200        Ok(AgentEventStream::new(frames))
201    }
202
203    fn assign_live_sequence(&self, mut frame: EventFrame) -> EventFrame {
204        let event_seq = self.next_event_seq.fetch_add(1, Ordering::SeqCst) + 1;
205        frame.event.envelope.event_seq = event_seq;
206        frame.cursor = frame.event.envelope.cursor(frame.cursor.scope.clone());
207        frame
208    }
209}
210
211impl AgentEventBus for InMemoryAgentEventBus {
212    fn publish(&self, frame: EventFrame) -> Result<(), AgentError> {
213        InMemoryAgentEventBus::publish(self, frame)
214    }
215
216    fn subscribe_all(&self, cursor: Option<EventCursor>) -> Result<AgentEventStream, AgentError> {
217        self.subscribe_all_with_options(cursor, SubscriptionOptions::default())
218    }
219
220    fn subscribe_all_with_options(
221        &self,
222        cursor: Option<EventCursor>,
223        options: SubscriptionOptions,
224    ) -> Result<AgentEventStream, AgentError> {
225        self.filtered_stream(
226            EventStreamScope::All,
227            EventFilter::default().compile()?,
228            cursor,
229            options.queue,
230        )
231    }
232
233    fn subscribe_run(
234        &self,
235        run_id: RunId,
236        cursor: Option<EventCursor>,
237    ) -> Result<AgentEventStream, AgentError> {
238        self.subscribe_run_with_options(run_id, cursor, SubscriptionOptions::default())
239    }
240
241    fn subscribe_run_with_options(
242        &self,
243        run_id: RunId,
244        cursor: Option<EventCursor>,
245        options: SubscriptionOptions,
246    ) -> Result<AgentEventStream, AgentError> {
247        self.filtered_stream(
248            EventStreamScope::Run(run_id.clone()),
249            EventFilter::run(run_id).compile()?,
250            cursor,
251            options.queue,
252        )
253    }
254
255    fn subscribe_agent(
256        &self,
257        agent_id: AgentId,
258        cursor: Option<EventCursor>,
259    ) -> Result<AgentEventStream, AgentError> {
260        self.subscribe_agent_with_options(agent_id, cursor, SubscriptionOptions::default())
261    }
262
263    fn subscribe_agent_with_options(
264        &self,
265        agent_id: AgentId,
266        cursor: Option<EventCursor>,
267        options: SubscriptionOptions,
268    ) -> Result<AgentEventStream, AgentError> {
269        self.filtered_stream(
270            EventStreamScope::Agent(agent_id.clone()),
271            EventFilter::agent(agent_id).compile()?,
272            cursor,
273            options.queue,
274        )
275    }
276
277    fn subscribe_filtered(
278        &self,
279        filter: CompiledEventFilter,
280        cursor: Option<EventCursor>,
281    ) -> Result<AgentEventStream, AgentError> {
282        let queue = filter.queue.clone();
283        self.filtered_stream(filter.cursor_scope(), filter, cursor, queue)
284    }
285}
286
287fn apply_queue_bounds(
288    frames: impl IntoIterator<Item = EventFrame>,
289    queue: &SubscriberQueueConfig,
290) -> Vec<EventFrame> {
291    let capacity = queue.capacity.get();
292    let normal_capacity = capacity.saturating_sub(queue.terminal_reserve.get().min(capacity));
293    let mut bounded = VecDeque::new();
294    let mut overflow = OverflowAccumulator::default();
295    let mut summary = ProgressSummaryAccumulator::default();
296
297    for frame in frames {
298        if frame.event.envelope.event_kind.is_terminal() {
299            flush_progress_summary(
300                &mut bounded,
301                queue,
302                normal_capacity,
303                &mut summary,
304                &mut overflow,
305            );
306            while bounded.len() >= capacity {
307                if !drop_oldest_nonterminal(
308                    &mut bounded,
309                    &mut overflow,
310                    EventOverflowReason::PolicyDroppedNonTerminal,
311                ) {
312                    if let Some(dropped) = bounded.pop_front() {
313                        overflow.record_drop(&dropped, EventOverflowReason::SubscriberQueueFull);
314                    } else {
315                        break;
316                    }
317                }
318            }
319            push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
320            continue;
321        }
322
323        match queue.overflow {
324            SubscriberOverflowPolicy::DropNonTerminal => {
325                if can_accept_nonterminal(&bounded, capacity, normal_capacity) {
326                    push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
327                } else {
328                    overflow.record_drop(&frame, EventOverflowReason::PolicyDroppedNonTerminal);
329                }
330            }
331            SubscriberOverflowPolicy::DropProgress => {
332                if can_accept_nonterminal(&bounded, capacity, normal_capacity) {
333                    push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
334                } else if is_progress_event(&frame.event.envelope.event_kind) {
335                    overflow.record_drop(&frame, EventOverflowReason::PolicyDroppedProgress);
336                } else if drop_oldest_progress(&mut bounded, &mut overflow) {
337                    push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
338                } else {
339                    overflow.record_drop(&frame, EventOverflowReason::SubscriberQueueFull);
340                }
341            }
342            SubscriberOverflowPolicy::SummarizeAndContinue => {
343                if is_progress_event(&frame.event.envelope.event_kind) {
344                    summary.record_progress(frame);
345                } else {
346                    flush_progress_summary(
347                        &mut bounded,
348                        queue,
349                        normal_capacity,
350                        &mut summary,
351                        &mut overflow,
352                    );
353                    if can_accept_nonterminal(&bounded, capacity, normal_capacity) {
354                        push_with_notice(
355                            &mut bounded,
356                            frame,
357                            queue.overflow.clone(),
358                            &mut overflow,
359                        );
360                    } else {
361                        overflow.record_drop(&frame, EventOverflowReason::SubscriberQueueFull);
362                    }
363                }
364            }
365            SubscriberOverflowPolicy::FailSubscriber => {
366                if can_accept_nonterminal(&bounded, capacity, normal_capacity) {
367                    push_with_notice(&mut bounded, frame, queue.overflow.clone(), &mut overflow);
368                } else {
369                    overflow.record_drop(&frame, EventOverflowReason::SubscriberQueueFull);
370                    if let Some(last) = bounded.back_mut() {
371                        last.overflow = Some(overflow.take_notice(queue.overflow.clone()));
372                    }
373                    break;
374                }
375            }
376            SubscriberOverflowPolicy::BackpressureCaller => unreachable!(
377                "live event bus rejects backpressure overflow policy before queue bounding"
378            ),
379        }
380    }
381
382    flush_progress_summary(
383        &mut bounded,
384        queue,
385        normal_capacity,
386        &mut summary,
387        &mut overflow,
388    );
389
390    if overflow.has_drop() {
391        if let Some(last) = bounded.back_mut() {
392            last.overflow = Some(overflow.notice(queue.overflow.clone()));
393        }
394    }
395
396    bounded.into_iter().collect()
397}
398
399fn reject_live_overflow_policy(queue: &SubscriberQueueConfig) -> Result<(), AgentError> {
400    if queue.overflow == SubscriberOverflowPolicy::BackpressureCaller {
401        return Err(AgentError::contract_violation(
402            "InvalidOverflowPolicy: backpressure_caller is rejected for live event bus subscriptions",
403        ));
404    }
405    Ok(())
406}
407
408fn can_accept_nonterminal(
409    frames: &VecDeque<EventFrame>,
410    capacity: usize,
411    normal_capacity: usize,
412) -> bool {
413    frames.len() < capacity && nonterminal_count(frames) < normal_capacity
414}
415
416fn push_with_notice(
417    frames: &mut VecDeque<EventFrame>,
418    mut frame: EventFrame,
419    policy: SubscriberOverflowPolicy,
420    overflow: &mut OverflowAccumulator,
421) {
422    if overflow.has_drop() {
423        frame.overflow = Some(overflow.take_notice(policy));
424    }
425    frames.push_back(frame);
426}
427
428fn drop_oldest_nonterminal(
429    frames: &mut VecDeque<EventFrame>,
430    overflow: &mut OverflowAccumulator,
431    reason: EventOverflowReason,
432) -> bool {
433    let Some(index) = frames
434        .iter()
435        .position(|frame| !frame.event.envelope.event_kind.is_terminal())
436    else {
437        return false;
438    };
439    if let Some(dropped) = frames.remove(index) {
440        overflow.record_drop(&dropped, reason);
441        true
442    } else {
443        false
444    }
445}
446
447fn drop_oldest_progress(
448    frames: &mut VecDeque<EventFrame>,
449    overflow: &mut OverflowAccumulator,
450) -> bool {
451    let Some(index) = frames
452        .iter()
453        .position(|frame| is_progress_event(&frame.event.envelope.event_kind))
454    else {
455        return false;
456    };
457    if let Some(dropped) = frames.remove(index) {
458        overflow.record_drop(&dropped, EventOverflowReason::PolicyDroppedProgress);
459        true
460    } else {
461        false
462    }
463}
464
465fn nonterminal_count(frames: &VecDeque<EventFrame>) -> usize {
466    frames
467        .iter()
468        .filter(|frame| !frame.event.envelope.event_kind.is_terminal())
469        .count()
470}
471
472fn is_progress_event(kind: &EventKind) -> bool {
473    matches!(
474        kind,
475        EventKind::ModelStreamDelta
476            | EventKind::StreamRuleRepeatStateRecorded
477            | EventKind::RealtimeInputSent
478            | EventKind::RealtimeOutputReceived
479            | EventKind::RealtimeBackpressureApplied
480            | EventKind::IsolationProcessIoCaptured
481            | EventKind::IsolationProcessStatsRecorded
482            | EventKind::UsageRecorded
483            | EventKind::CostEstimated
484            | EventKind::CostCorrected
485    )
486}
487
488fn flush_progress_summary(
489    frames: &mut VecDeque<EventFrame>,
490    queue: &SubscriberQueueConfig,
491    normal_capacity: usize,
492    summary: &mut ProgressSummaryAccumulator,
493    overflow: &mut OverflowAccumulator,
494) {
495    let Some(frame) = summary.take_summary_frame() else {
496        return;
497    };
498    if can_accept_nonterminal(frames, queue.capacity.get(), normal_capacity) {
499        push_with_notice(frames, frame, queue.overflow.clone(), overflow);
500    } else {
501        overflow.record_drop(&frame, EventOverflowReason::PolicyDroppedProgress);
502    }
503}
504
505#[derive(Default)]
506struct ProgressSummaryAccumulator {
507    dropped_count: u64,
508    gap_start: Option<EventCursor>,
509    gap_end: Option<EventCursor>,
510    repair_from: Option<crate::domain::JournalCursor>,
511    summary_frame: Option<EventFrame>,
512}
513
514impl ProgressSummaryAccumulator {
515    fn record_progress(&mut self, frame: EventFrame) {
516        self.dropped_count += 1;
517        self.gap_start.get_or_insert_with(|| frame.cursor.clone());
518        self.gap_end = Some(frame.cursor.clone());
519        if self.repair_from.is_none() {
520            self.repair_from = frame.cursor.journal_cursor.clone();
521        }
522        self.summary_frame = Some(frame);
523    }
524
525    fn take_summary_frame(&mut self) -> Option<EventFrame> {
526        if self.dropped_count == 0 {
527            return None;
528        }
529        let mut frame = self.summary_frame.take()?;
530        let notice = EventOverflowNotice {
531            policy: SubscriberOverflowPolicy::SummarizeAndContinue,
532            dropped_count: self.dropped_count,
533            gap_start: self.gap_start.clone(),
534            gap_end: self
535                .gap_end
536                .clone()
537                .unwrap_or_else(|| self.gap_start.clone().expect("summary gap start")),
538            repair_from: self.repair_from.clone(),
539            terminal_preserved: true,
540            reason: EventOverflowReason::PolicyDroppedProgress,
541        };
542        frame.event = AgentEvent::with_redacted_summary(
543            frame.event.envelope.clone(),
544            format!(
545                "redacted progress summary for {} dropped progress frames",
546                self.dropped_count
547            ),
548        );
549        frame.overflow = Some(notice);
550        *self = Self::default();
551        Some(frame)
552    }
553}
554
555#[derive(Default)]
556struct OverflowAccumulator {
557    dropped_count: u64,
558    gap_start: Option<EventCursor>,
559    gap_end: Option<EventCursor>,
560    repair_from: Option<crate::domain::JournalCursor>,
561    terminal_dropped: bool,
562    reason: Option<EventOverflowReason>,
563}
564
565impl OverflowAccumulator {
566    fn record_drop(&mut self, frame: &EventFrame, reason: EventOverflowReason) {
567        self.dropped_count += 1;
568        self.gap_start.get_or_insert_with(|| frame.cursor.clone());
569        self.gap_end = Some(frame.cursor.clone());
570        if self.repair_from.is_none() {
571            self.repair_from = frame.cursor.journal_cursor.clone();
572        }
573        self.terminal_dropped |= frame.event.envelope.event_kind.is_terminal();
574        self.reason.get_or_insert(reason);
575    }
576
577    fn has_drop(&self) -> bool {
578        self.dropped_count > 0
579    }
580
581    fn take_notice(&mut self, policy: SubscriberOverflowPolicy) -> EventOverflowNotice {
582        let notice = self.notice(policy);
583        *self = Self::default();
584        notice
585    }
586
587    fn notice(&self, policy: SubscriberOverflowPolicy) -> EventOverflowNotice {
588        EventOverflowNotice {
589            policy,
590            dropped_count: self.dropped_count,
591            gap_start: self.gap_start.clone(),
592            gap_end: self
593                .gap_end
594                .clone()
595                .unwrap_or_else(|| self.gap_start.clone().expect("overflow gap start")),
596            repair_from: self.repair_from.clone(),
597            terminal_preserved: !self.terminal_dropped,
598            reason: if self.terminal_dropped {
599                EventOverflowReason::SubscriberQueueFull
600            } else {
601                self.reason
602                    .clone()
603                    .unwrap_or(EventOverflowReason::SubscriberQueueFull)
604            },
605        }
606    }
607}