rag_plusplus_core/buffer/
write_buffer.rs

1//! Write Buffer Implementation
2//!
3//! Buffers write operations for batched flushing.
4
5use crate::error::Result;
6use crate::index::VectorIndex;
7use crate::store::RecordStore;
8use crate::types::{MemoryRecord, RecordId};
9use crate::wal::WalWriter;
10use parking_lot::RwLock;
11use std::collections::VecDeque;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::Arc;
14use std::time::Instant;
15
/// A write operation held in a [`WriteBuffer`] until the next flush.
///
/// Flushing applies buffered operations in the order they were enqueued.
#[derive(Debug, Clone)]
pub enum BufferedOp {
    /// Insert a new record: added to the store, and also to the vector index
    /// when flushed through `WriteBuffer::flush`.
    Insert(MemoryRecord),
    /// Pass `outcome` to `RecordStore::update_stats` for record `id`.
    UpdateStats { id: RecordId, outcome: f64 },
    /// Delete a record: removed from the store, and also from the vector index
    /// when flushed through `WriteBuffer::flush`.
    Delete(RecordId),
}
26
/// Flush thresholds and durability settings for a [`WriteBuffer`].
///
/// An automatic flush becomes due as soon as any enabled threshold is reached.
#[derive(Debug, Clone)]
pub struct WriteBufferConfig {
    /// Number of buffered operations that triggers an automatic flush.
    pub max_ops: usize,
    /// Estimated buffer size, in bytes, that triggers an automatic flush.
    pub max_bytes: usize,
    /// Milliseconds since the last flush that trigger an automatic flush
    /// (`0` turns the age check off).
    pub max_age_ms: u64,
    /// Whether operations are logged to the write-ahead log.
    pub use_wal: bool,
}

impl Default for WriteBufferConfig {
    /// 1 000 operations, 64 MiB, 5 second max age, WAL enabled.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        Self {
            use_wal: true,
            max_age_ms: 5_000,
            max_bytes: 64 * MIB,
            max_ops: 1_000,
        }
    }
}

impl WriteBufferConfig {
    /// Same as [`WriteBufferConfig::default`].
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Return this config with `max_ops` replaced.
    #[must_use]
    pub const fn with_max_ops(self, max: usize) -> Self {
        Self { max_ops: max, ..self }
    }

    /// Return this config with `max_bytes` replaced.
    #[must_use]
    pub const fn with_max_bytes(self, max: usize) -> Self {
        Self { max_bytes: max, ..self }
    }

    /// Return this config with `max_age_ms` replaced.
    #[must_use]
    pub const fn with_max_age_ms(self, max: u64) -> Self {
        Self { max_age_ms: max, ..self }
    }

