strike_sdk/events/
subscribe.rs1use alloy::primitives::Address;
4use alloy::providers::{Provider, ProviderBuilder, WsConnect};
5use alloy::rpc::types::Filter;
6use alloy::sol_types::SolEvent;
7use futures_util::stream::{Stream, StreamExt};
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::sync::mpsc;
11use tracing::{info, warn};
12
13use crate::contracts::{BatchAuction, MarketFactory};
14use crate::error::Result;
15use crate::types::StrikeEvent;
16
17pub struct EventStream {
22 rx: mpsc::UnboundedReceiver<StrikeEvent>,
23 _handle: tokio::task::JoinHandle<()>,
25}
26
27impl EventStream {
28 pub(crate) async fn connect(
30 wss_url: &str,
31 market_factory_addr: Address,
32 batch_auction_addr: Address,
33 ) -> Result<Self> {
34 let (tx, rx) = mpsc::unbounded_channel();
35 let wss_url = wss_url.to_string();
36
37 let handle = tokio::spawn(async move {
38 loop {
39 match run_subscriptions(&wss_url, market_factory_addr, batch_auction_addr, &tx)
40 .await
41 {
42 Ok(()) => {
43 info!("WS subscriber exited cleanly");
44 break;
45 }
46 Err(e) => {
47 warn!(err = %e, "WS subscription dropped — reconnecting in 5s");
48 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
49 }
50 }
51 }
52 });
53
54 Ok(Self {
55 rx,
56 _handle: handle,
57 })
58 }
59
60 pub async fn next(&mut self) -> Option<StrikeEvent> {
62 self.rx.recv().await
63 }
64}
65
66impl Stream for EventStream {
67 type Item = StrikeEvent;
68
69 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70 self.rx.poll_recv(cx)
71 }
72}
73
74async fn run_subscriptions(
75 wss_url: &str,
76 market_factory_addr: Address,
77 batch_auction_addr: Address,
78 tx: &mpsc::UnboundedSender<StrikeEvent>,
79) -> std::result::Result<(), eyre::Report> {
80 let ws = WsConnect::new(wss_url);
81 let provider = ProviderBuilder::new()
82 .connect_ws(ws)
83 .await
84 .map_err(|e| eyre::eyre!("WS connect failed: {e}"))?;
85
86 let mc_filter = Filter::new()
88 .address(market_factory_addr)
89 .event_signature(MarketFactory::MarketCreated::SIGNATURE_HASH);
90 let mc_sub = provider.subscribe_logs(&mc_filter).await?;
91 info!("subscribed to MarketCreated events");
92
93 let batch_filter = Filter::new()
95 .address(batch_auction_addr)
96 .event_signature(BatchAuction::BatchCleared::SIGNATURE_HASH);
97 let batch_sub = provider.subscribe_logs(&batch_filter).await?;
98 info!("subscribed to BatchCleared events");
99
100 let settled_filter = Filter::new()
102 .address(batch_auction_addr)
103 .event_signature(BatchAuction::OrderSettled::SIGNATURE_HASH);
104 let settled_sub = provider.subscribe_logs(&settled_filter).await?;
105 info!("subscribed to OrderSettled events");
106
107 let gtc_filter = Filter::new()
109 .address(batch_auction_addr)
110 .event_signature(BatchAuction::GtcAutoCancelled::SIGNATURE_HASH);
111 let gtc_sub = provider.subscribe_logs(>c_filter).await?;
112 info!("subscribed to GtcAutoCancelled events");
113
114 let mut mc_stream = mc_sub.into_stream();
115 let mut batch_stream = batch_sub.into_stream();
116 let mut settled_stream = settled_sub.into_stream();
117 let mut gtc_stream = gtc_sub.into_stream();
118
119 loop {
120 tokio::select! {
121 Some(log) = mc_stream.next() => {
122 if let Ok(event) = MarketFactory::MarketCreated::decode_log(&log.inner) {
123 let mut price_id = [0u8; 32];
124 price_id.copy_from_slice(&event.priceId[..]);
125 let _ = tx.send(StrikeEvent::MarketCreated {
126 market_id: event.orderBookMarketId.to::<u64>(),
127 price_id,
128 strike_price: event.strikePrice,
129 expiry_time: event.expiryTime.to::<u64>(),
130 });
131 }
132 }
133 Some(log) = batch_stream.next() => {
134 if let Ok(event) = BatchAuction::BatchCleared::decode_log(&log.inner) {
135 let _ = tx.send(StrikeEvent::BatchCleared {
136 market_id: event.marketId.to::<u64>(),
137 batch_id: event.batchId.to::<u64>(),
138 clearing_tick: event.clearingTick.to::<u64>(),
139 matched_lots: event.matchedLots.to::<u64>(),
140 });
141 }
142 }
143 Some(log) = settled_stream.next() => {
144 if let Ok(event) = BatchAuction::OrderSettled::decode_log(&log.inner) {
145 let _ = tx.send(StrikeEvent::OrderSettled {
146 order_id: event.orderId,
147 owner: event.owner,
148 filled_lots: event.filledLots.to::<u64>(),
149 });
150 }
151 }
152 Some(log) = gtc_stream.next() => {
153 if let Ok(event) = BatchAuction::GtcAutoCancelled::decode_log(&log.inner) {
154 let _ = tx.send(StrikeEvent::GtcAutoCancelled {
155 order_id: event.orderId,
156 owner: event.owner,
157 });
158 }
159 }
160 else => {
161 eyre::bail!("all event streams ended");
162 }
163 }
164 }
165}