functiontrace_server/trace_streamer.rs
1use std::io::Read;
2use std::sync::{
3 atomic::{AtomicU32, Ordering},
4 Arc,
5};
6
7use crossbeam_queue::SegQueue;
8
9type Chunk = Vec<u8>;
10
11struct TraceQueue {
12 /// Send chunks of data across threads, with None indicating that the sender has stopped
13 /// sending.
14 queue: SegQueue<Option<Chunk>>,
15 /// Store the queue size so the consumer can sleep if it's empty
16 queue_size: AtomicU32,
17}
18
19impl TraceQueue {
20 fn push(&self, chunk: Option<Chunk>) {
21 self.queue.push(chunk);
22
23 // Notify the receiver that there's data available.
24 // There can only be one receiver.
25 self.queue_size.fetch_add(1, Ordering::Release);
26 atomic_wait::wake_one(&self.queue_size);
27 }
28
29 fn pop(&self) -> Option<Chunk> {
30 // If the queue is empty, sleep until there's data in it so we don't busy-wait.
31 loop {
32 atomic_wait::wait(&self.queue_size, 0);
33
34 // Check for spurious wakeups.
35 if self.queue_size.load(Ordering::Acquire) != 0 {
36 let data = self.queue.pop();
37
38 // We're the only thread that'll actually use this, so we don't need to worry about
39 // orderings.
40 self.queue_size.fetch_sub(1, Ordering::Relaxed);
41
42 return data.expect("We only pop for a non-empty queue");
43 }
44 }
45 }
46}
47
48/// Responsible for sending trace data to a `TraceReceiver`, which will parse the data as it comes in.
49pub struct TraceSender {
50 /// An external id for this client for logging purposes.
51 pub client_id: u32,
52
53 queue: Arc<TraceQueue>,
54
55 /// Number of bytes we've sent.
56 sent: usize,
57}
58
59/// Exposes a Read interface for trace data coming from a `TraceSender`, allowing it to be parsed in
60/// a streaming fashion.
61pub struct TraceReceiver {
62 queue: Arc<TraceQueue>,
63
64 /// True iff we've received all of the data from the sender.
65 terminated: bool,
66
67 /// We pull chunks of data from `queue`, then expose them via Read.
68 chunk: Chunk,
69
70 /// The amount of data we've exposed via Read for this chunk.
71 chunk_read: usize,
72}
73
74impl TraceSender {
75 #[must_use]
76 pub fn new(client_id: u32) -> Self {
77 Self {
78 queue: Arc::new(TraceQueue {
79 queue: SegQueue::new(),
80 queue_size: Default::default(),
81 }),
82 sent: 0,
83 client_id,
84 }
85 }
86
87 /// Create a receiver to consume trace data generated by this sender.
88 #[must_use]
89 pub fn receiver(&self) -> TraceReceiver {
90 TraceReceiver {
91 queue: self.queue.clone(),
92 terminated: false,
93
94 chunk: Vec::new(),
95 chunk_read: 0,
96 }
97 }
98
99 /// Send the given buffer to the receiver
100 pub fn send(&mut self, buf: Chunk) {
101 self.sent += buf.len();
102 self.queue.push(Some(buf));
103 }
104
105 /// Retire this `TraceSender`, returning the number of bytes we sent over its lifetime.
106 #[must_use]
107 pub fn retire(&self) -> usize {
108 self.queue.push(None);
109 self.sent
110 }
111}
112
113/// `TraceReceivers` only expose a Read interface, as it allows them to collect data in a streaming
114/// fashion from multiple buffers and notify the caller to block when there's no data available.
115impl Read for TraceReceiver {
116 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
117 // See if we have more data left to read.
118 let remaining = self.chunk.len() - self.chunk_read;
119 let write = std::cmp::min(remaining, buf.len());
120
121 if remaining > 0 {
122 // We have a chunk already that we can return data from.
123 // NOTE: Left and write sides of `copy_from_slice()` need to be the same length.
124 buf[0..write].copy_from_slice(&self.chunk[self.chunk_read..self.chunk_read + write]);
125 self.chunk_read += write;
126 return Ok(write);
127 }
128
129 if self.terminated {
130 // We've already read a None from the sender - no use in polling the queue again.
131 return Ok(0);
132 }
133
134 match self.queue.pop() {
135 Some(vec) => {
136 // We found a new chunk of data. Mark that we should consume it on the next read.
137 self.chunk = vec;
138 self.chunk_read = 0;
139 Err(std::io::Error::new(
140 std::io::ErrorKind::Interrupted,
141 "Data loaded - retry",
142 ))
143 }
144 None => {
145 self.terminated = true;
146 Ok(0)
147 }
148 }
149 }
150}