rag_plusplus_core/buffer/
write_buffer.rs1use crate::error::Result;
6use crate::index::VectorIndex;
7use crate::store::RecordStore;
8use crate::types::{MemoryRecord, RecordId};
9use crate::wal::WalWriter;
10use parking_lot::RwLock;
11use std::collections::VecDeque;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::Arc;
14use std::time::Instant;
15
16#[derive(Debug, Clone)]
18pub enum BufferedOp {
19 Insert(MemoryRecord),
21 UpdateStats { id: RecordId, outcome: f64 },
23 Delete(RecordId),
25}
26
/// Tuning knobs for a write buffer: the thresholds at which a flush becomes
/// due, plus a write-ahead-log toggle.
#[derive(Clone, Debug)]
pub struct WriteBufferConfig {
    /// Number of buffered operations at which a flush becomes due.
    pub max_ops: usize,
    /// Estimated buffered size, in bytes, at which a flush becomes due.
    pub max_bytes: usize,
    /// Time since the last flush, in milliseconds, after which a non-empty
    /// buffer is due for a flush. `0` disables the age check.
    pub max_age_ms: u64,
    /// Whether write-ahead logging is requested.
    pub use_wal: bool,
}

impl Default for WriteBufferConfig {
    /// Defaults: 1000 operations, 64 MiB, 5000 ms, WAL enabled.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        Self {
            use_wal: true,
            max_age_ms: 5000,
            max_bytes: 64 * MIB,
            max_ops: 1000,
        }
    }
}

impl WriteBufferConfig {
    /// Returns the default configuration; a starting point for the `with_*` builders.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the configuration with the operation-count threshold set to `max`.
    #[must_use]
    pub const fn with_max_ops(self, max: usize) -> Self {
        Self {
            max_ops: max,
            ..self
        }
    }

    /// Returns the configuration with the byte-size threshold set to `max`.
    #[must_use]
    pub const fn with_max_bytes(self, max: usize) -> Self {
        Self {
            max_bytes: max,
            ..self
        }
    }

    /// Returns the configuration with the age threshold set to `max` milliseconds.
    #[must_use]
    pub const fn with_max_age_ms(self, max: u64) -> Self {
        Self {
            max_age_ms: max,
            ..self
        }
    }

