mod commands;
mod handlers;
mod pipeline;
mod types;
pub(crate) use commands::{dispatch, BatchInput};
pub(crate) use pipeline::{execute_pipeline, has_pipe_token};
use std::cell::{Cell, RefCell};
use std::collections::HashSet;
use std::io::{BufRead, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
use std::time::{Instant, SystemTime};
use anyhow::Result;
use clap::Parser;
use cqs::embedder::ModelConfig;
use cqs::index::VectorIndex;
use cqs::reference::ReferenceIndex;
use cqs::store::Store;
use cqs::Embedder;
use super::open_project_store_readonly;
const MAX_BATCH_LINE_LEN: usize = 1_048_576;
const IDLE_TIMEOUT_MINUTES: u64 = 5;
/// Long-lived state for one batch session: a read-only store plus lazily
/// built caches that are dropped whenever index.db changes on disk.
///
/// Built entirely from `RefCell`/`Cell`/`OnceLock`, so the context is
/// single-threaded by construction (it is not `Sync`).
pub(crate) struct BatchContext {
// Read-only store; re-opened when index.db's mtime changes.
store: RefCell<Store>,
// Session-stable components: initialized at most once, never invalidated.
embedder: OnceLock<Embedder>,
config: OnceLock<cqs::config::Config>,
reranker: OnceLock<cqs::Reranker>,
audit_state: OnceLock<cqs::audit::AuditMode>,
// Mutable caches derived from index.db; cleared by invalidate_mutable_caches().
hnsw: RefCell<Option<std::sync::Arc<dyn VectorIndex>>>,
base_hnsw: RefCell<Option<std::sync::Arc<dyn VectorIndex>>>,
call_graph: RefCell<Option<std::sync::Arc<cqs::store::CallGraph>>>,
test_chunks: RefCell<Option<std::sync::Arc<Vec<cqs::store::ChunkSummary>>>>,
file_set: RefCell<Option<HashSet<PathBuf>>>,
notes_cache: RefCell<Option<Vec<cqs::note::Note>>>,
// Small LRU of loaded reference indexes, keyed by reference name.
refs: RefCell<lru::LruCache<String, ReferenceIndex>>,
// SPLADE encoder init is attempted once per session (None if unavailable);
// the sparse index cache is invalidated together with the other caches.
splade_encoder: OnceLock<Option<cqs::splade::SpladeEncoder>>,
splade_index: RefCell<Option<cqs::splade::index::SpladeIndex>>,
pub root: PathBuf,
pub cqs_dir: PathBuf,
pub model_config: cqs::embedder::ModelConfig,
// Last observed mtime of index.db; None until the first successful stat.
index_mtime: Cell<Option<SystemTime>>,
// Commands that produced an error reply. Incremented in cmd_batch but never
// read in this file — presumably reported elsewhere (TODO confirm).
error_count: AtomicU64,
// Timestamp of the most recent command, for the idle-session timeout.
last_command_time: Cell<Instant>,
}
impl BatchContext {
/// Release ML inference sessions after a long idle gap, then reset the clock.
///
/// Called before each command. When more than `IDLE_TIMEOUT_MINUTES` have
/// passed since the previous command, every initialized model (embedder,
/// reranker, SPLADE encoder) has its session cleared to give memory back.
pub(crate) fn check_idle_timeout(&self) {
    let idle = self.last_command_time.get().elapsed();
    if idle >= std::time::Duration::from_secs(IDLE_TIMEOUT_MINUTES * 60) {
        let idle_minutes = idle.as_secs() / 60;
        if let Some(embedder) = self.embedder.get() {
            embedder.clear_session();
            tracing::info!(idle_minutes, "Cleared embedder session after idle timeout");
        }
        if let Some(reranker) = self.reranker.get() {
            reranker.clear_session();
            tracing::info!(idle_minutes, "Cleared reranker session after idle timeout");
        }
        if let Some(encoder) = self.splade_encoder.get().and_then(|opt| opt.as_ref()) {
            encoder.clear_session();
            tracing::info!(idle_minutes, "Cleared SPLADE session after idle timeout");
        }
    }
    // Every command resets the idle clock, whether or not sessions were cleared.
    self.last_command_time.set(Instant::now());
}
/// Detect external changes to index.db via mtime; on change, drop the mutable
/// caches and re-open the store.
///
/// If the stat fails, this warns and returns WITHOUT touching `index_mtime`,
/// so caches may stay stale until the next successful stat.
pub(crate) fn check_index_staleness(&self) {
let index_path = self.cqs_dir.join("index.db");
let current_mtime = match std::fs::metadata(&index_path).and_then(|m| m.modified()) {
Ok(t) => t,
Err(e) => {
tracing::warn!(
error = %e,
path = %index_path.display(),
"Cannot stat index.db for staleness check — caches may remain stale"
);
return;
}
};
// First successful stat (last == None) only records the baseline; it never
// triggers invalidation.
let last = self.index_mtime.get();
if last.is_some() && last != Some(current_mtime) {
let _span = tracing::info_span!("batch_index_invalidation").entered();
tracing::info!("index.db mtime changed, invalidating mutable caches");
self.invalidate_mutable_caches();
// Re-open the store so subsequent queries see the new data.
match Store::open_readonly_pooled(&index_path) {
Ok(new_store) => {
let new_dim = new_store.dim();
// A dimension change cannot be handled in-session: the embedder was
// configured for the old dimension at startup.
if new_dim != self.model_config.dim {
tracing::warn!(
old_dim = self.model_config.dim,
new_dim = new_dim,
"Index dimension changed — queries may return wrong results until batch restart"
);
}
*self.store.borrow_mut() = new_store;
tracing::info!("Store re-opened after index change");
}
Err(e) => {
// Keep serving from the old store; a later mtime change retries.
tracing::warn!(error = %e, "Failed to re-open Store after index change");
}
}
}
// Record the observed mtime even when re-open failed, so the same change does
// not re-run invalidation on every call.
self.index_mtime.set(Some(current_mtime));
}
/// Drop every cache derived from index.db contents. Session-stable state
/// (embedder, reranker, config, audit mode) is deliberately untouched.
fn invalidate_mutable_caches(&self) {
    // Dense vector indexes.
    self.hnsw.replace(None);
    self.base_hnsw.replace(None);
    // Graph, chunk, filesystem and notes caches.
    self.call_graph.replace(None);
    self.test_chunks.replace(None);
    self.file_set.replace(None);
    self.notes_cache.replace(None);
    // Sparse index and the reference LRU.
    self.splade_index.replace(None);
    self.refs.borrow_mut().clear();
}
/// Manually drop all mutable caches and re-open the store (the `refresh` /
/// `invalidate` batch command).
///
/// # Errors
/// Fails if the store cannot be re-opened; caches are already cleared by then.
pub(crate) fn invalidate(&self) -> Result<()> {
    let _span = tracing::info_span!("batch_manual_invalidation").entered();
    self.invalidate_mutable_caches();
    let index_path = self.cqs_dir.join("index.db");
    let new_store = Store::open_readonly_pooled(&index_path)
        .map_err(|e| anyhow::anyhow!("Failed to re-open Store: {e}"))?;
    *self.store.borrow_mut() = new_store;
    // Record the new baseline mtime. Previously a failed stat was silently
    // ignored; warn instead, matching check_index_staleness.
    match std::fs::metadata(&index_path).and_then(|m| m.modified()) {
        Ok(mtime) => self.index_mtime.set(Some(mtime)),
        Err(e) => tracing::warn!(
            error = %e,
            path = %index_path.display(),
            "Cannot stat index.db after manual invalidation — staleness tracking may lag"
        ),
    }
    tracing::info!("Manual cache invalidation complete");
    Ok(())
}
/// Parse one command line and write exactly one JSON reply line to `out`
/// (nothing for blank input).
///
/// NOTE(review): unlike the main loop in `cmd_batch`, this path enforces no
/// line-length cap or null-byte check, handles no pipelines, and does not
/// increment `error_count` — confirm callers (e.g. pipeline stages) are fine
/// with that.
pub(crate) fn dispatch_line(&self, line: &str, out: &mut impl std::io::Write) {
let trimmed = line.trim();
if trimmed.is_empty() {
return;
}
// Shell-style tokenization (quoting, escapes).
let tokens = match shell_words::split(trimmed) {
Ok(t) => t,
Err(e) => {
let err = serde_json::json!({"error": format!("Parse error: {e}")});
let _ = write_json_line(out, &err);
return;
}
};
if tokens.is_empty() {
return;
}
// May clear idle ML sessions; always resets the idle clock.
self.check_idle_timeout();
match commands::BatchInput::try_parse_from(&tokens) {
Ok(input) => match commands::dispatch(self, input.cmd) {
Ok(value) => {
let _ = write_json_line(out, &value);
}
Err(e) => {
let err = serde_json::json!({"error": format!("{e}")});
let _ = write_json_line(out, &err);
}
},
Err(e) => {
// Clap usage/parse errors are reported as JSON too.
let err = serde_json::json!({"error": format!("{e}")});
let _ = write_json_line(out, &err);
}
}
}
/// Borrow the store, first refreshing caches if index.db changed on disk.
///
/// The returned `Ref` must be dropped before anything takes
/// `store.borrow_mut()` (e.g. another staleness check), or RefCell panics.
pub fn store(&self) -> std::cell::Ref<'_, Store> {
self.check_index_staleness();
self.store.borrow()
}
/// Eagerly build the embedder so the first query does not pay init latency.
///
/// Idempotent and best-effort: failure is only logged, and the embedder()
/// accessor will retry on first use.
pub fn warm(&self) {
    if self.embedder.get().is_some() {
        return; // Already warm.
    }
    let _span = tracing::info_span!("batch_warm").entered();
    match Embedder::new(self.model_config.clone()) {
        Ok(built) => {
            // A lost set() race just drops the duplicate; harmless.
            let _ = self.embedder.set(built);
            tracing::info!("Embedder pre-warmed");
        }
        Err(e) => {
            tracing::warn!(error = %e, "Embedder warm failed — will retry on first query");
        }
    }
}
/// Lazily initialize and return the embedder.
///
/// # Errors
/// Propagates embedder construction failure; a later call retries.
pub fn embedder(&self) -> Result<&Embedder> {
    if self.embedder.get().is_none() {
        let _span = tracing::info_span!("batch_embedder_init").entered();
        let built = Embedder::new(self.model_config.clone())?;
        let _ = self.embedder.set(built);
    }
    Ok(self
        .embedder
        .get()
        .expect("embedder OnceLock populated by set() above"))
}
/// SPLADE encoder, initialized at most once per session.
///
/// Returns None when the model directory cannot be resolved or construction
/// fails; the failure is cached, so there is no retry within the session.
pub fn splade_encoder(&self) -> Option<&cqs::splade::SpladeEncoder> {
    self.splade_encoder
        .get_or_init(|| {
            // `?` here short-circuits the Option-returning closure, not the method.
            let model_dir = cqs::splade::resolve_splade_model_dir()?;
            let threshold = cqs::splade::SpladeEncoder::default_threshold();
            match cqs::splade::SpladeEncoder::new(&model_dir, threshold) {
                Ok(encoder) => Some(encoder),
                Err(e) => {
                    tracing::warn!(
                        path = %model_dir.display(),
                        error = %e,
                        "SPLADE encoder unavailable in batch mode"
                    );
                    None
                }
            }
        })
        .as_ref()
}
/// Load or build the SPLADE sparse index into the session cache, if missing.
///
/// Best-effort: on a generation-read failure, or when the built index is
/// empty, it returns without caching (so an empty build is re-attempted on
/// every call) and search falls back to dense-only.
pub fn ensure_splade_index(&self) {
// Run staleness first: it may clear splade_index before the is_some check.
self.check_index_staleness();
if self.splade_index.borrow().is_some() {
return;
}
// Generation number ties the on-disk SPLADE index to the store contents.
let generation = match self.store().splade_generation() {
Ok(g) => g,
Err(e) => {
tracing::warn!(
error = %e,
"Failed to read splade_generation — skipping SPLADE entirely for this \
batch session; search will fall back to dense-only"
);
return;
}
};
let splade_path = self.cqs_dir.join(cqs::splade::index::SPLADE_INDEX_FILENAME);
// Hold the store borrow across load_or_build: the rebuild closure reads
// sparse vectors from the store.
let store = self.store();
let (idx, rebuilt) = cqs::splade::index::SpladeIndex::load_or_build(
&splade_path,
generation,
|| match store.load_all_sparse_vectors() {
Ok(v) => v,
Err(e) => {
tracing::warn!(
error = %e,
"Failed to load sparse vectors, falling back to cosine-only"
);
Vec::new()
}
},
);
// Empty index is useless — leave the cache as None.
if idx.is_empty() {
return;
}
tracing::info!(
chunks = idx.len(),
tokens = idx.unique_tokens(),
rebuilt,
"SPLADE index ready (batch)"
);
*self.splade_index.borrow_mut() = Some(idx);
}
/// Borrow the cached SPLADE index; the Option is None until
/// `ensure_splade_index` succeeds (or after invalidation).
pub fn borrow_splade_index(
&self,
) -> std::cell::Ref<'_, Option<cqs::splade::index::SpladeIndex>> {
self.splade_index.borrow()
}
/// Cached ANN index over the store's embeddings (None when unavailable).
///
/// # Errors
/// Propagates index construction failure.
pub fn vector_index(&self) -> Result<Option<std::sync::Arc<dyn VectorIndex>>> {
    self.check_index_staleness();
    // Fast path: clone the cached Arc.
    if let Some(arc) = self.hnsw.borrow().as_ref() {
        return Ok(Some(std::sync::Arc::clone(arc)));
    }
    let _span = tracing::info_span!("batch_vector_index_init").entered();
    // Scope the store borrow to the build call.
    let built = {
        let store = self.store.borrow();
        build_vector_index(&store, &self.cqs_dir, self.config().ef_search)?
    };
    let arc: Option<std::sync::Arc<dyn VectorIndex>> = built.map(|boxed| boxed.into());
    *self.hnsw.borrow_mut() = arc.clone();
    Ok(arc)
}
/// Cached base (non-configured) ANN index; see `vector_index` for the
/// configured variant.
///
/// # Errors
/// Propagates index construction failure.
pub fn base_vector_index(&self) -> Result<Option<std::sync::Arc<dyn VectorIndex>>> {
    self.check_index_staleness();
    // Fast path: clone the cached Arc.
    if let Some(arc) = self.base_hnsw.borrow().as_ref() {
        return Ok(Some(std::sync::Arc::clone(arc)));
    }
    let _span = tracing::info_span!("batch_base_vector_index_init").entered();
    // Scope the store borrow to the build call.
    let built = {
        let store = self.store.borrow();
        crate::cli::build_base_vector_index(&store, &self.cqs_dir)?
    };
    let arc: Option<std::sync::Arc<dyn VectorIndex>> = built.map(|boxed| boxed.into());
    *self.base_hnsw.borrow_mut() = arc.clone();
    Ok(arc)
}
/// Ensure the named reference index is loaded into the LRU cache.
///
/// # Errors
/// Fails when the name is not in the config's references, or the reference
/// exists but its index could not be loaded from disk.
pub fn get_ref(&self, name: &str) -> Result<()> {
    let _span = tracing::info_span!("batch_get_ref", %name).entered();
    // Already resident? Nothing to do.
    if self.refs.borrow().contains(name) {
        return Ok(());
    }
    // Narrow the configured references down to the requested one.
    let matching: Vec<_> = self
        .config()
        .references
        .iter()
        .filter(|r| r.name == name)
        .cloned()
        .collect();
    if matching.is_empty() {
        anyhow::bail!(
            "Reference '{}' not found. Run 'cqs ref list' to see available references.",
            name
        );
    }
    let mut loaded = cqs::reference::load_references(&matching).into_iter();
    match loaded.next() {
        Some(index) => {
            self.refs.borrow_mut().put(name.to_string(), index);
            Ok(())
        }
        None => anyhow::bail!(
            "Failed to load reference '{}'. Run 'cqs ref update {}' first.",
            name,
            name
        ),
    }
}
/// Cached set of indexable source files under the project root.
/// Returns an owned clone of the cached set on every call.
///
/// # Errors
/// Propagates filesystem enumeration failure.
pub(super) fn file_set(&self) -> Result<HashSet<PathBuf>> {
    self.check_index_staleness();
    if let Some(cached) = self.file_set.borrow().as_ref() {
        return Ok(cached.clone());
    }
    let _span = tracing::info_span!("batch_file_set").entered();
    let extensions: Vec<&str> = cqs::language::REGISTRY.supported_extensions().collect();
    let set: HashSet<PathBuf> = cqs::enumerate_files(&self.root, &extensions, false)?
        .into_iter()
        .collect();
    *self.file_set.borrow_mut() = Some(set.clone());
    Ok(set)
}
/// Audit mode, loaded once from the .cqs dir and cached for the whole
/// session (deliberately survives cache invalidation).
pub(super) fn audit_state(&self) -> &cqs::audit::AuditMode {
self.audit_state
.get_or_init(|| cqs::audit::load_audit_state(&self.cqs_dir))
}
pub(super) fn notes(&self) -> Vec<cqs::note::Note> {
self.check_index_staleness();
{
let cached = self.notes_cache.borrow();
if let Some(notes) = cached.as_ref() {
return notes.clone();
}
}
let notes_path = self.root.join("docs/notes.toml");
let notes = if notes_path.exists() {
match cqs::note::parse_notes(¬es_path) {
Ok(notes) => notes,
Err(e) => {
tracing::warn!(error = %e, "Failed to parse notes.toml for batch");
vec![]
}
}
} else {
vec![]
};
let result = notes.clone();
*self.notes_cache.borrow_mut() = Some(notes);
result
}
/// Mutably borrow a loaded reference index by name.
///
/// Returns None when the reference is not resident (load it first with
/// `get_ref`). The `RefMut::map` projection cannot panic: `contains` was
/// checked immediately above on the same borrow.
pub fn borrow_ref(&self, name: &str) -> Option<std::cell::RefMut<'_, ReferenceIndex>> {
let cache = self.refs.borrow_mut();
if cache.contains(name) {
Some(std::cell::RefMut::map(cache, |m| {
m.get_mut(name).expect("checked contains above")
}))
} else {
None
}
}
/// Cached call graph from the store, shared via Arc.
///
/// # Errors
/// Propagates store read failure.
pub(super) fn call_graph(&self) -> Result<std::sync::Arc<cqs::store::CallGraph>> {
    self.check_index_staleness();
    if let Some(cached) = self.call_graph.borrow().as_ref() {
        return Ok(std::sync::Arc::clone(cached));
    }
    let _span = tracing::info_span!("batch_call_graph_init").entered();
    let graph = self.store.borrow().get_call_graph()?;
    *self.call_graph.borrow_mut() = Some(std::sync::Arc::clone(&graph));
    Ok(graph)
}
/// Cached list of test chunks from the store, shared via Arc.
///
/// # Errors
/// Propagates store read failure.
pub(super) fn test_chunks(&self) -> Result<std::sync::Arc<Vec<cqs::store::ChunkSummary>>> {
    self.check_index_staleness();
    if let Some(cached) = self.test_chunks.borrow().as_ref() {
        return Ok(std::sync::Arc::clone(cached));
    }
    let _span = tracing::info_span!("batch_test_chunks_init").entered();
    let chunks = self.store.borrow().find_test_chunks()?;
    *self.test_chunks.borrow_mut() = Some(std::sync::Arc::clone(&chunks));
    Ok(chunks)
}
/// Project config, loaded once per session (survives cache invalidation).
pub(super) fn config(&self) -> &cqs::config::Config {
self.config
.get_or_init(|| cqs::config::Config::load(&self.root))
}
/// Lazily initialize and return the reranker.
///
/// # Errors
/// Propagates reranker construction failure; a later call retries.
pub(super) fn reranker(&self) -> Result<&cqs::Reranker> {
    if self.reranker.get().is_none() {
        let _span = tracing::info_span!("batch_reranker_init").entered();
        let built =
            cqs::Reranker::new().map_err(|e| anyhow::anyhow!("Reranker init failed: {e}"))?;
        let _ = self.reranker.set(built);
    }
    Ok(self
        .reranker
        .get()
        .expect("reranker OnceLock populated by set() above"))
}
}
/// Thin wrapper over the CLI index builder so batch code has a short local
/// name; `ef_search`, when Some, overrides the search width (presumably the
/// HNSW ef_search parameter — confirm in crate::cli).
fn build_vector_index(
store: &Store,
cqs_dir: &std::path::Path,
ef_search: Option<usize>,
) -> Result<Option<Box<dyn VectorIndex>>> {
crate::cli::build_vector_index_with_config(store, cqs_dir, ef_search)
}
/// Recursively replace non-finite float values (NaN/±Infinity) with null,
/// since they are not representable in JSON output.
fn sanitize_json_floats(value: &mut serde_json::Value) {
    match value {
        serde_json::Value::Number(n) => {
            // `!is_finite()` covers both NaN and ±Infinity.
            if n.as_f64().is_some_and(|f| !f.is_finite()) {
                *value = serde_json::Value::Null;
            }
        }
        serde_json::Value::Array(items) => items.iter_mut().for_each(sanitize_json_floats),
        // values_mut() instead of iter_mut() with an ignored key.
        serde_json::Value::Object(map) => map.values_mut().for_each(sanitize_json_floats),
        _ => {}
    }
}
/// Serialize `value` and write it as one line to `out`.
///
/// On a serialization failure, retries once with non-finite floats scrubbed
/// to null; if that also fails, emits a fixed JSON error line.
///
/// # Errors
/// Returns only the underlying I/O error from writing.
fn write_json_line(
    out: &mut impl std::io::Write,
    value: &serde_json::Value,
) -> std::io::Result<()> {
    // Common case: serialize as-is.
    if let Ok(serialized) = serde_json::to_string(value) {
        return writeln!(out, "{serialized}");
    }
    // Retry with problem floats replaced by null.
    let mut sanitized = value.clone();
    sanitize_json_floats(&mut sanitized);
    match serde_json::to_string(&sanitized) {
        Ok(serialized) => writeln!(out, "{serialized}"),
        Err(e) => {
            tracing::warn!(error = %e, "JSON serialization failed after sanitization");
            writeln!(out, r#"{{"error":"JSON serialization failed"}}"#)
        }
    }
}
/// Open the project store read-only and assemble a fresh `BatchContext`.
///
/// # Errors
/// Fails when the project store cannot be opened.
pub(crate) fn create_context() -> Result<BatchContext> {
    let (store, root, cqs_dir) = open_project_store_readonly()?;
    // Baseline mtime for staleness detection; a failed stat only delays
    // detection until the first successful one.
    let index_mtime =
        match std::fs::metadata(cqs_dir.join("index.db")).and_then(|m| m.modified()) {
            Ok(t) => Some(t),
            Err(_) => {
                tracing::debug!("Could not read index.db mtime — staleness detection will be skipped until first successful stat");
                None
            }
        };
    Ok(BatchContext {
        store: RefCell::new(store),
        root,
        cqs_dir,
        model_config: ModelConfig::resolve(None, None).apply_env_overrides(),
        index_mtime: Cell::new(index_mtime),
        embedder: OnceLock::new(),
        config: OnceLock::new(),
        reranker: OnceLock::new(),
        audit_state: OnceLock::new(),
        splade_encoder: OnceLock::new(),
        hnsw: RefCell::new(None),
        base_hnsw: RefCell::new(None),
        call_graph: RefCell::new(None),
        test_chunks: RefCell::new(None),
        file_set: RefCell::new(None),
        notes_cache: RefCell::new(None),
        splade_index: RefCell::new(None),
        refs: RefCell::new(lru::LruCache::new(std::num::NonZeroUsize::new(2).unwrap())),
        error_count: AtomicU64::new(0),
        last_command_time: Cell::new(Instant::now()),
    })
}
/// Test-only variant of `create_context`: opens the store read-write at the
/// given .cqs dir and derives the project root from its parent.
///
/// # Errors
/// Fails when the test store cannot be opened.
#[cfg(test)]
fn create_test_context(cqs_dir: &std::path::Path) -> Result<BatchContext> {
    let index_path = cqs_dir.join("index.db");
    let store =
        Store::open(&index_path).map_err(|e| anyhow::anyhow!("Failed to open test store: {e}"))?;
    let root = cqs_dir.parent().unwrap_or(cqs_dir).to_path_buf();
    let index_mtime = match std::fs::metadata(&index_path).and_then(|m| m.modified()) {
        Ok(t) => Some(t),
        Err(_) => None,
    };
    Ok(BatchContext {
        store: RefCell::new(store),
        root,
        cqs_dir: cqs_dir.to_path_buf(),
        model_config: ModelConfig::resolve(None, None).apply_env_overrides(),
        index_mtime: Cell::new(index_mtime),
        embedder: OnceLock::new(),
        config: OnceLock::new(),
        reranker: OnceLock::new(),
        audit_state: OnceLock::new(),
        splade_encoder: OnceLock::new(),
        hnsw: RefCell::new(None),
        base_hnsw: RefCell::new(None),
        call_graph: RefCell::new(None),
        test_chunks: RefCell::new(None),
        file_set: RefCell::new(None),
        notes_cache: RefCell::new(None),
        splade_index: RefCell::new(None),
        refs: RefCell::new(lru::LruCache::new(std::num::NonZeroUsize::new(2).unwrap())),
        error_count: AtomicU64::new(0),
        last_command_time: Cell::new(Instant::now()),
    })
}
pub(crate) fn cmd_batch() -> Result<()> {
let _span = tracing::info_span!("cmd_batch").entered();
let ctx = create_context()?;
ctx.warm();
let stdin = std::io::stdin();
let mut stdout = std::io::stdout();
let mut reader = std::io::BufReader::new(stdin.lock());
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line) {
Ok(0) => break, Ok(_) => {}
Err(e) => {
tracing::warn!(error = %e, "Failed to read stdin line");
break;
}
};
if line.len() > MAX_BATCH_LINE_LEN {
ctx.error_count.fetch_add(1, Ordering::Relaxed);
if writeln!(stdout, r#"{{"error":"Line too long (max 1MB)"}}"#).is_err() {
break;
}
let _ = stdout.flush();
continue;
}
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with('#') {
continue;
}
if trimmed.eq_ignore_ascii_case("quit") || trimmed.eq_ignore_ascii_case("exit") {
break;
}
let tokens = match shell_words::split(trimmed) {
Ok(t) => t,
Err(e) => {
ctx.error_count.fetch_add(1, Ordering::Relaxed);
let error_json = serde_json::json!({"error": format!("Parse error: {}", e)});
match serde_json::to_string(&error_json) {
Ok(s) => {
if writeln!(stdout, "{}", s).is_err() {
break;
}
}
Err(_) => {
if writeln!(
stdout,
r#"{{"error":"Parse error (serialization failed)"}}"#
)
.is_err()
{
break;
}
}
}
let _ = stdout.flush();
continue;
}
};
if tokens.is_empty() {
continue;
}
if tokens.iter().any(|t| t.contains('\0')) {
ctx.error_count.fetch_add(1, Ordering::Relaxed);
let error_json = serde_json::json!({"error": "Input contains null bytes"});
if write_json_line(&mut stdout, &error_json).is_err() {
break;
}
continue;
}
ctx.check_idle_timeout();
if pipeline::has_pipe_token(&tokens) {
let result = pipeline::execute_pipeline(&ctx, &tokens, trimmed);
if write_json_line(&mut stdout, &result).is_err() {
break;
}
} else {
match commands::BatchInput::try_parse_from(&tokens) {
Ok(input) => match commands::dispatch(&ctx, input.cmd) {
Ok(value) => {
if write_json_line(&mut stdout, &value).is_err() {
break;
}
}
Err(e) => {
ctx.error_count.fetch_add(1, Ordering::Relaxed);
let error_json = serde_json::json!({"error": format!("{}", e)});
if write_json_line(&mut stdout, &error_json).is_err() {
break;
}
}
},
Err(e) => {
ctx.error_count.fetch_add(1, Ordering::Relaxed);
let error_json = serde_json::json!({"error": format!("{}", e)});
if write_json_line(&mut stdout, &error_json).is_err() {
break;
}
}
}
}
let _ = stdout.flush();
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use cqs::store::ModelInfo;
use std::thread;
use std::time::Duration;
// Create a temp project dir with an initialized, empty index.db under .cqs/.
fn setup_test_store() -> (tempfile::TempDir, PathBuf) {
let dir = tempfile::tempdir().unwrap();
let cqs_dir = dir.path().join(".cqs");
std::fs::create_dir_all(&cqs_dir).unwrap();
let index_path = cqs_dir.join("index.db");
let store = Store::open(&index_path).unwrap();
store.init(&ModelInfo::default()).unwrap();
drop(store);
(dir, cqs_dir)
}
// invalidate() must clear every index-derived (mutable) cache.
#[test]
fn test_invalidate_clears_mutable_caches() {
let (_dir, cqs_dir) = setup_test_store();
let ctx = create_test_context(&cqs_dir).unwrap();
*ctx.file_set.borrow_mut() = Some(HashSet::new());
*ctx.notes_cache.borrow_mut() = Some(vec![]);
*ctx.call_graph.borrow_mut() = Some(std::sync::Arc::new(
cqs::store::CallGraph::from_string_maps(Default::default(), Default::default()),
));
*ctx.test_chunks.borrow_mut() = Some(std::sync::Arc::new(vec![]));
assert!(ctx.file_set.borrow().is_some());
assert!(ctx.notes_cache.borrow().is_some());
assert!(ctx.call_graph.borrow().is_some());
assert!(ctx.test_chunks.borrow().is_some());
ctx.invalidate().unwrap();
assert!(ctx.file_set.borrow().is_none());
assert!(ctx.notes_cache.borrow().is_none());
assert!(ctx.call_graph.borrow().is_none());
assert!(ctx.test_chunks.borrow().is_none());
assert!(ctx.hnsw.borrow().is_none());
}
// Touching index.db (mtime change) must invalidate caches on the next check.
// The sleep guarantees a distinct mtime on filesystems with coarse timestamps.
#[test]
fn test_mtime_staleness_detection() {
let (_dir, cqs_dir) = setup_test_store();
let ctx = create_test_context(&cqs_dir).unwrap();
*ctx.notes_cache.borrow_mut() = Some(vec![]);
assert!(ctx.notes_cache.borrow().is_some());
ctx.check_index_staleness();
assert!(
ctx.notes_cache.borrow().is_some(),
"First check should not invalidate"
);
thread::sleep(Duration::from_secs(2));
let index_path = cqs_dir.join("index.db");
{
use std::io::Write;
let mut file = std::fs::OpenOptions::new()
.append(true)
.open(&index_path)
.unwrap();
file.write_all(b" ").unwrap();
file.sync_all().unwrap();
}
ctx.check_index_staleness();
assert!(
ctx.notes_cache.borrow().is_none(),
"Mtime change should invalidate cache"
);
}
// Session-stable OnceLock state must NOT be cleared by invalidate().
#[test]
fn test_stable_caches_survive_invalidation() {
let (_dir, cqs_dir) = setup_test_store();
let ctx = create_test_context(&cqs_dir).unwrap();
let _ = ctx.audit_state.set(cqs::audit::AuditMode {
enabled: false,
expires_at: None,
});
ctx.invalidate().unwrap();
assert!(
ctx.audit_state.get().is_some(),
"audit_state should survive invalidation"
);
}
// "refresh" parses to the Refresh command.
#[test]
fn test_refresh_command_parses() {
let input = commands::BatchInput::try_parse_from(["refresh"]).unwrap();
assert!(matches!(input.cmd, commands::BatchCmd::Refresh));
}
// "invalidate" is an alias for Refresh.
#[test]
fn test_invalidate_alias_parses() {
let input = commands::BatchInput::try_parse_from(["invalidate"]).unwrap();
assert!(matches!(input.cmd, commands::BatchCmd::Refresh));
}
// The store() accessor yields a usable Ref (staleness check included).
#[test]
fn test_store_accessor_returns_valid_ref() {
let (_dir, cqs_dir) = setup_test_store();
let ctx = create_test_context(&cqs_dir).unwrap();
let store_ref = ctx.store();
let stats = store_ref.stats();
assert!(stats.is_ok(), "Store should be usable via store() accessor");
}
// NaN values are nulled at any nesting depth in an object; finite values kept.
#[test]
fn test_sanitize_json_floats_nan_in_object() {
let mut val = serde_json::json!({
"score": f64::NAN,
"name": "foo",
"nested": {"inner_score": f64::NAN, "ok": 1.5}
});
sanitize_json_floats(&mut val);
assert!(val["score"].is_null(), "NaN should become null");
assert!(val["nested"]["inner_score"].is_null());
assert_eq!(val["nested"]["ok"], 1.5);
assert_eq!(val["name"], "foo");
}
// NaN/Infinity are nulled inside (nested) arrays too.
#[test]
fn test_sanitize_json_floats_nan_in_array() {
let mut val = serde_json::json!([1.0, f64::NAN, [f64::INFINITY, 2.0]]);
sanitize_json_floats(&mut val);
assert_eq!(val[0], 1.0);
assert!(val[1].is_null(), "NaN should become null");
assert!(val[2][0].is_null(), "Infinity should become null");
assert_eq!(val[2][1], 2.0);
}
// Values without non-finite floats pass through unchanged.
#[test]
fn test_sanitize_json_floats_clean_passthrough() {
let mut val = serde_json::json!({"a": 1, "b": "text", "c": [true, null, 3.14]});
let expected = val.clone();
sanitize_json_floats(&mut val);
assert_eq!(val, expected);
}
// Happy path: one parseable JSON line with a trailing newline.
#[test]
fn test_write_json_line_clean() {
let val = serde_json::json!({"name": "foo", "score": 0.95});
let mut buf = Vec::new();
write_json_line(&mut buf, &val).unwrap();
let output = String::from_utf8(buf).unwrap();
let parsed: serde_json::Value = serde_json::from_str(output.trim()).unwrap();
assert_eq!(parsed["name"], "foo");
assert_eq!(parsed["score"], 0.95);
}
// A NaN input still produces valid JSON output (score sanitized to null).
#[test]
fn test_write_json_line_nan_retry() {
let val = serde_json::json!({"score": f64::NAN, "name": "bar"});
let mut buf = Vec::new();
write_json_line(&mut buf, &val).unwrap();
let output = String::from_utf8(buf).unwrap();
let parsed: serde_json::Value = serde_json::from_str(output.trim()).unwrap();
assert!(parsed["score"].is_null(), "NaN should be sanitized to null");
assert_eq!(parsed["name"], "bar");
}
}