use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use serde::Serialize;
/// Name of the environment variable that `resolve_path` reads to locate the
/// progress JSONL file. An unset, empty, or whitespace-only value yields no
/// path, which leaves the sink inactive.
pub const ENV_PROGRESS_JSONL: &str = "CASS_SEMANTIC_PROGRESS_JSONL";
/// Schema identifier written into the `schema` field of every emitted record.
pub const PROGRESS_JSONL_SCHEMA: &str = "cass.semantic.progress.v1";
/// Events that can be recorded in the progress JSONL stream.
///
/// Serde serializes variants in `snake_case`; `as_str` returns the same
/// spelling as a `&'static str`. `phase` and `sub_phase` classify each
/// variant; the grouping comments below mirror the `phase` mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SemanticProgressEvent {
// phase "selection"
SelectionStart,
SelectionDone,
// phase "packet_replay" (the only phase with a "progress" sub-phase)
PacketReplayStart,
PacketReplayProgress,
PacketReplayDone,
// phase "embed"
EmbedBatchStart,
EmbedBatchDone,
// phase "staging"
StagingWriteStart,
StagingWriteDone,
// phase "checkpoint"
CheckpointSaveStart,
CheckpointSaveDone,
// phase "publish"
PublishStart,
PublishDone,
// Standalone events: phase and sub-phase both equal the event name.
Error,
Cancelled,
Complete,
}
impl SemanticProgressEvent {
    /// Single source of truth for the string labels of every variant,
    /// returned as `(event, phase, sub_phase)`.
    ///
    /// Keeping the three labels side by side makes it hard for them to drift
    /// apart when a variant is added; the match is exhaustive on purpose.
    fn labels(self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::SelectionStart => ("selection_start", "selection", "start"),
            Self::SelectionDone => ("selection_done", "selection", "done"),
            Self::PacketReplayStart => ("packet_replay_start", "packet_replay", "start"),
            Self::PacketReplayProgress => ("packet_replay_progress", "packet_replay", "progress"),
            Self::PacketReplayDone => ("packet_replay_done", "packet_replay", "done"),
            Self::EmbedBatchStart => ("embed_batch_start", "embed", "start"),
            Self::EmbedBatchDone => ("embed_batch_done", "embed", "done"),
            Self::StagingWriteStart => ("staging_write_start", "staging", "start"),
            Self::StagingWriteDone => ("staging_write_done", "staging", "done"),
            Self::CheckpointSaveStart => ("checkpoint_save_start", "checkpoint", "start"),
            Self::CheckpointSaveDone => ("checkpoint_save_done", "checkpoint", "done"),
            Self::PublishStart => ("publish_start", "publish", "start"),
            Self::PublishDone => ("publish_done", "publish", "done"),
            Self::Error => ("error", "error", "error"),
            Self::Cancelled => ("cancelled", "cancelled", "cancelled"),
            Self::Complete => ("complete", "complete", "complete"),
        }
    }

    /// Snake-case name of the event (e.g. `"embed_batch_done"`).
    pub fn as_str(self) -> &'static str {
        let (event, _, _) = self.labels();
        event
    }

    /// Coarse phase the event belongs to (e.g. `"embed"`); the standalone
    /// events `Error`, `Cancelled` and `Complete` are their own phase.
    pub fn phase(self) -> &'static str {
        let (_, phase, _) = self.labels();
        phase
    }

    /// Position within the phase: `"start"`, `"progress"` or `"done"`, or the
    /// event name itself for `Error`, `Cancelled` and `Complete`.
    pub fn sub_phase(self) -> &'static str {
        let (_, _, sub_phase) = self.labels();
        sub_phase
    }
}
/// Optional per-event payload attached to a progress record.
///
/// Every field defaults to `None` and is omitted from the serialized JSON
/// when unset, so callers fill in only what applies to the event (typically
/// via `..Default::default()`). `EventRecord` flattens these fields into the
/// top-level JSON object, so their names must not clash with its own keys.
#[derive(Debug, Clone, Default, Serialize)]
pub struct SemanticProgressFields {
#[serde(skip_serializing_if = "Option::is_none")]
pub batch_index: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub batch_rows: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rows_processed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rows_total: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_conversation_id: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_message_id: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub conversations_in_batch: Option<u64>,
// Free-form text; NOTE(review): intended content is not visible in this file.
#[serde(skip_serializing_if = "Option::is_none")]
pub note: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bytes: Option<u64>,
// Free-form error text; presumably set alongside `SemanticProgressEvent::Error` — confirm with callers.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// One serialized JSONL line, assembled by `SemanticProgressSink::emit`.
///
/// Borrows the sink's identity strings and the caller's fields so building a
/// record does not copy them. Field declaration order is the key order in
/// the emitted JSON.
#[derive(Debug, Clone, Serialize)]
struct EventRecord<'a> {
// Always `PROGRESS_JSONL_SCHEMA`.
schema: &'static str,
// Labels from `SemanticProgressEvent::{as_str, phase, sub_phase}`.
event: &'static str,
phase: &'static str,
sub_phase: &'static str,
// Wall-clock time in milliseconds since the Unix epoch (0 if unavailable).
ts_ms: i64,
// Milliseconds since the sink was constructed.
elapsed_ms: u64,
tier: &'a str,
embedder_id: &'a str,
// Resident set size in MiB; omitted when it could not be read.
#[serde(skip_serializing_if = "Option::is_none")]
rss_mib: Option<u64>,
// Caller-supplied payload, merged into the top-level object.
#[serde(flatten)]
fields: &'a SemanticProgressFields,
}
/// Returns the OS process id of the running process.
fn current_pid() -> u32 {
    use std::process;

    process::id()
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch or when
/// the millisecond count does not fit in an `i64`.
fn now_unix_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(0),
        // Clock is set before 1970: report the fallback value instead of failing.
        Err(_) => 0,
    }
}
/// Reads the process's resident set size from `/proc/self/status`, in MiB.
///
/// Uses the first line starting with `VmRSS:` and treats its first
/// whitespace-separated token as a kB count, integer-dividing by 1024.
/// Returns `None` when the file cannot be read, is not valid UTF-8, has no
/// `VmRSS:` line, or the value does not parse as a `u64`.
fn read_rss_mib() -> Option<u64> {
    let raw = std::fs::read("/proc/self/status").ok()?;
    let status = std::str::from_utf8(&raw).ok()?;
    // Only the first matching line is considered, as in a manual scan.
    let value = status.lines().find_map(|row| row.strip_prefix("VmRSS:"))?;
    let kib = value.split_whitespace().next()?.parse::<u64>().ok()?;
    Some(kib / 1024)
}
/// Resolves the JSONL destination from the `ENV_PROGRESS_JSONL` variable.
///
/// The value is looked up through `dotenvy::var` and trimmed of surrounding
/// whitespace. Returns `None` when the lookup fails or the trimmed value is
/// empty.
fn resolve_path() -> Option<PathBuf> {
    let configured = dotenvy::var(ENV_PROGRESS_JSONL).ok()?;
    match configured.trim() {
        "" => None,
        location => Some(PathBuf::from(location)),
    }
}
/// Best-effort writer of progress events to a JSONL file.
///
/// When `inner` is `None` the sink is inactive and `emit` returns without
/// doing any work.
pub struct SemanticProgressSink {
// Open file plus bookkeeping; `None` when no path is configured or the file could not be opened.
inner: Option<Mutex<SinkInner>>,
// Copied into the `tier` field of every record.
tier: String,
// Copied into the `embedder_id` field of every record.
embedder_id: String,
// Construction time; basis for each record's `elapsed_ms`.
started: Instant,
}
/// Mutable state of an active sink, guarded by the mutex in `SemanticProgressSink`.
struct SinkInner {
// Destination file, opened in create + append mode.
file: File,
// Kept only so log messages can name the file.
path: PathBuf,
// Set to `true` after a successful write and to `false` once a failure has been
// reported at warn level; starts `false` in `SemanticProgressSink::open`.
healthy: bool,
}
impl SemanticProgressSink {
pub fn open(tier: &str, embedder_id: &str) -> Self {
let path = resolve_path();
let inner = match path {
Some(p) => match Self::open_file(&p) {
Ok(file) => Some(Mutex::new(SinkInner {
file,
path: p,
healthy: false,
})),
Err(err) => {
tracing::warn!(
path = %p.display(),
error = %err,
"CASS_SEMANTIC_PROGRESS_JSONL: failed to open sink — continuing without progress JSONL",
);
None
}
},
None => None,
};
Self {
inner,
tier: tier.to_string(),
embedder_id: embedder_id.to_string(),
started: Instant::now(),
}
}
pub fn disabled() -> Self {
Self {
inner: None,
tier: "unknown".to_string(),
embedder_id: "unknown".to_string(),
started: Instant::now(),
}
}
pub fn is_active(&self) -> bool {
self.inner.is_some()
}
fn open_file(path: &Path) -> std::io::Result<File> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
OpenOptions::new().create(true).append(true).open(path)
}
pub fn emit(&self, event: SemanticProgressEvent, fields: SemanticProgressFields) {
let Some(mutex) = self.inner.as_ref() else {
return;
};
let elapsed_ms = u64::try_from(self.started.elapsed().as_millis()).unwrap_or(u64::MAX);
let rss_mib = read_rss_mib();
let record = EventRecord {
schema: PROGRESS_JSONL_SCHEMA,
event: event.as_str(),
phase: event.phase(),
sub_phase: event.sub_phase(),
ts_ms: now_unix_ms(),
elapsed_ms,
tier: self.tier.as_str(),
embedder_id: self.embedder_id.as_str(),
rss_mib,
fields: &fields,
};
let mut line = match serde_json::to_string(&record) {
Ok(s) => s,
Err(err) => {
tracing::debug!(
?err,
event = event.as_str(),
"skip JSONL emit: serialize failed"
);
return;
}
};
line.push('\n');
let mut guard = match mutex.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
if let Err(err) = guard.file.write_all(line.as_bytes()) {
if guard.healthy {
tracing::warn!(
path = %guard.path.display(),
error = %err,
"CASS_SEMANTIC_PROGRESS_JSONL: write failed after previous successes; continuing without progress JSONL",
);
guard.healthy = false;
} else {
tracing::debug!(
path = %guard.path.display(),
error = %err,
"CASS_SEMANTIC_PROGRESS_JSONL: write failed",
);
}
} else {
guard.healthy = true;
}
}
pub fn emit_bare(&self, event: SemanticProgressEvent) {
self.emit(event, SemanticProgressFields::default());
}
pub fn pid(&self) -> u32 {
current_pid()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::BufRead;
    use std::sync::{Mutex, MutexGuard};
    use tempfile::TempDir;

    /// Serializes the tests in this module that mutate `ENV_PROGRESS_JSONL`.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Exclusive access to `ENV_PROGRESS_JSONL` for the duration of one test.
    ///
    /// Holding the session means holding `ENV_LOCK`. The variable is removed
    /// again when the session is dropped, including when the test panics, so
    /// a failing test cannot leak its path into a later one.
    struct EnvSession {
        _held: MutexGuard<'static, ()>,
    }

    impl EnvSession {
        /// Takes `ENV_LOCK`, recovering from poisoning.
        ///
        /// The lock protects no data, so a poisoned guard is still valid.
        /// Recovering keeps one failing test from cascading into
        /// `PoisonError` panics in every other env test and hiding the real
        /// failure.
        fn acquire() -> Self {
            let held = ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            Self { _held: held }
        }

        /// Sets `ENV_PROGRESS_JSONL` to `value`.
        fn set(&self, value: impl AsRef<std::ffi::OsStr>) {
            // SAFETY: mutating the environment is only sound while no other
            // thread accesses it. Every test in this module that touches the
            // variable does so through an `EnvSession`, i.e. under `ENV_LOCK`.
            // NOTE(review): tests elsewhere in the crate that read or write
            // the environment are not covered by this lock.
            unsafe {
                std::env::set_var(ENV_PROGRESS_JSONL, value);
            }
        }

        /// Removes `ENV_PROGRESS_JSONL` from the environment.
        fn clear(&self) {
            // SAFETY: same reasoning as in `set`; `ENV_LOCK` is held for as
            // long as `self` exists.
            unsafe {
                std::env::remove_var(ENV_PROGRESS_JSONL);
            }
        }
    }

    impl Drop for EnvSession {
        fn drop(&mut self) {
            // Runs before `_held` is released, so cleanup is still serialized.
            self.clear();
        }
    }

    /// Reads `path` as UTF-8 lines, stopping at the first read error.
    fn read_lines(path: &Path) -> Vec<String> {
        let f = File::open(path).expect("open jsonl");
        std::io::BufReader::new(f)
            .lines()
            .map_while(Result::ok)
            .collect()
    }

    #[test]
    fn disabled_sink_is_noop() {
        let sink = SemanticProgressSink::disabled();
        assert!(!sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
    }

    #[test]
    fn unset_env_is_noop() {
        let env = EnvSession::acquire();
        env.clear();
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(!sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
    }

    #[test]
    fn writes_one_line_per_event() {
        let env = EnvSession::acquire();
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("progress.jsonl");
        env.set(&path);
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(sink.is_active());
        sink.emit_bare(SemanticProgressEvent::SelectionStart);
        sink.emit(
            SemanticProgressEvent::EmbedBatchDone,
            SemanticProgressFields {
                batch_index: Some(3),
                batch_rows: Some(128),
                rows_processed: Some(384),
                ..Default::default()
            },
        );
        sink.emit_bare(SemanticProgressEvent::Complete);
        drop(sink);
        let lines = read_lines(&path);
        assert_eq!(lines.len(), 3, "expected 3 events; got {:?}", lines);
        assert!(
            lines[0].contains("\"event\":\"selection_start\""),
            "line 0: {}",
            lines[0]
        );
        assert!(
            lines[1].contains("\"event\":\"embed_batch_done\""),
            "line 1: {}",
            lines[1]
        );
        assert!(
            lines[1].contains("\"batch_index\":3"),
            "line 1: {}",
            lines[1]
        );
        assert!(
            lines[2].contains("\"event\":\"complete\""),
            "line 2: {}",
            lines[2]
        );
    }

    #[test]
    fn each_event_has_phase_and_sub_phase() {
        use SemanticProgressEvent::*;
        let all = [
            SelectionStart,
            SelectionDone,
            PacketReplayStart,
            PacketReplayProgress,
            PacketReplayDone,
            EmbedBatchStart,
            EmbedBatchDone,
            StagingWriteStart,
            StagingWriteDone,
            CheckpointSaveStart,
            CheckpointSaveDone,
            PublishStart,
            PublishDone,
            Error,
            Cancelled,
            Complete,
        ];
        assert_eq!(all.len(), 16);
        for event in all {
            assert!(!event.as_str().is_empty(), "{:?}", event);
            assert!(!event.phase().is_empty(), "{:?}", event);
            assert!(!event.sub_phase().is_empty(), "{:?}", event);
        }
    }

    #[test]
    fn invalid_env_var_is_safe_noop() {
        let env = EnvSession::acquire();
        // A whitespace-only value must be treated the same as "unset".
        env.set(" ");
        let sink = SemanticProgressSink::open("quality", "minilm-384");
        assert!(!sink.is_active());
    }
}