Skip to main content

agentkit_reporting/
channel.rs

1//! Channel-based reporter adapter.
2//!
3//! [`ChannelReporter`] forwards events to another thread or task via a
4//! [`std::sync::mpsc::Sender`]. This keeps the observer contract synchronous
5//! while allowing expensive event processing to happen off the driver's hot
6//! path.
7
8use std::sync::mpsc::{self, Receiver, Sender};
9
10use agentkit_loop::{AgentEvent, LoopObserver};
11
12use crate::ReportError;
13use crate::policy::FallibleObserver;
14
15/// Reporter adapter that forwards events over a channel.
16///
17/// Use this when you need expensive or async event processing without
18/// blocking the agent loop. The receiving end can live on a dedicated
19/// thread or async task.
20///
21/// # Example
22///
23/// ```rust
24/// use agentkit_reporting::ChannelReporter;
25///
26/// let (reporter, rx) = ChannelReporter::pair();
27///
28/// // Spawn a consumer on another thread.
29/// std::thread::spawn(move || {
30///     while let Ok(event) = rx.recv() {
31///         println!("{event:?}");
32///     }
33/// });
34///
35/// // `reporter` implements `LoopObserver` — hand it to the agent loop.
36/// ```
37pub struct ChannelReporter {
38    sender: std::sync::Mutex<Sender<AgentEvent>>,
39}
40
41impl ChannelReporter {
42    /// Creates a `ChannelReporter` from an existing sender.
43    pub fn new(sender: Sender<AgentEvent>) -> Self {
44        Self {
45            sender: std::sync::Mutex::new(sender),
46        }
47    }
48
49    /// Creates a `ChannelReporter` together with the receiving end of the
50    /// channel.
51    pub fn pair() -> (Self, Receiver<AgentEvent>) {
52        let (sender, receiver) = mpsc::channel();
53        (Self::new(sender), receiver)
54    }
55}
56
57impl LoopObserver for ChannelReporter {
58    fn handle_event(&self, event: AgentEvent) {
59        let _ = self
60            .sender
61            .lock()
62            .unwrap_or_else(|e| e.into_inner())
63            .send(event);
64    }
65}
66
67impl FallibleObserver for ChannelReporter {
68    fn try_handle_event(&self, event: &AgentEvent) -> Result<(), ReportError> {
69        self.sender
70            .lock()
71            .unwrap_or_else(|e| e.into_inner())
72            .send(event.clone())
73            .map_err(|_| ReportError::ChannelSend)
74    }
75}