codetether_agent/session/bus/handle.rs
1//! The [`SessionBus`] handle itself — broadcast fan-out plus durable sink.
2
3use std::sync::Arc;
4
5use tokio::sync::{broadcast, mpsc};
6use tracing::warn;
7
8use super::sink::{NoopSink, SharedSink};
9use crate::session::{DurableSink, SessionEvent};
10
11/// Unified event bus for a single session.
12///
13/// See the [module docs](super) for the ephemeral-vs-durable model.
14/// `SessionBus` is cheap to `clone`; every clone shares the same
15/// broadcast channel, durable sink, and optional legacy mpsc forwarder.
16///
17/// # Examples
18///
19/// Fan out a single event to a broadcast subscriber and a durable sink:
20///
21/// ```rust,no_run
22/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
23/// use std::sync::{Arc, Mutex};
24/// use codetether_agent::session::{
25/// DurableSink, SessionBus, SessionEvent, TokenDelta, TokenSource,
26/// };
27///
28/// struct Mem(Arc<Mutex<usize>>);
29/// impl DurableSink for Mem {
30/// fn write(&self, _e: &SessionEvent) -> std::io::Result<()> {
31/// *self.0.lock().unwrap() += 1;
32/// Ok(())
33/// }
34/// }
35///
36/// let counter = Arc::new(Mutex::new(0));
37/// let sink = Arc::new(Mem(counter.clone()));
38/// let bus = SessionBus::new(64).with_durable_sink(sink);
39/// let mut rx = bus.subscribe();
40///
41/// bus.emit(SessionEvent::TokenUsage(TokenDelta {
42/// source: TokenSource::Root,
43/// model: "m".into(),
44/// prompt_tokens: 10, completion_tokens: 5, duration_ms: 0,
45/// }));
46///
47/// let ev = rx.recv().await.unwrap();
48/// assert!(ev.is_durable());
49/// assert_eq!(*counter.lock().unwrap(), 1);
50/// # });
51/// ```
52#[derive(Clone)]
53pub struct SessionBus {
54 tx: broadcast::Sender<SessionEvent>,
55 sink: SharedSink,
56 legacy: Option<mpsc::Sender<SessionEvent>>,
57}
58
59impl SessionBus {
60 /// Construct a new bus with the given broadcast channel capacity.
61 ///
62 /// `capacity` controls how many pending events the broadcast channel
63 /// retains per subscriber before emitting `RecvError::Lagged`. 64–256
64 /// is typical for interactive TUIs.
65 ///
66 /// Starts with [`NoopSink`] and no legacy forwarder. Use
67 /// [`Self::with_durable_sink`] and [`Self::with_legacy_mpsc`] to
68 /// attach either.
69 ///
70 /// # Panics
71 ///
72 /// Panics if `capacity == 0` (same contract as `broadcast::channel`).
73 ///
74 /// # Examples
75 ///
76 /// ```rust
77 /// use codetether_agent::session::SessionBus;
78 ///
79 /// let bus = SessionBus::new(16);
80 /// assert_eq!(bus.subscriber_count(), 0);
81 /// ```
82 pub fn new(capacity: usize) -> Self {
83 let (tx, _) = broadcast::channel(capacity);
84 Self {
85 tx,
86 sink: Arc::new(NoopSink),
87 legacy: None,
88 }
89 }
90
91 /// Attach a durable write-ahead sink for `is_durable()` events.
92 ///
93 /// # Examples
94 ///
95 /// ```rust
96 /// use std::sync::Arc;
97 /// use codetether_agent::session::{NoopSink, SessionBus};
98 ///
99 /// let bus = SessionBus::new(8).with_durable_sink(Arc::new(NoopSink));
100 /// assert_eq!(bus.subscriber_count(), 0);
101 /// ```
102 pub fn with_durable_sink(mut self, sink: Arc<dyn DurableSink>) -> Self {
103 self.sink = sink;
104 self
105 }
106
107 /// Forward every emission to an existing legacy `mpsc::Sender` for
108 /// backward compatibility with code paths that have not migrated.
109 ///
110 /// Emission is **non-blocking**: uses `try_send` and logs (does not
111 /// propagate) failures. This preserves the lossy semantics the legacy
112 /// channel already had.
113 ///
114 /// # Examples
115 ///
116 /// ```rust,no_run
117 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
118 /// use codetether_agent::session::{SessionBus, SessionEvent};
119 /// let (tx, mut rx) = tokio::sync::mpsc::channel(8);
120 /// let bus = SessionBus::new(8).with_legacy_mpsc(tx);
121 /// bus.emit(SessionEvent::Thinking);
122 /// let ev = rx.recv().await.unwrap();
123 /// assert!(matches!(ev, SessionEvent::Thinking));
124 /// # });
125 /// ```
126 pub fn with_legacy_mpsc(mut self, tx: mpsc::Sender<SessionEvent>) -> Self {
127 self.legacy = Some(tx);
128 self
129 }
130
131 /// Subscribe to the ephemeral broadcast stream.
132 ///
133 /// All events — ephemeral and durable — are republished here for live
134 /// UIs. Slow consumers will see `RecvError::Lagged(n)`; the durable
135 /// sink is unaffected.
136 pub fn subscribe(&self) -> broadcast::Receiver<SessionEvent> {
137 self.tx.subscribe()
138 }
139
140 /// Current number of active broadcast subscribers.
141 ///
142 /// # Examples
143 ///
144 /// ```rust
145 /// use codetether_agent::session::SessionBus;
146 ///
147 /// let bus = SessionBus::new(4);
148 /// let _rx = bus.subscribe();
149 /// assert_eq!(bus.subscriber_count(), 1);
150 /// ```
151 pub fn subscriber_count(&self) -> usize {
152 self.tx.receiver_count()
153 }
154
155 /// Publish one event.
156 ///
157 /// For durable events the sink is written **first** (synchronously);
158 /// on success the event is then broadcast. A sink failure is logged
159 /// at `warn` and does not prevent the broadcast — observers still see
160 /// the event even if durable persistence fell over.
161 ///
162 /// Sending on an empty broadcast channel is not an error; it simply
163 /// means no live subscribers.
164 ///
165 /// # Examples
166 ///
167 /// ```rust,no_run
168 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
169 /// use codetether_agent::session::{SessionBus, SessionEvent};
170 /// let bus = SessionBus::new(4);
171 /// bus.emit(SessionEvent::Done); // no subscribers → still ok
172 /// # });
173 /// ```
174 pub fn emit(&self, event: SessionEvent) {
175 if event.is_durable()
176 && let Err(err) = self.sink.write(&event)
177 {
178 warn!(error = %err, "SessionBus: durable sink write failed");
179 }
180
181 if let Some(legacy) = self.legacy.as_ref()
182 && let Err(err) = legacy.try_send(event.clone())
183 {
184 tracing::debug!(error = %err, "SessionBus: legacy mpsc forward failed");
185 }
186
187 let _ = self.tx.send(event);
188 }
189}