//! ddex_builder/streaming/buffer_manager.rs — buffered streaming output.

use std::io::{Write as IoWrite, Result as IoResult};
use std::collections::VecDeque;
/// Callback invoked after a buffer is written to the underlying writer;
/// receives the number of bytes in that buffer.
pub type FlushCallback = Box<dyn Fn(usize) + Send + Sync>;
11
/// Tuning knobs for the buffer manager.
#[derive(Debug, Clone)]
pub struct BufferConfig {
    /// Maximum size in bytes a single buffer may reach before being queued.
    pub max_buffer_size: usize,
    /// Maximum number of filled buffers held in the flush queue.
    pub buffer_count: usize,
    /// Compression toggle. NOTE(review): the current write path treats both
    /// settings identically — confirm intended semantics before relying on it.
    pub enable_compression: bool,
}

impl Default for BufferConfig {
    /// Defaults: 1 MiB buffers, up to 10 queued, compression off.
    fn default() -> Self {
        BufferConfig {
            max_buffer_size: 1024 * 1024,
            buffer_count: 10,
            enable_compression: false,
        }
    }
}
32
/// Buffered writer that batches chunks in memory and writes them to an
/// underlying `IoWrite` in bounded-size pieces, tracking usage statistics.
pub struct BufferManager<W: IoWrite> {
    // Destination for all flushed bytes.
    writer: W,
    // Size/count limits and feature toggles.
    config: BufferConfig,

    // Filled buffers queued for writing, oldest first.
    buffers: VecDeque<Vec<u8>>,
    // Buffer currently being appended to by `write_chunk`.
    current_buffer: Vec<u8>,

    // Total bytes handed to the underlying writer so far.
    total_bytes_written: usize,
    // Number of physical writes performed on the underlying writer.
    total_flushes: usize,
    // High-water mark of bytes held in memory across all buffers.
    peak_buffer_size: usize,

