1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::io::Read;
use std::sync::Arc;

use crossbeam_queue::SegQueue;

/// Send chunks of data across threads, with None indicating that the sender has stopped sending.
type Chunk = Vec<u8>;
type TraceQueue = Arc<SegQueue<Option<Chunk>>>;

/// Responsible for sending trace data to a `TraceReceiver`, which will parse the data as it comes in.
pub struct TraceSender {
    /// An external id for this client for logging purposes.
    pub client_id: usize,

    queue: TraceQueue,

    /// Number of bytes we've sent.
    sent: usize,
}

/// Exposes a Read interface for trace data coming from a `TraceSender`, allowing it to be parsed in
/// a streaming fashion.
pub struct TraceReceiver {
    queue: TraceQueue,

    /// True iff we've received all of the data from the sender.
    terminated: bool,

    /// We pull chunks of data from `queue`, then expose them via Read.
    chunk: Chunk,

    /// The amount of data we've exposed via Read for this chunk.
    chunk_read: usize,
}

impl TraceSender {
    #[must_use]
    pub fn new(client_id: usize) -> Self {
        Self {
            queue: Arc::new(SegQueue::new()),
            sent: 0,
            client_id,
        }
    }

    /// Create a receiver to consume trace data generated by this sender.
    #[must_use]
    pub fn receiver(&self) -> TraceReceiver {
        TraceReceiver {
            queue: self.queue.clone(),
            terminated: false,

            chunk: Vec::new(),
            chunk_read: 0,
        }
    }

    /// Send the given buffer to the receiver
    pub fn send(&mut self, buf: Chunk) {
        self.sent += buf.len();
        self.queue.push(Some(buf));
    }

    /// Retire this `TraceSender`, returning the number of bytes we sent over its lifetime.
    #[must_use]
    pub fn retire(&self) -> usize {
        self.queue.push(None);
        self.sent
    }
}

/// `TraceReceivers` only expose a Read interface, as it allows them to collect data in a streaming
/// fashion from multiple buffers and notify the caller to block when there's no data available.
impl Read for TraceReceiver {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // See if we have more data left to read.
        let remaining = self.chunk.len() - self.chunk_read;
        let write = std::cmp::min(remaining, buf.len());

        if remaining > 0 {
            // We have a chunk already that we can return data from.
            // NOTE: Left and write sides of `copy_from_slice()` need to be the same length.
            buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
            self.chunk_read += write;
            return Ok(write);
        }

        if self.terminated {
            // We've already read a None from the sender - no use in polling the queue again.
            return Ok(0);
        }

        match self.queue.pop() {
            Some(None) => {
                self.terminated = true;
                Ok(0)
            }
            Some(Some(vec)) => {
                // We found a new chunk of data.  Mark that we should consume it on the next read.
                self.chunk = vec;
                self.chunk_read = 0;
                Err(std::io::Error::new(
                    std::io::ErrorKind::Interrupted,
                    "Data loaded - retry",
                ))
            }
            None => {
                // The queue was almost certainly empty.  We'll try again later.
                Err(std::io::Error::new(
                    std::io::ErrorKind::Interrupted,
                    "Empty queue",
                ))
            }
        }
    }
}