use axum::{
body::Body,
extract::{Query, State},
response::{IntoResponse, Response},
Json,
};
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio_stream::wrappers::BroadcastStream;
use super::state::SearchAppState;
/// Query-string parameters accepted by `logs_tail_handler`.
#[derive(Deserialize)]
pub struct LogsTailParams {
/// Number of log lines requested. When the parameter is omitted, serde
/// fills it in by calling `default_logs_tail_n`. The handler clamps the
/// value before using it.
#[serde(default = "default_logs_tail_n")]
pub n: usize,
}
/// Value returned by `default_logs_tail_n`, i.e. the `n` used when the
/// request does not specify one.
const DEFAULT_LOGS_TAIL_N: usize = 100;
/// Upper bound applied to `n` in `logs_tail_handler`; defined as the shared
/// log buffer's `DEFAULT_LOG_CAPACITY`.
pub(super) const MAX_LOGS_TAIL_N: usize = trusty_common::log_buffer::DEFAULT_LOG_CAPACITY;
/// Serde default provider for `LogsTailParams::n`; returns
/// `DEFAULT_LOGS_TAIL_N`.
fn default_logs_tail_n() -> usize {
DEFAULT_LOGS_TAIL_N
}
/// Returns the tail of the in-memory log buffer as JSON.
///
/// The requested count `n` is clamped to `1..=MAX_LOGS_TAIL_N` before the
/// buffer is queried. The response object carries two keys: `"lines"` (the
/// result of `log_buffer.tail(n)`) and `"total"` (the result of
/// `log_buffer.len()`).
pub(super) async fn logs_tail_handler(
    State(state): State<Arc<SearchAppState>>,
    Query(params): Query<LogsTailParams>,
) -> Json<serde_json::Value> {
    let wanted = params.n.clamp(1, MAX_LOGS_TAIL_N);
    let buffer = &state.log_buffer;

    // Fetch the tail first, then read the length, in that order.
    let tail = buffer.tail(wanted);
    let body = serde_json::json!({
        "lines": tail,
        "total": buffer.len(),
    });
    Json(body)
}
/// Requests a graceful shutdown of the server.
///
/// Logs a warning, publishes `true` on `state.shutdown_tx`, and replies with
/// `{"ok": true, "message": "shutting down"}`. The result of the send is
/// intentionally ignored, so the acknowledgement is returned either way.
pub(super) async fn admin_stop_handler(
    State(state): State<Arc<SearchAppState>>,
) -> Json<serde_json::Value> {
    tracing::warn!("admin_stop: graceful shutdown requested via POST /admin/stop");

    // Best-effort signal: the outcome of the send is discarded.
    let _ = state.shutdown_tx.send(true);

    let ack = serde_json::json!({ "ok": true, "message": "shutting down" });
    Json(ack)
}
/// Request body for `patch_config_handler`.
///
/// Each field is a nested `Option` so three cases can be told apart:
/// * outer `None` — the key was absent (serde `default`); the handler leaves
///   that setting untouched,
/// * `Some(None)` — the key was present with a `null` value,
/// * `Some(Some(v))` — the key was present with the number `v`.
#[derive(Debug, Deserialize, Default)]
pub(super) struct PatchConfigRequest {
/// Requested change to the memory limit, in MB.
#[serde(default, deserialize_with = "deserialize_optional_option_u64")]
memory_limit_mb: Option<Option<u64>>,
/// Requested change to the index memory limit, in MB.
#[serde(default, deserialize_with = "deserialize_optional_option_u64")]
index_memory_limit_mb: Option<Option<u64>>,
}
/// Response body returned by `get_config_handler` and
/// `patch_config_handler`, reporting the current limits.
#[derive(Debug, Serialize)]
pub(super) struct ConfigResponse {
/// Current memory limit in MB; `None` is logged as "unlimited" by
/// `patch_config_handler`.
memory_limit_mb: Option<u64>,
/// Current index memory limit in MB; `None` is logged as "unlimited" by
/// `patch_config_handler`.
index_memory_limit_mb: Option<u64>,
}
fn deserialize_optional_option_u64<'de, D>(deserializer: D) -> Result<Option<Option<u64>>, D::Error>
where
D: serde::Deserializer<'de>,
{
let v = Option::<u64>::deserialize(deserializer)?;
Ok(Some(v))
}
pub(super) async fn get_config_handler(
State(_state): State<Arc<SearchAppState>>,
) -> Json<ConfigResponse> {
use crate::core::memguard::{index_memory_limit_mb, memory_limit_mb};
Json(ConfigResponse {
memory_limit_mb: memory_limit_mb(),
index_memory_limit_mb: index_memory_limit_mb(),
})
}
pub(super) async fn patch_config_handler(
State(_state): State<Arc<SearchAppState>>,
Json(req): Json<PatchConfigRequest>,
) -> Json<ConfigResponse> {
use crate::core::memguard::{
index_memory_limit_mb, memory_limit_mb, set_index_memory_limit_mb, set_memory_limit_mb,
};
let fmt = |v: Option<u64>| match v {
Some(mb) => mb.to_string(),
None => "unlimited".to_string(),
};
if let Some(new) = req.memory_limit_mb {
let before = memory_limit_mb();
set_memory_limit_mb(new);
let after = memory_limit_mb();
tracing::info!(
"config updated: memory_limit_mb {} → {}",
fmt(before),
fmt(after)
);
}
if let Some(new) = req.index_memory_limit_mb {
let before = index_memory_limit_mb();
set_index_memory_limit_mb(new);
let after = index_memory_limit_mb();
tracing::info!(
"config updated: index_memory_limit_mb {} → {}",
fmt(before),
fmt(after)
);
}
Json(ConfigResponse {
memory_limit_mb: memory_limit_mb(),
index_memory_limit_mb: index_memory_limit_mb(),
})
}
/// Computes `(number of registered indexes, total chunk count)`.
///
/// The index count is the length of `registry.list()`. For every listed id
/// that still resolves via `registry.get`, the indexer's read lock is taken
/// and its chunk count is added (saturating) to the total. The count comes
/// from the corpus when `corpus_arc()` yields one and its `chunk_count()`
/// succeeds; otherwise it falls back to the indexer's own `chunk_count()`.
/// Ids that no longer resolve still count as indexes but contribute no chunks.
pub(super) async fn collect_status_counts(state: &SearchAppState) -> (usize, usize) {
    let index_ids = state.registry.list();
    let indexes_count = index_ids.len();

    let mut total_chunks: usize = 0;
    for index_id in index_ids {
        let Some(handle) = state.registry.get(&index_id) else {
            continue;
        };
        let indexer = handle.indexer.read().await;
        let from_corpus = indexer
            .corpus_arc()
            .and_then(|corpus| corpus.chunk_count().ok());
        let chunks = match from_corpus {
            Some(n) => n,
            None => indexer.chunk_count(),
        };
        total_chunks = total_chunks.saturating_add(chunks);
    }

    (indexes_count, total_chunks)
}
/// Streams status events to the client as Server-Sent Events.
///
/// The response body is an unbounded `text/event-stream`:
/// 1. a first frame `data: {"type":"connected"}`,
/// 2. one `data: <json>` frame per event received from `state.events`,
/// 3. a `{"type":"lag","skipped":n}` frame whenever the broadcast receiver
///    reports that it lagged and `n` events were dropped,
/// 4. a `{"type":"error","message":"..."}` frame when an event fails to
///    serialize.
///
/// The error message is encoded as a JSON string, so quotes, backslashes and
/// newlines in it cannot produce invalid JSON or break the SSE frame.
///
/// # Panics
///
/// Panics only if building the response from the fixed header set fails.
pub(super) async fn status_stream_handler(
    State(state): State<Arc<SearchAppState>>,
) -> impl IntoResponse {
    // Subscribe before constructing the stream so the receiver exists by the
    // time the first frame is sent.
    let rx = state.events.subscribe();

    let initial = stream::once(async {
        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
            "data: {\"type\":\"connected\"}\n\n",
        ))
    });

    let events = BroadcastStream::new(rx).map(|res| {
        let frame = match res {
            Ok(event) => match serde_json::to_string(&event) {
                Ok(json) => format!("data: {json}\n\n"),
                Err(e) => {
                    // `Value::String` renders as a quoted, escaped JSON string
                    // literal, so the message is safe to embed as-is.
                    let message = serde_json::Value::String(e.to_string());
                    format!("data: {{\"type\":\"error\",\"message\":{message}}}\n\n")
                }
            },
            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
                format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
            }
        };
        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
    });

    let stream = initial.chain(events);
    Response::builder()
        .header("Content-Type", "text/event-stream")
        .header("Cache-Control", "no-cache")
        // Ask buffering reverse proxies (e.g. nginx) to pass frames through.
        .header("X-Accel-Buffering", "no")
        .body(Body::from_stream(stream))
        .expect("valid SSE response")
}