use alloy::rpc::types::mev::mevshare::Event;
use eventsource_stream::Eventsource;
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tracing::{debug, trace, warn};
use crate::error::Error;
/// Handle to an open event subscription.
///
/// Wraps the HTTP response whose body carries the server-sent events. Call
/// [`Subscription::into_stream`] to decode that body into typed events.
#[derive(Debug)]
pub struct Subscription {
    /// Response whose body is read as the raw SSE byte stream.
    pub(crate) response: reqwest::Response,
}
impl Subscription {
pub fn into_stream(self) -> EventStream {
let inner = self
.response
.bytes_stream()
.eventsource()
.filter_map(|result| async move {
match result {
Ok(event) => {
let data = event.data.trim();
if data.is_empty() || data == ":ping" {
trace!("received keepalive ping, skipping");
return None;
}
trace!(data = %data, "received raw SSE event");
match serde_json::from_str::<Event>(data) {
Ok(ev) => {
debug!(hash = ?ev.hash, "deserialized MEV-Share event");
Some(Ok(ev))
}
Err(e) => {
warn!(error = ?e, data = %data, "failed to deserialize event");
Some(Err(Error::Deserialize(e)))
}
}
}
Err(e) => {
warn!(error = ?e, "SSE transport error");
Some(Err(Error::Stream(e.to_string())))
}
}
});
EventStream {
inner: Box::pin(inner),
}
}
}
/// Stream of decoded events, yielding `Result<Event, Error>` items.
///
/// The concrete stream type is erased behind a pinned, boxed trait object, so
/// this type can be named in public signatures.
#[pin_project]
pub struct EventStream {
    /// Type-erased inner stream that every poll is forwarded to.
    #[pin]
    inner: Pin<Box<dyn Stream<Item = Result<Event, Error>> + Send>>,
}

impl std::fmt::Debug for EventStream {
    /// Prints only the type name: the inner stream is an opaque trait object
    /// without a `Debug` bound, so its contents cannot be shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventStream").finish_non_exhaustive()
    }
}
impl Stream for EventStream {
    type Item = Result<Event, Error>;

    /// Polls the boxed inner stream, reached through the pin projection, and
    /// returns its result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx)
    }
}