    /// Return this config with the WAL switched off (intended for tests only).
    #[must_use]
    pub const fn without_wal(self) -> Self {
        Self { use_wal: false, ..self }
    }
}
86
/// Snapshot of a [`WriteBuffer`]'s counters, as returned by `WriteBuffer::stats`.
///
/// Each field is loaded separately, so under concurrent writes the fields are
/// not guaranteed to be mutually consistent.
#[derive(Debug, Clone, Default)]
pub struct BufferStats {
    /// Number of operations currently waiting to be flushed
    pub buffered_ops: usize,
    /// Heuristic estimate of the bytes held by the buffered operations
    pub buffered_bytes: usize,
    /// Inserts accepted into the buffer since creation (flushed or not)
    pub total_inserts: u64,
    /// Stat updates accepted into the buffer since creation (flushed or not)
    pub total_updates: u64,
    /// Deletes accepted into the buffer since creation (flushed or not)
    pub total_deletes: u64,
    /// Number of flushes that applied operations
    pub flush_count: u64,
}
103
/// Write buffer for batched operations.
///
/// Each write is logged to the WAL (when one is attached) before it is queued in
/// memory. Queued writes are later applied to a [`RecordStore`] and, optionally,
/// a [`VectorIndex`] in a single flush. All methods take `&self`; the internal
/// queue and timestamps sit behind `RwLock`s and the counters are atomics.
///
/// # Example
///
/// ```ignore
/// use rag_plusplus_core::buffer::{WriteBuffer, WriteBufferConfig};
/// use std::sync::Arc;
///
/// let wal = WalWriter::new(wal_config)?;
/// let mut store = InMemoryStore::new();
/// let mut index = FlatIndex::new(IndexConfig::new(128));
///
/// // Methods take `&self`, so the buffer binding does not need `mut`.
/// let buffer = WriteBuffer::new(
///     WriteBufferConfig::new(),
///     Arc::new(wal),
/// );
///
/// buffer.insert(record)?;
/// buffer.update_stats(&id, 0.9)?;
///
/// // Flush to store and index
/// buffer.flush(&mut store, &mut index)?;
/// ```
pub struct WriteBuffer {
    config: WriteBufferConfig,
    /// WAL writer; `None` for buffers created with `WriteBuffer::without_wal`
    wal: Option<Arc<WalWriter>>,
    /// Buffered operations, oldest at the front
    ops: RwLock<VecDeque<BufferedOp>>,
    /// Running size estimate of the buffered ops, compared against `config.max_bytes`
    size_bytes: AtomicUsize,
    /// Set at construction and reset after each flush that applied operations;
    /// compared against `config.max_age_ms`
    last_flush: RwLock<Instant>,
    /// Lifetime counters reported by `stats()`
    total_inserts: AtomicU64,
    total_updates: AtomicU64,
    total_deletes: AtomicU64,
    flush_count: AtomicU64,
}
144
145impl std::fmt::Debug for WriteBuffer {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct("WriteBuffer")
148            .field("config", &self.config)
149            .field("ops_count", &self.ops.read().len())
150            .field("size_bytes", &self.size_bytes.load(Ordering::Relaxed))
151            .finish()
152    }
153}
154
155impl WriteBuffer {
156    /// Create a new write buffer with WAL.
157    #[must_use]
158    pub fn new(config: WriteBufferConfig, wal: Arc<WalWriter>) -> Self {
159        Self {
160            config,
161            wal: Some(wal),
162            ops: RwLock::new(VecDeque::new()),
163            size_bytes: AtomicUsize::new(0),
164            last_flush: RwLock::new(Instant::now()),
165            total_inserts: AtomicU64::new(0),
166            total_updates: AtomicU64::new(0),
167            total_deletes: AtomicU64::new(0),
168            flush_count: AtomicU64::new(0),
169        }
170    }
171
172    /// Create a write buffer without WAL (testing/in-memory only).
173    #[must_use]
174    pub fn without_wal(config: WriteBufferConfig) -> Self {
175        Self {
176            config,
177            wal: None,
178            ops: RwLock::new(VecDeque::new()),
179            size_bytes: AtomicUsize::new(0),
180            last_flush: RwLock::new(Instant::now()),
181            total_inserts: AtomicU64::new(0),
182            total_updates: AtomicU64::new(0),
183            total_deletes: AtomicU64::new(0),
184            flush_count: AtomicU64::new(0),
185        }
186    }
187
188    /// Estimate size of an operation.
189    fn estimate_op_size(op: &BufferedOp) -> usize {
190        match op {
191            BufferedOp::Insert(record) => {
192                std::mem::size_of::<MemoryRecord>()
193                    + record.embedding.len() * 4
194                    + record.context.len()
195                    + record.id.len()
196            }
197            BufferedOp::UpdateStats { .. } => 32,
198            BufferedOp::Delete(_) => 32,
199        }
200    }
201
202    /// Check if buffer should auto-flush.
203    fn should_flush(&self) -> bool {
204        let ops = self.ops.read();
205        let size = self.size_bytes.load(Ordering::Relaxed);
206        let last_flush = self.last_flush.read();
207
208        // Check capacity
209        if ops.len() >= self.config.max_ops {
210            return true;
211        }
212
213        // Check size
214        if size >= self.config.max_bytes {
215            return true;
216        }
217
218        // Check age
219        if self.config.max_age_ms > 0 {
220            let age = last_flush.elapsed().as_millis() as u64;
221            if age >= self.config.max_age_ms && !ops.is_empty() {
222                return true;
223            }
224        }
225
226        false
227    }
228
229    /// Insert a record.
230    ///
231    /// The record is logged to WAL (if enabled) and buffered.
232    pub fn insert(&self, record: MemoryRecord) -> Result<()> {
233        // Write to WAL first (INV-003)
234        if let Some(wal) = &self.wal {
235            wal.log_insert(&record)?;
236        }
237
238        // Buffer the operation
239        let op = BufferedOp::Insert(record);
240        let size = Self::estimate_op_size(&op);
241
242        {
243            let mut ops = self.ops.write();
244            ops.push_back(op);
245        }
246
247        self.size_bytes.fetch_add(size, Ordering::Relaxed);
248        self.total_inserts.fetch_add(1, Ordering::Relaxed);
249
250        Ok(())
251    }
252
253    /// Update record statistics.
254    pub fn update_stats(&self, id: &RecordId, outcome: f64) -> Result<()> {
255        // Write to WAL first
256        if let Some(wal) = &self.wal {
257            wal.log_update_stats(id, outcome)?;
258        }
259
260        // Buffer the operation
261        let op = BufferedOp::UpdateStats {
262            id: id.clone(),
263            outcome,
264        };
265        let size = Self::estimate_op_size(&op);
266
267        {
268            let mut ops = self.ops.write();
269            ops.push_back(op);
270        }
271
272        self.size_bytes.fetch_add(size, Ordering::Relaxed);
273        self.total_updates.fetch_add(1, Ordering::Relaxed);
274
275        Ok(())
276    }
277
278    /// Delete a record.
279    pub fn delete(&self, id: &RecordId) -> Result<()> {
280        // Write to WAL first
281        if let Some(wal) = &self.wal {
282            wal.log_delete(id)?;
283        }
284
285        // Buffer the operation
286        let op = BufferedOp::Delete(id.clone());
287        let size = Self::estimate_op_size(&op);
288
289        {
290            let mut ops = self.ops.write();
291            ops.push_back(op);
292        }
293
294        self.size_bytes.fetch_add(size, Ordering::Relaxed);
295        self.total_deletes.fetch_add(1, Ordering::Relaxed);
296
297        Ok(())
298    }
299
300    /// Flush buffer to store and index.
301    ///
302    /// All buffered operations are applied atomically.
303    pub fn flush<S: RecordStore, I: VectorIndex>(
304        &self,
305        store: &mut S,
306        index: &mut I,
307    ) -> Result<FlushResult> {
308        let ops: Vec<BufferedOp> = {
309            let mut ops_guard = self.ops.write();
310            std::mem::take(&mut *ops_guard).into()
311        };
312
313        if ops.is_empty() {
314            return Ok(FlushResult::default());
315        }
316
317        let mut result = FlushResult::default();
318
319        for op in ops {
320            match op {
321                BufferedOp::Insert(record) => {
322                    // Add to index
323                    index.add(record.id.to_string(), &record.embedding)?;
324
325                    // Add to store
326                    store.insert(record)?;
327
328                    result.inserts += 1;
329                }
330                BufferedOp::UpdateStats { id, outcome } => {
331                    store.update_stats(&id, outcome)?;
332                    result.updates += 1;
333                }
334                BufferedOp::Delete(id) => {
335                    // Remove from index
336                    index.remove(id.as_str())?;
337
338                    // Remove from store
339                    store.remove(&id)?;
340
341                    result.deletes += 1;
342                }
343            }
344        }
345
346        // Reset buffer state
347        self.size_bytes.store(0, Ordering::SeqCst);
348        *self.last_flush.write() = Instant::now();
349        self.flush_count.fetch_add(1, Ordering::Relaxed);
350
351        // Checkpoint WAL
352        if let Some(wal) = &self.wal {
353            wal.log_checkpoint()?;
354        }
355
356        Ok(result)
357    }
358
359    /// Flush only to store (no index update).
360    pub fn flush_to_store<S: RecordStore>(&self, store: &mut S) -> Result<FlushResult> {
361        let ops: Vec<BufferedOp> = {
362            let mut ops_guard = self.ops.write();
363            std::mem::take(&mut *ops_guard).into()
364        };
365
366        if ops.is_empty() {
367            return Ok(FlushResult::default());
368        }
369
370        let mut result = FlushResult::default();
371
372        for op in ops {
373            match op {
374                BufferedOp::Insert(record) => {
375                    store.insert(record)?;
376                    result.inserts += 1;
377                }
378                BufferedOp::UpdateStats { id, outcome } => {
379                    store.update_stats(&id, outcome)?;
380                    result.updates += 1;
381                }
382                BufferedOp::Delete(id) => {
383                    store.remove(&id)?;
384                    result.deletes += 1;
385                }
386            }
387        }
388
389        self.size_bytes.store(0, Ordering::SeqCst);
390        *self.last_flush.write() = Instant::now();
391        self.flush_count.fetch_add(1, Ordering::Relaxed);
392
393        Ok(result)
394    }
395
396    /// Auto-flush if buffer thresholds are exceeded.
397    ///
398    /// Returns `true` if flush occurred.
399    pub fn maybe_flush<S: RecordStore, I: VectorIndex>(
400        &self,
401        store: &mut S,
402        index: &mut I,
403    ) -> Result<bool> {
404        if self.should_flush() {
405            self.flush(store, index)?;
406            Ok(true)
407        } else {
408            Ok(false)
409        }
410    }
411
412    /// Get buffer statistics.
413    #[must_use]
414    pub fn stats(&self) -> BufferStats {
415        BufferStats {
416            buffered_ops: self.ops.read().len(),
417            buffered_bytes: self.size_bytes.load(Ordering::Relaxed),
418            total_inserts: self.total_inserts.load(Ordering::Relaxed),
419            total_updates: self.total_updates.load(Ordering::Relaxed),
420            total_deletes: self.total_deletes.load(Ordering::Relaxed),
421            flush_count: self.flush_count.load(Ordering::Relaxed),
422        }
423    }
424
425    /// Check if buffer is empty.
426    #[must_use]
427    pub fn is_empty(&self) -> bool {
428        self.ops.read().is_empty()
429    }
430
431    /// Get number of buffered operations.
432    #[must_use]
433    pub fn len(&self) -> usize {
434        self.ops.read().len()
435    }
436}
437
/// Per-kind counts of the operations applied by one flush.
#[derive(Debug, Clone, Default)]
pub struct FlushResult {
    /// Inserts that were applied
    pub inserts: usize,
    /// Statistics updates that were applied
    pub updates: usize,
    /// Deletes that were applied
    pub deletes: usize,
}

