pub struct SessionBus { /* private fields */ }
Unified event bus for a single session.
See the module docs for the ephemeral-vs-durable model.
SessionBus is cheap to clone; every clone shares the same
broadcast channel, durable sink, and optional legacy mpsc forwarder.
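The clone-sharing behaviour can be pictured with a small std-only model. This is a sketch, not the actual SessionBus implementation: `ModelBus`, its `log` field, and `shared_count` are hypothetical names, and the sketch assumes the shared state sits behind an `Arc`.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a bus handle; NOT the real SessionBus.
#[derive(Clone)]
struct ModelBus {
    // Shared log of emitted events; every clone points at the same allocation.
    log: Arc<Mutex<Vec<String>>>,
}

impl ModelBus {
    fn new() -> Self {
        ModelBus { log: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Record an event in the shared log.
    fn emit(&self, event: &str) {
        self.log.lock().unwrap().push(event.to_string());
    }

    /// Number of events recorded through any handle.
    fn len(&self) -> usize {
        self.log.lock().unwrap().len()
    }
}

/// Emits through two handles and returns the count seen via the original.
fn shared_count() -> usize {
    let a = ModelBus::new();
    let b = a.clone(); // cheap: only bumps the Arc reference count
    a.emit("thinking");
    b.emit("done");
    a.len()
}
```

Both emissions land in the same log, so a handle can be passed to each task without any extra wiring.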
§Examples
Fan out a single event to a broadcast subscriber and a durable sink:
use std::sync::{Arc, Mutex};
use codetether_agent::session::{
DurableSink, SessionBus, SessionEvent, TokenDelta, TokenSource,
};
struct Mem(Arc<Mutex<usize>>);
impl DurableSink for Mem {
    fn write(&self, _e: &SessionEvent) -> std::io::Result<()> {
        *self.0.lock().unwrap() += 1;
        Ok(())
    }
}
let counter = Arc::new(Mutex::new(0));
let sink = Arc::new(Mem(counter.clone()));
let bus = SessionBus::new(64).with_durable_sink(sink);
let mut rx = bus.subscribe();
bus.emit(SessionEvent::TokenUsage(TokenDelta {
    source: TokenSource::Root,
    model: "m".into(),
    prompt_tokens: 10,
    completion_tokens: 5,
    duration_ms: 0,
}));
let ev = rx.recv().await.unwrap();
assert!(ev.is_durable());
assert_eq!(*counter.lock().unwrap(), 1);
Implementations§
impl SessionBus
pub fn new(capacity: usize) -> Self
Construct a new bus with the given broadcast channel capacity.
capacity controls how many pending events the broadcast channel
retains per subscriber before emitting RecvError::Lagged. 64–256
is typical for interactive TUIs.
Starts with NoopSink and no legacy forwarder. Use
Self::with_durable_sink and Self::with_legacy_mpsc to
attach either.
§Panics
Panics if capacity == 0 (same contract as broadcast::channel).
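To make the capacity and lag behaviour concrete, here is a simplified std-only model of a bounded buffer that keeps the newest `capacity` events and tells a slow reader how many it missed. `Ring`, `Read`, and `lag_demo` are hypothetical names for illustration; the real broadcast channel may differ in details such as how it sizes its internal buffer.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded buffer that keeps only the newest `capacity` events.
struct Ring {
    capacity: usize,
    events: VecDeque<u32>,
    next_seq: u64, // sequence number the next pushed event will receive
}

/// What a reader observes when it asks for its next event.
#[derive(Debug, PartialEq)]
enum Read {
    Event(u32),
    Lagged(u64), // number of events dropped before the reader reached them
    Empty,
}

impl Ring {
    fn new(capacity: usize) -> Self {
        // Mirrors the documented contract: zero capacity is a programmer error.
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { capacity, events: VecDeque::new(), next_seq: 0 }
    }

    fn push(&mut self, event: u32) {
        if self.events.len() == self.capacity {
            self.events.pop_front(); // evict the oldest retained event
        }
        self.events.push_back(event);
        self.next_seq += 1;
    }

    /// Read the event at `cursor`, advancing the cursor.
    fn read(&self, cursor: &mut u64) -> Read {
        let oldest = self.next_seq - self.events.len() as u64;
        if *cursor < oldest {
            let missed = oldest - *cursor;
            *cursor = oldest; // skip ahead to the oldest retained event
            return Read::Lagged(missed);
        }
        if *cursor == self.next_seq {
            return Read::Empty;
        }
        let event = self.events[(*cursor - oldest) as usize];
        *cursor += 1;
        Read::Event(event)
    }
}

/// Pushes five events into a capacity-2 ring, then reads four times.
fn lag_demo() -> Vec<Read> {
    let mut ring = Ring::new(2);
    for e in 0..5 {
        ring.push(e);
    }
    let mut cursor = 0u64;
    (0..4).map(|_| ring.read(&mut cursor)).collect()
}
```

In this model the reader is told it missed three events, then resumes from the oldest one still retained. A larger capacity gives slow consumers more headroom at the cost of memory.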
§Examples
use codetether_agent::session::SessionBus;
let bus = SessionBus::new(16);
assert_eq!(bus.subscriber_count(), 0);
pub fn with_durable_sink(self, sink: Arc<dyn DurableSink>) -> Self
Attach a durable write-ahead sink for is_durable() events.
§Examples
use std::sync::Arc;
use codetether_agent::session::{NoopSink, SessionBus};
let bus = SessionBus::new(8).with_durable_sink(Arc::new(NoopSink));
assert_eq!(bus.subscriber_count(), 0);
pub fn with_legacy_mpsc(self, tx: Sender<SessionEvent>) -> Self
Forward every emission to an existing legacy mpsc::Sender for
backward compatibility with code paths that have not migrated.
Emission is non-blocking: the bus uses try_send and logs failures
instead of propagating them. This preserves the lossy semantics the
legacy channel already had.
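The lossy, non-blocking forwarding described above can be sketched with the standard library's bounded channel, used here as a stand-in for the tokio mpsc channel. `forward_lossy` is a hypothetical helper, not part of this crate, and the log line is only illustrative.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Forward `events` into a bounded channel without ever blocking.
/// Returns the events that were delivered and the number that were dropped.
fn forward_lossy(events: &[u32], capacity: usize) -> (Vec<u32>, usize) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut dropped = 0;
    for &e in events {
        match tx.try_send(e) {
            Ok(()) => {}
            // A full or closed channel is logged and skipped, never propagated.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                eprintln!("legacy forward failed for event {e}");
                dropped += 1;
            }
        }
    }
    drop(tx); // close the channel so the receiver's iterator terminates
    let delivered: Vec<u32> = rx.iter().collect();
    (delivered, dropped)
}
```

With a capacity of 2 and nobody draining the receiver, only the first two events get through. The sender never stalls, which is the trade-off the legacy path accepts.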
§Examples
use codetether_agent::session::{SessionBus, SessionEvent};
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let bus = SessionBus::new(8).with_legacy_mpsc(tx);
bus.emit(SessionEvent::Thinking);
let ev = rx.recv().await.unwrap();
assert!(matches!(ev, SessionEvent::Thinking));
pub fn subscribe(&self) -> Receiver<SessionEvent>
Subscribe to the ephemeral broadcast stream.
All events — ephemeral and durable — are republished here for live
UIs. Slow consumers will see RecvError::Lagged(n); the durable
sink is unaffected.
pub fn subscriber_count(&self) -> usize
Current number of active broadcast subscribers.
§Examples
use codetether_agent::session::SessionBus;
let bus = SessionBus::new(4);
let _rx = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
pub fn emit(&self, event: SessionEvent)
Publish one event.
For durable events the sink is written first (synchronously), and
the event is then broadcast. A sink failure is logged at warn level
and does not prevent the broadcast: observers still see the event
even if durable persistence failed.
Sending on an empty broadcast channel is not an error; it simply means no live subscribers.
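The ordering described above (durable write first, broadcast regardless of the write's outcome) can be sketched with a std-only model. The `Sink` trait, `FailingSink`, the free function `emit`, and `emit_demo` are hypothetical stand-ins: subscribers are modelled as plain `Vec<String>` inboxes and events as strings, which is not how the real bus represents them.

```rust
use std::io;

/// Hypothetical durable sink with a `write` method that can fail.
trait Sink {
    fn write(&self, event: &str) -> io::Result<()>;
}

/// A sink that always fails, to exercise the error path.
struct FailingSink;

impl Sink for FailingSink {
    fn write(&self, _event: &str) -> io::Result<()> {
        Err(io::Error::new(io::ErrorKind::Other, "disk full"))
    }
}

/// Write durable events to the sink first, then deliver to every subscriber.
/// Returns `true` if a durable write was attempted and failed.
fn emit(sink: &dyn Sink, subscribers: &mut [Vec<String>], event: &str, durable: bool) -> bool {
    let mut sink_failed = false;
    if durable {
        if let Err(err) = sink.write(event) {
            // Log and carry on: the failure is not propagated to the caller.
            eprintln!("warn: durable write failed: {err}");
            sink_failed = true;
        }
    }
    // Broadcast happens whether or not the sink write succeeded.
    // An empty subscriber list is simply a no-op.
    for inbox in subscribers.iter_mut() {
        inbox.push(event.to_string());
    }
    sink_failed
}

/// Emits one durable event through a failing sink to two subscribers.
fn emit_demo() -> (bool, Vec<Vec<String>>) {
    let mut subs = vec![Vec::new(), Vec::new()];
    let failed = emit(&FailingSink, &mut subs, "token_usage", true);
    (failed, subs)
}
```

Even though the sink rejects the write, both subscribers still receive the event, which matches the "observers still see the event" guarantee.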
§Examples
use codetether_agent::session::{SessionBus, SessionEvent};
let bus = SessionBus::new(4);
bus.emit(SessionEvent::Done); // no subscribers → still ok
Trait Implementations§
impl Clone for SessionBus
fn clone(&self) -> SessionBus
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
Auto Trait Implementations§
impl Freeze for SessionBus
impl !RefUnwindSafe for SessionBus
impl Send for SessionBus
impl Sync for SessionBus
impl Unpin for SessionBus
impl UnsafeUnpin for SessionBus
impl !UnwindSafe for SessionBus
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request