digdigdig3/connector_manager/feed/handle.rs
1//! `FeedHandle` — per-subscription token returned by `MarketFeed::subscribe_*`.
2//!
3//! Holds a `tokio::sync::broadcast::Receiver` so each subscriber sees its own
4//! copy of events. Dropping the handle decrements a refcount; when it hits
5//! zero (and after `unsub_grace`) the upstream subscription is released —
6//! mirrors MLC `WsActorMap` behaviour.
7
8use std::sync::Arc;
9
10use tokio::sync::broadcast;
11
12use crate::core::types::{AccountType, ExchangeId, StreamEvent};
13
14/// Event emitted by `MarketFeed`. Wraps the raw `StreamEvent` with the source
15/// tuple so a single merged stream can carry events from many exchanges.
16#[derive(Debug, Clone)]
17pub struct FeedEvent {
18 pub exchange: ExchangeId,
19 pub account_type: AccountType,
20 pub symbol: String,
21 pub event: StreamEvent,
22}
23
24/// Handle returned to the consumer. Receive events via `.recv().await` or
25/// (when `futures_util::StreamExt` is in scope) treat it as a `Stream` via
26/// `into_stream()`.
27pub struct FeedHandle {
28 pub(crate) rx: broadcast::Receiver<FeedEvent>,
29 pub(crate) _keep_alive: Arc<()>,
30}
31
32impl FeedHandle {
33 /// Receive the next event. `None` on lagging-close / channel-closed.
34 pub async fn recv(&mut self) -> Option<FeedEvent> {
35 loop {
36 match self.rx.recv().await {
37 Ok(ev) => return Some(ev),
38 Err(broadcast::error::RecvError::Closed) => return None,
39 Err(broadcast::error::RecvError::Lagged(_)) => continue,
40 }
41 }
42 }
43
44 /// Convert into a `Stream<Item = FeedEvent>` (requires the caller to bring
45 /// `tokio_stream` or call `.recv()` in a loop).
46 pub fn into_receiver(self) -> broadcast::Receiver<FeedEvent> { self.rx }
47}