impl FlushResult {
    /// Sum of inserts, updates and deletes.
    #[must_use]
    pub fn total(&self) -> usize {
        let Self {
            inserts,
            updates,
            deletes,
        } = *self;
        inserts + updates + deletes
    }
}
456
#[cfg(test)]
mod tests {
    // Unit tests for `WriteBuffer`. Every buffer here is built without a WAL,
    // so only in-memory buffering and the flush paths are exercised.
    use super::*;
    use crate::index::{FlatIndex, IndexConfig};
    use crate::stats::OutcomeStats;
    use crate::store::InMemoryStore;
    use crate::types::RecordStatus;

    // Build an active record with a 3-element embedding. The tests pair it with
    // `IndexConfig::new(3)` (presumably the index dimension — confirm).
    fn create_test_record(id: &str) -> MemoryRecord {
        MemoryRecord {
            id: id.into(),
            embedding: vec![1.0, 2.0, 3.0],
            context: format!("Context for {id}"),
            outcome: 0.5,
            metadata: Default::default(),
            created_at: 1234567890,
            status: RecordStatus::Active,
            stats: OutcomeStats::new(1),
        }
    }

    // Inserts stay buffered until `flush`, which applies them to both store and index.
    #[test]
    fn test_buffer_insert_and_flush() {
        let config = WriteBufferConfig::new().without_wal();
        let buffer = WriteBuffer::without_wal(config);

        buffer.insert(create_test_record("rec-1")).unwrap();
        buffer.insert(create_test_record("rec-2")).unwrap();

        assert_eq!(buffer.len(), 2);

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        let result = buffer.flush(&mut store, &mut index).unwrap();

        assert_eq!(result.inserts, 2);
        assert_eq!(store.len(), 2);
        assert_eq!(index.len(), 2);
        assert!(buffer.is_empty());
    }

