Skip to main content

kaizen/store/
hot_log.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! HOT append log: mmap replay, rkyv records, redb offset indexes.
3
4use crate::core::event::{Event, EventKind, EventSource};
5use anyhow::{Context, Result, anyhow};
6use crc32fast::Hasher;
7use memmap2::Mmap;
8use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
9use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
10use std::fs::{File, OpenOptions};
11use std::io::{Seek, SeekFrom, Write};
12use std::path::Path;
13
/// File magic; the hex digits spell ASCII "KAIZENHO". `ensure_header` writes it
/// little-endian, so the bytes appear reversed on disk.
const MAGIC: u64 = 0x4b41495a454e484f;
/// On-disk format version, stored immediately after the magic.
const VERSION: u32 = 1;
/// Header size in bytes: 8 (magic as `u64`) + 4 (version as `u32`).
const HEADER_LEN: u64 = 12;
/// redb table mapping a session id to its JSON-encoded `SessionMeta`.
const SESSIONS: TableDefinition<&str, &[u8]> = TableDefinition::new("sessions");
/// redb table mapping `seq_key(session_id, seq)` to the record's byte offset in the log.
const SEQ_IDX: TableDefinition<&str, u64> = TableDefinition::new("seq_idx");
19
/// Per-session summary stored (as JSON) in the `sessions` index table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    /// Log offset recorded when the session was first indexed; later flushes keep it.
    pub first_offset: u64,
    /// Log offset of the highest-`seq` record in the most recently flushed batch.
    pub last_offset: u64,
    /// Highest `seq` in the most recently flushed batch for this session.
    pub last_seq: u64,
}
26
/// rkyv-serializable mirror of `Event` used as the on-disk record payload.
///
/// `kind` and `source` hold the `Debug` rendering of the corresponding enums and
/// `payload_json` holds the event payload as JSON text (see the `From<&Event>` impl).
// NOTE(review): the field set defines the archived layout; changing or reordering
// fields presumably breaks compatibility with existing logs — confirm before editing.
#[derive(Archive, RkyvSerialize, RkyvDeserialize)]
struct HotEventRecord {
    session_id: String,
    seq: u64,
    ts_ms: u64,
    ts_exact: bool,
    // `Debug` name of the `EventKind` variant.
    kind: String,
    // `Debug` name of the `EventSource` variant.
    source: String,
    tool: Option<String>,
    tool_call_id: Option<String>,
    tokens_in: Option<u32>,
    tokens_out: Option<u32>,
    reasoning_tokens: Option<u32>,
    cost_usd_e6: Option<i64>,
    stop_reason: Option<String>,
    latency_ms: Option<u32>,
    ttft_ms: Option<u32>,
    retry_count: Option<u16>,
    context_used_tokens: Option<u32>,
    context_max_tokens: Option<u32>,
    cache_creation_tokens: Option<u32>,
    cache_read_tokens: Option<u32>,
    system_prompt_tokens: Option<u32>,
    // Event payload serialized with `serde_json`.
    payload_json: String,
}
52
/// Writer handle for the HOT append log and its redb offset index.
pub struct HotLog {
    /// Log file, opened in append mode by `open`.
    file: File,
    /// redb database holding the `sessions` and `seq_idx` tables.
    index: Database,
    /// `(session_id, seq, offset)` entries appended but not yet committed to the index.
    pending_index: Vec<(String, u64, u64)>,
    /// Bytes appended since the last `sync_data` on `file`.
    bytes_since_sync: u64,
}
59
60impl HotLog {
61    pub fn open(root: &Path) -> Result<Self> {
62        let dir = root.join("hot");
63        std::fs::create_dir_all(&dir)?;
64        let log_path = dir.join("log.bin");
65        let mut file = OpenOptions::new()
66            .create(true)
67            .read(true)
68            .append(true)
69            .open(&log_path)?;
70        ensure_header(&mut file)?;
71        let index = Database::create(dir.join("index.redb"))?;
72        Ok(Self {
73            file,
74            index,
75            pending_index: Vec::new(),
76            bytes_since_sync: 0,
77        })
78    }
79
80    pub fn append(&mut self, event: &Event) -> Result<u64> {
81        let offset = self.file.seek(SeekFrom::End(0))?;
82        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&HotEventRecord::from(event))?;
83        let crc = crc32(&bytes);
84        self.file.write_all(&(bytes.len() as u32).to_le_bytes())?;
85        self.file.write_all(&bytes)?;
86        self.file.write_all(&crc.to_le_bytes())?;
87        self.bytes_since_sync += 8 + bytes.len() as u64;
88        self.pending_index
89            .push((event.session_id.clone(), event.seq, offset));
90        if self.bytes_since_sync >= 4096 || self.pending_index.len() >= 128 {
91            self.flush()?;
92        }
93        Ok(offset)
94    }
95
96    pub fn flush(&mut self) -> Result<()> {
97        if !self.pending_index.is_empty() {
98            self.flush_index()?;
99        }
100        if self.bytes_since_sync > 0 {
101            self.file.sync_data()?;
102            self.bytes_since_sync = 0;
103        }
104        Ok(())
105    }
106
107    pub fn replay(root: &Path) -> Result<Vec<(u64, Event)>> {
108        let path = root.join("hot/log.bin");
109        let file =
110            File::open(&path).with_context(|| format!("open hot log: {}", path.display()))?;
111        if file.metadata()?.len() <= HEADER_LEN {
112            return Ok(Vec::new());
113        }
114        // SAFETY: read-only map over stable file handle; records validated by len + CRC.
115        let mmap = unsafe { Mmap::map(&file)? };
116        validate_header(&mmap)?;
117        read_records(&mmap)
118    }
119
120    pub fn offset_for(root: &Path, session_id: &str, seq: u64) -> Result<Option<u64>> {
121        let db = Database::create(root.join("hot/index.redb"))?;
122        let tx = db.begin_read()?;
123        let table = tx.open_table(SEQ_IDX)?;
124        Ok(table
125            .get(seq_key(session_id, seq).as_str())?
126            .map(|v| v.value()))
127    }
128
129    fn flush_index(&mut self) -> Result<()> {
130        let tx = self.index.begin_write()?;
131        {
132            let mut sessions = tx.open_table(SESSIONS)?;
133            for (session_id, rows) in pending_by_session(&self.pending_index) {
134                let prior = sessions
135                    .get(session_id.as_str())?
136                    .map(|v| serde_json::from_slice::<SessionMeta>(v.value()))
137                    .transpose()?;
138                let first_offset = prior
139                    .map(|m| m.first_offset)
140                    .unwrap_or_else(|| rows.first().map(|(_, o)| *o).unwrap_or(0));
141                let (last_seq, last_offset) = rows.last().copied().unwrap_or((0, first_offset));
142                let bytes = serde_json::to_vec(&SessionMeta {
143                    first_offset,
144                    last_offset,
145                    last_seq,
146                })?;
147                sessions.insert(session_id.as_str(), bytes.as_slice())?;
148            }
149        }
150        {
151            let mut seq_idx = tx.open_table(SEQ_IDX)?;
152            for (session_id, seq, offset) in &self.pending_index {
153                seq_idx.insert(seq_key(session_id, *seq).as_str(), *offset)?;
154            }
155        }
156        tx.commit()?;
157        self.pending_index.clear();
158        Ok(())
159    }
160}
161
162impl Drop for HotLog {
163    fn drop(&mut self) {
164        let _ = self.flush();
165    }
166}
167
168fn ensure_header(file: &mut File) -> Result<()> {
169    if file.metadata()?.len() == 0 {
170        file.write_all(&MAGIC.to_le_bytes())?;
171        file.write_all(&VERSION.to_le_bytes())?;
172        file.sync_data()?;
173    }
174    Ok(())
175}
176
177fn validate_header(bytes: &[u8]) -> Result<()> {
178    let magic = u64::from_le_bytes(bytes[0..8].try_into()?);
179    let version = u32::from_le_bytes(bytes[8..12].try_into()?);
180    if magic != MAGIC || version != VERSION {
181        return Err(anyhow!("invalid hot log header"));
182    }
183    Ok(())
184}
185
186fn read_records(bytes: &[u8]) -> Result<Vec<(u64, Event)>> {
187    let mut out = Vec::new();
188    let mut pos = HEADER_LEN as usize;
189    while pos + 8 <= bytes.len() {
190        let offset = pos as u64;
191        let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into()?) as usize;
192        let start = pos + 4;
193        let end = start + len;
194        if end + 4 > bytes.len() {
195            break;
196        }
197        let expected = u32::from_le_bytes(bytes[end..end + 4].try_into()?);
198        if crc32(&bytes[start..end]) != expected {
199            break;
200        }
201        let _ = rkyv::access::<ArchivedHotEventRecord, rkyv::rancor::Error>(&bytes[start..end])?;
202        let rec = rkyv::from_bytes::<HotEventRecord, rkyv::rancor::Error>(&bytes[start..end])?;
203        out.push((offset, rec.into_event()?));
204        pos = end + 4;
205    }
206    Ok(out)
207}
208
209fn crc32(bytes: &[u8]) -> u32 {
210    let mut hasher = Hasher::new();
211    hasher.update(bytes);
212    hasher.finalize()
213}
214
/// Builds the `seq_idx` key: the session id, a NUL separator, then `seq`
/// zero-padded to 20 digits so keys of one session sort in numeric `seq` order.
fn seq_key(session_id: &str, seq: u64) -> String {
    let padded_seq = format!("{seq:020}");
    let mut key = String::with_capacity(session_id.len() + 1 + padded_seq.len());
    key.push_str(session_id);
    key.push('\0');
    key.push_str(&padded_seq);
    key
}
218
/// Groups pending `(session_id, seq, offset)` rows by session.
///
/// Sessions are returned in ascending id order; each session's `(seq, offset)`
/// pairs are stably sorted by `seq`, so equal sequence numbers keep input order.
fn pending_by_session(rows: &[(String, u64, u64)]) -> Vec<(String, Vec<(u64, u64)>)> {
    let mut by_session: std::collections::BTreeMap<&str, Vec<(u64, u64)>> =
        std::collections::BTreeMap::new();
    for (session_id, seq, offset) in rows {
        by_session
            .entry(session_id.as_str())
            .or_default()
            .push((*seq, *offset));
    }

    let mut out = Vec::with_capacity(by_session.len());
    for (session_id, mut entries) in by_session {
        entries.sort_by_key(|&(seq, _)| seq);
        out.push((session_id.to_owned(), entries));
    }
    out
}
235
236impl From<&Event> for HotEventRecord {
237    fn from(e: &Event) -> Self {
238        Self {
239            session_id: e.session_id.clone(),
240            seq: e.seq,
241            ts_ms: e.ts_ms,
242            ts_exact: e.ts_exact,
243            kind: format!("{:?}", e.kind),
244            source: format!("{:?}", e.source),
245            tool: e.tool.clone(),
246            tool_call_id: e.tool_call_id.clone(),
247            tokens_in: e.tokens_in,
248            tokens_out: e.tokens_out,
249            reasoning_tokens: e.reasoning_tokens,
250            cost_usd_e6: e.cost_usd_e6,
251            stop_reason: e.stop_reason.clone(),
252            latency_ms: e.latency_ms,
253            ttft_ms: e.ttft_ms,
254            retry_count: e.retry_count,
255            context_used_tokens: e.context_used_tokens,
256            context_max_tokens: e.context_max_tokens,
257            cache_creation_tokens: e.cache_creation_tokens,
258            cache_read_tokens: e.cache_read_tokens,
259            system_prompt_tokens: e.system_prompt_tokens,
260            payload_json: serde_json::to_string(&e.payload).unwrap_or_else(|_| "null".into()),
261        }
262    }
263}
264
265impl HotEventRecord {
266    fn into_event(self) -> Result<Event> {
267        Ok(Event {
268            session_id: self.session_id,
269            seq: self.seq,
270            ts_ms: self.ts_ms,
271            ts_exact: self.ts_exact,
272            kind: kind_from_str(&self.kind),
273            source: source_from_str(&self.source),
274            tool: self.tool,
275            tool_call_id: self.tool_call_id,
276            tokens_in: self.tokens_in,
277            tokens_out: self.tokens_out,
278            reasoning_tokens: self.reasoning_tokens,
279            cost_usd_e6: self.cost_usd_e6,
280            stop_reason: self.stop_reason,
281            latency_ms: self.latency_ms,
282            ttft_ms: self.ttft_ms,
283            retry_count: self.retry_count,
284            context_used_tokens: self.context_used_tokens,
285            context_max_tokens: self.context_max_tokens,
286            cache_creation_tokens: self.cache_creation_tokens,
287            cache_read_tokens: self.cache_read_tokens,
288            system_prompt_tokens: self.system_prompt_tokens,
289            payload: serde_json::from_str(&self.payload_json)?,
290        })
291    }
292}
293
294fn kind_from_str(s: &str) -> EventKind {
295    match s {
296        "ToolCall" => EventKind::ToolCall,
297        "ToolResult" => EventKind::ToolResult,
298        "Message" => EventKind::Message,
299        "Error" => EventKind::Error,
300        "Cost" => EventKind::Cost,
301        "Hook" => EventKind::Hook,
302        "Lifecycle" => EventKind::Lifecycle,
303        _ => EventKind::Message,
304    }
305}
306
307fn source_from_str(s: &str) -> EventSource {
308    match s {
309        "Hook" => EventSource::Hook,
310        "Proxy" => EventSource::Proxy,
311        _ => EventSource::Tail,
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a fully populated sample event for session `s1`, sequence 7.
    fn sample_event() -> Event {
        Event {
            session_id: "s1".into(),
            seq: 7,
            ts_ms: 1_700_000_000_000,
            ts_exact: true,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("bash".into()),
            tool_call_id: Some("c1".into()),
            tokens_in: Some(1),
            tokens_out: Some(2),
            reasoning_tokens: Some(3),
            cost_usd_e6: Some(4),
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({"ok": true}),
        }
    }

    /// One appended event must be found via the index and come back intact from replay.
    #[test]
    fn append_replay_and_lookup() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let event = sample_event();

        let mut log = HotLog::open(root).unwrap();
        let offset = log.append(&event).unwrap();
        // The first record sits directly after the header.
        assert_eq!(offset, HEADER_LEN);
        // Dropping the writer flushes the pending index entry.
        drop(log);

        assert_eq!(HotLog::offset_for(root, "s1", 7).unwrap(), Some(offset));
        // A sequence number that was never appended has no index entry.
        assert_eq!(HotLog::offset_for(root, "s1", 8).unwrap(), None);

        let rows = HotLog::replay(root).unwrap();
        assert_eq!(rows.len(), 1);
        let (replayed_offset, replayed) = &rows[0];
        // Replay must report the same offset that append returned.
        assert_eq!(*replayed_offset, offset);
        assert_eq!(replayed.session_id, "s1");
        assert_eq!(replayed.seq, 7);
        assert!(matches!(replayed.kind, EventKind::ToolCall));
        assert_eq!(replayed.tool.as_deref(), Some("bash"));
        assert_eq!(replayed.payload, json!({"ok": true}));
    }
}
357}