use std::collections::VecDeque;
use std::io;
use std::os::fd::RawFd;
use std::sync::{Arc, Mutex};
use crossbeam_queue::ArrayQueue;
use microsandbox_utils::wake_pipe::WakePipe;
use msb_krun::ConsolePortBackend;
/// Capacity, counted in queued `Vec<u8>` chunks (not bytes), that
/// `ConsoleSharedState::new` gives to each of its two rings.
const DEFAULT_QUEUE_CAPACITY: usize = 2048;
/// Queues and wake pipes shared between an `AgentConsoleBackend` and the code
/// on the other end of the rings.
///
/// Both rings carry whole `Vec<u8>` chunks; their capacity is a chunk count.
pub struct ConsoleSharedState {
/// Chunks pushed by `AgentConsoleBackend::write`, one per successful call.
pub tx_ring: ArrayQueue<Vec<u8>>,
/// Chunks popped by `AgentConsoleBackend::read`.
pub rx_ring: ArrayQueue<Vec<u8>>,
/// Signalled by `AgentConsoleBackend::write` after a chunk lands on `tx_ring`.
pub tx_wake: WakePipe,
/// Drained at the start of every `AgentConsoleBackend::read`; its fd is what
/// `read_wake_fd` returns. The tests signal it after pushing onto `rx_ring`.
pub rx_wake: WakePipe,
}
/// `ConsolePortBackend` implementation that moves console bytes through the
/// rings of a shared `ConsoleSharedState`.
pub struct AgentConsoleBackend {
/// Rings and wake pipes shared with the other end.
shared: Arc<ConsoleSharedState>,
/// Tail of a popped `rx_ring` chunk that did not fit in the caller's buffer;
/// `read` serves these bytes before popping another chunk.
pending: Mutex<VecDeque<u8>>,
}
impl ConsoleSharedState {
    /// Creates shared state whose rings each hold `DEFAULT_QUEUE_CAPACITY`
    /// chunks.
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_QUEUE_CAPACITY)
    }

    /// Creates shared state whose two rings each hold up to `capacity` chunks,
    /// together with a fresh wake pipe for each direction.
    ///
    /// # Panics
    ///
    /// `ArrayQueue::new` is documented to panic when `capacity` is zero.
    pub fn with_capacity(capacity: usize) -> Self {
        // Built in the same order as the struct's fields are declared.
        let tx_ring = ArrayQueue::new(capacity);
        let rx_ring = ArrayQueue::new(capacity);
        let tx_wake = WakePipe::new();
        let rx_wake = WakePipe::new();

        Self {
            tx_ring,
            rx_ring,
            tx_wake,
            rx_wake,
        }
    }
}
impl AgentConsoleBackend {
    /// Creates a backend over `shared` that starts with no carried-over bytes.
    pub fn new(shared: Arc<ConsoleSharedState>) -> Self {
        let pending = Mutex::new(VecDeque::new());
        Self { shared, pending }
    }
}
impl Default for ConsoleSharedState {
fn default() -> Self {
Self::new()
}
}
impl ConsolePortBackend for AgentConsoleBackend {
    /// Copies received bytes into `buf` and returns how many were written.
    ///
    /// The rx wake pipe is drained first. Bytes left over from an earlier,
    /// oversized chunk are served before anything new is taken from
    /// `rx_ring`; otherwise at most one chunk is popped per call, and whatever
    /// does not fit in `buf` is kept for the next call.
    ///
    /// # Errors
    ///
    /// Returns `io::ErrorKind::WouldBlock` when there are no leftover bytes
    /// and `rx_ring` is empty.
    ///
    /// # Panics
    ///
    /// Panics if the `pending` mutex is poisoned.
    fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.shared.rx_wake.drain();
        let mut leftover = self.pending.lock().unwrap();

        if leftover.is_empty() {
            // Nothing carried over: take one fresh chunk from the ring.
            let Some(chunk) = self.shared.rx_ring.pop() else {
                return Err(io::Error::from(io::ErrorKind::WouldBlock));
            };
            let copied = buf.len().min(chunk.len());
            let (now, later) = chunk.split_at(copied);
            buf[..copied].copy_from_slice(now);
            // `later` is empty when the whole chunk fit, making this a no-op.
            leftover.extend(later);
            return Ok(copied);
        }

