agent_memory/
volatile.rs

1//! In-memory volatile store backed by a bounded ring buffer.
2
3use std::collections::VecDeque;
4use std::num::NonZeroUsize;
5
6use tokio::sync::RwLock;
7
8use crate::record::MemoryRecord;
9
/// Configuration for the volatile memory buffer.
///
/// Holds a mandatory entry-count capacity and an optional ceiling on the
/// summed payload bytes. Both limits are non-zero by construction.
#[derive(Debug, Clone, Copy)]
pub struct VolatileConfig {
    /// Maximum number of records the buffer may hold.
    capacity: NonZeroUsize,
    /// Optional ceiling on the summed payload bytes; `None` means no byte limit.
    max_total_bytes: Option<NonZeroUsize>,
}

impl VolatileConfig {
    /// Capacity used by the [`Default`] implementation.
    ///
    /// Evaluated at compile time, so building the default configuration has
    /// no runtime panic path.
    const DEFAULT_CAPACITY: NonZeroUsize = match NonZeroUsize::new(256) {
        Some(capacity) => capacity,
        None => panic!("default capacity must be non-zero"),
    };

    /// Creates a configuration with the provided capacity and no byte ceiling.
    #[must_use]
    pub const fn new(capacity: NonZeroUsize) -> Self {
        Self {
            capacity,
            max_total_bytes: None,
        }
    }

    /// Returns a copy of this configuration with the total byte ceiling set
    /// to `max_total_bytes`; the capacity is carried over unchanged.
    #[must_use]
    pub const fn with_max_total_bytes(self, max_total_bytes: NonZeroUsize) -> Self {
        Self {
            capacity: self.capacity,
            max_total_bytes: Some(max_total_bytes),
        }
    }

    /// Returns the configured capacity.
    #[must_use]
    pub const fn capacity(self) -> NonZeroUsize {
        self.capacity
    }

    /// Returns the maximum total bytes, if configured.
    #[must_use]
    pub const fn max_total_bytes(self) -> Option<NonZeroUsize> {
        self.max_total_bytes
    }
}

