Skip to main content

allsource_core/infrastructure/persistence/
performance.rs

1use std::time::{Duration, Instant};
2
/// Batch writer for high-throughput ingestion.
///
/// Inspired by SierraDB's focus on high-throughput operations.
///
/// # Design Principles
///
/// - Batch operations for reduced overhead
/// - Minimal allocations
/// - Zero-copy where possible
/// - Lock-free when feasible
///
/// # Behavior
///
/// Accumulates items in an in-memory buffer. The writer never flushes on its
/// own; [`BatchWriter::should_flush`] reports that a flush is due when:
///
/// - the buffer has reached `capacity`, or
/// - `flush_interval` has elapsed since the last flush (or since creation).
///
/// The caller can also call [`BatchWriter::flush`] explicitly at any time.
///
/// # Example
/// ```ignore
/// let mut writer = BatchWriter::new(100, Duration::from_millis(100));
/// writer.add(item);
/// if writer.should_flush() {
///     let batch = writer.flush();
///     // hand `batch` to the storage layer
/// }
/// ```
pub struct BatchWriter<T> {
    /// Items accumulated since the last flush.
    buffer: Vec<T>,
    /// Item count at which `should_flush` starts returning `true`.
    capacity: usize,
    /// Moment of the last `flush` call (or of construction).
    last_flush: Instant,
    /// Maximum age of a batch before `should_flush` returns `true`.
    flush_interval: Duration,
}

impl<T> BatchWriter<T> {
    /// Create a new batch writer with a buffer pre-allocated for `capacity` items.
    ///
    /// # Arguments
    /// - `capacity`: Number of buffered items at which a flush becomes due
    /// - `flush_interval`: Maximum time between flushes
    pub fn new(capacity: usize, flush_interval: Duration) -> Self {
        Self {
            buffer: Vec::with_capacity(capacity),
            capacity,
            last_flush: Instant::now(),
            flush_interval,
        }
    }

    /// Append an item to the buffer.
    ///
    /// `capacity` is not enforced here: the buffer keeps growing until the
    /// caller flushes it.
    pub fn add(&mut self, item: T) {
        self.buffer.push(item);
    }

    /// Report whether a flush is due.
    ///
    /// Returns `true` when the buffer holds at least `capacity` items or when
    /// `flush_interval` has elapsed since the last flush. The time check does
    /// not look at the buffer contents, so this can be `true` while the buffer
    /// is empty.
    pub fn should_flush(&self) -> bool {
        self.buffer.len() >= self.capacity || self.last_flush.elapsed() >= self.flush_interval
    }

    /// Number of items currently buffered.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// `true` when no items are buffered.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Hand the buffered items to the caller and restart the flush timer.
    ///
    /// The internal buffer stays pre-allocated for `capacity` items afterwards,
    /// so the next batch does not have to regrow it from zero.
    pub fn flush(&mut self) -> Vec<T> {
        self.last_flush = Instant::now();

        // Nothing buffered: keep the existing allocation and return an empty
        // vector, which does not allocate.
        if self.buffer.is_empty() {
            return Vec::new();
        }

        // `mem::take` would leave a zero-capacity buffer behind; swap in a
        // freshly pre-sized one instead.
        std::mem::replace(&mut self.buffer, Vec::with_capacity(self.capacity))
    }

    /// Time elapsed since the last flush (or since creation if never flushed).
    pub fn time_since_flush(&self) -> Duration {
        self.last_flush.elapsed()
    }
}
83
/// Performance metrics tracker.
///
/// Aggregates the number of recorded operations together with their total,
/// shortest and longest durations.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Number of operations recorded so far.
    pub operations: u64,
    /// Sum of all recorded durations.
    pub total_duration: Duration,
    /// Shortest recorded duration, `None` until the first `record` call.
    pub min_duration: Option<Duration>,
    /// Longest recorded duration, `None` until the first `record` call.
    pub max_duration: Option<Duration>,
}

