use alloy::primitives::{Address, B256};
use alloy::rpc::types::{Filter, Log};
use tokio::sync::mpsc;
use crate::events::{MarketEvent, decode_log};
use crate::transport::ws::WsManager;
#[derive(Debug)]
pub struct MarketFeed {
rx: mpsc::Receiver<Log>,
perp_id: B256,
}
impl MarketFeed {
pub async fn subscribe(
ws: &WsManager,
perp_manager: Address,
beacon: Address,
perp_id: B256,
) -> crate::Result<Self> {
let filter = Filter::new().address(vec![perp_manager, beacon]);
let rx = ws.subscribe_logs(filter).await?;
tracing::info!(%perp_id, %perp_manager, %beacon, "market feed subscribed");
Ok(Self { rx, perp_id })
}
pub async fn next(&mut self) -> Option<MarketEvent> {
loop {
let log = self.rx.recv().await?;
if let Some(event) = decode_log(&log) {
match &event {
MarketEvent::PositionOpened { perp_id, .. }
| MarketEvent::NotionalAdjusted { perp_id, .. }
| MarketEvent::PositionClosed { perp_id, .. }
if *perp_id != self.perp_id =>
{
continue;
}
_ => {
tracing::trace!(perp_id = %self.perp_id, event = ?event, "market event received");
return Some(event);
}
}
}
}
}
pub fn perp_id(&self) -> B256 {
self.perp_id
}
}