use std::collections::VecDeque;
use crate::engine::wasm::bindings::astrid::process::host::{ErrorCode, LogCursor};
/// Policy applied by `LogRing::push` when new bytes would not fit within the
/// ring's capacity.
#[derive(Clone, Copy)]
pub(super) enum Overflow {
/// Accept the write and evict the oldest bytes until the ring is back at
/// capacity; evicted bytes are counted in `LogRing::overflow_dropped`.
DropOldest,
/// Reject the whole write (`push` returns `false`) and leave the buffered
/// bytes untouched.
Backpressure,
}
/// Selects one of two log streams.
///
/// NOTE(review): not referenced in this part of the file; presumably `Out` is
/// the process's stdout and `Err` its stderr — confirm against callers.
#[derive(Clone, Copy)]
pub(super) enum Stream {
/// Presumably standard output — TODO confirm.
Out,
/// Presumably standard error — TODO confirm.
Err,
}
/// Capacity-limited byte buffer addressed by absolute stream offsets.
///
/// Every byte accepted by `push` has a fixed offset counted from the first
/// byte ever pushed; readers use those offsets as cursors in `read_since`.
pub(super) struct LogRing {
// Bytes currently retained, oldest first.
buf: VecDeque<u8>,
// Absolute offset of `buf[0]`; advanced whenever bytes leave the front
// (eviction in `push` or an explicit `drain`).
front_offset: u64,
/// Total number of bytes evicted by the `DropOldest` policy.
pub(super) overflow_dropped: u64,
// Maximum number of bytes `buf` may hold after a `push` returns.
cap: usize,
// What `push` does when the incoming bytes would exceed `cap`.
overflow: Overflow,
}
impl LogRing {
    /// Creates an empty ring that retains at most `cap` bytes and resolves
    /// overflow according to `overflow`.
    pub(super) fn new(cap: usize, overflow: Overflow) -> Self {
        Self {
            buf: VecDeque::new(),
            front_offset: 0,
            overflow_dropped: 0,
            cap,
            overflow,
        }
    }

    /// Number of bytes currently buffered.
    pub(super) fn len(&self) -> usize {
        self.buf.len()
    }

    /// Absolute offset one past the newest buffered byte, i.e. the offset the
    /// next accepted byte will receive.
    pub(super) fn end_offset(&self) -> u64 {
        self.front_offset + self.buf.len() as u64
    }

    /// Appends `bytes` to the ring, applying the configured overflow policy.
    ///
    /// * `Backpressure`: if the bytes do not fit entirely, nothing is stored
    ///   and `false` is returned.
    /// * `DropOldest`: always returns `true`; the oldest bytes (including the
    ///   head of `bytes` itself when it is larger than the capacity) are
    ///   discarded, `front_offset` advances past them and
    ///   `overflow_dropped` grows by the same amount.
    pub(super) fn push(&mut self, bytes: &[u8]) -> bool {
        match self.overflow {
            Overflow::Backpressure => {
                if self.buf.len() + bytes.len() > self.cap {
                    return false;
                }
                self.buf.extend(bytes);
                true
            }
            Overflow::DropOldest => {
                // Decide what has to go *before* appending so the deque never
                // grows past `cap`, even for a single oversized write.
                let excess = (self.buf.len() + bytes.len()).saturating_sub(self.cap);
                // Oldest data lives in the buffer, so evict from there first...
                let evicted = excess.min(self.buf.len());
                // ...and only then give up the leading part of the new bytes.
                let skipped = excess - evicted;

                self.buf.drain(..evicted);
                self.buf.extend(&bytes[skipped..]);

                // The skipped bytes still consumed stream offsets, so both
                // counters advance by the full excess.
                self.front_offset += excess as u64;
                self.overflow_dropped += excess as u64;
                true
            }
        }
    }

    /// Removes and returns every buffered byte, advancing `front_offset` past
    /// them. Drained bytes are not added to `overflow_dropped`.
    pub(super) fn drain(&mut self) -> Vec<u8> {
        let out: Vec<u8> = self.buf.drain(..).collect();
        self.front_offset += out.len() as u64;
        out
    }

