use std::collections::HashSet;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{ensure, Context, Result};
use serde::Serialize;
use turbovec::IdMapIndex;
use crate::chunks::ChunkStore;
use crate::detect::{AnomalyDetector, DetectionResult};
use crate::index::PingPongIndexer;
use crate::ingest::{Embedder, TemplateCache};
use crate::wal::Wal;
/// Tunables for [`TurboLogEngine`].
///
/// `Default` sizes `shards` from the machine's available parallelism (falling
/// back to 4) and uses fixed values for everything else.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Dimension of embedding vectors; passed to every indexer and to WAL
    /// replay/open. Must be non-zero (calibration divides by it).
    pub dim: usize,
    /// Bit width handed to each shard's `PingPongIndexer` (presumably the
    /// vector quantization width — confirm against `crate::index`).
    pub bit_width: usize,
    /// Root directory holding the per-shard WAL files and the `chunks` store.
    pub data_dir: PathBuf,
    /// Intended period between `swap_tick` calls. Not read by the engine code
    /// in this file — presumably consumed by the caller's scheduler.
    pub swap_interval_secs: u64,
    /// Retention, in hours, passed to the chunk store's sweep.
    pub retention_hours: u64,
    /// Maximum number of sealed windows kept per shard for search
    /// (0 is treated as 1).
    pub ring_windows: usize,
    /// Number of centroids passed to `AnomalyDetector::fit_auto`.
    pub centroids: usize,
    /// Threshold passed to `AnomalyDetector::fit_auto`.
    pub anomaly_threshold: f32,
    /// Number of distinct template embeddings collected before the anomaly
    /// detector is fitted.
    pub calibration_templates: usize,
    /// Number of independent WAL/index shards; must be at least 1.
    pub shards: usize,
}

impl Default for EngineConfig {
    /// 384-dim, 4-bit vectors under `./data`, a 10 s swap interval, one week of
    /// retention, 30 ring windows, 32 centroids, threshold 0.5, 64 calibration
    /// templates, and one shard per available hardware thread.
    fn default() -> Self {
        // `available_parallelism` can fail on some platforms; 4 is the fallback.
        let shards = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);
        Self {
            dim: 384,
            bit_width: 4,
            data_dir: PathBuf::from("./data"),
            swap_interval_secs: 10,
            retention_hours: 7 * 24,
            ring_windows: 30,
            centroids: 32,
            anomaly_threshold: 0.5,
            calibration_templates: 64,
            shards,
        }
    }
}
/// Anomaly details attached to a [`LogReport`] when the detector flags a line.
#[derive(Debug, Clone, Serialize)]
pub struct AnomalyReport {
    /// Score reported by the anomaly detector for this line.
    pub score: f32,
    /// Ids reported by the detector as the nearest incidents (presumably ids of
    /// previously ingested log entries — confirm against `crate::detect`).
    pub nearest_incidents: Vec<u64>,
}

