use futures::StreamExt;
use reqwest_eventsource::{Event, EventSource};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::debug;
#[derive(Debug, Clone)]
pub struct SseEvent {
pub event: String,
pub data: String,
}
pub async fn drive_sse(
mut es: EventSource,
tx: mpsc::UnboundedSender<SseEvent>,
cancel: CancellationToken,
) -> Result<(), String> {
loop {
tokio::select! {
_ = cancel.cancelled() => {
es.close();
return Err("cancelled".into());
}
event = es.next() => {
match event {
None => return Ok(()),
Some(Ok(Event::Open)) => {
debug!("SSE connection opened");
}
Some(Ok(Event::Message(msg))) => {
if tx.send(SseEvent {
event: msg.event,
data: msg.data,
}).is_err() {
es.close();
return Ok(());
}
}
Some(Err(e)) => {
es.close();
return Err(e.to_string());
}
}
}
}
}
}