use eventsource_stream::Eventsource;
use futures::StreamExt;
use talea_core::api::{ApiError, ApiResult, EventEnvelope, EventStream};
use talea_core::types::Seq;
use crate::http::{Http, decode_error};
pub(crate) fn subscribe(http: &Http, book: &str, from: Seq) -> ApiResult<EventStream> {
let mut url = http.url(&["books", book, "events"])?;
url.query_pairs_mut()
.append_pair("from", &(from - 1).to_string());
let client = http.client.clone();
let token = http.token.clone();
let retry = http.retry.clone();
Ok(Box::pin(async_stream::stream! {
let mut cursor: Option<Seq> = None;
let mut failures: u32 = 0;
'reconnect: loop {
let mut req = client.get(url.clone());
if let Some(t) = &token {
req = req.bearer_auth(t);
}
if let Some(last) = cursor {
req = req.header("Last-Event-ID", last.to_string());
}
let resp = match req.send().await {
Ok(r) => r,
Err(e) => {
failures += 1;
if failures >= retry.max_attempts {
yield Err(ApiError::Transport {
message: format!("subscribe failed after {failures} attempts: {e}"),
});
return;
}
tracing::warn!(failures, "subscribe connect failed; backing off");
tracing::warn!(failures, "subscribe rejected; backing off");
tracing::warn!(failures, "subscribe stream closed; reconnecting");
tokio::time::sleep(retry.delay_for(failures - 1, None)).await;
continue 'reconnect;
}
};
let status = resp.status();
if !status.is_success() {
let body = resp.bytes().await.unwrap_or_default();
let err = decode_error(status, &body);
let retryable = matches!(err, ApiError::Transport { .. });
yield Err(err);
if !retryable {
return; }
failures += 1;
if failures >= retry.max_attempts {
return;
}
tokio::time::sleep(retry.delay_for(failures - 1, None)).await;
continue 'reconnect;
}
let mut events = resp.bytes_stream().eventsource();
while let Some(frame) = events.next().await {
match frame {
Ok(ev) if ev.event == "error" => {
let err = serde_json::from_str::<ApiError>(&ev.data)
.unwrap_or(ApiError::Internal { message: ev.data.clone() });
yield Err(err);
}
Ok(ev) => match serde_json::from_str::<EventEnvelope>(&ev.data) {
Ok(env) => {
cursor = Some(env.seq);
failures = 0; yield Ok(env);
}
Err(e) => {
yield Err(ApiError::Transport {
message: format!("undecodable event: {e}"),
});
}
},
Err(e) => {
failures += 1;
if failures >= retry.max_attempts {
yield Err(ApiError::Transport {
message: format!(
"subscribe stream failed after {failures} attempts: {e}"
),
});
return;
}
tracing::warn!(failures, "subscribe stream dropped; reconnecting");
tokio::time::sleep(retry.delay_for(failures - 1, None)).await;
continue 'reconnect;
}
}
}
failures += 1;
if failures >= retry.max_attempts {
yield Err(ApiError::Transport {
message: format!("subscribe stream closed {failures} times"),
});
return;
}
tokio::time::sleep(retry.delay_for(failures - 1, None)).await;
}
}))
}