    // Optional observer notified with each written buffer's size.
    flush_callback: Option<FlushCallback>,
}
50
51impl<W: IoWrite> BufferManager<W> {
52 pub fn new(writer: W, max_buffer_size: usize) -> IoResult<Self> {
54 let config = BufferConfig {
55 max_buffer_size,
56 ..BufferConfig::default()
57 };
58 Self::new_with_config(writer, config)
59 }
60
61 pub fn new_with_config(writer: W, config: BufferConfig) -> IoResult<Self> {
63 let buffer_capacity = config.max_buffer_size;
64 Ok(BufferManager {
65 writer,
66 config,
67 buffers: VecDeque::new(),
68 current_buffer: Vec::with_capacity(buffer_capacity),
69 total_bytes_written: 0,
70 total_flushes: 0,
71 peak_buffer_size: 0,
72 flush_callback: None,
73 })
74 }
75
76 pub fn set_flush_callback(&mut self, callback: FlushCallback) {
78 self.flush_callback = Some(callback);
79 }
80
81 pub fn write_chunk(&mut self, data: &[u8]) -> IoResult<()> {
83 if self.current_buffer.len() + data.len() > self.config.max_buffer_size {
85 self.flush_current_buffer()?;
86 }
87
88 if data.len() > self.config.max_buffer_size {
90 self.write_directly(data)?;
91 return Ok(());
92 }
93
94 self.current_buffer.extend_from_slice(data);
96
97 let current_memory = self.current_memory_usage();
99 if current_memory > self.peak_buffer_size {
100 self.peak_buffer_size = current_memory;
101 }
102
103 if self.buffers.len() >= self.config.buffer_count {
105 self.flush_oldest_buffer()?;
106 }
107
108 Ok(())
109 }
110
111 pub fn flush_current_buffer(&mut self) -> IoResult<()> {
113 if !self.current_buffer.is_empty() {
114 let buffer = std::mem::replace(
115 &mut self.current_buffer,
116 Vec::with_capacity(self.config.max_buffer_size)
117 );
118 self.buffers.push_back(buffer);
119
120 if self.buffers.len() > self.config.buffer_count {
122 self.flush_oldest_buffer()?;
123 }
124 }
125 Ok(())
126 }
127
128 pub fn flush_oldest_buffer(&mut self) -> IoResult<()> {
130 if let Some(buffer) = self.buffers.pop_front() {
131 self.write_buffer(&buffer)?;
132
133 if let Some(ref callback) = self.flush_callback {
135 callback(buffer.len());
136 }
137 }
138 Ok(())
139 }
140
141 pub fn flush_all(&mut self) -> IoResult<()> {
143 self.flush_current_buffer()?;
145
146 while !self.buffers.is_empty() {
148 self.flush_oldest_buffer()?;
149 }
150
151 self.writer.flush()?;
153
154 Ok(())
155 }
156
157 fn write_directly(&mut self, data: &[u8]) -> IoResult<()> {
159 self.flush_all()?;
161
162 self.write_buffer(data)?;
164
165 Ok(())
166 }
167
168 fn write_buffer(&mut self, buffer: &[u8]) -> IoResult<()> {
170 if self.config.enable_compression {
171 self.writer.write_all(buffer)?;
173 } else {
174 self.writer.write_all(buffer)?;
175 }
176
177 self.total_bytes_written += buffer.len();
178 self.total_flushes += 1;
179
180 Ok(())
181 }
182
183 pub fn current_memory_usage(&self) -> usize {
185 let buffered_size: usize = self.buffers.iter().map(|b| b.len()).sum();
186 buffered_size + self.current_buffer.len()
187 }
188
189 pub fn current_buffer_size(&self) -> usize {
191 self.current_buffer.len()
192 }
193
194 pub fn total_bytes_written(&self) -> usize {
196 self.total_bytes_written
197 }
198
199 pub fn peak_buffer_size(&self) -> usize {
201 self.peak_buffer_size
202 }
203
204 pub fn total_flushes(&self) -> usize {
206 self.total_flushes
207 }
208
209 pub fn queued_buffer_count(&self) -> usize {
211 self.buffers.len()
212 }
213
214 pub fn is_near_capacity(&self) -> bool {
216 self.current_buffer.len() > (self.config.max_buffer_size * 3 / 4) ||
217 self.buffers.len() >= (self.config.buffer_count * 3 / 4)
218 }
219
220 pub fn get_stats(&self) -> BufferStats {
222 BufferStats {
223 current_memory_usage: self.current_memory_usage(),
224 peak_memory_usage: self.peak_buffer_size,
225 total_bytes_written: self.total_bytes_written,
226 total_flushes: self.total_flushes,
227 queued_buffers: self.buffers.len(),
228 current_buffer_size: self.current_buffer.len(),
229 is_near_capacity: self.is_near_capacity(),
230 }
231 }
232}
233
/// Point-in-time snapshot of a buffer manager's counters, produced by
/// `get_stats`.
#[derive(Debug, Clone)]
pub struct BufferStats {
    /// Bytes currently held across all in-memory buffers.
    pub current_memory_usage: usize,
    /// Largest value `current_memory_usage` has reached.
    pub peak_memory_usage: usize,
    /// Total bytes written to the underlying writer.
    pub total_bytes_written: usize,
    /// Number of physical writes to the underlying writer.
    pub total_flushes: usize,
    /// Number of filled buffers waiting in the flush queue.
    pub queued_buffers: usize,
    /// Bytes in the buffer currently being filled.
    pub current_buffer_size: usize,
    /// True when buffers are close to the configured limits.
    pub is_near_capacity: bool,
}
245
impl<W: IoWrite> Drop for BufferManager<W> {
    /// Best-effort flush on drop. Errors cannot propagate out of `drop`, so
    /// they are deliberately ignored here; call `flush_all()` explicitly
    /// before dropping to observe I/O errors.
    fn drop(&mut self) {
        let _ = self.flush_all();
    }
}
252
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Small chunks stay in memory until an explicit flush_all.
    #[test]
    fn test_basic_buffering() {
        let output = Vec::new();
        let cursor = Cursor::new(output);
        let mut buffer_manager = BufferManager::new(cursor, 100).unwrap();

        buffer_manager.write_chunk(b"Hello, ").unwrap();
        buffer_manager.write_chunk(b"World!").unwrap();

        // Both chunks fit in the 100-byte buffer: nothing has hit the writer.
        assert_eq!(buffer_manager.current_buffer_size(), 13);
        assert_eq!(buffer_manager.total_bytes_written(), 0); buffer_manager.flush_all().unwrap();

        assert_eq!(buffer_manager.total_bytes_written(), 13);
        let output = buffer_manager.writer.clone().into_inner();
        assert_eq!(output, b"Hello, World!");
    }

    // A chunk larger than max_buffer_size is written through immediately.
    #[test]
    fn test_automatic_flushing() {
        let output = Vec::new();
        let cursor = Cursor::new(output);
        let mut buffer_manager = BufferManager::new(cursor, 10).unwrap(); buffer_manager.write_chunk(b"This is a longer string").unwrap();

        assert!(buffer_manager.total_bytes_written() > 0);
    }

    // get_stats reflects buffered-but-unwritten data.
    #[test]
    fn test_buffer_stats() {
        let output = Vec::new();
        let cursor = Cursor::new(output);
        let mut buffer_manager = BufferManager::new(cursor, 100).unwrap();

        buffer_manager.write_chunk(b"test data").unwrap();

        let stats = buffer_manager.get_stats();
        assert_eq!(stats.current_buffer_size, 9);
        assert_eq!(stats.total_bytes_written, 0);
        assert_eq!(stats.queued_buffers, 0);
    }

    // The registered callback must fire when buffered data reaches the writer
    // — including the direct-write path taken by oversized chunks.
    #[test]
    fn test_flush_callback() {
        use std::sync::{Arc, Mutex};

        let output = Vec::new();
        let cursor = Cursor::new(output);
        let mut buffer_manager = BufferManager::new(cursor, 10).unwrap();

        let flush_count = Arc::new(Mutex::new(0));
        let flush_count_clone = flush_count.clone();

        buffer_manager.set_flush_callback(Box::new(move |_size| {
            let mut count = flush_count_clone.lock().unwrap();
            *count += 1;
        }));

        // 25 bytes > 10-byte limit: goes through the direct-write path.
        buffer_manager.write_chunk(b"This will trigger a flush").unwrap();
        buffer_manager.flush_all().unwrap();

        let count = *flush_count.lock().unwrap();
        assert!(count > 0);
    }
}