functiontrace-server 0.4.0

The server component that FunctionTrace (functiontrace.com) clients will spawn and connect to
Documentation
use std::io::Read;
use std::sync::Arc;

use crossbeam_queue::SegQueue;

/// Send chunks of data across threads, with None indicating that the sender has stopped sending.
type Chunk = Vec<u8>;
type TraceQueue = Arc<SegQueue<Option<Chunk>>>;

/// Responsible for sending trace data to a TraceReceiver, which will parse the data as it comes in.
pub struct TraceSender {
    /// An external id for this client for logging purposes.
    pub client_id: usize,

    queue: TraceQueue,

    /// Number of bytes we've sent.
    sent: usize,
}

/// Exposes a Read interface for trace data coming from a TraceSender, allowing it to be parsed in
/// a streaming fashion.
pub struct TraceReceiver {
    queue: TraceQueue,

    /// True iff we've received all of the data from the sender.
    terminated: bool,

    /// We pull chunks of data from `queue`, then expose them via Read.
    chunk: Chunk,

    /// The amount of data we've exposed via Read for this chunk.
    chunk_read: usize,
}

impl TraceSender {
    pub fn new(client_id: usize) -> Self {
        Self {
            queue: Arc::new(SegQueue::new()),
            sent: 0,
            client_id: client_id,
        }
    }

    /// Create a receiver to consume trace data generated by this sender.
    pub fn receiver(&self) -> TraceReceiver {
        TraceReceiver {
            queue: self.queue.clone(),
            terminated: false,

            chunk: Vec::new(),
            chunk_read: 0,
        }
    }

    /// Send the given buffer to the receiver
    pub fn send(&mut self, buf: Chunk) {
        self.sent += buf.len();
        self.queue.push(Some(buf));
    }

    /// Retire this TraceSender, returning the number of bytes we sent over its lifetime.
    pub fn retire(&self) -> usize {
        self.queue.push(None);
        self.sent
    }
}

/// TraceReceivers only expose a Read interface, as it allows them to collect data in a streaming
/// fashion from multiple buffers and notify the caller to block when there's no data available.
impl Read for TraceReceiver {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // See if we have more data left to read.
        let remaining = self.chunk.len() - self.chunk_read;
        let write = std::cmp::min(remaining, buf.len());

        if remaining > 0 {
            // We have a chunk already that we can return data from.
            // NOTE: Left and write sides of `copy_from_slice()` need to be the same length.
            buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
            self.chunk_read += write;
            return Ok(write);
        }

        if self.terminated {
            // We've already read a None from the sender - no use in polling the queue again.
            return Ok(0);
        }

        match self.queue.pop() {
            Some(None) => {
                self.terminated = true;
                Ok(0)
            }
            Some(Some(vec)) => {
                // We found a new chunk of data.  Mark that we should consume it on the next read.
                self.chunk = vec;
                self.chunk_read = 0;
                Err(std::io::Error::new(
                    std::io::ErrorKind::Interrupted,
                    "Data loaded - retry",
                ))
            }
            None => {
                // The queue was almost certainly empty.  We'll try again later.
                Err(std::io::Error::new(
                    std::io::ErrorKind::Interrupted,
                    "Empty queue",
                ))
            }
        }
    }
}