use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
// NOTE(review): `status` is stringly typed; an enum would rule out typos
// and make the set of valid states explicit.
/// Review bookkeeping for one log record, stored per app in
/// `AppLog::review_status` keyed by record id.
///
/// Nothing in this file writes it to the `.bin` log, and `AppLog::open`
/// starts with an empty map, so it does not survive a reload. Fields other
/// than `status` are only initialized here (see `ReviewStatus::pending`);
/// their meaning is defined by callers outside this file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReviewStatus {
/// Lifecycle state. `ReviewStatus::pending` sets "pending";
/// `LogStore::pending_worker_ids` skips records whose status is "done",
/// "escalated" or "processing".
pub status: String,
pub llm_reviewed: bool,
pub llm_model: Option<String>,
pub llm_result: Option<serde_json::Value>,
pub applied: bool,
pub phrases_added: usize,
pub version_before: u64,
pub version_after: Option<u64>,
pub summary: Option<String>,
}
impl ReviewStatus {
pub fn pending() -> Self {
Self {
status: "pending".to_string(),
llm_reviewed: false,
llm_model: None,
llm_result: None,
applied: false,
phrases_added: 0,
version_before: 0,
version_after: None,
summary: None,
}
}
}
/// One logged query. `LogStore::append` serializes it with `serde_json` and
/// writes the bytes as the payload of a log frame; `AppLog::open` decodes
/// them back when rebuilding the index.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LogRecord {
/// Per-app sequential id; `LogStore::append` overwrites whatever the caller set.
pub id: u64,
pub query: String,
/// Selects which app's log the record is appended to.
pub app_id: String,
pub detected_intents: Vec<String>,
/// Copied into the record's index entry but not otherwise interpreted here.
pub confidence: String,
/// Omitted from the serialized JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
/// Used for `LogQuery::since_ms` filtering and newest-first ordering.
/// NOTE(review): presumably Unix epoch milliseconds — confirm with callers.
pub timestamp_ms: u64,
pub router_version: u64,
pub source: String,
}
/// Filter and pagination parameters for `LogStore::query`.
pub struct LogQuery {
/// Restrict to one app; `None` searches every loaded app.
pub app_id: Option<String>,
/// `Some(true)`: only resolved records; `Some(false)`: only unresolved
/// (still alive) records; `None`: both.
pub resolved: Option<bool>,
/// Inclusive lower bound on `LogRecord::timestamp_ms`.
pub since_ms: Option<u64>,
/// Maximum number of records in the returned page.
pub limit: usize,
/// Number of matching records to skip, counted after newest-first sorting.
pub offset: usize,
}
impl Default for LogQuery {
fn default() -> Self {
Self {
app_id: None,
resolved: Some(false),
since_ms: None,
limit: 50,
offset: 0,
}
}
}
/// One page of `LogStore::query` results.
///
/// `total` is the number of matching records before `offset`/`limit` are
/// applied; `records` is the requested page, newest first. Records that
/// cannot be read back or decoded are omitted, so `records` may be shorter
/// than the page size.
pub struct LogQueryResult {
pub total: usize, pub records: Vec<LogRecord>,
}
/// In-memory index entry for one record; lets queries filter and sort
/// without reading payloads back from the file.
#[derive(Debug, Clone)]
// `confidence` is stored but never read anywhere in this file, hence the allow.
#[allow(dead_code)]
struct LogMeta {
/// Byte offset of the frame header (its first byte is the alive flag).
offset: u64,
/// Length of the JSON payload that follows the 5-byte header.
payload_len: u32,
id: u64,
timestamp_ms: u64,
confidence: String,
/// `false` once the record has been resolved.
alive: bool,
/// In-memory copy of the payload; when present, readers decode it instead
/// of reading from the file.
cached: Option<Vec<u8>>,
}
/// State of one app's log: an optional backing file plus an index of its records.
struct AppLog {
/// `None` for in-memory logs (no data directory, or the file could not be opened).
file: Option<File>,
/// Offset where the next frame is appended; reported as `size_bytes` by `LogStore::stats`.
size: u64,
/// One entry per decoded record, in file/append order.
index: Vec<LogMeta>,
/// Id assigned to the next appended record.
next_id: u64,
/// Review state keyed by record id; held in memory only.
review_status: HashMap<u64, ReviewStatus>,
}
impl AppLog {
    /// Creates an empty log with no backing file.
    fn in_memory() -> Self {
        Self {
            file: None,
            size: 0,
            index: Vec::new(),
            next_id: 0,
            review_status: HashMap::new(),
        }
    }

