use std::time::Duration;
use reqwest_eventsource::{Event, EventSource};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::services::types::ArcSseEvent;
/// Configuration used to build an [`ArcSseClient`] via [`ArcSseClient::new`].
pub struct ArcSseClientOptions {
/// Base URL of the service; trailing `/` characters are trimmed by
/// `ArcSseClient::new` before the stream path is appended.
pub base_url: String,
/// Value sent in the `x-callback-token` request header on every connection.
pub callback_token: String,
/// Optional API key; when present it is sent as `Authorization: Bearer <key>`.
pub arc_api_key: Option<String>,
/// Optional event id to start from; when present it is sent as the
/// `Last-Event-ID` request header on connect.
pub last_event_id: Option<String>,
}
/// Callback invoked with the new event id each time the client records a
/// non-empty id from a successfully parsed `status` event.
pub type LastEventIdChangedCallback = Box<dyn Fn(&str) + Send + Sync>;
/// Server-sent-events client that streams `status` events and forwards them
/// over an mpsc channel, reconnecting with backoff until cancelled.
pub struct ArcSseClient {
// Base URL with trailing slashes already trimmed.
base_url: String,
// Sent as the `x-callback-token` header.
callback_token: String,
// Sent as `Authorization: Bearer <key>` when set.
arc_api_key: Option<String>,
// Most recent non-empty event id seen; sent as `Last-Event-ID` on (re)connect.
last_event_id: Option<String>,
// Optional observer notified whenever `last_event_id` is updated.
on_last_event_id_changed: Option<LastEventIdChangedCallback>,
// Sending half of the event channel; the receiver is returned from `new`.
event_tx: mpsc::Sender<ArcSseEvent>,
// Cancelled by `close` to stop `connect`.
cancel_token: CancellationToken,
}
impl ArcSseClient {
pub fn new(options: ArcSseClientOptions) -> (Self, mpsc::Receiver<ArcSseEvent>) {
let (tx, rx) = mpsc::channel(100);
let client = Self {
base_url: options.base_url.trim_end_matches('/').to_string(),
callback_token: options.callback_token,
arc_api_key: options.arc_api_key,
last_event_id: options.last_event_id,
on_last_event_id_changed: None,
event_tx: tx,
cancel_token: CancellationToken::new(),
};
(client, rx)
}
pub fn set_on_last_event_id_changed(&mut self, cb: LastEventIdChangedCallback) {
self.on_last_event_id_changed = Some(cb);
}
pub fn last_event_id(&self) -> Option<&str> {
self.last_event_id.as_deref()
}
pub fn close(&self) {
self.cancel_token.cancel();
}
pub async fn connect(&mut self) {
let url = format!("{}/v1/tx/status/stream", self.base_url);
let mut backoff_secs: u64 = 1;
#[allow(unused_assignments)]
let max_backoff_secs: u64 = 30;
loop {
if self.cancel_token.is_cancelled() {
tracing::info!("[ArcSSE] Connection cancelled");
break;
}
let mut request = reqwest::Client::new()
.get(&url)
.header("x-callback-token", &self.callback_token);
if let Some(ref api_key) = self.arc_api_key {
request = request.header("Authorization", format!("Bearer {}", api_key));
}
if let Some(ref last_id) = self.last_event_id {
request = request.header("Last-Event-ID", last_id.as_str());
}
tracing::info!(
"[ArcSSE] Connecting to {} (Last-Event-ID: {:?})",
url,
self.last_event_id
);
let mut es = EventSource::new(request).expect("Failed to create EventSource");
loop {
use futures::StreamExt;
tokio::select! {
_ = self.cancel_token.cancelled() => {
tracing::info!("[ArcSSE] Connection cancelled during event loop");
es.close();
return;
}
event = es.next() => {
match event {
Some(Ok(Event::Open)) => {
tracing::info!("[ArcSSE] Connected");
backoff_secs = 1;
}
Some(Ok(Event::Message(msg))) => {
if msg.event == "status" {
match serde_json::from_str::<ArcSseEvent>(&msg.data) {
Ok(sse_event) => {
tracing::debug!(
"[ArcSSE] Event: txid={} status={}",
sse_event.txid,
sse_event.tx_status
);
if !msg.id.is_empty() {
self.last_event_id = Some(msg.id.clone());
if let Some(ref cb) = self.on_last_event_id_changed {
cb(&msg.id);
}
}
if self.event_tx.try_send(sse_event).is_err() {
tracing::warn!(
"[ArcSSE] Event channel full, dropping event"
);
}
}
Err(_e) => {
tracing::warn!(
"[ArcSSE] Malformed event data: {}",
&msg.data[..msg.data.len().min(200)]
);
}
}
}
}
Some(Err(e)) => {
tracing::warn!("[ArcSSE] Error: {}", e);
es.close();
break;
}
None => {
tracing::info!("[ArcSSE] Stream ended");
break;
}
}
}
}
}
if self.cancel_token.is_cancelled() {
break;
}
tracing::info!("[ArcSSE] Reconnecting in {}s (backoff)", backoff_secs);
tokio::select! {
_ = self.cancel_token.cancelled() => {
break;
}
_ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
}
backoff_secs = (backoff_secs * 2).min(max_backoff_secs);
}
}
}