Skip to main content

strike_sdk/events/
subscribe.rs

1//! Live WSS event subscriptions with auto-reconnect.
2
3use alloy::primitives::Address;
4use alloy::providers::{Provider, ProviderBuilder, WsConnect};
5use alloy::rpc::types::Filter;
6use alloy::sol_types::SolEvent;
7use futures_util::stream::{Stream, StreamExt};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::sync::mpsc;
11use tracing::{info, warn};
12
13use crate::contracts::{BatchAuction, MarketFactory};
14use crate::error::Result;
15use crate::types::StrikeEvent;
16
17/// A stream of on-chain Strike events.
18///
19/// Internally manages a WebSocket connection with auto-reconnect on drop
20/// (5-second backoff between attempts).
21pub struct EventStream {
22    rx: mpsc::UnboundedReceiver<StrikeEvent>,
23    // Hold the task handle so it doesn't get dropped
24    _handle: tokio::task::JoinHandle<()>,
25}
26
27impl EventStream {
28    /// Connect to the WSS endpoint and start streaming events.
29    pub(crate) async fn connect(
30        wss_url: &str,
31        market_factory_addr: Address,
32        batch_auction_addr: Address,
33    ) -> Result<Self> {
34        let (tx, rx) = mpsc::unbounded_channel();
35        let wss_url = wss_url.to_string();
36
37        let handle = tokio::spawn(async move {
38            loop {
39                match run_subscriptions(&wss_url, market_factory_addr, batch_auction_addr, &tx)
40                    .await
41                {
42                    Ok(()) => {
43                        info!("WS subscriber exited cleanly");
44                        break;
45                    }
46                    Err(e) => {
47                        warn!(err = %e, "WS subscription dropped — reconnecting in 5s");
48                        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
49                    }
50                }
51            }
52        });
53
54        Ok(Self {
55            rx,
56            _handle: handle,
57        })
58    }
59
60    /// Receive the next event. Returns `None` if the stream has ended.
61    pub async fn next(&mut self) -> Option<StrikeEvent> {
62        self.rx.recv().await
63    }
64}
65
66impl Stream for EventStream {
67    type Item = StrikeEvent;
68
69    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        self.rx.poll_recv(cx)
71    }
72}
73
74async fn run_subscriptions(
75    wss_url: &str,
76    market_factory_addr: Address,
77    batch_auction_addr: Address,
78    tx: &mpsc::UnboundedSender<StrikeEvent>,
79) -> std::result::Result<(), eyre::Report> {
80    let ws = WsConnect::new(wss_url);
81    let provider = ProviderBuilder::new()
82        .connect_ws(ws)
83        .await
84        .map_err(|e| eyre::eyre!("WS connect failed: {e}"))?;
85
86    // MarketCreated
87    let mc_filter = Filter::new()
88        .address(market_factory_addr)
89        .event_signature(MarketFactory::MarketCreated::SIGNATURE_HASH);
90    let mc_sub = provider.subscribe_logs(&mc_filter).await?;
91    info!("subscribed to MarketCreated events");
92
93    // BatchCleared
94    let batch_filter = Filter::new()
95        .address(batch_auction_addr)
96        .event_signature(BatchAuction::BatchCleared::SIGNATURE_HASH);
97    let batch_sub = provider.subscribe_logs(&batch_filter).await?;
98    info!("subscribed to BatchCleared events");
99
100    // OrderSettled (all — no owner filter in the SDK)
101    let settled_filter = Filter::new()
102        .address(batch_auction_addr)
103        .event_signature(BatchAuction::OrderSettled::SIGNATURE_HASH);
104    let settled_sub = provider.subscribe_logs(&settled_filter).await?;
105    info!("subscribed to OrderSettled events");
106
107    // GtcAutoCancelled
108    let gtc_filter = Filter::new()
109        .address(batch_auction_addr)
110        .event_signature(BatchAuction::GtcAutoCancelled::SIGNATURE_HASH);
111    let gtc_sub = provider.subscribe_logs(&gtc_filter).await?;
112    info!("subscribed to GtcAutoCancelled events");
113
114    let mut mc_stream = mc_sub.into_stream();
115    let mut batch_stream = batch_sub.into_stream();
116    let mut settled_stream = settled_sub.into_stream();
117    let mut gtc_stream = gtc_sub.into_stream();
118
119    loop {
120        tokio::select! {
121            Some(log) = mc_stream.next() => {
122                if let Ok(event) = MarketFactory::MarketCreated::decode_log(&log.inner) {
123                    let mut price_id = [0u8; 32];
124                    price_id.copy_from_slice(&event.priceId[..]);
125                    let _ = tx.send(StrikeEvent::MarketCreated {
126                        market_id: event.orderBookMarketId.to::<u64>(),
127                        price_id,
128                        strike_price: event.strikePrice,
129                        expiry_time: event.expiryTime.to::<u64>(),
130                    });
131                }
132            }
133            Some(log) = batch_stream.next() => {
134                if let Ok(event) = BatchAuction::BatchCleared::decode_log(&log.inner) {
135                    let _ = tx.send(StrikeEvent::BatchCleared {
136                        market_id: event.marketId.to::<u64>(),
137                        batch_id: event.batchId.to::<u64>(),
138                        clearing_tick: event.clearingTick.to::<u64>(),
139                        matched_lots: event.matchedLots.to::<u64>(),
140                    });
141                }
142            }
143            Some(log) = settled_stream.next() => {
144                if let Ok(event) = BatchAuction::OrderSettled::decode_log(&log.inner) {
145                    let _ = tx.send(StrikeEvent::OrderSettled {
146                        order_id: event.orderId,
147                        owner: event.owner,
148                        filled_lots: event.filledLots.to::<u64>(),
149                    });
150                }
151            }
152            Some(log) = gtc_stream.next() => {
153                if let Ok(event) = BatchAuction::GtcAutoCancelled::decode_log(&log.inner) {
154                    let _ = tx.send(StrikeEvent::GtcAutoCancelled {
155                        order_id: event.orderId,
156                        owner: event.owner,
157                    });
158                }
159            }
160            else => {
161                eyre::bail!("all event streams ended");
162            }
163        }
164    }
165}