functiontrace_server/trace_streamer.rs
1use std::io::{ErrorKind, Read, Seek, SeekFrom};
2use std::sync::{
3 Arc,
4 atomic::{AtomicU32, Ordering},
5};
6
7use crossbeam_queue::SegQueue;
8
/// A contiguous buffer of raw trace bytes, handed from a sender to a receiver as one unit.
type Chunk = Vec<u8>;
10
11struct TraceQueue {
12 /// Send chunks of data across threads, with None indicating that the sender has stopped
13 /// sending.
14 queue: SegQueue<Option<Chunk>>,
15 /// Store the queue size so the consumer can sleep if it's empty
16 queue_size: AtomicU32,
17}
18
19impl TraceQueue {
20 fn push(&self, chunk: Option<Chunk>) {
21 self.queue.push(chunk);
22
23 // Notify the receiver that there's data available.
24 // There can only be one receiver.
25 self.queue_size.fetch_add(1, Ordering::Release);
26 atomic_wait::wake_one(&self.queue_size);
27 }
28
29 fn pop(&self) -> Option<Chunk> {
30 // If the queue is empty, sleep until there's data in it so we don't busy-wait.
31 loop {
32 atomic_wait::wait(&self.queue_size, 0);
33
34 // Check for spurious wakeups.
35 if self.queue_size.load(Ordering::Acquire) != 0 {
36 let data = self.queue.pop();
37
38 // We're the only thread that'll actually use this, so we don't need to worry about
39 // orderings.
40 self.queue_size.fetch_sub(1, Ordering::Relaxed);
41
42 return data.expect("We only pop for a non-empty queue");
43 }
44 }
45 }
46}
47
48/// Responsible for sending trace data to a `TraceReceiver`, which will parse the data as it comes in.
49pub struct TraceSender {
50 /// An external id for this client for logging purposes.
51 pub client_id: u32,
52
53 queue: Arc<TraceQueue>,
54
55 /// Number of bytes we've sent.
56 sent: usize,
57}
58
59/// Exposes a Read interface for trace data coming from a `TraceSender`, allowing it to be parsed in
60/// a streaming fashion.
61pub struct TraceReceiver {
62 queue: Arc<TraceQueue>,
63
64 /// True iff we've received all of the data from the sender.
65 terminated: bool,
66
67 /// We pull chunks of data from `queue`, then expose them via Read.
68 chunk: Chunk,
69
70 /// The amount of data we've exposed via Read for this chunk.
71 chunk_read: usize,
72
73 /// The total number of bytes we've exposed via Read for all chunks so far.
74 total_read: usize,
75}
76
77impl TraceSender {
78 #[must_use]
79 pub fn new(client_id: u32) -> Self {
80 Self {
81 queue: Arc::new(TraceQueue {
82 queue: SegQueue::new(),
83 queue_size: Default::default(),
84 }),
85 sent: 0,
86 client_id,
87 }
88 }
89
90 /// Create a receiver to consume trace data generated by this sender.
91 #[must_use]
92 pub fn receiver(&self) -> TraceReceiver {
93 TraceReceiver {
94 queue: self.queue.clone(),
95 terminated: false,
96
97 chunk: Vec::new(),
98 chunk_read: 0,
99 total_read: 0,
100 }
101 }
102
103 /// Send the given buffer to the receiver
104 pub fn send(&mut self, buf: Chunk) {
105 self.sent += buf.len();
106 self.queue.push(Some(buf));
107 }
108
109 /// Retire this `TraceSender`, returning the number of bytes we sent over its lifetime.
110 #[must_use]
111 pub fn retire(&self) -> usize {
112 self.queue.push(None);
113 self.sent
114 }
115}
116
117/// `TraceReceivers` only expose a Read interface, as it allows them to collect data in a streaming
118/// fashion from multiple buffers and notify the caller to block when there's no data available.
119impl Read for TraceReceiver {
120 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
121 // See if we have more data left to read.
122 let remaining = self.chunk.len() - self.chunk_read;
123 let write = std::cmp::min(remaining, buf.len());
124
125 if remaining > 0 {
126 // We have a chunk already that we can return data from.
127 // NOTE: Left and write sides of `copy_from_slice()` need to be the same length.
128 buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
129 self.chunk_read += write;
130 self.total_read += write;
131 return Ok(write);
132 }
133
134 if self.terminated {
135 // We've already read a None from the sender - no use in polling the queue again.
136 return Ok(0);
137 }
138
139 match self.queue.pop() {
140 Some(vec) => {
141 // We found a new chunk of data. Mark that we should consume it on the next read.
142 self.chunk = vec;
143 self.chunk_read = 0;
144
145 // TODO: Switch to const_error! once stable
146 // (https://github.com/rust-lang/rust/issues/133448)
147 Err(std::io::Error::new(
148 ErrorKind::Interrupted,
149 "Data loaded - retry",
150 ))
151 }
152 None => {
153 self.terminated = true;
154 Ok(0)
155 }
156 }
157 }
158}
159
160/// We implement a minimal [`Seek`] implementation, since it allows us to track how many bytes have
161/// been read so far. We obviously can't look back at older chunks, but for reading the position
162/// we only need to know total bytes read.
163impl Seek for TraceReceiver {
164 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
165 match pos {
166 SeekFrom::Current(0) => Ok(self.total_read.try_into().expect("usize <= u64")),
167 _ => Err(std::io::Error::new(
168 ErrorKind::Unsupported,
169 "Only Current(0) is supported",
170 )),
171 }
172 }
173}