1use crate::error::{TsdbError, TsdbResult};
7use crate::series::DataPoint;
8use chrono::{DateTime, Utc};
9use std::fs::{File, OpenOptions};
10use std::io::{BufReader, BufWriter, Write};
11use std::path::{Path, PathBuf};
12
13#[derive(Debug, Clone)]
15pub struct WalEntry {
16 pub series_id: u64,
18 pub timestamp: DateTime<Utc>,
20 pub value: f64,
22}
23
24impl WalEntry {
25 pub fn new(series_id: u64, point: DataPoint) -> Self {
27 Self {
28 series_id,
29 timestamp: point.timestamp,
30 value: point.value,
31 }
32 }
33
34 pub fn to_bytes(&self) -> Vec<u8> {
38 let mut bytes = Vec::with_capacity(24);
39
40 bytes.extend_from_slice(&self.series_id.to_le_bytes());
42
43 let ts_ms = self.timestamp.timestamp_millis();
45 bytes.extend_from_slice(&ts_ms.to_le_bytes());
46
47 bytes.extend_from_slice(&self.value.to_le_bytes());
49
50 bytes
51 }
52
53 pub fn from_bytes(bytes: &[u8]) -> TsdbResult<Self> {
55 if bytes.len() != 24 {
56 return Err(TsdbError::Wal(format!(
57 "WAL entry must be 24 bytes, got {}",
58 bytes.len()
59 )));
60 }
61
62 let series_id = u64::from_le_bytes(
63 bytes[0..8]
64 .try_into()
65 .expect("slice is exactly 8 bytes as verified above"),
66 );
67 let ts_ms = i64::from_le_bytes(
68 bytes[8..16]
69 .try_into()
70 .expect("slice is exactly 8 bytes as verified above"),
71 );
72 let value = f64::from_le_bytes(
73 bytes[16..24]
74 .try_into()
75 .expect("slice is exactly 8 bytes as verified above"),
76 );
77
78 let timestamp = DateTime::from_timestamp_millis(ts_ms)
79 .ok_or_else(|| TsdbError::Wal(format!("Invalid timestamp: {}", ts_ms)))?;
80
81 Ok(Self {
82 series_id,
83 timestamp,
84 value,
85 })
86 }
87}
88
/// Append-only log of fixed-size entries backed by a single file.
///
/// `Debug` is derived so the log can be embedded in other `Debug` types and
/// inspected in diagnostics.
#[derive(Debug)]
pub struct WriteAheadLog {
    /// Location of the log file; `replay` and `clear` reopen it by path.
    path: PathBuf,
    /// Buffered handle through which all appends are written.
    writer: BufWriter<File>,
    /// When `true`, writes are flushed and followed by `sync_all`.
    sync_on_write: bool,
    /// Number of entries appended so far, seeded from the file length on open.
    entry_count: u64,
}
100
101impl WriteAheadLog {
102 pub fn new<P: AsRef<Path>>(path: P, sync_on_write: bool) -> TsdbResult<Self> {
109 let path = path.as_ref().to_path_buf();
110
111 let file = OpenOptions::new().create(true).append(true).open(&path)?;
112
113 let entry_count = if path.exists() {
115 let reader_file = File::open(&path)?;
116 let metadata = reader_file.metadata()?;
117 metadata.len() / 24 } else {
119 0
120 };
121
122 Ok(Self {
123 path,
124 writer: BufWriter::new(file),
125 sync_on_write,
126 entry_count,
127 })
128 }
129
130 pub fn append(&mut self, series_id: u64, point: DataPoint) -> TsdbResult<()> {
132 let entry = WalEntry::new(series_id, point);
133 let bytes = entry.to_bytes();
134
135 self.writer.write_all(&bytes)?;
136
137 if self.sync_on_write {
138 self.writer.flush()?;
139 self.writer.get_ref().sync_all()?;
140 }
141
142 self.entry_count += 1;
143
144 Ok(())
145 }
146
147 pub fn append_batch(&mut self, entries: &[(u64, DataPoint)]) -> TsdbResult<()> {
149 for (series_id, point) in entries {
150 let entry = WalEntry::new(*series_id, *point);
151 let bytes = entry.to_bytes();
152 self.writer.write_all(&bytes)?;
153 }
154
155 self.writer.flush()?;
156
157 if self.sync_on_write {
158 self.writer.get_ref().sync_all()?;
159 }
160
161 self.entry_count += entries.len() as u64;
162
163 Ok(())
164 }
165
166 pub fn replay(&self) -> TsdbResult<Vec<(u64, DataPoint)>> {
170 let file = File::open(&self.path)?;
171 let mut reader = BufReader::new(file);
172 let mut entries = Vec::new();
173
174 let mut buffer = vec![0u8; 24];
175
176 loop {
177 match std::io::Read::read_exact(&mut reader, &mut buffer) {
178 Ok(_) => {
179 let entry = WalEntry::from_bytes(&buffer)?;
180 entries.push((
181 entry.series_id,
182 DataPoint {
183 timestamp: entry.timestamp,
184 value: entry.value,
185 },
186 ));
187 }
188 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
189 break; }
191 Err(e) => return Err(e.into()),
192 }
193 }
194
195 Ok(entries)
196 }
197
198 pub fn clear(&mut self) -> TsdbResult<()> {
200 self.writer.flush()?;
202
203 let file = OpenOptions::new()
205 .write(true)
206 .truncate(true)
207 .open(&self.path)?;
208
209 self.writer = BufWriter::new(file);
210 self.entry_count = 0;
211
212 Ok(())
213 }
214
215 pub fn entry_count(&self) -> u64 {
217 self.entry_count
218 }
219
220 pub fn path(&self) -> &Path {
222 &self.path
223 }
224
225 pub fn flush(&mut self) -> TsdbResult<()> {
227 self.writer.flush()?;
228
229 if self.sync_on_write {
230 self.writer.get_ref().sync_all()?;
231 }
232
233 Ok(())
234 }
235}
236
237impl Drop for WriteAheadLog {
238 fn drop(&mut self) {
239 let _ = self.writer.flush();
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use std::env;

    /// Scratch file in the system temp directory that is removed when the
    /// guard is dropped, including when a test assertion panics.
    ///
    /// The process id is part of the file name so that concurrent test runs on
    /// the same machine do not trample each other's files.
    struct ScratchFile(PathBuf);

    impl ScratchFile {
        /// Reserves a fresh path for `name`, deleting any stale file first.
        fn new(name: &str) -> Self {
            let path = env::temp_dir().join(format!("{}_{}", std::process::id(), name));
            let _ = std::fs::remove_file(&path);
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchFile {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[test]
    fn test_wal_entry_serialization() {
        let entry = WalEntry {
            series_id: 42,
            timestamp: Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap(),
            value: 22.5,
        };

        let bytes = entry.to_bytes();
        assert_eq!(bytes.len(), 24);

        let recovered = WalEntry::from_bytes(&bytes).unwrap();
        assert_eq!(recovered.series_id, 42);
        assert_eq!(recovered.timestamp, entry.timestamp);
        assert_eq!(recovered.value, 22.5);
    }

    #[test]
    fn test_wal_entry_rejects_wrong_length() {
        assert!(WalEntry::from_bytes(&[0u8; 23]).is_err());
        assert!(WalEntry::from_bytes(&[0u8; 25]).is_err());
        assert!(WalEntry::from_bytes(&[]).is_err());
    }

    #[test]
    fn test_wal_append_and_replay() {
        // Declared before any log handle so the file is removed only after
        // every handle has been dropped.
        let scratch = ScratchFile::new("test_wal_append.log");

        {
            let mut wal = WriteAheadLog::new(scratch.path(), false).unwrap();

            let timestamp = Utc::now();
            let point1 = DataPoint {
                timestamp,
                value: 10.0,
            };
            let point2 = DataPoint {
                timestamp: timestamp + chrono::Duration::seconds(1),
                value: 20.0,
            };

            wal.append(1, point1).unwrap();
            wal.append(2, point2).unwrap();

            assert_eq!(wal.entry_count(), 2);
        }

        // Reopen to prove the entries survived the first handle being dropped.
        let wal = WriteAheadLog::new(scratch.path(), false).unwrap();
        let entries = wal.replay().unwrap();

        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].0, 1);
        assert_eq!(entries[0].1.value, 10.0);
        assert_eq!(entries[1].0, 2);
        assert_eq!(entries[1].1.value, 20.0);
    }

    #[test]
    fn test_wal_clear() {
        let scratch = ScratchFile::new("test_wal_clear.log");

        {
            let mut wal = WriteAheadLog::new(scratch.path(), false).unwrap();

            let point = DataPoint {
                timestamp: Utc::now(),
                value: 42.0,
            };

            wal.append(1, point).unwrap();
            assert_eq!(wal.entry_count(), 1);

            wal.clear().unwrap();
            assert_eq!(wal.entry_count(), 0);
        }

        let wal = WriteAheadLog::new(scratch.path(), false).unwrap();
        let entries = wal.replay().unwrap();
        assert_eq!(entries.len(), 0);
    }

    #[test]
    fn test_wal_batch_append() {
        let scratch = ScratchFile::new("test_wal_batch.log");

        {
            let mut wal = WriteAheadLog::new(scratch.path(), false).unwrap();

            let base_time = Utc::now();
            let mut batch = Vec::new();

            for i in 0..100 {
                let point = DataPoint {
                    timestamp: base_time + chrono::Duration::seconds(i),
                    value: i as f64,
                };
                batch.push((i as u64, point));
            }

            wal.append_batch(&batch).unwrap();
            assert_eq!(wal.entry_count(), 100);
        }

        let wal = WriteAheadLog::new(scratch.path(), false).unwrap();
        let entries = wal.replay().unwrap();

        assert_eq!(entries.len(), 100);
        for (i, (series_id, point)) in entries.iter().enumerate() {
            assert_eq!(*series_id, i as u64);
            assert_eq!(point.value, i as f64);
        }
    }

    #[test]
    fn test_wal_fsync() {
        let scratch = ScratchFile::new("test_wal_fsync.log");

        {
            // `true` enables flush + fsync on every append.
            let mut wal = WriteAheadLog::new(scratch.path(), true).unwrap();
            let point = DataPoint {
                timestamp: Utc::now(),
                value: 123.456,
            };

            wal.append(1, point).unwrap();
        }

        let wal = WriteAheadLog::new(scratch.path(), false).unwrap();
        let entries = wal.replay().unwrap();

        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].0, 1);
        assert_eq!(entries[0].1.value, 123.456);
    }

    #[test]
    fn test_wal_replay_ignores_partial_tail() {
        let scratch = ScratchFile::new("test_wal_partial_tail.log");

        let entry = WalEntry {
            series_id: 7,
            timestamp: Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap(),
            value: 1.5,
        };

        // One complete record followed by five stray bytes, as a crash in the
        // middle of a write could leave behind.
        let mut raw = entry.to_bytes();
        raw.extend_from_slice(&[0xAB; 5]);
        std::fs::write(scratch.path(), &raw).unwrap();

        let wal = WriteAheadLog::new(scratch.path(), false).unwrap();
        assert_eq!(wal.entry_count(), 1);

        let entries = wal.replay().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].0, 7);
        assert_eq!(entries[0].1.value, 1.5);
    }
}