Skip to main content

oxirs_tsdb/write/
batch_writer.rs

1//! High-performance batched write path for TSDB with CRC32-protected WAL.
2//!
3//! ## Design
4//!
5//! ```text
6//! Ingest           Buffer                   Flush
7//! ──────────────   ──────────────────────   ─────────────────────
8//! write_point() ──►  RingBuffer<MetricPt>  ──►  WriteAheadLog
//! write_batch() ──►  (mutex-guarded queue) ──►  Columnar storage
//!                        ▲
//!                   flush on size limit
//!                   or explicit flush()
13//! ```
14//!
15//! [`BatchWriter`] accumulates [`MetricPoint`] values in a bounded ring
16//! buffer.  When the buffer reaches `batch_capacity` entries, or when
17//! [`BatchWriter::flush`] is called explicitly, it drains the buffer,
18//! appends each entry to the [`CrcWal`], and returns the sequence number
19//! of the last committed entry.
20//!
21//! [`CrcWal`] extends the basic WAL format with a CRC32 checksum per record,
22//! enabling corruption detection on recovery.
23//!
24//! ## CRC32 WAL record format
25//!
26//! ```text
27//! ┌────────────┬────────────┬─────────────────────────────┐
28//! │ CRC32 (4B) │ Length(4B) │ Payload (Length bytes)      │
29//! └────────────┴────────────┴─────────────────────────────┘
30//! ```
31//!
32//! On replay the CRC is recomputed over the payload and compared; mismatches
33//! surface as [`TsdbError::CrcMismatch`].
34
35use crate::error::{TsdbError, TsdbResult};
36use crc32fast::Hasher as Crc32Hasher;
37use serde::{Deserialize, Serialize};
38use std::collections::VecDeque;
39use std::fs::{File, OpenOptions};
40use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::{Arc, Mutex};
44
45// =============================================================================
46// MetricPoint
47// =============================================================================
48
49/// A single time-series observation ready for ingestion.
50#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
51pub struct MetricPoint {
52    /// Series or metric name.
53    pub metric: String,
54    /// Unix epoch milliseconds.
55    pub timestamp_ms: i64,
56    /// Observed value.
57    pub value: f64,
58}
59
60impl MetricPoint {
61    /// Create a new metric point.
62    pub fn new(metric: impl Into<String>, timestamp_ms: i64, value: f64) -> Self {
63        Self {
64            metric: metric.into(),
65            timestamp_ms,
66            value,
67        }
68    }
69
70    /// Serialize to a compact binary representation.
71    ///
72    /// Format: `metric_len(4) + metric_bytes + timestamp_ms(8) + value(8)`
73    pub fn to_bytes(&self) -> TsdbResult<Vec<u8>> {
74        let metric_bytes = self.metric.as_bytes();
75        let metric_len = metric_bytes.len() as u32;
76        let mut buf = Vec::with_capacity(4 + metric_bytes.len() + 8 + 8);
77        buf.extend_from_slice(&metric_len.to_le_bytes());
78        buf.extend_from_slice(metric_bytes);
79        buf.extend_from_slice(&self.timestamp_ms.to_le_bytes());
80        buf.extend_from_slice(&self.value.to_le_bytes());
81        Ok(buf)
82    }
83
84    /// Deserialize from the binary representation produced by [`MetricPoint::to_bytes`].
85    pub fn from_bytes(data: &[u8]) -> TsdbResult<Self> {
86        if data.len() < 4 {
87            return Err(TsdbError::Wal("record too short (< 4 bytes)".into()));
88        }
89        let metric_len = u32::from_le_bytes(
90            data[0..4]
91                .try_into()
92                .map_err(|_| TsdbError::Wal("slice error".into()))?,
93        ) as usize;
94
95        let end_metric = 4 + metric_len;
96        if data.len() < end_metric + 16 {
97            return Err(TsdbError::Wal(format!(
98                "record too short: expected {}, got {}",
99                end_metric + 16,
100                data.len()
101            )));
102        }
103
104        let metric = String::from_utf8(data[4..end_metric].to_vec())
105            .map_err(|e| TsdbError::Wal(format!("metric name UTF-8 error: {e}")))?;
106
107        let timestamp_ms = i64::from_le_bytes(
108            data[end_metric..end_metric + 8]
109                .try_into()
110                .map_err(|_| TsdbError::Wal("ts slice error".into()))?,
111        );
112        let value = f64::from_le_bytes(
113            data[end_metric + 8..end_metric + 16]
114                .try_into()
115                .map_err(|_| TsdbError::Wal("value slice error".into()))?,
116        );
117
118        Ok(Self {
119            metric,
120            timestamp_ms,
121            value,
122        })
123    }
124}
125
126// =============================================================================
127// CrcWal — CRC32-protected Write-Ahead Log
128// =============================================================================
129
/// CRC32-protected Write-Ahead Log.
///
/// Each record is prefixed with a 4-byte CRC and a 4-byte payload length.
/// On recovery, the CRC is verified before returning the payload.
#[derive(Debug)]
pub struct CrcWal {
    /// Location of the WAL file on disk.
    path: PathBuf,
    /// Buffered handle through which all records are written.
    writer: BufWriter<File>,
    /// Records appended since open, or since the last `clear`, which resets it to 0.
    record_count: u64,
    /// Source of the sequence numbers returned by `append`.
    ///
    /// `open` creates a fresh counter starting at 1, so sequence numbers are
    /// per-session and are not derived from records already in the file.
    next_seq: Arc<AtomicU64>,
}
142
143impl CrcWal {
144    /// Open (or create) a WAL file at `path`.
145    pub fn open(path: &Path) -> TsdbResult<Self> {
146        let file = OpenOptions::new()
147            .create(true)
148            .append(true)
149            .open(path)
150            .map_err(|e| TsdbError::Wal(format!("open WAL: {e}")))?;
151
152        Ok(Self {
153            path: path.to_path_buf(),
154            writer: BufWriter::new(file),
155            record_count: 0,
156            next_seq: Arc::new(AtomicU64::new(1)),
157        })
158    }
159
160    /// Return the WAL file path.
161    pub fn path(&self) -> &Path {
162        &self.path
163    }
164
165    /// Return the number of records appended during this session.
166    pub fn record_count(&self) -> u64 {
167        self.record_count
168    }
169
170    /// Compute CRC32 over a byte slice.
171    pub fn crc32(data: &[u8]) -> u32 {
172        let mut h = Crc32Hasher::new();
173        h.update(data);
174        h.finalize()
175    }
176
177    /// Append a [`MetricPoint`] to the WAL.
178    ///
179    /// Returns the sequence number assigned to this record.
180    pub fn append(&mut self, point: &MetricPoint) -> TsdbResult<u64> {
181        let payload = point.to_bytes()?;
182        let checksum = Self::crc32(&payload);
183        let length = payload.len() as u32;
184
185        // Write: CRC32(4) | length(4) | payload
186        self.writer
187            .write_all(&checksum.to_le_bytes())
188            .map_err(|e| TsdbError::Wal(format!("write CRC: {e}")))?;
189        self.writer
190            .write_all(&length.to_le_bytes())
191            .map_err(|e| TsdbError::Wal(format!("write length: {e}")))?;
192        self.writer
193            .write_all(&payload)
194            .map_err(|e| TsdbError::Wal(format!("write payload: {e}")))?;
195        self.writer
196            .flush()
197            .map_err(|e| TsdbError::Wal(format!("flush WAL: {e}")))?;
198
199        self.record_count += 1;
200        let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
201        Ok(seq)
202    }
203
204    /// Append multiple points in a single write call.
205    ///
206    /// Returns the sequence number of the last record.
207    pub fn append_batch(&mut self, points: &[MetricPoint]) -> TsdbResult<u64> {
208        let mut last_seq = 0u64;
209        for point in points {
210            last_seq = self.append(point)?;
211        }
212        Ok(last_seq)
213    }
214
215    /// Replay all records from the WAL file.
216    ///
217    /// Returns an error if any record fails CRC verification.
218    pub fn replay(path: &Path) -> TsdbResult<Vec<MetricPoint>> {
219        let file =
220            File::open(path).map_err(|e| TsdbError::Wal(format!("open WAL for replay: {e}")))?;
221        let mut reader = BufReader::new(file);
222        let mut points = Vec::new();
223
224        loop {
225            // Read header: CRC32(4) + length(4)
226            let mut header = [0u8; 8];
227            match reader.read_exact(&mut header) {
228                Ok(()) => {}
229                Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
230                Err(e) => return Err(TsdbError::Wal(format!("read header: {e}"))),
231            }
232
233            let stored_crc = u32::from_le_bytes(header[0..4].try_into().expect("4 bytes"));
234            let length = u32::from_le_bytes(header[4..8].try_into().expect("4 bytes")) as usize;
235
236            let mut payload = vec![0u8; length];
237            reader
238                .read_exact(&mut payload)
239                .map_err(|e| TsdbError::Wal(format!("read payload: {e}")))?;
240
241            let computed_crc = Self::crc32(&payload);
242            if computed_crc != stored_crc {
243                return Err(TsdbError::CrcMismatch {
244                    expected: stored_crc,
245                    got: computed_crc,
246                });
247            }
248
249            points.push(MetricPoint::from_bytes(&payload)?);
250        }
251
252        Ok(points)
253    }
254
255    /// Truncate the WAL file (clear all records).
256    pub fn clear(&mut self) -> TsdbResult<()> {
257        self.writer
258            .flush()
259            .map_err(|e| TsdbError::Wal(format!("{e}")))?;
260
261        let file = OpenOptions::new()
262            .write(true)
263            .truncate(true)
264            .open(&self.path)
265            .map_err(|e| TsdbError::Wal(format!("truncate WAL: {e}")))?;
266
267        self.writer = BufWriter::new(file);
268        self.record_count = 0;
269        Ok(())
270    }
271
272    /// Force all buffered data to the OS (fsync).
273    pub fn sync(&mut self) -> TsdbResult<()> {
274        self.writer
275            .flush()
276            .map_err(|e| TsdbError::Wal(format!("{e}")))?;
277        self.writer
278            .get_ref()
279            .sync_all()
280            .map_err(|e| TsdbError::Wal(format!("fsync WAL: {e}")))
281    }
282
283    /// Estimate WAL file size in bytes.
284    pub fn file_size(&mut self) -> TsdbResult<u64> {
285        self.writer
286            .flush()
287            .map_err(|e| TsdbError::Wal(format!("{e}")))?;
288        let pos = self
289            .writer
290            .get_mut()
291            .seek(SeekFrom::End(0))
292            .map_err(|e| TsdbError::Wal(format!("seek: {e}")))?;
293        Ok(pos)
294    }
295}
296
297// =============================================================================
298// RingBuffer — bounded lock-free-ish write buffer
299// =============================================================================
300
301/// Bounded ring buffer backed by a `VecDeque` and protected by a `Mutex`.
302///
303/// The `Mutex` overhead is acceptable at the expected write rates (≥1M/s is
304/// achieved by batching many calls per flush).  For true lock-free ingestion
305/// replace this with `crossbeam-queue::ArrayQueue`.
306#[derive(Debug)]
307struct RingBuffer {
308    inner: Mutex<VecDeque<MetricPoint>>,
309    capacity: usize,
310}
311
312impl RingBuffer {
313    fn new(capacity: usize) -> Self {
314        Self {
315            inner: Mutex::new(VecDeque::with_capacity(capacity)),
316            capacity,
317        }
318    }
319
320    /// Push a point.  Returns `Err(BufferFull)` when the buffer is at capacity.
321    fn push(&self, point: MetricPoint) -> TsdbResult<()> {
322        let mut q = self
323            .inner
324            .lock()
325            .map_err(|_| TsdbError::Wal("lock poisoned".into()))?;
326        if q.len() >= self.capacity {
327            return Err(TsdbError::BufferFull(q.len()));
328        }
329        q.push_back(point);
330        Ok(())
331    }
332
333    /// Drain all buffered points.
334    fn drain_all(&self) -> TsdbResult<Vec<MetricPoint>> {
335        let mut q = self
336            .inner
337            .lock()
338            .map_err(|_| TsdbError::Wal("lock poisoned".into()))?;
339        Ok(q.drain(..).collect())
340    }
341
342    /// Current number of buffered points.
343    fn len(&self) -> usize {
344        self.inner.lock().map(|q| q.len()).unwrap_or(0)
345    }
346
347    /// Buffer capacity.
348    fn capacity(&self) -> usize {
349        self.capacity
350    }
351}
352
353// =============================================================================
354// BatchWriter
355// =============================================================================
356
/// High-throughput write path that buffers metric points and flushes in batches.
///
/// Points handed to [`BatchWriter::write_point`] or [`BatchWriter::write_batch`]
/// are queued in memory; [`BatchWriter::flush`] drains the queue into the WAL.
///
/// ## Usage
///
/// ```rust,no_run
/// use oxirs_tsdb::write::batch_writer::{BatchWriter, BatchWriterConfig, MetricPoint};
/// use std::path::Path;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let cfg = BatchWriterConfig::default();
/// let mut bw = BatchWriter::open(Path::new("/tmp/tsdb_wal.bin"), cfg)?;
///
/// bw.write_point(MetricPoint::new("cpu", 1_700_000_000_000, 42.5))?;
///
/// let seq = bw.flush()?;
/// println!("flushed; last seq = {seq}");
/// # Ok(())
/// # }
/// ```
pub struct BatchWriter {
    /// In-memory queue of points accepted but not yet flushed to the WAL.
    buffer: Arc<RingBuffer>,
    /// CRC-protected write-ahead log that flushed points are appended to.
    wal: CrcWal,
    /// Settings supplied to [`BatchWriter::open`].
    config: BatchWriterConfig,
    /// Total number of points successfully flushed.
    total_flushed: u64,
}
383
/// Tunable settings for a [`BatchWriter`].
#[derive(Debug, Clone)]
pub struct BatchWriterConfig {
    /// Number of points the ring buffer may hold before a flush is triggered automatically.
    pub batch_capacity: usize,
    /// When `true`, the WAL is fsynced after each flush (more durable, but slower).
    pub fsync_on_flush: bool,
}

