use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use crate::coroutine::Value;
use crate::session::Edge;
/// Storage discipline used by a [`BoundedBuffer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BufferMode {
/// Values are stored in a ring and dequeued in the order they were enqueued.
Fifo,
/// Only the most recently enqueued value is kept; every enqueue overwrites
/// the single stored value and always succeeds.
LatestValue,
}
/// What `BoundedBuffer::enqueue` does when a `Fifo` buffer is already full.
///
/// The policy is not consulted in `LatestValue` mode, where enqueue always
/// overwrites the stored value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackpressurePolicy {
/// The value is not stored and enqueue reports `EnqueueResult::WouldBlock`.
Block,
/// The value is discarded and enqueue reports `EnqueueResult::Dropped`.
Drop,
/// The value is not stored and enqueue reports `EnqueueResult::Full`.
Error,
/// The buffer grows while its capacity is below `max_capacity`; once it is
/// not, enqueue reports `EnqueueResult::Full`.
Resize {
/// Capacity threshold at which the buffer stops growing.
max_capacity: usize,
},
}
/// Construction parameters for a [`BoundedBuffer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BufferConfig {
/// Storage discipline of the buffer.
pub mode: BufferMode,
/// Number of slots allocated up front; `BoundedBuffer::new` treats `0` as `1`.
pub initial_capacity: usize,
/// Behaviour of enqueue when a `Fifo` buffer is full.
pub policy: BackpressurePolicy,
}
impl Default for BufferConfig {
fn default() -> Self {
Self {
mode: BufferMode::Fifo,
initial_capacity: 64,
policy: BackpressurePolicy::Block,
}
}
}
/// Fixed-capacity buffer of `T` values backed by a vector of optional slots.
///
/// In `Fifo` mode the slots are used as a ring indexed by `head` and `tail`;
/// in `LatestValue` mode only slot `0` is used.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BoundedBuffer<T = Value> {
// Slot storage; `None` marks an empty slot.
data: Vec<Option<T>>,
// Index of the next slot to dequeue from (Fifo mode).
head: usize,
// Index of the next slot to enqueue into (Fifo mode).
tail: usize,
// Current number of slots; used as the modulus for `head` and `tail`.
capacity: usize,
// Number of values currently stored.
count: usize,
// Counter changed only by `advance_epoch` and read by `epoch`; nothing else
// in this file consults it.
epoch: usize,
// Storage discipline chosen at construction.
mode: BufferMode,
// Full-buffer behaviour chosen at construction.
policy: BackpressurePolicy,
}
/// Outcome of `BoundedBuffer::enqueue`.
///
/// The type is a plain field-less enum, so it is `Copy` and comparable with
/// `==`, which lets callers use `assert_eq!` instead of `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnqueueResult {
    /// The value was stored.
    Ok,
    /// The buffer was full under the `Block` policy; the value was not stored.
    WouldBlock,
    /// The buffer was full under the `Drop` policy; the value was discarded.
    Dropped,
    /// The buffer was full under the `Error` policy, or under `Resize` once it
    /// could no longer grow; the value was not stored.
    Full,
}
/// A payload bundled with a signature of type `V` and a sequence number.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SignedValue<V> {
/// The value being transported.
pub payload: Value,
/// Signature handed to the verifier together with `payload` on dequeue.
pub signature: V,
/// Sequence number recorded at enqueue time; takes the `u64` default (`0`)
/// when the field is missing from deserialized input.
#[serde(default)]
pub sequence_no: u64,
}
/// A [`BoundedBuffer`] whose elements are [`SignedValue`]s.
pub type SignedBuffer<V> = BoundedBuffer<SignedValue<V>>;
/// One [`SignedBuffer`] per [`Edge`], kept in an ordered map.
pub type SignedBuffers<V> = BTreeMap<Edge, SignedBuffer<V>>;
/// Error returned by [`signed_dequeue_verified`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SignedDequeueError {
/// The verifier rejected the dequeued payload/signature pair. The rejected
/// value has already been removed from its buffer when this is returned.
VerificationFailed,
}
/// Serializable outcome of enqueueing a signed value; converted from
/// [`EnqueueResult`] via `From`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SignedEnqueueResult {
/// The value was stored (`EnqueueResult::Ok`).
Ok,
/// The buffer reported `EnqueueResult::WouldBlock`.
Blocked,
/// The buffer reported `EnqueueResult::Dropped`.
Dropped,
/// The enqueue failed; carries a message (`"buffer full"` for
/// `EnqueueResult::Full`).
Error(String),
}
impl From<EnqueueResult> for SignedEnqueueResult {
fn from(value: EnqueueResult) -> Self {
match value {
EnqueueResult::Ok => Self::Ok,
EnqueueResult::WouldBlock => Self::Blocked,
EnqueueResult::Dropped => Self::Dropped,
EnqueueResult::Full => Self::Error("buffer full".to_string()),
}
}
}
/// Enqueues `payload` with its `signature` on the buffer for `edge` without an
/// explicit sequence number.
///
/// Delegates to [`signed_enqueue_with_sequence`] with a `sequence_no` of `0`
/// and returns its result unchanged.
pub fn signed_enqueue<V>(
    buffers: &mut SignedBuffers<V>,
    edge: Edge,
    payload: Value,
    signature: V,
) -> SignedEnqueueResult {
    // Sequence number recorded when the caller does not supply one.
    const UNSEQUENCED: u64 = 0;
    signed_enqueue_with_sequence(buffers, edge, payload, signature, UNSEQUENCED)
}
pub fn signed_enqueue_with_sequence<V>(
buffers: &mut SignedBuffers<V>,
edge: Edge,
payload: Value,
signature: V,
sequence_no: u64,
) -> SignedEnqueueResult {
let queue = buffers
.entry(edge)
.or_insert_with(|| BoundedBuffer::new(&BufferConfig::default()));
queue
.enqueue(SignedValue {
payload,
signature,
sequence_no,
})
.into()
}
pub fn signed_dequeue_verified<V, F>(
buffers: &mut SignedBuffers<V>,
edge: &Edge,
verifier: F,
) -> Result<Option<Value>, SignedDequeueError>
where
F: Fn(&Value, &V) -> bool,
{
let Some(queue) = buffers.get_mut(edge) else {
return Ok(None);
};
let Some(signed) = queue.dequeue() else {
return Ok(None);
};
if verifier(&signed.payload, &signed.signature) {
Ok(Some(signed.payload))
} else {
Err(SignedDequeueError::VerificationFailed)
}
}
impl<T> BoundedBuffer<T> {
    /// Builds an empty buffer using the mode, policy and initial capacity from
    /// `config`.
    ///
    /// An `initial_capacity` of `0` is raised to `1`, so the ring arithmetic
    /// (`% capacity`) and the single `LatestValue` slot are always valid.
    #[must_use]
    pub fn new(config: &BufferConfig) -> Self {
        let capacity = config.initial_capacity.max(1);
        let mut data = Vec::with_capacity(capacity);
        data.resize_with(capacity, || None);
        Self {
            data,
            head: 0,
            tail: 0,
            capacity,
            count: 0,
            epoch: 0,
            mode: config.mode,
            policy: config.policy,
        }
    }

