Skip to main content

digdigdig3/connector_manager/feed/
handle.rs

1//! `FeedHandle` — per-subscription token returned by `MarketFeed::subscribe_*`.
2//!
3//! Holds a `tokio::sync::broadcast::Receiver` so each subscriber sees its own
4//! copy of events. Dropping the handle decrements a refcount; when it hits
5//! zero (and after `unsub_grace`) the upstream subscription is released —
6//! mirrors MLC `WsActorMap` behaviour.
7
8use std::sync::Arc;
9
10use tokio::sync::broadcast;
11
12use crate::core::types::{AccountType, ExchangeId, StreamEvent};
13
14/// Event emitted by `MarketFeed`. Wraps the raw `StreamEvent` with the source
15/// tuple so a single merged stream can carry events from many exchanges.
16#[derive(Debug, Clone)]
17pub struct FeedEvent {
18    pub exchange: ExchangeId,
19    pub account_type: AccountType,
20    pub symbol: String,
21    pub event: StreamEvent,
22}
23
24/// Handle returned to the consumer. Receive events via `.recv().await` or
25/// (when `futures_util::StreamExt` is in scope) treat it as a `Stream` via
26/// `into_stream()`.
27pub struct FeedHandle {
28    pub(crate) rx: broadcast::Receiver<FeedEvent>,
29    pub(crate) _keep_alive: Arc<()>,
30}
31
32impl FeedHandle {
33    /// Receive the next event. `None` on lagging-close / channel-closed.
34    pub async fn recv(&mut self) -> Option<FeedEvent> {
35        loop {
36            match self.rx.recv().await {
37                Ok(ev) => return Some(ev),
38                Err(broadcast::error::RecvError::Closed) => return None,
39                Err(broadcast::error::RecvError::Lagged(_)) => continue,
40            }
41        }
42    }
43
44    /// Convert into a `Stream<Item = FeedEvent>` (requires the caller to bring
45    /// `tokio_stream` or call `.recv()` in a loop).
46    pub fn into_receiver(self) -> broadcast::Receiver<FeedEvent> { self.rx }
47}