    /// Returns the configuration with write-ahead logging switched off.
    #[must_use]
    pub const fn without_wal(self) -> Self {
        Self {
            use_wal: false,
            ..self
        }
    }
}
86
/// Point-in-time snapshot of a write buffer's counters.
#[derive(Clone, Debug, Default)]
pub struct BufferStats {
    /// Operations currently waiting in the buffer.
    pub buffered_ops: usize,
    /// Estimated size, in bytes, of the operations currently waiting.
    pub buffered_bytes: usize,
    /// Insert operations accepted so far.
    pub total_inserts: u64,
    /// Stats-update operations accepted so far.
    pub total_updates: u64,
    /// Delete operations accepted so far.
    pub total_deletes: u64,
    /// Number of flushes counted so far.
    pub flush_count: u64,
}
103
104pub struct WriteBuffer {
129 config: WriteBufferConfig,
130 wal: Option<Arc<WalWriter>>,
132 ops: RwLock<VecDeque<BufferedOp>>,
134 size_bytes: AtomicUsize,
136 last_flush: RwLock<Instant>,
138 total_inserts: AtomicU64,
140 total_updates: AtomicU64,
141 total_deletes: AtomicU64,
142 flush_count: AtomicU64,
143}
144
145impl std::fmt::Debug for WriteBuffer {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 f.debug_struct("WriteBuffer")
148 .field("config", &self.config)
149 .field("ops_count", &self.ops.read().len())
150 .field("size_bytes", &self.size_bytes.load(Ordering::Relaxed))
151 .finish()
152 }
153}
154
155impl WriteBuffer {
156 #[must_use]
158 pub fn new(config: WriteBufferConfig, wal: Arc<WalWriter>) -> Self {
159 Self {
160 config,
161 wal: Some(wal),
162 ops: RwLock::new(VecDeque::new()),
163 size_bytes: AtomicUsize::new(0),
164 last_flush: RwLock::new(Instant::now()),
165 total_inserts: AtomicU64::new(0),
166 total_updates: AtomicU64::new(0),
167 total_deletes: AtomicU64::new(0),
168 flush_count: AtomicU64::new(0),
169 }
170 }
171
172 #[must_use]
174 pub fn without_wal(config: WriteBufferConfig) -> Self {
175 Self {
176 config,
177 wal: None,
178 ops: RwLock::new(VecDeque::new()),
179 size_bytes: AtomicUsize::new(0),
180 last_flush: RwLock::new(Instant::now()),
181 total_inserts: AtomicU64::new(0),
182 total_updates: AtomicU64::new(0),
183 total_deletes: AtomicU64::new(0),
184 flush_count: AtomicU64::new(0),
185 }
186 }
187
188 fn estimate_op_size(op: &BufferedOp) -> usize {
190 match op {
191 BufferedOp::Insert(record) => {
192 std::mem::size_of::<MemoryRecord>()
193 + record.embedding.len() * 4
194 + record.context.len()
195 + record.id.len()
196 }
197 BufferedOp::UpdateStats { .. } => 32,
198 BufferedOp::Delete(_) => 32,
199 }
200 }
201
202 fn should_flush(&self) -> bool {
204 let ops = self.ops.read();
205 let size = self.size_bytes.load(Ordering::Relaxed);
206 let last_flush = self.last_flush.read();
207
208 if ops.len() >= self.config.max_ops {
210 return true;
211 }
212
213 if size >= self.config.max_bytes {
215 return true;
216 }
217
218 if self.config.max_age_ms > 0 {
220 let age = last_flush.elapsed().as_millis() as u64;
221 if age >= self.config.max_age_ms && !ops.is_empty() {
222 return true;
223 }
224 }
225
226 false
227 }
228
229 pub fn insert(&self, record: MemoryRecord) -> Result<()> {
233 if let Some(wal) = &self.wal {
235 wal.log_insert(&record)?;
236 }
237
238 let op = BufferedOp::Insert(record);
240 let size = Self::estimate_op_size(&op);
241
242 {
243 let mut ops = self.ops.write();
244 ops.push_back(op);
245 }
246
247 self.size_bytes.fetch_add(size, Ordering::Relaxed);
248 self.total_inserts.fetch_add(1, Ordering::Relaxed);
249
250 Ok(())
251 }
252
253 pub fn update_stats(&self, id: &RecordId, outcome: f64) -> Result<()> {
255 if let Some(wal) = &self.wal {
257 wal.log_update_stats(id, outcome)?;
258 }
259
260 let op = BufferedOp::UpdateStats {
262 id: id.clone(),
263 outcome,
264 };
265 let size = Self::estimate_op_size(&op);
266
267 {
268 let mut ops = self.ops.write();
269 ops.push_back(op);
270 }
271
272 self.size_bytes.fetch_add(size, Ordering::Relaxed);
273 self.total_updates.fetch_add(1, Ordering::Relaxed);
274
275 Ok(())
276 }
277
278 pub fn delete(&self, id: &RecordId) -> Result<()> {
280 if let Some(wal) = &self.wal {
282 wal.log_delete(id)?;
283 }
284
285 let op = BufferedOp::Delete(id.clone());
287 let size = Self::estimate_op_size(&op);
288
289 {
290 let mut ops = self.ops.write();
291 ops.push_back(op);
292 }
293
294 self.size_bytes.fetch_add(size, Ordering::Relaxed);
295 self.total_deletes.fetch_add(1, Ordering::Relaxed);
296
297 Ok(())
298 }
299
300 pub fn flush<S: RecordStore, I: VectorIndex>(
304 &self,
305 store: &mut S,
306 index: &mut I,
307 ) -> Result<FlushResult> {
308 let ops: Vec<BufferedOp> = {
309 let mut ops_guard = self.ops.write();
310 std::mem::take(&mut *ops_guard).into()
311 };
312
313 if ops.is_empty() {
314 return Ok(FlushResult::default());
315 }
316
317 let mut result = FlushResult::default();
318
319 for op in ops {
320 match op {
321 BufferedOp::Insert(record) => {
322 index.add(record.id.to_string(), &record.embedding)?;
324
325 store.insert(record)?;
327
328 result.inserts += 1;
329 }
330 BufferedOp::UpdateStats { id, outcome } => {
331 store.update_stats(&id, outcome)?;
332 result.updates += 1;
333 }
334 BufferedOp::Delete(id) => {
335 index.remove(id.as_str())?;
337
338 store.remove(&id)?;
340
341 result.deletes += 1;
342 }
343 }
344 }
345
346 self.size_bytes.store(0, Ordering::SeqCst);
348 *self.last_flush.write() = Instant::now();
349 self.flush_count.fetch_add(1, Ordering::Relaxed);
350
351 if let Some(wal) = &self.wal {
353 wal.log_checkpoint()?;
354 }
355
356 Ok(result)
357 }
358
359 pub fn flush_to_store<S: RecordStore>(&self, store: &mut S) -> Result<FlushResult> {
361 let ops: Vec<BufferedOp> = {
362 let mut ops_guard = self.ops.write();
363 std::mem::take(&mut *ops_guard).into()
364 };
365
366 if ops.is_empty() {
367 return Ok(FlushResult::default());
368 }
369
370 let mut result = FlushResult::default();
371
372 for op in ops {
373 match op {
374 BufferedOp::Insert(record) => {
375 store.insert(record)?;
376 result.inserts += 1;
377 }
378 BufferedOp::UpdateStats { id, outcome } => {
379 store.update_stats(&id, outcome)?;
380 result.updates += 1;
381 }
382 BufferedOp::Delete(id) => {
383 store.remove(&id)?;
384 result.deletes += 1;
385 }
386 }
387 }
388
389 self.size_bytes.store(0, Ordering::SeqCst);
390 *self.last_flush.write() = Instant::now();
391 self.flush_count.fetch_add(1, Ordering::Relaxed);
392
393 Ok(result)
394 }
395
396 pub fn maybe_flush<S: RecordStore, I: VectorIndex>(
400 &self,
401 store: &mut S,
402 index: &mut I,
403 ) -> Result<bool> {
404 if self.should_flush() {
405 self.flush(store, index)?;
406 Ok(true)
407 } else {
408 Ok(false)
409 }
410 }
411
412 #[must_use]
414 pub fn stats(&self) -> BufferStats {
415 BufferStats {
416 buffered_ops: self.ops.read().len(),
417 buffered_bytes: self.size_bytes.load(Ordering::Relaxed),
418 total_inserts: self.total_inserts.load(Ordering::Relaxed),
419 total_updates: self.total_updates.load(Ordering::Relaxed),
420 total_deletes: self.total_deletes.load(Ordering::Relaxed),
421 flush_count: self.flush_count.load(Ordering::Relaxed),
422 }
423 }
424
425 #[must_use]
427 pub fn is_empty(&self) -> bool {
428 self.ops.read().is_empty()
429 }
430
431 #[must_use]
433 pub fn len(&self) -> usize {
434 self.ops.read().len()
435 }
436}
437
/// Per-kind counts of the operations applied by one flush.
#[derive(Clone, Debug, Default)]
pub struct FlushResult {
    /// Insert operations applied.
    pub inserts: usize,
    /// Stats-update operations applied.
    pub updates: usize,
    /// Delete operations applied.
    pub deletes: usize,
}

