use super::options::ReindexOutcome;
use super::phase_map::phase_to_u64;
use super::progress_state::SharedProgress;
use crate::commands::format::format_with_commas;
use crate::commands::reindex_ui::{ReindexPhase, ReindexUi};
use colored::Colorize;
use std::sync::atomic::Ordering;
use std::time::Instant;
pub(super) struct LoopState {
pub done: bool,
pub outcome: ReindexOutcome,
pub received_walk_complete: bool,
pub lexical_only: bool,
pub entered_embedding: bool,
pub defer_embed: bool,
pub chunk_started_ms: u64,
pub embed_started_ms: u64,
pub last_indexed_snapshot: u64,
pub last_progress: Instant,
}
impl LoopState {
pub(super) fn new(started: Instant) -> Self {
Self {
done: false,
outcome: ReindexOutcome::default(),
received_walk_complete: false,
lexical_only: false,
entered_embedding: false,
defer_embed: false,
chunk_started_ms: 0,
embed_started_ms: 0,
last_indexed_snapshot: 0,
last_progress: started,
}
}
pub(super) fn note_progress(&mut self, indexed: u64) {
if indexed > self.last_indexed_snapshot {
self.last_indexed_snapshot = indexed;
self.last_progress = Instant::now();
}
}
}
pub(super) fn handle_event(
state: &mut LoopState,
ui: &mut ReindexUi,
progress: &SharedProgress,
evt: &serde_json::Value,
index_id: &str,
) {
let started = progress.started;
match evt.get("event").and_then(|v| v.as_str()) {
Some("walk_complete") => {
state.received_walk_complete = true;
let total = evt.get("total_files").and_then(|v| v.as_u64()).unwrap_or(0);
progress.total_files_now.store(total, Ordering::Release);
ui.set_phase(ReindexPhase::Walking, index_id);
progress
.phase_disc
.store(phase_to_u64(ReindexPhase::Walking), Ordering::Release);
ui.set_total(total);
ui.set_position(total);
ui.mark_stage_done(0, 0);
if total > 0 && !state.lexical_only {
ui.set_embed_total(total);
}
}
Some("start") => {
let total = evt.get("total_files").and_then(|v| v.as_u64()).unwrap_or(0);
state.lexical_only = evt
.get("lexical_only")
.and_then(|v| v.as_bool())
.unwrap_or(false);
state.defer_embed = evt
.get("defer_embed")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if total > 0 {
progress.total_files_now.store(total, Ordering::Release);
}
if state.received_walk_complete {
state.chunk_started_ms = started.elapsed().as_millis() as u64;
ui.set_phase(ReindexPhase::Chunking, index_id);
progress
.phase_disc
.store(phase_to_u64(ReindexPhase::Chunking), Ordering::Release);
ui.set_total(total);
if total > 0 && !state.lexical_only {
ui.set_embed_total(total);
ui.activate_embed_bar();
}
} else {
ui.set_total(total);
if state.lexical_only {
state.chunk_started_ms = started.elapsed().as_millis() as u64;
ui.set_phase(ReindexPhase::Chunking, index_id);
progress
.phase_disc
.store(phase_to_u64(ReindexPhase::Chunking), Ordering::Release);
} else {
state.embed_started_ms = started.elapsed().as_millis() as u64;
ui.set_phase(ReindexPhase::Embedding, index_id);
progress
.phase_disc
.store(phase_to_u64(ReindexPhase::Embedding), Ordering::Release);
state.entered_embedding = true;
if total > 0 {
ui.set_embed_total(total);
}
}
}
}
Some("embedder_init") => {
ui.set_phase(ReindexPhase::InitializingEmbedder, index_id);
progress.phase_disc.store(
phase_to_u64(ReindexPhase::InitializingEmbedder),
Ordering::Release,
);
}
Some("embedder_ready") if !state.entered_embedding => {
state.embed_started_ms = started.elapsed().as_millis() as u64;
ui.set_phase(ReindexPhase::Embedding, index_id);
progress
.phase_disc
.store(phase_to_u64(ReindexPhase::Embedding), Ordering::Release);
state.entered_embedding = true;
}
Some("embedder_ready") => {
}
Some("chunk_progress") => {
let wave_chunks = evt.get("chunks_done").and_then(|v| v.as_u64()).unwrap_or(0);
let wave_cps = evt
.get("chunks_per_sec")
.and_then(|v| v.as_u64())
.unwrap_or(0);
if wave_cps > 0 {
progress.cps_now.store(wave_cps, Ordering::Release);
}
if wave_chunks > 0 {
progress
.chunks_embed_preview
.fetch_add(wave_chunks, Ordering::AcqRel);
}
let chunk_indexed = evt.get("indexed").and_then(|v| v.as_u64()).unwrap_or(0);
if chunk_indexed > 0 {
ui.set_position(chunk_indexed);
}
}
Some("batch") => super::event_handlers::handle_batch(state, ui, progress, evt, index_id),
Some("skip") => {
let indexed = evt.get("indexed").and_then(|v| v.as_u64()).unwrap_or(0);
progress.indexed_now.store(indexed, Ordering::Release);
let skipped = progress.skipped_now.fetch_add(1, Ordering::AcqRel) + 1;
ui.set_position(indexed);
ui.update_stats(
indexed,
progress.chunks_now.load(Ordering::Acquire),
skipped,
progress.cps_now.load(Ordering::Acquire),
started.elapsed().as_secs(),
);
state.note_progress(indexed);
}
Some("kg_start") => {
let chunk_ms = started.elapsed().as_millis() as u64 - state.chunk_started_ms;
ui.mark_stage_done(1, chunk_ms);
if state.entered_embedding {
let embed_ms = started.elapsed().as_millis() as u64 - state.embed_started_ms;
ui.mark_stage_done(2, embed_ms);
}
ui.clear_stats();
ui.set_phase(ReindexPhase::KnowledgeGraph, index_id);
progress.phase_disc.store(
phase_to_u64(ReindexPhase::KnowledgeGraph),
Ordering::Release,
);
ui.set_total(1);
ui.set_position(0);
}
Some("kg_complete") => {
let kg_ms = evt.get("kg_ms").and_then(|v| v.as_u64()).unwrap_or(0);
let symbol_count = evt
.get("symbol_count")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let edge_count = evt.get("edge_count").and_then(|v| v.as_u64()).unwrap_or(0);
ui.set_position(1);
ui.mark_stage_done(3, kg_ms);
ui.stats_bar().set_message(format!(
"KG done \u{2014} {sym} symbols, {edges} edges",
sym = format_with_commas(symbol_count),
edges = format_with_commas(edge_count),
));
}
Some("complete") => super::event_handlers::handle_complete(state, ui, progress, evt),
Some("error") => {
let msg = evt
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let file = evt.get("file").and_then(|v| v.as_str()).unwrap_or("");
ui.stats_bar()
.println(format!("{} {}: {}", "\u{26a0}".yellow(), file, msg));
}
_ => {}
}
}