impl Default for BatchWriterConfig {
    /// Default settings: room for 4096 points and no fsync after flushes.
    fn default() -> Self {
        Self::with_capacity(Self::DEFAULT_BATCH_CAPACITY)
    }
}

impl BatchWriterConfig {
    /// Batch capacity used by [`Default`].
    const DEFAULT_BATCH_CAPACITY: usize = 4096;

    /// Build a config that uses `capacity` as the batch capacity and keeps
    /// every other setting at its default value.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            batch_capacity: capacity,
            fsync_on_flush: false,
        }
    }
}
411
412impl BatchWriter {
413    /// Open (or create) a `BatchWriter` backed by a CRC-protected WAL at `path`.
414    pub fn open(path: &Path, config: BatchWriterConfig) -> TsdbResult<Self> {
415        let wal = CrcWal::open(path)?;
416        let buffer = Arc::new(RingBuffer::new(config.batch_capacity));
417        Ok(Self {
418            buffer,
419            wal,
420            config,
421            total_flushed: 0,
422        })
423    }
424
425    /// Return total points flushed since creation.
426    pub fn total_flushed(&self) -> u64 {
427        self.total_flushed
428    }
429
430    /// Return the number of points currently buffered.
431    pub fn buffered_count(&self) -> usize {
432        self.buffer.len()
433    }
434
435    /// Return the configured batch capacity.
436    pub fn batch_capacity(&self) -> usize {
437        self.buffer.capacity()
438    }
439
440    /// Enqueue a single metric point.
441    ///
442    /// If the buffer is full, automatically flushes first.
443    pub fn write_point(&mut self, point: MetricPoint) -> TsdbResult<()> {
444        if self.buffer.len() >= self.config.batch_capacity {
445            self.flush()?;
446        }
447        self.buffer.push(point)
448    }
449
450    /// Enqueue a batch of metric points, flushing automatically when needed.
451    ///
452    /// Returns the WAL sequence number of the last committed record.
453    pub fn write_batch(&mut self, points: &[MetricPoint]) -> TsdbResult<u64> {
454        let mut last_seq = 0u64;
455        for point in points {
456            if self.buffer.len() >= self.config.batch_capacity {
457                last_seq = self.flush()?;
458            }
459            self.buffer.push(point.clone())?;
460        }
461        // Flush remaining
462        if self.buffer.len() > 0 {
463            last_seq = self.flush()?;
464        }
465        Ok(last_seq)
466    }
467
468    /// Flush all buffered points to the WAL.
469    ///
470    /// Returns the WAL sequence number of the last committed record, or 0 if
471    /// the buffer was empty.
472    pub fn flush(&mut self) -> TsdbResult<u64> {
473        let points = self.buffer.drain_all()?;
474        if points.is_empty() {
475            return Ok(0);
476        }
477        let last_seq = self.wal.append_batch(&points)?;
478        self.total_flushed += points.len() as u64;
479        if self.config.fsync_on_flush {
480            self.wal.sync()?;
481        }
482        Ok(last_seq)
483    }
484
485    /// Replay the WAL from disk (e.g. after a crash).
486    pub fn replay_wal(path: &Path) -> TsdbResult<Vec<MetricPoint>> {
487        CrcWal::replay(path)
488    }
489}
490
// =============================================================================
// Tests
// =============================================================================
498
499#[cfg(test)]
500mod tests {
501    use super::*;
502    use std::env;
503
504    fn tmp_path(name: &str) -> PathBuf {
505        env::temp_dir().join(format!("oxirs_tsdb_{name}.wal"))
506    }
507
508    // -- MetricPoint ----------------------------------------------------------
509
510    #[test]
511    fn test_metric_point_roundtrip() {
512        let p = MetricPoint::new("temperature", 1_700_000_000_000, 23.7);
513        let bytes = p.to_bytes().expect("serialize");
514        let back = MetricPoint::from_bytes(&bytes).expect("deserialize");
515        assert_eq!(p.metric, back.metric);
516        assert_eq!(p.timestamp_ms, back.timestamp_ms);
517        assert!((p.value - back.value).abs() < 1e-12);
518    }
519
520    #[test]
521    fn test_metric_point_unicode_metric_name() {
522        let p = MetricPoint::new("温度センサー", 42, 100.0);
523        let bytes = p.to_bytes().expect("serialize unicode");
524        let back = MetricPoint::from_bytes(&bytes).expect("deserialize unicode");
525        assert_eq!(back.metric, "温度センサー");
526    }
527
528    #[test]
529    fn test_metric_point_from_bytes_too_short() {
530        let result = MetricPoint::from_bytes(&[0u8; 3]);
531        assert!(result.is_err());
532    }
533
534    #[test]
535    fn test_metric_point_clone_and_eq() {
536        let p = MetricPoint::new("x", 0, 0.0);
537        assert_eq!(p, p.clone());
538    }
539
540    #[test]
541    fn test_metric_point_empty_metric_name() {
542        let p = MetricPoint::new("", 1000, 42.0);
543        let bytes = p.to_bytes().expect("serialize empty name");
544        let back = MetricPoint::from_bytes(&bytes).expect("deserialize empty name");
545        assert_eq!(back.metric, "");
546        assert_eq!(back.timestamp_ms, 1000);
547    }
548
549    #[test]
550    fn test_metric_point_negative_value() {
551        let p = MetricPoint::new("temp", 1000, -40.5);
552        let bytes = p.to_bytes().expect("serialize neg");
553        let back = MetricPoint::from_bytes(&bytes).expect("deserialize neg");
554        assert!((back.value - (-40.5)).abs() < f64::EPSILON);
555    }
556
557    #[test]
558    fn test_metric_point_zero_timestamp() {
559        let p = MetricPoint::new("x", 0, 0.0);
560        let bytes = p.to_bytes().expect("serialize zero ts");
561        let back = MetricPoint::from_bytes(&bytes).expect("deserialize zero ts");
562        assert_eq!(back.timestamp_ms, 0);
563    }
564
565    #[test]
566    fn test_metric_point_max_timestamp() {
567        let p = MetricPoint::new("x", i64::MAX, 1.0);
568        let bytes = p.to_bytes().expect("serialize max ts");
569        let back = MetricPoint::from_bytes(&bytes).expect("deserialize max ts");
570        assert_eq!(back.timestamp_ms, i64::MAX);
571    }
572
573    #[test]
574    fn test_metric_point_special_float_values() {
575        let p_inf = MetricPoint::new("x", 0, f64::INFINITY);
576        let bytes = p_inf.to_bytes().expect("serialize inf");
577        let back = MetricPoint::from_bytes(&bytes).expect("deserialize inf");
578        assert!(back.value.is_infinite() && back.value.is_sign_positive());
579
580        let p_nan = MetricPoint::new("x", 0, f64::NAN);
581        let bytes = p_nan.to_bytes().expect("serialize nan");
582        let back = MetricPoint::from_bytes(&bytes).expect("deserialize nan");
583        assert!(back.value.is_nan());
584    }
585
586    #[test]
587    fn test_metric_point_serde_json_roundtrip() {
588        let p = MetricPoint::new("latency", 1_700_000_000_000, 2.5);
589        let json = serde_json::to_string(&p).expect("json serialize");
590        let back: MetricPoint = serde_json::from_str(&json).expect("json deserialize");
591        assert_eq!(p, back);
592    }
593
594    // -- CrcWal ---------------------------------------------------------------
595
596    #[test]
597    fn test_crc_wal_append_and_replay() {
598        let path = tmp_path("append_replay");
599        let _ = std::fs::remove_file(&path);
600
601        let pts = vec![
602            MetricPoint::new("cpu", 1_000, 55.0),
603            MetricPoint::new("mem", 2_000, 8192.0),
604            MetricPoint::new("disk", 3_000, 0.75),
605        ];
606
607        {
608            let mut wal = CrcWal::open(&path).expect("open");
609            for p in &pts {
610                wal.append(p).expect("append");
611            }
612        }
613
614        let replayed = CrcWal::replay(&path).expect("replay");
615        assert_eq!(replayed.len(), 3);
616        assert_eq!(replayed[0].metric, "cpu");
617        assert_eq!(replayed[1].metric, "mem");
618        assert_eq!(replayed[2].metric, "disk");
619
620        let _ = std::fs::remove_file(path);
621    }
622
623    #[test]
624    fn test_crc_wal_crc_mismatch_detected() {
625        let path = tmp_path("crc_corrupt");
626        let _ = std::fs::remove_file(&path);
627
628        {
629            let mut wal = CrcWal::open(&path).expect("open");
630            wal.append(&MetricPoint::new("x", 0, 1.0)).expect("append");
631        }
632
633        // Corrupt the stored CRC (flip byte 0)
634        {
635            let mut data = std::fs::read(&path).expect("read");
636            data[0] ^= 0xFF;
637            std::fs::write(&path, &data).expect("write corrupt");
638        }
639
640        let result = CrcWal::replay(&path);
641        assert!(
642            matches!(result, Err(TsdbError::CrcMismatch { .. })),
643            "expected CrcMismatch, got: {result:?}"
644        );
645
646        let _ = std::fs::remove_file(path);
647    }
648
649    #[test]
650    fn test_crc_wal_clear() {
651        let path = tmp_path("clear");
652        let _ = std::fs::remove_file(&path);
653
654        let mut wal = CrcWal::open(&path).expect("open");
655        wal.append(&MetricPoint::new("y", 1, 2.0)).expect("append");
656        assert_eq!(wal.record_count(), 1);
657
658        wal.clear().expect("clear");
659        assert_eq!(wal.record_count(), 0);
660
661        let replayed = CrcWal::replay(&path).expect("replay after clear");
662        assert!(replayed.is_empty());
663
664        let _ = std::fs::remove_file(path);
665    }
666
667    #[test]
668    fn test_crc_wal_record_count() {
669        let path = tmp_path("record_count");
670        let _ = std::fs::remove_file(&path);
671
672        let mut wal = CrcWal::open(&path).expect("open");
673        for i in 0..10u32 {
674            wal.append(&MetricPoint::new(format!("s{i}"), i as i64, i as f64))
675                .expect("append");
676        }
677        assert_eq!(wal.record_count(), 10);
678
679        let _ = std::fs::remove_file(path);
680    }
681
682    #[test]
683    fn test_crc_wal_append_batch() {
684        let path = tmp_path("batch");
685        let _ = std::fs::remove_file(&path);
686
687        let pts: Vec<_> = (0..5).map(|i| MetricPoint::new("b", i, i as f64)).collect();
688
689        {
690            let mut wal = CrcWal::open(&path).expect("open");
691            let seq = wal.append_batch(&pts).expect("batch");
692            assert!(seq > 0);
693        }
694
695        let replayed = CrcWal::replay(&path).expect("replay");
696        assert_eq!(replayed.len(), 5);
697
698        let _ = std::fs::remove_file(path);
699    }
700
701    #[test]
702    fn test_crc32_known_value() {
703        let crc = CrcWal::crc32(b"");
704        assert_eq!(crc, 0x0000_0000);
705    }
706
707    #[test]
708    fn test_crc32_non_empty() {
709        let a = CrcWal::crc32(b"hello");
710        let b = CrcWal::crc32(b"hello");
711        assert_eq!(a, b, "CRC must be deterministic");
712        let c = CrcWal::crc32(b"world");
713        assert_ne!(a, c, "different data must produce different CRC");
714    }
715
716    #[test]
717    fn test_crc_wal_file_size_grows() {
718        let path = tmp_path("size_grows");
719        let _ = std::fs::remove_file(&path);
720
721        let mut wal = CrcWal::open(&path).expect("open");
722        let before = wal.file_size().expect("size");
723
724        wal.append(&MetricPoint::new("s", 0, 0.0)).expect("append");
725        let after = wal.file_size().expect("size after");
726
727        assert!(after > before, "WAL must grow after append");
728        let _ = std::fs::remove_file(path);
729    }
730
731    #[test]
732    fn test_crc_wal_multiple_sessions() {
733        let path = tmp_path("multi_session");
734        let _ = std::fs::remove_file(&path);
735
736        // Session 1: write 3 records
737        {
738            let mut wal = CrcWal::open(&path).expect("open");
739            for i in 0..3 {
740                wal.append(&MetricPoint::new("s1", i, i as f64))
741                    .expect("append");
742            }
743        }
744
745        // Session 2: write 2 more records (append mode)
746        {
747            let mut wal = CrcWal::open(&path).expect("reopen");
748            for i in 3..5 {
749                wal.append(&MetricPoint::new("s2", i, i as f64))
750                    .expect("append");
751            }
752        }
753
754        // Verify all 5 records are present
755        let replayed = CrcWal::replay(&path).expect("replay");
756        assert_eq!(replayed.len(), 5);
757        assert_eq!(replayed[0].metric, "s1");
758        assert_eq!(replayed[3].metric, "s2");
759
760        let _ = std::fs::remove_file(path);
761    }
762
763    #[test]
764    fn test_crc_wal_large_metric_name() {
765        let path = tmp_path("large_name");
766        let _ = std::fs::remove_file(&path);
767
768        let large_name = "x".repeat(10_000);
769        let mut wal = CrcWal::open(&path).expect("open");
770        wal.append(&MetricPoint::new(&large_name, 0, 0.0))
771            .expect("append large name");
772
773        let replayed = CrcWal::replay(&path).expect("replay");
774        assert_eq!(replayed.len(), 1);
775        assert_eq!(replayed[0].metric.len(), 10_000);
776
777        let _ = std::fs::remove_file(path);
778    }
779
780    #[test]
781    fn test_crc_wal_sync() {
782        let path = tmp_path("sync_test");
783        let _ = std::fs::remove_file(&path);
784
785        let mut wal = CrcWal::open(&path).expect("open");
786        wal.append(&MetricPoint::new("s", 0, 1.0)).expect("append");
787        wal.sync().expect("sync should succeed");
788
789        let replayed = CrcWal::replay(&path).expect("replay");
790        assert_eq!(replayed.len(), 1);
791
792        let _ = std::fs::remove_file(path);
793    }
794
795    #[test]
796    fn test_crc_wal_path_accessor() {
797        let path = tmp_path("path_access");
798        let _ = std::fs::remove_file(&path);
799
800        let wal = CrcWal::open(&path).expect("open");
801        assert_eq!(wal.path(), path.as_path());
802
803        let _ = std::fs::remove_file(path);
804    }
805
806    // -- RingBuffer -----------------------------------------------------------
807
808    #[test]
809    fn test_ring_buffer_capacity_enforced() {
810        let rb = RingBuffer::new(3);
811        rb.push(MetricPoint::new("a", 0, 0.0)).expect("push 1");
812        rb.push(MetricPoint::new("b", 1, 1.0)).expect("push 2");
813        rb.push(MetricPoint::new("c", 2, 2.0)).expect("push 3");
814        let result = rb.push(MetricPoint::new("d", 3, 3.0));
815        assert!(
816            matches!(result, Err(TsdbError::BufferFull(_))),
817            "should be BufferFull"
818        );
819    }
820
821    #[test]
822    fn test_ring_buffer_drain_all() {
823        let rb = RingBuffer::new(10);
824        for i in 0..5 {
825            rb.push(MetricPoint::new("x", i, i as f64)).expect("push");
826        }
827        let drained = rb.drain_all().expect("drain");
828        assert_eq!(drained.len(), 5);
829        assert_eq!(rb.len(), 0);
830    }
831
832    #[test]
833    fn test_ring_buffer_drain_preserves_order() {
834        let rb = RingBuffer::new(10);
835        for i in 0..5 {
836            rb.push(MetricPoint::new(format!("m{i}"), i, i as f64))
837                .expect("push");
838        }
839        let drained = rb.drain_all().expect("drain");
840        for (i, p) in drained.iter().enumerate() {
841            assert_eq!(p.metric, format!("m{i}"));
842            assert_eq!(p.timestamp_ms, i as i64);
843        }
844    }
845
846    #[test]
847    fn test_ring_buffer_capacity_accessor() {
848        let rb = RingBuffer::new(42);
849        assert_eq!(rb.capacity(), 42);
850    }
851
852    #[test]
853    fn test_ring_buffer_len_after_push_and_drain() {
854        let rb = RingBuffer::new(10);
855        assert_eq!(rb.len(), 0);
856        rb.push(MetricPoint::new("a", 0, 0.0)).expect("push");
857        assert_eq!(rb.len(), 1);
858        rb.push(MetricPoint::new("b", 1, 1.0)).expect("push");
859        assert_eq!(rb.len(), 2);
860        let _ = rb.drain_all().expect("drain");
861        assert_eq!(rb.len(), 0);
862    }
863
864    // -- BatchWriter ----------------------------------------------------------
865
866    #[test]
867    fn test_batch_writer_write_and_flush() {
868        let path = tmp_path("bw_flush");
869        let _ = std::fs::remove_file(&path);
870
871        let cfg = BatchWriterConfig::with_capacity(10);
872        let mut bw = BatchWriter::open(&path, cfg).expect("open");
873
874        for i in 0..5 {
875            bw.write_point(MetricPoint::new("cpu", i, i as f64))
876                .expect("write");
877        }
878        let seq = bw.flush().expect("flush");
879        assert!(seq > 0);
880        assert_eq!(bw.total_flushed(), 5);
881
882        let _ = std::fs::remove_file(path);
883    }
884
885    #[test]
886    fn test_batch_writer_auto_flush_on_capacity() {
887        let path = tmp_path("bw_auto");
888        let _ = std::fs::remove_file(&path);
889
890        let cfg = BatchWriterConfig::with_capacity(3);
891        let mut bw = BatchWriter::open(&path, cfg).expect("open");
892
893        for i in 0..4i64 {
894            bw.write_point(MetricPoint::new("x", i, i as f64))
895                .expect("write");
896        }
897
898        assert!(bw.total_flushed() >= 3);
899
900        let _ = std::fs::remove_file(path);
901    }
902
/// Submits twenty points through `write_batch` to a writer whose capacity is
/// eight and expects all twenty to be reported as flushed.
#[test]
fn test_batch_writer_write_batch() {
    let wal_path = tmp_path("bw_write_batch");
    let _ = std::fs::remove_file(&wal_path);

    // Every point gets its own metric name: "s0", "s1", ...
    let mut points = Vec::new();
    for i in 0..20i64 {
        points.push(MetricPoint::new(format!("s{i}"), i, i as f64));
    }

    let mut writer =
        BatchWriter::open(&wal_path, BatchWriterConfig::with_capacity(8)).expect("open");

    let last_seq = writer.write_batch(&points).expect("write_batch");
    assert!(last_seq > 0);
    assert_eq!(writer.total_flushed(), 20);

    let _ = std::fs::remove_file(wal_path);
}
921
/// Writes a batch, drops the writer, and then checks that
/// `BatchWriter::replay_wal` returns the same ten points from the file.
#[test]
fn test_batch_writer_replay_after_crash() {
    let wal_path = tmp_path("bw_crash_recovery");
    let _ = std::fs::remove_file(&wal_path);

    let points: Vec<_> = (0..10i64)
        .map(|i| {
            let ts = i * 1_000;
            let reading = 20.0 + i as f64;
            MetricPoint::new("temp", ts, reading)
        })
        .collect();

    let mut writer = BatchWriter::open(&wal_path, BatchWriterConfig::default()).expect("open");
    writer.write_batch(&points).expect("write");
    // The writer goes away right after the batch; this stands in for the crash.
    drop(writer);

    let replayed = BatchWriter::replay_wal(&wal_path).expect("replay");
    assert_eq!(replayed.len(), 10);
    assert_eq!(replayed[0].metric, "temp");
    // Point 5 was written with value 20.0 + 5.
    let delta = (replayed[5].value - 25.0).abs();
    assert!(delta < 1e-9);

    let _ = std::fs::remove_file(wal_path);
}
944
/// Flushing a writer that has received no points must succeed and return
/// sequence number zero.
#[test]
fn test_batch_writer_flush_empty_buffer() {
    let wal_path = tmp_path("bw_empty_flush");
    let _ = std::fs::remove_file(&wal_path);

    let config = BatchWriterConfig::default();
    let mut writer = BatchWriter::open(&wal_path, config).expect("open");

    let returned_seq = writer.flush().expect("flush empty");
    assert_eq!(returned_seq, 0, "flushing empty buffer should return seq 0");

    let _ = std::fs::remove_file(wal_path);
}
956
/// Pins the values produced by `BatchWriterConfig::default()`.
#[test]
fn test_batch_writer_config_default() {
    let defaults = BatchWriterConfig::default();

    let capacity = defaults.batch_capacity;
    assert_eq!(capacity, 4096);

    let fsync_enabled = defaults.fsync_on_flush;
    assert!(!fsync_enabled);
}
963
/// `buffered_count` starts at zero and reports one after a single write that
/// stays below the batch capacity.
#[test]
fn test_batch_writer_buffered_count() {
    let wal_path = tmp_path("bw_count");
    let _ = std::fs::remove_file(&wal_path);

    let mut writer =
        BatchWriter::open(&wal_path, BatchWriterConfig::with_capacity(100)).expect("open");
    assert_eq!(writer.buffered_count(), 0);

    let point = MetricPoint::new("a", 0, 0.0);
    writer.write_point(point).expect("write");
    assert_eq!(writer.buffered_count(), 1);

    let _ = std::fs::remove_file(wal_path);
}
979
980    // -- Additional BatchWriter tests for high-performance write path ---------
981
/// With `fsync_on_flush` enabled, a flush must succeed and the flushed point
/// must be readable back through `replay_wal`.
#[test]
fn test_batch_writer_fsync_on_flush() {
    let wal_path = tmp_path("bw_fsync");
    let _ = std::fs::remove_file(&wal_path);

    let config = BatchWriterConfig {
        fsync_on_flush: true,
        batch_capacity: 10,
    };
    let mut writer = BatchWriter::open(&wal_path, config).expect("open");

    let point = MetricPoint::new("x", 0, 1.0);
    writer.write_point(point).expect("write");

    let last_seq = writer.flush().expect("flush with fsync");
    assert!(last_seq > 0);

    // The single flushed point has to come back when the WAL file is replayed.
    let replayed = BatchWriter::replay_wal(&wal_path).expect("replay");
    assert_eq!(replayed.len(), 1);

    let _ = std::fs::remove_file(wal_path);
}
1004
/// Pushes 1000 points through `write_batch` with a capacity of 100 and checks
/// both the flushed counter and the replayed contents.
#[test]
fn test_batch_writer_large_batch() {
    let wal_path = tmp_path("bw_large");
    let _ = std::fs::remove_file(&wal_path);

    let mut points = Vec::with_capacity(1000);
    for i in 0..1000i64 {
        points.push(MetricPoint::new("sensor", i * 100, i as f64 * 0.1));
    }

    let mut writer =
        BatchWriter::open(&wal_path, BatchWriterConfig::with_capacity(100)).expect("open");

    let last_seq = writer.write_batch(&points).expect("write large batch");
    assert!(last_seq > 0);
    assert_eq!(writer.total_flushed(), 1000);

    let replayed = BatchWriter::replay_wal(&wal_path).expect("replay");
    assert_eq!(replayed.len(), 1000);
    // The final point was written with value 999 * 0.1.
    let delta = (replayed[999].value - 99.9).abs();
    assert!(delta < 1e-9);

    let _ = std::fs::remove_file(wal_path);
}
1026
/// Runs two write-then-flush cycles of five points each and checks the
/// cumulative flushed counter after each, then replays the whole WAL.
#[test]
fn test_batch_writer_multiple_flush_cycles() {
    let wal_path = tmp_path("bw_multi_flush");
    let _ = std::fs::remove_file(&wal_path);

    let mut writer =
        BatchWriter::open(&wal_path, BatchWriterConfig::with_capacity(5)).expect("open");

    // (metric, index range, flush label, cumulative total expected after the flush)
    let cycles = [("a", 0..5, "flush 1", 5), ("b", 5..10, "flush 2", 10)];
    for (metric, indices, flush_label, expected_total) in cycles {
        for i in indices {
            writer
                .write_point(MetricPoint::new(metric, i, i as f64))
                .expect("write");
        }
        writer.flush().expect(flush_label);
        assert_eq!(writer.total_flushed(), expected_total);
    }

    let replayed = BatchWriter::replay_wal(&wal_path).expect("replay");
    assert_eq!(replayed.len(), 10);

    let _ = std::fs::remove_file(wal_path);
}
1056
/// `batch_capacity()` must report the capacity the writer was opened with.
#[test]
fn test_batch_writer_batch_capacity_accessor() {
    let wal_path = tmp_path("bw_cap");
    let _ = std::fs::remove_file(&wal_path);

    let writer =
        BatchWriter::open(&wal_path, BatchWriterConfig::with_capacity(256)).expect("open");
    let reported = writer.batch_capacity();
    assert_eq!(reported, 256);

    let _ = std::fs::remove_file(wal_path);
}
1068
/// Alternates writes and explicit flushes, then checks that the replayed WAL
/// holds all three points in the order they were written.
#[test]
fn test_batch_writer_interleaved_write_and_flush() {
    let wal_path = tmp_path("bw_interleave");
    let _ = std::fs::remove_file(&wal_path);

    let mut writer =
        BatchWriter::open(&wal_path, BatchWriterConfig::with_capacity(50)).expect("open");

    // First round: one point, then flush.
    writer
        .write_point(MetricPoint::new("a", 0, 1.0))
        .expect("w1");
    writer.flush().expect("f1");

    // Second round: two points, then flush.
    writer
        .write_point(MetricPoint::new("b", 1, 2.0))
        .expect("w2");
    writer
        .write_point(MetricPoint::new("c", 2, 3.0))
        .expect("w3");
    writer.flush().expect("f2");

    assert_eq!(writer.total_flushed(), 3);

    let replayed = BatchWriter::replay_wal(&wal_path).expect("replay");
    assert_eq!(replayed.len(), 3);
    for (idx, expected_metric) in ["a", "b", "c"].iter().enumerate() {
        assert_eq!(replayed[idx].metric, *expected_metric);
    }

    let _ = std::fs::remove_file(wal_path);
}
1094
/// `with_capacity` must set `batch_capacity` and leave `fsync_on_flush` off.
#[test]
fn test_batch_writer_config_with_capacity() {
    let config = BatchWriterConfig::with_capacity(512);

    let capacity = config.batch_capacity;
    assert_eq!(capacity, 512);

    // with_capacity is expected to leave fsync disabled.
    let fsync_enabled = config.fsync_on_flush;
    assert!(!fsync_enabled);
}
1101}