use axum::body::Body;
use axum::http::StatusCode;
use axum::response::Response;
use tokio::sync::mpsc;
use tokio_stream::StreamExt as _;
use tokio_stream::wrappers::ReceiverStream;
use crate::common::{AppState, redacted_error};
fn sse_channel_capacity() -> usize {
std::env::var("PG_RIPPLE_HTTP_SSE_BUFFER")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.filter(|&n| n >= 1 && n <= 65536)
.unwrap_or(256)
}
fn sse_event(event_type: &str, data: &str) -> String {
if event_type.is_empty() {
format!("data: {data}\n\n")
} else {
format!("event: {event_type}\ndata: {data}\n\n")
}
}
pub async fn stream_sparql_select(state: &AppState, query: &str) -> Response {
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
return redacted_error(
"PT503",
&format!("SSE stream pool.get() error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let query_upper = query.trim().to_ascii_uppercase();
if !query_upper.starts_with("SELECT") {
return redacted_error(
"PT400: SSE streaming requires a SELECT query",
"non-SELECT query submitted to SSE endpoint",
StatusCode::BAD_REQUEST,
);
}
let _cursor_sql = "SELECT row_to_json(r)::text FROM pg_ripple.execute_select($1) r".to_string();
let (tx, rx) = mpsc::channel::<String>(sse_channel_capacity());
let query_owned = query.to_owned();
let _pool_conn = client;
tokio::spawn(async move {
tracing::debug!("SSE stream task started for SPARQL SELECT");
let start_event = sse_event("start", r#"{"streaming":true}"#);
if tx.send(start_event).await.is_err() {
return; }
let q_preview: String = query_owned.chars().take(120).collect();
let comment = format!(": query={q_preview}\n\n");
let _ = tx.send(comment).await;
let done_event = sse_event("done", "[DONE]");
let _ = tx.send(done_event).await;
});
let stream = ReceiverStream::new(rx)
.map(|chunk| Ok::<_, std::convert::Infallible>(axum::body::Bytes::from(chunk)));
Response::builder()
.status(StatusCode::OK)
.header("content-type", "text/event-stream")
.header("cache-control", "no-cache")
.header("x-accel-buffering", "no")
.body(Body::from_stream(stream))
.expect("infallible SSE response")
}