use std::time::Duration;
use serde::Deserialize;
use crate::monitor::dashboard::{IndexRow, SearchData};
pub const DEFAULT_SEARCH_URL: &str = "http://127.0.0.1:7878";
const REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
pub fn resolve_search_url() -> String {
match crate::read_daemon_addr("trusty-search") {
Ok(Some(addr)) => normalize_url(&addr),
_ => DEFAULT_SEARCH_URL.to_string(),
}
}
pub fn normalize_url(raw: &str) -> String {
if raw.starts_with("http://") || raw.starts_with("https://") {
raw.to_string()
} else {
format!("http://{raw}")
}
}
#[derive(Debug, Deserialize)]
struct HealthWire {
version: String,
#[serde(default)]
uptime_secs: u64,
}
#[derive(Debug, Deserialize)]
struct IndexListWire {
#[serde(default)]
indexes: Vec<String>,
}
#[derive(Debug, Deserialize)]
struct IndexStatusWire {
#[serde(default)]
root_path: String,
#[serde(default)]
chunk_count: u64,
}
#[derive(Debug, Clone)]
pub struct SearchClient {
base: String,
http: reqwest::Client,
}
impl SearchClient {
pub fn new(base: impl Into<String>) -> Self {
let http = reqwest::Client::builder()
.timeout(REQUEST_TIMEOUT)
.build()
.unwrap_or_default();
Self {
base: base.into(),
http,
}
}
pub fn base_url(&self) -> &str {
&self.base
}
pub fn set_base_url(&mut self, base: impl Into<String>) {
self.base = base.into();
}
pub async fn fetch_all(&self) -> anyhow::Result<SearchData> {
let health: HealthWire = self
.http
.get(format!("{}/health", self.base))
.send()
.await?
.error_for_status()?
.json()
.await?;
let list: IndexListWire = self
.http
.get(format!("{}/indexes", self.base))
.send()
.await?
.error_for_status()?
.json()
.await?;
let mut indexes = Vec::with_capacity(list.indexes.len());
for id in list.indexes {
let row = match self.index_status(&id).await {
Ok((root_path, chunk_count)) => IndexRow {
id,
chunk_count,
root_path,
},
Err(e) => {
tracing::warn!("index status probe failed for {id}: {e}");
IndexRow {
id,
chunk_count: 0,
root_path: String::new(),
}
}
};
indexes.push(row);
}
indexes.sort_by(|a, b| a.id.cmp(&b.id));
Ok(SearchData {
version: health.version,
uptime_secs: health.uptime_secs,
indexes,
})
}
async fn index_status(&self, id: &str) -> anyhow::Result<(String, u64)> {
let status: IndexStatusWire = self
.http
.get(format!("{}/indexes/{id}/status", self.base))
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok((status.root_path, status.chunk_count))
}
pub async fn reindex(&self, id: &str) -> anyhow::Result<()> {
self.http
.post(format!("{}/indexes/{id}/reindex", self.base))
.json(&serde_json::json!({}))
.send()
.await?
.error_for_status()?;
Ok(())
}
pub async fn search(
&self,
id: &str,
query: &str,
top_k: usize,
) -> anyhow::Result<Vec<SearchHit>> {
let raw: serde_json::Value = self
.http
.post(format!("{}/indexes/{id}/search", self.base))
.json(&serde_json::json!({ "text": query, "top_k": top_k }))
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(parse_search_hits(&raw))
}
pub async fn reindex_stream(&self, id: &str, tx: tokio::sync::mpsc::Sender<ReindexEvent>) {
if let Err(e) = self.reindex_stream_inner(id, &tx).await {
let _ = tx.send(ReindexEvent::Failed(e.to_string())).await;
}
}
async fn reindex_stream_inner(
&self,
id: &str,
tx: &tokio::sync::mpsc::Sender<ReindexEvent>,
) -> anyhow::Result<()> {
use futures_util::StreamExt;
let kickoff: serde_json::Value = self
.http
.post(format!("{}/indexes/{id}/reindex", self.base))
.json(&serde_json::json!({}))
.send()
.await?
.error_for_status()?
.json()
.await
.unwrap_or_else(|_| serde_json::json!({}));
let stream_path = kickoff
.get("stream_url")
.and_then(|v| v.as_str())
.map(str::to_string)
.unwrap_or_else(|| format!("/indexes/{id}/reindex/stream"));
let sse = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(5))
.build()?;
let resp = sse
.get(format!("{}{stream_path}", self.base))
.send()
.await?
.error_for_status()?;
let mut bytes = resp.bytes_stream();
let mut buf = String::new();
while let Some(chunk) = bytes.next().await {
let chunk = chunk?;
buf.push_str(&String::from_utf8_lossy(&chunk));
while let Some(nl) = buf.find('\n') {
let line = buf[..nl].trim_end_matches('\r').to_string();
buf.drain(..=nl);
let Some(payload) = line.strip_prefix("data:") else {
continue;
};
let payload = payload.trim();
if payload.is_empty() {
continue;
}
if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload) {
let event = parse_reindex_event(&value);
let terminal = matches!(event, ReindexEvent::Complete { .. });
if tx.send(event).await.is_err() {
return Ok(()); }
if terminal {
return Ok(());
}
}
}
}
Ok(())
}
}
#[derive(Debug, Clone, Default, PartialEq)]
pub struct SearchHit {
pub file: String,
pub line: usize,
pub snippet: String,
}
pub fn parse_search_hits(raw: &serde_json::Value) -> Vec<SearchHit> {
let Some(results) = raw.get("results").and_then(|v| v.as_array()) else {
return Vec::new();
};
results
.iter()
.map(|item| {
let file = item
.get("file")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let line = item.get("start_line").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
let snippet = item
.get("compact_snippet")
.and_then(|v| v.as_str())
.or_else(|| item.get("content").and_then(|v| v.as_str()))
.unwrap_or_default()
.lines()
.next()
.unwrap_or_default()
.trim()
.to_string();
SearchHit {
file,
line,
snippet,
}
})
.collect()
}
#[derive(Debug, Clone, PartialEq)]
pub enum ReindexEvent {
Started {
total_files: u64,
},
Progress {
indexed: u64,
total_files: u64,
},
Complete {
total_chunks: u64,
status: String,
},
Failed(String),
}
pub fn parse_reindex_event(value: &serde_json::Value) -> ReindexEvent {
let kind = value.get("event").and_then(|v| v.as_str()).unwrap_or("");
let u64_of = |key: &str| value.get(key).and_then(|v| v.as_u64()).unwrap_or(0);
match kind {
"start" => ReindexEvent::Started {
total_files: u64_of("total_files"),
},
"complete" => ReindexEvent::Complete {
total_chunks: u64_of("total_chunks"),
status: value
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("complete")
.to_string(),
},
"error" => ReindexEvent::Failed(
value
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("reindex error")
.to_string(),
),
_ => ReindexEvent::Progress {
indexed: u64_of("indexed"),
total_files: u64_of("total_files"),
},
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_search_url_is_local() {
assert_eq!(DEFAULT_SEARCH_URL, "http://127.0.0.1:7878");
}
#[test]
fn normalize_url_adds_scheme() {
assert_eq!(normalize_url("127.0.0.1:7878"), "http://127.0.0.1:7878");
assert_eq!(
normalize_url("http://127.0.0.1:7878"),
"http://127.0.0.1:7878"
);
assert_eq!(normalize_url("https://example.com"), "https://example.com");
}
#[test]
fn search_client_stores_base_url() {
let client = SearchClient::new("http://127.0.0.1:7878");
assert_eq!(client.base_url(), "http://127.0.0.1:7878");
}
#[test]
fn search_client_repoints() {
let mut client = SearchClient::new("http://127.0.0.1:7878");
client.set_base_url("http://127.0.0.1:9999");
assert_eq!(client.base_url(), "http://127.0.0.1:9999");
}
#[test]
fn resolve_search_url_falls_back_to_default() {
let url = resolve_search_url();
assert!(url.starts_with("http://") || url.starts_with("https://"));
}
#[test]
fn parse_search_hits_projects_fields() {
let raw = serde_json::json!({
"results": [
{
"file": "src/lib.rs",
"start_line": 42,
"compact_snippet": "fn embed() {\n ...\n}",
"content": "ignored when compact present",
},
{
"file": "src/main.rs",
"start_line": 7,
"content": " fn main() {}\nmore",
},
],
"intent": "Code",
});
let hits = parse_search_hits(&raw);
assert_eq!(hits.len(), 2);
assert_eq!(hits[0].file, "src/lib.rs");
assert_eq!(hits[0].line, 42);
assert_eq!(hits[0].snippet, "fn embed() {");
assert_eq!(hits[1].snippet, "fn main() {}");
assert!(parse_search_hits(&serde_json::json!({})).is_empty());
}
#[test]
fn parse_reindex_event_maps_event_field() {
let started = parse_reindex_event(&serde_json::json!({
"event": "start", "total_files": 1200,
}));
assert_eq!(started, ReindexEvent::Started { total_files: 1200 });
let progress = parse_reindex_event(&serde_json::json!({
"event": "batch", "indexed": 500, "total_files": 1200,
}));
assert_eq!(
progress,
ReindexEvent::Progress {
indexed: 500,
total_files: 1200,
}
);
let complete = parse_reindex_event(&serde_json::json!({
"event": "complete", "total_chunks": 19012, "status": "complete",
}));
assert_eq!(
complete,
ReindexEvent::Complete {
total_chunks: 19012,
status: "complete".into(),
}
);
let failed = parse_reindex_event(&serde_json::json!({
"event": "error", "message": "read: permission denied",
}));
assert_eq!(
failed,
ReindexEvent::Failed("read: permission denied".into())
);
}
}