steer_core/app/domain/runtime/
subscription.rs1use tokio::sync::{broadcast, mpsc};
2
3use crate::app::domain::event::SessionEvent;
4use crate::app::domain::types::SessionId;
5
6#[derive(Debug, Clone)]
7pub struct SessionEventEnvelope {
8 pub seq: u64,
9 pub event: SessionEvent,
10}
11
12pub struct SessionEventSubscription {
13 pub session_id: SessionId,
14 pub rx: broadcast::Receiver<SessionEventEnvelope>,
15 unsubscribe_tx: mpsc::UnboundedSender<UnsubscribeSignal>,
16}
17
18pub(crate) struct UnsubscribeSignal;
19
20impl SessionEventSubscription {
21 pub(crate) fn new(
22 session_id: SessionId,
23 rx: broadcast::Receiver<SessionEventEnvelope>,
24 unsubscribe_tx: mpsc::UnboundedSender<UnsubscribeSignal>,
25 ) -> Self {
26 Self {
27 session_id,
28 rx,
29 unsubscribe_tx,
30 }
31 }
32
33 pub async fn recv(&mut self) -> Option<SessionEventEnvelope> {
34 loop {
35 match self.rx.recv().await {
36 Ok(envelope) => return Some(envelope),
37 Err(broadcast::error::RecvError::Closed) => return None,
38 Err(broadcast::error::RecvError::Lagged(n)) => {
39 tracing::warn!(
40 session_id = %self.session_id,
41 lagged = n,
42 "Event subscriber lagged, some events were dropped"
43 );
44 }
45 }
46 }
47 }
48}
49
50impl Drop for SessionEventSubscription {
51 fn drop(&mut self) {
52 let _ = self.unsubscribe_tx.send(UnsubscribeSignal);
53 }
54}