    /// Opens (creating if missing) the log file at `path` and rebuilds the
    /// record index by scanning it from the start.
    ///
    /// Each frame is `[alive: u8][payload_len: u32 LE][payload]`. An alive byte
    /// of `1` means alive, and the payload is a JSON-encoded `LogRecord`.
    /// Frames whose payload does not decode are skipped, but their bytes stay
    /// in the file.
    ///
    /// A torn tail ends the scan and is truncated away. A torn tail is an
    /// incomplete header, or a declared payload that runs past the end of the
    /// file. Truncating it means the next append starts on a clean frame
    /// boundary instead of leaving stale bytes behind the new frame.
    ///
    /// Takes `&Path`; existing `&PathBuf` arguments still coerce.
    ///
    /// # Errors
    /// Propagates I/O errors from opening, stat-ing or reading the file. A
    /// real read error aborts the load rather than being treated as a torn
    /// tail, because appending at that offset would overwrite valid frames.
    fn open(path: &std::path::Path) -> std::io::Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(path)?;
        let file_len = file.metadata()?.len();
        // Buffer the scan (two small reads per frame). Every later access to
        // the file seeks explicitly, so the read-ahead position is harmless.
        let mut reader = std::io::BufReader::new(file);
        let mut index = Vec::new();
        let mut offset = 0u64;
        let mut next_id = 0u64;
        while offset < file_len {
            let mut header = [0u8; 5];
            match reader.read_exact(&mut header) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    eprintln!("[log_store] truncated record header at offset {}", offset);
                    break;
                }
                Err(e) => return Err(e),
            }
            let alive = header[0] == 1;
            let payload_len = u32::from_le_bytes([header[1], header[2], header[3], header[4]]);
            let frame_end = offset + 5 + u64::from(payload_len);
            // Validate before allocating so a corrupted length field cannot
            // request a multi-GiB buffer.
            if frame_end > file_len {
                eprintln!(
                    "[log_store] truncated record at offset {}: payload of {} bytes runs past end of file",
                    offset, payload_len
                );
                break;
            }
            let mut payload = vec![0u8; payload_len as usize];
            match reader.read_exact(&mut payload) {
                Ok(()) => {}
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    eprintln!("[log_store] truncated record at offset {}: {}", offset, e);
                    break;
                }
                Err(e) => return Err(e),
            }
            if let Ok(record) = serde_json::from_slice::<LogRecord>(&payload) {
                next_id = next_id.max(record.id.saturating_add(1));
                index.push(LogMeta {
                    offset,
                    payload_len,
                    id: record.id,
                    timestamp_ms: record.timestamp_ms,
                    confidence: record.confidence,
                    alive,
                    cached: None,
                });
            }
            offset = frame_end;
        }
        let file = reader.into_inner();
        // Anything past the last complete frame is a torn write; drop it so
        // new frames are not followed by leftover garbage.
        if offset < file_len {
            eprintln!(
                "[log_store] discarding {} torn bytes at end of log (offset {})",
                file_len - offset,
                offset
            );
            if let Err(e) = file.set_len(offset) {
                eprintln!("[log_store] failed to truncate torn tail at offset {}: {}", offset, e);
            }
        }
        Ok(Self {
            file: Some(file),
            size: offset,
            index,
            next_id,
            review_status: HashMap::new(),
        })
    }
}
/// Per-app record logs, optionally persisted as `<data_dir>/_logs/<app_id>.bin`.
pub struct LogStore {
/// The `_logs` directory, or `None` for a purely in-memory store.
data_dir: Option<PathBuf>,
/// Loaded logs keyed by app id (the log file's stem).
apps: HashMap<String, AppLog>,
}
impl LogStore {
pub fn new(data_dir: Option<&str>) -> Self {
let data_dir = data_dir.map(|d| {
let p = PathBuf::from(d).join("_logs");
let _ = fs::create_dir_all(&p);
p
});
let mut store = Self {
data_dir,
apps: HashMap::new(),
};
store.scan_existing();
store
}
fn scan_existing(&mut self) {
let dir = match self.data_dir.clone() {
Some(d) => d,
None => return,
};
let Ok(entries) = fs::read_dir(&dir) else {
return;
};
for entry in entries.flatten() {
let path = entry.path();
if path.extension().map(|e| e == "bin").unwrap_or(false) {
if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
match AppLog::open(&path) {
Ok(app_log) => {
let count = app_log.index.len();
eprintln!("[log_store] loaded app={} records={}", stem, count);
self.apps.insert(stem.to_string(), app_log);
}
Err(e) => eprintln!("[log_store] error loading {}: {}", stem, e),
}
}
}
}
}
fn get_or_create(&mut self, app_id: &str) -> &mut AppLog {
if self.apps.contains_key(app_id) {
return self.apps.get_mut(app_id).unwrap();
}
let app_log = match self
.data_dir
.as_ref()
.map(|d| d.join(format!("{}.bin", app_id)))
{
Some(path) => AppLog::open(&path).unwrap_or_else(|e| {
eprintln!("[log_store] cannot open {}: {}", path.display(), e);
AppLog::in_memory()
}),
None => AppLog::in_memory(),
};
self.apps.insert(app_id.to_string(), app_log);
self.apps.get_mut(app_id).unwrap()
}
pub fn append(&mut self, mut record: LogRecord) -> u64 {
let app_id = record.app_id.clone();
let al = self.get_or_create(&app_id);
record.id = al.next_id;
al.next_id += 1;
let payload = serde_json::to_vec(&record).unwrap_or_default();
let payload_len = payload.len() as u32;
let offset = al.size;
if let Some(ref mut file) = al.file {
let mut header = [1u8; 5]; header[1..5].copy_from_slice(&payload_len.to_le_bytes());
let _ = file.seek(SeekFrom::Start(offset));
let _ = file.write_all(&header);
let _ = file.write_all(&payload);
let _ = file.flush();
}
al.size += 5 + payload_len as u64;
al.index.push(LogMeta {
offset,
payload_len,
id: record.id,
timestamp_ms: record.timestamp_ms,
confidence: record.confidence.clone(),
alive: true,
cached: if al.file.is_none() {
Some(payload)
} else {
None
},
});
record.id
}
pub fn resolve(&mut self, app_id: &str, id: u64) -> bool {
let al = match self.apps.get_mut(app_id) {
Some(a) => a,
None => return false,
};
let meta = match al.index.iter_mut().find(|m| m.id == id && m.alive) {
Some(m) => m,
None => return false,
};
let offset = meta.offset;
let size = al.size;
if let Some(ref mut file) = al.file {
let _ = file.seek(SeekFrom::Start(offset));
let _ = file.write_all(&[0u8]); let _ = file.seek(SeekFrom::Start(size)); }
meta.alive = false;
true
}
pub fn query(&mut self, q: &LogQuery) -> LogQueryResult {
let app_ids: Vec<String> = match &q.app_id {
Some(id) => vec![id.clone()],
None => self.apps.keys().cloned().collect(),
};
struct Candidate {
app_id: String,
offset: u64,
payload_len: u32,
timestamp_ms: u64,
cached: Option<Vec<u8>>,
}
let mut candidates: Vec<Candidate> = Vec::new();
for app_id in &app_ids {
let Some(al) = self.apps.get(app_id) else {
continue;
};
for meta in &al.index {
if !Self::matches(meta, q) {
continue;
}
candidates.push(Candidate {
app_id: app_id.clone(),
offset: meta.offset,
payload_len: meta.payload_len,
timestamp_ms: meta.timestamp_ms,
cached: meta.cached.clone(),
});
}
}
candidates.sort_by(|a, b| b.timestamp_ms.cmp(&a.timestamp_ms));
let total = candidates.len();
let mut records = Vec::new();
for c in candidates.into_iter().skip(q.offset).take(q.limit) {
let record = if let Some(cached) = c.cached {
serde_json::from_slice(&cached).ok()
} else {
self.read_at(&c.app_id, c.offset, c.payload_len)
};
if let Some(r) = record {
records.push(r);
}
}
LogQueryResult { total, records }
}
fn matches(meta: &LogMeta, q: &LogQuery) -> bool {
if let Some(resolved) = q.resolved {
if meta.alive == resolved {
return false;
}
}
if let Some(since) = q.since_ms {
if meta.timestamp_ms < since {
return false;
}
}
true
}
fn read_at(&mut self, app_id: &str, offset: u64, payload_len: u32) -> Option<LogRecord> {
let al = self.apps.get_mut(app_id)?;
let file = al.file.as_mut()?;
file.seek(SeekFrom::Start(offset + 5)).ok()?; let mut buf = vec![0u8; payload_len as usize];
file.read_exact(&mut buf).ok()?;
serde_json::from_slice(&buf).ok()
}
pub fn count_alive(&self, app_id: &str) -> usize {
self.apps
.get(app_id)
.map(|al| al.index.iter().filter(|m| m.alive).count())
.unwrap_or(0)
}
pub fn count_total(&self, app_id: &str) -> usize {
self.apps.get(app_id).map(|al| al.index.len()).unwrap_or(0)
}
pub fn get_record(&mut self, app_id: &str, id: u64) -> Option<LogRecord> {
let al = self.apps.get(app_id)?;
let meta = al.index.iter().find(|m| m.id == id)?;
if let Some(ref cached) = meta.cached {
return serde_json::from_slice(cached).ok();
}
let offset = meta.offset;
let payload_len = meta.payload_len;
self.read_at(app_id, offset, payload_len)
}
pub fn set_review_status(&mut self, app_id: &str, id: u64, status: ReviewStatus) {
if let Some(al) = self.apps.get_mut(app_id) {
al.review_status.insert(id, status);
}
}
pub fn pending_worker_ids(&self, app_id_filter: Option<&str>) -> Vec<(String, u64)> {
let mut pending = Vec::new();
for (app_id, al) in &self.apps {
if let Some(filter) = app_id_filter {
if app_id != filter {
continue;
}
}
for meta in &al.index {
if !meta.alive {
continue;
}
let already_done = al
.review_status
.get(&meta.id)
.map(|s| {
s.status == "done" || s.status == "escalated" || s.status == "processing"
})
.unwrap_or(false);
if already_done {
continue;
}
pending.push((app_id.clone(), meta.id));
}
}
pending
}
pub fn drop_app(&mut self, app_id: &str) {
self.apps.remove(app_id);
if let Some(ref dir) = self.data_dir {
let _ = fs::remove_file(dir.join(format!("{}.bin", app_id)));
}
}
pub fn clear_all(&mut self) {
if let Some(ref dir) = self.data_dir.clone() {
if let Ok(entries) = fs::read_dir(dir) {
for entry in entries.flatten() {
let p = entry.path();
if p.extension().map(|e| e == "bin").unwrap_or(false) {
let _ = fs::remove_file(&p);
}
}
}
}
self.apps.clear();
}
pub fn stats(&self) -> Vec<serde_json::Value> {
self.apps
.iter()
.map(|(app_id, al)| {
let total = al.index.len();
let alive = al.index.iter().filter(|m| m.alive).count();
serde_json::json!({
"app_id": app_id,
"total": total,
"unresolved": alive,
"size_bytes": al.size,
})
})
.collect()
}
}