/// Result of ingesting a single log line.
#[derive(Debug, Clone, Serialize)]
pub struct LogReport {
    /// Engine-assigned id of the stored entry.
    pub id: u64,
    /// Id of the template the line was parsed into.
    pub template_id: u64,
    /// Timestamp parsed from the line (units defined by the template parser).
    pub timestamp: i64,
    /// `Some` only when a calibrated detector flagged the line as anomalous.
    pub anomaly: Option<AnomalyReport>,
}
/// A single result of a semantic search over sealed windows.
#[derive(Debug, Clone, Serialize)]
pub struct SearchHit {
    /// Id of the matching log entry.
    pub id: u64,
    /// Score returned by the window search; results are ordered by this,
    /// highest first.
    pub score: f32,
}
/// Point-in-time engine counters.
///
/// Shards are sampled one at a time, so totals are not an atomic snapshot.
#[derive(Debug, Clone, Serialize)]
pub struct Stats {
    /// Template-cache lookups that found a cached embedding.
    pub cache_hits: u64,
    /// Template-cache lookups that required a fresh embedding.
    pub cache_misses: u64,
    /// Hit rate as reported by the template cache.
    pub cache_hit_rate: f64,
    /// Entries in the shards' pending (not yet sealed) windows, summed.
    pub pending_window_len: usize,
    /// Sealed windows currently held in the shard rings, summed.
    pub ring_windows: usize,
    /// Vectors contained in those sealed windows, summed.
    pub ring_vectors: usize,
    /// Whether the anomaly detector has been fitted.
    pub detector_calibrated: bool,
    /// Threshold of the fitted detector, if any.
    pub detector_threshold: Option<f32>,
    /// Lines ingested since `open`, including entries replayed from WALs.
    pub ingested_total: u64,
}
/// One independent partition of the engine: its own WAL, indexer and ring of
/// sealed, searchable windows. Entries are routed here by `id % shards`.
struct Shard {
    /// Write-ahead log; also serializes ingest against sealing in `swap_tick`.
    wal: Mutex<Wal>,
    /// Receives new vectors and seals/publishes windows.
    indexer: PingPongIndexer,
    /// Sealed windows, newest at the front, capped at `ring_windows`.
    ring: Mutex<VecDeque<Arc<IdMapIndex>>>,
}
/// Sharded log ingestion, semantic search and anomaly detection engine.
///
/// Lines are templated, embedded, written to a per-shard WAL and indexed;
/// `swap_tick` periodically seals pending windows into searchable, backed-up
/// chunks.
pub struct TurboLogEngine {
    /// Configuration the engine was opened with.
    cfg: EngineConfig,
    /// Template parser plus cache of template embeddings.
    templates: Mutex<TemplateCache>,
    /// Embedder pool, used round-robin via `embed_rr`.
    embedders: Vec<Mutex<Embedder>>,
    /// Round-robin cursor into `embedders`.
    embed_rr: AtomicUsize,
    /// Shards, indexed by `id % shards.len()`.
    shards: Vec<Shard>,
    /// On-disk backup of sealed windows under `data_dir/chunks`.
    chunks: ChunkStore,
    /// Fitted anomaly detector; `None` until calibration completes.
    detector: RwLock<Option<AnomalyDetector>>,
    /// Flattened template embeddings gathered until the detector is fitted.
    calibration: Mutex<Vec<f32>>,
    /// Next entry id to hand out.
    next_id: AtomicU64,
    /// Lines ingested since open, including WAL-replayed entries.
    ingested_total: AtomicU64,
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reads earlier than the epoch.
fn now_millis() -> i64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
impl TurboLogEngine {
pub fn open(cfg: EngineConfig, embedders: Vec<Embedder>) -> Result<Self> {
ensure!(!embedders.is_empty(), "At least one embedder is required");
ensure!(cfg.shards >= 1, "At least one shard is required");
std::fs::create_dir_all(&cfg.data_dir)?;
let mut shards = Vec::with_capacity(cfg.shards);
let mut max_replayed_id = 0u64;
let mut total_entries = 0usize;
for i in 0..cfg.shards {
let wal_path = cfg.data_dir.join(format!("wal-{i}.bin"));
let sealed_prefix = format!("wal-{i}-sealed-");
let indexer = PingPongIndexer::new(cfg.dim, cfg.bit_width)?;
let leftovers = Wal::sealed_leftovers(&cfg.data_dir, &sealed_prefix)?;
let mut entries: Vec<(u64, Vec<f32>)> = Vec::new();
for file in &leftovers {
entries.extend(Wal::replay(file, cfg.dim)?);
}
entries.extend(Wal::replay(&wal_path, cfg.dim)?);
let mut seen = HashSet::with_capacity(entries.len());
entries.retain(|(id, _)| seen.insert(*id));
for (id, vector) in &entries {
indexer.ingest(*id, vector)?;
max_replayed_id = max_replayed_id.max(*id);
}
total_entries += entries.len();
if !leftovers.is_empty() {
let tmp_path = cfg.data_dir.join(format!("wal-{i}.bin.tmp"));
std::fs::remove_file(&tmp_path).ok();
{
let mut tmp = Wal::open(&tmp_path, cfg.dim)?;
for (id, vector) in &entries {
tmp.append(*id, vector)?;
}
}
std::fs::rename(&tmp_path, &wal_path)
.with_context(|| format!("WAL consolidation failed for shard {i}"))?;
for file in &leftovers {
std::fs::remove_file(file).ok();
}
}
let wal = Mutex::new(Wal::open(&wal_path, cfg.dim)?);
shards.push(Shard {
wal,
indexer,
ring: Mutex::new(VecDeque::new()),
});
}
let next_id = ((now_millis() as u64) << 20).max(max_replayed_id + 1);
Ok(Self {
chunks: ChunkStore::new(cfg.data_dir.join("chunks"))?,
templates: Mutex::new(TemplateCache::new()),
embedders: embedders.into_iter().map(Mutex::new).collect(),
embed_rr: AtomicUsize::new(0),
shards,
detector: RwLock::new(None),
calibration: Mutex::new(Vec::new()),
next_id: AtomicU64::new(next_id),
ingested_total: AtomicU64::new(total_entries as u64),
cfg,
})
}
pub fn config(&self) -> &EngineConfig {
&self.cfg
}
fn with_embedder<T>(&self, f: impl FnOnce(&mut Embedder) -> Result<T>) -> Result<T> {
let slot = self.embed_rr.fetch_add(1, Ordering::Relaxed) % self.embedders.len();
let mut embedder = self.embedders[slot].lock().unwrap();
f(&mut embedder)
}
pub fn ingest_log(&self, line: &str) -> Result<LogReport> {
let t0 = std::time::Instant::now();
let (parsed, cached) = self.templates.lock().unwrap().parse_and_lookup(line);
let (vector, new_template) = match cached {
Some(vector) => (vector, false),
None => {
let vector: Arc<[f32]> = self.with_embedder(|e| e.embed(&parsed.template))?.into();
self.templates
.lock()
.unwrap()
.insert(parsed.template_id, Arc::clone(&vector));
(vector, true)
}
};
if new_template {
self.maybe_calibrate(&vector);
}
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let shard_idx = (id as usize) % self.shards.len();
let shard = &self.shards[shard_idx];
{
let mut wal = shard.wal.lock().unwrap();
wal.append(id, &vector)?;
shard.indexer.ingest(id, &vector)?;
}
self.ingested_total.fetch_add(1, Ordering::Relaxed);
crate::metrics::inc_ingested(1);
let anomaly = {
let detector = self.detector.read().unwrap();
detector.as_ref().and_then(|d| {
match d.detect(&vector, &shard.indexer.get_search_index()) {
DetectionResult::Normal => None,
DetectionResult::Anomaly {
score,
nearest_incidents,
} => Some(AnomalyReport {
score,
nearest_incidents,
}),
}
})
};
if anomaly.is_some() {
crate::metrics::inc_anomaly();
}
crate::metrics::observe_ingest_seconds(t0.elapsed().as_secs_f64());
Ok(LogReport {
id,
template_id: parsed.template_id,
timestamp: parsed.timestamp,
anomaly,
})
}
pub fn search_text(&self, query: &str, k: usize) -> Result<Vec<SearchHit>> {
ensure!(k > 0, "k must be 1 or greater");
let t0 = std::time::Instant::now();
let vector = self.with_embedder(|e| e.embed(query))?;
let mut hits: Vec<SearchHit> = Vec::new();
for shard in &self.shards {
let windows: Vec<Arc<IdMapIndex>> =
shard.ring.lock().unwrap().iter().cloned().collect();
for window in windows {
if window.is_empty() {
continue;
}
let (scores, ids) = window.search(&vector, k);
hits.extend(
scores
.into_iter()
.zip(ids)
.map(|(score, id)| SearchHit { id, score }),
);
}
}
hits.sort_by(|a, b| b.score.total_cmp(&a.score));
hits.truncate(k);
crate::metrics::observe_search_seconds(t0.elapsed().as_secs_f64());
Ok(hits)
}
pub fn swap_tick(&self) -> Result<bool> {
let mut any_swapped = false;
for (i, shard) in self.shards.iter().enumerate() {
let (sealed, sealed_wal) = {
let mut wal = shard.wal.lock().unwrap();
if shard.indexer.pending_len() == 0 {
continue;
}
let sealed = shard.indexer.seal()?;
let sealed_wal = wal.detach_sealed()?;
(sealed, sealed_wal)
};
sealed.prepare();
let segment = self.chunks.segment_path(now_millis() + i as i64)?;
sealed.write(&segment).with_context(|| {
format!(
"Failed to backup chunk to {} (shard {i})",
segment.display()
)
})?;
std::fs::remove_file(&sealed_wal).ok();
let sealed = Arc::new(sealed);
shard.indexer.publish(Arc::clone(&sealed));
let mut ring = shard.ring.lock().unwrap();
ring.push_front(sealed);
ring.truncate(self.cfg.ring_windows.max(1));
any_swapped = true;
}
Ok(any_swapped)
}
pub fn sweep_chunks(&self) -> Result<usize> {
self.chunks.sweep(self.cfg.retention_hours, now_millis())
}
pub fn stats(&self) -> Stats {
let (cache_hits, cache_misses, cache_hit_rate) = {
let templates = self.templates.lock().unwrap();
(templates.hits(), templates.misses(), templates.hit_rate())
};
let mut pending_window_len = 0usize;
let mut ring_windows = 0usize;
let mut ring_vectors = 0usize;
for shard in &self.shards {
pending_window_len += shard.indexer.pending_len();
let ring = shard.ring.lock().unwrap();
ring_windows += ring.len();
ring_vectors += ring.iter().map(|w| w.len()).sum::<usize>();
}
let detector_threshold = self
.detector
.read()
.unwrap()
.as_ref()
.map(|d| d.threshold());
Stats {
cache_hits,
cache_misses,
cache_hit_rate,
pending_window_len,
ring_windows,
ring_vectors,
detector_calibrated: detector_threshold.is_some(),
detector_threshold,
ingested_total: self.ingested_total.load(Ordering::Relaxed),
}
}
fn maybe_calibrate(&self, vector: &[f32]) {
if self.detector.read().unwrap().is_some() {
return;
}
let mut calibration = self.calibration.lock().unwrap();
calibration.extend_from_slice(vector);
let templates = calibration.len() / self.cfg.dim;
if templates < self.cfg.calibration_templates {
return;
}
let detector = AnomalyDetector::fit_auto(
&calibration,
self.cfg.dim,
self.cfg.centroids,
self.cfg.anomaly_threshold,
);
let mut slot = self.detector.write().unwrap();
if slot.is_none() {
*slot = Some(detector);
calibration.clear();
calibration.shrink_to_fit();
}
}
}