mevshare 0.1.1

A Rust client library for subscribing to the Flashbots MEV-Share SSE event stream.
Documentation
use alloy::rpc::types::mev::mevshare::Event;
use eventsource_stream::Eventsource;
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tracing::{debug, trace, warn};

use crate::error::Error;

/// A handle to an active MEV-Share SSE connection.
///
/// Returned by [`Client::subscribe`]. Call [`into_stream`](Subscription::into_stream)
/// to begin consuming events.
pub struct Subscription {
    pub(crate) response: reqwest::Response,
}

impl Subscription {
    /// Converts this subscription into an async [`Stream`] of [`Event`]s.
    ///
    /// The stream will:
    /// - Automatically skip SSE keepalive pings sent every 15 seconds
    /// - Deserialize each event's JSON payload into a typed [`Event`]
    /// - Yield [`Error::Deserialize`] for malformed payloads without terminating the stream
    /// - Yield [`Error::Stream`] for underlying SSE transport errors
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use mevshare::Client;
    /// use futures::StreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let sub = Client::builder().build().subscribe().await?;
    ///     let mut stream = sub.into_stream();
    ///
    ///     while let Some(event) = stream.next().await {
    ///         match event {
    ///             Ok(ev) => println!("{ev:#?}"),
    ///             Err(e) => eprintln!("stream error: {e}"),
    ///         }
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn into_stream(self) -> EventStream {
        let inner = self
            .response
            .bytes_stream()
            .eventsource()
            .filter_map(|result| async move {
                match result {
                    Ok(event) => {
                        let data = event.data.trim();

                        // MEV-Share sends `:ping` comments every 15s to keep the
                        // connection alive — skip them silently
                        if data.is_empty() || data == ":ping" {
                            trace!("received keepalive ping, skipping");
                            return None;
                        }

                        trace!(data = %data, "received raw SSE event");

                        match serde_json::from_str::<Event>(data) {
                            Ok(ev) => {
                                debug!(hash = ?ev.hash, "deserialized MEV-Share event");
                                Some(Ok(ev))
                            }
                            Err(e) => {
                                // don't terminate the stream on a bad payload —
                                // yield the error and keep going
                                warn!(error = ?e, data = %data, "failed to deserialize event");
                                Some(Err(Error::Deserialize(e)))
                            }
                        }
                    }
                    Err(e) => {
                        warn!(error = ?e, "SSE transport error");
                        Some(Err(Error::Stream(e.to_string())))
                    }
                }
            });

        EventStream {
            inner: Box::pin(inner),
        }
    }
}

/// An async stream of [`Event`]s from the MEV-Share SSE endpoint.
///
/// Implements [`Stream`] — use it with [`StreamExt::next`] or any other
/// [`StreamExt`] combinator.
///
/// Obtained by calling [`Subscription::into_stream`].
#[pin_project]
pub struct EventStream {
    /// The inner pinned stream of deserialized events.
    #[pin]
    inner: Pin<Box<dyn Stream<Item = Result<Event, Error>> + Send>>,
}

impl Stream for EventStream {
    type Item = Result<Event, Error>;

    /// Polls the inner SSE stream for the next [`Event`].
    ///
    /// Returns:
    /// - `Poll::Ready(Some(Ok(event)))` — a new event arrived
    /// - `Poll::Ready(Some(Err(e)))` — a non-fatal error (stream continues)
    /// - `Poll::Ready(None)` — the stream has ended
    /// - `Poll::Pending` — no event yet, waker registered
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().inner.poll_next(cx)
    }
}