mdf4_rs/writer/
streaming.rs

1//! Streaming write configuration for long-running MDF4 captures.
2//!
3//! This module provides [`FlushPolicy`] for configuring automatic flushing
4//! of MDF4 data during capture, enabling memory-efficient logging of long
5//! recordings.
6//!
7//! # Use Cases
8//!
9//! - Hours-long vehicle data logging without running out of memory
10//! - Crash-safe logging where partial files remain valid MDF4
11//! - Embedded systems with limited RAM
12//!
13//! # Example
14//!
15//! ```ignore
16//! use mdf4_rs::{MdfWriter, FlushPolicy};
17//!
18//! let mut writer = MdfWriter::new("output.mf4")?
19//!     .with_flush_policy(FlushPolicy::EveryNRecords(1000));
20//!
21//! // Records are automatically flushed to disk every 1000 records
22//! for i in 0..10000 {
23//!     writer.write_record(&cg_id, &values)?;
24//! }
25//! ```
26
27/// Policy for automatic flushing of MDF4 data during streaming writes.
28///
29/// When a flush policy is set, the writer will automatically flush buffered
30/// data to disk based on the policy criteria. This is essential for long-running
31/// captures where keeping all data in memory is not feasible.
32///
33/// # Flush Behavior
34///
35/// When a flush is triggered:
36/// 1. All buffered record data is written to the underlying I/O
37/// 2. DT block size links are updated
38/// 3. The I/O buffer is flushed to disk
39///
40/// The file remains in a valid state after each flush, with proper DT block
41/// sizes recorded. Final DL (Data List) blocks are created during finalization.
42#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
43pub enum FlushPolicy {
44    /// Never auto-flush. Data is only flushed on explicit `flush()` or `finalize()` calls.
45    /// This is the default behavior.
46    #[default]
47    Manual,
48
49    /// Flush after every N records written across all channel groups.
50    ///
51    /// This is useful when you want predictable flush intervals based on
52    /// the number of data points captured.
53    ///
54    /// # Example
55    /// ```ignore
56    /// // Flush every 1000 records
57    /// FlushPolicy::EveryNRecords(1000)
58    /// ```
59    EveryNRecords(u64),
60
61    /// Flush after N bytes of record data have been written.
62    ///
63    /// This is useful when you want to limit memory usage to a specific
64    /// amount regardless of record size.
65    ///
66    /// # Example
67    /// ```ignore
68    /// // Flush every 1 MB of data
69    /// FlushPolicy::EveryNBytes(1024 * 1024)
70    /// ```
71    EveryNBytes(u64),
72}
73
74impl FlushPolicy {
75    /// Check if this policy requires automatic flushing.
76    pub fn is_auto(&self) -> bool {
77        !matches!(self, FlushPolicy::Manual)
78    }
79}
80
81/// Configuration for streaming MDF4 writes.
82#[derive(Debug, Clone, Default)]
83pub struct StreamingConfig {
84    /// The flush policy to use.
85    pub policy: FlushPolicy,
86}
87
88impl StreamingConfig {
89    /// Create a new streaming configuration with manual flush policy.
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Create a streaming configuration that flushes every N records.
95    pub fn every_n_records(n: u64) -> Self {
96        Self {
97            policy: FlushPolicy::EveryNRecords(n),
98        }
99    }
100
101    /// Create a streaming configuration that flushes every N bytes.
102    pub fn every_n_bytes(n: u64) -> Self {
103        Self {
104            policy: FlushPolicy::EveryNBytes(n),
105        }
106    }
107}
108
109/// Tracks flush state for streaming writes.
110#[derive(Debug, Default)]
111pub(super) struct FlushState {
112    /// Records written since last flush.
113    pub records_since_flush: u64,
114    /// Bytes written since last flush.
115    pub bytes_since_flush: u64,
116    /// Total records written.
117    pub total_records: u64,
118    /// Total bytes written.
119    pub total_bytes: u64,
120    /// Number of flushes performed.
121    pub flush_count: u64,
122}
123
124impl FlushState {
125    /// Record that data was written.
126    pub fn record_write(&mut self, records: u64, bytes: u64) {
127        self.records_since_flush += records;
128        self.bytes_since_flush += bytes;
129        self.total_records += records;
130        self.total_bytes += bytes;
131    }
132
133    /// Check if a flush should be triggered based on the policy.
134    pub fn should_flush(&self, policy: &FlushPolicy) -> bool {
135        match policy {
136            FlushPolicy::Manual => false,
137            FlushPolicy::EveryNRecords(n) => self.records_since_flush >= *n,
138            FlushPolicy::EveryNBytes(n) => self.bytes_since_flush >= *n,
139        }
140    }
141
142    /// Reset counters after a flush.
143    pub fn on_flush(&mut self) {
144        self.records_since_flush = 0;
145        self.bytes_since_flush = 0;
146        self.flush_count += 1;
147    }
148}
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153
154    #[test]
155    fn test_flush_policy_default() {
156        assert_eq!(FlushPolicy::default(), FlushPolicy::Manual);
157    }
158
159    #[test]
160    fn test_flush_policy_is_auto() {
161        assert!(!FlushPolicy::Manual.is_auto());
162        assert!(FlushPolicy::EveryNRecords(100).is_auto());
163        assert!(FlushPolicy::EveryNBytes(1024).is_auto());
164    }
165
166    #[test]
167    fn test_flush_state_should_flush() {
168        let mut state = FlushState::default();
169
170        // Manual policy never triggers
171        state.record_write(1000, 10000);
172        assert!(!state.should_flush(&FlushPolicy::Manual));
173
174        // EveryNRecords triggers at threshold
175        assert!(!state.should_flush(&FlushPolicy::EveryNRecords(1001)));
176        assert!(state.should_flush(&FlushPolicy::EveryNRecords(1000)));
177        assert!(state.should_flush(&FlushPolicy::EveryNRecords(500)));
178
179        // EveryNBytes triggers at threshold
180        assert!(!state.should_flush(&FlushPolicy::EveryNBytes(10001)));
181        assert!(state.should_flush(&FlushPolicy::EveryNBytes(10000)));
182        assert!(state.should_flush(&FlushPolicy::EveryNBytes(5000)));
183    }
184
185    #[test]
186    fn test_flush_state_reset() {
187        let mut state = FlushState::default();
188        state.record_write(100, 1000);
189        assert_eq!(state.records_since_flush, 100);
190        assert_eq!(state.bytes_since_flush, 1000);
191
192        state.on_flush();
193        assert_eq!(state.records_since_flush, 0);
194        assert_eq!(state.bytes_since_flush, 0);
195        assert_eq!(state.total_records, 100); // Total preserved
196        assert_eq!(state.total_bytes, 1000);
197        assert_eq!(state.flush_count, 1);
198    }
199
200    #[test]
201    fn test_streaming_config_constructors() {
202        let config = StreamingConfig::every_n_records(500);
203        assert_eq!(config.policy, FlushPolicy::EveryNRecords(500));
204
205        let config = StreamingConfig::every_n_bytes(1024);
206        assert_eq!(config.policy, FlushPolicy::EveryNBytes(1024));
207    }
208}