use crate::core::{
classifier::QueryClassifier,
embed::Embedder,
indexer::{CodeIndexer, SearchQuery},
registry::{IndexHandle, IndexId, IndexRegistry},
store::{UsearchStore, VectorStore},
};
use axum::{
body::Body,
extract::{Path, Query, State},
http::StatusCode,
response::{IntoResponse, Json, Redirect, Response},
routing::{delete, get, post},
Router,
};
use dashmap::DashMap;
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::{broadcast, watch, OnceCell, RwLock};
use tokio_stream::wrappers::BroadcastStream;
use trusty_common::{ChatProvider, LocalModelConfig};
use crate::service::reindex::{spawn_reindex_with_cleanup, ReindexProgress, ReindexStatus};
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DaemonEvent {
StatusChanged {
indexes: u64,
total_chunks: u64,
uptime_secs: u64,
version: String,
},
IndexRegistered { id: String },
IndexRemoved { id: String },
}
#[derive(Clone)]
pub struct SearchAppState {
pub registry: IndexRegistry,
pub reindex_progress: Arc<DashMap<IndexId, Arc<ReindexProgress>>>,
pub embedder: Option<Arc<dyn Embedder>>,
pub embedder_slot: Arc<RwLock<Option<Arc<dyn Embedder>>>>,
pub embedder_ready: watch::Receiver<bool>,
pub embedder_ready_tx: Arc<watch::Sender<bool>>,
pub daemon_port: Option<u16>,
pub openrouter_enabled: bool,
pub started_at: Instant,
pub local_model: LocalModelConfig,
pub openrouter_model: String,
pub openrouter_api_key: String,
pub chat_provider: Arc<OnceCell<Option<Arc<dyn ChatProvider>>>>,
pub events: Arc<broadcast::Sender<DaemonEvent>>,
}
impl SearchAppState {
pub fn new(registry: IndexRegistry) -> Self {
let openrouter_api_key = std::env::var("OPENROUTER_API_KEY").unwrap_or_default();
let (events_tx, _) = broadcast::channel::<DaemonEvent>(128);
let (ready_tx, ready_rx) = watch::channel(false);
Self {
registry,
reindex_progress: Arc::new(DashMap::new()),
embedder: None,
embedder_slot: Arc::new(RwLock::new(None)),
embedder_ready: ready_rx,
embedder_ready_tx: Arc::new(ready_tx),
daemon_port: None,
openrouter_enabled: !openrouter_api_key.is_empty(),
started_at: Instant::now(),
local_model: LocalModelConfig::default(),
openrouter_model: "anthropic/claude-haiku-4.5".to_string(),
openrouter_api_key,
chat_provider: Arc::new(OnceCell::new()),
events: Arc::new(events_tx),
}
}
pub fn emit(&self, event: DaemonEvent) {
let _ = self.events.send(event);
}
pub fn with_local_model(mut self, cfg: LocalModelConfig) -> Self {
self.local_model = cfg;
self
}
pub fn with_openrouter_model(mut self, model: impl Into<String>) -> Self {
self.openrouter_model = model.into();
self
}
pub fn with_openrouter_api_key(mut self, api_key: impl Into<String>) -> Self {
let api_key_str = api_key.into();
self.openrouter_enabled = !api_key_str.is_empty();
self.openrouter_api_key = api_key_str;
self
}
pub async fn chat_provider(&self) -> Option<Arc<dyn ChatProvider>> {
self.chat_provider
.get_or_init(|| async {
if self.local_model.enabled {
if let Some(mut p) =
trusty_common::auto_detect_local_provider(&self.local_model.base_url).await
{
p.model = self.local_model.model.clone();
return Some(Arc::new(p) as Arc<dyn ChatProvider>);
}
}
if !self.openrouter_api_key.is_empty() {
return Some(Arc::new(trusty_common::OpenRouterProvider::new(
self.openrouter_api_key.clone(),
self.openrouter_model.clone(),
)) as Arc<dyn ChatProvider>);
}
None
})
.await
.clone()
}
pub fn with_daemon_port(mut self, port: u16) -> Self {
self.daemon_port = Some(port);
self
}
pub fn with_embedder(mut self, embedder: Arc<dyn Embedder>) -> Self {
self.embedder = Some(Arc::clone(&embedder));
if let Ok(mut slot) = self.embedder_slot.try_write() {
*slot = Some(embedder);
}
let _ = self.embedder_ready_tx.send(true);
self
}
pub async fn install_embedder(&self, embedder: Arc<dyn Embedder>) {
let mut slot = self.embedder_slot.write().await;
*slot = Some(embedder);
drop(slot);
let _ = self.embedder_ready_tx.send(true);
}
pub async fn current_embedder(&self) -> Option<Arc<dyn Embedder>> {
let slot = self.embedder_slot.read().await;
slot.clone()
}
pub fn is_embedder_ready(&self) -> bool {
*self.embedder_ready.borrow()
}
}
#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
version: &'static str,
indexes: usize,
uptime_secs: u64,
embedder: &'static str,
}
#[derive(Serialize)]
struct IndexListResponse {
indexes: Vec<String>,
}
#[derive(Deserialize)]
pub struct CreateIndexRequest {
pub id: String,
pub root_path: std::path::PathBuf,
}
#[derive(Deserialize)]
pub struct IndexFileRequest {
pub path: String,
pub content: String,
}
#[derive(Deserialize)]
pub struct RemoveFileRequest {
pub path: String,
}
pub fn build_router(state: SearchAppState) -> Router {
use crate::service::ui::{
chat_handler, list_chat_providers, ui_asset_handler, ui_index_handler,
};
let state_arc = Arc::new(state);
spawn_status_ticker(Arc::clone(&state_arc));
let router = Router::new()
.route("/", get(|| async { Redirect::permanent("/ui/") }))
.route("/health", get(health_handler))
.route("/status/stream", get(status_stream_handler))
.route(
"/indexes",
get(list_indexes_handler).post(create_index_handler),
)
.route("/indexes/{id}", delete(delete_index_handler))
.route("/ui", get(|| async { Redirect::permanent("/ui/") }))
.route("/ui/", get(ui_index_handler))
.route("/ui/{*path}", get(ui_asset_handler))
.route("/chat", post(chat_handler))
.route("/api/chat/providers", get(list_chat_providers))
.route("/search", post(global_search_handler))
.route("/indexes/{id}/search", post(search_handler))
.route("/indexes/{id}/search_similar", post(search_similar_handler))
.route("/indexes/{id}/status", get(index_status_handler))
.route("/indexes/{id}/index-file", post(index_file_handler))
.route("/indexes/{id}/remove-file", post(remove_file_handler))
.route("/indexes/{id}/reindex", post(reindex_handler))
.route("/indexes/{id}/reindex/stream", get(reindex_stream_handler))
.route("/indexes/{id}/chunks", get(get_index_chunks_handler))
.with_state(Arc::clone(&state_arc));
trusty_common::server::with_standard_middleware(router)
}
fn spawn_status_ticker(state: Arc<SearchAppState>) {
let weak = Arc::downgrade(&state);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(2));
interval.tick().await;
loop {
interval.tick().await;
let Some(state) = weak.upgrade() else {
break;
};
let (indexes, total_chunks) = collect_status_counts(&state).await;
state.emit(DaemonEvent::StatusChanged {
indexes: indexes as u64,
total_chunks: total_chunks as u64,
uptime_secs: state.started_at.elapsed().as_secs(),
version: env!("CARGO_PKG_VERSION").to_string(),
});
}
});
}
async fn health_handler(State(state): State<Arc<SearchAppState>>) -> Json<HealthResponse> {
Json(HealthResponse {
status: "ok",
version: env!("CARGO_PKG_VERSION"),
indexes: state.registry.list().len(),
uptime_secs: state.started_at.elapsed().as_secs(),
embedder: if state.is_embedder_ready() {
"ready"
} else if state.embedder.is_some()
|| state
.embedder_slot
.try_read()
.map(|g| g.is_some())
.unwrap_or(false)
{
"ready"
} else {
"initializing"
},
})
}
async fn collect_status_counts(state: &SearchAppState) -> (usize, usize) {
let ids = state.registry.list();
let indexes_count = ids.len();
let mut total_chunks: usize = 0;
for id in ids {
if let Some(handle) = state.registry.get(&id) {
let indexer = handle.indexer.read().await;
total_chunks = total_chunks.saturating_add(indexer.chunk_count());
}
}
(indexes_count, total_chunks)
}
async fn status_stream_handler(State(state): State<Arc<SearchAppState>>) -> impl IntoResponse {
let rx = state.events.subscribe();
let initial = stream::once(async {
Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
"data: {\"type\":\"connected\"}\n\n",
))
});
let events = BroadcastStream::new(rx).map(|res| {
let frame = match res {
Ok(event) => match serde_json::to_string(&event) {
Ok(json) => format!("data: {json}\n\n"),
Err(e) => format!("data: {{\"type\":\"error\",\"message\":\"{e}\"}}\n\n"),
},
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
}
};
Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
});
let stream = initial.chain(events);
Response::builder()
.header("Content-Type", "text/event-stream")
.header("Cache-Control", "no-cache")
.header("X-Accel-Buffering", "no")
.body(Body::from_stream(stream))
.expect("valid SSE response")
}
async fn list_indexes_handler(State(state): State<Arc<SearchAppState>>) -> Json<IndexListResponse> {
Json(IndexListResponse {
indexes: state.registry.list().into_iter().map(|id| id.0).collect(),
})
}
async fn create_index_handler(
State(state): State<Arc<SearchAppState>>,
Json(req): Json<CreateIndexRequest>,
) -> Response {
let id = IndexId::new(req.id.clone());
if state.registry.get(&id).is_some() {
return Json(serde_json::json!({
"id": req.id,
"created": false,
"reason": "already exists",
}))
.into_response();
}
let embedder = state.current_embedder().await;
if embedder.is_none() {
return embedder_initializing_response();
}
let embedder = embedder.unwrap();
let mut indexer = CodeIndexer::new(req.id.clone(), req.root_path.clone());
{
let embedder = &embedder;
let dim = embedder.dimension();
match UsearchStore::new(dim) {
Ok(store) => {
let store: Arc<dyn VectorStore> = Arc::new(store);
indexer = indexer.with_components(Arc::clone(embedder), store);
}
Err(e) => {
tracing::error!(
"failed to allocate UsearchStore for index {}: {e} \
— index will run in BM25-only mode",
req.id
);
}
}
}
let handle = IndexHandle {
id: id.clone(),
indexer: Arc::new(tokio::sync::RwLock::new(indexer)),
root_path: req.root_path,
};
state.registry.register(handle);
state.emit(DaemonEvent::IndexRegistered { id: req.id.clone() });
Json(serde_json::json!({ "id": req.id, "created": true })).into_response()
}
fn embedder_initializing_response() -> Response {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "embedder initializing, retry in a few seconds"
})),
)
.into_response()
}
async fn delete_index_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
) -> Json<serde_json::Value> {
let index_id = IndexId::new(id.clone());
let removed = state.registry.unregister(&index_id);
state.reindex_progress.remove(&index_id);
if removed {
state.emit(DaemonEvent::IndexRemoved { id: id.clone() });
}
Json(serde_json::json!({ "id": id, "removed": removed }))
}
async fn search_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
Json(query): Json<SearchQuery>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let index_id = IndexId::new(id);
let handle = state.registry.get(&index_id).ok_or(StatusCode::NOT_FOUND)?;
let intent = QueryClassifier::classify(&query.text);
let started = std::time::Instant::now();
let indexer = handle.indexer.read().await;
let results = indexer
.search(&query)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let latency_ms = started.elapsed().as_millis() as u64;
Ok(Json(serde_json::json!({
"results": results,
"intent": format!("{:?}", intent),
"latency_ms": latency_ms,
})))
}
#[derive(Deserialize)]
pub struct GlobalSearchRequest {
pub query: String,
#[serde(default = "default_global_top_k")]
pub top_k: usize,
#[serde(default)]
pub full_content: bool,
}
fn default_global_top_k() -> usize {
10
}
async fn global_search_handler(
State(state): State<Arc<SearchAppState>>,
Json(req): Json<GlobalSearchRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
use crate::core::search::rrf::{rrf_fuse, RRF_K};
let index_ids = state.registry.list();
let total_indexes = index_ids.len();
if index_ids.is_empty() {
return Ok(Json(serde_json::json!({
"results": Vec::<crate::core::indexer::CodeChunk>::new(),
"indexes_searched": Vec::<String>::new(),
"total_indexes": 0_usize,
"latency_ms": 0_u64,
"intent": format!("{:?}", QueryClassifier::classify(&req.query)),
})));
}
let started = std::time::Instant::now();
let intent = QueryClassifier::classify(&req.query);
let per_index_query = SearchQuery {
text: req.query.clone(),
top_k: req.top_k,
expand_graph: true,
compact: !req.full_content,
};
let registry = state.registry.clone();
let futures = index_ids.into_iter().map(|id| {
let registry = registry.clone();
let query = per_index_query.clone();
async move {
let handle = registry.get(&id)?;
let indexer = handle.indexer.read().await;
match indexer.search(&query).await {
Ok(results) => Some((id, results)),
Err(e) => {
tracing::warn!("global search: index {} errored: {e}", id);
None
}
}
}
});
let per_index_results: Vec<(IndexId, Vec<crate::core::indexer::CodeChunk>)> =
futures::future::join_all(futures)
.await
.into_iter()
.flatten()
.collect();
let mut chunk_lookup: std::collections::HashMap<String, crate::core::indexer::CodeChunk> =
std::collections::HashMap::new();
let mut lanes: Vec<Vec<(String, f32)>> = Vec::with_capacity(per_index_results.len());
let mut indexes_searched: Vec<String> = Vec::with_capacity(per_index_results.len());
for (id, results) in per_index_results {
indexes_searched.push(id.0.clone());
let mut lane: Vec<(String, f32)> = Vec::with_capacity(results.len());
for mut chunk in results {
let namespaced = format!("{}::{}", id.0, chunk.id);
chunk.index_id = Some(id.0.clone());
lane.push((namespaced.clone(), chunk.score));
chunk_lookup.insert(namespaced, chunk);
}
lanes.push(lane);
}
let mut fused: Vec<(String, f32)> = Vec::new();
let oversample = req.top_k.saturating_mul(4).max(req.top_k).max(10);
for lane in lanes {
fused = rrf_fuse(&fused, &lane, 1.0, 1.0, RRF_K, oversample);
}
fused.truncate(req.top_k);
let results: Vec<crate::core::indexer::CodeChunk> = fused
.into_iter()
.filter_map(|(id, fused_score)| {
let mut chunk = chunk_lookup.remove(&id)?;
chunk.score = fused_score;
Some(chunk)
})
.collect();
let latency_ms = started.elapsed().as_millis() as u64;
Ok(Json(serde_json::json!({
"results": results,
"indexes_searched": indexes_searched,
"total_indexes": total_indexes,
"latency_ms": latency_ms,
"intent": format!("{:?}", intent),
})))
}
#[derive(Deserialize)]
pub struct SearchSimilarRequest {
pub file: String,
#[serde(default)]
pub function: Option<String>,
#[serde(default = "default_similar_top_k")]
pub top_k: usize,
}
fn default_similar_top_k() -> usize {
10
}
async fn search_similar_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
Json(req): Json<SearchSimilarRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let index_id = IndexId::new(id);
let handle = state.registry.get(&index_id).ok_or(StatusCode::NOT_FOUND)?;
let started = std::time::Instant::now();
let indexer = handle.indexer.read().await;
let chunk_id = indexer
.find_chunk_id(&req.file, req.function.as_deref())
.await
.ok_or(StatusCode::NOT_FOUND)?;
let embedding = indexer
.get_embedding(&chunk_id)
.ok_or(StatusCode::NOT_FOUND)?;
let results = indexer
.similar_by_embedding(&embedding, req.top_k, Some(&chunk_id))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let latency_ms = started.elapsed().as_millis() as u64;
Ok(Json(serde_json::json!({
"results": results,
"seed_chunk_id": chunk_id,
"latency_ms": latency_ms,
})))
}
async fn index_status_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let index_id = IndexId::new(id);
let handle = state.registry.get(&index_id).ok_or(StatusCode::NOT_FOUND)?;
let indexer = handle.indexer.read().await;
Ok(Json(serde_json::json!({
"index_id": index_id.0,
"root_path": handle.root_path,
"chunk_count": indexer.chunk_count(),
})))
}
async fn index_file_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
Json(req): Json<IndexFileRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let index_id = IndexId::new(id);
let handle = state.registry.get(&index_id).ok_or(StatusCode::NOT_FOUND)?;
let indexer = handle.indexer.read().await;
indexer
.index_file(&req.path, &req.content)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(serde_json::json!({
"index_id": index_id.0,
"path": req.path,
"indexed": true,
})))
}
async fn remove_file_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
Json(req): Json<RemoveFileRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let index_id = IndexId::new(id);
let handle = state.registry.get(&index_id).ok_or(StatusCode::NOT_FOUND)?;
let indexer = handle.indexer.read().await;
let removed = indexer
.remove_file(&req.path)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(serde_json::json!({
"index_id": index_id.0,
"path": req.path,
"removed_chunks": removed,
})))
}
#[derive(Deserialize)]
pub struct ChunksParams {
#[serde(default)]
pub offset: usize,
#[serde(default = "default_chunks_limit")]
pub limit: usize,
}
fn default_chunks_limit() -> usize {
100
}
const MAX_CHUNKS_LIMIT: usize = 1_000;
async fn get_index_chunks_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
Query(params): Query<ChunksParams>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let index_id = IndexId::new(id);
let handle = state.registry.get(&index_id).ok_or(StatusCode::NOT_FOUND)?;
let limit = params.limit.min(MAX_CHUNKS_LIMIT);
let indexer = handle.indexer.read().await;
let (total, chunks) = indexer.enumerate_chunks(params.offset, limit).await;
Ok(Json(serde_json::json!({
"index_id": index_id.0,
"total": total,
"offset": params.offset,
"limit": limit,
"chunks": chunks,
})))
}
#[derive(Deserialize, Default)]
pub struct ReindexRequest {
#[serde(default)]
pub root_path: Option<std::path::PathBuf>,
#[serde(default)]
pub force: Option<bool>,
}
async fn reindex_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
body: Option<Json<ReindexRequest>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let index_id = IndexId::new(id.clone());
let mut handle = state.registry.get(&index_id).ok_or(StatusCode::NOT_FOUND)?;
let mut force = false;
if let Some(Json(req)) = body {
force = req.force.unwrap_or(false);
if let Some(new_root) = req.root_path {
if handle.root_path.as_os_str().is_empty() || handle.root_path != new_root {
let indexer = Arc::clone(&handle.indexer);
let new_handle = IndexHandle {
id: index_id.clone(),
indexer,
root_path: new_root,
};
handle = state.registry.register(new_handle);
}
}
}
let progress = Arc::new(ReindexProgress::new());
state
.reindex_progress
.insert(index_id.clone(), Arc::clone(&progress));
spawn_reindex_with_cleanup(
handle,
progress,
force,
Some(Arc::clone(&state.reindex_progress)),
);
Ok(Json(serde_json::json!({
"index_id": index_id.0,
"queued": true,
"stream_url": format!("/indexes/{}/reindex/stream", index_id.0),
})))
}
async fn reindex_stream_handler(
State(state): State<Arc<SearchAppState>>,
Path(id): Path<String>,
) -> Result<Response, StatusCode> {
let index_id = IndexId::new(id);
let progress = state
.reindex_progress
.get(&index_id)
.map(|r| Arc::clone(r.value()))
.ok_or(StatusCode::NOT_FOUND)?;
let replay = progress.events.lock().await.clone();
let initial_status = progress.status.load();
let rx = progress.sender.subscribe();
fn frame(line: String) -> Result<axum::body::Bytes, std::io::Error> {
Ok(axum::body::Bytes::from(format!("data: {line}\n\n")))
}
let replay_stream = stream::iter(replay).map(frame);
let body = if initial_status != ReindexStatus::Running {
Body::from_stream(replay_stream)
} else {
let live = BroadcastStream::new(rx).map(|res| match res {
Ok(line) => frame(line),
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => Ok(
axum::body::Bytes::from(format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")),
),
});
Body::from_stream(replay_stream.chain(live))
};
Ok(Response::builder()
.header("Content-Type", "text/event-stream")
.header("Cache-Control", "no-cache")
.header("X-Accel-Buffering", "no")
.body(body)
.expect("valid SSE response"))
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn health_handler_reports_indexes_and_uptime() {
use crate::core::{
indexer::CodeIndexer,
registry::{IndexHandle, IndexId, IndexRegistry},
};
use std::sync::Arc;
use tokio::sync::RwLock;
let registry = IndexRegistry::new();
let id = IndexId::new("health-test");
registry.register(IndexHandle {
id: id.clone(),
indexer: Arc::new(RwLock::new(CodeIndexer::new(
"health-test",
"/tmp/health-test",
))),
root_path: "/tmp/health-test".into(),
});
let state = Arc::new(SearchAppState::new(registry));
let Json(resp) = health_handler(State(state)).await;
assert_eq!(resp.status, "ok");
assert_eq!(resp.version, env!("CARGO_PKG_VERSION"));
assert_eq!(resp.indexes, 1);
let _ = resp.uptime_secs;
assert_eq!(resp.embedder, "initializing");
}
#[tokio::test]
async fn global_search_fans_out_and_merges() {
use crate::core::{
indexer::CodeIndexer,
registry::{IndexHandle, IndexId, IndexRegistry},
};
use std::sync::Arc;
use tokio::sync::RwLock;
let registry = IndexRegistry::new();
for name in ["proj-a", "proj-b"] {
let id = IndexId::new(name);
let indexer = CodeIndexer::new(name, format!("/tmp/{name}"));
indexer
.index_file(
&format!("{name}/lib.rs"),
&format!("fn alpha_{name}() {{ println!(\"alpha hit\"); }}"),
)
.await
.expect("index_file ok");
registry.register(IndexHandle {
id: id.clone(),
indexer: Arc::new(RwLock::new(indexer)),
root_path: format!("/tmp/{name}").into(),
});
}
let state = Arc::new(SearchAppState::new(registry));
let Json(value) = global_search_handler(
State(state),
Json(GlobalSearchRequest {
query: "alpha".into(),
top_k: 10,
full_content: false,
}),
)
.await
.expect("handler ok");
let total = value["total_indexes"].as_u64().expect("total_indexes");
assert_eq!(total, 2, "both indexes counted");
let searched: Vec<String> = value["indexes_searched"]
.as_array()
.expect("indexes_searched array")
.iter()
.filter_map(|v| v.as_str().map(str::to_owned))
.collect();
assert_eq!(searched.len(), 2);
assert!(searched.contains(&"proj-a".to_string()));
assert!(searched.contains(&"proj-b".to_string()));
let results = value["results"].as_array().expect("results array");
assert!(!results.is_empty(), "expected at least one hit");
let mut from_a = false;
let mut from_b = false;
for r in results {
let idx = r["index_id"]
.as_str()
.expect("each result must be tagged with index_id");
assert!(
idx == "proj-a" || idx == "proj-b",
"unexpected index_id: {idx}"
);
from_a |= idx == "proj-a";
from_b |= idx == "proj-b";
}
assert!(from_a, "expected a result tagged with proj-a");
assert!(from_b, "expected a result tagged with proj-b");
}
#[tokio::test]
async fn global_search_empty_registry_returns_empty_results() {
use crate::core::registry::IndexRegistry;
let state = Arc::new(SearchAppState::new(IndexRegistry::new()));
let Json(value) = global_search_handler(
State(state),
Json(GlobalSearchRequest {
query: "anything".into(),
top_k: 5,
full_content: false,
}),
)
.await
.expect("handler ok");
assert_eq!(value["total_indexes"].as_u64(), Some(0));
assert!(value["results"].as_array().unwrap().is_empty());
assert!(value["indexes_searched"].as_array().unwrap().is_empty());
}
}