impl Default for VolatileConfig {
    /// Returns a configuration with a capacity of 256 entries and no byte ceiling.
    fn default() -> Self {
        Self::new(Self::DEFAULT_CAPACITY)
    }
}
55
56#[derive(Debug, Default)]
57struct VolatileInner {
58    entries: VecDeque<MemoryRecord>,
59    total_bytes: usize,
60}
61
62/// Volatile memory ring retaining the most recent records.
63#[derive(Debug)]
64pub struct VolatileMemory {
65    config: VolatileConfig,
66    inner: RwLock<VolatileInner>,
67}
68
69impl VolatileMemory {
70    /// Creates a new buffer using the supplied configuration.
71    #[must_use]
72    pub fn new(config: VolatileConfig) -> Self {
73        Self {
74            config,
75            inner: RwLock::new(VolatileInner {
76                entries: VecDeque::with_capacity(config.capacity().get()),
77                total_bytes: 0,
78            }),
79        }
80    }
81
82    /// Inserts a record, evicting the oldest entries if capacity constraints are exceeded.
83    pub async fn push(&self, record: MemoryRecord) {
84        let mut guard = self.inner.write().await;
85        guard.total_bytes += record.payload().len();
86        guard.entries.push_back(record);
87
88        while guard.entries.len() > self.config.capacity().get() {
89            if let Some(evicted) = guard.entries.pop_front() {
90                guard.total_bytes = guard.total_bytes.saturating_sub(evicted.payload().len());
91            }
92        }
93
94        if let Some(limit) = self.config.max_total_bytes() {
95            let limit = limit.get();
96            while guard.total_bytes > limit && guard.entries.len() > 1 {
97                if let Some(evicted) = guard.entries.pop_front() {
98                    guard.total_bytes = guard.total_bytes.saturating_sub(evicted.payload().len());
99                }
100            }
101        }
102    }
103
104    /// Returns the most recent records up to the requested limit.
105    #[must_use]
106    pub async fn recent(&self, limit: usize) -> Vec<MemoryRecord> {
107        let guard = self.inner.read().await;
108        let take = limit.min(guard.entries.len());
109        guard
110            .entries
111            .iter()
112            .rev()
113            .take(take)
114            .cloned()
115            .collect::<Vec<_>>()
116            .into_iter()
117            .rev()
118            .collect()
119    }
120
121    /// Returns statistics about the buffer utilisation.
122    #[must_use]
123    pub async fn stats(&self) -> VolatileStats {
124        let guard = self.inner.read().await;
125        VolatileStats {
126            entries: guard.entries.len(),
127            total_bytes: guard.total_bytes,
128            capacity: self.config.capacity().get(),
129            max_total_bytes: self.config.max_total_bytes().map(NonZeroUsize::get),
130        }
131    }
132}
133
/// Point-in-time view of how full the volatile buffer is, alongside its limits.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct VolatileStats {
    /// Number of records held when the snapshot was taken.
    pub entries: usize,
    /// Sum of the payload sizes, in bytes, of the held records.
    pub total_bytes: usize,
    /// Upper bound on the number of records the buffer may hold.
    pub capacity: usize,
    /// Byte ceiling for the buffer, or `None` when no ceiling is configured.
    pub max_total_bytes: Option<usize>,
}
146
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Builds an input-channel record carrying `payload`.
    fn record(payload: &'static [u8]) -> MemoryRecord {
        MemoryRecord::builder(
            crate::record::MemoryChannel::Input,
            Bytes::from_static(payload),
        )
        .build()
        .unwrap()
    }

    #[tokio::test]
    async fn respects_capacity() {
        let config = VolatileConfig::new(NonZeroUsize::new(2).unwrap());
        let memory = VolatileMemory::new(config);

        for payload in [&b"one"[..], &b"two"[..], &b"three"[..]] {
            memory.push(record(payload)).await;
        }

        // Only the two newest records survive, returned oldest first.
        let recent = memory.recent(10).await;
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].payload(), &Bytes::from_static(b"two"));
        assert_eq!(recent[1].payload(), &Bytes::from_static(b"three"));

        // A smaller limit returns just the newest record.
        let newest = memory.recent(1).await;
        assert_eq!(newest.len(), 1);
        assert_eq!(newest[0].payload(), &Bytes::from_static(b"three"));

        assert!(memory.recent(0).await.is_empty());
    }

    #[tokio::test]
    async fn respects_total_byte_limit() {
        let config = VolatileConfig::new(NonZeroUsize::new(10).unwrap())
            .with_max_total_bytes(NonZeroUsize::new(8).unwrap());
        let memory = VolatileMemory::new(config);

        for payload in [&b"aaaa"[..], &b"bbbb"[..], &b"cccc"[..]] {
            memory.push(record(payload)).await;
        }

        // Three 4-byte payloads against an 8-byte ceiling: exactly the oldest
        // one is evicted, no more and no less.
        let stats = memory.stats().await;
        assert_eq!(stats.entries, 2);
        assert_eq!(stats.total_bytes, 8);
        assert_eq!(stats.capacity, 10);
        assert_eq!(stats.max_total_bytes, Some(8));

        let recent = memory.recent(10).await;
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].payload(), &Bytes::from_static(b"bbbb"));
        assert_eq!(recent[1].payload(), &Bytes::from_static(b"cccc"));
    }

    #[tokio::test]
    async fn keeps_single_oversized_record() {
        let config = VolatileConfig::new(NonZeroUsize::new(10).unwrap())
            .with_max_total_bytes(NonZeroUsize::new(4).unwrap());
        let memory = VolatileMemory::new(config);

        memory.push(record(b"aa")).await;
        memory.push(record(b"toolarge")).await;

        // The byte ceiling never evicts the newest record, even when that
        // record alone exceeds the ceiling.
        let stats = memory.stats().await;
        assert_eq!(stats.entries, 1);
        assert_eq!(stats.total_bytes, 8);

        let recent = memory.recent(10).await;
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].payload(), &Bytes::from_static(b"toolarge"));
    }
}