Skip to main content

telltale_machine/
buffer.rs

1//! Bounded buffers with backpressure.
2//!
3//! Matches the Lean `BoundedBuffer` from `lean/Runtime/Resources/BufferRA.lean`.
4//! Ring buffer with configurable mode and backpressure policy.
5
6use std::collections::BTreeMap;
7
8use serde::{Deserialize, Serialize};
9
10use crate::coroutine::Value;
11use crate::session::Edge;
12
13/// Buffer delivery mode.
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
15pub enum BufferMode {
16    /// First-in, first-out. All messages delivered in order.
17    Fifo,
18    /// Only the latest value is retained. Overwrites on enqueue.
19    LatestValue,
20}
21
22/// Policy when a buffer is full.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
24pub enum BackpressurePolicy {
25    /// Block the sender until space is available.
26    Block,
27    /// Drop the message silently.
28    Drop,
29    /// Return an error to the sender.
30    Error,
31    /// Resize the buffer up to a maximum capacity.
32    Resize {
33        /// Upper bound on buffer capacity.
34        max_capacity: usize,
35    },
36}
37
38/// Configuration for a buffer.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct BufferConfig {
41    /// Delivery mode.
42    pub mode: BufferMode,
43    /// Initial capacity.
44    pub initial_capacity: usize,
45    /// Backpressure policy.
46    pub policy: BackpressurePolicy,
47}
48
49impl Default for BufferConfig {
50    fn default() -> Self {
51        Self {
52            mode: BufferMode::Fifo,
53            initial_capacity: 64,
54            policy: BackpressurePolicy::Block,
55        }
56    }
57}
58
59/// Bounded ring buffer for inter-role message passing.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BoundedBuffer<T = Value> {
62    data: Vec<Option<T>>,
63    head: usize,
64    tail: usize,
65    capacity: usize,
66    count: usize,
67    epoch: usize,
68    mode: BufferMode,
69    policy: BackpressurePolicy,
70}
71
72/// Result of attempting to enqueue a value.
73#[derive(Debug)]
74pub enum EnqueueResult {
75    /// Value was enqueued successfully.
76    Ok,
77    /// Buffer is full; sender should block.
78    WouldBlock,
79    /// Value was dropped per policy.
80    Dropped,
81    /// Buffer full and policy is Error.
82    Full,
83}
84
85/// Signed value payload used by authenticated transport layers.
86#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
87pub struct SignedValue<V> {
88    /// The application payload.
89    pub payload: Value,
90    /// The signature/proof attached to the payload.
91    pub signature: V,
92    /// Monotonic sender-side sequence number for replay-consumption checks.
93    #[serde(default)]
94    pub sequence_no: u64,
95}
96
97/// Signed FIFO for a single edge.
98pub type SignedBuffer<V> = BoundedBuffer<SignedValue<V>>;
99
100/// Signed buffers indexed by sid-qualified edge.
101pub type SignedBuffers<V> = BTreeMap<Edge, SignedBuffer<V>>;
102
103/// Signed dequeue failure.
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
105pub enum SignedDequeueError {
106    /// Signature verification failed for the dequeued payload.
107    VerificationFailed,
108}
109
110/// Result of signed enqueue attempts.
111#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
112pub enum SignedEnqueueResult {
113    /// Value was enqueued successfully.
114    Ok,
115    /// Buffer is full; sender should block.
116    Blocked,
117    /// Value was dropped per policy.
118    Dropped,
119    /// Error path for failed enqueue.
120    Error(String),
121}
122
123impl From<EnqueueResult> for SignedEnqueueResult {
124    fn from(value: EnqueueResult) -> Self {
125        match value {
126            EnqueueResult::Ok => Self::Ok,
127            EnqueueResult::WouldBlock => Self::Blocked,
128            EnqueueResult::Dropped => Self::Dropped,
129            EnqueueResult::Full => Self::Error("buffer full".to_string()),
130        }
131    }
132}
133
134/// Enqueue one signed payload into per-edge signed buffers.
135pub fn signed_enqueue<V>(
136    buffers: &mut SignedBuffers<V>,
137    edge: Edge,
138    payload: Value,
139    signature: V,
140) -> SignedEnqueueResult {
141    signed_enqueue_with_sequence(buffers, edge, payload, signature, 0)
142}
143
144/// Enqueue one signed payload with explicit sequence number.
145pub fn signed_enqueue_with_sequence<V>(
146    buffers: &mut SignedBuffers<V>,
147    edge: Edge,
148    payload: Value,
149    signature: V,
150    sequence_no: u64,
151) -> SignedEnqueueResult {
152    let queue = buffers
153        .entry(edge)
154        .or_insert_with(|| BoundedBuffer::new(&BufferConfig::default()));
155    queue
156        .enqueue(SignedValue {
157            payload,
158            signature,
159            sequence_no,
160        })
161        .into()
162}
163
164/// Dequeue and verify one signed payload.
165///
166/// The verifier is provided by the caller so this buffer module stays
167/// independent from any specific verification backend.
168///
169/// # Errors
170///
171/// Returns [`SignedDequeueError::VerificationFailed`] if the signature does not verify.
172pub fn signed_dequeue_verified<V, F>(
173    buffers: &mut SignedBuffers<V>,
174    edge: &Edge,
175    verifier: F,
176) -> Result<Option<Value>, SignedDequeueError>
177where
178    F: Fn(&Value, &V) -> bool,
179{
180    let Some(queue) = buffers.get_mut(edge) else {
181        return Ok(None);
182    };
183    let Some(signed) = queue.dequeue() else {
184        return Ok(None);
185    };
186    if verifier(&signed.payload, &signed.signature) {
187        Ok(Some(signed.payload))
188    } else {
189        Err(SignedDequeueError::VerificationFailed)
190    }
191}
192
193impl<T> BoundedBuffer<T> {
194    /// Create a new buffer with the given configuration.
195    #[must_use]
196    pub fn new(config: &BufferConfig) -> Self {
197        let capacity = config.initial_capacity.max(1);
198        let mut data = Vec::with_capacity(capacity);
199        data.resize_with(capacity, || None);
200        Self {
201            data,
202            head: 0,
203            tail: 0,
204            capacity,
205            count: 0,
206            epoch: 0,
207            mode: config.mode,
208            policy: config.policy,
209        }
210    }
211
212    /// Try to enqueue a value.
213    pub fn enqueue(&mut self, v: T) -> EnqueueResult {
214        match self.mode {
215            BufferMode::LatestValue => {
216                // Overwrite the single slot.
217                self.data[0] = Some(v);
218                self.count = 1;
219                EnqueueResult::Ok
220            }
221            BufferMode::Fifo => {
222                if self.count >= self.capacity {
223                    match self.policy {
224                        BackpressurePolicy::Block => EnqueueResult::WouldBlock,
225                        BackpressurePolicy::Drop => EnqueueResult::Dropped,
226                        BackpressurePolicy::Error => EnqueueResult::Full,
227                        BackpressurePolicy::Resize { max_capacity } => {
228                            if self.capacity < max_capacity {
229                                self.grow();
230                                self.enqueue_fifo(v);
231                                EnqueueResult::Ok
232                            } else {
233                                EnqueueResult::Full
234                            }
235                        }
236                    }
237                } else {
238                    self.enqueue_fifo(v);
239                    EnqueueResult::Ok
240                }
241            }
242        }
243    }
244
245    /// Dequeue a value, if available.
246    pub fn dequeue(&mut self) -> Option<T> {
247        match self.mode {
248            BufferMode::LatestValue => {
249                if self.count > 0 {
250                    self.count = 0;
251                    self.data[0].take()
252                } else {
253                    None
254                }
255            }
256            BufferMode::Fifo => {
257                if self.count == 0 {
258                    return None;
259                }
260                let val = self.data[self.head].take();
261                self.head = (self.head + 1) % self.capacity;
262                self.count -= 1;
263                val
264            }
265        }
266    }
267
268    /// Whether the buffer is empty.
269    #[must_use]
270    pub fn is_empty(&self) -> bool {
271        self.count == 0
272    }
273
274    /// Whether the buffer is full (FIFO mode only).
275    #[must_use]
276    pub fn is_full(&self) -> bool {
277        self.count >= self.capacity
278    }
279
280    /// Current number of buffered values.
281    #[must_use]
282    pub fn len(&self) -> usize {
283        self.count
284    }
285
286    /// Buffer capacity.
287    #[must_use]
288    pub fn capacity(&self) -> usize {
289        self.capacity
290    }
291
292    /// Current epoch.
293    #[must_use]
294    pub fn epoch(&self) -> usize {
295        self.epoch
296    }
297
298    /// Advance the epoch (used during session draining).
299    pub fn advance_epoch(&mut self) {
300        self.epoch += 1;
301    }
302
303    fn enqueue_fifo(&mut self, v: T) {
304        self.data[self.tail] = Some(v);
305        self.tail = (self.tail + 1) % self.capacity;
306        self.count += 1;
307    }
308
309    fn grow(&mut self) {
310        let new_cap = self.capacity * 2;
311        let mut new_data = Vec::with_capacity(new_cap);
312        new_data.resize_with(new_cap, || None);
313
314        // Copy existing elements in order.
315        for (i, slot) in new_data.iter_mut().enumerate().take(self.count) {
316            let idx = (self.head + i) % self.capacity;
317            *slot = self.data[idx].take();
318        }
319
320        self.data = new_data;
321        self.head = 0;
322        self.tail = self.count;
323        self.capacity = new_cap;
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use super::*;
330
331    #[test]
332    fn test_fifo_basic() {
333        let mut buf = BoundedBuffer::new(&BufferConfig::default());
334        buf.enqueue(Value::Nat(1));
335        buf.enqueue(Value::Nat(2));
336        assert_eq!(buf.len(), 2);
337        assert_eq!(buf.dequeue(), Some(Value::Nat(1)));
338        assert_eq!(buf.dequeue(), Some(Value::Nat(2)));
339        assert!(buf.is_empty());
340    }
341
342    #[test]
343    fn test_fifo_wraparound() {
344        let config = BufferConfig {
345            initial_capacity: 3,
346            ..Default::default()
347        };
348        let mut buf = BoundedBuffer::new(&config);
349
350        buf.enqueue(Value::Nat(1));
351        buf.enqueue(Value::Nat(2));
352        buf.dequeue(); // remove 1
353        buf.enqueue(Value::Nat(3));
354        buf.enqueue(Value::Nat(4));
355
356        assert_eq!(buf.dequeue(), Some(Value::Nat(2)));
357        assert_eq!(buf.dequeue(), Some(Value::Nat(3)));
358        assert_eq!(buf.dequeue(), Some(Value::Nat(4)));
359        assert!(buf.is_empty());
360    }
361
362    #[test]
363    fn test_latest_value_overwrites() {
364        let config = BufferConfig {
365            mode: BufferMode::LatestValue,
366            initial_capacity: 1,
367            policy: BackpressurePolicy::Block,
368        };
369        let mut buf = BoundedBuffer::new(&config);
370
371        buf.enqueue(Value::Nat(1));
372        buf.enqueue(Value::Nat(2));
373        buf.enqueue(Value::Nat(3));
374
375        assert_eq!(buf.dequeue(), Some(Value::Nat(3)));
376        assert!(buf.is_empty());
377    }
378
379    #[test]
380    fn test_backpressure_block() {
381        let config = BufferConfig {
382            initial_capacity: 2,
383            policy: BackpressurePolicy::Block,
384            ..Default::default()
385        };
386        let mut buf = BoundedBuffer::new(&config);
387        buf.enqueue(Value::Nat(1));
388        buf.enqueue(Value::Nat(2));
389        assert!(matches!(
390            buf.enqueue(Value::Nat(3)),
391            EnqueueResult::WouldBlock
392        ));
393    }
394
395    #[test]
396    fn test_backpressure_resize() {
397        let config = BufferConfig {
398            initial_capacity: 2,
399            policy: BackpressurePolicy::Resize { max_capacity: 8 },
400            ..Default::default()
401        };
402        let mut buf = BoundedBuffer::new(&config);
403        buf.enqueue(Value::Nat(1));
404        buf.enqueue(Value::Nat(2));
405        assert!(matches!(buf.enqueue(Value::Nat(3)), EnqueueResult::Ok));
406        assert_eq!(buf.len(), 3);
407    }
408
409    #[test]
410    fn test_signed_buffer_alias_and_enqueue_result_mapping() {
411        let edge = Edge::new(7usize, "A", "B");
412        let signed = SignedValue {
413            payload: Value::Nat(9),
414            signature: vec![0_u8, 1_u8],
415            sequence_no: 0,
416        };
417        let mut buffers: SignedBuffers<Vec<u8>> = BTreeMap::new();
418        assert_eq!(
419            signed_enqueue(
420                &mut buffers,
421                edge.clone(),
422                signed.payload.clone(),
423                signed.signature.clone(),
424            ),
425            SignedEnqueueResult::Ok
426        );
427        assert_eq!(buffers.get(&edge).map(BoundedBuffer::len), Some(1));
428        assert_eq!(
429            buffers.get_mut(&edge).and_then(BoundedBuffer::dequeue),
430            Some(signed)
431        );
432
433        assert_eq!(
434            SignedEnqueueResult::from(EnqueueResult::Ok),
435            SignedEnqueueResult::Ok
436        );
437        assert_eq!(
438            SignedEnqueueResult::from(EnqueueResult::WouldBlock),
439            SignedEnqueueResult::Blocked
440        );
441        assert_eq!(
442            SignedEnqueueResult::from(EnqueueResult::Dropped),
443            SignedEnqueueResult::Dropped
444        );
445        assert!(matches!(
446            SignedEnqueueResult::from(EnqueueResult::Full),
447            SignedEnqueueResult::Error(_)
448        ));
449    }
450
451    #[test]
452    fn test_signed_dequeue_verified_success() {
453        let edge = Edge::new(11usize, "A", "B");
454        let mut buffers: SignedBuffers<Vec<u8>> = BTreeMap::new();
455        let _ = signed_enqueue(&mut buffers, edge.clone(), Value::Nat(5), vec![1, 2, 3]);
456        let out = signed_dequeue_verified(&mut buffers, &edge, |payload, signature| {
457            *payload == Value::Nat(5) && signature == &vec![1, 2, 3]
458        })
459        .expect("signature must verify");
460        assert_eq!(out, Some(Value::Nat(5)));
461    }
462
463    #[test]
464    fn test_signed_dequeue_verified_failure() {
465        let edge = Edge::new(12usize, "A", "B");
466        let mut buffers: SignedBuffers<Vec<u8>> = BTreeMap::new();
467        let _ = signed_enqueue(&mut buffers, edge.clone(), Value::Nat(5), vec![1, 2, 3]);
468        let result = signed_dequeue_verified(&mut buffers, &edge, |_payload, _signature| false);
469        assert_eq!(result, Err(SignedDequeueError::VerificationFailed));
470    }
471}