    /// Reads up to `max` bytes starting at the absolute offset `cursor`.
    ///
    /// `None` means "start at the oldest retained byte". Returns
    /// `(data, next, dropped)` where `next` is the offset to pass as the next
    /// cursor and `dropped` is how many requested bytes were no longer
    /// retained (the cursor pointed before `front_offset`). A cursor beyond
    /// the end is clamped to `end_offset()` and yields no data.
    pub(super) fn read_since(&self, cursor: Option<u64>, max: usize) -> (Vec<u8>, u64, u64) {
        let requested = cursor.unwrap_or(self.front_offset);
        let dropped = self.front_offset.saturating_sub(requested);
        // Clamp into the retained window [front_offset, end_offset].
        let start = requested.max(self.front_offset).min(self.end_offset());
        // `start - front_offset` is at most `buf.len()`, so it fits in usize.
        let rel = (start - self.front_offset) as usize;
        let take = (self.buf.len() - rel).min(max);
        let data: Vec<u8> = self.buf.range(rel..rel + take).copied().collect();
        (data, start + take as u64, dropped)
    }
}
/// Wraps an absolute stream offset in a `LogCursor` whose token is the offset
/// rendered as 16 zero-padded lowercase hexadecimal digits.
pub(super) fn encode_cursor(offset: u64) -> LogCursor {
    let token = format!("{offset:016x}");
    LogCursor { token: Some(token) }
}
/// Extracts the absolute offset stored in `cursor`.
///
/// Returns `Ok(None)` when the cursor carries no token, and `Ok(Some(offset))`
/// when the token is accepted by `u64::from_str_radix` with radix 16.
///
/// # Errors
///
/// Any token that fails to parse is reported as `ErrorCode::NoSuchProcess`.
/// NOTE(review): that code is what callers currently receive for a malformed
/// cursor — confirm it is the intended error for this case.
pub(super) fn decode_cursor(cursor: &LogCursor) -> Result<Option<u64>, ErrorCode> {
    cursor
        .token
        .as_ref()
        .map(|token| u64::from_str_radix(token, 16).map_err(|_| ErrorCode::NoSuchProcess))
        .transpose()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushing more than the capacity under `DropOldest` keeps only the newest
    /// bytes and records how many were evicted.
    #[test]
    fn drop_oldest_evicts_and_counts() {
        let mut log = LogRing::new(4, Overflow::DropOldest);
        assert!(log.push(b"abcdef"));
        assert_eq!(log.overflow_dropped, 2);
        assert_eq!(log.front_offset, 2);

        // A cursor-less read starts at the current front, so nothing counts
        // as dropped from the reader's point of view.
        let (bytes, next_offset, missed) = log.read_since(None, 100);
        assert_eq!(bytes, b"cdef");
        assert_eq!(next_offset, 6);
        assert_eq!(missed, 0);
    }

    /// A cursor that points at already-evicted data reports the gap.
    #[test]
    fn cursor_below_front_reports_drop() {
        let mut log = LogRing::new(4, Overflow::DropOldest);
        log.push(b"abcdef");

        let (bytes, next_offset, missed) = log.read_since(Some(0), 100);
        assert_eq!(missed, 2);
        assert_eq!(bytes, b"cdef");
        assert_eq!(next_offset, 6);
    }

    /// `drain` hands back the contents and moves the front offset past them.
    #[test]
    fn drain_advances_front() {
        let mut log = LogRing::new(100, Overflow::DropOldest);
        log.push(b"hello");
        assert_eq!(log.drain(), b"hello");
        assert_eq!(log.front_offset, 5);

        // Reading from the start now finds nothing and reports the drained
        // bytes as missing.
        let (bytes, _next_offset, missed) = log.read_since(Some(0), 100);
        assert!(bytes.is_empty());
        assert_eq!(missed, 5);
    }

    /// `max` limits a single read; the returned offset resumes where it ended.
    #[test]
    fn read_since_respects_max() {
        let mut log = LogRing::new(100, Overflow::DropOldest);
        log.push(b"abcdefghij");

        let (first, after_first, _) = log.read_since(Some(0), 4);
        assert_eq!(first, b"abcd");
        assert_eq!(after_first, 4);

        let (second, after_second, _) = log.read_since(Some(after_first), 100);
        assert_eq!(second, b"efghij");
        assert_eq!(after_second, 10);
    }

    /// A full `Backpressure` ring refuses further bytes without dropping any.
    #[test]
    fn backpressure_rejects_when_full() {
        let mut log = LogRing::new(4, Overflow::Backpressure);
        assert!(log.push(b"abcd"));
        assert!(!log.push(b"e"));
        assert_eq!(log.overflow_dropped, 0);
    }

    /// A write that would cross the capacity is rejected as a whole; the
    /// existing contents stay exactly as they were.
    #[test]
    fn backpressure_never_evicts_on_crossing_push() {
        let mut log = LogRing::new(10, Overflow::Backpressure);
        assert!(log.push(b"abcdef"));
        assert!(!log.push(b"ghijkl"));
        assert_eq!(log.len(), 6);
        assert_eq!(log.overflow_dropped, 0);

        let (bytes, _next_offset, missed) = log.read_since(None, 100);
        assert_eq!(bytes, b"abcdef");
        assert_eq!(missed, 0);
    }

    /// Cursors survive an encode/decode round trip, an absent token decodes
    /// to `None`, and a non-hex token is an error.
    #[test]
    fn cursor_roundtrip_and_reject_garbage() {
        let encoded = encode_cursor(0x1234_5678_9abc_def0);
        assert_eq!(decode_cursor(&encoded).unwrap(), Some(0x1234_5678_9abc_def0));

        let empty = LogCursor { token: None };
        assert_eq!(decode_cursor(&empty).unwrap(), None);

        let garbage = LogCursor {
            token: Some("zzz".into()),
        };
        assert!(decode_cursor(&garbage).is_err());
    }
}