Skip to main content

oxirs_tsdb/
write_buffer.rs

1//! In-memory write buffer for time-series data ingestion.
2//!
3//! Accumulates incoming data points before flushing them to durable storage.
4//! Supports size-based, time-based, and explicit flush policies, WAL
5//! integration hooks, out-of-order write handling, backpressure, and partial
6//! flushing.
7//!
8//! # Example
9//!
10//! ```
11//! use oxirs_tsdb::write_buffer::{WriteBufferConfig, WriteBuffer, FlushPolicy, DataPoint};
12//!
13//! let config = WriteBufferConfig {
14//!     max_capacity: 1000,
15//!     flush_policy: FlushPolicy::SizeBased { threshold: 500 },
16//!     ..Default::default()
17//! };
18//! let mut buf = WriteBuffer::new(config);
19//! buf.push(DataPoint { series_id: 1, timestamp_ms: 1_000, value: 3.14 }).expect("push failed");
20//! if buf.should_flush() {
21//!     let points = buf.flush().expect("flush failed");
22//!     println!("flushed {} points", points.len());
23//! }
24//! ```
25
26use std::cmp::Ordering;
27use std::time::{Duration, Instant};
28
29// ---------------------------------------------------------------------------
30// Error type
31// ---------------------------------------------------------------------------
32
33/// Errors produced by the write buffer.
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub enum WriteBufferError {
36    /// The buffer is full and backpressure is active.
37    BufferFull,
38    /// An explicit flush was requested while the buffer is in a transient state.
39    FlushConflict(String),
40    /// The WAL entry could not be written (simulated).
41    WalError(String),
42    /// General internal error.
43    Internal(String),
44}
45
46impl std::fmt::Display for WriteBufferError {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            Self::BufferFull => write!(f, "Write buffer full: backpressure active"),
50            Self::FlushConflict(msg) => write!(f, "Flush conflict: {msg}"),
51            Self::WalError(msg) => write!(f, "WAL error: {msg}"),
52            Self::Internal(msg) => write!(f, "Internal error: {msg}"),
53        }
54    }
55}
56
57impl std::error::Error for WriteBufferError {}
58
59/// Result alias for write buffer operations.
60pub type WriteBufferResult<T> = Result<T, WriteBufferError>;
61
62// ---------------------------------------------------------------------------
63// Data point
64// ---------------------------------------------------------------------------
65
66/// A single time-series observation.
67#[derive(Debug, Clone, PartialEq)]
68pub struct DataPoint {
69    /// Numeric series identifier.
70    pub series_id: u64,
71    /// Millisecond UTC timestamp.
72    pub timestamp_ms: i64,
73    /// Observed value.
74    pub value: f64,
75}
76
77impl Eq for DataPoint {}
78
79impl PartialOrd for DataPoint {
80    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
81        Some(self.cmp(other))
82    }
83}
84
85impl Ord for DataPoint {
86    fn cmp(&self, other: &Self) -> Ordering {
87        self.timestamp_ms
88            .cmp(&other.timestamp_ms)
89            .then(self.series_id.cmp(&other.series_id))
90    }
91}
92
93// ---------------------------------------------------------------------------
94// Flush policy
95// ---------------------------------------------------------------------------
96
97/// Determines when the buffer should be automatically flushed.
98#[derive(Debug, Clone, PartialEq)]
99pub enum FlushPolicy {
100    /// Flush only when explicitly requested.
101    Explicit,
102    /// Flush when the number of buffered points reaches `threshold`.
103    SizeBased {
104        /// Number of data points that triggers a flush.
105        threshold: usize,
106    },
107    /// Flush when the oldest buffered point exceeds `max_age`.
108    TimeBased {
109        /// Maximum age of the oldest buffered point before flushing.
110        max_age: Duration,
111    },
112    /// Flush when either the size or time condition is met.
113    Combined {
114        /// Point count threshold.
115        threshold: usize,
116        /// Age threshold.
117        max_age: Duration,
118    },
119}
120
121impl Default for FlushPolicy {
122    fn default() -> Self {
123        Self::SizeBased { threshold: 1000 }
124    }
125}
126
127// ---------------------------------------------------------------------------
128// Buffer state
129// ---------------------------------------------------------------------------
130
131/// Lifecycle state of the write buffer.
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub enum BufferState {
134    /// No data points are buffered.
135    Empty,
136    /// Points are being accumulated normally.
137    Accumulating,
138    /// A flush is currently in progress.
139    Flushing,
140    /// The buffer is at maximum capacity; writes are blocked.
141    Full,
142}
143
144// ---------------------------------------------------------------------------
145// WAL entry
146// ---------------------------------------------------------------------------
147
148/// A simulated write-ahead log entry produced during a flush.
149#[derive(Debug, Clone)]
150pub struct WalEntry {
151    /// Sequence number assigned to this flush batch.
152    pub sequence: u64,
153    /// Number of data points in this entry.
154    pub point_count: usize,
155    /// Minimum timestamp in the batch (ms).
156    pub min_timestamp_ms: i64,
157    /// Maximum timestamp in the batch (ms).
158    pub max_timestamp_ms: i64,
159}
160
161// ---------------------------------------------------------------------------
162// Configuration
163// ---------------------------------------------------------------------------
164
165/// Configuration for the write buffer.
166#[derive(Debug, Clone)]
167pub struct WriteBufferConfig {
168    /// Maximum number of data points the buffer can hold before backpressure.
169    pub max_capacity: usize,
170    /// Flush policy.
171    pub flush_policy: FlushPolicy,
172    /// If true, WAL entries are produced on each flush.
173    pub enable_wal: bool,
174    /// Maximum age of buffered data before a partial flush is triggered (0 = disabled).
175    pub partial_flush_age: Duration,
176    /// Number of oldest points to flush in a partial flush (0 = all eligible).
177    pub partial_flush_count: usize,
178}
179
180impl Default for WriteBufferConfig {
181    fn default() -> Self {
182        Self {
183            max_capacity: 100_000,
184            flush_policy: FlushPolicy::default(),
185            enable_wal: true,
186            partial_flush_age: Duration::ZERO,
187            partial_flush_count: 0,
188        }
189    }
190}
191
192// ---------------------------------------------------------------------------
193// Buffer statistics
194// ---------------------------------------------------------------------------
195
196/// Statistics about the current state and historical activity of the buffer.
197#[derive(Debug, Clone, Default)]
198pub struct BufferStats {
199    /// Number of data points currently in the buffer.
200    pub buffered_count: usize,
201    /// Oldest timestamp (ms) in the buffer (`i64::MAX` if empty).
202    pub oldest_timestamp_ms: i64,
203    /// Newest timestamp (ms) in the buffer (`i64::MIN` if empty).
204    pub newest_timestamp_ms: i64,
205    /// Total number of flushes performed.
206    pub flush_count: u64,
207    /// Total number of data points flushed across all flushes.
208    pub total_points_flushed: u64,
209    /// Total number of out-of-order points that were sorted on flush.
210    pub out_of_order_count: u64,
211    /// Number of writes blocked by backpressure.
212    pub backpressure_events: u64,
213    /// Number of WAL entries written.
214    pub wal_entries: u64,
215}
216
217// ---------------------------------------------------------------------------
218// WAL sink (callback)
219// ---------------------------------------------------------------------------
220
221/// Trait for sinking WAL entries to durable storage.
222///
223/// Implement this on a real WAL to integrate durability.
224pub trait WalSink: Send + 'static {
225    /// Write a WAL entry.
226    fn write_entry(&mut self, entry: WalEntry) -> WriteBufferResult<()>;
227}
228
229/// A no-op WAL sink (drops all entries).
230pub struct NoopWalSink;
231
232impl WalSink for NoopWalSink {
233    fn write_entry(&mut self, _entry: WalEntry) -> WriteBufferResult<()> {
234        Ok(())
235    }
236}
237
238// ---------------------------------------------------------------------------
239// Write buffer
240// ---------------------------------------------------------------------------
241
242/// In-memory write buffer for time-series data points.
243pub struct WriteBuffer {
244    config: WriteBufferConfig,
245    points: Vec<DataPoint>,
246    state: BufferState,
247    stats: BufferStats,
248    wal_sequence: u64,
249    oldest_insert_time: Option<Instant>,
250}
251
252impl WriteBuffer {
253    /// Create a new write buffer with the given configuration.
254    pub fn new(config: WriteBufferConfig) -> Self {
255        Self {
256            config,
257            points: Vec::new(),
258            state: BufferState::Empty,
259            stats: BufferStats {
260                oldest_timestamp_ms: i64::MAX,
261                newest_timestamp_ms: i64::MIN,
262                ..Default::default()
263            },
264            wal_sequence: 0,
265            oldest_insert_time: None,
266        }
267    }
268
269    // ------------------------------------------------------------------
270    // State queries
271    // ------------------------------------------------------------------
272
273    /// Current buffer state.
274    pub fn state(&self) -> BufferState {
275        self.state
276    }
277
278    /// Return a snapshot of current buffer statistics.
279    pub fn stats(&self) -> &BufferStats {
280        &self.stats
281    }
282
283    /// Number of data points currently held.
284    pub fn len(&self) -> usize {
285        self.points.len()
286    }
287
288    /// True if no points are currently buffered.
289    pub fn is_empty(&self) -> bool {
290        self.points.is_empty()
291    }
292
293    /// True if the buffer has reached maximum capacity.
294    pub fn is_full(&self) -> bool {
295        self.points.len() >= self.config.max_capacity
296    }
297
298    // ------------------------------------------------------------------
299    // Push
300    // ------------------------------------------------------------------
301
302    /// Push a data point into the buffer.
303    ///
304    /// # Errors
305    ///
306    /// Returns [`WriteBufferError::BufferFull`] when the buffer is at capacity
307    /// and backpressure should be applied by the caller.
308    pub fn push(&mut self, point: DataPoint) -> WriteBufferResult<()> {
309        if self.points.len() >= self.config.max_capacity {
310            self.stats.backpressure_events += 1;
311            self.state = BufferState::Full;
312            return Err(WriteBufferError::BufferFull);
313        }
314
315        // Update timestamp bounds.
316        if point.timestamp_ms < self.stats.oldest_timestamp_ms {
317            self.stats.oldest_timestamp_ms = point.timestamp_ms;
318        }
319        if point.timestamp_ms > self.stats.newest_timestamp_ms {
320            self.stats.newest_timestamp_ms = point.timestamp_ms;
321        }
322
323        self.points.push(point);
324        self.stats.buffered_count = self.points.len();
325
326        if self.oldest_insert_time.is_none() {
327            self.oldest_insert_time = Some(Instant::now());
328        }
329
330        self.state = if self.points.len() >= self.config.max_capacity {
331            BufferState::Full
332        } else {
333            BufferState::Accumulating
334        };
335
336        Ok(())
337    }
338
339    /// Push multiple data points at once.
340    ///
341    /// Points are added until the buffer is full; returns the number actually
342    /// pushed.
343    pub fn push_batch(&mut self, points: impl IntoIterator<Item = DataPoint>) -> usize {
344        let mut count = 0usize;
345        for point in points {
346            if self.push(point).is_err() {
347                break;
348            }
349            count += 1;
350        }
351        count
352    }
353
354    // ------------------------------------------------------------------
355    // Flush triggering
356    // ------------------------------------------------------------------
357
358    /// True if the current policy conditions require a flush.
359    pub fn should_flush(&self) -> bool {
360        if self.points.is_empty() {
361            return false;
362        }
363        match &self.config.flush_policy {
364            FlushPolicy::Explicit => false,
365            FlushPolicy::SizeBased { threshold } => self.points.len() >= *threshold,
366            FlushPolicy::TimeBased { max_age } => self
367                .oldest_insert_time
368                .map(|t| t.elapsed() >= *max_age)
369                .unwrap_or(false),
370            FlushPolicy::Combined { threshold, max_age } => {
371                let size_ok = self.points.len() >= *threshold;
372                let time_ok = self
373                    .oldest_insert_time
374                    .map(|t| t.elapsed() >= *max_age)
375                    .unwrap_or(false);
376                size_ok || time_ok
377            }
378        }
379    }
380
381    // ------------------------------------------------------------------
382    // Flush (full)
383    // ------------------------------------------------------------------
384
385    /// Flush all buffered data points, sorted by timestamp.
386    ///
387    /// Writes a WAL entry if `enable_wal` is set and a sink is provided.
388    /// Returns the flushed points in ascending timestamp order.
389    pub fn flush(&mut self) -> WriteBufferResult<Vec<DataPoint>> {
390        self.flush_inner(None)
391    }
392
393    /// Flush with a WAL sink.
394    pub fn flush_with_wal<W: WalSink>(&mut self, wal: &mut W) -> WriteBufferResult<Vec<DataPoint>> {
395        self.flush_inner(Some(wal as &mut dyn WalSink))
396    }
397
398    fn flush_inner(&mut self, wal: Option<&mut dyn WalSink>) -> WriteBufferResult<Vec<DataPoint>> {
399        if matches!(self.state, BufferState::Flushing) {
400            return Err(WriteBufferError::FlushConflict(
401                "flush already in progress".to_string(),
402            ));
403        }
404
405        let prev_state = self.state;
406        self.state = BufferState::Flushing;
407
408        let mut points = std::mem::take(&mut self.points);
409
410        // Detect out-of-order writes.
411        let was_sorted = points
412            .windows(2)
413            .all(|w| w[0].timestamp_ms <= w[1].timestamp_ms);
414        if !was_sorted {
415            self.stats.out_of_order_count += points.len() as u64;
416            points.sort_unstable();
417        }
418
419        // WAL integration.
420        if self.config.enable_wal {
421            let entry = self.build_wal_entry(&points);
422            if let Some(sink) = wal {
423                sink.write_entry(entry)?;
424            }
425            self.stats.wal_entries += 1;
426        }
427
428        // Update statistics.
429        self.stats.flush_count += 1;
430        self.stats.total_points_flushed += points.len() as u64;
431        self.stats.buffered_count = 0;
432        self.stats.oldest_timestamp_ms = i64::MAX;
433        self.stats.newest_timestamp_ms = i64::MIN;
434        self.oldest_insert_time = None;
435
436        let _ = prev_state;
437        self.state = BufferState::Empty;
438
439        Ok(points)
440    }
441
442    // ------------------------------------------------------------------
443    // Partial flush
444    // ------------------------------------------------------------------
445
446    /// Flush only the oldest data points (those with the smallest timestamps).
447    ///
448    /// `count` specifies how many points to flush (0 = use config default).
449    /// Points are sorted by timestamp before selection.
450    pub fn partial_flush(&mut self, count: usize) -> WriteBufferResult<Vec<DataPoint>> {
451        if matches!(self.state, BufferState::Flushing) {
452            return Err(WriteBufferError::FlushConflict(
453                "flush already in progress".to_string(),
454            ));
455        }
456        if self.points.is_empty() {
457            return Ok(Vec::new());
458        }
459
460        self.state = BufferState::Flushing;
461
462        // Sort all points so we can take the oldest.
463        self.points.sort_unstable();
464
465        let take = if count == 0 {
466            self.config.partial_flush_count.max(1)
467        } else {
468            count
469        };
470        let take = take.min(self.points.len());
471
472        let flushed: Vec<DataPoint> = self.points.drain(..take).collect();
473
474        // Update stats.
475        self.stats.flush_count += 1;
476        self.stats.total_points_flushed += flushed.len() as u64;
477        self.stats.buffered_count = self.points.len();
478
479        // Recompute timestamp bounds.
480        self.recompute_bounds();
481
482        self.state = if self.points.is_empty() {
483            BufferState::Empty
484        } else {
485            BufferState::Accumulating
486        };
487
488        Ok(flushed)
489    }
490
491    // ------------------------------------------------------------------
492    // Internal helpers
493    // ------------------------------------------------------------------
494
495    fn build_wal_entry(&mut self, points: &[DataPoint]) -> WalEntry {
496        self.wal_sequence += 1;
497        let min_ts = points.iter().map(|p| p.timestamp_ms).min().unwrap_or(0);
498        let max_ts = points.iter().map(|p| p.timestamp_ms).max().unwrap_or(0);
499        WalEntry {
500            sequence: self.wal_sequence,
501            point_count: points.len(),
502            min_timestamp_ms: min_ts,
503            max_timestamp_ms: max_ts,
504        }
505    }
506
507    fn recompute_bounds(&mut self) {
508        if self.points.is_empty() {
509            self.stats.oldest_timestamp_ms = i64::MAX;
510            self.stats.newest_timestamp_ms = i64::MIN;
511        } else {
512            self.stats.oldest_timestamp_ms = self
513                .points
514                .iter()
515                .map(|p| p.timestamp_ms)
516                .min()
517                .unwrap_or(i64::MAX);
518            self.stats.newest_timestamp_ms = self
519                .points
520                .iter()
521                .map(|p| p.timestamp_ms)
522                .max()
523                .unwrap_or(i64::MIN);
524        }
525    }
526}
527
528// ---------------------------------------------------------------------------
529// Tests
530// ---------------------------------------------------------------------------
531
532#[cfg(test)]
533mod tests {
534    use super::*;
535
536    fn make_point(series_id: u64, timestamp_ms: i64, value: f64) -> DataPoint {
537        DataPoint {
538            series_id,
539            timestamp_ms,
540            value,
541        }
542    }
543
544    fn size_buffer(threshold: usize) -> WriteBuffer {
545        WriteBuffer::new(WriteBufferConfig {
546            max_capacity: 10_000,
547            flush_policy: FlushPolicy::SizeBased { threshold },
548            enable_wal: false,
549            ..Default::default()
550        })
551    }
552
553    // -- state transitions --------------------------------------------------
554
555    #[test]
556    fn test_initial_state_is_empty() {
557        let buf = WriteBuffer::new(WriteBufferConfig::default());
558        assert_eq!(buf.state(), BufferState::Empty);
559        assert!(buf.is_empty());
560    }
561
562    #[test]
563    fn test_push_transitions_to_accumulating() {
564        let mut buf = size_buffer(100);
565        buf.push(make_point(1, 1000, 1.0)).expect("push failed");
566        assert_eq!(buf.state(), BufferState::Accumulating);
567    }
568
569    #[test]
570    fn test_flush_transitions_back_to_empty() {
571        let mut buf = size_buffer(100);
572        buf.push(make_point(1, 1000, 1.0)).expect("push failed");
573        let _ = buf.flush().expect("flush failed");
574        assert_eq!(buf.state(), BufferState::Empty);
575    }
576
577    #[test]
578    fn test_buffer_full_state_on_capacity() {
579        let config = WriteBufferConfig {
580            max_capacity: 3,
581            flush_policy: FlushPolicy::Explicit,
582            enable_wal: false,
583            ..Default::default()
584        };
585        let mut buf = WriteBuffer::new(config);
586        buf.push(make_point(1, 1, 1.0)).expect("push 1");
587        buf.push(make_point(1, 2, 2.0)).expect("push 2");
588        buf.push(make_point(1, 3, 3.0)).expect("push 3");
589        assert_eq!(buf.state(), BufferState::Full);
590    }
591
592    // -- backpressure -------------------------------------------------------
593
594    #[test]
595    fn test_push_beyond_capacity_returns_buffer_full() {
596        let config = WriteBufferConfig {
597            max_capacity: 2,
598            flush_policy: FlushPolicy::Explicit,
599            enable_wal: false,
600            ..Default::default()
601        };
602        let mut buf = WriteBuffer::new(config);
603        buf.push(make_point(1, 1, 0.0)).expect("push 1");
604        buf.push(make_point(1, 2, 0.0)).expect("push 2");
605        let err = buf.push(make_point(1, 3, 0.0)).unwrap_err();
606        assert_eq!(err, WriteBufferError::BufferFull);
607    }
608
609    #[test]
610    fn test_backpressure_events_counted() {
611        let config = WriteBufferConfig {
612            max_capacity: 1,
613            flush_policy: FlushPolicy::Explicit,
614            enable_wal: false,
615            ..Default::default()
616        };
617        let mut buf = WriteBuffer::new(config);
618        buf.push(make_point(1, 1, 0.0)).expect("push 1");
619        let _ = buf.push(make_point(1, 2, 0.0));
620        let _ = buf.push(make_point(1, 3, 0.0));
621        assert_eq!(buf.stats().backpressure_events, 2);
622    }
623
624    // -- push_batch ---------------------------------------------------------
625
626    #[test]
627    fn test_push_batch_stops_at_capacity() {
628        let config = WriteBufferConfig {
629            max_capacity: 3,
630            flush_policy: FlushPolicy::Explicit,
631            enable_wal: false,
632            ..Default::default()
633        };
634        let mut buf = WriteBuffer::new(config);
635        let points: Vec<DataPoint> = (0..10).map(|i| make_point(1, i, 0.0)).collect();
636        let pushed = buf.push_batch(points);
637        assert_eq!(pushed, 3);
638    }
639
640    // -- flush ordering -----------------------------------------------------
641
642    #[test]
643    fn test_flush_returns_points_sorted_by_timestamp() {
644        let mut buf = size_buffer(10);
645        buf.push(make_point(1, 300, 1.0)).expect("push");
646        buf.push(make_point(1, 100, 2.0)).expect("push");
647        buf.push(make_point(1, 200, 3.0)).expect("push");
648        let flushed = buf.flush().expect("flush failed");
649        assert_eq!(flushed[0].timestamp_ms, 100);
650        assert_eq!(flushed[1].timestamp_ms, 200);
651        assert_eq!(flushed[2].timestamp_ms, 300);
652    }
653
654    #[test]
655    fn test_out_of_order_counted_on_flush() {
656        let mut buf = size_buffer(10);
657        buf.push(make_point(1, 200, 1.0)).expect("push");
658        buf.push(make_point(1, 100, 2.0)).expect("push");
659        let _ = buf.flush().expect("flush");
660        assert_eq!(buf.stats().out_of_order_count, 2);
661    }
662
663    #[test]
664    fn test_in_order_write_not_counted_as_out_of_order() {
665        let mut buf = size_buffer(10);
666        buf.push(make_point(1, 100, 1.0)).expect("push");
667        buf.push(make_point(1, 200, 2.0)).expect("push");
668        let _ = buf.flush().expect("flush");
669        assert_eq!(buf.stats().out_of_order_count, 0);
670    }
671
672    // -- flush stats --------------------------------------------------------
673
674    #[test]
675    fn test_flush_count_increments() {
676        let mut buf = size_buffer(10);
677        buf.push(make_point(1, 1, 0.0)).expect("push");
678        buf.flush().expect("flush 1");
679        buf.push(make_point(1, 2, 0.0)).expect("push");
680        buf.flush().expect("flush 2");
681        assert_eq!(buf.stats().flush_count, 2);
682    }
683
684    #[test]
685    fn test_total_points_flushed_accumulates() {
686        let mut buf = size_buffer(10);
687        for i in 0..5 {
688            buf.push(make_point(1, i, 0.0)).expect("push");
689        }
690        buf.flush().expect("flush 1");
691        for i in 5..8 {
692            buf.push(make_point(1, i, 0.0)).expect("push");
693        }
694        buf.flush().expect("flush 2");
695        assert_eq!(buf.stats().total_points_flushed, 8);
696    }
697
698    #[test]
699    fn test_buffered_count_reset_after_flush() {
700        let mut buf = size_buffer(10);
701        buf.push(make_point(1, 1, 0.0)).expect("push");
702        buf.flush().expect("flush");
703        assert_eq!(buf.stats().buffered_count, 0);
704    }
705
706    // -- timestamp bounds ---------------------------------------------------
707
708    #[test]
709    fn test_timestamp_bounds_updated_on_push() {
710        let mut buf = size_buffer(10);
711        buf.push(make_point(1, 500, 0.0)).expect("push");
712        buf.push(make_point(1, 100, 0.0)).expect("push");
713        buf.push(make_point(1, 900, 0.0)).expect("push");
714        assert_eq!(buf.stats().oldest_timestamp_ms, 100);
715        assert_eq!(buf.stats().newest_timestamp_ms, 900);
716    }
717
718    #[test]
719    fn test_timestamp_bounds_reset_after_flush() {
720        let mut buf = size_buffer(10);
721        buf.push(make_point(1, 100, 0.0)).expect("push");
722        buf.flush().expect("flush");
723        assert_eq!(buf.stats().oldest_timestamp_ms, i64::MAX);
724        assert_eq!(buf.stats().newest_timestamp_ms, i64::MIN);
725    }
726
727    // -- flush policy -------------------------------------------------------
728
729    #[test]
730    fn test_should_flush_size_based_below_threshold() {
731        let mut buf = size_buffer(5);
732        buf.push(make_point(1, 1, 0.0)).expect("push");
733        buf.push(make_point(1, 2, 0.0)).expect("push");
734        assert!(!buf.should_flush());
735    }
736
737    #[test]
738    fn test_should_flush_size_based_at_threshold() {
739        let mut buf = size_buffer(2);
740        buf.push(make_point(1, 1, 0.0)).expect("push");
741        buf.push(make_point(1, 2, 0.0)).expect("push");
742        assert!(buf.should_flush());
743    }
744
745    #[test]
746    fn test_should_flush_explicit_never_auto() {
747        let config = WriteBufferConfig {
748            max_capacity: 10_000,
749            flush_policy: FlushPolicy::Explicit,
750            enable_wal: false,
751            ..Default::default()
752        };
753        let mut buf = WriteBuffer::new(config);
754        for i in 0..100 {
755            buf.push(make_point(1, i, 0.0)).expect("push");
756        }
757        assert!(!buf.should_flush());
758    }
759
760    #[test]
761    fn test_should_flush_empty_buffer_false() {
762        let buf = size_buffer(1);
763        assert!(!buf.should_flush());
764    }
765
766    #[test]
767    fn test_should_flush_combined_size_path() {
768        let config = WriteBufferConfig {
769            max_capacity: 10_000,
770            flush_policy: FlushPolicy::Combined {
771                threshold: 2,
772                max_age: Duration::from_secs(3600),
773            },
774            enable_wal: false,
775            ..Default::default()
776        };
777        let mut buf = WriteBuffer::new(config);
778        buf.push(make_point(1, 1, 0.0)).expect("push");
779        buf.push(make_point(1, 2, 0.0)).expect("push");
780        assert!(buf.should_flush());
781    }
782
783    // -- partial flush ------------------------------------------------------
784
785    #[test]
786    fn test_partial_flush_removes_oldest_points() {
787        let mut buf = size_buffer(100);
788        buf.push(make_point(1, 300, 1.0)).expect("push");
789        buf.push(make_point(1, 100, 2.0)).expect("push");
790        buf.push(make_point(1, 200, 3.0)).expect("push");
791        let flushed = buf.partial_flush(1).expect("partial flush");
792        assert_eq!(flushed.len(), 1);
793        assert_eq!(flushed[0].timestamp_ms, 100);
794        assert_eq!(buf.len(), 2);
795    }
796
797    #[test]
798    fn test_partial_flush_empty_buffer_returns_empty() {
799        let mut buf = size_buffer(100);
800        let flushed = buf.partial_flush(5).expect("partial flush");
801        assert!(flushed.is_empty());
802    }
803
804    #[test]
805    fn test_partial_flush_count_exceeds_buffer_size() {
806        let mut buf = size_buffer(100);
807        buf.push(make_point(1, 1, 0.0)).expect("push");
808        buf.push(make_point(1, 2, 0.0)).expect("push");
809        let flushed = buf.partial_flush(10).expect("partial flush");
810        assert_eq!(flushed.len(), 2);
811        assert!(buf.is_empty());
812    }
813
814    #[test]
815    fn test_partial_flush_state_after_partial() {
816        let mut buf = size_buffer(100);
817        buf.push(make_point(1, 1, 0.0)).expect("push");
818        buf.push(make_point(1, 2, 0.0)).expect("push");
819        let _ = buf.partial_flush(1).expect("partial flush");
820        assert_eq!(buf.state(), BufferState::Accumulating);
821    }
822
823    // -- WAL integration ----------------------------------------------------
824
825    #[test]
826    fn test_wal_entry_count_increments_on_flush() {
827        let config = WriteBufferConfig {
828            max_capacity: 10_000,
829            flush_policy: FlushPolicy::Explicit,
830            enable_wal: true,
831            ..Default::default()
832        };
833        let mut buf = WriteBuffer::new(config);
834        buf.push(make_point(1, 100, 1.0)).expect("push");
835        buf.flush().expect("flush");
836        assert_eq!(buf.stats().wal_entries, 1);
837    }
838
839    #[test]
840    fn test_wal_disabled_no_entries_recorded() {
841        let config = WriteBufferConfig {
842            max_capacity: 10_000,
843            flush_policy: FlushPolicy::Explicit,
844            enable_wal: false,
845            ..Default::default()
846        };
847        let mut buf = WriteBuffer::new(config);
848        buf.push(make_point(1, 100, 1.0)).expect("push");
849        buf.flush().expect("flush");
850        assert_eq!(buf.stats().wal_entries, 0);
851    }
852
853    // -- error types --------------------------------------------------------
854
855    #[test]
856    fn test_write_buffer_error_display() {
857        assert!(WriteBufferError::BufferFull
858            .to_string()
859            .contains("backpressure"));
860        assert!(WriteBufferError::FlushConflict("x".into())
861            .to_string()
862            .contains("x"));
863        assert!(WriteBufferError::WalError("wal".into())
864            .to_string()
865            .contains("wal"));
866        assert!(WriteBufferError::Internal("internal".into())
867            .to_string()
868            .contains("internal"));
869    }
870
871    // -- len / is_empty / is_full -------------------------------------------
872
873    #[test]
874    fn test_len_reflects_buffered_points() {
875        let mut buf = size_buffer(100);
876        for i in 0..7 {
877            buf.push(make_point(1, i, 0.0)).expect("push");
878        }
879        assert_eq!(buf.len(), 7);
880    }
881
882    #[test]
883    fn test_is_full_at_capacity() {
884        let config = WriteBufferConfig {
885            max_capacity: 2,
886            flush_policy: FlushPolicy::Explicit,
887            enable_wal: false,
888            ..Default::default()
889        };
890        let mut buf = WriteBuffer::new(config);
891        buf.push(make_point(1, 1, 0.0)).expect("push");
892        assert!(!buf.is_full());
893        buf.push(make_point(1, 2, 0.0)).expect("push");
894        assert!(buf.is_full());
895    }
896
897    // -- DataPoint ordering -------------------------------------------------
898
899    #[test]
900    fn test_data_point_ordering_by_timestamp() {
901        let a = make_point(1, 100, 0.0);
902        let b = make_point(1, 200, 0.0);
903        assert!(a < b);
904    }
905
906    #[test]
907    fn test_data_point_ordering_same_timestamp_by_series() {
908        let a = make_point(1, 100, 0.0);
909        let b = make_point(2, 100, 0.0);
910        assert!(a < b);
911    }
912
913    // -- multiple series in one flush ---------------------------------------
914
915    #[test]
916    fn test_flush_multiple_series_interleaved() {
917        let mut buf = size_buffer(100);
918        buf.push(make_point(2, 200, 1.0)).expect("push");
919        buf.push(make_point(1, 100, 2.0)).expect("push");
920        buf.push(make_point(3, 150, 3.0)).expect("push");
921        let flushed = buf.flush().expect("flush");
922        assert_eq!(flushed[0].series_id, 1);
923        assert_eq!(flushed[1].series_id, 3);
924        assert_eq!(flushed[2].series_id, 2);
925    }
926
927    // -- flush_with_wal -----------------------------------------------------
928
929    #[test]
930    fn test_flush_with_noop_wal_sink() {
931        let mut buf = WriteBuffer::new(WriteBufferConfig {
932            max_capacity: 10_000,
933            flush_policy: FlushPolicy::Explicit,
934            enable_wal: true,
935            ..Default::default()
936        });
937        buf.push(make_point(1, 100, 1.0)).expect("push");
938        let mut sink = NoopWalSink;
939        let flushed = buf.flush_with_wal(&mut sink).expect("flush_with_wal");
940        assert_eq!(flushed.len(), 1);
941    }
942}