Skip to main content

zeph_memory/semantic/
write_buffer.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Session-scoped write buffer for batching memory writes (#2478).
5//!
6//! Accumulates writes during a turn and flushes them in a single `BEGIN IMMEDIATE`
7//! transaction at turn end, reducing `SQLite` write-lock contention from N writes/turn
8//! to 1 transaction/turn.
9
10use std::collections::VecDeque;
11
12use crate::types::{ConversationId, MemoryTier};
13
14/// A single buffered write operation waiting to be flushed to the store.
15pub enum BufferedWrite {
16    /// Save a message to the messages table.
17    SaveMessage {
18        conversation_id: ConversationId,
19        role: String,
20        content: String,
21        tier: MemoryTier,
22    },
23    /// Upsert a persona fact.
24    UpsertPersonaFact {
25        category: String,
26        content: String,
27        confidence: f64,
28        source_conversation_id: Option<i64>,
29        supersedes_id: Option<i64>,
30    },
31    /// Store an embedding in the vector backend (dispatched after `SQLite` commit).
32    StoreEmbedding {
33        collection: String,
34        point_id: String,
35        vector: Vec<f32>,
36        payload: serde_json::Value,
37    },
38}
39
40/// Session-scoped write buffer.
41///
42/// Writes are queued via `push()` and flushed to the store in one transaction
43/// via `drain()`. The buffer is NOT thread-safe — it is owned by a single agent loop.
44pub struct WriteBuffer {
45    pending: VecDeque<BufferedWrite>,
46    /// Maximum pending writes before auto-flush is signalled.
47    capacity: usize,
48}
49
50impl WriteBuffer {
51    /// Create a new `WriteBuffer` with the given capacity.
52    ///
53    /// When `capacity` is reached, `push()` signals the caller to flush.
54    #[must_use]
55    pub fn new(capacity: usize) -> Self {
56        Self {
57            pending: VecDeque::with_capacity(capacity),
58            capacity,
59        }
60    }
61
62    /// Queue a write operation.
63    ///
64    /// Returns `true` if the buffer has reached capacity and should be flushed
65    /// before the next `push()`.
66    pub fn push(&mut self, write: BufferedWrite) -> bool {
67        self.pending.push_back(write);
68        self.pending.len() >= self.capacity
69    }
70
71    /// Drain all pending writes, returning them in insertion order.
72    ///
73    /// After this call, the buffer is empty and ready for the next turn.
74    pub fn drain(&mut self) -> Vec<BufferedWrite> {
75        self.pending.drain(..).collect()
76    }
77
78    #[must_use]
79    pub fn len(&self) -> usize {
80        self.pending.len()
81    }
82
83    #[must_use]
84    pub fn is_empty(&self) -> bool {
85        self.pending.is_empty()
86    }
87}
88
89#[cfg(test)]
90mod tests {
91    use super::*;
92    use crate::types::{ConversationId, MemoryTier};
93
94    fn make_save_message() -> BufferedWrite {
95        BufferedWrite::SaveMessage {
96            conversation_id: ConversationId(1),
97            role: "user".into(),
98            content: "hello".into(),
99            tier: MemoryTier::Episodic,
100        }
101    }
102
103    #[test]
104    fn push_increases_len() {
105        let mut buf = WriteBuffer::new(5);
106        assert!(buf.is_empty());
107        buf.push(make_save_message());
108        assert_eq!(buf.len(), 1);
109    }
110
111    #[test]
112    fn push_returns_false_below_capacity() {
113        let mut buf = WriteBuffer::new(3);
114        let at_capacity = buf.push(make_save_message());
115        assert!(!at_capacity);
116        let at_capacity = buf.push(make_save_message());
117        assert!(!at_capacity);
118    }
119
120    #[test]
121    fn push_returns_true_at_capacity() {
122        let mut buf = WriteBuffer::new(2);
123        buf.push(make_save_message());
124        let at_capacity = buf.push(make_save_message());
125        assert!(at_capacity);
126    }
127
128    #[test]
129    fn drain_returns_all_items_and_clears_buffer() {
130        let mut buf = WriteBuffer::new(10);
131        buf.push(make_save_message());
132        buf.push(make_save_message());
133        buf.push(make_save_message());
134
135        let drained = buf.drain();
136        assert_eq!(drained.len(), 3);
137        assert!(buf.is_empty());
138        assert_eq!(buf.len(), 0);
139    }
140
141    #[test]
142    fn drain_on_empty_buffer_returns_empty_vec() {
143        let mut buf = WriteBuffer::new(5);
144        let drained = buf.drain();
145        assert!(drained.is_empty());
146    }
147
148    #[test]
149    fn capacity_one_signals_on_first_push() {
150        let mut buf = WriteBuffer::new(1);
151        let at_capacity = buf.push(make_save_message());
152        assert!(at_capacity);
153    }
154}