1use crate::error::{TsdbError, TsdbResult};
36use crc32fast::Hasher as Crc32Hasher;
37use serde::{Deserialize, Serialize};
38use std::collections::VecDeque;
39use std::fs::{File, OpenOptions};
40use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::{Arc, Mutex};
44
45#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
51pub struct MetricPoint {
52 pub metric: String,
54 pub timestamp_ms: i64,
56 pub value: f64,
58}
59
60impl MetricPoint {
61 pub fn new(metric: impl Into<String>, timestamp_ms: i64, value: f64) -> Self {
63 Self {
64 metric: metric.into(),
65 timestamp_ms,
66 value,
67 }
68 }
69
70 pub fn to_bytes(&self) -> TsdbResult<Vec<u8>> {
74 let metric_bytes = self.metric.as_bytes();
75 let metric_len = metric_bytes.len() as u32;
76 let mut buf = Vec::with_capacity(4 + metric_bytes.len() + 8 + 8);
77 buf.extend_from_slice(&metric_len.to_le_bytes());
78 buf.extend_from_slice(metric_bytes);
79 buf.extend_from_slice(&self.timestamp_ms.to_le_bytes());
80 buf.extend_from_slice(&self.value.to_le_bytes());
81 Ok(buf)
82 }
83
84 pub fn from_bytes(data: &[u8]) -> TsdbResult<Self> {
86 if data.len() < 4 {
87 return Err(TsdbError::Wal("record too short (< 4 bytes)".into()));
88 }
89 let metric_len = u32::from_le_bytes(
90 data[0..4]
91 .try_into()
92 .map_err(|_| TsdbError::Wal("slice error".into()))?,
93 ) as usize;
94
95 let end_metric = 4 + metric_len;
96 if data.len() < end_metric + 16 {
97 return Err(TsdbError::Wal(format!(
98 "record too short: expected {}, got {}",
99 end_metric + 16,
100 data.len()
101 )));
102 }
103
104 let metric = String::from_utf8(data[4..end_metric].to_vec())
105 .map_err(|e| TsdbError::Wal(format!("metric name UTF-8 error: {e}")))?;
106
107 let timestamp_ms = i64::from_le_bytes(
108 data[end_metric..end_metric + 8]
109 .try_into()
110 .map_err(|_| TsdbError::Wal("ts slice error".into()))?,
111 );
112 let value = f64::from_le_bytes(
113 data[end_metric + 8..end_metric + 16]
114 .try_into()
115 .map_err(|_| TsdbError::Wal("value slice error".into()))?,
116 );
117
118 Ok(Self {
119 metric,
120 timestamp_ms,
121 value,
122 })
123 }
124}
125
126pub struct CrcWal {
135 path: PathBuf,
136 writer: BufWriter<File>,
137 record_count: u64,
139 next_seq: Arc<AtomicU64>,
141}
142
143impl CrcWal {
144 pub fn open(path: &Path) -> TsdbResult<Self> {
146 let file = OpenOptions::new()
147 .create(true)
148 .append(true)
149 .open(path)
150 .map_err(|e| TsdbError::Wal(format!("open WAL: {e}")))?;
151
152 Ok(Self {
153 path: path.to_path_buf(),
154 writer: BufWriter::new(file),
155 record_count: 0,
156 next_seq: Arc::new(AtomicU64::new(1)),
157 })
158 }
159
160 pub fn path(&self) -> &Path {
162 &self.path
163 }
164
165 pub fn record_count(&self) -> u64 {
167 self.record_count
168 }
169
170 pub fn crc32(data: &[u8]) -> u32 {
172 let mut h = Crc32Hasher::new();
173 h.update(data);
174 h.finalize()
175 }
176
177 pub fn append(&mut self, point: &MetricPoint) -> TsdbResult<u64> {
181 let payload = point.to_bytes()?;
182 let checksum = Self::crc32(&payload);
183 let length = payload.len() as u32;
184
185 self.writer
187 .write_all(&checksum.to_le_bytes())
188 .map_err(|e| TsdbError::Wal(format!("write CRC: {e}")))?;
189 self.writer
190 .write_all(&length.to_le_bytes())
191 .map_err(|e| TsdbError::Wal(format!("write length: {e}")))?;
192 self.writer
193 .write_all(&payload)
194 .map_err(|e| TsdbError::Wal(format!("write payload: {e}")))?;
195 self.writer
196 .flush()
197 .map_err(|e| TsdbError::Wal(format!("flush WAL: {e}")))?;
198
199 self.record_count += 1;
200 let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
201 Ok(seq)
202 }
203
204 pub fn append_batch(&mut self, points: &[MetricPoint]) -> TsdbResult<u64> {
208 let mut last_seq = 0u64;
209 for point in points {
210 last_seq = self.append(point)?;
211 }
212 Ok(last_seq)
213 }
214
215 pub fn replay(path: &Path) -> TsdbResult<Vec<MetricPoint>> {
219 let file =
220 File::open(path).map_err(|e| TsdbError::Wal(format!("open WAL for replay: {e}")))?;
221 let mut reader = BufReader::new(file);
222 let mut points = Vec::new();
223
224 loop {
225 let mut header = [0u8; 8];
227 match reader.read_exact(&mut header) {
228 Ok(()) => {}
229 Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
230 Err(e) => return Err(TsdbError::Wal(format!("read header: {e}"))),
231 }
232
233 let stored_crc = u32::from_le_bytes(header[0..4].try_into().expect("4 bytes"));
234 let length = u32::from_le_bytes(header[4..8].try_into().expect("4 bytes")) as usize;
235
236 let mut payload = vec![0u8; length];
237 reader
238 .read_exact(&mut payload)
239 .map_err(|e| TsdbError::Wal(format!("read payload: {e}")))?;
240
241 let computed_crc = Self::crc32(&payload);
242 if computed_crc != stored_crc {
243 return Err(TsdbError::CrcMismatch {
244 expected: stored_crc,
245 got: computed_crc,
246 });
247 }
248
249 points.push(MetricPoint::from_bytes(&payload)?);
250 }
251
252 Ok(points)
253 }
254
255 pub fn clear(&mut self) -> TsdbResult<()> {
257 self.writer
258 .flush()
259 .map_err(|e| TsdbError::Wal(format!("{e}")))?;
260
261 let file = OpenOptions::new()
262 .write(true)
263 .truncate(true)
264 .open(&self.path)
265 .map_err(|e| TsdbError::Wal(format!("truncate WAL: {e}")))?;
266
267 self.writer = BufWriter::new(file);
268 self.record_count = 0;
269 Ok(())
270 }
271
272 pub fn sync(&mut self) -> TsdbResult<()> {
274 self.writer
275 .flush()
276 .map_err(|e| TsdbError::Wal(format!("{e}")))?;
277 self.writer
278 .get_ref()
279 .sync_all()
280 .map_err(|e| TsdbError::Wal(format!("fsync WAL: {e}")))
281 }
282
283 pub fn file_size(&mut self) -> TsdbResult<u64> {
285 self.writer
286 .flush()
287 .map_err(|e| TsdbError::Wal(format!("{e}")))?;
288 let pos = self
289 .writer
290 .get_mut()
291 .seek(SeekFrom::End(0))
292 .map_err(|e| TsdbError::Wal(format!("seek: {e}")))?;
293 Ok(pos)
294 }
295}
296
297#[derive(Debug)]
307struct RingBuffer {
308 inner: Mutex<VecDeque<MetricPoint>>,
309 capacity: usize,
310}
311
312impl RingBuffer {
313 fn new(capacity: usize) -> Self {
314 Self {
315 inner: Mutex::new(VecDeque::with_capacity(capacity)),
316 capacity,
317 }
318 }
319
320 fn push(&self, point: MetricPoint) -> TsdbResult<()> {
322 let mut q = self
323 .inner
324 .lock()
325 .map_err(|_| TsdbError::Wal("lock poisoned".into()))?;
326 if q.len() >= self.capacity {
327 return Err(TsdbError::BufferFull(q.len()));
328 }
329 q.push_back(point);
330 Ok(())
331 }
332
333 fn drain_all(&self) -> TsdbResult<Vec<MetricPoint>> {
335 let mut q = self
336 .inner
337 .lock()
338 .map_err(|_| TsdbError::Wal("lock poisoned".into()))?;
339 Ok(q.drain(..).collect())
340 }
341
342 fn len(&self) -> usize {
344 self.inner.lock().map(|q| q.len()).unwrap_or(0)
345 }
346
347 fn capacity(&self) -> usize {
349 self.capacity
350 }
351}
352
353pub struct BatchWriter {
377 buffer: Arc<RingBuffer>,
378 wal: CrcWal,
379 config: BatchWriterConfig,
380 total_flushed: u64,
382}
383
384#[derive(Debug, Clone)]
386pub struct BatchWriterConfig {
387 pub batch_capacity: usize,
389 pub fsync_on_flush: bool,
391}
392
393impl Default for BatchWriterConfig {
394 fn default() -> Self {
395 Self {
396 batch_capacity: 4096,
397 fsync_on_flush: false,
398 }
399 }
400}
401
402impl BatchWriterConfig {
403 pub fn with_capacity(capacity: usize) -> Self {
405 Self {
406 batch_capacity: capacity,
407 ..Default::default()
408 }
409 }
410}
411
412impl BatchWriter {
413 pub fn open(path: &Path, config: BatchWriterConfig) -> TsdbResult<Self> {
415 let wal = CrcWal::open(path)?;
416 let buffer = Arc::new(RingBuffer::new(config.batch_capacity));
417 Ok(Self {
418 buffer,
419 wal,
420 config,
421 total_flushed: 0,
422 })
423 }
424
425 pub fn total_flushed(&self) -> u64 {
427 self.total_flushed
428 }
429
430 pub fn buffered_count(&self) -> usize {
432 self.buffer.len()
433 }
434
435 pub fn batch_capacity(&self) -> usize {
437 self.buffer.capacity()
438 }
439
440 pub fn write_point(&mut self, point: MetricPoint) -> TsdbResult<()> {
444 if self.buffer.len() >= self.config.batch_capacity {
445 self.flush()?;
446 }
447 self.buffer.push(point)
448 }
449
450 pub fn write_batch(&mut self, points: &[MetricPoint]) -> TsdbResult<u64> {
454 let mut last_seq = 0u64;
455 for point in points {
456 if self.buffer.len() >= self.config.batch_capacity {
457 last_seq = self.flush()?;
458 }
459 self.buffer.push(point.clone())?;
460 }
461 if self.buffer.len() > 0 {
463 last_seq = self.flush()?;
464 }
465 Ok(last_seq)
466 }
467
468 pub fn flush(&mut self) -> TsdbResult<u64> {
473 let points = self.buffer.drain_all()?;
474 if points.is_empty() {
475 return Ok(0);
476 }
477 let last_seq = self.wal.append_batch(&points)?;
478 self.total_flushed += points.len() as u64;
479 if self.config.fsync_on_flush {
480 self.wal.sync()?;
481 }
482 Ok(last_seq)
483 }
484
485 pub fn replay_wal(path: &Path) -> TsdbResult<Vec<MetricPoint>> {
487 CrcWal::replay(path)
488 }
489}
490
491#[cfg(test)]
500mod tests {
501 use super::*;
502 use std::env;
503
504 fn tmp_path(name: &str) -> PathBuf {
505 env::temp_dir().join(format!("oxirs_tsdb_{name}.wal"))
506 }
507
508 #[test]
511 fn test_metric_point_roundtrip() {
512 let p = MetricPoint::new("temperature", 1_700_000_000_000, 23.7);
513 let bytes = p.to_bytes().expect("serialize");
514 let back = MetricPoint::from_bytes(&bytes).expect("deserialize");
515 assert_eq!(p.metric, back.metric);
516 assert_eq!(p.timestamp_ms, back.timestamp_ms);
517 assert!((p.value - back.value).abs() < 1e-12);
518 }
519
520 #[test]
521 fn test_metric_point_unicode_metric_name() {
522 let p = MetricPoint::new("温度センサー", 42, 100.0);
523 let bytes = p.to_bytes().expect("serialize unicode");
524 let back = MetricPoint::from_bytes(&bytes).expect("deserialize unicode");
525 assert_eq!(back.metric, "温度センサー");
526 }
527
528 #[test]
529 fn test_metric_point_from_bytes_too_short() {
530 let result = MetricPoint::from_bytes(&[0u8; 3]);
531 assert!(result.is_err());
532 }
533
534 #[test]
535 fn test_metric_point_clone_and_eq() {
536 let p = MetricPoint::new("x", 0, 0.0);
537 assert_eq!(p, p.clone());
538 }
539
540 #[test]
541 fn test_metric_point_empty_metric_name() {
542 let p = MetricPoint::new("", 1000, 42.0);
543 let bytes = p.to_bytes().expect("serialize empty name");
544 let back = MetricPoint::from_bytes(&bytes).expect("deserialize empty name");
545 assert_eq!(back.metric, "");
546 assert_eq!(back.timestamp_ms, 1000);
547 }
548
549 #[test]
550 fn test_metric_point_negative_value() {
551 let p = MetricPoint::new("temp", 1000, -40.5);
552 let bytes = p.to_bytes().expect("serialize neg");
553 let back = MetricPoint::from_bytes(&bytes).expect("deserialize neg");
554 assert!((back.value - (-40.5)).abs() < f64::EPSILON);
555 }
556
557 #[test]
558 fn test_metric_point_zero_timestamp() {
559 let p = MetricPoint::new("x", 0, 0.0);
560 let bytes = p.to_bytes().expect("serialize zero ts");
561 let back = MetricPoint::from_bytes(&bytes).expect("deserialize zero ts");
562 assert_eq!(back.timestamp_ms, 0);
563 }
564
565 #[test]
566 fn test_metric_point_max_timestamp() {
567 let p = MetricPoint::new("x", i64::MAX, 1.0);
568 let bytes = p.to_bytes().expect("serialize max ts");
569 let back = MetricPoint::from_bytes(&bytes).expect("deserialize max ts");
570 assert_eq!(back.timestamp_ms, i64::MAX);
571 }
572
573 #[test]
574 fn test_metric_point_special_float_values() {
575 let p_inf = MetricPoint::new("x", 0, f64::INFINITY);
576 let bytes = p_inf.to_bytes().expect("serialize inf");
577 let back = MetricPoint::from_bytes(&bytes).expect("deserialize inf");
578 assert!(back.value.is_infinite() && back.value.is_sign_positive());
579
580 let p_nan = MetricPoint::new("x", 0, f64::NAN);
581 let bytes = p_nan.to_bytes().expect("serialize nan");
582 let back = MetricPoint::from_bytes(&bytes).expect("deserialize nan");
583 assert!(back.value.is_nan());
584 }
585
586 #[test]
587 fn test_metric_point_serde_json_roundtrip() {
588 let p = MetricPoint::new("latency", 1_700_000_000_000, 2.5);
589 let json = serde_json::to_string(&p).expect("json serialize");
590 let back: MetricPoint = serde_json::from_str(&json).expect("json deserialize");
591 assert_eq!(p, back);
592 }
593
594 #[test]
597 fn test_crc_wal_append_and_replay() {
598 let path = tmp_path("append_replay");
599 let _ = std::fs::remove_file(&path);
600
601 let pts = vec![
602 MetricPoint::new("cpu", 1_000, 55.0),
603 MetricPoint::new("mem", 2_000, 8192.0),
604 MetricPoint::new("disk", 3_000, 0.75),
605 ];
606
607 {
608 let mut wal = CrcWal::open(&path).expect("open");
609 for p in &pts {
610 wal.append(p).expect("append");
611 }
612 }
613
614 let replayed = CrcWal::replay(&path).expect("replay");
615 assert_eq!(replayed.len(), 3);
616 assert_eq!(replayed[0].metric, "cpu");
617 assert_eq!(replayed[1].metric, "mem");
618 assert_eq!(replayed[2].metric, "disk");
619
620 let _ = std::fs::remove_file(path);
621 }
622
623 #[test]
624 fn test_crc_wal_crc_mismatch_detected() {
625 let path = tmp_path("crc_corrupt");
626 let _ = std::fs::remove_file(&path);
627
628 {
629 let mut wal = CrcWal::open(&path).expect("open");
630 wal.append(&MetricPoint::new("x", 0, 1.0)).expect("append");
631 }
632
633 {
635 let mut data = std::fs::read(&path).expect("read");
636 data[0] ^= 0xFF;
637 std::fs::write(&path, &data).expect("write corrupt");
638 }
639
640 let result = CrcWal::replay(&path);
641 assert!(
642 matches!(result, Err(TsdbError::CrcMismatch { .. })),
643 "expected CrcMismatch, got: {result:?}"
644 );
645
646 let _ = std::fs::remove_file(path);
647 }
648
649 #[test]
650 fn test_crc_wal_clear() {
651 let path = tmp_path("clear");
652 let _ = std::fs::remove_file(&path);
653
654 let mut wal = CrcWal::open(&path).expect("open");
655 wal.append(&MetricPoint::new("y", 1, 2.0)).expect("append");
656 assert_eq!(wal.record_count(), 1);
657
658 wal.clear().expect("clear");
659 assert_eq!(wal.record_count(), 0);
660
661 let replayed = CrcWal::replay(&path).expect("replay after clear");
662 assert!(replayed.is_empty());
663
664 let _ = std::fs::remove_file(path);
665 }
666
667 #[test]
668 fn test_crc_wal_record_count() {
669 let path = tmp_path("record_count");
670 let _ = std::fs::remove_file(&path);
671
672 let mut wal = CrcWal::open(&path).expect("open");
673 for i in 0..10u32 {
674 wal.append(&MetricPoint::new(format!("s{i}"), i as i64, i as f64))
675 .expect("append");
676 }
677 assert_eq!(wal.record_count(), 10);
678
679 let _ = std::fs::remove_file(path);
680 }
681
682 #[test]
683 fn test_crc_wal_append_batch() {
684 let path = tmp_path("batch");
685 let _ = std::fs::remove_file(&path);
686
687 let pts: Vec<_> = (0..5).map(|i| MetricPoint::new("b", i, i as f64)).collect();
688
689 {
690 let mut wal = CrcWal::open(&path).expect("open");
691 let seq = wal.append_batch(&pts).expect("batch");
692 assert!(seq > 0);
693 }
694
695 let replayed = CrcWal::replay(&path).expect("replay");
696 assert_eq!(replayed.len(), 5);
697
698 let _ = std::fs::remove_file(path);
699 }
700
701 #[test]
702 fn test_crc32_known_value() {
703 let crc = CrcWal::crc32(b"");
704 assert_eq!(crc, 0x0000_0000);
705 }
706
707 #[test]
708 fn test_crc32_non_empty() {
709 let a = CrcWal::crc32(b"hello");
710 let b = CrcWal::crc32(b"hello");
711 assert_eq!(a, b, "CRC must be deterministic");
712 let c = CrcWal::crc32(b"world");
713 assert_ne!(a, c, "different data must produce different CRC");
714 }
715
716 #[test]
717 fn test_crc_wal_file_size_grows() {
718 let path = tmp_path("size_grows");
719 let _ = std::fs::remove_file(&path);
720
721 let mut wal = CrcWal::open(&path).expect("open");
722 let before = wal.file_size().expect("size");
723
724 wal.append(&MetricPoint::new("s", 0, 0.0)).expect("append");
725 let after = wal.file_size().expect("size after");
726
727 assert!(after > before, "WAL must grow after append");
728 let _ = std::fs::remove_file(path);
729 }
730
731 #[test]
732 fn test_crc_wal_multiple_sessions() {
733 let path = tmp_path("multi_session");
734 let _ = std::fs::remove_file(&path);
735
736 {
738 let mut wal = CrcWal::open(&path).expect("open");
739 for i in 0..3 {
740 wal.append(&MetricPoint::new("s1", i, i as f64))
741 .expect("append");
742 }
743 }
744
745 {
747 let mut wal = CrcWal::open(&path).expect("reopen");
748 for i in 3..5 {
749 wal.append(&MetricPoint::new("s2", i, i as f64))
750 .expect("append");
751 }
752 }
753
754 let replayed = CrcWal::replay(&path).expect("replay");
756 assert_eq!(replayed.len(), 5);
757 assert_eq!(replayed[0].metric, "s1");
758 assert_eq!(replayed[3].metric, "s2");
759
760 let _ = std::fs::remove_file(path);
761 }
762
763 #[test]
764 fn test_crc_wal_large_metric_name() {
765 let path = tmp_path("large_name");
766 let _ = std::fs::remove_file(&path);
767
768 let large_name = "x".repeat(10_000);
769 let mut wal = CrcWal::open(&path).expect("open");
770 wal.append(&MetricPoint::new(&large_name, 0, 0.0))
771 .expect("append large name");
772
773 let replayed = CrcWal::replay(&path).expect("replay");
774 assert_eq!(replayed.len(), 1);
775 assert_eq!(replayed[0].metric.len(), 10_000);
776
777 let _ = std::fs::remove_file(path);
778 }
779
780 #[test]
781 fn test_crc_wal_sync() {
782 let path = tmp_path("sync_test");
783 let _ = std::fs::remove_file(&path);
784
785 let mut wal = CrcWal::open(&path).expect("open");
786 wal.append(&MetricPoint::new("s", 0, 1.0)).expect("append");
787 wal.sync().expect("sync should succeed");
788
789 let replayed = CrcWal::replay(&path).expect("replay");
790 assert_eq!(replayed.len(), 1);
791
792 let _ = std::fs::remove_file(path);
793 }
794
795 #[test]
796 fn test_crc_wal_path_accessor() {
797 let path = tmp_path("path_access");
798 let _ = std::fs::remove_file(&path);
799
800 let wal = CrcWal::open(&path).expect("open");
801 assert_eq!(wal.path(), path.as_path());
802
803 let _ = std::fs::remove_file(path);
804 }
805
806 #[test]
809 fn test_ring_buffer_capacity_enforced() {
810 let rb = RingBuffer::new(3);
811 rb.push(MetricPoint::new("a", 0, 0.0)).expect("push 1");
812 rb.push(MetricPoint::new("b", 1, 1.0)).expect("push 2");
813 rb.push(MetricPoint::new("c", 2, 2.0)).expect("push 3");
814 let result = rb.push(MetricPoint::new("d", 3, 3.0));
815 assert!(
816 matches!(result, Err(TsdbError::BufferFull(_))),
817 "should be BufferFull"
818 );
819 }
820
821 #[test]
822 fn test_ring_buffer_drain_all() {
823 let rb = RingBuffer::new(10);
824 for i in 0..5 {
825 rb.push(MetricPoint::new("x", i, i as f64)).expect("push");
826 }
827 let drained = rb.drain_all().expect("drain");
828 assert_eq!(drained.len(), 5);
829 assert_eq!(rb.len(), 0);
830 }
831
832 #[test]
833 fn test_ring_buffer_drain_preserves_order() {
834 let rb = RingBuffer::new(10);
835 for i in 0..5 {
836 rb.push(MetricPoint::new(format!("m{i}"), i, i as f64))
837 .expect("push");
838 }
839 let drained = rb.drain_all().expect("drain");
840 for (i, p) in drained.iter().enumerate() {
841 assert_eq!(p.metric, format!("m{i}"));
842 assert_eq!(p.timestamp_ms, i as i64);
843 }
844 }
845
846 #[test]
847 fn test_ring_buffer_capacity_accessor() {
848 let rb = RingBuffer::new(42);
849 assert_eq!(rb.capacity(), 42);
850 }
851
852 #[test]
853 fn test_ring_buffer_len_after_push_and_drain() {
854 let rb = RingBuffer::new(10);
855 assert_eq!(rb.len(), 0);
856 rb.push(MetricPoint::new("a", 0, 0.0)).expect("push");
857 assert_eq!(rb.len(), 1);
858 rb.push(MetricPoint::new("b", 1, 1.0)).expect("push");
859 assert_eq!(rb.len(), 2);
860 let _ = rb.drain_all().expect("drain");
861 assert_eq!(rb.len(), 0);
862 }
863
864 #[test]
867 fn test_batch_writer_write_and_flush() {
868 let path = tmp_path("bw_flush");
869 let _ = std::fs::remove_file(&path);
870
871 let cfg = BatchWriterConfig::with_capacity(10);
872 let mut bw = BatchWriter::open(&path, cfg).expect("open");
873
874 for i in 0..5 {
875 bw.write_point(MetricPoint::new("cpu", i, i as f64))
876 .expect("write");
877 }
878 let seq = bw.flush().expect("flush");
879 assert!(seq > 0);
880 assert_eq!(bw.total_flushed(), 5);
881
882 let _ = std::fs::remove_file(path);
883 }
884
885 #[test]
886 fn test_batch_writer_auto_flush_on_capacity() {
887 let path = tmp_path("bw_auto");
888 let _ = std::fs::remove_file(&path);
889
890 let cfg = BatchWriterConfig::with_capacity(3);
891 let mut bw = BatchWriter::open(&path, cfg).expect("open");
892
893 for i in 0..4i64 {
894 bw.write_point(MetricPoint::new("x", i, i as f64))
895 .expect("write");
896 }
897
898 assert!(bw.total_flushed() >= 3);
899
900 let _ = std::fs::remove_file(path);
901 }
902
903 #[test]
904 fn test_batch_writer_write_batch() {
905 let path = tmp_path("bw_write_batch");
906 let _ = std::fs::remove_file(&path);
907
908 let pts: Vec<_> = (0..20i64)
909 .map(|i| MetricPoint::new(format!("s{i}"), i, i as f64))
910 .collect();
911
912 let cfg = BatchWriterConfig::with_capacity(8);
913 let mut bw = BatchWriter::open(&path, cfg).expect("open");
914
915 let seq = bw.write_batch(&pts).expect("write_batch");
916 assert!(seq > 0);
917 assert_eq!(bw.total_flushed(), 20);
918
919 let _ = std::fs::remove_file(path);
920 }
921
922 #[test]
923 fn test_batch_writer_replay_after_crash() {
924 let path = tmp_path("bw_crash_recovery");
925 let _ = std::fs::remove_file(&path);
926
927 let pts: Vec<_> = (0..10i64)
928 .map(|i| MetricPoint::new("temp", i * 1_000, 20.0 + i as f64))
929 .collect();
930
931 {
932 let cfg = BatchWriterConfig::default();
933 let mut bw = BatchWriter::open(&path, cfg).expect("open");
934 bw.write_batch(&pts).expect("write");
935 }
936
937 let recovered = BatchWriter::replay_wal(&path).expect("replay");
938 assert_eq!(recovered.len(), 10);
939 assert_eq!(recovered[0].metric, "temp");
940 assert!((recovered[5].value - 25.0).abs() < 1e-9);
941
942 let _ = std::fs::remove_file(path);
943 }
944
945 #[test]
946 fn test_batch_writer_flush_empty_buffer() {
947 let path = tmp_path("bw_empty_flush");
948 let _ = std::fs::remove_file(&path);
949
950 let mut bw = BatchWriter::open(&path, Default::default()).expect("open");
951 let seq = bw.flush().expect("flush empty");
952 assert_eq!(seq, 0, "flushing empty buffer should return seq 0");
953
954 let _ = std::fs::remove_file(path);
955 }
956
957 #[test]
958 fn test_batch_writer_config_default() {
959 let cfg = BatchWriterConfig::default();
960 assert_eq!(cfg.batch_capacity, 4096);
961 assert!(!cfg.fsync_on_flush);
962 }
963
964 #[test]
965 fn test_batch_writer_buffered_count() {
966 let path = tmp_path("bw_count");
967 let _ = std::fs::remove_file(&path);
968
969 let cfg = BatchWriterConfig::with_capacity(100);
970 let mut bw = BatchWriter::open(&path, cfg).expect("open");
971
972 assert_eq!(bw.buffered_count(), 0);
973 bw.write_point(MetricPoint::new("a", 0, 0.0))
974 .expect("write");
975 assert_eq!(bw.buffered_count(), 1);
976
977 let _ = std::fs::remove_file(path);
978 }
979
980 #[test]
983 fn test_batch_writer_fsync_on_flush() {
984 let path = tmp_path("bw_fsync");
985 let _ = std::fs::remove_file(&path);
986
987 let cfg = BatchWriterConfig {
988 batch_capacity: 10,
989 fsync_on_flush: true,
990 };
991 let mut bw = BatchWriter::open(&path, cfg).expect("open");
992
993 bw.write_point(MetricPoint::new("x", 0, 1.0))
994 .expect("write");
995 let seq = bw.flush().expect("flush with fsync");
996 assert!(seq > 0);
997
998 let recovered = BatchWriter::replay_wal(&path).expect("replay");
1000 assert_eq!(recovered.len(), 1);
1001
1002 let _ = std::fs::remove_file(path);
1003 }
1004
1005 #[test]
1006 fn test_batch_writer_large_batch() {
1007 let path = tmp_path("bw_large");
1008 let _ = std::fs::remove_file(&path);
1009
1010 let pts: Vec<_> = (0..1000i64)
1011 .map(|i| MetricPoint::new("sensor", i * 100, i as f64 * 0.1))
1012 .collect();
1013
1014 let cfg = BatchWriterConfig::with_capacity(100);
1015 let mut bw = BatchWriter::open(&path, cfg).expect("open");
1016 let seq = bw.write_batch(&pts).expect("write large batch");
1017 assert!(seq > 0);
1018 assert_eq!(bw.total_flushed(), 1000);
1019
1020 let recovered = BatchWriter::replay_wal(&path).expect("replay");
1021 assert_eq!(recovered.len(), 1000);
1022 assert!((recovered[999].value - 99.9).abs() < 1e-9);
1023
1024 let _ = std::fs::remove_file(path);
1025 }
1026
1027 #[test]
1028 fn test_batch_writer_multiple_flush_cycles() {
1029 let path = tmp_path("bw_multi_flush");
1030 let _ = std::fs::remove_file(&path);
1031
1032 let cfg = BatchWriterConfig::with_capacity(5);
1033 let mut bw = BatchWriter::open(&path, cfg).expect("open");
1034
1035 for i in 0..5 {
1037 bw.write_point(MetricPoint::new("a", i, i as f64))
1038 .expect("write");
1039 }
1040 bw.flush().expect("flush 1");
1041 assert_eq!(bw.total_flushed(), 5);
1042
1043 for i in 5..10 {
1045 bw.write_point(MetricPoint::new("b", i, i as f64))
1046 .expect("write");
1047 }
1048 bw.flush().expect("flush 2");
1049 assert_eq!(bw.total_flushed(), 10);
1050
1051 let recovered = BatchWriter::replay_wal(&path).expect("replay");
1052 assert_eq!(recovered.len(), 10);
1053
1054 let _ = std::fs::remove_file(path);
1055 }
1056
1057 #[test]
1058 fn test_batch_writer_batch_capacity_accessor() {
1059 let path = tmp_path("bw_cap");
1060 let _ = std::fs::remove_file(&path);
1061
1062 let cfg = BatchWriterConfig::with_capacity(256);
1063 let bw = BatchWriter::open(&path, cfg).expect("open");
1064 assert_eq!(bw.batch_capacity(), 256);
1065
1066 let _ = std::fs::remove_file(path);
1067 }
1068
1069 #[test]
1070 fn test_batch_writer_interleaved_write_and_flush() {
1071 let path = tmp_path("bw_interleave");
1072 let _ = std::fs::remove_file(&path);
1073
1074 let cfg = BatchWriterConfig::with_capacity(50);
1075 let mut bw = BatchWriter::open(&path, cfg).expect("open");
1076
1077 bw.write_point(MetricPoint::new("a", 0, 1.0)).expect("w1");
1079 bw.flush().expect("f1");
1080 bw.write_point(MetricPoint::new("b", 1, 2.0)).expect("w2");
1081 bw.write_point(MetricPoint::new("c", 2, 3.0)).expect("w3");
1082 bw.flush().expect("f2");
1083
1084 assert_eq!(bw.total_flushed(), 3);
1085
1086 let recovered = BatchWriter::replay_wal(&path).expect("replay");
1087 assert_eq!(recovered.len(), 3);
1088 assert_eq!(recovered[0].metric, "a");
1089 assert_eq!(recovered[1].metric, "b");
1090 assert_eq!(recovered[2].metric, "c");
1091
1092 let _ = std::fs::remove_file(path);
1093 }
1094
1095 #[test]
1096 fn test_batch_writer_config_with_capacity() {
1097 let cfg = BatchWriterConfig::with_capacity(512);
1098 assert_eq!(cfg.batch_capacity, 512);
1099 assert!(!cfg.fsync_on_flush); }
1101}