ddex_builder/streaming/buffer_manager.rs

//! Buffer management for streaming DDEX XML output
//!
//! Handles chunked writing to disk/network with automatic flushing
//! and memory-bounded operations.
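//!
//! A minimal usage sketch; the import path below assumes this module is
//! publicly reachable as `ddex_builder::streaming::buffer_manager`, which may
//! not match the crate's actual re-exports:
//!
//! ```rust,ignore
//! use std::io::Cursor;
//!
//! use ddex_builder::streaming::buffer_manager::BufferManager;
//!
//! // Buffer up to 64 KiB in memory before chunks are flushed to the writer.
//! let mut manager = BufferManager::new(Cursor::new(Vec::new()), 64 * 1024).unwrap();
//! manager.write_chunk(b"<NewReleaseMessage>").unwrap();
//! manager.write_chunk(b"</NewReleaseMessage>").unwrap();
//! manager.flush_all().unwrap();
//! ```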

use std::collections::VecDeque;
use std::io::{Result as IoResult, Write as IoWrite};

/// Callback type for flush events
pub type FlushCallback = Box<dyn Fn(usize) + Send + Sync>;

/// Configuration for buffer management
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Maximum buffer size before automatic flush
    pub max_buffer_size: usize,
    /// Maximum number of buffers to keep queued in memory before the oldest is flushed
    pub buffer_count: usize,
    /// Whether to enable compression
    pub enable_compression: bool,
}

impl Default for BufferConfig {
    fn default() -> Self {
        Self {
            max_buffer_size: 1024 * 1024, // 1MB per buffer
            buffer_count: 10,             // Up to 10MB total
            enable_compression: false,    // Disabled by default for simplicity
        }
    }
}

/// Manages buffered writing with automatic flushing and memory limits
pub struct BufferManager<W: IoWrite> {
    writer: W,
    config: BufferConfig,

    // Buffer management
    buffers: VecDeque<Vec<u8>>,
    current_buffer: Vec<u8>,

    // Statistics
    total_bytes_written: usize,
    total_flushes: usize,
    peak_buffer_size: usize,

    // Callbacks
    flush_callback: Option<FlushCallback>,
}

impl<W: IoWrite> BufferManager<W> {
    /// Create a new buffer manager with the given maximum buffer size and
    /// default settings for everything else
    pub fn new(writer: W, max_buffer_size: usize) -> IoResult<Self> {
        let config = BufferConfig {
            max_buffer_size,
            ..BufferConfig::default()
        };
        Self::new_with_config(writer, config)
    }

    /// Create a new buffer manager with custom configuration
    pub fn new_with_config(writer: W, config: BufferConfig) -> IoResult<Self> {
        let buffer_capacity = config.max_buffer_size;
        Ok(BufferManager {
            writer,
            config,
            buffers: VecDeque::new(),
            current_buffer: Vec::with_capacity(buffer_capacity),
            total_bytes_written: 0,
            total_flushes: 0,
            peak_buffer_size: 0,
            flush_callback: None,
        })
    }

    /// Set a callback to be called when buffers are flushed
    pub fn set_flush_callback(&mut self, callback: FlushCallback) {
        self.flush_callback = Some(callback);
    }

    /// Write a chunk of data to the buffer
    pub fn write_chunk(&mut self, data: &[u8]) -> IoResult<()> {
        // If this chunk would overflow the current buffer, flush it first
        if self.current_buffer.len() + data.len() > self.config.max_buffer_size {
            self.flush_current_buffer()?;
        }

        // If the chunk is larger than the maximum buffer size, write it directly
        if data.len() > self.config.max_buffer_size {
            self.write_directly(data)?;
            return Ok(());
        }

        // Add to current buffer
        self.current_buffer.extend_from_slice(data);

        // Update peak memory usage
        let current_memory = self.current_memory_usage();
        if current_memory > self.peak_buffer_size {
            self.peak_buffer_size = current_memory;
        }

        // Check if we need to flush due to the buffer count limit
        if self.buffers.len() >= self.config.buffer_count {
            self.flush_oldest_buffer()?;
        }

        Ok(())
    }

    /// Flush the current buffer to the queue
    pub fn flush_current_buffer(&mut self) -> IoResult<()> {
        if !self.current_buffer.is_empty() {
            let buffer = std::mem::replace(
                &mut self.current_buffer,
                Vec::with_capacity(self.config.max_buffer_size),
            );
            self.buffers.push_back(buffer);

            // If we have too many buffers, flush the oldest one
            if self.buffers.len() > self.config.buffer_count {
                self.flush_oldest_buffer()?;
            }
        }
        Ok(())
    }

    /// Flush the oldest buffer to the writer
    pub fn flush_oldest_buffer(&mut self) -> IoResult<()> {
        if let Some(buffer) = self.buffers.pop_front() {
            self.write_buffer(&buffer)?;

            // Call flush callback if set
            if let Some(ref callback) = self.flush_callback {
                callback(buffer.len());
            }
        }
        Ok(())
    }

    /// Flush all buffers to the writer
    pub fn flush_all(&mut self) -> IoResult<()> {
        // Flush current buffer to queue first
        self.flush_current_buffer()?;

        // Flush all queued buffers
        while !self.buffers.is_empty() {
            self.flush_oldest_buffer()?;
        }

        // Ensure writer is flushed
        self.writer.flush()?;

        Ok(())
    }

    /// Write data directly to the writer without buffering
    fn write_directly(&mut self, data: &[u8]) -> IoResult<()> {
        // First flush any existing buffers to maintain order
        self.flush_all()?;

        // Write directly
        self.write_buffer(data)?;

        // Call flush callback for direct writes too
        if let Some(ref callback) = self.flush_callback {
            callback(data.len());
        }

        Ok(())
    }

    /// Write a buffer to the underlying writer
    fn write_buffer(&mut self, buffer: &[u8]) -> IoResult<()> {
        // TODO: Compress `buffer` before writing when `self.config.enable_compression`
        // is set; compression is currently a no-op.
        self.writer.write_all(buffer)?;

        self.total_bytes_written += buffer.len();
        self.total_flushes += 1;

        Ok(())
    }

    /// Get current memory usage in bytes
    pub fn current_memory_usage(&self) -> usize {
        let buffered_size: usize = self.buffers.iter().map(|b| b.len()).sum();
        buffered_size + self.current_buffer.len()
    }

    /// Get current buffer size
    pub fn current_buffer_size(&self) -> usize {
        self.current_buffer.len()
    }

    /// Get total bytes written so far
    pub fn total_bytes_written(&self) -> usize {
        self.total_bytes_written
    }

    /// Get peak buffer size reached
    pub fn peak_buffer_size(&self) -> usize {
        self.peak_buffer_size
    }

    /// Get total number of flushes performed
    pub fn total_flushes(&self) -> usize {
        self.total_flushes
    }

    /// Get number of buffers currently queued
    pub fn queued_buffer_count(&self) -> usize {
        self.buffers.len()
    }

    /// Check if buffers are near capacity (more than 3/4 full)
    pub fn is_near_capacity(&self) -> bool {
        self.current_buffer.len() > (self.config.max_buffer_size * 3 / 4)
            || self.buffers.len() >= (self.config.buffer_count * 3 / 4)
    }

    /// Get buffer statistics
    pub fn get_stats(&self) -> BufferStats {
        BufferStats {
            current_memory_usage: self.current_memory_usage(),
            peak_memory_usage: self.peak_buffer_size,
            total_bytes_written: self.total_bytes_written,
            total_flushes: self.total_flushes,
            queued_buffers: self.buffers.len(),
            current_buffer_size: self.current_buffer.len(),
            is_near_capacity: self.is_near_capacity(),
        }
    }
}

/// Statistics for buffer management
#[derive(Debug, Default)]
pub struct BufferStats {
    /// Current memory usage in bytes
    pub current_memory_usage: usize,
    /// Peak memory usage reached
    pub peak_memory_usage: usize,
    /// Total bytes written through buffers
    pub total_bytes_written: usize,
    /// Number of times buffers were flushed
    pub total_flushes: usize,
    /// Number of buffers currently queued
    pub queued_buffers: usize,
    /// Size of the current active buffer
    pub current_buffer_size: usize,
    /// Whether we're near memory capacity
    pub is_near_capacity: bool,
}

impl<W: IoWrite> Drop for BufferManager<W> {
    /// Ensure all buffers are flushed when dropped
    fn drop(&mut self) {
        let _ = self.flush_all();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_basic_buffering() {
        let output = Vec::new();
        let cursor = Cursor::new(output);
        let mut buffer_manager = BufferManager::new(cursor, 100).unwrap();

        // Write some data
        buffer_manager.write_chunk(b"Hello, ").unwrap();
        buffer_manager.write_chunk(b"World!").unwrap();

        assert_eq!(buffer_manager.current_buffer_size(), 13);
        assert_eq!(buffer_manager.total_bytes_written(), 0); // Not flushed yet

        // Flush all
        buffer_manager.flush_all().unwrap();

        assert_eq!(buffer_manager.total_bytes_written(), 13);
        let output = buffer_manager.writer.clone().into_inner();
        assert_eq!(output, b"Hello, World!");
    }
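
    // Illustrative addition (not part of the original suite): a sketch of the
    // queueing behaviour documented on `flush_current_buffer`, using only the
    // APIs defined above.
    #[test]
    fn test_buffer_queueing_sketch() {
        let cursor = Cursor::new(Vec::new());
        let mut buffer_manager = BufferManager::new(cursor, 100).unwrap();

        buffer_manager.write_chunk(&[b'a'; 60]).unwrap();
        // The second chunk would overflow the 100-byte buffer, so the first
        // chunk is moved into the in-memory queue instead of being written out.
        buffer_manager.write_chunk(&[b'b'; 60]).unwrap();

        assert_eq!(buffer_manager.queued_buffer_count(), 1);
        assert_eq!(buffer_manager.current_buffer_size(), 60);
        assert_eq!(buffer_manager.total_bytes_written(), 0);

        buffer_manager.flush_all().unwrap();
        assert_eq!(buffer_manager.total_bytes_written(), 120);
        assert_eq!(buffer_manager.queued_buffer_count(), 0);
    }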

    #[test]
    fn test_automatic_flushing() {
        let output = Vec::new();
        let cursor = Cursor::new(output);
        let mut buffer_manager = BufferManager::new(cursor, 10).unwrap(); // Small buffer

        // Write data that exceeds buffer size
        buffer_manager
            .write_chunk(b"This is a longer string")
            .unwrap();

        // Should have been written directly
        assert!(buffer_manager.total_bytes_written() > 0);
    }
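
    // Illustrative addition: a sketch of `new_with_config` with a custom
    // `BufferConfig`; the small sizes are arbitrary and chosen only to force
    // the queue limit to be hit before `flush_all` is called.
    #[test]
    fn test_custom_config_sketch() {
        let config = BufferConfig {
            max_buffer_size: 8,
            buffer_count: 2,
            enable_compression: false,
        };
        let cursor = Cursor::new(Vec::new());
        let mut buffer_manager = BufferManager::new_with_config(cursor, config).unwrap();

        for chunk in [b"aaaa", b"bbbb", b"cccc", b"dddd", b"eeee"] {
            buffer_manager.write_chunk(chunk).unwrap();
        }

        // With an 8-byte buffer and a 2-buffer queue, the earliest data has
        // already reached the writer before flush_all() is called.
        assert!(buffer_manager.total_bytes_written() > 0);

        buffer_manager.flush_all().unwrap();
        assert_eq!(buffer_manager.total_bytes_written(), 20);
        let output = buffer_manager.writer.clone().into_inner();
        assert_eq!(output, b"aaaabbbbccccddddeeee");
    }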

    #[test]
    fn test_buffer_stats() {
        let output = Vec::new();
        let cursor = Cursor::new(output);
        let mut buffer_manager = BufferManager::new(cursor, 100).unwrap();

        buffer_manager.write_chunk(b"test data").unwrap();

        let stats = buffer_manager.get_stats();
        assert_eq!(stats.current_buffer_size, 9);
        assert_eq!(stats.total_bytes_written, 0);
        assert_eq!(stats.queued_buffers, 0);
    }
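
    // Illustrative addition: a sketch of the `is_near_capacity` threshold
    // (3/4 of `max_buffer_size`) and of `peak_buffer_size` surviving a flush.
    #[test]
    fn test_capacity_and_peak_sketch() {
        let cursor = Cursor::new(Vec::new());
        let mut buffer_manager = BufferManager::new(cursor, 100).unwrap();

        assert!(!buffer_manager.is_near_capacity());

        // 80 bytes is past the 75-byte (3/4) threshold of a 100-byte buffer.
        buffer_manager.write_chunk(&[0u8; 80]).unwrap();
        assert!(buffer_manager.is_near_capacity());
        assert_eq!(buffer_manager.peak_buffer_size(), 80);

        // Flushing empties the buffers, but the recorded peak is retained.
        buffer_manager.flush_all().unwrap();
        assert_eq!(buffer_manager.current_memory_usage(), 0);
        assert_eq!(buffer_manager.peak_buffer_size(), 80);
    }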

    #[test]
    fn test_flush_callback() {
        use std::sync::{Arc, Mutex};

        let output = Vec::new();
        let cursor = Cursor::new(output);
        let mut buffer_manager = BufferManager::new(cursor, 10).unwrap();

        let flush_count = Arc::new(Mutex::new(0));
        let flush_count_clone = flush_count.clone();

        buffer_manager.set_flush_callback(Box::new(move |_size| {
            let mut count = flush_count_clone.lock().unwrap();
            *count += 1;
        }));

        // Write enough to trigger flush
        buffer_manager
            .write_chunk(b"This will trigger a flush")
            .unwrap();
        buffer_manager.flush_all().unwrap();

        let count = *flush_count.lock().unwrap();
        assert!(count > 0);
    }
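
    // Illustrative addition: a sketch of the Drop behaviour, which flushes any
    // remaining buffered data when the manager goes out of scope. `&mut Vec<u8>`
    // is used as the writer so the output can be inspected after the drop.
    #[test]
    fn test_drop_flushes_sketch() {
        let mut output: Vec<u8> = Vec::new();
        {
            let mut buffer_manager = BufferManager::new(&mut output, 100).unwrap();
            buffer_manager.write_chunk(b"flushed on drop").unwrap();
            // Dropping the manager at the end of this scope runs flush_all().
        }
        assert_eq!(output, b"flushed on drop");
    }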
}