Skip to main content

agentkit_reporting/
buffered.rs

1//! Batch-buffered reporter adapter.
2//!
3//! [`BufferedReporter`] enqueues events and forwards them to an inner
4//! [`LoopObserver`] in batches — either when the buffer reaches a configured
5//! capacity or on an explicit [`flush`](BufferedReporter::flush) call.
6//! Remaining events are flushed automatically on drop.
7
8use agentkit_loop::{AgentEvent, LoopObserver};
9
10/// Reporter adapter that enqueues events for batch flushing.
11///
12/// Wraps any [`LoopObserver`] and delivers events in batches rather than
13/// one-at-a-time. This is useful when the inner reporter benefits from
14/// amortised work (e.g. a network sink that batches writes).
15///
16/// # Example
17///
18/// ```rust
19/// use agentkit_reporting::{BufferedReporter, UsageReporter};
20///
21/// // Buffer up to 64 events before forwarding to the inner reporter.
22/// let reporter = BufferedReporter::new(UsageReporter::new(), 64);
23/// ```
24pub struct BufferedReporter<T: LoopObserver> {
25    inner: T,
26    buffer: std::sync::Mutex<Vec<AgentEvent>>,
27    capacity: usize,
28}
29
30impl<T: LoopObserver> BufferedReporter<T> {
31    /// Creates a new `BufferedReporter` that buffers up to `capacity` events
32    /// before flushing them to `inner`.
33    pub fn new(inner: T, capacity: usize) -> Self {
34        Self {
35            inner,
36            buffer: std::sync::Mutex::new(Vec::with_capacity(capacity)),
37            capacity,
38        }
39    }
40
41    /// Delivers all buffered events to the inner observer and clears the
42    /// buffer.
43    pub fn flush(&self) {
44        let drained = std::mem::replace(
45            &mut *self.buffer.lock().unwrap_or_else(|e| e.into_inner()),
46            Vec::with_capacity(self.capacity),
47        );
48        for event in drained {
49            self.inner.handle_event(event);
50        }
51    }
52
53    /// Returns the number of events currently buffered.
54    pub fn pending(&self) -> usize {
55        self.buffer.lock().unwrap_or_else(|e| e.into_inner()).len()
56    }
57
58    /// Returns a reference to the inner observer.
59    pub fn inner(&self) -> &T {
60        &self.inner
61    }
62}
63
64impl<T: LoopObserver> LoopObserver for BufferedReporter<T> {
65    fn handle_event(&self, event: AgentEvent) {
66        let needs_flush = {
67            let mut buffer = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
68            buffer.push(event);
69            self.capacity > 0 && buffer.len() >= self.capacity
70        };
71        if needs_flush {
72            self.flush();
73        }
74    }
75}
76
77impl<T: LoopObserver> Drop for BufferedReporter<T> {
78    fn drop(&mut self) {
79        self.flush();
80    }
81}