1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
use std::io::Read;
use std::sync::Arc;
use crossbeam_queue::SegQueue;
/// Send chunks of data across threads, with None indicating that the sender has stopped sending.
type Chunk = Vec<u8>;
type TraceQueue = Arc<SegQueue<Option<Chunk>>>;
/// Responsible for sending trace data to a `TraceReceiver`, which will parse the data as it comes in.
pub struct TraceSender {
/// An external id for this client for logging purposes.
pub client_id: usize,
queue: TraceQueue,
/// Number of bytes we've sent.
sent: usize,
}
/// Exposes a Read interface for trace data coming from a `TraceSender`, allowing it to be parsed in
/// a streaming fashion.
pub struct TraceReceiver {
queue: TraceQueue,
/// True iff we've received all of the data from the sender.
terminated: bool,
/// We pull chunks of data from `queue`, then expose them via Read.
chunk: Chunk,
/// The amount of data we've exposed via Read for this chunk.
chunk_read: usize,
}
impl TraceSender {
#[must_use]
pub fn new(client_id: usize) -> Self {
Self {
queue: Arc::new(SegQueue::new()),
sent: 0,
client_id,
}
}
/// Create a receiver to consume trace data generated by this sender.
#[must_use]
pub fn receiver(&self) -> TraceReceiver {
TraceReceiver {
queue: self.queue.clone(),
terminated: false,
chunk: Vec::new(),
chunk_read: 0,
}
}
/// Send the given buffer to the receiver
pub fn send(&mut self, buf: Chunk) {
self.sent += buf.len();
self.queue.push(Some(buf));
}
/// Retire this `TraceSender`, returning the number of bytes we sent over its lifetime.
#[must_use]
pub fn retire(&self) -> usize {
self.queue.push(None);
self.sent
}
}
/// `TraceReceivers` only expose a Read interface, as it allows them to collect data in a streaming
/// fashion from multiple buffers and notify the caller to block when there's no data available.
impl Read for TraceReceiver {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// See if we have more data left to read.
let remaining = self.chunk.len() - self.chunk_read;
let write = std::cmp::min(remaining, buf.len());
if remaining > 0 {
// We have a chunk already that we can return data from.
// NOTE: Left and write sides of `copy_from_slice()` need to be the same length.
buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
self.chunk_read += write;
return Ok(write);
}
if self.terminated {
// We've already read a None from the sender - no use in polling the queue again.
return Ok(0);
}
match self.queue.pop() {
Some(None) => {
self.terminated = true;
Ok(0)
}
Some(Some(vec)) => {
// We found a new chunk of data. Mark that we should consume it on the next read.
self.chunk = vec;
self.chunk_read = 0;
Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Data loaded - retry",
))
}
None => {
// The queue was almost certainly empty. We'll try again later.
Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"Empty queue",
))
}
}
}
}