1use std::cmp::Ordering;
27use std::time::{Duration, Instant};
28
/// Failures that write-buffer operations can report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteBufferError {
    /// The buffer already holds its configured maximum number of points.
    BufferFull,
    /// A flush was requested while another flush was marked as in progress.
    /// Carries a human-readable description.
    FlushConflict(String),
    /// A write-ahead-log failure, with a human-readable description.
    WalError(String),
    /// Any other failure, with a human-readable description.
    Internal(String),
}

impl std::fmt::Display for WriteBufferError {
    /// Renders the error as a short prefix followed by the carried message
    /// (the `BufferFull` variant has a fixed text).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WriteBufferError::BufferFull => {
                f.write_str("Write buffer full: backpressure active")
            }
            WriteBufferError::FlushConflict(msg) => write!(f, "Flush conflict: {msg}"),
            WriteBufferError::WalError(msg) => write!(f, "WAL error: {msg}"),
            WriteBufferError::Internal(msg) => write!(f, "Internal error: {msg}"),
        }
    }
}

// No underlying source error is carried, so the default trait methods suffice.
impl std::error::Error for WriteBufferError {}

/// Result alias whose error type is [`WriteBufferError`].
pub type WriteBufferResult<T> = Result<T, WriteBufferError>;
61
/// One sample of a time series: which series it belongs to, when it was
/// taken, and the measured value.
///
/// Equality and ordering are defined by the same total order so that the
/// `Eq`/`Ord` contracts hold: points compare by `timestamp_ms`, then by
/// `series_id`, then by `value` using [`f64::total_cmp`]. Two points are equal
/// exactly when that comparison yields [`Ordering::Equal`]. As a consequence a
/// point holding NaN is equal to itself, and `0.0` and `-0.0` are treated as
/// different values.
#[derive(Debug, Clone)]
pub struct DataPoint {
    /// Identifier of the series this sample belongs to.
    pub series_id: u64,
    /// Sample time in milliseconds.
    pub timestamp_ms: i64,
    /// Sampled value.
    pub value: f64,
}

impl PartialEq for DataPoint {
    /// Equality is derived from [`Ord::cmp`] so the two can never disagree.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

// Sound because `eq` is backed by a total order (reflexive even for NaN).
impl Eq for DataPoint {}

impl PartialOrd for DataPoint {
    /// Always `Some`: the ordering is total.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for DataPoint {
    /// Orders by timestamp, then series id, then value.
    ///
    /// The value tiebreak makes sorting deterministic when several points
    /// share a timestamp and series id.
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp_ms
            .cmp(&other.timestamp_ms)
            .then_with(|| self.series_id.cmp(&other.series_id))
            .then_with(|| self.value.total_cmp(&other.value))
    }
}
92
/// Rule used to decide whether buffered points are due for flushing.
#[derive(Debug, Clone, PartialEq)]
pub enum FlushPolicy {
    /// Never due automatically; flushing happens only when requested.
    Explicit,
    /// Due once the number of buffered points reaches `threshold`.
    SizeBased {
        /// Point count at which a flush becomes due.
        threshold: usize,
    },
    /// Due once the buffered batch has been held for at least `max_age`.
    TimeBased {
        /// Maximum time a batch may stay buffered.
        max_age: Duration,
    },
    /// Due when either the size or the age condition is met.
    Combined {
        /// Point count at which a flush becomes due.
        threshold: usize,
        /// Maximum time a batch may stay buffered.
        max_age: Duration,
    },
}