impl FlushResult {
    /// Returns the number of operations applied, summed over all kinds.
    #[must_use]
    pub fn total(&self) -> usize {
        let Self {
            inserts,
            updates,
            deletes,
        } = self;
        inserts + updates + deletes
    }
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::index::{FlatIndex, IndexConfig};
    use crate::stats::OutcomeStats;
    use crate::store::InMemoryStore;
    use crate::types::RecordStatus;

    /// Builds an active record with a fixed three-element embedding and the given id.
    fn create_test_record(id: &str) -> MemoryRecord {
        let context = format!("Context for {id}");
        MemoryRecord {
            id: id.into(),
            embedding: vec![1.0, 2.0, 3.0],
            context,
            outcome: 0.5,
            metadata: Default::default(),
            created_at: 1234567890,
            status: RecordStatus::Active,
            stats: OutcomeStats::new(1),
        }
    }

    /// Buffer with default thresholds and no write-ahead log.
    fn wal_free_buffer() -> WriteBuffer {
        WriteBuffer::without_wal(WriteBufferConfig::new().without_wal())
    }

    /// Queues one insert per id, in order.
    fn insert_all(buffer: &WriteBuffer, ids: &[&str]) {
        for id in ids.iter() {
            buffer.insert(create_test_record(id)).unwrap();
        }
    }

