1use crate::core::event::{Event, EventKind, EventSource};
5use anyhow::{Context, Result, anyhow};
6use crc32fast::Hasher;
7use memmap2::Mmap;
8use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
9use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
10use std::fs::{File, OpenOptions};
11use std::io::{Seek, SeekFrom, Write};
12use std::path::Path;
13
/// File magic stored at the start of `log.bin`.
///
/// The value is the ASCII text `KAIZENHO` read as a big-endian integer. It is
/// written to disk with `to_le_bytes`, so the bytes appear reversed in the file.
const MAGIC: u64 = u64::from_be_bytes(*b"KAIZENHO");

/// On-disk format version stored right after the magic.
const VERSION: u32 = 1;

/// Size in bytes of the file header: the `u64` magic followed by the `u32` version.
const HEADER_LEN: u64 = (std::mem::size_of::<u64>() + std::mem::size_of::<u32>()) as u64;
17const SESSIONS: TableDefinition<&str, &[u8]> = TableDefinition::new("sessions");
18const SEQ_IDX: TableDefinition<&str, u64> = TableDefinition::new("seq_idx");
19
20#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
21pub struct SessionMeta {
22 pub first_offset: u64,
23 pub last_offset: u64,
24 pub last_seq: u64,
25}
26
27#[derive(Archive, RkyvSerialize, RkyvDeserialize)]
28struct HotEventRecord {
29 session_id: String,
30 seq: u64,
31 ts_ms: u64,
32 ts_exact: bool,
33 kind: String,
34 source: String,
35 tool: Option<String>,
36 tool_call_id: Option<String>,
37 tokens_in: Option<u32>,
38 tokens_out: Option<u32>,
39 reasoning_tokens: Option<u32>,
40 cost_usd_e6: Option<i64>,
41 stop_reason: Option<String>,
42 latency_ms: Option<u32>,
43 ttft_ms: Option<u32>,
44 retry_count: Option<u16>,
45 context_used_tokens: Option<u32>,
46 context_max_tokens: Option<u32>,
47 cache_creation_tokens: Option<u32>,
48 cache_read_tokens: Option<u32>,
49 system_prompt_tokens: Option<u32>,
50 payload_json: String,
51}
52
53pub struct HotLog {
54 file: File,
55 index: Database,
56 pending_index: Vec<(String, u64, u64)>,
57 bytes_since_sync: u64,
58}
59
60impl HotLog {
61 pub fn open(root: &Path) -> Result<Self> {
62 let dir = root.join("hot");
63 std::fs::create_dir_all(&dir)?;
64 let log_path = dir.join("log.bin");
65 let mut file = OpenOptions::new()
66 .create(true)
67 .read(true)
68 .append(true)
69 .open(&log_path)?;
70 ensure_header(&mut file)?;
71 let index = Database::create(dir.join("index.redb"))?;
72 Ok(Self {
73 file,
74 index,
75 pending_index: Vec::new(),
76 bytes_since_sync: 0,
77 })
78 }
79
80 pub fn append(&mut self, event: &Event) -> Result<u64> {
81 let offset = self.file.seek(SeekFrom::End(0))?;
82 let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&HotEventRecord::from(event))?;
83 let crc = crc32(&bytes);
84 self.file.write_all(&(bytes.len() as u32).to_le_bytes())?;
85 self.file.write_all(&bytes)?;
86 self.file.write_all(&crc.to_le_bytes())?;
87 self.bytes_since_sync += 8 + bytes.len() as u64;
88 self.pending_index
89 .push((event.session_id.clone(), event.seq, offset));
90 if self.bytes_since_sync >= 4096 || self.pending_index.len() >= 128 {
91 self.flush()?;
92 }
93 Ok(offset)
94 }
95
96 pub fn flush(&mut self) -> Result<()> {
97 if !self.pending_index.is_empty() {
98 self.flush_index()?;
99 }
100 if self.bytes_since_sync > 0 {
101 self.file.sync_data()?;
102 self.bytes_since_sync = 0;
103 }
104 Ok(())
105 }
106
107 pub fn replay(root: &Path) -> Result<Vec<(u64, Event)>> {
108 let path = root.join("hot/log.bin");
109 let file =
110 File::open(&path).with_context(|| format!("open hot log: {}", path.display()))?;
111 if file.metadata()?.len() <= HEADER_LEN {
112 return Ok(Vec::new());
113 }
114 let mmap = unsafe { Mmap::map(&file)? };
116 validate_header(&mmap)?;
117 read_records(&mmap)
118 }
119
120 pub fn offset_for(root: &Path, session_id: &str, seq: u64) -> Result<Option<u64>> {
121 let db = Database::create(root.join("hot/index.redb"))?;
122 let tx = db.begin_read()?;
123 let table = tx.open_table(SEQ_IDX)?;
124 Ok(table
125 .get(seq_key(session_id, seq).as_str())?
126 .map(|v| v.value()))
127 }
128
129 fn flush_index(&mut self) -> Result<()> {
130 let tx = self.index.begin_write()?;
131 {
132 let mut sessions = tx.open_table(SESSIONS)?;
133 for (session_id, rows) in pending_by_session(&self.pending_index) {
134 let prior = sessions
135 .get(session_id.as_str())?
136 .map(|v| serde_json::from_slice::<SessionMeta>(v.value()))
137 .transpose()?;
138 let first_offset = prior
139 .map(|m| m.first_offset)
140 .unwrap_or_else(|| rows.first().map(|(_, o)| *o).unwrap_or(0));
141 let (last_seq, last_offset) = rows.last().copied().unwrap_or((0, first_offset));
142 let bytes = serde_json::to_vec(&SessionMeta {
143 first_offset,
144 last_offset,
145 last_seq,
146 })?;
147 sessions.insert(session_id.as_str(), bytes.as_slice())?;
148 }
149 }
150 {
151 let mut seq_idx = tx.open_table(SEQ_IDX)?;
152 for (session_id, seq, offset) in &self.pending_index {
153 seq_idx.insert(seq_key(session_id, *seq).as_str(), *offset)?;
154 }
155 }
156 tx.commit()?;
157 self.pending_index.clear();
158 Ok(())
159 }
160}
161
162impl Drop for HotLog {
163 fn drop(&mut self) {
164 let _ = self.flush();
165 }
166}
167
168fn ensure_header(file: &mut File) -> Result<()> {
169 if file.metadata()?.len() == 0 {
170 file.write_all(&MAGIC.to_le_bytes())?;
171 file.write_all(&VERSION.to_le_bytes())?;
172 file.sync_data()?;
173 }
174 Ok(())
175}
176
177fn validate_header(bytes: &[u8]) -> Result<()> {
178 let magic = u64::from_le_bytes(bytes[0..8].try_into()?);
179 let version = u32::from_le_bytes(bytes[8..12].try_into()?);
180 if magic != MAGIC || version != VERSION {
181 return Err(anyhow!("invalid hot log header"));
182 }
183 Ok(())
184}
185
186fn read_records(bytes: &[u8]) -> Result<Vec<(u64, Event)>> {
187 let mut out = Vec::new();
188 let mut pos = HEADER_LEN as usize;
189 while pos + 8 <= bytes.len() {
190 let offset = pos as u64;
191 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into()?) as usize;
192 let start = pos + 4;
193 let end = start + len;
194 if end + 4 > bytes.len() {
195 break;
196 }
197 let expected = u32::from_le_bytes(bytes[end..end + 4].try_into()?);
198 if crc32(&bytes[start..end]) != expected {
199 break;
200 }
201 let _ = rkyv::access::<ArchivedHotEventRecord, rkyv::rancor::Error>(&bytes[start..end])?;
202 let rec = rkyv::from_bytes::<HotEventRecord, rkyv::rancor::Error>(&bytes[start..end])?;
203 out.push((offset, rec.into_event()?));
204 pos = end + 4;
205 }
206 Ok(out)
207}
208
209fn crc32(bytes: &[u8]) -> u32 {
210 let mut hasher = Hasher::new();
211 hasher.update(bytes);
212 hasher.finalize()
213}
214
/// Builds the `seq_idx` key for a record: the session id, a NUL separator,
/// then `seq` zero-padded to 20 decimal digits.
///
/// Twenty digits is the width of `u64::MAX`, so for a fixed session id the
/// keys sort lexicographically in numeric `seq` order.
fn seq_key(session_id: &str, seq: u64) -> String {
    let padded_seq = format!("{seq:020}");
    [session_id, "\0", padded_seq.as_str()].concat()
}
218
/// Groups pending `(session_id, seq, offset)` rows by session.
///
/// The result is ordered by session id; within each session the
/// `(seq, offset)` pairs are sorted by `seq`, with rows of equal `seq` kept in
/// their original relative order.
fn pending_by_session(rows: &[(String, u64, u64)]) -> Vec<(String, Vec<(u64, u64)>)> {
    let mut by_session = std::collections::BTreeMap::<&str, Vec<(u64, u64)>>::new();
    for (session_id, seq, offset) in rows {
        by_session
            .entry(session_id.as_str())
            .or_default()
            .push((*seq, *offset));
    }

    let mut out = Vec::with_capacity(by_session.len());
    for (session_id, mut entries) in by_session {
        // Stable sort: ties on `seq` keep append order.
        entries.sort_by(|a, b| a.0.cmp(&b.0));
        out.push((session_id.to_owned(), entries));
    }
    out
}
235
236impl From<&Event> for HotEventRecord {
237 fn from(e: &Event) -> Self {
238 Self {
239 session_id: e.session_id.clone(),
240 seq: e.seq,
241 ts_ms: e.ts_ms,
242 ts_exact: e.ts_exact,
243 kind: format!("{:?}", e.kind),
244 source: format!("{:?}", e.source),
245 tool: e.tool.clone(),
246 tool_call_id: e.tool_call_id.clone(),
247 tokens_in: e.tokens_in,
248 tokens_out: e.tokens_out,
249 reasoning_tokens: e.reasoning_tokens,
250 cost_usd_e6: e.cost_usd_e6,
251 stop_reason: e.stop_reason.clone(),
252 latency_ms: e.latency_ms,
253 ttft_ms: e.ttft_ms,
254 retry_count: e.retry_count,
255 context_used_tokens: e.context_used_tokens,
256 context_max_tokens: e.context_max_tokens,
257 cache_creation_tokens: e.cache_creation_tokens,
258 cache_read_tokens: e.cache_read_tokens,
259 system_prompt_tokens: e.system_prompt_tokens,
260 payload_json: serde_json::to_string(&e.payload).unwrap_or_else(|_| "null".into()),
261 }
262 }
263}
264
265impl HotEventRecord {
266 fn into_event(self) -> Result<Event> {
267 Ok(Event {
268 session_id: self.session_id,
269 seq: self.seq,
270 ts_ms: self.ts_ms,
271 ts_exact: self.ts_exact,
272 kind: kind_from_str(&self.kind),
273 source: source_from_str(&self.source),
274 tool: self.tool,
275 tool_call_id: self.tool_call_id,
276 tokens_in: self.tokens_in,
277 tokens_out: self.tokens_out,
278 reasoning_tokens: self.reasoning_tokens,
279 cost_usd_e6: self.cost_usd_e6,
280 stop_reason: self.stop_reason,
281 latency_ms: self.latency_ms,
282 ttft_ms: self.ttft_ms,
283 retry_count: self.retry_count,
284 context_used_tokens: self.context_used_tokens,
285 context_max_tokens: self.context_max_tokens,
286 cache_creation_tokens: self.cache_creation_tokens,
287 cache_read_tokens: self.cache_read_tokens,
288 system_prompt_tokens: self.system_prompt_tokens,
289 payload: serde_json::from_str(&self.payload_json)?,
290 })
291 }
292}
293
294fn kind_from_str(s: &str) -> EventKind {
295 match s {
296 "ToolCall" => EventKind::ToolCall,
297 "ToolResult" => EventKind::ToolResult,
298 "Message" => EventKind::Message,
299 "Error" => EventKind::Error,
300 "Cost" => EventKind::Cost,
301 "Hook" => EventKind::Hook,
302 "Lifecycle" => EventKind::Lifecycle,
303 _ => EventKind::Message,
304 }
305}
306
307fn source_from_str(s: &str) -> EventSource {
308 match s {
309 "Hook" => EventSource::Hook,
310 "Proxy" => EventSource::Proxy,
311 _ => EventSource::Tail,
312 }
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The single event used by the round-trip test: session `s1`, seq 7.
    fn sample_event() -> Event {
        Event {
            session_id: "s1".into(),
            seq: 7,
            ts_ms: 1_700_000_000_000,
            ts_exact: true,
            kind: EventKind::ToolCall,
            source: EventSource::Tail,
            tool: Some("bash".into()),
            tool_call_id: Some("c1".into()),
            tokens_in: Some(1),
            tokens_out: Some(2),
            reasoning_tokens: Some(3),
            cost_usd_e6: Some(4),
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({"ok": true}),
        }
    }

    /// Appends one event, drops the log (which flushes), then checks that the
    /// index lookup and a full replay both find it.
    #[test]
    fn append_replay_and_lookup() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let mut log = HotLog::open(root).unwrap();
        let offset = log.append(&sample_event()).unwrap();
        // Dropping the handle flushes the index and releases the database.
        drop(log);

        let indexed = HotLog::offset_for(root, "s1", 7).unwrap();
        assert_eq!(indexed, Some(offset));

        let rows = HotLog::replay(root).unwrap();
        assert_eq!(rows.len(), 1);
        let (_, replayed) = &rows[0];
        assert_eq!(replayed.session_id, "s1");
        assert_eq!(replayed.seq, 7);
    }
}