1use std::collections::BTreeMap;
7
8use serde::{Deserialize, Serialize};
9
10use crate::coroutine::Value;
11use crate::session::Edge;
12
/// Storage discipline used by a [`BoundedBuffer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BufferMode {
    /// Values are handed back by `dequeue` in the order they were enqueued.
    Fifo,
    /// Only the most recently enqueued value is kept; every enqueue overwrites
    /// the previously stored one.
    LatestValue,
}
21
/// What [`BoundedBuffer::enqueue`] does when a FIFO buffer is already full.
///
/// The policy is only consulted in [`BufferMode::Fifo`]; in
/// [`BufferMode::LatestValue`] an enqueue always overwrites the stored value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackpressurePolicy {
    /// Refuse the value and report `EnqueueResult::WouldBlock`.
    Block,
    /// Discard the value and report `EnqueueResult::Dropped`.
    Drop,
    /// Refuse the value and report `EnqueueResult::Full`.
    Error,
    /// Grow the buffer to make room while its capacity is still below
    /// `max_capacity`; once that bound has been reached, a full buffer reports
    /// `EnqueueResult::Full`.
    Resize {
        /// Capacity at or above which the buffer stops growing.
        max_capacity: usize,
    },
}
37
/// Construction parameters for a [`BoundedBuffer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
    /// Storage discipline of the buffer.
    pub mode: BufferMode,
    /// Number of slots allocated up front; `BoundedBuffer::new` treats `0` as `1`.
    pub initial_capacity: usize,
    /// Behaviour of `enqueue` when a FIFO buffer is full.
    pub policy: BackpressurePolicy,
}
48
49impl Default for BufferConfig {
50 fn default() -> Self {
51 Self {
52 mode: BufferMode::Fifo,
53 initial_capacity: 64,
54 policy: BackpressurePolicy::Block,
55 }
56 }
57}
58
/// Fixed-capacity buffer backed by a ring of optional slots.
///
/// Depending on [`BufferMode`] it behaves either as a FIFO queue or as a
/// single-slot "latest value" cell.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BoundedBuffer<T = Value> {
    /// Slot storage; `None` marks a slot that currently holds no value.
    data: Vec<Option<T>>,
    /// Index of the slot the next FIFO `dequeue` reads from.
    head: usize,
    /// Index of the slot the next FIFO `enqueue` writes to.
    tail: usize,
    /// Number of slots in the ring; used as the modulus for `head` and `tail`.
    capacity: usize,
    /// Number of values currently stored.
    count: usize,
    /// Counter that is only changed by `advance_epoch`; its meaning is defined
    /// by the callers of that method.
    epoch: usize,
    /// Storage discipline selected at construction.
    mode: BufferMode,
    /// Full-buffer behaviour selected at construction.
    policy: BackpressurePolicy,
}
71
/// Outcome of `BoundedBuffer::enqueue`.
///
/// The enum is a plain set of unit variants, so it is `Copy` and comparable
/// with `==`; this lets callers and tests use `assert_eq!` instead of
/// `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnqueueResult {
    /// The value was stored.
    Ok,
    /// The buffer is full and the policy is `BackpressurePolicy::Block`; the
    /// value was not stored.
    WouldBlock,
    /// The buffer is full and the policy is `BackpressurePolicy::Drop`; the
    /// value was discarded.
    Dropped,
    /// The buffer is full and cannot accept the value (policy `Error`, or
    /// `Resize` with the capacity bound already reached).
    Full,
}
84
/// A payload bundled with a signature of caller-chosen type `V`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SignedValue<V> {
    /// The value being transported.
    pub payload: Value,
    /// Signature accompanying `payload`; it is checked by the verifier passed
    /// to `signed_dequeue_verified`.
    pub signature: V,
    /// Sequence number attached at enqueue time; falls back to `0` when the
    /// field is absent from deserialized input.
    #[serde(default)]
    pub sequence_no: u64,
}
96
/// A [`BoundedBuffer`] whose elements are [`SignedValue`]s.
pub type SignedBuffer<V> = BoundedBuffer<SignedValue<V>>;