    // Inserts stay buffered until a flush moves them into both store and index.
    #[test]
    fn test_buffer_insert_and_flush() {
        let buffer = wal_free_buffer();
        insert_all(&buffer, &["rec-1", "rec-2"]);

        assert_eq!(buffer.len(), 2);

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        let result = buffer.flush(&mut store, &mut index).unwrap();

        assert_eq!(result.inserts, 2);
        assert_eq!(store.len(), 2);
        assert_eq!(index.len(), 2);
        assert!(buffer.is_empty());
    }

    // Stats updates queued after an insert are applied to the stored record.
    #[test]
    fn test_buffer_update_stats() {
        let buffer = wal_free_buffer();
        insert_all(&buffer, &["rec-1"]);
        for outcome in [0.8, 0.9].iter() {
            buffer.update_stats(&"rec-1".into(), *outcome).unwrap();
        }

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        let result = buffer.flush(&mut store, &mut index).unwrap();

        assert_eq!(result.inserts, 1);
        assert_eq!(result.updates, 2);

        let record = store.get(&"rec-1".into()).unwrap();
        assert_eq!(record.stats.count(), 2);
    }

    // A delete flushed after the inserts removes the record from store and index.
    #[test]
    fn test_buffer_delete() {
        let buffer = wal_free_buffer();
        insert_all(&buffer, &["rec-1", "rec-2"]);

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        buffer.flush(&mut store, &mut index).unwrap();

        buffer.delete(&"rec-1".into()).unwrap();
        let result = buffer.flush(&mut store, &mut index).unwrap();

        assert_eq!(result.deletes, 1);
        assert_eq!(store.len(), 1);
        assert_eq!(index.len(), 1);
    }

    // With `max_ops` = 5, the fifth insert is the one that triggers a flush.
    #[test]
    fn test_auto_flush_by_ops() {
        let config = WriteBufferConfig::new().without_wal().with_max_ops(5);
        let buffer = WriteBuffer::without_wal(config);

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        for i in 0..4 {
            let id = format!("rec-{i}");
            buffer.insert(create_test_record(&id)).unwrap();
            buffer.maybe_flush(&mut store, &mut index).unwrap();
        }

        assert!(!buffer.is_empty());

        buffer.insert(create_test_record("rec-4")).unwrap();
        let flushed = buffer.maybe_flush(&mut store, &mut index).unwrap();

        assert!(flushed);
        assert!(buffer.is_empty());
        assert_eq!(store.len(), 5);
    }

    // Counters reflect queued operations before a flush and the flush itself afterwards.
    #[test]
    fn test_buffer_stats() {
        let buffer = wal_free_buffer();
        insert_all(&buffer, &["rec-1", "rec-2"]);
        buffer.update_stats(&"rec-1".into(), 0.8).unwrap();
        buffer.delete(&"rec-2".into()).unwrap();

        let before = buffer.stats();
        assert_eq!(before.buffered_ops, 4);
        assert!(before.buffered_bytes > 0);
        assert_eq!(before.total_inserts, 2);
        assert_eq!(before.total_updates, 1);
        assert_eq!(before.total_deletes, 1);

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));
        buffer.flush(&mut store, &mut index).unwrap();

        let after = buffer.stats();
        assert_eq!(after.buffered_ops, 0);
        assert_eq!(after.flush_count, 1);
    }

    // The store-only flush path applies inserts without needing an index.
    #[test]
    fn test_flush_to_store_only() {
        let buffer = wal_free_buffer();
        insert_all(&buffer, &["rec-1"]);

        let mut store = InMemoryStore::new();
        let result = buffer.flush_to_store(&mut store).unwrap();

        assert_eq!(result.inserts, 1);
        assert_eq!(store.len(), 1);
    }

    // Flushing an empty buffer succeeds and reports zero applied operations.
    #[test]
    fn test_empty_flush() {
        let buffer = wal_free_buffer();

        let mut store = InMemoryStore::new();
        let mut index = FlatIndex::new(IndexConfig::new(3));

        let result = buffer.flush(&mut store, &mut index).unwrap();
        assert_eq!(result.total(), 0);
    }
}