use super::events::{handle_event, LoopState};
use super::options::{ReindexOptions, ReindexOutcome};
use super::progress_state::SharedProgress;
use super::registration::fetch_chunk_count;
use super::ticker::spawn_ticker;
use super::verify::verify_reindex_health;
use crate::commands::daemon_utils::daemon_base_url;
use crate::commands::format::{fmt_elapsed, format_with_commas};
use crate::commands::reindex_ui::{print_timing_breakdown, ReindexUi};
use anyhow::Result;
use colored::Colorize;
use eventsource_stream::Eventsource;
use futures_util::stream::StreamExt;
use std::io::IsTerminal;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
pub async fn run_reindex(
index_id: &str,
root_path: &std::path::Path,
_timeout_secs: u64,
) -> Result<()> {
run_reindex_with(
index_id,
root_path,
ReindexOptions {
timeout_explicit: false,
..ReindexOptions::default()
},
)
.await
.map(|_| ())
}
pub async fn run_reindex_opts(
index_id: &str,
root_path: &std::path::Path,
timeout_secs: u64,
timeout_explicit: bool,
) -> Result<()> {
run_reindex_with(
index_id,
root_path,
ReindexOptions {
timeout_secs,
timeout_explicit,
..ReindexOptions::default()
},
)
.await
.map(|_| ())
}
pub async fn run_reindex_force_opts(
index_id: &str,
root_path: &std::path::Path,
timeout_secs: u64,
timeout_explicit: bool,
) -> Result<()> {
let prior = fetch_chunk_count(index_id).await;
let opts = ReindexOptions {
verify_after: true,
prior_chunk_count: prior,
force: true,
timeout_secs,
timeout_explicit,
..ReindexOptions::default()
};
run_reindex_with(index_id, root_path, opts)
.await
.map(|_| ())
}
pub async fn run_reindex_with(
index_id: &str,
root_path: &std::path::Path,
opts: ReindexOptions,
) -> Result<ReindexOutcome> {
let base = daemon_base_url();
let client = trusty_common::server::daemon_http_client()?;
let kickoff_url = format!("{}/indexes/{}/reindex", base, index_id);
let kickoff_body = serde_json::json!({
"root_path": root_path,
"force": opts.force,
});
let kickoff = client
.post(&kickoff_url)
.json(&kickoff_body)
.send()
.await
.map_err(|e| anyhow::anyhow!("could not reach daemon at {base}: {e}"))?;
if kickoff.status() == reqwest::StatusCode::NOT_FOUND {
anyhow::bail!(
"index '{}' is not registered on the daemon \u{2014} run `trusty-search index` first",
index_id
);
}
if !kickoff.status().is_success() {
anyhow::bail!("daemon returned {} for reindex kickoff", kickoff.status());
}
let kickoff_body: serde_json::Value = kickoff
.json()
.await
.unwrap_or_else(|_| serde_json::json!({}));
let stream_path = kickoff_body
.get("stream_url")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| format!("/indexes/{}/reindex/stream", index_id));
let stream_url = format!("{}{}", base, stream_path);
let sse_client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::MAX)
.build()
.map_err(|e| anyhow::anyhow!("could not build SSE client: {e}"))?;
let resp = sse_client
.get(&stream_url)
.send()
.await
.map_err(|e| anyhow::anyhow!("could not connect to SSE stream {stream_url}: {e}"))?;
if !resp.status().is_success() {
anyhow::bail!(
"reindex stream returned {} \u{2014} daemon may be an older version that \
doesn't support /reindex/stream",
resp.status()
);
}
let interactive = std::io::stdout().is_terminal();
let mut ui = ReindexUi::new(index_id, interactive);
let started = std::time::Instant::now();
let progress = SharedProgress::new(started);
let tick_done = Arc::new(AtomicBool::new(false));
let ticker = spawn_ticker(progress.clone(), ui.stats_bar(), tick_done.clone());
let mut timed_out = false;
let mut stalled = false;
let hard_deadline: Option<tokio::time::Instant> = if opts.timeout_explicit {
if opts.timeout_secs > 0 {
Some(tokio::time::Instant::now() + Duration::from_secs(opts.timeout_secs))
} else {
None }
} else {
None
};
let stall_deadline_dur = Duration::from_secs(opts.stall_secs);
let byte_stream = resp.bytes_stream();
let stream = byte_stream.eventsource();
tokio::pin!(stream);
let mut state = LoopState::new(started);
while !state.done {
let maybe_event = if let Some(dl) = hard_deadline {
tokio::select! {
biased;
ev = stream.next() => ev,
_ = tokio::time::sleep_until(dl) => {
timed_out = true;
break;
}
}
} else {
tokio::select! {
biased;
ev = stream.next() => ev,
_ = tokio::time::sleep(Duration::from_secs(1)) => {
let current_indexed = progress.indexed_now.load(Ordering::Acquire);
if current_indexed > state.last_indexed_snapshot {
state.last_indexed_snapshot = current_indexed;
state.last_progress = std::time::Instant::now();
} else if state.last_progress.elapsed() >= stall_deadline_dur {
stalled = true;
break;
}
continue;
}
}
};
let event = match maybe_event {
Some(Ok(e)) => e,
Some(Err(e)) => {
ui.stats_bar()
.println(format!("{} stream read error: {e}", "\u{26a0}".yellow()));
break;
}
None => break,
};
let evt: serde_json::Value = match serde_json::from_str(event.data.trim()) {
Ok(v) => v,
Err(_) => continue,
};
handle_event(&mut state, &mut ui, &progress, &evt, index_id);
}
tick_done.store(true, Ordering::Release);
let _ = ticker.await;
let outcome = state.outcome;
if timed_out {
let still_progressing = progress.indexed_now.load(Ordering::Acquire)
> state.last_indexed_snapshot
|| state.last_progress.elapsed() < stall_deadline_dur;
let reason = if still_progressing {
format!(
"reached --timeout {}s while still progressing \u{2014} detaching",
opts.timeout_secs,
)
} else {
format!(
"timed out after {}s with no recent progress",
opts.timeout_secs,
)
};
ui.abandon(format!("{} {}", "\u{26a0}".yellow(), reason));
eprintln!(
"{} Daemon is still indexing in the background. \
Use `trusty-search status` or re-run `trusty-search index` to check progress. \
Pass `--timeout <seconds>` to wait longer (e.g. `--timeout 1200`).",
"\u{2139}".cyan()
);
return Ok(outcome);
}
if stalled {
let indexed = progress.indexed_now.load(Ordering::Acquire);
let total = outcome.indexed.max(indexed);
ui.abandon(format!(
"{} No indexing progress for {}s (Files {}/{}) \u{2014} detaching; \
daemon continues in background",
"\u{26a0}".yellow(),
opts.stall_secs,
format_with_commas(indexed),
format_with_commas(total),
));
eprintln!(
"{} Daemon appears stalled or very slow. Use `trusty-search status` to check. \
If indexing is still running, re-run `trusty-search index` to reattach or \
pass `--timeout <seconds>` to extend the hard cap.",
"\u{2139}".cyan()
);
return Ok(outcome);
}
if !outcome.completed {
ui.abandon(format!(
"{} Reindex stream ended without completion event",
"\u{26a0}".yellow()
));
anyhow::bail!("reindex did not complete");
}
let elapsed = fmt_elapsed(outcome.elapsed_ms);
let changed = outcome.indexed.saturating_sub(outcome.skipped);
let final_msg = if outcome.errors > 0 {
format!(
"{} '{}' — indexed {} files \u{2192} {} chunks [took {}, {} errors, {} unchanged]",
"\u{2713}".green(),
index_id,
format_with_commas(changed),
format_with_commas(outcome.total_chunks),
elapsed,
outcome.errors,
format_with_commas(outcome.skipped),
)
} else if changed == 0 && outcome.indexed > 0 {
format!(
"{} '{}' is up to date ({} chunks, {} files \u{2014} no changes detected) [took {}]",
"\u{2713}".green(),
index_id,
format_with_commas(outcome.total_chunks),
format_with_commas(outcome.indexed),
elapsed,
)
} else {
format!(
"{} '{}' — indexed {} changed file{} \u{2192} {} chunks [took {}, {} unchanged]",
"\u{2713}".green(),
index_id,
format_with_commas(changed),
if changed == 1 { "" } else { "s" },
format_with_commas(outcome.total_chunks),
elapsed,
format_with_commas(outcome.skipped),
)
};
ui.finish(final_msg);
if let Some(t) = outcome.timings {
print_timing_breakdown(
&t,
outcome.total_chunks,
outcome.elapsed_ms,
state.defer_embed,
state.lexical_only,
);
}
if state.defer_embed {
println!();
println!("{} Searchable now (lexical + graph).", "\u{2713}".green());
println!("\u{23f3} Semantic embedding running in background.");
println!(
" Track: trusty-search status {} --watch",
index_id.cyan()
);
}
if opts.verify_after {
verify_reindex_health(&client, &base, index_id, &outcome, opts.prior_chunk_count).await?;
}
Ok(outcome)
}