agentkit_reporting/buffered.rs
//! Batch-buffered reporter adapter.
//!
//! [`BufferedReporter`] enqueues events and forwards them to an inner
//! [`LoopObserver`] in batches — either when the buffer reaches a configured
//! capacity or on an explicit [`flush`](BufferedReporter::flush) call. A
//! capacity of `0` disables the capacity-triggered flush, so events are then
//! forwarded only by an explicit `flush` or on drop.
//! Any events still buffered are flushed automatically on drop.
7
8use agentkit_loop::{AgentEvent, LoopObserver};
9
/// Reporter adapter that enqueues events for batch flushing.
///
/// Wraps any [`LoopObserver`] and delivers events in batches rather than
/// one-at-a-time. This is useful when the inner reporter benefits from
/// amortised work (e.g. a network sink that batches writes).
///
/// Buffered events are forwarded to the inner observer when the number of
/// buffered events reaches the configured capacity, when
/// [`flush`](BufferedReporter::flush) is called, and when the reporter is
/// dropped. A capacity of `0` disables the capacity-triggered flush.
///
/// # Example
///
/// ```rust
/// use agentkit_reporting::{BufferedReporter, UsageReporter};
///
/// // Buffer up to 64 events before forwarding to the inner reporter.
/// let reporter = BufferedReporter::new(UsageReporter::new(), 64);
/// ```
// The `LoopObserver` bound sits on the struct itself because the `Drop` impl
// needs it, and a `Drop` impl may not require bounds the type does not have.
pub struct BufferedReporter<T: LoopObserver> {
    /// Observer that receives the events once they are flushed.
    inner: T,
    /// Events accepted but not yet forwarded; the mutex lets `&self` methods
    /// push to and drain the buffer.
    buffer: std::sync::Mutex<Vec<AgentEvent>>,
    /// Buffered-event count that triggers an automatic flush (`0` = never).
    capacity: usize,
}
29
30impl<T: LoopObserver> BufferedReporter<T> {
31 /// Creates a new `BufferedReporter` that buffers up to `capacity` events
32 /// before flushing them to `inner`.
33 pub fn new(inner: T, capacity: usize) -> Self {
34 Self {
35 inner,
36 buffer: std::sync::Mutex::new(Vec::with_capacity(capacity)),
37 capacity,
38 }
39 }
40
41 /// Delivers all buffered events to the inner observer and clears the
42 /// buffer.
43 pub fn flush(&self) {
44 let drained = std::mem::replace(
45 &mut *self.buffer.lock().unwrap_or_else(|e| e.into_inner()),
46 Vec::with_capacity(self.capacity),
47 );
48 for event in drained {
49 self.inner.handle_event(event);
50 }
51 }
52
53 /// Returns the number of events currently buffered.
54 pub fn pending(&self) -> usize {
55 self.buffer.lock().unwrap_or_else(|e| e.into_inner()).len()
56 }
57
58 /// Returns a reference to the inner observer.
59 pub fn inner(&self) -> &T {
60 &self.inner
61 }
62}
63
64impl<T: LoopObserver> LoopObserver for BufferedReporter<T> {
65 fn handle_event(&self, event: AgentEvent) {
66 let needs_flush = {
67 let mut buffer = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
68 buffer.push(event);
69 self.capacity > 0 && buffer.len() >= self.capacity
70 };
71 if needs_flush {
72 self.flush();
73 }
74 }
75}
76
77impl<T: LoopObserver> Drop for BufferedReporter<T> {
78 fn drop(&mut self) {
79 self.flush();
80 }
81}