        // The deque's contents may be stored as two separate slices, so the
        // copy is done in two steps; the second is zero-length when the front
        // slice alone covers the request.
        let take = buf.len().min(leftover.len());
        let (front, back) = leftover.as_slices();
        let split = front.len().min(take);
        buf[..split].copy_from_slice(&front[..split]);
        buf[split..take].copy_from_slice(&back[..take - split]);
        leftover.drain(..take);
        Ok(take)
    }

    /// Queues a copy of `buf` on `tx_ring` as a single chunk, then signals
    /// the tx wake pipe. Returns `buf.len()` on success.
    ///
    /// # Errors
    ///
    /// Returns `io::ErrorKind::WouldBlock` when the ring rejects the chunk;
    /// nothing is queued and the wake pipe is not signalled in that case.
    fn write(&self, buf: &[u8]) -> io::Result<usize> {
        let chunk = buf.to_vec();
        match self.shared.tx_ring.push(chunk) {
            Ok(()) => {
                self.shared.tx_wake.wake();
                Ok(buf.len())
            }
            Err(_rejected) => Err(io::ErrorKind::WouldBlock.into()),
        }
    }

    /// Returns the raw fd of the rx wake pipe, the same pipe `read` drains.
    fn read_wake_fd(&self) -> RawFd {
        let rx_wake = &self.shared.rx_wake;
        rx_wake.as_raw_fd()
    }
}
#[cfg(test)]
mod tests {
use super::*;
// A write shows up as one chunk on `tx_ring`, and a chunk pushed onto
// `rx_ring` comes back out of `read`.
#[test]
fn backend_write_and_read_roundtrip() {
let shared = Arc::new(ConsoleSharedState::new());
let backend = AgentConsoleBackend::new(Arc::clone(&shared));
assert_eq!(backend.write(b"hello").unwrap(), 5);
let chunk = shared.tx_ring.pop().unwrap();
assert_eq!(chunk, b"hello");
shared.rx_ring.push(b"world".to_vec()).unwrap();
shared.rx_wake.wake();
let mut buf = [0u8; 16];
let n = backend.read(&mut buf).unwrap();
assert_eq!(&buf[..n], b"world");
}
// Reading with nothing queued reports `WouldBlock`.
#[test]
fn backend_read_empty_returns_would_block() {
let shared = Arc::new(ConsoleSharedState::new());
let backend = AgentConsoleBackend::new(shared);
let mut buf = [0u8; 16];
let err = backend.read(&mut buf).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
}
// With a one-slot ring, the second write finds it full and reports
// `WouldBlock`.
#[test]
fn backend_write_full_returns_would_block() {
let shared = Arc::new(ConsoleSharedState::with_capacity(1));
let backend = AgentConsoleBackend::new(shared);
assert!(backend.write(b"a").is_ok());
let err = backend.write(b"b").unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
}
// The wake fd polls readable after `wake()` and no longer does once `read`
// has run. The order of wake -> poll -> read -> poll is what is being tested.
#[test]
fn backend_read_drains_rx_wake_pipe() {
let shared = Arc::new(ConsoleSharedState::new());
let backend = AgentConsoleBackend::new(Arc::clone(&shared));
shared.rx_ring.push(b"ping".to_vec()).unwrap();
shared.rx_wake.wake();
let mut pollfd = libc::pollfd {
fd: backend.read_wake_fd(),
events: libc::POLLIN,
revents: 0,
};
// SAFETY: `pollfd` is a live, exclusively borrowed `libc::pollfd` and the
// count of 1 matches it; the zero timeout makes the call non-blocking.
let ret = unsafe { libc::poll(&mut pollfd, 1, 0) };
assert_eq!(ret, 1, "wake pipe should be readable before read()");
assert_ne!(pollfd.revents & libc::POLLIN, 0);
let mut buf = [0u8; 8];
let n = backend.read(&mut buf).unwrap();
assert_eq!(&buf[..n], b"ping");
pollfd.revents = 0;
// SAFETY: same single valid `pollfd` as above, still passed with a count of 1.
let ret = unsafe { libc::poll(&mut pollfd, 1, 0) };
assert_eq!(ret, 0, "wake pipe should be drained by read()");
}
}