// ddex_builder/streaming/buffer_manager.rs

1//! Buffer management for streaming DDEX XML output
2//! 
3//! Handles chunked writing to disk/network with automatic flushing
4//! and memory-bounded operations.
5
6use std::io::{Write as IoWrite, Result as IoResult};
7use std::collections::VecDeque;
8
/// Callback invoked each time a buffer is flushed to the underlying writer.
/// Receives the number of bytes that were flushed.
pub type FlushCallback = Box<dyn Fn(usize) + Send + Sync>;
11
/// Configuration for buffer management
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Maximum size in bytes a single buffer may reach before the current
    /// buffer is rotated into the flush queue; chunks larger than this are
    /// written directly, bypassing buffering.
    pub max_buffer_size: usize,
    /// Maximum number of filled buffers to keep queued in memory; when the
    /// limit is exceeded the oldest buffer is flushed to the writer.
    pub buffer_count: usize,
    /// Whether to enable compression.
    /// NOTE(review): compression is not yet implemented (see the TODO in
    /// `write_buffer`); output is written uncompressed regardless of this flag.
    pub enable_compression: bool,
}
22
23impl Default for BufferConfig {
24    fn default() -> Self {
25        Self {
26            max_buffer_size: 1024 * 1024, // 1MB per buffer
27            buffer_count: 10, // Up to 10MB total
28            enable_compression: false, // Disabled by default for simplicity
29        }
30    }
31}
32
/// Manages buffered writing with automatic flushing and memory limits
pub struct BufferManager<W: IoWrite> {
    /// Destination for all flushed bytes.
    writer: W,
    /// Size and count limits governing flush behavior.
    config: BufferConfig,
    
    // Buffer management
    /// Filled buffers queued for flushing, oldest at the front.
    buffers: VecDeque<Vec<u8>>,
    /// Buffer currently accepting writes; rotated into `buffers` when it
    /// would overflow `config.max_buffer_size`.
    current_buffer: Vec<u8>,
    
    // Statistics
    /// Total bytes written to `writer` so far.
    total_bytes_written: usize,
    /// Number of individual write operations performed against `writer`.
    total_flushes: usize,
    /// Peak combined in-memory size of all buffers observed (bytes).
    peak_buffer_size: usize,
    
