mdf4_rs/writer/streaming.rs
1//! Streaming write configuration for long-running MDF4 captures.
2//!
3//! This module provides [`FlushPolicy`] for configuring automatic flushing
4//! of MDF4 data during capture, enabling memory-efficient logging of long
5//! recordings.
6//!
7//! # Use Cases
8//!
9//! - Hours-long vehicle data logging without running out of memory
10//! - Crash-safe logging where partial files remain valid MDF4
11//! - Embedded systems with limited RAM
12//!
13//! # Example
14//!
15//! ```ignore
16//! use mdf4_rs::{MdfWriter, FlushPolicy};
17//!
18//! let mut writer = MdfWriter::new("output.mf4")?
19//! .with_flush_policy(FlushPolicy::EveryNRecords(1000));
20//!
21//! // Records are automatically flushed to disk every 1000 records
22//! for i in 0..10000 {
23//! writer.write_record(&cg_id, &values)?;
24//! }
25//! ```
26
/// Strategy that decides when buffered MDF4 record data is flushed
/// automatically during a streaming write.
///
/// Once a flush policy is configured, the writer flushes buffered data to
/// disk on its own whenever the policy's criterion is met. Long-running
/// captures depend on this, because holding the entire recording in memory
/// is not feasible.
///
/// # Flush Behavior
///
/// Each triggered flush performs these steps:
/// 1. All buffered record data is written to the underlying I/O
/// 2. DT block size links are updated
/// 3. The I/O buffer is flushed to disk
///
/// After every flush the file is still in a valid state, with proper DT block
/// sizes recorded. The final DL (Data List) blocks are created during
/// finalization.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub enum FlushPolicy {
    /// No automatic flushing; this is the default.
    ///
    /// Data is flushed only by an explicit `flush()` or `finalize()` call.
    #[default]
    Manual,

    /// Flush once N records have been written, counted across all channel
    /// groups.
    ///
    /// Choose this for predictable flush intervals that follow the number of
    /// captured data points.
    ///
    /// # Example
    /// ```ignore
    /// // Flush every 1000 records
    /// FlushPolicy::EveryNRecords(1000)
    /// ```
    EveryNRecords(u64),

    /// Flush once N bytes of record data have been written.
    ///
    /// Choose this to limit memory usage to a specific amount regardless of
    /// record size.
    ///
    /// # Example
    /// ```ignore
    /// // Flush every 1 MB of data
    /// FlushPolicy::EveryNBytes(1024 * 1024)
    /// ```
    EveryNBytes(u64),
}
73
74impl FlushPolicy {
75 /// Check if this policy requires automatic flushing.
76 pub fn is_auto(&self) -> bool {
77 !matches!(self, FlushPolicy::Manual)
78 }
79}
80
81/// Configuration for streaming MDF4 writes.
82#[derive(Debug, Clone, Default)]
83pub struct StreamingConfig {
84 /// The flush policy to use.
85 pub policy: FlushPolicy,
86}
87
88impl StreamingConfig {
89 /// Create a new streaming configuration with manual flush policy.
90 pub fn new() -> Self {
91 Self::default()
92 }
93
94 /// Create a streaming configuration that flushes every N records.
95 pub fn every_n_records(n: u64) -> Self {
96 Self {
97 policy: FlushPolicy::EveryNRecords(n),
98 }
99 }
100
101 /// Create a streaming configuration that flushes every N bytes.
102 pub fn every_n_bytes(n: u64) -> Self {
103 Self {
104 policy: FlushPolicy::EveryNBytes(n),
105 }
106 }
107}
108
109/// Tracks flush state for streaming writes.
110#[derive(Debug, Default)]
111pub(super) struct FlushState {
112 /// Records written since last flush.
113 pub records_since_flush: u64,
114 /// Bytes written since last flush.
115 pub bytes_since_flush: u64,
116 /// Total records written.
117 pub total_records: u64,
118 /// Total bytes written.
119 pub total_bytes: u64,
120 /// Number of flushes performed.
121 pub flush_count: u64,
122}
123
124impl FlushState {
125 /// Record that data was written.
126 pub fn record_write(&mut self, records: u64, bytes: u64) {
127 self.records_since_flush += records;
128 self.bytes_since_flush += bytes;
129 self.total_records += records;
130 self.total_bytes += bytes;
131 }
132
133 /// Check if a flush should be triggered based on the policy.
134 pub fn should_flush(&self, policy: &FlushPolicy) -> bool {
135 match policy {
136 FlushPolicy::Manual => false,
137 FlushPolicy::EveryNRecords(n) => self.records_since_flush >= *n,
138 FlushPolicy::EveryNBytes(n) => self.bytes_since_flush >= *n,
139 }
140 }
141
142 /// Reset counters after a flush.
143 pub fn on_flush(&mut self) {
144 self.records_since_flush = 0;
145 self.bytes_since_flush = 0;
146 self.flush_count += 1;
147 }
148}
149
#[cfg(test)]
mod tests {
    use super::*;

    /// The default policy is manual flushing.
    #[test]
    fn test_flush_policy_default() {
        let policy = FlushPolicy::default();
        assert_eq!(policy, FlushPolicy::Manual);
    }

    /// Only the threshold-based variants count as automatic.
    #[test]
    fn test_flush_policy_is_auto() {
        assert!(!FlushPolicy::Manual.is_auto());

        let automatic = [
            FlushPolicy::EveryNRecords(100),
            FlushPolicy::EveryNBytes(1024),
        ];
        for policy in automatic {
            assert!(policy.is_auto(), "{policy:?} should be automatic");
        }
    }

    /// Thresholds trigger at or above the amount written, never below it.
    #[test]
    fn test_flush_state_should_flush() {
        let mut state = FlushState::default();
        state.record_write(1000, 10000);

        // Manual policy never triggers
        assert!(!state.should_flush(&FlushPolicy::Manual));

        // (threshold, expected) pairs around the 1000 records written
        let record_cases = [(1001, false), (1000, true), (500, true)];
        for (threshold, expected) in record_cases {
            let policy = FlushPolicy::EveryNRecords(threshold);
            assert_eq!(state.should_flush(&policy), expected, "{policy:?}");
        }

        // (threshold, expected) pairs around the 10000 bytes written
        let byte_cases = [(10001, false), (10000, true), (5000, true)];
        for (threshold, expected) in byte_cases {
            let policy = FlushPolicy::EveryNBytes(threshold);
            assert_eq!(state.should_flush(&policy), expected, "{policy:?}");
        }
    }

    /// A flush clears the per-flush counters but keeps the running totals.
    #[test]
    fn test_flush_state_reset() {
        let mut state = FlushState::default();
        state.record_write(100, 1000);
        assert_eq!(
            (state.records_since_flush, state.bytes_since_flush),
            (100, 1000)
        );

        state.on_flush();
        assert_eq!(
            (state.records_since_flush, state.bytes_since_flush),
            (0, 0)
        );
        // Totals are preserved across the flush
        assert_eq!((state.total_records, state.total_bytes), (100, 1000));
        assert_eq!(state.flush_count, 1);
    }

    /// Each convenience constructor stores the matching policy variant.
    #[test]
    fn test_streaming_config_constructors() {
        let by_records = StreamingConfig::every_n_records(500);
        assert_eq!(by_records.policy, FlushPolicy::EveryNRecords(500));

        let by_bytes = StreamingConfig::every_n_bytes(1024);
        assert_eq!(by_bytes.policy, FlushPolicy::EveryNBytes(1024));
    }
}