impl Default for FlushPolicy {
    /// Size-based flushing with a threshold of 1000 points.
    fn default() -> Self {
        const DEFAULT_THRESHOLD: usize = 1_000;
        FlushPolicy::SizeBased {
            threshold: DEFAULT_THRESHOLD,
        }
    }
}
126
/// Lifecycle state of a write buffer, as returned by `WriteBuffer::state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferState {
    /// Nothing is buffered: the state of a new buffer and the state after a
    /// flush has removed every point.
    Empty,
    /// Points are buffered and the capacity limit has not been reached.
    Accumulating,
    /// A flush is being carried out; flush requests seen in this state are
    /// rejected with `WriteBufferError::FlushConflict`.
    Flushing,
    /// The number of buffered points has reached the configured capacity
    /// (also set when a push is rejected for that reason).
    Full,
}
143
/// Summary record describing one flushed batch, passed to
/// `WalSink::write_entry`.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Entry number taken from the buffer's counter, which is incremented
    /// before each entry is built (so the first entry is 1).
    pub sequence: u64,
    /// Number of points in the batch.
    pub point_count: usize,
    /// Smallest timestamp in the batch; 0 when the batch has no points.
    pub min_timestamp_ms: i64,
    /// Largest timestamp in the batch; 0 when the batch has no points.
    pub max_timestamp_ms: i64,
}
160
161#[derive(Debug, Clone)]
167pub struct WriteBufferConfig {
168 pub max_capacity: usize,
170 pub flush_policy: FlushPolicy,
172 pub enable_wal: bool,
174 pub partial_flush_age: Duration,
176 pub partial_flush_count: usize,
178}
179
180impl Default for WriteBufferConfig {
181 fn default() -> Self {
182 Self {
183 max_capacity: 100_000,
184 flush_policy: FlushPolicy::default(),
185 enable_wal: true,
186 partial_flush_age: Duration::ZERO,
187 partial_flush_count: 0,
188 }
189 }
190}
191
/// Running counters maintained by a write buffer.
///
/// Note that the derived `Default` zeroes every field, whereas
/// `WriteBuffer::new` overrides the two timestamp bounds with the
/// `i64::MAX` / `i64::MIN` sentinels described below.
#[derive(Debug, Clone, Default)]
pub struct BufferStats {
    /// Number of points currently buffered.
    pub buffered_count: usize,
    /// Smallest timestamp among buffered points; `i64::MAX` when the buffer
    /// has been emptied by a flush.
    pub oldest_timestamp_ms: i64,
    /// Largest timestamp among buffered points; `i64::MIN` when the buffer
    /// has been emptied by a flush.
    pub newest_timestamp_ms: i64,
    /// Number of flushes performed (full and partial).
    pub flush_count: u64,
    /// Total number of points handed out by flushes.
    pub total_points_flushed: u64,
    /// Incremented by the batch size whenever a full flush finds its points
    /// not already in non-decreasing timestamp order.
    pub out_of_order_count: u64,
    /// Number of pushes rejected because the buffer was at capacity.
    pub backpressure_events: u64,
    /// Number of full flushes performed while the WAL was enabled.
    pub wal_entries: u64,
}
216
/// Destination for the [`WalEntry`] records produced when a buffer is flushed
/// through `WriteBuffer::flush_with_wal`.
///
/// Implementors must be `Send` and `'static`.
pub trait WalSink: Send + 'static {
    /// Records one entry.
    ///
    /// # Errors
    ///
    /// Returning `Err` makes the flush that produced the entry fail with the
    /// same error.
    fn write_entry(&mut self, entry: WalEntry) -> WriteBufferResult<()>;
}
228
229pub struct NoopWalSink;
231
232impl WalSink for NoopWalSink {
233 fn write_entry(&mut self, _entry: WalEntry) -> WriteBufferResult<()> {
234 Ok(())
235 }
236}
237
238pub struct WriteBuffer {
244 config: WriteBufferConfig,
245 points: Vec<DataPoint>,
246 state: BufferState,
247 stats: BufferStats,
248 wal_sequence: u64,
249 oldest_insert_time: Option<Instant>,
250}
251
252impl WriteBuffer {
253 pub fn new(config: WriteBufferConfig) -> Self {
255 Self {
256 config,
257 points: Vec::new(),
258 state: BufferState::Empty,
259 stats: BufferStats {
260 oldest_timestamp_ms: i64::MAX,
261 newest_timestamp_ms: i64::MIN,
262 ..Default::default()
263 },
264 wal_sequence: 0,
265 oldest_insert_time: None,
266 }
267 }
268
269 pub fn state(&self) -> BufferState {
275 self.state
276 }
277
278 pub fn stats(&self) -> &BufferStats {
280 &self.stats
281 }
282
283 pub fn len(&self) -> usize {
285 self.points.len()
286 }
287
288 pub fn is_empty(&self) -> bool {
290 self.points.is_empty()
291 }
292
293 pub fn is_full(&self) -> bool {
295 self.points.len() >= self.config.max_capacity
296 }
297
298 pub fn push(&mut self, point: DataPoint) -> WriteBufferResult<()> {
309 if self.points.len() >= self.config.max_capacity {
310 self.stats.backpressure_events += 1;
311 self.state = BufferState::Full;
312 return Err(WriteBufferError::BufferFull);
313 }
314
315 if point.timestamp_ms < self.stats.oldest_timestamp_ms {
317 self.stats.oldest_timestamp_ms = point.timestamp_ms;
318 }
319 if point.timestamp_ms > self.stats.newest_timestamp_ms {
320 self.stats.newest_timestamp_ms = point.timestamp_ms;
321 }
322
323 self.points.push(point);
324 self.stats.buffered_count = self.points.len();
325
326 if self.oldest_insert_time.is_none() {
327 self.oldest_insert_time = Some(Instant::now());
328 }
329
330 self.state = if self.points.len() >= self.config.max_capacity {
331 BufferState::Full
332 } else {
333 BufferState::Accumulating
334 };
335
336 Ok(())
337 }
338
339 pub fn push_batch(&mut self, points: impl IntoIterator<Item = DataPoint>) -> usize {
344 let mut count = 0usize;
345 for point in points {
346 if self.push(point).is_err() {
347 break;
348 }
349 count += 1;
350 }
351 count
352 }
353
354 pub fn should_flush(&self) -> bool {
360 if self.points.is_empty() {
361 return false;
362 }
363 match &self.config.flush_policy {
364 FlushPolicy::Explicit => false,
365 FlushPolicy::SizeBased { threshold } => self.points.len() >= *threshold,
366 FlushPolicy::TimeBased { max_age } => self
367 .oldest_insert_time
368 .map(|t| t.elapsed() >= *max_age)
369 .unwrap_or(false),
370 FlushPolicy::Combined { threshold, max_age } => {
371 let size_ok = self.points.len() >= *threshold;
372 let time_ok = self
373 .oldest_insert_time
374 .map(|t| t.elapsed() >= *max_age)
375 .unwrap_or(false);
376 size_ok || time_ok
377 }
378 }
379 }
380
381 pub fn flush(&mut self) -> WriteBufferResult<Vec<DataPoint>> {
390 self.flush_inner(None)
391 }
392
393 pub fn flush_with_wal<W: WalSink>(&mut self, wal: &mut W) -> WriteBufferResult<Vec<DataPoint>> {
395 self.flush_inner(Some(wal as &mut dyn WalSink))
396 }
397
398 fn flush_inner(&mut self, wal: Option<&mut dyn WalSink>) -> WriteBufferResult<Vec<DataPoint>> {
399 if matches!(self.state, BufferState::Flushing) {
400 return Err(WriteBufferError::FlushConflict(
401 "flush already in progress".to_string(),
402 ));
403 }
404
405 let prev_state = self.state;
406 self.state = BufferState::Flushing;
407
408 let mut points = std::mem::take(&mut self.points);
409
410 let was_sorted = points
412 .windows(2)
413 .all(|w| w[0].timestamp_ms <= w[1].timestamp_ms);
414 if !was_sorted {
415 self.stats.out_of_order_count += points.len() as u64;
416 points.sort_unstable();
417 }
418
419 if self.config.enable_wal {
421 let entry = self.build_wal_entry(&points);
422 if let Some(sink) = wal {
423 sink.write_entry(entry)?;
424 }
425 self.stats.wal_entries += 1;
426 }
427
428 self.stats.flush_count += 1;
430 self.stats.total_points_flushed += points.len() as u64;
431 self.stats.buffered_count = 0;
432 self.stats.oldest_timestamp_ms = i64::MAX;
433 self.stats.newest_timestamp_ms = i64::MIN;
434 self.oldest_insert_time = None;
435
436 let _ = prev_state;
437 self.state = BufferState::Empty;
438
439 Ok(points)
440 }
441
442 pub fn partial_flush(&mut self, count: usize) -> WriteBufferResult<Vec<DataPoint>> {
451 if matches!(self.state, BufferState::Flushing) {
452 return Err(WriteBufferError::FlushConflict(
453 "flush already in progress".to_string(),
454 ));
455 }
456 if self.points.is_empty() {
457 return Ok(Vec::new());
458 }
459
460 self.state = BufferState::Flushing;
461
462 self.points.sort_unstable();
464
465 let take = if count == 0 {
466 self.config.partial_flush_count.max(1)
467 } else {
468 count
469 };
470 let take = take.min(self.points.len());
471
472 let flushed: Vec<DataPoint> = self.points.drain(..take).collect();
473
474 self.stats.flush_count += 1;
476 self.stats.total_points_flushed += flushed.len() as u64;
477 self.stats.buffered_count = self.points.len();
478
479 self.recompute_bounds();
481
482 self.state = if self.points.is_empty() {
483 BufferState::Empty
484 } else {
485 BufferState::Accumulating
486 };
487
488 Ok(flushed)
489 }
490
491 fn build_wal_entry(&mut self, points: &[DataPoint]) -> WalEntry {
496 self.wal_sequence += 1;
497 let min_ts = points.iter().map(|p| p.timestamp_ms).min().unwrap_or(0);
498 let max_ts = points.iter().map(|p| p.timestamp_ms).max().unwrap_or(0);
499 WalEntry {
500 sequence: self.wal_sequence,
501 point_count: points.len(),
502 min_timestamp_ms: min_ts,
503 max_timestamp_ms: max_ts,
504 }
505 }
506
507 fn recompute_bounds(&mut self) {
508 if self.points.is_empty() {
509 self.stats.oldest_timestamp_ms = i64::MAX;
510 self.stats.newest_timestamp_ms = i64::MIN;
511 } else {
512 self.stats.oldest_timestamp_ms = self
513 .points
514 .iter()
515 .map(|p| p.timestamp_ms)
516 .min()
517 .unwrap_or(i64::MAX);
518 self.stats.newest_timestamp_ms = self
519 .points
520 .iter()
521 .map(|p| p.timestamp_ms)
522 .max()
523 .unwrap_or(i64::MIN);
524 }
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for a data point.
    fn make_point(series_id: u64, timestamp_ms: i64, value: f64) -> DataPoint {
        DataPoint {
            series_id,
            timestamp_ms,
            value,
        }
    }

    /// Buffer with a size-based policy, a roomy capacity and the WAL off.
    fn size_buffer(threshold: usize) -> WriteBuffer {
        let config = WriteBufferConfig {
            max_capacity: 10_000,
            flush_policy: FlushPolicy::SizeBased { threshold },
            enable_wal: false,
            ..Default::default()
        };
        WriteBuffer::new(config)
    }

    /// Buffer that only flushes on request, with the given capacity and WAL
    /// switch.
    fn explicit_buffer(max_capacity: usize, enable_wal: bool) -> WriteBuffer {
        let config = WriteBufferConfig {
            max_capacity,
            flush_policy: FlushPolicy::Explicit,
            enable_wal,
            ..Default::default()
        };
        WriteBuffer::new(config)
    }

    // --- state transitions ---

    #[test]
    fn test_initial_state_is_empty() {
        let buf = WriteBuffer::new(WriteBufferConfig::default());
        assert_eq!(buf.state(), BufferState::Empty);
        assert!(buf.is_empty());
    }

    #[test]
    fn test_push_transitions_to_accumulating() {
        let mut buf = size_buffer(100);
        buf.push(make_point(1, 1000, 1.0)).expect("push failed");
        assert_eq!(buf.state(), BufferState::Accumulating);
    }

    #[test]
    fn test_flush_transitions_back_to_empty() {
        let mut buf = size_buffer(100);
        buf.push(make_point(1, 1000, 1.0)).expect("push failed");
        let _ = buf.flush().expect("flush failed");
        assert_eq!(buf.state(), BufferState::Empty);
    }

    #[test]
    fn test_buffer_full_state_on_capacity() {
        let mut buf = explicit_buffer(3, false);
        buf.push(make_point(1, 1, 1.0)).expect("push 1");
        buf.push(make_point(1, 2, 2.0)).expect("push 2");
        buf.push(make_point(1, 3, 3.0)).expect("push 3");
        assert_eq!(buf.state(), BufferState::Full);
    }

    // --- backpressure ---

    #[test]
    fn test_push_beyond_capacity_returns_buffer_full() {
        let mut buf = explicit_buffer(2, false);
        buf.push(make_point(1, 1, 0.0)).expect("push 1");
        buf.push(make_point(1, 2, 0.0)).expect("push 2");
        let err = buf.push(make_point(1, 3, 0.0)).unwrap_err();
        assert_eq!(err, WriteBufferError::BufferFull);
    }

    #[test]
    fn test_backpressure_events_counted() {
        let mut buf = explicit_buffer(1, false);
        buf.push(make_point(1, 1, 0.0)).expect("push 1");
        for ts in [2, 3] {
            let _ = buf.push(make_point(1, ts, 0.0));
        }
        assert_eq!(buf.stats().backpressure_events, 2);
    }

    #[test]
    fn test_push_batch_stops_at_capacity() {
        let mut buf = explicit_buffer(3, false);
        let pushed = buf.push_batch((0..10).map(|i| make_point(1, i, 0.0)));
        assert_eq!(pushed, 3);
    }

    // --- flush ordering ---

    #[test]
    fn test_flush_returns_points_sorted_by_timestamp() {
        let mut buf = size_buffer(10);
        for (ts, value) in [(300, 1.0), (100, 2.0), (200, 3.0)] {
            buf.push(make_point(1, ts, value)).expect("push");
        }
        let flushed = buf.flush().expect("flush failed");
        let timestamps: Vec<i64> = flushed.iter().map(|p| p.timestamp_ms).collect();
        assert_eq!(timestamps[..3], [100, 200, 300]);
    }

    #[test]
    fn test_out_of_order_counted_on_flush() {
        let mut buf = size_buffer(10);
        for (ts, value) in [(200, 1.0), (100, 2.0)] {
            buf.push(make_point(1, ts, value)).expect("push");
        }
        let _ = buf.flush().expect("flush");
        assert_eq!(buf.stats().out_of_order_count, 2);
    }

    #[test]
    fn test_in_order_write_not_counted_as_out_of_order() {
        let mut buf = size_buffer(10);
        for (ts, value) in [(100, 1.0), (200, 2.0)] {
            buf.push(make_point(1, ts, value)).expect("push");
        }
        let _ = buf.flush().expect("flush");
        assert_eq!(buf.stats().out_of_order_count, 0);
    }

    // --- flush statistics ---

    #[test]
    fn test_flush_count_increments() {
        let mut buf = size_buffer(10);
        buf.push(make_point(1, 1, 0.0)).expect("push");
        buf.flush().expect("flush 1");
        buf.push(make_point(1, 2, 0.0)).expect("push");
        buf.flush().expect("flush 2");
        assert_eq!(buf.stats().flush_count, 2);
    }

    #[test]
    fn test_total_points_flushed_accumulates() {
        let mut buf = size_buffer(10);
        (0..5).for_each(|i| buf.push(make_point(1, i, 0.0)).expect("push"));
        buf.flush().expect("flush 1");
        (5..8).for_each(|i| buf.push(make_point(1, i, 0.0)).expect("push"));
        buf.flush().expect("flush 2");
        assert_eq!(buf.stats().total_points_flushed, 8);
    }

    #[test]
    fn test_buffered_count_reset_after_flush() {
        let mut buf = size_buffer(10);
        buf.push(make_point(1, 1, 0.0)).expect("push");
        buf.flush().expect("flush");
        assert_eq!(buf.stats().buffered_count, 0);
    }

    // --- timestamp bounds ---

    #[test]
    fn test_timestamp_bounds_updated_on_push() {
        let mut buf = size_buffer(10);
        for ts in [500, 100, 900] {
            buf.push(make_point(1, ts, 0.0)).expect("push");
        }
        let stats = buf.stats();
        assert_eq!(stats.oldest_timestamp_ms, 100);
        assert_eq!(stats.newest_timestamp_ms, 900);
    }

    #[test]
    fn test_timestamp_bounds_reset_after_flush() {
        let mut buf = size_buffer(10);
        buf.push(make_point(1, 100, 0.0)).expect("push");
        buf.flush().expect("flush");
        let stats = buf.stats();
        assert_eq!(stats.oldest_timestamp_ms, i64::MAX);
        assert_eq!(stats.newest_timestamp_ms, i64::MIN);
    }

    // --- should_flush ---

    #[test]
    fn test_should_flush_size_based_below_threshold() {
        let mut buf = size_buffer(5);
        for ts in [1, 2] {
            buf.push(make_point(1, ts, 0.0)).expect("push");
        }
        assert!(!buf.should_flush());
    }

    #[test]
    fn test_should_flush_size_based_at_threshold() {
        let mut buf = size_buffer(2);
        for ts in [1, 2] {
            buf.push(make_point(1, ts, 0.0)).expect("push");
        }
        assert!(buf.should_flush());
    }

    #[test]
    fn test_should_flush_explicit_never_auto() {
        let mut buf = explicit_buffer(10_000, false);
        (0..100).for_each(|i| buf.push(make_point(1, i, 0.0)).expect("push"));
        assert!(!buf.should_flush());
    }

    #[test]
    fn test_should_flush_empty_buffer_false() {
        let buf = size_buffer(1);
        assert!(!buf.should_flush());
    }

    #[test]
    fn test_should_flush_combined_size_path() {
        let flush_policy = FlushPolicy::Combined {
            threshold: 2,
            max_age: Duration::from_secs(3600),
        };
        let mut buf = WriteBuffer::new(WriteBufferConfig {
            max_capacity: 10_000,
            flush_policy,
            enable_wal: false,
            ..Default::default()
        });
        for ts in [1, 2] {
            buf.push(make_point(1, ts, 0.0)).expect("push");
        }
        assert!(buf.should_flush());
    }

    // --- partial flush ---

    #[test]
    fn test_partial_flush_removes_oldest_points() {
        let mut buf = size_buffer(100);
        for (ts, value) in [(300, 1.0), (100, 2.0), (200, 3.0)] {
            buf.push(make_point(1, ts, value)).expect("push");
        }
        let flushed = buf.partial_flush(1).expect("partial flush");
        assert_eq!(flushed.len(), 1);
        assert_eq!(flushed[0].timestamp_ms, 100);
        assert_eq!(buf.len(), 2);
    }

    #[test]
    fn test_partial_flush_empty_buffer_returns_empty() {
        let mut buf = size_buffer(100);
        let flushed = buf.partial_flush(5).expect("partial flush");
        assert!(flushed.is_empty());
    }

    #[test]
    fn test_partial_flush_count_exceeds_buffer_size() {
        let mut buf = size_buffer(100);
        for ts in [1, 2] {
            buf.push(make_point(1, ts, 0.0)).expect("push");
        }
        let flushed = buf.partial_flush(10).expect("partial flush");
        assert_eq!(flushed.len(), 2);
        assert!(buf.is_empty());
    }

    #[test]
    fn test_partial_flush_state_after_partial() {
        let mut buf = size_buffer(100);
        for ts in [1, 2] {
            buf.push(make_point(1, ts, 0.0)).expect("push");
        }
        let _ = buf.partial_flush(1).expect("partial flush");
        assert_eq!(buf.state(), BufferState::Accumulating);
    }

    // --- WAL accounting ---

    #[test]
    fn test_wal_entry_count_increments_on_flush() {
        let mut buf = explicit_buffer(10_000, true);
        buf.push(make_point(1, 100, 1.0)).expect("push");
        buf.flush().expect("flush");
        assert_eq!(buf.stats().wal_entries, 1);
    }

    #[test]
    fn test_wal_disabled_no_entries_recorded() {
        let mut buf = explicit_buffer(10_000, false);
        buf.push(make_point(1, 100, 1.0)).expect("push");
        buf.flush().expect("flush");
        assert_eq!(buf.stats().wal_entries, 0);
    }

    // --- error display ---

    #[test]
    fn test_write_buffer_error_display() {
        let full = WriteBufferError::BufferFull.to_string();
        assert!(full.contains("backpressure"));

        let conflict = WriteBufferError::FlushConflict("x".into()).to_string();
        assert!(conflict.contains("x"));

        let wal = WriteBufferError::WalError("wal".into()).to_string();
        assert!(wal.contains("wal"));

        let internal = WriteBufferError::Internal("internal".into()).to_string();
        assert!(internal.contains("internal"));
    }

    // --- size queries ---

    #[test]
    fn test_len_reflects_buffered_points() {
        let mut buf = size_buffer(100);
        (0..7).for_each(|i| buf.push(make_point(1, i, 0.0)).expect("push"));
        assert_eq!(buf.len(), 7);
    }

    #[test]
    fn test_is_full_at_capacity() {
        let mut buf = explicit_buffer(2, false);
        buf.push(make_point(1, 1, 0.0)).expect("push");
        assert!(!buf.is_full());
        buf.push(make_point(1, 2, 0.0)).expect("push");
        assert!(buf.is_full());
    }

    // --- DataPoint ordering ---

    #[test]
    fn test_data_point_ordering_by_timestamp() {
        let earlier = make_point(1, 100, 0.0);
        let later = make_point(1, 200, 0.0);
        assert!(earlier < later);
    }

    #[test]
    fn test_data_point_ordering_same_timestamp_by_series() {
        let low_series = make_point(1, 100, 0.0);
        let high_series = make_point(2, 100, 0.0);
        assert!(low_series < high_series);
    }

    // --- multi-series flush ---

    #[test]
    fn test_flush_multiple_series_interleaved() {
        let mut buf = size_buffer(100);
        for (series, ts, value) in [(2, 200, 1.0), (1, 100, 2.0), (3, 150, 3.0)] {
            buf.push(make_point(series, ts, value)).expect("push");
        }
        let flushed = buf.flush().expect("flush");
        let series: Vec<u64> = flushed.iter().map(|p| p.series_id).collect();
        assert_eq!(series[..3], [1, 3, 2]);
    }

    // --- flushing through a sink ---

    #[test]
    fn test_flush_with_noop_wal_sink() {
        let mut buf = explicit_buffer(10_000, true);
        buf.push(make_point(1, 100, 1.0)).expect("push");
        let mut sink = NoopWalSink;
        let flushed = buf.flush_with_wal(&mut sink).expect("flush_with_wal");
        assert_eq!(flushed.len(), 1);
    }
}