impl PerformanceMetrics {
    /// Create an empty tracker: zero operations, zero total, no min/max.
    pub fn new() -> Self {
        Self {
            operations: 0,
            total_duration: Duration::ZERO,
            min_duration: None,
            max_duration: None,
        }
    }

    /// Record one operation that took `duration`.
    ///
    /// The counter and the running total saturate at their maximum values
    /// instead of panicking on overflow.
    pub fn record(&mut self, duration: Duration) {
        self.operations = self.operations.saturating_add(1);
        self.total_duration = self.total_duration.saturating_add(duration);

        self.min_duration = Some(
            self.min_duration
                .map_or(duration, |current| current.min(duration)),
        );
        self.max_duration = Some(
            self.max_duration
                .map_or(duration, |current| current.max(duration)),
        );
    }

    /// Mean duration per operation, or `None` when nothing has been recorded.
    ///
    /// The result is truncated to whole nanoseconds.
    pub fn avg_duration(&self) -> Option<Duration> {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        if self.operations == 0 {
            return None;
        }

        // Divide in 128-bit nanoseconds: narrowing `operations` to `u32` would
        // truncate large counts and divide by zero at multiples of 2^32.
        let avg_nanos = self.total_duration.as_nanos() / u128::from(self.operations);

        // The average never exceeds the total, so the seconds part fits in u64;
        // the fallback only guards the conversion.
        let secs = u64::try_from(avg_nanos / NANOS_PER_SEC).unwrap_or(u64::MAX);
        // The remainder is always below one billion, so it fits in u32.
        let subsec_nanos = (avg_nanos % NANOS_PER_SEC) as u32;

        Some(Duration::new(secs, subsec_nanos))
    }

    /// Operations per second of recorded time.
    ///
    /// Returns `0.0` when the total recorded time is zero.
    pub fn throughput_per_sec(&self) -> f64 {
        let total_secs = self.total_duration.as_secs_f64();
        if total_secs == 0.0 {
            0.0
        } else {
            self.operations as f64 / total_secs
        }
    }
}

impl Default for PerformanceMetrics {
    /// Same as [`PerformanceMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
140
/// Memory pool for reducing allocations.
///
/// Keeps returned buffers around and hands them out again so that hot paths
/// do not have to allocate a fresh `Vec` every time. The pool starts empty;
/// buffers are created on demand by [`MemoryPool::get`] and retained by
/// [`MemoryPool::put`].
///
/// # SierraDB Pattern
/// - Reduces allocator pressure
/// - Improves throughput for high-frequency operations
/// - No internal locking: every method takes `&mut self`, so sharing a pool
///   between threads requires external synchronization (or one pool per thread)
pub struct MemoryPool<T> {
    /// Idle buffers, all empty, ready to be handed out.
    pool: Vec<Vec<T>>,
    /// Minimum capacity of every buffer returned by `get`.
    capacity: usize,
    /// Maximum number of idle buffers retained by `put`.
    max_pool_size: usize,
}

impl<T> MemoryPool<T> {
    /// Create an empty pool.
    ///
    /// # Arguments
    /// - `capacity`: Minimum capacity of the buffers handed out by `get`
    /// - `max_pool_size`: Maximum number of idle buffers kept for reuse
    pub fn new(capacity: usize, max_pool_size: usize) -> Self {
        Self {
            pool: Vec::new(),
            capacity,
            max_pool_size,
        }
    }

    /// Get an empty buffer with room for at least `capacity` items.
    ///
    /// Reuses the most recently returned idle buffer when one is available and
    /// allocates a new one otherwise.
    pub fn get(&mut self) -> Vec<T> {
        match self.pool.pop() {
            Some(mut recycled) => {
                // Callers may have returned a smaller buffer than this pool
                // hands out. Pooled buffers are empty, so this is a no-op
                // unless the buffer is too small.
                recycled.reserve(self.capacity);
                recycled
            }
            None => Vec::with_capacity(self.capacity),
        }
    }

