use std::sync::Arc;
use bytes::Bytes;
use fraiseql_core::{
db::traits::DatabaseAdapter,
runtime::{Executor, QueryMatch},
security::SecurityContext,
};
use crate::routes::rest::handler::RestError;
pub(super) struct StreamState<A: DatabaseAdapter> {
pub executor: Arc<Executor<A>>,
pub query_name: String,
pub query_match: QueryMatch,
pub variables: serde_json::Value,
pub security_ctx: Option<SecurityContext>,
pub batch_size: u64,
pub offset: u64,
pub done: bool,
}
pub(super) async fn fetch_and_serialize_batch<A: DatabaseAdapter>(
state: &mut StreamState<A>,
) -> Result<Option<Bytes>, Bytes> {
let mut batch_vars = state.variables.clone();
if let Some(obj) = batch_vars.as_object_mut() {
obj.insert("limit".to_string(), serde_json::json!(state.batch_size));
if state.offset > 0 {
obj.insert("offset".to_string(), serde_json::json!(state.offset));
}
}
let vars_ref = if batch_vars.as_object().is_none_or(|m| m.is_empty()) {
None
} else {
Some(&batch_vars)
};
let result_value = match state
.executor
.execute_query_direct(&state.query_match, vars_ref, state.security_ctx.as_ref())
.await
{
Ok(r) => r,
Err(e) => {
state.done = true;
return Err(error_ndjson_line(&e.to_string()));
},
};
let rows = match extract_rows(&result_value, &state.query_name) {
Ok(r) => r,
Err(e) => {
state.done = true;
return Err(error_ndjson_line(&e.message));
},
};
if rows.is_empty() {
state.done = true;
return Ok(None);
}
let mut ndjson_bytes = Vec::new();
for row in &rows {
match serde_json::to_vec(row) {
Ok(mut line) => {
line.push(b'\n');
ndjson_bytes.extend_from_slice(&line);
},
Err(e) => {
state.done = true;
ndjson_bytes.extend_from_slice(&error_ndjson_line(&e.to_string()));
return Ok(Some(Bytes::from(ndjson_bytes)));
},
}
}
#[allow(clippy::cast_possible_truncation)] let row_count = rows.len() as u64;
if row_count < state.batch_size {
state.done = true;
} else {
state.offset += state.batch_size;
}
Ok(Some(Bytes::from(ndjson_bytes)))
}
pub(super) fn error_ndjson_line(message: &str) -> Bytes {
let escaped = serde_json::to_string(message).unwrap_or_else(|_| format!("\"{message}\""));
Bytes::from(format!("{{\"error\":{escaped}}}\n"))
}
pub(super) fn extract_rows(
result: &serde_json::Value,
query_name: &str,
) -> Result<Vec<serde_json::Value>, RestError> {
let data = result
.get("data")
.and_then(|d| d.get(query_name))
.ok_or_else(|| RestError::internal("Missing data in query result"))?;
match data {
serde_json::Value::Array(arr) => Ok(arr.clone()),
other => Ok(vec![other.clone()]),
}
}