    // An insert followed by updates in the same batch: ordering must put the
    // insert first so the updates find the record.
    #[test]
    fn test_buffer_update_stats() {
        let config = WriteBufferConfig::new().without_wal();
        let buffer = WriteBuffer::without_wal(config);

        buffer.insert(create_test_record("rec-1")).unwrap();
        buffer.update_stats(&"rec-1".into(), 0.8).unwrap();
        buffer.update_stats(&"rec-1".into(), 0.9).unwrap();

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        let result = buffer.flush(&mut store, &mut index).unwrap();

        assert_eq!(result.inserts, 1);
        assert_eq!(result.updates, 2);

        let record = store.get(&"rec-1".into()).unwrap();
        // Assumes `OutcomeStats::new(1)` starts with a count of 0 — TODO confirm.
        assert_eq!(record.stats.count(), 2);
    }

    #[test]
    fn test_buffer_delete() {
        let config = WriteBufferConfig::new().without_wal();
        let buffer = WriteBuffer::without_wal(config);

        buffer.insert(create_test_record("rec-1")).unwrap();
        buffer.insert(create_test_record("rec-2")).unwrap();

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        // First flush to populate store/index
        buffer.flush(&mut store, &mut index).unwrap();

        // Now delete
        buffer.delete(&"rec-1".into()).unwrap();
        let result = buffer.flush(&mut store, &mut index).unwrap();

        assert_eq!(result.deletes, 1);
        assert_eq!(store.len(), 1);
        assert_eq!(index.len(), 1);
    }

