Skip to main content

steer_core/app/domain/runtime/
subscription.rs

1use tokio::sync::{broadcast, mpsc};
2
3use crate::app::domain::event::SessionEvent;
4use crate::app::domain::types::SessionId;
5
6#[derive(Debug, Clone)]
7pub struct SessionEventEnvelope {
8    pub seq: u64,
9    pub event: SessionEvent,
10}
11
12pub struct SessionEventSubscription {
13    pub session_id: SessionId,
14    pub rx: broadcast::Receiver<SessionEventEnvelope>,
15    unsubscribe_tx: mpsc::UnboundedSender<UnsubscribeSignal>,
16}
17
18pub(crate) struct UnsubscribeSignal;
19
20impl SessionEventSubscription {
21    pub(crate) fn new(
22        session_id: SessionId,
23        rx: broadcast::Receiver<SessionEventEnvelope>,
24        unsubscribe_tx: mpsc::UnboundedSender<UnsubscribeSignal>,
25    ) -> Self {
26        Self {
27            session_id,
28            rx,
29            unsubscribe_tx,
30        }
31    }
32
33    pub async fn recv(&mut self) -> Option<SessionEventEnvelope> {
34        loop {
35            match self.rx.recv().await {
36                Ok(envelope) => return Some(envelope),
37                Err(broadcast::error::RecvError::Closed) => return None,
38                Err(broadcast::error::RecvError::Lagged(n)) => {
39                    tracing::warn!(
40                        session_id = %self.session_id,
41                        lagged = n,
42                        "Event subscriber lagged, some events were dropped"
43                    );
44                }
45            }
46        }
47    }
48}
49
50impl Drop for SessionEventSubscription {
51    fn drop(&mut self) {
52        let _ = self.unsubscribe_tx.send(UnsubscribeSignal);
53    }
54}