aico/historystore/
store.rs1use crate::exceptions::AicoError;
2use crate::models::HistoryRecord;
3use std::collections::HashMap;
4use std::fs::{self, OpenOptions};
5use std::io::{BufRead, BufReader, BufWriter, Write};
6use std::path::PathBuf;
7
/// Default number of records stored in a single shard file.
///
/// Used by `HistoryStore::new`; a different value can be supplied through
/// `HistoryStore::new_with_shard_size`.
pub const SHARD_SIZE: usize = 10_000;
9
10#[derive(Debug)]
11pub struct HistoryStore {
12 root: PathBuf,
13 shard_size: usize,
14 state: Option<StoreState>,
15}
16
/// Cached position of the next write: which shard is current and how many
/// records it already holds.
#[derive(Clone, Copy, Debug, Default)]
struct StoreState {
    /// Base index (also the file stem) of the shard currently being written.
    last_base: usize,
    /// Number of lines already present in that shard.
    count: usize,
}
22
23impl HistoryStore {
24 pub fn new(root: PathBuf) -> Self {
25 Self {
26 root,
27 shard_size: SHARD_SIZE,
28 state: None,
29 }
30 }
31
32 pub fn new_with_shard_size(root: PathBuf, shard_size: usize) -> Self {
34 Self {
35 root,
36 shard_size,
37 state: None,
38 }
39 }
40
41 pub fn append(&mut self, record: &HistoryRecord) -> Result<usize, AicoError> {
43 if self.state.is_none() {
44 self.refresh_state()?;
45 }
46
47 let (index, last_base) = {
48 let state = self.state.get_or_insert_default();
49
50 if state.count >= self.shard_size {
51 state.last_base += self.shard_size;
52 state.count = 0;
53 }
54
55 (state.last_base + state.count, state.last_base)
56 };
57
58 let shard_path = self.shard_path(last_base);
59
60 if let Some(parent) = shard_path.parent() {
61 fs::create_dir_all(parent)?;
62 }
63
64 let mut options = OpenOptions::new();
65 options.create(true).append(true);
66
67 #[cfg(unix)]
68 {
69 use std::os::unix::fs::OpenOptionsExt;
70 options.mode(0o600);
71 }
72
73 let file = options.open(&shard_path)?;
74 let mut writer = BufWriter::new(file);
75
76 serde_json::to_writer(&mut writer, record)?;
77 writeln!(writer)?;
78 writer.flush()?;
79
80 if let Some(state) = self.state.as_mut() {
81 state.count += 1;
82 }
83
84 Ok(index)
85 }
86
87 pub fn stream_many<'a>(&'a self, indices: &[usize]) -> HistoryStream<'a> {
89 let mut sorted_reqs: Vec<(usize, usize, usize)> = indices
90 .iter()
91 .map(|&global_id| {
92 (
93 (global_id / self.shard_size) * self.shard_size,
94 global_id % self.shard_size,
95 global_id,
96 )
97 })
98 .collect();
99
100 sorted_reqs.sort_unstable();
101 sorted_reqs.dedup();
102
103 HistoryStream {
104 store: self,
105 sorted_reqs: sorted_reqs.into_iter(),
106 current_reader: None,
107 current_shard_base: None,
108 current_line_in_shard: 0,
109 }
110 }
111
112 pub fn read_many(&self, indices: &[usize]) -> Result<Vec<HistoryRecord>, AicoError> {
113 if indices.is_empty() {
114 return Ok(Vec::new());
115 }
116
117 let mut records_map = HashMap::with_capacity(indices.len());
118 for result in self.stream_many(indices) {
119 let (id, record) = result?;
120 records_map.insert(id, record);
121 }
122
123 let mut results = Vec::with_capacity(indices.len());
124 for &idx in indices {
125 if let Some(rec) = records_map.get(&idx) {
126 results.push(rec.clone());
127 } else {
128 return Err(AicoError::Session(format!("Record ID {} not found", idx)));
129 }
130 }
131
132 Ok(results)
133 }
134
135 fn shard_path(&self, base: usize) -> PathBuf {
138 self.root.join(format!("{}.jsonl", base))
139 }
140
141 fn refresh_state(&mut self) -> Result<(), AicoError> {
142 if !self.root.exists() {
143 self.state = Some(StoreState {
144 last_base: 0,
145 count: 0,
146 });
147 return Ok(());
148 }
149
150 let mut max_base = None;
151
152 for entry in fs::read_dir(&self.root)? {
153 let entry = entry?;
154 let path = entry.path();
155 if path.extension().and_then(|s| s.to_str()) == Some("jsonl")
156 && let Some(stem) = path.file_stem().and_then(|s| s.to_str())
157 && let Ok(base) = stem.parse::<usize>()
158 && max_base.is_none_or(|m| base > m)
159 {
160 max_base = Some(base);
161 }
162 }
163
164 let base = max_base.unwrap_or(0);
165 let path = self.shard_path(base);
166
167 let count = if path.exists() {
168 let file = fs::File::open(&path)?;
169 let mut reader = BufReader::with_capacity(64 * 1024, file);
170 let mut c = 0;
171 while reader.skip_until(b'\n')? > 0 {
172 c += 1;
173 }
174 c
175 } else {
176 0
177 };
178
179 self.state = Some(StoreState {
180 last_base: base,
181 count,
182 });
183 Ok(())
184 }
185}
186
187pub struct HistoryStream<'a> {
188 store: &'a HistoryStore,
189 sorted_reqs: std::vec::IntoIter<(usize, usize, usize)>,
190 current_reader: Option<BufReader<fs::File>>,
191 current_shard_base: Option<usize>,
192 current_line_in_shard: usize,
193}
194
195impl<'a> Iterator for HistoryStream<'a> {
196 type Item = Result<(usize, HistoryRecord), AicoError>;
197
198 fn next(&mut self) -> Option<Self::Item> {
199 let (shard_base, target_offset, global_id) = self.sorted_reqs.next()?;
200
201 if self.current_shard_base != Some(shard_base) {
203 let path = self.store.shard_path(shard_base);
204 if !path.exists() {
205 return Some(Err(AicoError::Session(format!(
206 "Shard missing: {:?}",
207 path
208 ))));
209 }
210 match fs::File::open(&path) {
211 Ok(f) => {
212 self.current_reader = Some(BufReader::with_capacity(64 * 1024, f));
213 self.current_shard_base = Some(shard_base);
214 self.current_line_in_shard = 0;
215 }
216 Err(e) => return Some(Err(AicoError::Io(e))),
217 }
218 }
219
220 let reader = self.current_reader.as_mut()?;
221
222 while self.current_line_in_shard < target_offset {
224 match reader.skip_until(b'\n') {
225 Ok(0) => {
226 return Some(Err(AicoError::Session(format!(
227 "Record ID {} not found",
228 global_id
229 ))));
230 }
231 Ok(_) => self.current_line_in_shard += 1,
232 Err(e) => return Some(Err(AicoError::Io(e))),
233 }
234 }
235
236 let mut buffer = Vec::new();
238 match reader.read_until(b'\n', &mut buffer) {
239 Ok(0) => Some(Err(AicoError::Session(format!(
240 "Record ID {} not found",
241 global_id
242 )))),
243 Ok(_) => {
244 self.current_line_in_shard += 1;
245 match serde_json::from_slice::<HistoryRecord>(&buffer) {
247 Ok(record) => Some(Ok((global_id, record))),
248 Err(e) => {
249 eprintln!(
250 "[WARN] Failed to parse history record ID {}: {}. Skipping.",
251 global_id, e
252 );
253 self.next()
255 }
256 }
257 }
258 Err(e) => Some(Err(AicoError::Io(e))),
259 }
260 }
261}