1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::io::Read;
use std::sync::Arc;

use crossbeam_queue::SegQueue;

/// A single buffer of trace bytes passed from a sender to a receiver.
type Chunk = Vec<u8>;
/// Shared queue used to send chunks of data across threads, with `None` indicating that the
/// sender has stopped sending.
type TraceQueue = Arc<SegQueue<Option<Chunk>>>;

/// Responsible for sending trace data to a TraceReceiver, which will parse the data as it comes in.
pub struct TraceSender {
    /// An external id for this client for logging purposes.
    pub client_id: usize,

    /// Queue that outgoing chunks are pushed onto. Receivers created via `receiver()` hold a clone
    /// of this `Arc`, so they pop from the same underlying queue.
    queue: TraceQueue,

    /// Number of bytes we've sent (the running total of the lengths of all buffers passed to
    /// `send()`).
    sent: usize,
}

/// Exposes a Read interface for trace data coming from a TraceSender, allowing it to be parsed in
/// a streaming fashion.
pub struct TraceReceiver {
    /// Queue that incoming chunks are popped from; a clone of the sender's `Arc`.
    queue: TraceQueue,

    /// True iff we've received all of the data from the sender, i.e. we have popped the `None`
    /// terminator from `queue`.
    terminated: bool,

    /// We pull chunks of data from `queue`, then expose them via Read. This is the chunk most
    /// recently pulled; it starts out empty.
    chunk: Chunk,

    /// The amount of data (in bytes) we've exposed via Read for this chunk; used as the offset
    /// into `chunk` of the next byte to hand out.
    chunk_read: usize,
}

impl TraceSender {
    pub fn new(client_id: usize) -> Self {
        Self {
            queue: Arc::new(SegQueue::new()),
            sent: 0,
            client_id: client_id,
        }
    }

    /// Create a receiver to consume trace data generated by this sender.
    pub fn receiver(&self) -> TraceReceiver {
        TraceReceiver {
            queue: self.queue.clone(),
            terminated: false,

            chunk: Vec::new(),
            chunk_read: 0,
        }
    }

    /// Send the given buffer to the receiver
    pub fn send(&mut self, buf: Chunk) {
        self.sent += buf.len();
        self.queue.push(Some(buf));
    }

    /// Retire this TraceSender, returning the number of bytes we sent over its lifetime.
    pub fn retire(&self) -> usize {
        self.queue.push(None);
        self.sent
    }
}

/// TraceReceivers only expose a Read interface, as it allows them to collect data in a streaming
/// fashion from multiple buffers and notify the caller to block when there's no data available.
impl Read for TraceReceiver {
    /// Copy as many bytes as fit into `buf` from the current chunk, pulling the next chunk from
    /// the queue first if the current one is exhausted.
    ///
    /// Returns `Ok(n)` with the number of bytes copied. `Ok(0)` is returned once the sender's
    /// end-of-stream marker has been seen and all buffered data has been consumed (or when `buf`
    /// itself is empty while data is available).
    ///
    /// # Errors
    ///
    /// Returns an error of kind `ErrorKind::Interrupted` when no data is buffered and the queue
    /// is currently empty but the sender has not retired yet; the caller should retry later.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            // Serve whatever is left of the chunk we already hold.
            let pending = &self.chunk[self.chunk_read..];
            if !pending.is_empty() {
                // NOTE: Left and right sides of `copy_from_slice()` need to be the same length.
                let count = pending.len().min(buf.len());
                buf[..count].copy_from_slice(&pending[..count]);
                self.chunk_read += count;
                return Ok(count);
            }

            if self.terminated {
                // We've already read a None from the sender - no use in polling the queue again.
                return Ok(0);
            }

            match self.queue.pop() {
                Some(Some(next)) => {
                    // We found a new chunk of data.  Loop around and serve it right away rather
                    // than reporting an error and making the caller issue a second read.  An
                    // empty chunk simply falls through to the next pop, so it can never be
                    // mistaken for end-of-stream.
                    self.chunk = next;
                    self.chunk_read = 0;
                }
                Some(None) => {
                    // The sender has retired; every later read reports end-of-stream.
                    self.terminated = true;
                    return Ok(0);
                }
                None => {
                    // The queue was almost certainly empty.  We'll try again later.
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::Interrupted,
                        "Empty queue",
                    ));
                }
            }
        }
    }
}