Skip to main content

coding_agent_search/indexer/
semantic_progress.rs

//! Progress JSONL sink for quality semantic backfill.
//!
//! When `CASS_SEMANTIC_PROGRESS_JSONL=/abs/path/to/file.jsonl` is set,
//! the semantic backfill code path appends one JSON object per transition
//! event to that file. Each event carries a timestamp, a phase + sub-phase,
//! a row/batch counter where applicable, the wall-time delta since the
//! sink was started, and a cheap RSS estimate.
//!
//! Goal — give operators enough proof, during long-running quality semantic
//! backfill runs, to tell whether time is going to selection, packet
//! replay, embedding, staging, checkpoint, or publish; and to distinguish
//! storage-side stalls from model-inference stalls. See cass#257.
//!
//! Env-var family: matches the existing `CASS_SEMANTIC_*` namespace (see
//! `src/search/policy.rs` and `src/indexer/semantic.rs`). The sink itself
//! is silent when the env var is unset, so it has zero cost for normal
//! operation. Writes are best-effort: a failed write is logged (at warn
//! for the first failure after a successful write, at debug otherwise)
//! and never propagated upward — we never want telemetry to crash a
//! backfill that would otherwise succeed.
20
21use std::fs::{File, OpenOptions};
22use std::io::Write;
23use std::path::{Path, PathBuf};
24use std::sync::Mutex;
25use std::time::{Instant, SystemTime, UNIX_EPOCH};
26
27use serde::Serialize;
28
/// Env var that activates the sink and names the output file.
///
/// Looked up by `resolve_path`; a missing or whitespace-only value leaves
/// the sink disabled.
pub const ENV_PROGRESS_JSONL: &str = "CASS_SEMANTIC_PROGRESS_JSONL";
31
/// Schema version for the JSONL event stream. Bump on any
/// breaking change to event names or fields.
///
/// Written verbatim into the `schema` field of every emitted record.
pub const PROGRESS_JSONL_SCHEMA: &str = "cass.semantic.progress.v1";
35
/// The 16 named transition events. Strings deliberately mirror the
/// `phase` + `sub_phase` columns in each emitted record so a `jq` user
/// can filter on event name OR phase as they prefer.
///
/// The derived `Serialize` impl uses snake_case variant names, the same
/// spelling that `as_str()` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SemanticProgressEvent {
    /// Backfill is about to materialize the message-selection query.
    SelectionStart,
    /// Selection finished; downstream knows how many candidate rows
    /// will be considered for this batch.
    SelectionDone,
    /// Canonical packet replay is about to begin (envelope fetch +
    /// per-conversation message materialization + packet build).
    PacketReplayStart,
    /// Periodic per-conversation tick during packet replay so a
    /// stuck conversation does not look like a stuck model.
    PacketReplayProgress,
    /// Packet replay finished — `EmbeddingInput`s are ready.
    PacketReplayDone,
    /// About to call `embedder.embed_batch_sync` for a single batch.
    EmbedBatchStart,
    /// `embedder.embed_batch_sync` returned for this batch.
    EmbedBatchDone,
    /// About to write the embedded vectors into the staging index.
    StagingWriteStart,
    /// Staging write returned.
    StagingWriteDone,
    /// About to fsync the updated manifest with this batch's checkpoint.
    CheckpointSaveStart,
    /// Manifest fsync returned.
    CheckpointSaveDone,
    /// About to atomically rename the staged index into the published
    /// index path (only fires on the batch that completes the tier).
    PublishStart,
    /// Publish rename + fsync done; tier is queryable.
    PublishDone,
    /// Backfill aborted with an error.
    Error,
    /// Backfill cancelled cooperatively (signal, idle-yield, etc).
    Cancelled,
    /// All work finished cleanly (terminal — emitted exactly once per
    /// run, after publish_done or in the no-op path).
    Complete,
}
80
81impl SemanticProgressEvent {
82    /// Stable snake_case string for the event field. Used both as the
83    /// JSONL `event` value and (with `phase()`) as a discriminator in
84    /// downstream consumers.
85    pub fn as_str(self) -> &'static str {
86        match self {
87            Self::SelectionStart => "selection_start",
88            Self::SelectionDone => "selection_done",
89            Self::PacketReplayStart => "packet_replay_start",
90            Self::PacketReplayProgress => "packet_replay_progress",
91            Self::PacketReplayDone => "packet_replay_done",
92            Self::EmbedBatchStart => "embed_batch_start",
93            Self::EmbedBatchDone => "embed_batch_done",
94            Self::StagingWriteStart => "staging_write_start",
95            Self::StagingWriteDone => "staging_write_done",
96            Self::CheckpointSaveStart => "checkpoint_save_start",
97            Self::CheckpointSaveDone => "checkpoint_save_done",
98            Self::PublishStart => "publish_start",
99            Self::PublishDone => "publish_done",
100            Self::Error => "error",
101            Self::Cancelled => "cancelled",
102            Self::Complete => "complete",
103        }
104    }
105
106    /// Coarse phase classification, useful to a downstream `jq` consumer
107    /// that wants to bucket time across selection / replay / embed /
108    /// staging / checkpoint / publish without enumerating every event.
109    pub fn phase(self) -> &'static str {
110        match self {
111            Self::SelectionStart | Self::SelectionDone => "selection",
112            Self::PacketReplayStart | Self::PacketReplayProgress | Self::PacketReplayDone => {
113                "packet_replay"
114            }
115            Self::EmbedBatchStart | Self::EmbedBatchDone => "embed",
116            Self::StagingWriteStart | Self::StagingWriteDone => "staging",
117            Self::CheckpointSaveStart | Self::CheckpointSaveDone => "checkpoint",
118            Self::PublishStart | Self::PublishDone => "publish",
119            Self::Error => "error",
120            Self::Cancelled => "cancelled",
121            Self::Complete => "complete",
122        }
123    }
124
125    /// `start` / `done` / `progress` / single (sub_phase=`event`).
126    pub fn sub_phase(self) -> &'static str {
127        match self {
128            Self::SelectionStart
129            | Self::PacketReplayStart
130            | Self::EmbedBatchStart
131            | Self::StagingWriteStart
132            | Self::CheckpointSaveStart
133            | Self::PublishStart => "start",
134            Self::SelectionDone
135            | Self::PacketReplayDone
136            | Self::EmbedBatchDone
137            | Self::StagingWriteDone
138            | Self::CheckpointSaveDone
139            | Self::PublishDone => "done",
140            Self::PacketReplayProgress => "progress",
141            Self::Error => "error",
142            Self::Cancelled => "cancelled",
143            Self::Complete => "complete",
144        }
145    }
146}
147
/// Optional counters carried by an event. Every field is `None` when
/// not applicable; each one is annotated with `skip_serializing_if`, so
/// unset fields are omitted from the serialized row rather than written
/// as `null`, which keeps the row readable.
///
/// `Default` yields a value with every field unset.
#[derive(Debug, Clone, Default, Serialize)]
pub struct SemanticProgressFields {
    /// Batch index within this backfill run, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch_index: Option<u64>,
    /// Rows in the current batch, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch_rows: Option<u64>,
    /// Cumulative rows processed so far, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rows_processed: Option<u64>,
    /// Total rows expected (best-effort).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rows_total: Option<u64>,
    /// Conversation cursor (per-tier semantic) at this event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_conversation_id: Option<i64>,
    /// Message PK cursor at this event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_message_id: Option<i64>,
    /// Conversations in the active batch.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conversations_in_batch: Option<u64>,
    /// Free-form context note. Kept short — long context belongs in a
    /// debug log line, not in a high-frequency JSONL event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub note: Option<String>,
    /// Bytes touched (e.g. content bytes selected, bytes embedded,
    /// staged write size). Lets operators distinguish a stalled query
    /// from a stalled model.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bytes: Option<u64>,
    /// Free-form error string when the event is `error`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
187
/// Serialize-only view of one JSONL row, assembled in
/// `SemanticProgressSink::emit`. It borrows the sink's tier/embedder
/// strings and the caller's counters instead of cloning them.
#[derive(Debug, Clone, Serialize)]
struct EventRecord<'a> {
    /// Schema tag (`PROGRESS_JSONL_SCHEMA`).
    schema: &'static str,
    /// Event name, as returned by `SemanticProgressEvent::as_str`.
    event: &'static str,
    /// Coarse phase, as returned by `SemanticProgressEvent::phase`.
    phase: &'static str,
    /// Position within the phase, as returned by
    /// `SemanticProgressEvent::sub_phase`.
    sub_phase: &'static str,
    /// Unix milliseconds, wall clock.
    ts_ms: i64,
    /// Milliseconds since this sink was opened.
    elapsed_ms: u64,
    /// Tier label (`fast` / `quality` / `unknown`).
    tier: &'a str,
    /// Embedder id (e.g. `minilm-384`, `hash`).
    embedder_id: &'a str,
    /// Cheap RSS estimate in MiB (None if /proc parse fails or the
    /// platform doesn't expose it). Omitted from the row when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    rss_mib: Option<u64>,
    /// Optional counters; flattened so they appear as top-level keys of
    /// the row rather than under a nested `fields` object.
    #[serde(flatten)]
    fields: &'a SemanticProgressFields,
}
209
/// Id of the current process, as reported by `std::process::id()`.
///
/// Exposed through `SemanticProgressSink::pid` so callers can
/// cross-correlate JSONL files concatenated from multiple runs. It is not
/// a field of `EventRecord`, so emitted rows do not carry it.
fn current_pid() -> u32 {
    std::process::id()
}
215
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reads earlier than the epoch
/// or the millisecond count does not fit in an `i64`.
fn now_unix_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(0),
        Err(_) => 0,
    }
}
224
/// Resident-set-size estimate in MiB, taken from the `VmRSS:` line of
/// `/proc/self/status` (Linux).
///
/// Returns `None` when the file cannot be read as UTF-8, when no
/// `VmRSS:` line is present, or when the value on that line is not an
/// unsigned integer. The kernel reports the value in kB; it is
/// integer-divided by 1024 here.
fn read_rss_mib() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    // Expected format: `VmRSS:    12345 kB`. Only the first matching line
    // is considered.
    let after_label = status
        .lines()
        .find_map(|entry| entry.strip_prefix("VmRSS:"))?;
    let kib = after_label
        .split_whitespace()
        .next()?
        .parse::<u64>()
        .ok()?;
    Some(kib / 1024)
}
242
243/// Resolve the sink's destination path from the env var.
244fn resolve_path() -> Option<PathBuf> {
245    let raw = dotenvy::var(ENV_PROGRESS_JSONL).ok()?;
246    let trimmed = raw.trim();
247    if trimmed.is_empty() {
248        return None;
249    }
250    Some(PathBuf::from(trimmed))
251}
252
/// Append-only JSONL progress sink.
///
/// The file is opened (append, create) when the sink is constructed via
/// `open`, and the handle is cached behind a Mutex. We deliberately
/// accept the cost of a Mutex over every event because the JSONL stream
/// is several events per batch, not per row — even a 50ms batch
/// wall-time dwarfs the lock cost.
pub struct SemanticProgressSink {
    /// `None` for an inert sink (no path configured, open failed, or
    /// built via `disabled`); `emit` returns immediately in that case.
    inner: Option<Mutex<SinkInner>>,
    /// Tier label copied into every record.
    tier: String,
    /// Embedder id copied into every record.
    embedder_id: String,
    /// Construction time; the base for each record's `elapsed_ms`.
    started: Instant,
}
263
/// Mutable state of an active sink, guarded by the Mutex in
/// `SemanticProgressSink`.
struct SinkInner {
    /// Output file, opened in append mode.
    file: File,
    /// Cached so write-failure log lines can name the file.
    path: PathBuf,
    /// True when the most recent write succeeded; starts out false and is
    /// reset to false on a failed write. `emit` warns only on the
    /// healthy→failed transition and logs further failures at debug,
    /// which suppresses repeat "failed to write" warnings.
    healthy: bool,
}
272
273impl SemanticProgressSink {
274    /// Open a sink for the given tier+embedder. Returns a no-op sink
275    /// when the env var is unset, so callers can always emit events
276    /// unconditionally without branching.
277    pub fn open(tier: &str, embedder_id: &str) -> Self {
278        let path = resolve_path();
279        let inner = match path {
280            Some(p) => match Self::open_file(&p) {
281                Ok(file) => Some(Mutex::new(SinkInner {
282                    file,
283                    path: p,
284                    healthy: false,
285                })),
286                Err(err) => {
287                    tracing::warn!(
288                        path = %p.display(),
289                        error = %err,
290                        "CASS_SEMANTIC_PROGRESS_JSONL: failed to open sink — continuing without progress JSONL",
291                    );
292                    None
293                }
294            },
295            None => None,
296        };
297        Self {
298            inner,
299            tier: tier.to_string(),
300            embedder_id: embedder_id.to_string(),
301            started: Instant::now(),
302        }
303    }
304
305    /// Sink that never writes — kept as an explicit factory so callers
306    /// can default to a sink without consulting the env var (e.g. tests
307    /// that don't care about telemetry).
308    pub fn disabled() -> Self {
309        Self {
310            inner: None,
311            tier: "unknown".to_string(),
312            embedder_id: "unknown".to_string(),
313            started: Instant::now(),
314        }
315    }
316
317    /// True if the sink is actively writing (env var set + file
318    /// opened). Callers can branch on this to skip building expensive
319    /// `SemanticProgressFields` when no one will read them.
320    pub fn is_active(&self) -> bool {
321        self.inner.is_some()
322    }
323
324    fn open_file(path: &Path) -> std::io::Result<File> {
325        if let Some(parent) = path.parent() {
326            std::fs::create_dir_all(parent)?;
327        }
328        OpenOptions::new().create(true).append(true).open(path)
329    }
330
331    /// Emit one event. Best-effort: a write failure logs at debug and
332    /// returns Ok — telemetry never bubbles errors into the backfill.
333    pub fn emit(&self, event: SemanticProgressEvent, fields: SemanticProgressFields) {
334        let Some(mutex) = self.inner.as_ref() else {
335            return;
336        };
337        let elapsed_ms = u64::try_from(self.started.elapsed().as_millis()).unwrap_or(u64::MAX);
338        let rss_mib = read_rss_mib();
339        let record = EventRecord {
340            schema: PROGRESS_JSONL_SCHEMA,
341            event: event.as_str(),
342            phase: event.phase(),
343            sub_phase: event.sub_phase(),
344            ts_ms: now_unix_ms(),
345            elapsed_ms,
346            tier: self.tier.as_str(),
347            embedder_id: self.embedder_id.as_str(),
348            rss_mib,
349            fields: &fields,
350        };
351        let mut line = match serde_json::to_string(&record) {
352            Ok(s) => s,
353            Err(err) => {
354                tracing::debug!(
355                    ?err,
356                    event = event.as_str(),
357                    "skip JSONL emit: serialize failed"
358                );
359                return;
360            }
361        };
362        line.push('\n');
363        // Best-effort write under lock. We intentionally do not propagate
364        // errors — a backfill that succeeded but couldn't write telemetry
365        // is still a successful backfill.
366        let mut guard = match mutex.lock() {
367            Ok(g) => g,
368            Err(poisoned) => poisoned.into_inner(),
369        };
370        if let Err(err) = guard.file.write_all(line.as_bytes()) {
371            if guard.healthy {
372                // Surface once on transition healthy→sick to help
373                // operators notice (e.g. disk full mid-run).
374                tracing::warn!(
375                    path = %guard.path.display(),
376                    error = %err,
377                    "CASS_SEMANTIC_PROGRESS_JSONL: write failed after previous successes; continuing without progress JSONL",
378                );
379                guard.healthy = false;
380            } else {
381                tracing::debug!(
382                    path = %guard.path.display(),
383                    error = %err,
384                    "CASS_SEMANTIC_PROGRESS_JSONL: write failed",
385                );
386            }
387        } else {
388            guard.healthy = true;
389            // We do NOT fsync per-event — sync at end is the operator's
390            // job (e.g. shutdown drain). Per-event fsync would dominate
391            // wall time on a long run. The file is opened append, so
392            // partial writes are tolerable to the reader.
393        }
394    }
395
396    /// Convenience: emit an event with no extra fields.
397    pub fn emit_bare(&self, event: SemanticProgressEvent) {
398        self.emit(event, SemanticProgressFields::default());
399    }
400
401    /// Process-pid for cross-correlation. Stable for the life of the sink.
402    pub fn pid(&self) -> u32 {
403        current_pid()
404    }
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::OsStr;
    use std::io::BufRead;
    use std::sync::{Mutex, MutexGuard};
    use tempfile::TempDir;

    // Env vars are process-global; serialize the tests that touch them so
    // concurrent test threads don't trample each other's env state.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Holds `ENV_LOCK` for the duration of a test and clears the env var
    /// when dropped, so a failing assertion neither leaks the variable
    /// into later tests nor (via lock poisoning) fails them in cascade.
    struct EnvScope {
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvScope {
        /// Take the lock, recovering it if an earlier test panicked while
        /// holding it — the guarded state is just `()`.
        fn acquire() -> MutexGuard<'static, ()> {
            ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
        }

        /// Scope in which the env var is guaranteed to be unset.
        fn unset() -> Self {
            let lock = Self::acquire();
            // SAFETY: env mutation is serialized via ENV_LOCK, held above.
            unsafe {
                std::env::remove_var(ENV_PROGRESS_JSONL);
            }
            Self { _lock: lock }
        }

        /// Scope in which the env var is set to `value`.
        fn set(value: impl AsRef<OsStr>) -> Self {
            let lock = Self::acquire();
            // SAFETY: env mutation is serialized via ENV_LOCK, held above.
            unsafe {
                std::env::set_var(ENV_PROGRESS_JSONL, value);
            }
            Self { _lock: lock }
        }
    }

    impl Drop for EnvScope {
        fn drop(&mut self) {
            // Runs before `_lock` is released, so the cleanup is still
            // serialized with the other env-dependent tests.
            // SAFETY: ENV_LOCK is held for the lifetime of `self`.
            unsafe {
                std::env::remove_var(ENV_PROGRESS_JSONL);
            }
        }
    }

    /// Read every line of the JSONL file; a read error fails the test
    /// instead of silently truncating the result.
    fn read_lines(path: &Path) -> Vec<String> {
        let f = File::open(path).expect("open jsonl");
        std::io::BufReader::new(f)
            .lines()
            .map(|line| line.expect("read jsonl line"))
            .collect()
    }

    #[test]
    fn disabled_sink_is_noop() {
        let sink = SemanticProgressSink::disabled();
        assert!(!sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
        // No panic = pass.
    }

    #[test]
    fn unset_env_is_noop() {
        let _env = EnvScope::unset();
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(!sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
    }

    #[test]
    fn writes_one_line_per_event() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("progress.jsonl");
        // Declared after `dir` so the env var is cleared before the temp
        // directory is removed.
        let _env = EnvScope::set(&path);
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
        sink.emit(
            SemanticProgressEvent::EmbedBatchDone,
            SemanticProgressFields {
                batch_index: Some(3),
                batch_rows: Some(128),
                rows_processed: Some(384),
                ..Default::default()
            },
        );
        sink.emit_bare(SemanticProgressEvent::Complete);
        drop(sink);

        let lines = read_lines(&path);
        assert_eq!(lines.len(), 3, "expected 3 events; got {:?}", lines);
        for line in &lines {
            assert!(
                serde_json::from_str::<serde_json::Value>(line).is_ok(),
                "not valid JSON: {}",
                line
            );
            assert!(
                line.contains("\"schema\":\"cass.semantic.progress.v1\""),
                "missing schema: {}",
                line
            );
        }
        assert!(
            lines[0].contains("\"event\":\"selection_start\""),
            "line 0: {}",
            lines[0]
        );
        assert!(
            lines[0].contains("\"phase\":\"selection\""),
            "line 0: {}",
            lines[0]
        );
        // Unset optional counters must be omitted, not written as null.
        assert!(!lines[0].contains("batch_index"), "line 0: {}", lines[0]);
        assert!(
            lines[1].contains("\"event\":\"embed_batch_done\""),
            "line 1: {}",
            lines[1]
        );
        assert!(
            lines[1].contains("\"batch_index\":3"),
            "line 1: {}",
            lines[1]
        );
        assert!(
            lines[2].contains("\"event\":\"complete\""),
            "line 2: {}",
            lines[2]
        );
    }

    #[test]
    fn each_event_has_phase_and_sub_phase() {
        use SemanticProgressEvent::*;
        let all = [
            SelectionStart,
            SelectionDone,
            PacketReplayStart,
            PacketReplayProgress,
            PacketReplayDone,
            EmbedBatchStart,
            EmbedBatchDone,
            StagingWriteStart,
            StagingWriteDone,
            CheckpointSaveStart,
            CheckpointSaveDone,
            PublishStart,
            PublishDone,
            Error,
            Cancelled,
            Complete,
        ];
        assert_eq!(all.len(), 16);
        for event in all {
            assert!(!event.as_str().is_empty(), "{:?}", event);
            assert!(!event.phase().is_empty(), "{:?}", event);
            assert!(!event.sub_phase().is_empty(), "{:?}", event);
            // The hand-written name must stay in step with the derived
            // serde representation of the variant.
            let serialized = serde_json::to_string(&event).expect("serialize event");
            assert_eq!(serialized, format!("\"{}\"", event.as_str()), "{:?}", event);
        }
    }

    #[test]
    fn invalid_env_var_is_safe_noop() {
        // Whitespace-only env var should be treated as unset rather
        // than as an attempt to write to "" (which would fail). The
        // sink should silently degrade to disabled.
        let _env = EnvScope::set("   ");
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(!sink.is_active());
    }
}
545}