    /// Return a buffer to the pool for reuse.
    ///
    /// The buffer is cleared before being stored. When the pool already holds
    /// `max_pool_size` buffers, the buffer is dropped instead.
    pub fn put(&mut self, mut buffer: Vec<T>) {
        if self.pool.len() < self.max_pool_size {
            buffer.clear();
            self.pool.push(buffer);
        }
    }

    /// Number of idle buffers currently held by the pool.
    pub fn pool_size(&self) -> usize {
        self.pool.len()
    }
}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Reaching the configured capacity makes a flush due, and flushing
    /// returns the items in insertion order.
    #[test]
    fn test_batch_writer_capacity() {
        let mut writer = BatchWriter::new(3, Duration::from_secs(10));

        writer.add(1);
        writer.add(2);
        assert!(!writer.should_flush());

        writer.add(3);
        assert!(writer.should_flush());

        let items = writer.flush();
        assert_eq!(items, vec![1, 2, 3]);
        assert!(writer.is_empty());
    }

    /// An elapsed flush interval makes a flush due even far below capacity.
    #[test]
    fn test_batch_writer_time_threshold() {
        let mut writer = BatchWriter::new(100, Duration::from_millis(1));

        writer.add(1);
        std::thread::sleep(Duration::from_millis(2));

        assert!(writer.should_flush());
    }

    /// Count, average, minimum and maximum track the recorded durations.
    #[test]
    fn test_performance_metrics() {
        let mut metrics = PerformanceMetrics::new();

        metrics.record(Duration::from_millis(10));
        metrics.record(Duration::from_millis(20));
        metrics.record(Duration::from_millis(30));

        assert_eq!(metrics.operations, 3);
        assert_eq!(metrics.avg_duration(), Some(Duration::from_millis(20)));
        assert_eq!(metrics.min_duration, Some(Duration::from_millis(10)));
        assert_eq!(metrics.max_duration, Some(Duration::from_millis(30)));
    }

    /// 100 operations over one second of recorded time is ~100 ops/sec.
    #[test]
    fn test_performance_metrics_throughput() {
        let mut metrics = PerformanceMetrics::new();

        for _ in 0..100 {
            metrics.record(Duration::from_millis(10));
        }

        let throughput = metrics.throughput_per_sec();
        assert!(throughput > 90.0 && throughput < 110.0); // ~100 ops/sec
    }

    /// Buffers come back from the pool cleared and with usable capacity.
    #[test]
    fn test_memory_pool() {
        let mut pool: MemoryPool<i32> = MemoryPool::new(10, 5);

        // `Vec::with_capacity` guarantees *at least* the requested capacity,
        // so the checks below use `>=` rather than exact equality.
        let buf1 = pool.get();
        assert!(buf1.capacity() >= 10);

        let mut buf2 = pool.get();
        buf2.push(1);
        buf2.push(2);

        pool.put(buf2);
        assert_eq!(pool.pool_size(), 1);

        let buf3 = pool.get();
        assert_eq!(buf3.len(), 0); // Should be cleared
        assert!(buf3.capacity() >= 10);
    }

    /// Buffers returned beyond `max_pool_size` are dropped, not retained.
    #[test]
    fn test_memory_pool_max_size() {
        let mut pool: MemoryPool<i32> = MemoryPool::new(10, 2);

        pool.put(vec![]);
        pool.put(vec![]);
        pool.put(vec![]); // Should be dropped

        assert_eq!(pool.pool_size(), 2);
    }

    /// `len` follows additions and resets to zero after a flush.
    #[test]
    fn test_batch_writer_len() {
        let mut writer = BatchWriter::new(10, Duration::from_secs(10));

        assert_eq!(writer.len(), 0);
        writer.add(1);
        assert_eq!(writer.len(), 1);
        writer.add(2);
        assert_eq!(writer.len(), 2);

        writer.flush();
        assert_eq!(writer.len(), 0);
    }
}