// ddex_builder/streaming/buffer_manager.rs

use std::collections::VecDeque;
use std::io::{Result as IoResult, Write as IoWrite};
8
/// Callback invoked after each buffer is written to the underlying writer,
/// receiving the number of bytes that were flushed.
pub type FlushCallback = Box<dyn Fn(usize) + Send + Sync>;
11
/// Tuning parameters for a `BufferManager`.
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Maximum size in bytes of a single in-memory buffer before it is
    /// sealed and queued for writing.
    pub max_buffer_size: usize,
    /// Maximum number of sealed buffers kept queued before the oldest is
    /// flushed to the writer.
    pub buffer_count: usize,
    /// Whether buffers should be compressed before writing.
    /// NOTE(review): currently a no-op — `write_buffer` writes identical
    /// bytes on both branches of this flag.
    pub enable_compression: bool,
}
22
23impl Default for BufferConfig {
24 fn default() -> Self {
25 Self {
26 max_buffer_size: 1024 * 1024, buffer_count: 10, enable_compression: false, }
30 }
31}
32
/// Accumulates writes into fixed-size in-memory buffers and flushes them to
/// an underlying writer, tracking memory usage and flush statistics.
pub struct BufferManager<W: IoWrite> {
    // Destination for flushed buffers.
    writer: W,
    config: BufferConfig,

    // Sealed buffers waiting to be written, oldest at the front.
    buffers: VecDeque<Vec<u8>>,
    // Active buffer that incoming chunks are appended to.
    current_buffer: Vec<u8>,

    // Total bytes written to `writer` so far.
    total_bytes_written: usize,
    // Number of individual buffer writes performed against `writer`.
    total_flushes: usize,
    // High-water mark of in-memory usage (queued + active buffer bytes).
    peak_buffer_size: usize,