    /// Offers `v` to the buffer and reports what happened to it.
    ///
    /// * `LatestValue` mode: `v` replaces whatever was stored; always `Ok`.
    /// * `Fifo` mode with free space: `v` is appended; `Ok`.
    /// * `Fifo` mode when full: the configured [`BackpressurePolicy`] decides.
    ///   `Block` gives `WouldBlock`, `Drop` gives `Dropped`, `Error` gives
    ///   `Full`. `Resize` grows the buffer (never beyond `max_capacity`) and
    ///   stores `v`, or gives `Full` once the capacity has reached
    ///   `max_capacity`.
    ///
    /// Whenever the result is not `Ok`, `v` is dropped without being stored.
    pub fn enqueue(&mut self, v: T) -> EnqueueResult {
        match self.mode {
            BufferMode::LatestValue => {
                // Only slot 0 is used in this mode; the old value is overwritten.
                self.data[0] = Some(v);
                self.count = 1;
                EnqueueResult::Ok
            }
            BufferMode::Fifo if self.count < self.capacity => {
                self.enqueue_fifo(v);
                EnqueueResult::Ok
            }
            // Full FIFO: apply the backpressure policy.
            BufferMode::Fifo => match self.policy {
                BackpressurePolicy::Block => EnqueueResult::WouldBlock,
                BackpressurePolicy::Drop => EnqueueResult::Dropped,
                BackpressurePolicy::Error => EnqueueResult::Full,
                BackpressurePolicy::Resize { max_capacity } if self.capacity < max_capacity => {
                    self.grow(max_capacity);
                    self.enqueue_fifo(v);
                    EnqueueResult::Ok
                }
                BackpressurePolicy::Resize { .. } => EnqueueResult::Full,
            },
        }
    }

