1use std::fs::{File, OpenOptions};
22use std::io::Write;
23use std::path::{Path, PathBuf};
24use std::sync::Mutex;
25use std::time::{Instant, SystemTime, UNIX_EPOCH};
26
27use serde::Serialize;
28
/// Name of the environment variable that holds the path of the progress JSONL file.
///
/// `resolve_path` reads it; a missing or whitespace-only value yields no path, in which
/// case `SemanticProgressSink::open` returns an inactive sink.
pub const ENV_PROGRESS_JSONL: &str = "CASS_SEMANTIC_PROGRESS_JSONL";

/// Schema identifier written into the `schema` field of every emitted record.
pub const PROGRESS_JSONL_SCHEMA: &str = "cass.semantic.progress.v1";
35
/// Kinds of progress events that can be written to the progress JSONL file.
///
/// Each variant maps to an event name, a phase and a sub-phase through
/// [`SemanticProgressEvent::as_str`], [`SemanticProgressEvent::phase`] and
/// [`SemanticProgressEvent::sub_phase`]. When serialized directly with serde the variant
/// names are rendered in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SemanticProgressEvent {
    /// Start of the `selection` phase.
    SelectionStart,
    /// End of the `selection` phase.
    SelectionDone,
    /// Start of the `packet_replay` phase.
    PacketReplayStart,
    /// Intermediate progress report inside the `packet_replay` phase.
    PacketReplayProgress,
    /// End of the `packet_replay` phase.
    PacketReplayDone,
    /// Start of one embedding batch (`embed` phase).
    EmbedBatchStart,
    /// End of one embedding batch (`embed` phase).
    EmbedBatchDone,
    /// Start of a staging write (`staging` phase).
    StagingWriteStart,
    /// End of a staging write (`staging` phase).
    StagingWriteDone,
    /// Start of a checkpoint save (`checkpoint` phase).
    CheckpointSaveStart,
    /// End of a checkpoint save (`checkpoint` phase).
    CheckpointSaveDone,
    /// Start of the `publish` phase.
    PublishStart,
    /// End of the `publish` phase.
    PublishDone,
    /// An error was reported; phase and sub-phase are both `error`.
    Error,
    /// The run was cancelled; phase and sub-phase are both `cancelled`.
    Cancelled,
    /// The run completed; phase and sub-phase are both `complete`.
    Complete,
}
80
81impl SemanticProgressEvent {
82 pub fn as_str(self) -> &'static str {
86 match self {
87 Self::SelectionStart => "selection_start",
88 Self::SelectionDone => "selection_done",
89 Self::PacketReplayStart => "packet_replay_start",
90 Self::PacketReplayProgress => "packet_replay_progress",
91 Self::PacketReplayDone => "packet_replay_done",
92 Self::EmbedBatchStart => "embed_batch_start",
93 Self::EmbedBatchDone => "embed_batch_done",
94 Self::StagingWriteStart => "staging_write_start",
95 Self::StagingWriteDone => "staging_write_done",
96 Self::CheckpointSaveStart => "checkpoint_save_start",
97 Self::CheckpointSaveDone => "checkpoint_save_done",
98 Self::PublishStart => "publish_start",
99 Self::PublishDone => "publish_done",
100 Self::Error => "error",
101 Self::Cancelled => "cancelled",
102 Self::Complete => "complete",
103 }
104 }
105
106 pub fn phase(self) -> &'static str {
110 match self {
111 Self::SelectionStart | Self::SelectionDone => "selection",
112 Self::PacketReplayStart | Self::PacketReplayProgress | Self::PacketReplayDone => {
113 "packet_replay"
114 }
115 Self::EmbedBatchStart | Self::EmbedBatchDone => "embed",
116 Self::StagingWriteStart | Self::StagingWriteDone => "staging",
117 Self::CheckpointSaveStart | Self::CheckpointSaveDone => "checkpoint",
118 Self::PublishStart | Self::PublishDone => "publish",
119 Self::Error => "error",
120 Self::Cancelled => "cancelled",
121 Self::Complete => "complete",
122 }
123 }
124
125 pub fn sub_phase(self) -> &'static str {
127 match self {
128 Self::SelectionStart
129 | Self::PacketReplayStart
130 | Self::EmbedBatchStart
131 | Self::StagingWriteStart
132 | Self::CheckpointSaveStart
133 | Self::PublishStart => "start",
134 Self::SelectionDone
135 | Self::PacketReplayDone
136 | Self::EmbedBatchDone
137 | Self::StagingWriteDone
138 | Self::CheckpointSaveDone
139 | Self::PublishDone => "done",
140 Self::PacketReplayProgress => "progress",
141 Self::Error => "error",
142 Self::Cancelled => "cancelled",
143 Self::Complete => "complete",
144 }
145 }
146}
147
/// Optional, event-specific payload attached to a progress record.
///
/// Every field is optional and is left out of the serialized JSON when it is `None`;
/// `Default` yields a payload with no fields set. `EventRecord` flattens these fields
/// into the top level of each record.
///
/// NOTE(review): the per-field descriptions below are inferred from the field names —
/// confirm against the call sites that populate them.
#[derive(Debug, Clone, Default, Serialize)]
pub struct SemanticProgressFields {
    /// Index of the batch the event refers to.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch_index: Option<u64>,
    /// Number of rows in that batch.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub batch_rows: Option<u64>,
    /// Rows processed so far.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rows_processed: Option<u64>,
    /// Total rows expected.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rows_total: Option<u64>,
    /// Id of the last conversation handled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_conversation_id: Option<i64>,
    /// Id of the last message handled.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_message_id: Option<i64>,
    /// Number of conversations covered by the batch.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub conversations_in_batch: Option<u64>,
    /// Free-form annotation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub note: Option<String>,
    /// Byte count associated with the event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bytes: Option<u64>,
    /// Error description.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
187
/// Serialized shape of one line of the progress JSONL file.
///
/// Borrows the sink's identity strings and the caller's payload so that building a
/// record does not copy them.
#[derive(Debug, Clone, Serialize)]
struct EventRecord<'a> {
    /// Schema identifier; `emit` fills it with `PROGRESS_JSONL_SCHEMA`.
    schema: &'static str,
    /// Event name from `SemanticProgressEvent::as_str`.
    event: &'static str,
    /// Phase label from `SemanticProgressEvent::phase`.
    phase: &'static str,
    /// Sub-phase label from `SemanticProgressEvent::sub_phase`.
    sub_phase: &'static str,
    /// Wall-clock timestamp in milliseconds since the Unix epoch (`now_unix_ms`).
    ts_ms: i64,
    /// Milliseconds elapsed since the sink was constructed.
    elapsed_ms: u64,
    /// Tier string the sink was opened with.
    tier: &'a str,
    /// Embedder id string the sink was opened with.
    embedder_id: &'a str,
    /// Resident set size in MiB from `read_rss_mib`; omitted when it cannot be read.
    #[serde(skip_serializing_if = "Option::is_none")]
    rss_mib: Option<u64>,
    /// Event-specific payload, flattened into the top level of the record.
    #[serde(flatten)]
    fields: &'a SemanticProgressFields,
}
209
/// Returns the OS-assigned identifier of the current process.
fn current_pid() -> u32 {
    use std::process;

    process::id()
}
215
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock is set before the epoch or when the
/// millisecond count does not fit in an `i64`.
fn now_unix_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(0),
        Err(_) => 0,
    }
}
224
/// Reads the resident set size of the current process from `/proc/self/status`.
///
/// Returns the `VmRSS:` value converted from kB to whole MiB (integer division by
/// 1024). Returns `None` when the file cannot be read, is not valid UTF-8, has no
/// `VmRSS:` line, or when the first such line does not start with an unsigned integer.
fn read_rss_mib() -> Option<u64> {
    let raw = std::fs::read("/proc/self/status").ok()?;
    let status = std::str::from_utf8(&raw).ok()?;
    // Only the first `VmRSS:` line is considered; a malformed value yields `None`.
    let rss_field = status
        .lines()
        .find_map(|entry| entry.strip_prefix("VmRSS:"))?;
    let kib = rss_field.split_whitespace().next()?.parse::<u64>().ok()?;
    Some(kib / 1024)
}
242
243fn resolve_path() -> Option<PathBuf> {
245 let raw = dotenvy::var(ENV_PROGRESS_JSONL).ok()?;
246 let trimmed = raw.trim();
247 if trimmed.is_empty() {
248 return None;
249 }
250 Some(PathBuf::from(trimmed))
251}
252
/// Writer that appends progress events as JSON lines to an optional file.
///
/// A sink without a backing file is inactive: `emit` returns immediately and
/// `is_active` reports `false`.
pub struct SemanticProgressSink {
    /// Backing file state behind a mutex; `None` when the sink is inactive.
    inner: Option<Mutex<SinkInner>>,
    /// Tier string copied into every emitted record.
    tier: String,
    /// Embedder id string copied into every emitted record.
    embedder_id: String,
    /// Instant captured at construction; `elapsed_ms` in each record is measured from it.
    started: Instant,
}
263
/// Mutable state of an active sink, guarded by the mutex in `SemanticProgressSink`.
struct SinkInner {
    /// File opened in create + append mode that receives the JSON lines.
    file: File,
    /// Path of `file`, kept only for log messages.
    path: PathBuf,
    /// Whether the most recent write succeeded. Starts `false`, becomes `true` after a
    /// successful write and `false` again after a failed one. `emit` logs a failure at
    /// warn level only when this was `true`, and at debug level otherwise.
    healthy: bool,
}
272
273impl SemanticProgressSink {
274 pub fn open(tier: &str, embedder_id: &str) -> Self {
278 let path = resolve_path();
279 let inner = match path {
280 Some(p) => match Self::open_file(&p) {
281 Ok(file) => Some(Mutex::new(SinkInner {
282 file,
283 path: p,
284 healthy: false,
285 })),
286 Err(err) => {
287 tracing::warn!(
288 path = %p.display(),
289 error = %err,
290 "CASS_SEMANTIC_PROGRESS_JSONL: failed to open sink — continuing without progress JSONL",
291 );
292 None
293 }
294 },
295 None => None,
296 };
297 Self {
298 inner,
299 tier: tier.to_string(),
300 embedder_id: embedder_id.to_string(),
301 started: Instant::now(),
302 }
303 }
304
305 pub fn disabled() -> Self {
309 Self {
310 inner: None,
311 tier: "unknown".to_string(),
312 embedder_id: "unknown".to_string(),
313 started: Instant::now(),
314 }
315 }
316
317 pub fn is_active(&self) -> bool {
321 self.inner.is_some()
322 }
323
324 fn open_file(path: &Path) -> std::io::Result<File> {
325 if let Some(parent) = path.parent() {
326 std::fs::create_dir_all(parent)?;
327 }
328 OpenOptions::new().create(true).append(true).open(path)
329 }
330
331 pub fn emit(&self, event: SemanticProgressEvent, fields: SemanticProgressFields) {
334 let Some(mutex) = self.inner.as_ref() else {
335 return;
336 };
337 let elapsed_ms = u64::try_from(self.started.elapsed().as_millis()).unwrap_or(u64::MAX);
338 let rss_mib = read_rss_mib();
339 let record = EventRecord {
340 schema: PROGRESS_JSONL_SCHEMA,
341 event: event.as_str(),
342 phase: event.phase(),
343 sub_phase: event.sub_phase(),
344 ts_ms: now_unix_ms(),
345 elapsed_ms,
346 tier: self.tier.as_str(),
347 embedder_id: self.embedder_id.as_str(),
348 rss_mib,
349 fields: &fields,
350 };
351 let mut line = match serde_json::to_string(&record) {
352 Ok(s) => s,
353 Err(err) => {
354 tracing::debug!(
355 ?err,
356 event = event.as_str(),
357 "skip JSONL emit: serialize failed"
358 );
359 return;
360 }
361 };
362 line.push('\n');
363 let mut guard = match mutex.lock() {
367 Ok(g) => g,
368 Err(poisoned) => poisoned.into_inner(),
369 };
370 if let Err(err) = guard.file.write_all(line.as_bytes()) {
371 if guard.healthy {
372 tracing::warn!(
375 path = %guard.path.display(),
376 error = %err,
377 "CASS_SEMANTIC_PROGRESS_JSONL: write failed after previous successes; continuing without progress JSONL",
378 );
379 guard.healthy = false;
380 } else {
381 tracing::debug!(
382 path = %guard.path.display(),
383 error = %err,
384 "CASS_SEMANTIC_PROGRESS_JSONL: write failed",
385 );
386 }
387 } else {
388 guard.healthy = true;
389 }
394 }
395
396 pub fn emit_bare(&self, event: SemanticProgressEvent) {
398 self.emit(event, SemanticProgressFields::default());
399 }
400
401 pub fn pid(&self) -> u32 {
403 current_pid()
404 }
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::OsStr;
    use std::io::BufRead;
    use std::sync::{Mutex, MutexGuard, PoisonError};
    use tempfile::TempDir;

    /// Serializes the tests that mutate `ENV_PROGRESS_JSONL`; the process environment is
    /// shared by every test thread.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// RAII scope around a temporary value of `ENV_PROGRESS_JSONL`.
    ///
    /// Holds `ENV_LOCK` for its whole lifetime and removes the variable on drop, so a
    /// failing assertion can neither leak the variable into other tests nor leave the
    /// lock unusable for them.
    struct EnvScope {
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvScope {
        /// Takes `ENV_LOCK`, recovering it if an earlier test panicked while holding it.
        /// The guarded value is `()`, so there is no state that poisoning could corrupt.
        fn acquire() -> MutexGuard<'static, ()> {
            ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
        }

        /// Sets the variable to `value` for the lifetime of the returned scope.
        fn set(value: impl AsRef<OsStr>) -> Self {
            let lock = Self::acquire();
            // SAFETY: `ENV_LOCK` is held, so tests in this module never mutate the
            // environment concurrently.
            // NOTE(review): assumes no other thread in the test binary reads or writes
            // the environment at the same time — confirm.
            unsafe {
                std::env::set_var(ENV_PROGRESS_JSONL, value);
            }
            Self { _lock: lock }
        }

        /// Ensures the variable is absent for the lifetime of the returned scope.
        fn unset() -> Self {
            let lock = Self::acquire();
            // SAFETY: see `EnvScope::set`.
            unsafe {
                std::env::remove_var(ENV_PROGRESS_JSONL);
            }
            Self { _lock: lock }
        }
    }

    impl Drop for EnvScope {
        fn drop(&mut self) {
            // Runs before `_lock` is released, so the cleanup is still serialized.
            // SAFETY: see `EnvScope::set`.
            unsafe {
                std::env::remove_var(ENV_PROGRESS_JSONL);
            }
        }
    }

    /// Reads every line of the file at `path`, stopping at the first read error.
    fn read_lines(path: &Path) -> Vec<String> {
        let f = File::open(path).expect("open jsonl");
        std::io::BufReader::new(f)
            .lines()
            .map_while(Result::ok)
            .collect()
    }

    #[test]
    fn disabled_sink_is_noop() {
        let sink = SemanticProgressSink::disabled();
        assert!(!sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
    }

    #[test]
    fn unset_env_is_noop() {
        let _env = EnvScope::unset();
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(!sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
    }

    #[test]
    fn writes_one_line_per_event() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("progress.jsonl");
        let _env = EnvScope::set(&path);
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
        sink.emit(
            SemanticProgressEvent::EmbedBatchDone,
            SemanticProgressFields {
                batch_index: Some(3),
                batch_rows: Some(128),
                rows_processed: Some(384),
                ..Default::default()
            },
        );
        sink.emit_bare(SemanticProgressEvent::Complete);
        drop(sink);

        let lines = read_lines(&path);
        assert_eq!(lines.len(), 3, "expected 3 events; got {:?}", lines);
        assert!(
            lines[0].contains("\"event\":\"selection_start\""),
            "line 0: {}",
            lines[0]
        );
        assert!(
            lines[1].contains("\"event\":\"embed_batch_done\""),
            "line 1: {}",
            lines[1]
        );
        assert!(
            lines[1].contains("\"batch_index\":3"),
            "line 1: {}",
            lines[1]
        );
        assert!(
            lines[2].contains("\"event\":\"complete\""),
            "line 2: {}",
            lines[2]
        );
    }

    #[test]
    fn each_event_has_phase_and_sub_phase() {
        use SemanticProgressEvent::*;
        let all = [
            SelectionStart,
            SelectionDone,
            PacketReplayStart,
            PacketReplayProgress,
            PacketReplayDone,
            EmbedBatchStart,
            EmbedBatchDone,
            StagingWriteStart,
            StagingWriteDone,
            CheckpointSaveStart,
            CheckpointSaveDone,
            PublishStart,
            PublishDone,
            Error,
            Cancelled,
            Complete,
        ];
        assert_eq!(all.len(), 16);
        for event in all {
            assert!(!event.as_str().is_empty(), "{:?}", event);
            assert!(!event.phase().is_empty(), "{:?}", event);
            assert!(!event.sub_phase().is_empty(), "{:?}", event);
        }
    }

    #[test]
    fn invalid_env_var_is_safe_noop() {
        // A whitespace-only value must be treated like an unset variable.
        let _env = EnvScope::set(" ");
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(!sink.is_active());
    }
}