Skip to main content

zeph_memory/semantic/
write_buffer.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Session-scoped write buffer for batching memory writes (#2478).
5//!
6//! Accumulates writes during a turn and flushes them in a single `BEGIN IMMEDIATE`
7//! transaction at turn end, reducing `SQLite` write-lock contention from N writes/turn
8//! to 1 transaction/turn.
9
10use std::collections::VecDeque;
11
12use crate::types::{ConversationId, MemoryTier};
13
14/// A single buffered write operation waiting to be flushed to the store.
15#[non_exhaustive]
16pub enum BufferedWrite {
17    /// Save a message to the messages table.
18    SaveMessage {
19        conversation_id: ConversationId,
20        role: String,
21        content: String,
22        tier: MemoryTier,
23    },
24    /// Upsert a persona fact.
25    UpsertPersonaFact {
26        category: String,
27        content: String,
28        confidence: f64,
29        source_conversation_id: Option<i64>,
30        supersedes_id: Option<i64>,
31    },
32    /// Store an embedding in the vector backend (dispatched after `SQLite` commit).
33    StoreEmbedding {
34        collection: String,
35        point_id: String,
36        vector: Vec<f32>,
37        payload: serde_json::Value,
38    },
39}
40
41/// Session-scoped write buffer.
42///
43/// Writes are queued via `push()` and flushed to the store in one transaction
44/// via `drain()`. The buffer is NOT thread-safe — it is owned by a single agent loop.
45pub struct WriteBuffer {
46    pending: VecDeque<BufferedWrite>,
47    /// Maximum pending writes before auto-flush is signalled.
48    capacity: usize,
49}
50
51impl WriteBuffer {
52    /// Create a new `WriteBuffer` with the given capacity.
53    ///
54    /// When `capacity` is reached, `push()` signals the caller to flush.
55    #[must_use]
56    pub fn new(capacity: usize) -> Self {
57        Self {
58            pending: VecDeque::with_capacity(capacity),
59            capacity,
60        }
61    }
62
63    /// Queue a write operation.
64    ///
65    /// Returns `true` if the buffer has reached capacity and should be flushed
66    /// before the next `push()`.
67    pub fn push(&mut self, write: BufferedWrite) -> bool {
68        self.pending.push_back(write);
69        self.pending.len() >= self.capacity
70    }
71
72    /// Drain all pending writes, returning them in insertion order.
73    ///
74    /// After this call, the buffer is empty and ready for the next turn.
75    pub fn drain(&mut self) -> Vec<BufferedWrite> {
76        self.pending.drain(..).collect()
77    }
78
79    #[must_use]
80    pub fn len(&self) -> usize {
81        self.pending.len()
82    }
83
84    #[must_use]
85    pub fn is_empty(&self) -> bool {
86        self.pending.is_empty()
87    }
88}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93    use crate::types::{ConversationId, MemoryTier};
94
95    fn make_save_message() -> BufferedWrite {
96        BufferedWrite::SaveMessage {
97            conversation_id: ConversationId(1),
98            role: "user".into(),
99            content: "hello".into(),
100            tier: MemoryTier::Episodic,
101        }
102    }
103
104    #[test]
105    fn push_increases_len() {
106        let mut buf = WriteBuffer::new(5);
107        assert!(buf.is_empty());
108        buf.push(make_save_message());
109        assert_eq!(buf.len(), 1);
110    }
111
112    #[test]
113    fn push_returns_false_below_capacity() {
114        let mut buf = WriteBuffer::new(3);
115        let at_capacity = buf.push(make_save_message());
116        assert!(!at_capacity);
117        let at_capacity = buf.push(make_save_message());
118        assert!(!at_capacity);
119    }
120
121    #[test]
122    fn push_returns_true_at_capacity() {
123        let mut buf = WriteBuffer::new(2);
124        buf.push(make_save_message());
125        let at_capacity = buf.push(make_save_message());
126        assert!(at_capacity);
127    }
128
129    #[test]
130    fn drain_returns_all_items_and_clears_buffer() {
131        let mut buf = WriteBuffer::new(10);
132        buf.push(make_save_message());
133        buf.push(make_save_message());
134        buf.push(make_save_message());
135
136        let drained = buf.drain();
137        assert_eq!(drained.len(), 3);
138        assert!(buf.is_empty());
139        assert_eq!(buf.len(), 0);
140    }
141
142    #[test]
143    fn drain_on_empty_buffer_returns_empty_vec() {
144        let mut buf = WriteBuffer::new(5);
145        let drained = buf.drain();
146        assert!(drained.is_empty());
147    }
148
149    #[test]
150    fn capacity_one_signals_on_first_push() {
151        let mut buf = WriteBuffer::new(1);
152        let at_capacity = buf.push(make_save_message());
153        assert!(at_capacity);
154    }
155}