    /// Removes and returns the next value, or `None` if the buffer is empty.
    ///
    /// In `Fifo` mode this is the oldest stored value; in `LatestValue` mode it
    /// is the single stored value.
    pub fn dequeue(&mut self) -> Option<T> {
        if self.count == 0 {
            return None;
        }
        match self.mode {
            BufferMode::LatestValue => {
                self.count = 0;
                self.data[0].take()
            }
            BufferMode::Fifo => {
                let val = self.data[self.head].take();
                self.head = (self.head + 1) % self.capacity;
                self.count -= 1;
                val
            }
        }
    }

    /// Returns `true` when no value is stored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Returns `true` when the number of stored values has reached the
    /// current capacity.
    #[must_use]
    pub fn is_full(&self) -> bool {
        self.count >= self.capacity
    }

    /// Returns the number of values currently stored.
    #[must_use]
    pub fn len(&self) -> usize {
        self.count
    }

    /// Returns the current number of slots.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the current epoch counter.
    #[must_use]
    pub fn epoch(&self) -> usize {
        self.epoch
    }

    /// Increments the epoch counter by one.
    pub fn advance_epoch(&mut self) {
        self.epoch += 1;
    }

    /// Writes `v` at `tail` and advances `tail` around the ring.
    ///
    /// Callers must have checked that a free slot exists.
    fn enqueue_fifo(&mut self, v: T) {
        self.data[self.tail] = Some(v);
        self.tail = (self.tail + 1) % self.capacity;
        self.count += 1;
    }

