functiontrace_server/
trace_streamer.rs

1use std::io::Read;
2use std::sync::{
3    atomic::{AtomicU32, Ordering},
4    Arc,
5};
6
7use crossbeam_queue::SegQueue;
8
9type Chunk = Vec<u8>;
10
11struct TraceQueue {
12    /// Send chunks of data across threads, with None indicating that the sender has stopped
13    /// sending.
14    queue: SegQueue<Option<Chunk>>,
15    /// Store the queue size so the consumer can sleep if it's empty
16    queue_size: AtomicU32,
17}
18
19impl TraceQueue {
20    fn push(&self, chunk: Option<Chunk>) {
21        self.queue.push(chunk);
22
23        // Notify the receiver that there's data available.
24        // There can only be one receiver.
25        self.queue_size.fetch_add(1, Ordering::Release);
26        atomic_wait::wake_one(&self.queue_size);
27    }
28
29    fn pop(&self) -> Option<Chunk> {
30        // If the queue is empty, sleep until there's data in it so we don't busy-wait.
31        loop {
32            atomic_wait::wait(&self.queue_size, 0);
33
34            // Check for spurious wakeups.
35            if self.queue_size.load(Ordering::Acquire) != 0 {
36                let data = self.queue.pop();
37
38                // We're the only thread that'll actually use this, so we don't need to worry about
39                // orderings.
40                self.queue_size.fetch_sub(1, Ordering::Relaxed);
41
42                return data.expect("We only pop for a non-empty queue");
43            }
44        }
45    }
46}
47
48/// Responsible for sending trace data to a `TraceReceiver`, which will parse the data as it comes in.
49pub struct TraceSender {
50    /// An external id for this client for logging purposes.
51    pub client_id: u32,
52
53    queue: Arc<TraceQueue>,
54
55    /// Number of bytes we've sent.
56    sent: usize,
57}
58
59/// Exposes a Read interface for trace data coming from a `TraceSender`, allowing it to be parsed in
60/// a streaming fashion.
61pub struct TraceReceiver {
62    queue: Arc<TraceQueue>,
63
64    /// True iff we've received all of the data from the sender.
65    terminated: bool,
66
67    /// We pull chunks of data from `queue`, then expose them via Read.
68    chunk: Chunk,
69
70    /// The amount of data we've exposed via Read for this chunk.
71    chunk_read: usize,
72}
73
74impl TraceSender {
75    #[must_use]
76    pub fn new(client_id: u32) -> Self {
77        Self {
78            queue: Arc::new(TraceQueue {
79                queue: SegQueue::new(),
80                queue_size: Default::default(),
81            }),
82            sent: 0,
83            client_id,
84        }
85    }
86
87    /// Create a receiver to consume trace data generated by this sender.
88    #[must_use]
89    pub fn receiver(&self) -> TraceReceiver {
90        TraceReceiver {
91            queue: self.queue.clone(),
92            terminated: false,
93
94            chunk: Vec::new(),
95            chunk_read: 0,
96        }
97    }
98
99    /// Send the given buffer to the receiver
100    pub fn send(&mut self, buf: Chunk) {
101        self.sent += buf.len();
102        self.queue.push(Some(buf));
103    }
104
105    /// Retire this `TraceSender`, returning the number of bytes we sent over its lifetime.
106    #[must_use]
107    pub fn retire(&self) -> usize {
108        self.queue.push(None);
109        self.sent
110    }
111}
112
113/// `TraceReceivers` only expose a Read interface, as it allows them to collect data in a streaming
114/// fashion from multiple buffers and notify the caller to block when there's no data available.
115impl Read for TraceReceiver {
116    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
117        // See if we have more data left to read.
118        let remaining = self.chunk.len() - self.chunk_read;
119        let write = std::cmp::min(remaining, buf.len());
120
121        if remaining > 0 {
122            // We have a chunk already that we can return data from.
123            // NOTE: Left and write sides of `copy_from_slice()` need to be the same length.
124            buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
125            self.chunk_read += write;
126            return Ok(write);
127        }
128
129        if self.terminated {
130            // We've already read a None from the sender - no use in polling the queue again.
131            return Ok(0);
132        }
133
134        match self.queue.pop() {
135            Some(vec) => {
136                // We found a new chunk of data.  Mark that we should consume it on the next read.
137                self.chunk = vec;
138                self.chunk_read = 0;
139                Err(std::io::Error::new(
140                    std::io::ErrorKind::Interrupted,
141                    "Data loaded - retry",
142                ))
143            }
144            None => {
145                self.terminated = true;
146                Ok(0)
147            }
148        }
149    }
150}