functiontrace_server/
trace_streamer.rs

1use std::io::{ErrorKind, Read, Seek, SeekFrom};
2use std::sync::{
3    Arc,
4    atomic::{AtomicU32, Ordering},
5};
6
7use crossbeam_queue::SegQueue;
8
9type Chunk = Vec<u8>;
10
11struct TraceQueue {
12    /// Send chunks of data across threads, with None indicating that the sender has stopped
13    /// sending.
14    queue: SegQueue<Option<Chunk>>,
15    /// Store the queue size so the consumer can sleep if it's empty
16    queue_size: AtomicU32,
17}
18
19impl TraceQueue {
20    fn push(&self, chunk: Option<Chunk>) {
21        self.queue.push(chunk);
22
23        // Notify the receiver that there's data available.
24        // There can only be one receiver.
25        self.queue_size.fetch_add(1, Ordering::Release);
26        atomic_wait::wake_one(&self.queue_size);
27    }
28
29    fn pop(&self) -> Option<Chunk> {
30        // If the queue is empty, sleep until there's data in it so we don't busy-wait.
31        loop {
32            atomic_wait::wait(&self.queue_size, 0);
33
34            // Check for spurious wakeups.
35            if self.queue_size.load(Ordering::Acquire) != 0 {
36                let data = self.queue.pop();
37
38                // We're the only thread that'll actually use this, so we don't need to worry about
39                // orderings.
40                self.queue_size.fetch_sub(1, Ordering::Relaxed);
41
42                return data.expect("We only pop for a non-empty queue");
43            }
44        }
45    }
46}
47
48/// Responsible for sending trace data to a `TraceReceiver`, which will parse the data as it comes in.
49pub struct TraceSender {
50    /// An external id for this client for logging purposes.
51    pub client_id: u32,
52
53    queue: Arc<TraceQueue>,
54
55    /// Number of bytes we've sent.
56    sent: usize,
57}
58
59/// Exposes a Read interface for trace data coming from a `TraceSender`, allowing it to be parsed in
60/// a streaming fashion.
61pub struct TraceReceiver {
62    queue: Arc<TraceQueue>,
63
64    /// True iff we've received all of the data from the sender.
65    terminated: bool,
66
67    /// We pull chunks of data from `queue`, then expose them via Read.
68    chunk: Chunk,
69
70    /// The amount of data we've exposed via Read for this chunk.
71    chunk_read: usize,
72
73    /// The total number of bytes we've exposed via Read for all chunks so far.
74    total_read: usize,
75}
76
77impl TraceSender {
78    #[must_use]
79    pub fn new(client_id: u32) -> Self {
80        Self {
81            queue: Arc::new(TraceQueue {
82                queue: SegQueue::new(),
83                queue_size: Default::default(),
84            }),
85            sent: 0,
86            client_id,
87        }
88    }
89
90    /// Create a receiver to consume trace data generated by this sender.
91    #[must_use]
92    pub fn receiver(&self) -> TraceReceiver {
93        TraceReceiver {
94            queue: self.queue.clone(),
95            terminated: false,
96
97            chunk: Vec::new(),
98            chunk_read: 0,
99            total_read: 0,
100        }
101    }
102
103    /// Send the given buffer to the receiver
104    pub fn send(&mut self, buf: Chunk) {
105        self.sent += buf.len();
106        self.queue.push(Some(buf));
107    }
108
109    /// Retire this `TraceSender`, returning the number of bytes we sent over its lifetime.
110    #[must_use]
111    pub fn retire(&self) -> usize {
112        self.queue.push(None);
113        self.sent
114    }
115}
116
117/// `TraceReceivers` only expose a Read interface, as it allows them to collect data in a streaming
118/// fashion from multiple buffers and notify the caller to block when there's no data available.
119impl Read for TraceReceiver {
120    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
121        // See if we have more data left to read.
122        let remaining = self.chunk.len() - self.chunk_read;
123        let write = std::cmp::min(remaining, buf.len());
124
125        if remaining > 0 {
126            // We have a chunk already that we can return data from.
127            // NOTE: Left and write sides of `copy_from_slice()` need to be the same length.
128            buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
129            self.chunk_read += write;
130            self.total_read += write;
131            return Ok(write);
132        }
133
134        if self.terminated {
135            // We've already read a None from the sender - no use in polling the queue again.
136            return Ok(0);
137        }
138
139        match self.queue.pop() {
140            Some(vec) => {
141                // We found a new chunk of data.  Mark that we should consume it on the next read.
142                self.chunk = vec;
143                self.chunk_read = 0;
144
145                // TODO: Switch to const_error! once stable
146                // (https://github.com/rust-lang/rust/issues/133448)
147                Err(std::io::Error::new(
148                    ErrorKind::Interrupted,
149                    "Data loaded - retry",
150                ))
151            }
152            None => {
153                self.terminated = true;
154                Ok(0)
155            }
156        }
157    }
158}
159
160/// We implement a minimal [`Seek`] implementation, since it allows us to track how many bytes have
161/// been read so far.  We obviously can't look back at older chunks, but for reading the position
162/// we only need to know total bytes read.
163impl Seek for TraceReceiver {
164    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
165        match pos {
166            SeekFrom::Current(0) => Ok(self.total_read.try_into().expect("usize <= u64")),
167            _ => Err(std::io::Error::new(
168                ErrorKind::Unsupported,
169                "Only Current(0) is supported",
170            )),
171        }
172    }
173}