use std::io::{ErrorKind, Read, Seek, SeekFrom};
use std::sync::{
Arc,
atomic::{AtomicU32, Ordering},
};
use crossbeam_queue::SegQueue;
type Chunk = Vec<u8>;
struct TraceQueue {
queue: SegQueue<Option<Chunk>>,
queue_size: AtomicU32,
}
impl TraceQueue {
fn push(&self, chunk: Option<Chunk>) {
self.queue.push(chunk);
self.queue_size.fetch_add(1, Ordering::Release);
atomic_wait::wake_one(&self.queue_size);
}
fn pop(&self) -> Option<Chunk> {
loop {
atomic_wait::wait(&self.queue_size, 0);
if self.queue_size.load(Ordering::Acquire) != 0 {
let data = self.queue.pop();
self.queue_size.fetch_sub(1, Ordering::Relaxed);
return data.expect("We only pop for a non-empty queue");
}
}
}
}
pub struct TraceSender {
pub client_id: u32,
queue: Arc<TraceQueue>,
sent: usize,
}
pub struct TraceReceiver {
queue: Arc<TraceQueue>,
terminated: bool,
chunk: Chunk,
chunk_read: usize,
total_read: usize,
}
impl TraceSender {
#[must_use]
pub fn new(client_id: u32) -> Self {
Self {
queue: Arc::new(TraceQueue {
queue: SegQueue::new(),
queue_size: Default::default(),
}),
sent: 0,
client_id,
}
}
#[must_use]
pub fn receiver(&self) -> TraceReceiver {
TraceReceiver {
queue: self.queue.clone(),
terminated: false,
chunk: Vec::new(),
chunk_read: 0,
total_read: 0,
}
}
pub fn send(&mut self, buf: Chunk) {
self.sent += buf.len();
self.queue.push(Some(buf));
}
#[must_use]
pub fn retire(&self) -> usize {
self.queue.push(None);
self.sent
}
}
impl Read for TraceReceiver {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let remaining = self.chunk.len() - self.chunk_read;
let write = std::cmp::min(remaining, buf.len());
if remaining > 0 {
buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
self.chunk_read += write;
self.total_read += write;
return Ok(write);
}
if self.terminated {
return Ok(0);
}
match self.queue.pop() {
Some(vec) => {
self.chunk = vec;
self.chunk_read = 0;
Err(std::io::Error::new(
ErrorKind::Interrupted,
"Data loaded - retry",
))
}
None => {
self.terminated = true;
Ok(0)
}
}
}
}
impl Seek for TraceReceiver {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
match pos {
SeekFrom::Current(0) => Ok(self.total_read.try_into().expect("usize <= u64")),
_ => Err(std::io::Error::new(
ErrorKind::Unsupported,
"Only Current(0) is supported",
)),
}
}
}