/// One [`SignedBuffer`] per [`Edge`], keyed by the edge.
pub type SignedBuffers<V> = BTreeMap<Edge, SignedBuffer<V>>;
102
/// Error returned by `signed_dequeue_verified`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SignedDequeueError {
    /// The verifier callback rejected the dequeued payload/signature pair.
    VerificationFailed,
}
109
/// Outcome of `signed_enqueue` / `signed_enqueue_with_sequence`.
///
/// It is produced from an `EnqueueResult` through the `From` conversion.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SignedEnqueueResult {
    /// The signed value was stored.
    Ok,
    /// The underlying buffer reported `EnqueueResult::WouldBlock`.
    Blocked,
    /// The underlying buffer reported `EnqueueResult::Dropped`.
    Dropped,
    /// The enqueue failed; the string describes why (for example a full buffer).
    Error(String),
}
122
123impl From<EnqueueResult> for SignedEnqueueResult {
124 fn from(value: EnqueueResult) -> Self {
125 match value {
126 EnqueueResult::Ok => Self::Ok,
127 EnqueueResult::WouldBlock => Self::Blocked,
128 EnqueueResult::Dropped => Self::Dropped,
129 EnqueueResult::Full => Self::Error("buffer full".to_string()),
130 }
131 }
132}
133
134pub fn signed_enqueue<V>(
136 buffers: &mut SignedBuffers<V>,
137 edge: Edge,
138 payload: Value,
139 signature: V,
140) -> SignedEnqueueResult {
141 signed_enqueue_with_sequence(buffers, edge, payload, signature, 0)
142}
143
144pub fn signed_enqueue_with_sequence<V>(
146 buffers: &mut SignedBuffers<V>,
147 edge: Edge,
148 payload: Value,
149 signature: V,
150 sequence_no: u64,
151) -> SignedEnqueueResult {
152 let queue = buffers
153 .entry(edge)
154 .or_insert_with(|| BoundedBuffer::new(&BufferConfig::default()));
155 queue
156 .enqueue(SignedValue {
157 payload,
158 signature,
159 sequence_no,
160 })
161 .into()
162}
163
164pub fn signed_dequeue_verified<V, F>(
173 buffers: &mut SignedBuffers<V>,
174 edge: &Edge,
175 verifier: F,
176) -> Result<Option<Value>, SignedDequeueError>
177where
178 F: Fn(&Value, &V) -> bool,
179{
180 let Some(queue) = buffers.get_mut(edge) else {
181 return Ok(None);
182 };
183 let Some(signed) = queue.dequeue() else {
184 return Ok(None);
185 };
186 if verifier(&signed.payload, &signed.signature) {
187 Ok(Some(signed.payload))
188 } else {
189 Err(SignedDequeueError::VerificationFailed)
190 }
191}
192
193impl<T> BoundedBuffer<T> {
194 #[must_use]
196 pub fn new(config: &BufferConfig) -> Self {
197 let capacity = config.initial_capacity.max(1);
198 let mut data = Vec::with_capacity(capacity);
199 data.resize_with(capacity, || None);
200 Self {
201 data,
202 head: 0,
203 tail: 0,
204 capacity,
205 count: 0,
206 epoch: 0,
207 mode: config.mode,
208 policy: config.policy,
209 }
210 }
211
212 pub fn enqueue(&mut self, v: T) -> EnqueueResult {
214 match self.mode {
215 BufferMode::LatestValue => {
216 self.data[0] = Some(v);
218 self.count = 1;
219 EnqueueResult::Ok
220 }
221 BufferMode::Fifo => {
222 if self.count >= self.capacity {
223 match self.policy {
224 BackpressurePolicy::Block => EnqueueResult::WouldBlock,
225 BackpressurePolicy::Drop => EnqueueResult::Dropped,
226 BackpressurePolicy::Error => EnqueueResult::Full,
227 BackpressurePolicy::Resize { max_capacity } => {
228 if self.capacity < max_capacity {
229 self.grow();
230 self.enqueue_fifo(v);
231 EnqueueResult::Ok
232 } else {
233 EnqueueResult::Full
234 }
235 }
236 }
237 } else {
238 self.enqueue_fifo(v);
239 EnqueueResult::Ok
240 }
241 }
242 }
243 }
244
245 pub fn dequeue(&mut self) -> Option<T> {
247 match self.mode {
248 BufferMode::LatestValue => {
249 if self.count > 0 {
250 self.count = 0;
251 self.data[0].take()
252 } else {
253 None
254 }
255 }
256 BufferMode::Fifo => {
257 if self.count == 0 {
258 return None;
259 }
260 let val = self.data[self.head].take();
261 self.head = (self.head + 1) % self.capacity;
262 self.count -= 1;
263 val
264 }
265 }
266 }
267
268 #[must_use]
270 pub fn is_empty(&self) -> bool {
271 self.count == 0
272 }
273
274 #[must_use]
276 pub fn is_full(&self) -> bool {
277 self.count >= self.capacity
278 }
279
280 #[must_use]
282 pub fn len(&self) -> usize {
283 self.count
284 }
285
286 #[must_use]
288 pub fn capacity(&self) -> usize {
289 self.capacity
290 }
291
292 #[must_use]
294 pub fn epoch(&self) -> usize {
295 self.epoch
296 }
297
298 pub fn advance_epoch(&mut self) {
300 self.epoch += 1;
301 }
302
303 fn enqueue_fifo(&mut self, v: T) {
304 self.data[self.tail] = Some(v);
305 self.tail = (self.tail + 1) % self.capacity;
306 self.count += 1;
307 }
308
309 fn grow(&mut self) {
310 let new_cap = self.capacity * 2;
311 let mut new_data = Vec::with_capacity(new_cap);
312 new_data.resize_with(new_cap, || None);
313
314 for (i, slot) in new_data.iter_mut().enumerate().take(self.count) {
316 let idx = (self.head + i) % self.capacity;
317 *slot = self.data[idx].take();
318 }
319
320 self.data = new_data;
321 self.head = 0;
322 self.tail = self.count;
323 self.capacity = new_cap;
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Values come out of a default (FIFO) buffer in insertion order.
    #[test]
    fn test_fifo_basic() {
        let config = BufferConfig::default();
        let mut queue = BoundedBuffer::new(&config);

        queue.enqueue(Value::Nat(1));
        queue.enqueue(Value::Nat(2));
        assert_eq!(queue.len(), 2);

        for expected in [Value::Nat(1), Value::Nat(2)] {
            assert_eq!(queue.dequeue(), Some(expected));
        }
        assert!(queue.is_empty());
    }

    /// Ordering is preserved when the tail index wraps around the ring.
    #[test]
    fn test_fifo_wraparound() {
        let config = BufferConfig {
            initial_capacity: 3,
            ..Default::default()
        };
        let mut queue = BoundedBuffer::new(&config);

        queue.enqueue(Value::Nat(1));
        queue.enqueue(Value::Nat(2));
        // Free the first slot so the later enqueues wrap past the end.
        queue.dequeue();
        queue.enqueue(Value::Nat(3));
        queue.enqueue(Value::Nat(4));

        for expected in [Value::Nat(2), Value::Nat(3), Value::Nat(4)] {
            assert_eq!(queue.dequeue(), Some(expected));
        }
        assert!(queue.is_empty());
    }

    /// In latest-value mode only the last enqueued value survives.
    #[test]
    fn test_latest_value_overwrites() {
        let config = BufferConfig {
            mode: BufferMode::LatestValue,
            initial_capacity: 1,
            policy: BackpressurePolicy::Block,
        };
        let mut cell = BoundedBuffer::new(&config);

        for value in [Value::Nat(1), Value::Nat(2), Value::Nat(3)] {
            cell.enqueue(value);
        }

        assert_eq!(cell.dequeue(), Some(Value::Nat(3)));
        assert!(cell.is_empty());
    }

    /// A full buffer with the `Block` policy reports `WouldBlock`.
    #[test]
    fn test_backpressure_block() {
        let config = BufferConfig {
            initial_capacity: 2,
            policy: BackpressurePolicy::Block,
            ..Default::default()
        };
        let mut queue = BoundedBuffer::new(&config);
        queue.enqueue(Value::Nat(1));
        queue.enqueue(Value::Nat(2));

        let outcome = queue.enqueue(Value::Nat(3));
        assert!(matches!(outcome, EnqueueResult::WouldBlock));
    }

    /// A full buffer with the `Resize` policy grows and accepts the value.
    #[test]
    fn test_backpressure_resize() {
        let config = BufferConfig {
            initial_capacity: 2,
            policy: BackpressurePolicy::Resize { max_capacity: 8 },
            ..Default::default()
        };
        let mut queue = BoundedBuffer::new(&config);
        queue.enqueue(Value::Nat(1));
        queue.enqueue(Value::Nat(2));

        let outcome = queue.enqueue(Value::Nat(3));
        assert!(matches!(outcome, EnqueueResult::Ok));
        assert_eq!(queue.len(), 3);
    }

    /// `signed_enqueue` stores a `SignedValue`, and every `EnqueueResult`
    /// variant maps to the expected `SignedEnqueueResult`.
    #[test]
    fn test_signed_buffer_alias_and_enqueue_result_mapping() {
        let edge = Edge::new(7usize, "A", "B");
        let expected = SignedValue {
            payload: Value::Nat(9),
            signature: vec![0_u8, 1_u8],
            sequence_no: 0,
        };
        let mut buffers: SignedBuffers<Vec<u8>> = BTreeMap::new();

        let enqueue_outcome = signed_enqueue(
            &mut buffers,
            edge.clone(),
            expected.payload.clone(),
            expected.signature.clone(),
        );
        assert_eq!(enqueue_outcome, SignedEnqueueResult::Ok);
        assert_eq!(buffers.get(&edge).map(BoundedBuffer::len), Some(1));

        let stored = buffers.get_mut(&edge).and_then(BoundedBuffer::dequeue);
        assert_eq!(stored, Some(expected));

        let exact_mappings = [
            (EnqueueResult::Ok, SignedEnqueueResult::Ok),
            (EnqueueResult::WouldBlock, SignedEnqueueResult::Blocked),
            (EnqueueResult::Dropped, SignedEnqueueResult::Dropped),
        ];
        for (raw, mapped) in exact_mappings {
            assert_eq!(SignedEnqueueResult::from(raw), mapped);
        }
        assert!(matches!(
            SignedEnqueueResult::from(EnqueueResult::Full),
            SignedEnqueueResult::Error(_)
        ));
    }

    /// A verifier that accepts the message yields the payload.
    #[test]
    fn test_signed_dequeue_verified_success() {
        let edge = Edge::new(11usize, "A", "B");
        let mut buffers: SignedBuffers<Vec<u8>> = BTreeMap::new();
        let _ = signed_enqueue(&mut buffers, edge.clone(), Value::Nat(5), vec![1, 2, 3]);

        let accepts_expected_message = |payload: &Value, signature: &Vec<u8>| {
            *payload == Value::Nat(5) && signature == &vec![1, 2, 3]
        };
        let out = signed_dequeue_verified(&mut buffers, &edge, accepts_expected_message)
            .expect("signature must verify");
        assert_eq!(out, Some(Value::Nat(5)));
    }

    /// A verifier that rejects the message yields `VerificationFailed`.
    #[test]
    fn test_signed_dequeue_verified_failure() {
        let edge = Edge::new(12usize, "A", "B");
        let mut buffers: SignedBuffers<Vec<u8>> = BTreeMap::new();
        let _ = signed_enqueue(&mut buffers, edge.clone(), Value::Nat(5), vec![1, 2, 3]);

        let result = signed_dequeue_verified(&mut buffers, &edge, |_payload, _signature| false);
        assert_eq!(result, Err(SignedDequeueError::VerificationFailed));
    }
}