Skip to main content

codetether_agent/session/bus/
handle.rs

1//! The [`SessionBus`] handle itself — broadcast fan-out plus durable sink.
2
3use std::sync::Arc;
4
5use tokio::sync::{broadcast, mpsc};
6use tracing::warn;
7
8use super::sink::{NoopSink, SharedSink};
9use crate::session::{DurableSink, SessionEvent};
10
11/// Unified event bus for a single session.
12///
13/// See the [module docs](super) for the ephemeral-vs-durable model.
14/// `SessionBus` is cheap to `clone`; every clone shares the same
15/// broadcast channel, durable sink, and optional legacy mpsc forwarder.
16///
17/// # Examples
18///
19/// Fan out a single event to a broadcast subscriber and a durable sink:
20///
21/// ```rust,no_run
22/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
23/// use std::sync::{Arc, Mutex};
24/// use codetether_agent::session::{
25///     DurableSink, SessionBus, SessionEvent, TokenDelta, TokenSource,
26/// };
27///
28/// struct Mem(Arc<Mutex<usize>>);
29/// impl DurableSink for Mem {
30///     fn write(&self, _e: &SessionEvent) -> std::io::Result<()> {
31///         *self.0.lock().unwrap() += 1;
32///         Ok(())
33///     }
34/// }
35///
36/// let counter = Arc::new(Mutex::new(0));
37/// let sink = Arc::new(Mem(counter.clone()));
38/// let bus = SessionBus::new(64).with_durable_sink(sink);
39/// let mut rx = bus.subscribe();
40///
41/// bus.emit(SessionEvent::TokenUsage(TokenDelta {
42///     source: TokenSource::Root,
43///     model: "m".into(),
44///     prompt_tokens: 10, completion_tokens: 5, duration_ms: 0,
45/// }));
46///
47/// let ev = rx.recv().await.unwrap();
48/// assert!(ev.is_durable());
49/// assert_eq!(*counter.lock().unwrap(), 1);
50/// # });
51/// ```
52#[derive(Clone)]
53pub struct SessionBus {
54    tx: broadcast::Sender<SessionEvent>,
55    sink: SharedSink,
56    legacy: Option<mpsc::Sender<SessionEvent>>,
57}
58
59impl SessionBus {
60    /// Construct a new bus with the given broadcast channel capacity.
61    ///
62    /// `capacity` controls how many pending events the broadcast channel
63    /// retains per subscriber before emitting `RecvError::Lagged`. 64–256
64    /// is typical for interactive TUIs.
65    ///
66    /// Starts with [`NoopSink`] and no legacy forwarder. Use
67    /// [`Self::with_durable_sink`] and [`Self::with_legacy_mpsc`] to
68    /// attach either.
69    ///
70    /// # Panics
71    ///
72    /// Panics if `capacity == 0` (same contract as `broadcast::channel`).
73    ///
74    /// # Examples
75    ///
76    /// ```rust
77    /// use codetether_agent::session::SessionBus;
78    ///
79    /// let bus = SessionBus::new(16);
80    /// assert_eq!(bus.subscriber_count(), 0);
81    /// ```
82    pub fn new(capacity: usize) -> Self {
83        let (tx, _) = broadcast::channel(capacity);
84        Self {
85            tx,
86            sink: Arc::new(NoopSink),
87            legacy: None,
88        }
89    }
90
91    /// Attach a durable write-ahead sink for `is_durable()` events.
92    ///
93    /// # Examples
94    ///
95    /// ```rust
96    /// use std::sync::Arc;
97    /// use codetether_agent::session::{NoopSink, SessionBus};
98    ///
99    /// let bus = SessionBus::new(8).with_durable_sink(Arc::new(NoopSink));
100    /// assert_eq!(bus.subscriber_count(), 0);
101    /// ```
102    pub fn with_durable_sink(mut self, sink: Arc<dyn DurableSink>) -> Self {
103        self.sink = sink;
104        self
105    }
106
107    /// Forward every emission to an existing legacy `mpsc::Sender` for
108    /// backward compatibility with code paths that have not migrated.
109    ///
110    /// Emission is **non-blocking**: uses `try_send` and logs (does not
111    /// propagate) failures. This preserves the lossy semantics the legacy
112    /// channel already had.
113    ///
114    /// # Examples
115    ///
116    /// ```rust,no_run
117    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
118    /// use codetether_agent::session::{SessionBus, SessionEvent};
119    /// let (tx, mut rx) = tokio::sync::mpsc::channel(8);
120    /// let bus = SessionBus::new(8).with_legacy_mpsc(tx);
121    /// bus.emit(SessionEvent::Thinking);
122    /// let ev = rx.recv().await.unwrap();
123    /// assert!(matches!(ev, SessionEvent::Thinking));
124    /// # });
125    /// ```
126    pub fn with_legacy_mpsc(mut self, tx: mpsc::Sender<SessionEvent>) -> Self {
127        self.legacy = Some(tx);
128        self
129    }
130
131    /// Subscribe to the ephemeral broadcast stream.
132    ///
133    /// All events — ephemeral and durable — are republished here for live
134    /// UIs. Slow consumers will see `RecvError::Lagged(n)`; the durable
135    /// sink is unaffected.
136    pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
137        self.tx.subscribe()
138    }
139
140    /// Current number of active broadcast subscribers.
141    ///
142    /// # Examples
143    ///
144    /// ```rust
145    /// use codetether_agent::session::SessionBus;
146    ///
147    /// let bus = SessionBus::new(4);
148    /// let _rx = bus.subscribe();
149    /// assert_eq!(bus.subscriber_count(), 1);
150    /// ```
151    pub fn subscriber_count(&self) -> usize {
152        self.tx.receiver_count()
153    }
154
155    /// Publish one event.
156    ///
157    /// For durable events the sink is written **first** (synchronously);
158    /// on success the event is then broadcast. A sink failure is logged
159    /// at `warn` and does not prevent the broadcast — observers still see
160    /// the event even if durable persistence fell over.
161    ///
162    /// Sending on an empty broadcast channel is not an error; it simply
163    /// means no live subscribers.
164    ///
165    /// # Examples
166    ///
167    /// ```rust,no_run
168    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
169    /// use codetether_agent::session::{SessionBus, SessionEvent};
170    /// let bus = SessionBus::new(4);
171    /// bus.emit(SessionEvent::Done); // no subscribers → still ok
172    /// # });
173    /// ```
174    pub fn emit(&self, event: SessionEvent) {
175        if event.is_durable()
176            && let Err(err) = self.sink.write(&event)
177        {
178            warn!(error = %err, "SessionBus: durable sink write failed");
179        }
180
181        if let Some(legacy) = self.legacy.as_ref()
182            && let Err(err) = legacy.try_send(event.clone())
183        {
184            tracing::debug!(error = %err, "SessionBus: legacy mpsc forward failed");
185        }
186
187        let _ = self.tx.send(event);
188    }
189}