    // With max_ops = 5, the op-count threshold fires on the fifth insert. The
    // default 64 MB / 5 s limits are not reached during this test.
    #[test]
    fn test_auto_flush_by_ops() {
        let config = WriteBufferConfig::new()
            .without_wal()
            .with_max_ops(5);
        let buffer = WriteBuffer::without_wal(config);

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        for i in 0..4 {
            buffer.insert(create_test_record(&format!("rec-{i}"))).unwrap();
            buffer.maybe_flush(&mut store, &mut index).unwrap();
        }

        // Should not have flushed yet
        assert!(!buffer.is_empty());

        // 5th insert triggers flush check
        buffer.insert(create_test_record("rec-4")).unwrap();
        let flushed = buffer.maybe_flush(&mut store, &mut index).unwrap();

        assert!(flushed);
        assert!(buffer.is_empty());
        assert_eq!(store.len(), 5);
    }

    // Lifetime counters survive a flush; only the buffered counts reset.
    #[test]
    fn test_buffer_stats() {
        let config = WriteBufferConfig::new().without_wal();
        let buffer = WriteBuffer::without_wal(config);

        buffer.insert(create_test_record("rec-1")).unwrap();
        buffer.insert(create_test_record("rec-2")).unwrap();
        buffer.update_stats(&"rec-1".into(), 0.8).unwrap();
        buffer.delete(&"rec-2".into()).unwrap();

        let stats = buffer.stats();
        assert_eq!(stats.buffered_ops, 4);
        assert!(stats.buffered_bytes > 0);
        assert_eq!(stats.total_inserts, 2);
        assert_eq!(stats.total_updates, 1);
        assert_eq!(stats.total_deletes, 1);

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));
        buffer.flush(&mut store, &mut index).unwrap();

        let stats_after = buffer.stats();
        assert_eq!(stats_after.buffered_ops, 0);
        assert_eq!(stats_after.flush_count, 1);
    }

    // `flush_to_store` needs no index at all.
    #[test]
    fn test_flush_to_store_only() {
        let config = WriteBufferConfig::new().without_wal();
        let buffer = WriteBuffer::without_wal(config);

        buffer.insert(create_test_record("rec-1")).unwrap();

        let mut store = InMemoryStore::new();
        let result = buffer.flush_to_store(&mut store).unwrap();

        assert_eq!(result.inserts, 1);
        assert_eq!(store.len(), 1);
    }

    // Flushing an empty buffer succeeds and reports zero operations.
    #[test]
    fn test_empty_flush() {
        let config = WriteBufferConfig::new().without_wal();
        let buffer = WriteBuffer::without_wal(config);

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        let result = buffer.flush(&mut store, &mut index).unwrap();
        assert_eq!(result.total(), 0);
    }
}