Skip to main content

perpcity_sdk/
feed.rs

1//! Live market event feed over WebSocket.
2//!
3//! [`MarketFeed`] subscribes to PerpManager and Beacon contract events
4//! via [`WsManager`], decodes raw logs into typed [`MarketEvent`] values,
5//! and filters by perp. Consumers call [`MarketFeed::next()`] in a loop
6//! to receive real-time market data with zero per-read RPC cost.
7//!
8//! # Example
9//!
10//! ```rust,no_run
11//! use perpcity_sdk::feed::MarketFeed;
12//! use perpcity_sdk::transport::ws::{WsManager, ReconnectConfig};
13//! use alloy::primitives::{Address, B256, address};
14//!
15//! # async fn example() -> perpcity_sdk::Result<()> {
16//! let ws = WsManager::connect("wss://base-rpc.example.com", ReconnectConfig::default()).await?;
17//!
18//! let perp_manager = address!("0000000000000000000000000000000000000001");
19//! let beacon = address!("0000000000000000000000000000000000000002");
20//! let perp_id = B256::ZERO;
21//!
22//! let mut feed = MarketFeed::subscribe(&ws, perp_manager, beacon, perp_id).await?;
23//! while let Some(event) = feed.next().await {
24//!     println!("{event:?}");
25//! }
26//! # Ok(())
27//! # }
28//! ```
29
30use alloy::primitives::{Address, B256};
31use alloy::rpc::types::{Filter, Log};
32use tokio::sync::mpsc;
33
34use crate::events::{MarketEvent, decode_log};
35use crate::transport::ws::WsManager;
36
37/// A filtered stream of decoded [`MarketEvent`]s for a single perp.
38///
39/// Created via [`MarketFeed::subscribe()`]. Call [`next()`](MarketFeed::next)
40/// in a loop to receive events. Returns `None` when the WebSocket
41/// connection is lost.
42#[derive(Debug)]
43pub struct MarketFeed {
44    rx: mpsc::Receiver<Log>,
45    perp_id: B256,
46}
47
48impl MarketFeed {
49    /// Subscribe to events for a single perp.
50    ///
51    /// Creates a WebSocket log subscription filtered to the `perp_manager`
52    /// and `beacon` contract addresses. PerpManager events are further
53    /// filtered by `perp_id` in [`next()`](MarketFeed::next). Beacon
54    /// `IndexUpdated` events are already scoped to this perp by the
55    /// beacon address filter.
56    pub async fn subscribe(
57        ws: &WsManager,
58        perp_manager: Address,
59        beacon: Address,
60        perp_id: B256,
61    ) -> crate::Result<Self> {
62        let filter = Filter::new().address(vec![perp_manager, beacon]);
63        let rx = ws.subscribe_logs(filter).await?;
64        tracing::info!(%perp_id, %perp_manager, %beacon, "market feed subscribed");
65        Ok(Self { rx, perp_id })
66    }
67
68    /// Receive the next decoded event for this perp.
69    ///
70    /// Blocks until a matching event arrives. Returns `None` when the
71    /// WebSocket connection is lost (sender dropped).
72    ///
73    /// Skips unrecognized events and events belonging to other perps.
74    pub async fn next(&mut self) -> Option<MarketEvent> {
75        loop {
76            let log = self.rx.recv().await?;
77            if let Some(event) = decode_log(&log) {
78                match &event {
79                    MarketEvent::PositionOpened { perp_id, .. }
80                    | MarketEvent::NotionalAdjusted { perp_id, .. }
81                    | MarketEvent::PositionClosed { perp_id, .. }
82                        if *perp_id != self.perp_id =>
83                    {
84                        continue;
85                    }
86                    _ => {
87                        tracing::trace!(perp_id = %self.perp_id, event = ?event, "market event received");
88                        return Some(event);
89                    }
90                }
91            }
92        }
93    }
94
95    /// The perp ID this feed is filtering for.
96    pub fn perp_id(&self) -> B256 {
97        self.perp_id
98    }
99}