    /// Doubles the capacity, clamped to `max_capacity`, keeping FIFO order.
    ///
    /// Callers must ensure `self.capacity < max_capacity`, which guarantees the
    /// new capacity is strictly larger than the old one.
    fn grow(&mut self, max_capacity: usize) {
        // Clamp so a `Resize` buffer never exceeds its configured limit, and
        // saturate so the doubling itself cannot overflow.
        let new_cap = self.capacity.saturating_mul(2).min(max_capacity);
        // Bring the oldest element to index 0 so the live values occupy
        // `0..count`, then append empty slots up to the new capacity.
        self.data.rotate_left(self.head);
        self.data.resize_with(new_cap, || None);
        self.head = 0;
        self.tail = self.count;
        self.capacity = new_cap;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Value` buffer from `config`.
    fn buffer_from(config: &BufferConfig) -> BoundedBuffer {
        BoundedBuffer::new(config)
    }

    /// Builds a buffer with the given capacity and policy, taking every other
    /// setting from `BufferConfig::default()`.
    fn buffer_with(initial_capacity: usize, policy: BackpressurePolicy) -> BoundedBuffer {
        buffer_from(&BufferConfig {
            initial_capacity,
            policy,
            ..Default::default()
        })
    }

    /// Asserts that dequeuing yields exactly `expected`, in order, and leaves
    /// the buffer empty.
    fn assert_drains_to(buf: &mut BoundedBuffer, expected: Vec<Value>) {
        for want in expected {
            assert_eq!(buf.dequeue(), Some(want));
        }
        assert!(buf.is_empty());
    }

    /// Builds an empty per-edge map of signed buffers with byte-vector signatures.
    fn empty_signed_buffers() -> SignedBuffers<Vec<u8>> {
        BTreeMap::new()
    }

    #[test]
    fn test_fifo_basic() {
        let mut queue = buffer_from(&BufferConfig::default());
        let _ = queue.enqueue(Value::Nat(1));
        let _ = queue.enqueue(Value::Nat(2));
        assert_eq!(queue.len(), 2);
        assert_drains_to(&mut queue, vec![Value::Nat(1), Value::Nat(2)]);
    }

    #[test]
    fn test_fifo_wraparound() {
        let mut queue = buffer_from(&BufferConfig {
            initial_capacity: 3,
            ..Default::default()
        });
        let _ = queue.enqueue(Value::Nat(1));
        let _ = queue.enqueue(Value::Nat(2));
        // Free the first slot so the following writes wrap around the ring.
        let _ = queue.dequeue();
        let _ = queue.enqueue(Value::Nat(3));
        let _ = queue.enqueue(Value::Nat(4));
        assert_drains_to(
            &mut queue,
            vec![Value::Nat(2), Value::Nat(3), Value::Nat(4)],
        );
    }

    #[test]
    fn test_latest_value_overwrites() {
        let mut slot = buffer_from(&BufferConfig {
            mode: BufferMode::LatestValue,
            initial_capacity: 1,
            policy: BackpressurePolicy::Block,
        });
        let _ = slot.enqueue(Value::Nat(1));
        let _ = slot.enqueue(Value::Nat(2));
        let _ = slot.enqueue(Value::Nat(3));
        assert_drains_to(&mut slot, vec![Value::Nat(3)]);
    }

    #[test]
    fn test_backpressure_block() {
        let mut queue = buffer_with(2, BackpressurePolicy::Block);
        let _ = queue.enqueue(Value::Nat(1));
        let _ = queue.enqueue(Value::Nat(2));
        let overflow = queue.enqueue(Value::Nat(3));
        assert!(matches!(overflow, EnqueueResult::WouldBlock));
    }

    #[test]
    fn test_backpressure_resize() {
        let mut queue = buffer_with(2, BackpressurePolicy::Resize { max_capacity: 8 });
        let _ = queue.enqueue(Value::Nat(1));
        let _ = queue.enqueue(Value::Nat(2));
        let third = queue.enqueue(Value::Nat(3));
        assert!(matches!(third, EnqueueResult::Ok));
        assert_eq!(queue.len(), 3);
    }

    #[test]
    fn test_signed_buffer_alias_and_enqueue_result_mapping() {
        let edge = Edge::new(7usize, "A", "B");
        let expected = SignedValue {
            payload: Value::Nat(9),
            signature: vec![0_u8, 1_u8],
            sequence_no: 0,
        };
        let mut buffers = empty_signed_buffers();

        let outcome = signed_enqueue(
            &mut buffers,
            edge.clone(),
            expected.payload.clone(),
            expected.signature.clone(),
        );
        assert_eq!(outcome, SignedEnqueueResult::Ok);
        assert_eq!(buffers.get(&edge).map(BoundedBuffer::len), Some(1));

        let stored = buffers.get_mut(&edge).and_then(BoundedBuffer::dequeue);
        assert_eq!(stored, Some(expected));

        // Variants with a one-to-one counterpart.
        let direct = vec![
            (EnqueueResult::Ok, SignedEnqueueResult::Ok),
            (EnqueueResult::WouldBlock, SignedEnqueueResult::Blocked),
            (EnqueueResult::Dropped, SignedEnqueueResult::Dropped),
        ];
        for (raw, mapped) in direct {
            assert_eq!(SignedEnqueueResult::from(raw), mapped);
        }
        // `Full` maps to an error carrying a message.
        let full = SignedEnqueueResult::from(EnqueueResult::Full);
        assert!(matches!(full, SignedEnqueueResult::Error(_)));
    }

    #[test]
    fn test_signed_dequeue_verified_success() {
        let edge = Edge::new(11usize, "A", "B");
        let mut buffers = empty_signed_buffers();
        let _ = signed_enqueue(&mut buffers, edge.clone(), Value::Nat(5), vec![1, 2, 3]);

        let accept_expected = |payload: &Value, signature: &Vec<u8>| {
            *payload == Value::Nat(5) && signature == &vec![1, 2, 3]
        };
        let out = signed_dequeue_verified(&mut buffers, &edge, accept_expected)
            .expect("signature must verify");
        assert_eq!(out, Some(Value::Nat(5)));
    }

    #[test]
    fn test_signed_dequeue_verified_failure() {
        let edge = Edge::new(12usize, "A", "B");
        let mut buffers = empty_signed_buffers();
        let _ = signed_enqueue(&mut buffers, edge.clone(), Value::Nat(5), vec![1, 2, 3]);

        let reject_all = |_payload: &Value, _signature: &Vec<u8>| false;
        let result = signed_dequeue_verified(&mut buffers, &edge, reject_all);
        assert_eq!(result, Err(SignedDequeueError::VerificationFailed));
    }
}