    // Optional observer invoked with the byte count of each flushed buffer.
    flush_callback: Option<FlushCallback>,
}
50
51impl<W: IoWrite> BufferManager<W> {
52 pub fn new(writer: W, max_buffer_size: usize) -> IoResult<Self> {
54 let config = BufferConfig {
55 max_buffer_size,
56 ..BufferConfig::default()
57 };
58 Self::new_with_config(writer, config)
59 }
60
61 pub fn new_with_config(writer: W, config: BufferConfig) -> IoResult<Self> {
63 let buffer_capacity = config.max_buffer_size;
64 Ok(BufferManager {
65 writer,
66 config,
67 buffers: VecDeque::new(),
68 current_buffer: Vec::with_capacity(buffer_capacity),
69 total_bytes_written: 0,
70 total_flushes: 0,
71 peak_buffer_size: 0,
72 flush_callback: None,
73 })
74 }
75
76 pub fn set_flush_callback(&mut self, callback: FlushCallback) {
78 self.flush_callback = Some(callback);
79 }
80
81 pub fn write_chunk(&mut self, data: &[u8]) -> IoResult<()> {
83 if self.current_buffer.len() + data.len() > self.config.max_buffer_size {
85 self.flush_current_buffer()?;
86 }
87
88 if data.len() > self.config.max_buffer_size {
90 self.write_directly(data)?;
91 return Ok(());
92 }
93
94 self.current_buffer.extend_from_slice(data);
96
97 let current_memory = self.current_memory_usage();
99 if current_memory > self.peak_buffer_size {
100 self.peak_buffer_size = current_memory;
101 }
102
103 if self.buffers.len() >= self.config.buffer_count {
105 self.flush_oldest_buffer()?;
106 }
107
108 Ok(())
109 }
110
111 pub fn flush_current_buffer(&mut self) -> IoResult<()> {
113 if !self.current_buffer.is_empty() {
114 let buffer = std::mem::replace(
115 &mut self.current_buffer,
116 Vec::with_capacity(self.config.max_buffer_size),
117 );
118 self.buffers.push_back(buffer);
119
120 if self.buffers.len() > self.config.buffer_count {
122 self.flush_oldest_buffer()?;
123 }
124 }
125 Ok(())
126 }
127
128 pub fn flush_oldest_buffer(&mut self) -> IoResult<()> {
130 if let Some(buffer) = self.buffers.pop_front() {
131 self.write_buffer(&buffer)?;
132
133 if let Some(ref callback) = self.flush_callback {
135 callback(buffer.len());
136 }
137 }
138 Ok(())
139 }
140
141 pub fn flush_all(&mut self) -> IoResult<()> {
143 self.flush_current_buffer()?;
145
146 while !self.buffers.is_empty() {
148 self.flush_oldest_buffer()?;
149 }
150
151 self.writer.flush()?;
153
154 Ok(())
155 }
156
157 fn write_directly(&mut self, data: &[u8]) -> IoResult<()> {
159 self.flush_all()?;
161
162 self.write_buffer(data)?;
164
165 if let Some(ref callback) = self.flush_callback {
167 callback(data.len());
168 }
169
170 Ok(())
171 }
172
173 fn write_buffer(&mut self, buffer: &[u8]) -> IoResult<()> {
175 if self.config.enable_compression {
176 self.writer.write_all(buffer)?;
178 } else {
179 self.writer.write_all(buffer)?;
180 }
181
182 self.total_bytes_written += buffer.len();
183 self.total_flushes += 1;
184
185 Ok(())
186 }
187
188 pub fn current_memory_usage(&self) -> usize {
190 let buffered_size: usize = self.buffers.iter().map(|b| b.len()).sum();
191 buffered_size + self.current_buffer.len()
192 }
193
194 pub fn current_buffer_size(&self) -> usize {
196 self.current_buffer.len()
197 }
198
199 pub fn total_bytes_written(&self) -> usize {
201 self.total_bytes_written
202 }
203
204 pub fn peak_buffer_size(&self) -> usize {
206 self.peak_buffer_size
207 }
208
209 pub fn total_flushes(&self) -> usize {
211 self.total_flushes
212 }
213
214 pub fn queued_buffer_count(&self) -> usize {
216 self.buffers.len()
217 }
218
219 pub fn is_near_capacity(&self) -> bool {
221 self.current_buffer.len() > (self.config.max_buffer_size * 3 / 4)
222 || self.buffers.len() >= (self.config.buffer_count * 3 / 4)
223 }
224
225 pub fn get_stats(&self) -> BufferStats {
227 BufferStats {
228 current_memory_usage: self.current_memory_usage(),
229 peak_memory_usage: self.peak_buffer_size,
230 total_bytes_written: self.total_bytes_written,
231 total_flushes: self.total_flushes,
232 queued_buffers: self.buffers.len(),
233 current_buffer_size: self.current_buffer.len(),
234 is_near_capacity: self.is_near_capacity(),
235 }
236 }
237}
238
/// Point-in-time snapshot of a `BufferManager`'s statistics, as returned by
/// `get_stats`.
#[derive(Debug, Default)]
pub struct BufferStats {
    /// Bytes currently buffered in memory (queued + active buffer).
    pub current_memory_usage: usize,
    /// Highest in-memory usage observed so far.
    pub peak_memory_usage: usize,
    /// Total bytes written to the underlying writer.
    pub total_bytes_written: usize,
    /// Number of individual buffer writes performed against the writer.
    pub total_flushes: usize,
    /// Sealed buffers waiting in the flush queue.
    pub queued_buffers: usize,
    /// Bytes in the active (not yet sealed) buffer.
    pub current_buffer_size: usize,
    /// Whether buffering exceeds ~3/4 of configured capacity.
    pub is_near_capacity: bool,
}
257
258impl<W: IoWrite> Drop for BufferManager<W> {
259 fn drop(&mut self) {
261 let _ = self.flush_all();
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_basic_buffering() {
        let mut manager = BufferManager::new(Cursor::new(Vec::new()), 100).unwrap();

        manager.write_chunk(b"Hello, ").unwrap();
        manager.write_chunk(b"World!").unwrap();

        // Nothing reaches the writer until an explicit flush.
        assert_eq!(manager.current_buffer_size(), 13);
        assert_eq!(manager.total_bytes_written(), 0);

        manager.flush_all().unwrap();

        assert_eq!(manager.total_bytes_written(), 13);
        let written = manager.writer.clone().into_inner();
        assert_eq!(written, b"Hello, World!");
    }

    #[test]
    fn test_automatic_flushing() {
        let mut manager = BufferManager::new(Cursor::new(Vec::new()), 10).unwrap();

        // A chunk larger than the 10-byte limit bypasses buffering and is
        // written straight through.
        manager.write_chunk(b"This is a longer string").unwrap();

        assert!(manager.total_bytes_written() > 0);
    }

    #[test]
    fn test_buffer_stats() {
        let mut manager = BufferManager::new(Cursor::new(Vec::new()), 100).unwrap();

        manager.write_chunk(b"test data").unwrap();

        let stats = manager.get_stats();
        assert_eq!(stats.current_buffer_size, 9);
        assert_eq!(stats.total_bytes_written, 0);
        assert_eq!(stats.queued_buffers, 0);
    }

    #[test]
    fn test_flush_callback() {
        use std::sync::{Arc, Mutex};

        let mut manager = BufferManager::new(Cursor::new(Vec::new()), 10).unwrap();

        let flushes = Arc::new(Mutex::new(0));
        let counter = Arc::clone(&flushes);

        // Count every flush the manager reports.
        manager.set_flush_callback(Box::new(move |_size| {
            *counter.lock().unwrap() += 1;
        }));

        manager.write_chunk(b"This will trigger a flush").unwrap();
        manager.flush_all().unwrap();

        assert!(*flushes.lock().unwrap() > 0);
    }
}