functiontrace-server 0.8.6

The server component that FunctionTrace (functiontrace.com) clients spawn and connect to.
Documentation
use std::io::{ErrorKind, Read, Seek, SeekFrom};
use std::sync::{
    Arc,
    atomic::{AtomicU32, Ordering},
};

use crossbeam_queue::SegQueue;

/// An owned buffer of raw bytes; the unit of data that is pushed onto and popped off the queue.
type Chunk = Vec<u8>;

/// The queue shared between a sender and its receiver.
///
/// Pairs a `SegQueue` of chunks with an atomic element count; the consumer waits on that count
/// while it reads zero instead of spinning on the queue.
struct TraceQueue {
    /// Send chunks of data across threads, with `None` indicating that the sender has stopped
    /// sending.
    queue: SegQueue<Option<Chunk>>,
    /// Store the queue size so the consumer can sleep if it's empty.  Incremented after each
    /// push and decremented after each pop.
    queue_size: AtomicU32,
}

impl TraceQueue {
    /// Enqueue `chunk` and wake the receiver in case it's sleeping in `pop`.
    ///
    /// A `None` chunk tells the receiver that the sender has stopped sending.
    fn push(&self, chunk: Option<Chunk>) {
        self.queue.push(chunk);

        // Bump the count only once the element is in the queue, so a receiver that observes the
        // new count (Acquire load in `pop`) is guaranteed to find the element.
        self.queue_size.fetch_add(1, Ordering::Release);

        // There can only be one receiver, so waking a single waiter is enough.
        atomic_wait::wake_one(&self.queue_size);
    }

    /// Dequeue the next entry, sleeping while the queue is empty rather than busy-waiting.
    ///
    /// # Panics
    /// Panics if the count is non-zero but the underlying queue has nothing to pop.
    fn pop(&self) -> Option<Chunk> {
        loop {
            // Sleep for as long as the count reads 0.
            atomic_wait::wait(&self.queue_size, 0);

            if self.queue_size.load(Ordering::Acquire) == 0 {
                // Spurious wakeup - nothing was actually pushed, so go back to sleep.
                continue;
            }

            let entry = self.queue.pop();

            // We're the only thread that consumes from the queue, so the decrement doesn't need
            // to synchronize with anything.
            self.queue_size.fetch_sub(1, Ordering::Relaxed);

            return entry.expect("We only pop for a non-empty queue");
        }
    }
}

/// Responsible for sending trace data to a `TraceReceiver`, which will parse the data as it comes in.
pub struct TraceSender {
    /// An external id for this client for logging purposes.
    pub client_id: u32,

    /// The queue that chunks are pushed onto.  Receivers created via `receiver()` hold another
    /// handle to this same queue.
    queue: Arc<TraceQueue>,

    /// Number of bytes we've sent, accumulated from the length of every buffer passed to `send()`.
    sent: usize,
}

/// Exposes a Read interface for trace data coming from a `TraceSender`, allowing it to be parsed in
/// a streaming fashion.
pub struct TraceReceiver {
    /// The queue we pull chunks from; shared with the `TraceSender` that created this receiver.
    queue: Arc<TraceQueue>,

    /// True iff we've received all of the data from the sender, i.e. we've popped a `None` from
    /// `queue`.
    terminated: bool,

    /// We pull chunks of data from `queue`, then expose them via Read.  This is the chunk
    /// currently being exposed.
    chunk: Chunk,

    /// The amount of data we've exposed via Read for this chunk.  Reset to 0 whenever a new chunk
    /// is loaded.
    chunk_read: usize,

    /// The total number of bytes we've exposed via Read for all chunks so far.
    total_read: usize,
}

impl TraceSender {
    #[must_use]
    pub fn new(client_id: u32) -> Self {
        Self {
            queue: Arc::new(TraceQueue {
                queue: SegQueue::new(),
                queue_size: Default::default(),
            }),
            sent: 0,
            client_id,
        }
    }

    /// Create a receiver to consume trace data generated by this sender.
    #[must_use]
    pub fn receiver(&self) -> TraceReceiver {
        TraceReceiver {
            queue: self.queue.clone(),
            terminated: false,

            chunk: Vec::new(),
            chunk_read: 0,
            total_read: 0,
        }
    }

    /// Send the given buffer to the receiver
    pub fn send(&mut self, buf: Chunk) {
        self.sent += buf.len();
        self.queue.push(Some(buf));
    }

    /// Retire this `TraceSender`, returning the number of bytes we sent over its lifetime.
    #[must_use]
    pub fn retire(&self) -> usize {
        self.queue.push(None);
        self.sent
    }
}

/// `TraceReceivers` only expose a Read interface, as it allows them to collect data in a streaming
/// fashion from multiple buffers and notify the caller to block when there's no data available.
impl Read for TraceReceiver {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // See if we have more data left to read.
        let remaining = self.chunk.len() - self.chunk_read;
        let write = std::cmp::min(remaining, buf.len());

        if remaining > 0 {
            // We have a chunk already that we can return data from.
            // NOTE: Left and write sides of `copy_from_slice()` need to be the same length.
            buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
            self.chunk_read += write;
            self.total_read += write;
            return Ok(write);
        }

        if self.terminated {
            // We've already read a None from the sender - no use in polling the queue again.
            return Ok(0);
        }

        match self.queue.pop() {
            Some(vec) => {
                // We found a new chunk of data.  Mark that we should consume it on the next read.
                self.chunk = vec;
                self.chunk_read = 0;

                // TODO: Switch to const_error! once stable
                // (https://github.com/rust-lang/rust/issues/133448)
                Err(std::io::Error::new(
                    ErrorKind::Interrupted,
                    "Data loaded - retry",
                ))
            }
            None => {
                self.terminated = true;
                Ok(0)
            }
        }
    }
}

/// A deliberately minimal [`Seek`] implementation whose only purpose is to report how many bytes
/// have been read so far.  Older chunks aren't kept around, so the position can't actually be
/// moved; querying the current position is the one supported operation.
impl Seek for TraceReceiver {
    /// Return the total number of bytes read so far for `SeekFrom::Current(0)`.
    ///
    /// # Errors
    /// Any other seek request fails with `ErrorKind::Unsupported`.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        if let SeekFrom::Current(0) = pos {
            let position = u64::try_from(self.total_read).expect("usize <= u64");
            return Ok(position);
        }

        Err(std::io::Error::new(
            ErrorKind::Unsupported,
            "Only Current(0) is supported",
        ))
    }
}