use std::path::{Path, PathBuf};
use std::time::Duration;
use actix_web::http::header;
use actix_web::{web, HttpResponse};
use tokio::sync::broadcast;
use crate::events::change_feed::ChangeEvent;
use crate::events::journal;
/// Result of planning what a feed stream sends before it switches to live
/// events. Built by `plan_replay` and consumed by `account_feed_response`.
pub(super) struct ReplayPlan {
/// `Some(seq)` when a reset frame carrying `seq` must be emitted before any
/// events; `None` when the replay can proceed without a reset.
pub reset_from: Option<u64>,
/// Journal events to emit, in the order they are stored here.
pub events: Vec<ChangeEvent>,
/// Sequence number at or below which live events are treated as already
/// delivered and skipped by the streaming loop.
pub last_replayed: u64,
}
/// Plans the catch-up phase of a feed stream for a cursor of `since`.
///
/// * If `since > 0` and `journal::oldest_seq` reports an oldest sequence
///   greater than `since + 1`, the events directly after the cursor are not in
///   the journal. The plan then carries `reset_from: Some(since)`, no events,
///   and `last_replayed` set to `latest_at_start`.
/// * Otherwise the plan carries every event returned by
///   `journal::read_since(events_dir, since)` whose `seq` is strictly greater
///   than `since`; `last_replayed` is the last such event's `seq`, or `since`
///   when there is none.
///
/// A failure of `journal::read_since` is logged and treated as an empty replay.
/// A failure (or `None`) from `journal::oldest_seq` skips the gap check.
pub(super) fn plan_replay(events_dir: &Path, since: u64, latest_at_start: u64) -> ReplayPlan {
    if since > 0 {
        if let Ok(Some(oldest)) = journal::oldest_seq(events_dir) {
            // `saturating_add` keeps the comparison well-defined when
            // `since == u64::MAX`; a plain `since + 1` would panic in debug
            // builds and wrap to 0 in release builds, flipping the gap check.
            if oldest > since.saturating_add(1) {
                return ReplayPlan {
                    reset_from: Some(since),
                    events: Vec::new(),
                    last_replayed: latest_at_start,
                };
            }
        }
    }

    let mut events = match journal::read_since(events_dir, since) {
        Ok(events) => events,
        Err(e) => {
            tracing::error!("account feed: journal replay failed from {since}: {e}");
            Vec::new()
        }
    };
    // Drop anything at or before the cursor so the client never sees an event
    // it has already acknowledged.
    events.retain(|ce| ce.seq > since);
    let last_replayed = events.last().map(|ce| ce.seq).unwrap_or(since);

    ReplayPlan {
        reset_from: None,
        events,
        last_replayed,
    }
}
/// Builds the `text/event-stream` response for the account change feed.
///
/// The stream first emits what `plan_replay` decided (an optional reset frame
/// followed by the replayed journal events), then forwards live events from
/// `receiver`, interleaved with a keepalive frame every 15 seconds.
///
/// * `since` / `latest_at_start` are passed through to `plan_replay`.
/// * `events_dir` is the journal directory, read for the initial replay and
///   again whenever the broadcast receiver reports that it lagged.
/// * `receiver` supplies live events; the stream ends when it is closed.
///
/// Live or re-read events whose `seq` is not greater than the last delivered
/// sequence are skipped, so overlap between the journal and the broadcast
/// channel does not produce duplicates.
pub(super) fn account_feed_response(
    since: u64,
    latest_at_start: u64,
    events_dir: PathBuf,
    mut receiver: broadcast::Receiver<std::sync::Arc<ChangeEvent>>,
) -> HttpResponse {
    HttpResponse::Ok()
        .append_header((header::CONTENT_TYPE, "text/event-stream; charset=utf-8"))
        .append_header((header::CACHE_CONTROL, "no-cache, no-transform"))
        .append_header((header::CONNECTION, "keep-alive"))
        .append_header(("X-Accel-Buffering", "no"))
        .streaming(async_stream::stream! {
            // NOTE(review): `plan_replay` and `journal::read_since` look like
            // synchronous journal reads executed on the async task — confirm
            // that this is acceptable for the expected journal size.
            let plan = plan_replay(&events_dir, since, latest_at_start);
            if let Some(from) = plan.reset_from {
                yield Ok::<_, actix_web::Error>(web::Bytes::from(reset_frame(from)));
            }
            let mut last_replayed = plan.last_replayed;
            for ce in plan.events {
                yield Ok::<_, actix_web::Error>(web::Bytes::from(change_frame(&ce)));
            }

            let mut heartbeat = tokio::time::interval(Duration::from_secs(15));
            // An interval's first tick completes immediately; consume it so the
            // first keepalive is sent one full period after the replay.
            heartbeat.tick().await;

            loop {
                tokio::select! {
                    _ = heartbeat.tick() => {
                        yield Ok::<_, actix_web::Error>(web::Bytes::from(keepalive_frame()));
                    }
                    recv = receiver.recv() => {
                        match recv {
                            Ok(ce) => {
                                // Already delivered during replay or catch-up.
                                if ce.seq <= last_replayed {
                                    continue;
                                }
                                yield Ok::<_, actix_web::Error>(web::Bytes::from(change_frame(&ce)));
                                last_replayed = ce.seq;
                            }
                            Err(broadcast::error::RecvError::Lagged(skipped)) => {
                                // The channel dropped events for this receiver;
                                // recover them from the journal instead.
                                match journal::read_since(&events_dir, last_replayed) {
                                    Ok(events) => {
                                        for ce in events {
                                            if ce.seq <= last_replayed {
                                                continue;
                                            }
                                            yield Ok::<_, actix_web::Error>(web::Bytes::from(change_frame(&ce)));
                                            last_replayed = ce.seq;
                                        }
                                    }
                                    Err(e) => {
                                        // Keep streaming, but leave a trace:
                                        // the client may now be missing events.
                                        tracing::error!(
                                            "account feed: journal catch-up failed from {} after lagging {} events: {}",
                                            last_replayed,
                                            skipped,
                                            e
                                        );
                                    }
                                }
                            }
                            Err(broadcast::error::RecvError::Closed) => break,
                        }
                    }
                }
            }
        })
}
fn change_frame(ce: &ChangeEvent) -> String {
match serde_json::to_string(ce) {
Ok(json) => format!("id: {}\ndata: {}\n\n", ce.seq, json),
Err(_) => String::new(),
}
}
/// Builds the SSE frame carrying a `feed_reset` message.
///
/// The `data` payload is a JSON object with `type` set to `"feed_reset"` and
/// `from_seq` set to the given sequence number; the frame ends with a blank
/// line.
fn reset_frame(from_seq: u64) -> String {
    let mut frame = String::from("data: {\"type\":\"feed_reset\",\"from_seq\":");
    frame.push_str(&from_seq.to_string());
    frame.push_str("}\n\n");
    frame
}
/// Returns the fixed SSE frame emitted on every heartbeat tick.
fn keepalive_frame() -> &'static str {
    const KEEPALIVE: &str = "data: [KEEPALIVE]\n\n";
    KEEPALIVE
}