use axum::http::{HeaderMap, StatusCode};
use super::handler::RestError;
pub const SSE_CONTENT_TYPE: &str = "text/event-stream";
pub const DEFAULT_SSE_HEARTBEAT_SECONDS: u64 = 30;
#[must_use]
pub fn accepts_sse(headers: &HeaderMap) -> bool {
headers.get("accept").and_then(|v| v.to_str().ok()).is_some_and(|accept| {
accept.split(',').any(|part| part.trim().eq_ignore_ascii_case(SSE_CONTENT_TYPE))
})
}
#[must_use]
pub fn is_stream_path(relative_path: &str) -> bool {
let segments: Vec<&str> = relative_path
.trim_start_matches('/')
.split('/')
.filter(|s| !s.is_empty())
.collect();
segments.last().is_some_and(|s| *s == "stream")
}
#[must_use]
pub fn extract_stream_resource(relative_path: &str) -> Option<&str> {
let segments: Vec<&str> = relative_path
.trim_start_matches('/')
.split('/')
.filter(|s| !s.is_empty())
.collect();
if segments.len() == 2 && segments[1] == "stream" {
Some(segments[0])
} else {
None
}
}
#[must_use]
pub fn extract_last_event_id(headers: &HeaderMap) -> Option<String> {
headers.get("last-event-id").and_then(|v| v.to_str().ok()).map(String::from)
}
#[must_use]
pub fn format_sse_event(event_type: &str, event_id: &str, data: &serde_json::Value) -> String {
let data_str = serde_json::to_string(data).unwrap_or_default();
format!("event: {event_type}\nid: {event_id}\ndata: {data_str}\n\n")
}
#[must_use]
pub fn format_heartbeat() -> String {
"event: ping\ndata: \n\n".to_string()
}
#[must_use]
pub fn observers_not_available() -> RestError {
RestError {
status: StatusCode::NOT_IMPLEMENTED,
code: "NOT_IMPLEMENTED",
message: "SSE streaming requires the observers feature".to_string(),
details: None,
}
}
#[must_use]
pub fn event_kind_to_sse_type(kind: &str) -> &str {
match kind {
"INSERT" => "insert",
"UPDATE" => "update",
"DELETE" => "delete",
"CUSTOM" => "custom",
_ => "unknown",
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)] mod tests {
use axum::http::HeaderValue;
use serde_json::json;
use super::*;
#[test]
fn accepts_sse_true_for_exact_match() {
let mut headers = HeaderMap::new();
headers.insert("accept", HeaderValue::from_static("text/event-stream"));
assert!(accepts_sse(&headers));
}
#[test]
fn accepts_sse_true_in_list() {
let mut headers = HeaderMap::new();
headers.insert("accept", HeaderValue::from_static("application/json, text/event-stream"));
assert!(accepts_sse(&headers));
}
#[test]
fn accepts_sse_false_for_json() {
let mut headers = HeaderMap::new();
headers.insert("accept", HeaderValue::from_static("application/json"));
assert!(!accepts_sse(&headers));
}
#[test]
fn accepts_sse_false_when_missing() {
let headers = HeaderMap::new();
assert!(!accepts_sse(&headers));
}
#[test]
fn is_stream_path_true() {
assert!(is_stream_path("/users/stream"));
}
#[test]
fn is_stream_path_false_collection() {
assert!(!is_stream_path("/users"));
}
#[test]
fn is_stream_path_false_single() {
assert!(!is_stream_path("/users/123"));
}
#[test]
fn is_stream_path_false_nested() {
assert!(!is_stream_path("/users/123/stream/extra"));
}
#[test]
fn extract_stream_resource_users() {
assert_eq!(extract_stream_resource("/users/stream"), Some("users"));
}
#[test]
fn extract_stream_resource_orders() {
assert_eq!(extract_stream_resource("/orders/stream"), Some("orders"));
}
#[test]
fn extract_stream_resource_none_for_collection() {
assert_eq!(extract_stream_resource("/users"), None);
}
#[test]
fn extract_stream_resource_none_for_single() {
assert_eq!(extract_stream_resource("/users/123"), None);
}
#[test]
fn extract_last_event_id_present() {
let mut headers = HeaderMap::new();
headers.insert("last-event-id", HeaderValue::from_static("evt-42"));
assert_eq!(extract_last_event_id(&headers), Some("evt-42".to_string()));
}
#[test]
fn extract_last_event_id_missing() {
let headers = HeaderMap::new();
assert_eq!(extract_last_event_id(&headers), None);
}
#[test]
fn format_sse_insert_event() {
let data = json!({"id": 1, "name": "Alice"});
let output = format_sse_event("insert", "evt-1", &data);
assert!(output.starts_with("event: insert\n"));
assert!(output.contains("id: evt-1\n"));
assert!(output.contains("data: "));
assert!(output.ends_with("\n\n"));
let data_line = output.lines().find(|l| l.starts_with("data: ")).unwrap();
let json_str = data_line.strip_prefix("data: ").unwrap();
let parsed: serde_json::Value = serde_json::from_str(json_str).unwrap();
assert_eq!(parsed["name"], "Alice");
}
#[test]
fn format_sse_update_event() {
let data = json!({"id": 1, "name": "Alice Updated"});
let output = format_sse_event("update", "evt-2", &data);
assert!(output.starts_with("event: update\n"));
}
#[test]
fn format_sse_delete_event() {
let data = json!({"entity_id": "abc-123"});
let output = format_sse_event("delete", "evt-3", &data);
assert!(output.starts_with("event: delete\n"));
assert!(output.contains("\"entity_id\""));
}
#[test]
fn format_heartbeat_event() {
let output = format_heartbeat();
assert!(output.starts_with("event: ping\n"));
assert!(output.contains("data: \n"));
assert!(output.ends_with("\n\n"));
}
#[test]
fn event_kind_insert() {
assert_eq!(event_kind_to_sse_type("INSERT"), "insert");
}
#[test]
fn event_kind_update() {
assert_eq!(event_kind_to_sse_type("UPDATE"), "update");
}
#[test]
fn event_kind_delete() {
assert_eq!(event_kind_to_sse_type("DELETE"), "delete");
}
#[test]
fn event_kind_custom() {
assert_eq!(event_kind_to_sse_type("CUSTOM"), "custom");
}
#[test]
fn event_kind_unknown() {
assert_eq!(event_kind_to_sse_type("SOMETHING"), "unknown");
}
#[test]
fn observers_not_available_returns_501() {
let err = observers_not_available();
assert_eq!(err.status, StatusCode::NOT_IMPLEMENTED);
assert_eq!(err.code, "NOT_IMPLEMENTED");
}
}