use crate::common::error::AppError;
use alloy::primitives::{Address, B256, U256};
use dashmap::DashSet;
use futures::StreamExt;
use reqwest::Client;
use serde::Deserialize;
use std::sync::Arc;
use std::str::FromStr;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::{sleep, Duration};
use crate::core::strategy::StrategyWork;
/// A decoded MEV-Share transaction hint, produced by `MevShareClient::convert_hint`
/// from a raw JSON transaction entry and forwarded as `StrategyWork::MevShareHint`.
#[derive(Debug, Clone)]
pub struct MevShareHint {
/// Hash of the hinted transaction; also the key used to de-duplicate hints.
pub tx_hash: B256,
/// Address parsed from the hint's `to` field.
pub router: Address,
/// Address parsed from the hint's `from` field, when present and valid.
pub from: Option<Address>,
/// Decoded `callData` bytes; hints with missing or empty call data are never emitted.
pub call_data: Vec<u8>,
/// Hex-decoded `value` field; zero when the field is missing or unparsable.
pub value: U256,
/// Hex-decoded `gas` field, when present and valid.
pub gas_limit: Option<u64>,
/// Hex-decoded `maxFeePerGas` field, when present and valid.
pub max_fee_per_gas: Option<u128>,
/// Hex-decoded `maxPriorityFeePerGas` field, when present and valid.
pub max_priority_fee_per_gas: Option<u128>,
}
/// Wire format of one MEV-Share hint event as received from the SSE stream or
/// embedded in a history record.
#[derive(Debug, Deserialize)]
struct RawEvent {
    /// Event-level hash. It is deserialized but never read, hence the lint allowance.
    #[allow(dead_code)]
    hash: Option<String>,
    /// Transactions attached to the event. serde already maps a missing or
    /// `null` value of an `Option` field to `None`, so no explicit default is needed.
    txs: Option<Vec<RawTx>>,
}
/// Wire format of a single transaction inside a [`RawEvent`].
///
/// Every field is optional and kept as the raw string sent by the server;
/// decoding and validation happen later in `MevShareClient::convert_hint`.
/// JSON keys are the camelCase form of the field names (`callData`,
/// `maxFeePerGas`, `maxPriorityFeePerGas`, `chainId`); single-word names are
/// unaffected by the renaming rule.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawTx {
    /// Transaction hash as a hex string.
    hash: Option<String>,
    /// Destination address as a hex string.
    to: Option<String>,
    /// `from` address as a hex string.
    from: Option<String>,
    /// Call data as a hex string.
    call_data: Option<String>,
    /// Transferred value as a hex string.
    value: Option<String>,
    /// Gas limit as a hex string.
    gas: Option<String>,
    /// Max fee per gas as a hex string.
    max_fee_per_gas: Option<String>,
    /// Max priority fee per gas as a hex string.
    max_priority_fee_per_gas: Option<String>,
    /// Chain id as a hex string.
    chain_id: Option<String>,
}
/// Client that backfills MEV-Share history, follows the SSE hint stream and
/// forwards each new hint to the strategy pipeline through `tx_sender`.
pub struct MevShareClient {
/// URL the SSE stream is requested from.
base_url: String,
/// History endpoint, derived in `new` as `<base_url>/api/v1/history`.
history_url: String,
/// HTTP client shared by the history request and the SSE connection.
client: Client,
/// Hints whose `chainId` parses to a different value are dropped.
chain_id: u64,
/// Transaction hashes already forwarded, used to suppress duplicates.
/// NOTE(review): nothing in this file ever removes entries, so the set grows
/// for as long as the client runs.
seen: Arc<DashSet<B256>>,
/// Channel on which `StrategyWork::MevShareHint` items are sent.
tx_sender: UnboundedSender<StrategyWork>,
/// Value of the `limit` query parameter for the history backfill; `0` disables it.
history_limit: u32,
}
impl MevShareClient {
pub fn new(
base_url: String,
chain_id: u64,
tx_sender: UnboundedSender<StrategyWork>,
history_limit: u32,
) -> Self {
let history_url = format!("{}/api/v1/history", base_url.trim_end_matches('/'));
Self {
base_url,
history_url,
client: Client::builder()
.timeout(Duration::from_secs(10))
.build()
.unwrap(),
chain_id,
seen: Arc::new(DashSet::new()),
tx_sender,
history_limit,
}
}
pub async fn run(mut self) -> Result<(), AppError> {
self.backfill_history().await?;
loop {
match self.stream_once().await {
Ok(_) => {}
Err(e) => {
tracing::warn!(target: "mev_share", error=%e, "Stream error, reconnecting");
sleep(Duration::from_secs(2)).await;
}
}
}
}
async fn backfill_history(&mut self) -> Result<(), AppError> {
if self.history_limit == 0 {
return Ok(());
}
let url = format!("{}?limit={}", self.history_url, self.history_limit);
let resp = self
.client
.get(&url)
.send()
.await
.map_err(|e| AppError::Connection(format!("History request failed: {}", e)))?;
if !resp.status().is_success() {
tracing::warn!(
target: "mev_share",
status = %resp.status(),
"History endpoint returned non-success"
);
return Ok(());
}
let raw: Vec<HistoricalRecord> = match resp.json().await {
Ok(v) => v,
Err(e) => {
tracing::warn!(target: "mev_share", error=%e, "Failed to decode history response");
return Ok(());
}
};
for rec in raw {
if let Some(evt) = rec.hint {
self.handle_event(evt).await;
}
}
Ok(())
}
async fn stream_once(&mut self) -> Result<(), AppError> {
tracing::info!(target: "mev_share", url=%self.base_url, "Connecting to MEV-Share SSE");
let resp = self
.client
.get(&self.base_url)
.send()
.await
.map_err(|e| AppError::Connection(format!("SSE connect failed: {}", e)))?;
if !resp.status().is_success() {
return Err(AppError::Connection(format!(
"SSE returned status {}",
resp.status()
)));
}
let mut stream = resp.bytes_stream();
let mut buffer = String::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk
.map_err(|e| AppError::Connection(format!("SSE chunk error: {}", e)))?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(idx) = buffer.find("\n\n") {
let event = buffer[..idx].to_string();
buffer = buffer[idx + 2..].to_string();
let mut data_lines = Vec::new();
for line in event.lines() {
let line = line.trim_end_matches('\r');
if line.starts_with(':') {
continue;
}
if let Some(data) = line.strip_prefix("data:") {
data_lines.push(data.trim());
}
}
if data_lines.is_empty() {
continue;
}
let data = data_lines.join("\n");
match serde_json::from_str::<RawEvent>(&data) {
Ok(evt) => self.handle_event(evt).await,
Err(e) => tracing::warn!(target: "mev_share", error=%e, "Failed to parse SSE data"),
}
}
}
Err(AppError::Connection(
"SSE stream ended unexpectedly".into(),
))
}
async fn handle_event(&self, evt: RawEvent) {
let Some(txs) = evt.txs else { return };
for tx in txs {
if let Some(hint) = self.convert_hint(tx) {
let key = hint.tx_hash;
if self.seen.insert(key) {
let _ = self.tx_sender.send(StrategyWork::MevShareHint(hint));
}
}
}
}
fn convert_hint(&self, raw: RawTx) -> Option<MevShareHint> {
let tx_hash = raw.hash.as_deref().and_then(parse_b256)?;
let router = raw.to.as_deref().and_then(parse_address)?;
let chain_ok = raw
.chain_id
.as_deref()
.and_then(parse_u64_hex)
.map(|cid| cid == self.chain_id)
.unwrap_or(true);
if !chain_ok {
return None;
}
let call_data = raw
.call_data
.as_deref()
.and_then(parse_hex_bytes)
.filter(|v| !v.is_empty())?;
let value = raw
.value
.as_deref()
.and_then(parse_u256_hex)
.unwrap_or(U256::ZERO);
let gas_limit = raw.gas.as_deref().and_then(parse_u64_hex);
let max_fee_per_gas = raw
.max_fee_per_gas
.as_deref()
.and_then(parse_u128_hex);
let max_priority_fee_per_gas = raw
.max_priority_fee_per_gas
.as_deref()
.and_then(parse_u128_hex);
let from = raw.from.as_deref().and_then(parse_address);
Some(MevShareHint {
tx_hash,
router,
from,
call_data,
value,
gas_limit,
max_fee_per_gas,
max_priority_fee_per_gas,
})
}
}
/// One entry of the history endpoint's JSON array. Only the embedded hint is
/// read; any other keys of the record are ignored during deserialization.
#[derive(Debug, Deserialize)]
struct HistoricalRecord {
    /// The hint event carried by this record, if any. The JSON key is `hint`,
    /// identical to the field name.
    hint: Option<RawEvent>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An explicit `"txs": null` must deserialize to `None` instead of failing.
    #[test]
    fn parses_null_txs_hint() {
        let payload = r#"{"hash":"0xabc","txs":null}"#;
        let parsed = serde_json::from_str::<RawEvent>(payload).expect("parse");
        assert!(parsed.txs.is_none());
    }
}
/// Returns `s` without one leading lowercase `0x` prefix; any other input is
/// returned unchanged.
fn strip_0x(s: &str) -> &str {
    const PREFIX: &str = "0x";
    let unprefixed = s.strip_prefix(PREFIX);
    unprefixed.unwrap_or(s)
}
/// Decodes a hex string, with or without a `0x` prefix, into bytes.
/// Returns `None` when the input is not valid hex.
fn parse_hex_bytes(s: &str) -> Option<Vec<u8>> {
    let digits = strip_0x(s);
    let decoded = hex::decode(digits);
    decoded.ok()
}
/// Parses a 32-byte hex string, with or without a `0x` prefix, into a `B256`.
/// Returns `None` for invalid hex or any length other than 32 bytes.
fn parse_b256(s: &str) -> Option<B256> {
    parse_hex_bytes(s)
        // `B256::from_slice` is only called on exactly 32 bytes.
        .filter(|raw| raw.len() == 32)
        .map(|raw| B256::from_slice(&raw))
}
/// Parses a hex address, with or without a `0x` prefix.
/// Returns `None` when `Address::from_str` rejects the input.
fn parse_address(s: &str) -> Option<Address> {
    let unprefixed = strip_0x(s);
    let parsed = Address::from_str(unprefixed);
    parsed.ok()
}
/// Parses a base-16 string, with or without a `0x` prefix, into a `U256`.
/// Returns `None` when the digits cannot be parsed.
fn parse_u256_hex(s: &str) -> Option<U256> {
    let digits = strip_0x(s);
    let parsed = U256::from_str_radix(digits, 16);
    parsed.ok()
}
/// Parses a base-16 string, with or without a `0x` prefix, into a `u128`.
/// Returns `None` for empty, non-hex or out-of-range input.
fn parse_u128_hex(s: &str) -> Option<u128> {
    let digits = strip_0x(s);
    let parsed = u128::from_str_radix(digits, 16);
    parsed.ok()
}
/// Parses a base-16 string, with or without a `0x` prefix, into a `u64`.
/// Returns `None` for empty, non-hex or out-of-range input.
fn parse_u64_hex(s: &str) -> Option<u64> {
    let digits = strip_0x(s);
    let parsed = u64::from_str_radix(digits, 16);
    parsed.ok()
}