1use std::collections::VecDeque;
4use std::num::NonZeroUsize;
5
6use tokio::sync::RwLock;
7
8use crate::record::MemoryRecord;
9
/// Limits applied to a [`VolatileMemory`] buffer.
///
/// `capacity` bounds how many records are retained, and `max_total_bytes`
/// optionally bounds the sum of their payload lengths.
#[derive(Debug, Clone, Copy)]
pub struct VolatileConfig {
    capacity: NonZeroUsize,
    max_total_bytes: Option<NonZeroUsize>,
}

impl VolatileConfig {
    /// Capacity used by the [`Default`] implementation.
    ///
    /// Evaluated at compile time, so constructing the default configuration
    /// cannot panic at runtime.
    const DEFAULT_CAPACITY: NonZeroUsize = match NonZeroUsize::new(256) {
        Some(capacity) => capacity,
        None => panic!("default capacity must be non-zero"),
    };

    /// Creates a configuration holding at most `capacity` records and no byte limit.
    #[must_use]
    pub const fn new(capacity: NonZeroUsize) -> Self {
        Self {
            capacity,
            max_total_bytes: None,
        }
    }

    /// Returns this configuration with the byte limit set to `max_total_bytes`.
    #[must_use]
    pub const fn with_max_total_bytes(mut self, max_total_bytes: NonZeroUsize) -> Self {
        self.max_total_bytes = Some(max_total_bytes);
        self
    }

    /// Maximum number of records.
    #[must_use]
    pub const fn capacity(self) -> NonZeroUsize {
        self.capacity
    }

    /// Optional byte limit; `None` means no byte limit is configured.
    #[must_use]
    pub const fn max_total_bytes(self) -> Option<NonZeroUsize> {
        self.max_total_bytes
    }
}

impl Default for VolatileConfig {
    /// A capacity of 256 records with no byte limit.
    fn default() -> Self {
        Self::new(Self::DEFAULT_CAPACITY)
    }
}
55
56#[derive(Debug, Default)]
57struct VolatileInner {
58 entries: VecDeque<MemoryRecord>,
59 total_bytes: usize,
60}
61
62#[derive(Debug)]
64pub struct VolatileMemory {
65 config: VolatileConfig,
66 inner: RwLock<VolatileInner>,
67}
68
69impl VolatileMemory {
70 #[must_use]
72 pub fn new(config: VolatileConfig) -> Self {
73 Self {
74 config,
75 inner: RwLock::new(VolatileInner {
76 entries: VecDeque::with_capacity(config.capacity().get()),
77 total_bytes: 0,
78 }),
79 }
80 }
81
82 pub async fn push(&self, record: MemoryRecord) {
84 let mut guard = self.inner.write().await;
85 guard.total_bytes += record.payload().len();
86 guard.entries.push_back(record);
87
88 while guard.entries.len() > self.config.capacity().get() {
89 if let Some(evicted) = guard.entries.pop_front() {
90 guard.total_bytes = guard.total_bytes.saturating_sub(evicted.payload().len());
91 }
92 }
93
94 if let Some(limit) = self.config.max_total_bytes() {
95 let limit = limit.get();
96 while guard.total_bytes > limit && guard.entries.len() > 1 {
97 if let Some(evicted) = guard.entries.pop_front() {
98 guard.total_bytes = guard.total_bytes.saturating_sub(evicted.payload().len());
99 }
100 }
101 }
102 }
103
104 #[must_use]
106 pub async fn recent(&self, limit: usize) -> Vec<MemoryRecord> {
107 let guard = self.inner.read().await;
108 let take = limit.min(guard.entries.len());
109 guard
110 .entries
111 .iter()
112 .rev()
113 .take(take)
114 .cloned()
115 .collect::<Vec<_>>()
116 .into_iter()
117 .rev()
118 .collect()
119 }
120
121 #[must_use]
123 pub async fn stats(&self) -> VolatileStats {
124 let guard = self.inner.read().await;
125 VolatileStats {
126 entries: guard.entries.len(),
127 total_bytes: guard.total_bytes,
128 capacity: self.config.capacity().get(),
129 max_total_bytes: self.config.max_total_bytes().map(NonZeroUsize::get),
130 }
131 }
132}
133
/// Point-in-time usage figures and configured limits of a volatile memory buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct VolatileStats {
    /// Number of records currently buffered.
    pub entries: usize,
    /// Tracked total of payload bytes across the buffered records.
    pub total_bytes: usize,
    /// Configured maximum number of records.
    pub capacity: usize,
    /// Configured byte limit, or `None` when no byte limit is set.
    pub max_total_bytes: Option<usize>,
}
146
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Builds an input-channel record carrying `payload`.
    fn input_record(payload: &'static [u8]) -> MemoryRecord {
        MemoryRecord::builder(
            crate::record::MemoryChannel::Input,
            Bytes::from_static(payload),
        )
        .build()
        .unwrap()
    }

    #[tokio::test]
    async fn respects_capacity() {
        let config = VolatileConfig::new(NonZeroUsize::new(2).unwrap());
        let memory = VolatileMemory::new(config);

        let payloads: [&'static [u8]; 3] = [b"one", b"two", b"three"];
        for payload in payloads {
            memory.push(input_record(payload)).await;
        }

        let recent = memory.recent(10).await;
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].payload(), &Bytes::from_static(b"two"));
        assert_eq!(recent[1].payload(), &Bytes::from_static(b"three"));

        // A smaller limit returns only the newest records.
        let newest = memory.recent(1).await;
        assert_eq!(newest.len(), 1);
        assert_eq!(newest[0].payload(), &Bytes::from_static(b"three"));

        // The evicted record's bytes are no longer counted: "two" + "three".
        assert_eq!(memory.stats().await.total_bytes, 8);
    }

    #[tokio::test]
    async fn respects_total_byte_limit() {
        let config = VolatileConfig::new(NonZeroUsize::new(10).unwrap())
            .with_max_total_bytes(NonZeroUsize::new(8).unwrap());
        let memory = VolatileMemory::new(config);

        let payloads: [&'static [u8]; 3] = [b"aaaa", b"bbbb", b"cccc"];
        for payload in payloads {
            memory.push(input_record(payload)).await;
        }

        // Exactly one record (the oldest) has to go to get from 12 bytes to 8.
        assert_eq!(
            memory.stats().await,
            VolatileStats {
                entries: 2,
                total_bytes: 8,
                capacity: 10,
                max_total_bytes: Some(8),
            }
        );

        let recent = memory.recent(10).await;
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].payload(), &Bytes::from_static(b"bbbb"));
        assert_eq!(recent[1].payload(), &Bytes::from_static(b"cccc"));
    }

    #[tokio::test]
    async fn keeps_newest_record_even_when_oversized() {
        let config = VolatileConfig::new(NonZeroUsize::new(4).unwrap())
            .with_max_total_bytes(NonZeroUsize::new(2).unwrap());
        let memory = VolatileMemory::new(config);

        // A lone record larger than the byte limit is still retained.
        memory.push(input_record(b"toolarge")).await;
        let stats = memory.stats().await;
        assert_eq!(stats.entries, 1);
        assert_eq!(stats.total_bytes, 8);

        // The next push evicts it, bringing the total back within the limit.
        memory.push(input_record(b"xy")).await;
        let stats = memory.stats().await;
        assert_eq!(stats.entries, 1);
        assert_eq!(stats.total_bytes, 2);

        let recent = memory.recent(10).await;
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].payload(), &Bytes::from_static(b"xy"));
    }
}