    // Callbacks
    /// Optional hook invoked with a buffer's byte length when it is flushed.
    flush_callback: Option<FlushCallback>,
}
50
51impl<W: IoWrite> BufferManager<W> {
52    /// Create a new buffer manager with default configuration
53    pub fn new(writer: W, max_buffer_size: usize) -> IoResult<Self> {
54        let config = BufferConfig {
55            max_buffer_size,
56            ..BufferConfig::default()
57        };
58        Self::new_with_config(writer, config)
59    }
60    
61    /// Create a new buffer manager with custom configuration
62    pub fn new_with_config(writer: W, config: BufferConfig) -> IoResult<Self> {
63        let buffer_capacity = config.max_buffer_size;
64        Ok(BufferManager {
65            writer,
66            config,
67            buffers: VecDeque::new(),
68            current_buffer: Vec::with_capacity(buffer_capacity),
69            total_bytes_written: 0,
70            total_flushes: 0,
71            peak_buffer_size: 0,
72            flush_callback: None,
73        })
74    }
75    
76    /// Set a callback to be called when buffers are flushed
77    pub fn set_flush_callback(&mut self, callback: FlushCallback) {
78        self.flush_callback = Some(callback);
79    }
80    
81    /// Write a chunk of data to the buffer
82    pub fn write_chunk(&mut self, data: &[u8]) -> IoResult<()> {
83        // If this chunk would overflow current buffer, flush it first
84        if self.current_buffer.len() + data.len() > self.config.max_buffer_size {
85            self.flush_current_buffer()?;
86        }
87        
88        // If chunk is larger than max buffer size, write it directly
89        if data.len() > self.config.max_buffer_size {
90            self.write_directly(data)?;
91            return Ok(());
92        }
93        
94        // Add to current buffer
95        self.current_buffer.extend_from_slice(data);
96        
97        // Update peak memory usage
98        let current_memory = self.current_memory_usage();
99        if current_memory > self.peak_buffer_size {
100            self.peak_buffer_size = current_memory;
101        }
102        
103        // Check if we need to flush due to buffer count limit
104        if self.buffers.len() >= self.config.buffer_count {
105            self.flush_oldest_buffer()?;
106        }
107        
108        Ok(())
109    }
110    
111    /// Flush the current buffer to the queue
112    pub fn flush_current_buffer(&mut self) -> IoResult<()> {
113        if !self.current_buffer.is_empty() {
114            let buffer = std::mem::replace(
115                &mut self.current_buffer,
116                Vec::with_capacity(self.config.max_buffer_size)
117            );
118            self.buffers.push_back(buffer);
119            
120            // If we have too many buffers, flush the oldest one
121            if self.buffers.len() > self.config.buffer_count {
122                self.flush_oldest_buffer()?;
123            }
124        }
125        Ok(())
126    }
127    
128    /// Flush the oldest buffer to the writer
129    pub fn flush_oldest_buffer(&mut self) -> IoResult<()> {
130        if let Some(buffer) = self.buffers.pop_front() {
131            self.write_buffer(&buffer)?;
132            
133            // Call flush callback if set
134            if let Some(ref callback) = self.flush_callback {
135                callback(buffer.len());
136            }
137        }
138        Ok(())
139    }
140    
141    /// Flush all buffers to the writer
142    pub fn flush_all(&mut self) -> IoResult<()> {
143        // Flush current buffer to queue first
144        self.flush_current_buffer()?;
145        
146        // Flush all queued buffers
147        while !self.buffers.is_empty() {
148            self.flush_oldest_buffer()?;
149        }
150        
151        // Ensure writer is flushed
152        self.writer.flush()?;
153        
154        Ok(())
155    }
156    
157    /// Write data directly to the writer without buffering
158    fn write_directly(&mut self, data: &[u8]) -> IoResult<()> {
159        // First flush any existing buffers to maintain order
160        self.flush_all()?;
161        
162        // Write directly
163        self.write_buffer(data)?;
164        
165        Ok(())
166    }
167    
168    /// Write a buffer to the underlying writer
169    fn write_buffer(&mut self, buffer: &[u8]) -> IoResult<()> {
170        if self.config.enable_compression {
171            // TODO: Implement compression if needed
172            self.writer.write_all(buffer)?;
173        } else {
174            self.writer.write_all(buffer)?;
175        }
176        
177        self.total_bytes_written += buffer.len();
178        self.total_flushes += 1;
179        
180        Ok(())
181    }
182    
183    /// Get current memory usage in bytes
184    pub fn current_memory_usage(&self) -> usize {
185        let buffered_size: usize = self.buffers.iter().map(|b| b.len()).sum();
186        buffered_size + self.current_buffer.len()
187    }
188    
189    /// Get current buffer size
190    pub fn current_buffer_size(&self) -> usize {
191        self.current_buffer.len()
192    }
193    
194    /// Get total bytes written so far
195    pub fn total_bytes_written(&self) -> usize {
196        self.total_bytes_written
197    }
198    
199    /// Get peak buffer size reached
200    pub fn peak_buffer_size(&self) -> usize {
201        self.peak_buffer_size
202    }
203    
204    /// Get total number of flushes performed
205    pub fn total_flushes(&self) -> usize {
206        self.total_flushes
207    }
208    
209    /// Get number of buffers currently queued
210    pub fn queued_buffer_count(&self) -> usize {
211        self.buffers.len()
212    }
213    
214    /// Check if buffers are near capacity
215    pub fn is_near_capacity(&self) -> bool {
216        self.current_buffer.len() > (self.config.max_buffer_size * 3 / 4) ||
217        self.buffers.len() >= (self.config.buffer_count * 3 / 4)
218    }
219    
220    /// Get buffer statistics
221    pub fn get_stats(&self) -> BufferStats {
222        BufferStats {
223            current_memory_usage: self.current_memory_usage(),
224            peak_memory_usage: self.peak_buffer_size,
225            total_bytes_written: self.total_bytes_written,
226            total_flushes: self.total_flushes,
227            queued_buffers: self.buffers.len(),
228            current_buffer_size: self.current_buffer.len(),
229            is_near_capacity: self.is_near_capacity(),
230        }
231    }
232}
233
/// Statistics about buffer usage
#[derive(Debug, Clone)]
pub struct BufferStats {
    /// Bytes currently held in memory (queued buffers + current buffer).
    pub current_memory_usage: usize,
    /// Peak in-memory usage observed so far (bytes).
    pub peak_memory_usage: usize,
    /// Total bytes written to the underlying writer.
    pub total_bytes_written: usize,
    /// Number of individual write operations performed against the writer.
    pub total_flushes: usize,
    /// Number of buffers waiting in the flush queue.
    pub queued_buffers: usize,
    /// Bytes in the current (not yet queued) buffer.
    pub current_buffer_size: usize,
    /// True when usage is at or above ~75% of either configured limit.
    pub is_near_capacity: bool,
}
245
impl<W: IoWrite> Drop for BufferManager<W> {
    /// Ensure all buffers are flushed when dropped.
    ///
    /// Any I/O error during this final flush is silently discarded — there
    /// is no way to report it from `drop`. Callers that need to observe
    /// flush errors should call `flush_all` explicitly before dropping.
    fn drop(&mut self) {
        let _ = self.flush_all();
    }
}
252
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    
    /// Data smaller than the buffer stays in memory until `flush_all`.
    #[test]
    fn test_basic_buffering() {
        let cursor = Cursor::new(Vec::new());
        let mut buffer_manager = BufferManager::new(cursor, 100).unwrap();
        
        buffer_manager.write_chunk(b"Hello, ").unwrap();
        buffer_manager.write_chunk(b"World!").unwrap();
        
        assert_eq!(buffer_manager.current_buffer_size(), 13);
        assert_eq!(buffer_manager.total_bytes_written(), 0); // Not flushed yet
        
        buffer_manager.flush_all().unwrap();
        
        assert_eq!(buffer_manager.total_bytes_written(), 13);
        // Borrow the written bytes via `get_ref` instead of cloning the
        // entire writer just to inspect its contents.
        assert_eq!(buffer_manager.writer.get_ref().as_slice(), b"Hello, World!");
    }
    
    /// Chunks larger than the buffer limit are written straight through.
    #[test]
    fn test_automatic_flushing() {
        let cursor = Cursor::new(Vec::new());
        let mut buffer_manager = BufferManager::new(cursor, 10).unwrap(); // Small buffer
        
        let data = b"This is a longer string";
        buffer_manager.write_chunk(data).unwrap();
        
        // The oversized chunk bypasses buffering entirely: all bytes land
        // in the writer immediately, in order.
        assert_eq!(buffer_manager.total_bytes_written(), data.len());
        assert_eq!(buffer_manager.writer.get_ref().as_slice(), data);
    }
    
    /// Stats reflect the unflushed state after a small write.
    #[test]
    fn test_buffer_stats() {
        let cursor = Cursor::new(Vec::new());
        let mut buffer_manager = BufferManager::new(cursor, 100).unwrap();
        
        buffer_manager.write_chunk(b"test data").unwrap();
        
        let stats = buffer_manager.get_stats();
        assert_eq!(stats.current_buffer_size, 9);
        assert_eq!(stats.total_bytes_written, 0);
        assert_eq!(stats.queued_buffers, 0);
    }
    
    /// The flush callback fires once per buffer drained to the writer.
    ///
    /// BUGFIX: the old test wrote a single oversized chunk, which takes the
    /// direct-write path and produced zero callback invocations, so the
    /// `count > 0` assertion failed. Exercise the queued-buffer path
    /// instead, where the callback count is deterministic.
    #[test]
    fn test_flush_callback() {
        use std::sync::{Arc, Mutex};
        
        let cursor = Cursor::new(Vec::new());
        let mut buffer_manager = BufferManager::new(cursor, 10).unwrap();
        
        let flush_count = Arc::new(Mutex::new(0));
        let flush_count_clone = Arc::clone(&flush_count);
        
        buffer_manager.set_flush_callback(Box::new(move |_size| {
            *flush_count_clone.lock().unwrap() += 1;
        }));
        
        // Two 6-byte chunks overflow the 10-byte buffer: the first buffer
        // is rotated into the queue, then `flush_all` drains both buffers
        // through the callback — exactly two invocations.
        buffer_manager.write_chunk(b"abcdef").unwrap();
        buffer_manager.write_chunk(b"ghijkl").unwrap();
        buffer_manager.flush_all().unwrap();
        
        assert_eq!(*flush_count.lock().unwrap(), 2);
    }
}