aico/historystore/
store.rs

1use crate::exceptions::AicoError;
2use crate::models::HistoryRecord;
3use std::collections::HashMap;
4use std::fs::{self, OpenOptions};
5use std::io::{BufRead, BufReader, BufWriter, Write};
6use std::path::PathBuf;
7
/// Default number of records stored in a single shard file.
pub const SHARD_SIZE: usize = 10_000;
9
10#[derive(Debug)]
11pub struct HistoryStore {
12    root: PathBuf,
13    shard_size: usize,
14    state: Option<StoreState>,
15}
16
/// Position at which the next record will be appended.
#[derive(Debug, Clone, Copy, Default)]
struct StoreState {
    /// Base index (first global ID) of the shard currently being appended to.
    last_base: usize,
    /// Number of lines already present in that shard.
    count: usize,
}
22
23impl HistoryStore {
24    pub fn new(root: PathBuf) -> Self {
25        Self {
26            root,
27            shard_size: SHARD_SIZE,
28            state: None,
29        }
30    }
31
32    /// For testing, allow overriding shard size
33    pub fn new_with_shard_size(root: PathBuf, shard_size: usize) -> Self {
34        Self {
35            root,
36            shard_size,
37            state: None,
38        }
39    }
40
41    /// Appends a record and returns its global index.
42    pub fn append(&mut self, record: &HistoryRecord) -> Result<usize, AicoError> {
43        if self.state.is_none() {
44            self.refresh_state()?;
45        }
46
47        let (index, last_base) = {
48            let state = self.state.get_or_insert_default();
49
50            if state.count >= self.shard_size {
51                state.last_base += self.shard_size;
52                state.count = 0;
53            }
54
55            (state.last_base + state.count, state.last_base)
56        };
57
58        let shard_path = self.shard_path(last_base);
59
60        if let Some(parent) = shard_path.parent() {
61            fs::create_dir_all(parent)?;
62        }
63
64        let mut options = OpenOptions::new();
65        options.create(true).append(true);
66
67        #[cfg(unix)]
68        {
69            use std::os::unix::fs::OpenOptionsExt;
70            options.mode(0o600);
71        }
72
73        let file = options.open(&shard_path)?;
74        let mut writer = BufWriter::new(file);
75
76        serde_json::to_writer(&mut writer, record)?;
77        writeln!(writer)?;
78        writer.flush()?;
79
80        if let Some(state) = self.state.as_mut() {
81            state.count += 1;
82        }
83
84        Ok(index)
85    }
86
87    /// Returns a lazy iterator that yields records in disk order (by global ID).
88    pub fn stream_many<'a>(&'a self, indices: &[usize]) -> HistoryStream<'a> {
89        let mut sorted_reqs: Vec<(usize, usize, usize)> = indices
90            .iter()
91            .map(|&global_id| {
92                (
93                    (global_id / self.shard_size) * self.shard_size,
94                    global_id % self.shard_size,
95                    global_id,
96                )
97            })
98            .collect();
99
100        sorted_reqs.sort_unstable();
101        sorted_reqs.dedup();
102
103        HistoryStream {
104            store: self,
105            sorted_reqs: sorted_reqs.into_iter(),
106            current_reader: None,
107            current_shard_base: None,
108            current_line_in_shard: 0,
109        }
110    }
111
112    pub fn read_many(&self, indices: &[usize]) -> Result<Vec<HistoryRecord>, AicoError> {
113        if indices.is_empty() {
114            return Ok(Vec::new());
115        }
116
117        let mut records_map = HashMap::with_capacity(indices.len());
118        for result in self.stream_many(indices) {
119            let (id, record) = result?;
120            records_map.insert(id, record);
121        }
122
123        let mut results = Vec::with_capacity(indices.len());
124        for &idx in indices {
125            if let Some(rec) = records_map.get(&idx) {
126                results.push(rec.clone());
127            } else {
128                return Err(AicoError::Session(format!("Record ID {} not found", idx)));
129            }
130        }
131
132        Ok(results)
133    }
134
135    // --- Helpers ---
136
137    fn shard_path(&self, base: usize) -> PathBuf {
138        self.root.join(format!("{}.jsonl", base))
139    }
140
141    fn refresh_state(&mut self) -> Result<(), AicoError> {
142        if !self.root.exists() {
143            self.state = Some(StoreState {
144                last_base: 0,
145                count: 0,
146            });
147            return Ok(());
148        }
149
150        let mut max_base = None;
151
152        for entry in fs::read_dir(&self.root)? {
153            let entry = entry?;
154            let path = entry.path();
155            if path.extension().and_then(|s| s.to_str()) == Some("jsonl")
156                && let Some(stem) = path.file_stem().and_then(|s| s.to_str())
157                && let Ok(base) = stem.parse::<usize>()
158                && max_base.is_none_or(|m| base > m)
159            {
160                max_base = Some(base);
161            }
162        }
163
164        let base = max_base.unwrap_or(0);
165        let path = self.shard_path(base);
166
167        let count = if path.exists() {
168            let file = fs::File::open(&path)?;
169            let mut reader = BufReader::with_capacity(64 * 1024, file);
170            let mut c = 0;
171            while reader.skip_until(b'\n')? > 0 {
172                c += 1;
173            }
174            c
175        } else {
176            0
177        };
178
179        self.state = Some(StoreState {
180            last_base: base,
181            count,
182        });
183        Ok(())
184    }
185}
186
187pub struct HistoryStream<'a> {
188    store: &'a HistoryStore,
189    sorted_reqs: std::vec::IntoIter<(usize, usize, usize)>,
190    current_reader: Option<BufReader<fs::File>>,
191    current_shard_base: Option<usize>,
192    current_line_in_shard: usize,
193}
194
195impl<'a> Iterator for HistoryStream<'a> {
196    type Item = Result<(usize, HistoryRecord), AicoError>;
197
198    fn next(&mut self) -> Option<Self::Item> {
199        let (shard_base, target_offset, global_id) = self.sorted_reqs.next()?;
200
201        // 1. Ensure correct shard file is open
202        if self.current_shard_base != Some(shard_base) {
203            let path = self.store.shard_path(shard_base);
204            if !path.exists() {
205                return Some(Err(AicoError::Session(format!(
206                    "Shard missing: {:?}",
207                    path
208                ))));
209            }
210            match fs::File::open(&path) {
211                Ok(f) => {
212                    self.current_reader = Some(BufReader::with_capacity(64 * 1024, f));
213                    self.current_shard_base = Some(shard_base);
214                    self.current_line_in_shard = 0;
215                }
216                Err(e) => return Some(Err(AicoError::Io(e))),
217            }
218        }
219
220        let reader = self.current_reader.as_mut()?;
221
222        // 2. Seek to target line
223        while self.current_line_in_shard < target_offset {
224            match reader.skip_until(b'\n') {
225                Ok(0) => {
226                    return Some(Err(AicoError::Session(format!(
227                        "Record ID {} not found",
228                        global_id
229                    ))));
230                }
231                Ok(_) => self.current_line_in_shard += 1,
232                Err(e) => return Some(Err(AicoError::Io(e))),
233            }
234        }
235
236        // 3. Read and Deserialize
237        let mut buffer = Vec::new();
238        match reader.read_until(b'\n', &mut buffer) {
239            Ok(0) => Some(Err(AicoError::Session(format!(
240                "Record ID {} not found",
241                global_id
242            )))),
243            Ok(_) => {
244                self.current_line_in_shard += 1;
245                // Resilience: Handle deserialization failure gracefully
246                match serde_json::from_slice::<HistoryRecord>(&buffer) {
247                    Ok(record) => Some(Ok((global_id, record))),
248                    Err(e) => {
249                        eprintln!(
250                            "[WARN] Failed to parse history record ID {}: {}. Skipping.",
251                            global_id, e
252                        );
253                        // Recursively try next item
254                        self.next()
255                    }
256                }
257            }
258            Err(e) => Some(Err(AicoError::Io(e))),
259        }
260    }
261}