rt_history/lib.rs
//! An RT-safe history log with error checking.
//!
//! This is a bounded wait-free thread synchronization primitive which allows
//! you to record the time evolution of some data on one thread and be able to
//! query the last N data points on other threads.
//!
//! By definition of wait-free synchronization, the producer and consumer
//! threads cannot wait for each other, so in bad conditions (too small a buffer
//! size, too low a thread's priority, too loaded a system...), two race
//! conditions can occur:
//!
//! - The producer can be too fast with respect to consumers, and overwrite
//!   historical data which a consumer was still in the process of reading.
//!   This buffer overrun scenario is reported, along with the degree of overrun
//!   that occurred, which can be used to guide system parameter adjustments so
//!   that the error stops occurring.
//! - The producer can be too slow with respect to consumers, and fail to write
//!   a sufficient amount of new data in between two consumer readouts. The
//!   precise definition of this buffer underrun error is workload-dependent, so
//!   we provide the right tool to detect it, namely the timestamp of the latest
//!   data point, but we do not handle it ourselves (see the second example
//!   below).
//!
//! ```
//! # use rt_history::RTHistory;
//! let (mut input, output) = RTHistory::<u8>::new(8).split();
//!
//! let in_buf = [1, 2, 3];
//! input.write(&in_buf[..]);
//!
//! let mut out_buf = [0; 3];
//! assert_eq!(output.read(&mut out_buf[..]), Ok(3));
//! assert_eq!(in_buf, out_buf);
//! ```
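//!
//! As mentioned above, buffer underrun detection is left to the application.
//! Here is a minimal sketch of what it could look like, under the illustrative
//! assumption that a readout is only useful when at least 3 new entries have
//! arrived since the previous one:
//!
//! ```
//! # use rt_history::RTHistory;
//! let (mut input, output) = RTHistory::<u8>::new(8).split();
//! input.write(&[1, 2, 3]);
//!
//! let mut out_buf = [0; 3];
//! let mut last_clock = 0;
//! let clock = output.read(&mut out_buf[..]).expect("No overrun in this single-threaded example");
//! // Clocks can wrap around, so deltas must be computed with wrapping_sub()
//! let new_entries = clock.wrapping_sub(last_clock);
//! if new_entries < 3 {
//!     // Too little new data arrived since the last readout: this is what a
//!     // buffer underrun means for this particular workload, and it is up to
//!     // the application to decide how to react to it.
//! }
//! last_clock = clock;
//! # assert_eq!(new_entries, 3);
//! # assert_eq!(last_clock, 3);
//! ```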

#![deny(missing_docs)]

use atomic::{Atomic, Ordering};
use bytemuck::NoUninit;
use std::sync::Arc;
use thiserror::Error;

/// Re-export of the bytemuck version used by this crate
pub use bytemuck;

/// Data that is shared between the producer and the consumer
struct SharedState<T: NoUninit + Sync> {
    /// Circular buffer
    data: Box<[Atomic<T>]>,

    /// data.len() will always be a power of 2, with this exponent, and the
    /// compiler can use this knowledge to compute modulos much faster
    data_len_pow2: u32,

    /// Timestamp of the last entry that has been published
    readable: Atomic<usize>,

    /// Timestamp of the last entry that has been or is being overwritten
    writing: Atomic<usize>,
}
//
impl<T: NoUninit + Sync> SharedState<T> {
    /// Length of the inner circular buffer
    #[inline]
    fn data_len(&self) -> usize {
        let data_len = 1 << self.data_len_pow2;
        debug_assert_eq!(self.data.len(), data_len);
        data_len
    }
}

/// Producer interface to the history log
pub struct Input<T: NoUninit + Sync>(Arc<SharedState<T>>);
//
impl<T: NoUninit + Sync> Input<T> {
    /// Query the history capacity
    ///
    /// Once this number of entries has been recorded, subsequent writes will
    /// overwrite old entries in a FIFO manner.
    ///
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }

    /// Record new entries into the history log
    ///
    /// `input` must be no longer than the history's capacity.
    ///
    pub fn write(&mut self, input: &[T]) {
        // Check that the request makes sense
        let data_len = self.0.data_len();
        assert!(
            input.len() <= data_len,
            "History is shorter than provided input"
        );

        // Notify the consumer that we are writing new data
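        // (Bumping `writing` before touching the data means that a reader which
        // observes partially overwritten entries will, thanks to the Release
        // fence below and the matching Acquire fence in read(), also observe
        // the bumped counter and report an Overrun rather than silently
        // returning corrupt data.)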
        let old_writing = self.0.writing.load(Ordering::Relaxed);
        let new_writing = old_writing.wrapping_add(input.len());
        self.0.writing.store(new_writing, Ordering::Relaxed);

        // Make sure that this notification is not reordered after data writes
        atomic::fence(Ordering::Release);

        // Wrap old and new writing timestamps into circular buffer range
        let old_writing_idx = old_writing % data_len;
        let new_writing_idx = new_writing % data_len;

        // Perform the data writes
        let first_output_len = input.len().min(data_len - old_writing_idx);
        let first_output = &self.0.data[old_writing_idx..old_writing_idx + first_output_len];
        let (first_input, second_input) = input.split_at(first_output_len);
        for (src, dst) in first_input.iter().zip(first_output.iter()) {
            // NOTE: This compiles to a memcpy with N-bit granularity. More
            //       performance can be obtained by using a regular memcpy,
            //       which will fully leverage wide hardware instructions like
            //       REP MOVS and SIMD load/stores. But unfortunately, racing
            //       memcpys are also UB in the Rust memory model. So we'd need
            //       to defer to the hardware memory model for this, which means
            //       volatile memcpy, which is nightly-only.
            dst.store(*src, Ordering::Relaxed);
        }
        if first_output_len < input.len() {
            debug_assert!(old_writing_idx >= new_writing_idx);
            debug_assert_eq!(new_writing_idx, second_input.len());
            let second_output = &self.0.data[..new_writing_idx];
            for (src, dst) in second_input.iter().zip(second_output.iter()) {
                // NOTE: See above
                dst.store(*src, Ordering::Relaxed);
            }
        }

        // Notify the consumer that new data has been published, make sure that
        // this write is ordered after previous data writes
        debug_assert_eq!(self.0.readable.load(Ordering::Relaxed), old_writing);
        self.0.readable.store(new_writing, Ordering::Release);
    }
}

/// Number of entries that the producer wrote so far (with wraparound)
///
/// Differences between two consecutive readouts of this counter can be used to
/// tell how many new entries arrived between two readouts, and to detect buffer
/// underruns where the producer wrote too little new data. How little is too
/// little is workload-dependent, so we do not implement this logic ourselves.
///
/// This counter may wrap around from time to time (for instance, after roughly
/// a day of continuous operation for a mono 44.1 kHz audio stream on a 32-bit
/// CPU), so use wrapping_sub() to compute deltas.
///
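/// For illustration, here is a wrapping delta computed across the counter's
/// wrap-around point (the concrete values are arbitrary):
///
/// ```
/// # use rt_history::Clock;
/// // Hypothetical clocks sampled just before and just after the wraparound
/// let previous: Clock = usize::MAX - 2;
/// let current: Clock = 5;
/// // wrapping_sub() still yields the correct number of new entries
/// assert_eq!(current.wrapping_sub(previous), 8);
/// ```
///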
pub type Clock = usize;

/// Indication that a buffer overrun occurred
///
/// This means that the producer wrote over the data that the consumer was in
/// the process of reading. Possible fixes include making the buffer larger,
/// speeding up the consumer, and slowing down the producer, in order of
/// decreasing preference.
///
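/// A sketch of how a consumer might react to this error; the recovery policy is
/// application-defined, and the one below merely reports the problem:
///
/// ```
/// # use rt_history::{Overrun, RTHistory};
/// let (mut input, output) = RTHistory::<u8>::new(4).split();
/// input.write(&[1, 2, 3, 4]);
///
/// let mut out_buf = [0; 4];
/// match output.read(&mut out_buf[..]) {
///     Ok(clock) => println!("Read {} entries, the latest has clock {}", out_buf.len(), clock),
///     Err(Overrun { clock, excess_entries }) => {
///         eprintln!(
///             "Overrun at clock {}: the writer overwrote {} of the entries being read",
///             clock, excess_entries
///         );
///     }
/// }
/// ```
///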
#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)]
#[error("A buffer overrun occurred while reading history of entry {clock}, writer overwrote {excess_entries} entries")]
pub struct Overrun {
    /// Number of entries that were fully written by the producer at the time
    /// when the consumer started reading out data
    pub clock: Clock,

    /// Number of early history entries that were overwritten by the producer
    /// while the consumer was in the process of reading out data.
    ///
    /// This can be higher than the number of entries which the consumer
    /// requested. The intent is to provide a quantitative indication of how
    /// late the consumer is with respect to the producer, so that the buffer
    /// size can be increased by a matching amount if need be.
    ///
    pub excess_entries: Clock,
}

/// Consumer interface to the history log
#[derive(Clone)]
pub struct Output<T: NoUninit + Sync>(Arc<SharedState<T>>);
//
impl<T: NoUninit + Sync> Output<T> {
    /// Query the history capacity
    ///
    /// Once this number of entries has been recorded, subsequent writes will
    /// overwrite old entries in a FIFO manner.
    ///
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }

    /// Read the last N entries, check timestamp and errors
    ///
    /// `output` must be no longer than the history's capacity.
    ///
    /// On successful readout, this method returns the timestamp of the latest
    /// entry, which can be used to check how many samples were written by the
    /// producer since the previous readout.
    ///
    /// If the producer overwrote some of the entries that were read (a scenario
    /// known as a buffer overrun), this error is reported, along with an
    /// indication of how late the consumer is with respect to the producer.
    ///
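    /// A small wrap-around illustration: with a history of 4 entries and 6
    /// entries written in total, only the last 4 remain readable and the
    /// returned clock is 6.
    ///
    /// ```
    /// # use rt_history::RTHistory;
    /// let (mut input, output) = RTHistory::<u8>::new(4).split();
    /// input.write(&[1, 2, 3, 4]);
    /// input.write(&[5, 6]);
    ///
    /// let mut out_buf = [0; 4];
    /// assert_eq!(output.read(&mut out_buf[..]), Ok(6));
    /// assert_eq!(out_buf, [3, 4, 5, 6]);
    /// ```
    ///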
    pub fn read(&self, output: &mut [T]) -> Result<Clock, Overrun> {
        // Check that the request makes sense
        let data_len = self.0.data_len();
        assert!(
            output.len() <= data_len,
            "History is shorter than requested output"
        );

        // Check the timestamp of the last published data point, and make sure
        // this read is ordered before subsequent data reads
        let last_readable = self.0.readable.load(Ordering::Acquire);

        // Deduce the timestamp of the first data point that we're interested in
        let first_readable = last_readable.wrapping_sub(output.len());

        // Wrap old and new timestamps into circular buffer range
        let last_readable_idx = last_readable % data_len;
        let first_readable_idx = first_readable % data_len;

        // Perform the data reads
        let output_len = output.len();
        let first_input_len = output_len.min(data_len - first_readable_idx);
        let first_input = &self.0.data[first_readable_idx..first_readable_idx + first_input_len];
        let (first_output, second_output) = output.split_at_mut(first_input_len);
        for (src, dst) in first_input.iter().zip(first_output.iter_mut()) {
            // NOTE: See write()
            *dst = src.load(Ordering::Relaxed);
        }
        if first_input_len < output_len {
            debug_assert!(first_readable_idx >= last_readable_idx);
            debug_assert_eq!(last_readable_idx, second_output.len());
            let second_input = &self.0.data[..last_readable_idx];
            for (src, dst) in second_input.iter().zip(second_output.iter_mut()) {
                // NOTE: See write()
                *dst = src.load(Ordering::Relaxed);
            }
        }

        // Make sure the producer did not concurrently overwrite our data.
        // This overrun check must be ordered after the previous data reads.
        atomic::fence(Ordering::Acquire);
        let last_writing = self.0.writing.load(Ordering::Relaxed);
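        // The producer clobbers the slot of entry `t` when it starts writing
        // entry `t + data_len`, so if `last_writing` has advanced more than
        // `data_len` entries past `first_readable`, the difference is the number
        // of oldest entries we just read that may have been overwritten
        // (saturating at zero when the producer has not caught up with us).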
        let excess_entries = last_writing
            .wrapping_sub(first_readable)
            .saturating_sub(data_len);

        // Produce final result
        if excess_entries > 0 {
            Err(Overrun {
                clock: last_readable,
                excess_entries,
            })
        } else {
            Ok(last_readable)
        }
    }
}

/// A realtime-safe (bounded wait-free) history log
pub struct RTHistory<T: NoUninit + Sync>(Arc<SharedState<T>>);
//
impl<T: NoUninit + Default + Sync> RTHistory<T> {
    /// Build a history log that can hold a certain number of entries
    ///
    /// To avoid data corruption (buffer overruns), the log must be
    /// significantly larger than the size of the largest read to be performed
    /// by the consumer. If you are not tight on RAM, a safe starting point is
    /// to make it three times as large.
    ///
    /// For efficiency reasons, the actual history buffer size will be rounded
    /// to the next power of two.
    ///
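    /// For instance, requesting room for 5 entries yields a capacity of 8:
    ///
    /// ```
    /// # use rt_history::RTHistory;
    /// assert_eq!(RTHistory::<u8>::new(5).capacity(), 8);
    /// ```
    ///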
    pub fn new(min_entries: usize) -> Self {
        assert!(
            Atomic::<T>::is_lock_free(),
            "Cannot build a lock-free history log if type T does not have lock-free atomics"
        );
        let data_len = min_entries.next_power_of_two();
        let data_len_pow2 = data_len.trailing_zeros();
        Self(Arc::new(SharedState {
            data: std::iter::repeat_with(Atomic::<T>::default)
                .take(data_len)
                .collect(),
            data_len_pow2,
            readable: Atomic::<usize>::new(0),
            writing: Atomic::<usize>::new(0),
        }))
    }
}
//
impl<T: NoUninit + Sync> RTHistory<T> {
    /// Query the history capacity
    ///
    /// Once this number of entries has been recorded, subsequent writes will
    /// overwrite old entries in a FIFO manner.
    ///
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }

    /// Split the history log into a producer and consumer interface
    pub fn split(self) -> (Input<T>, Output<T>) {
        (Input(self.0.clone()), Output(self.0))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use std::{
        panic::{self, AssertUnwindSafe},
        sync::atomic::AtomicUsize,
    };

    // Number of entries in a history buffer/query
    //
    // Can't go arbitrarily high, otherwise the test will take forever to run or
    // exhaust all available RAM in trying to do so...
    fn num_entries() -> impl Strategy<Value = usize> {
        0usize..(1024 * 1024)
    }

    proptest! {
        #[test]
        fn new_split_clock(min_entries in num_entries()) {
            // Check initial state
            let history = RTHistory::<f32>::new(min_entries);
            prop_assert!(history
                .0
                .data
                .iter()
                .map(|x| x.load(Ordering::Relaxed))
                .all(|f| f == 0.0));
            prop_assert_eq!(history.0.data_len(), min_entries.next_power_of_two());
            prop_assert_eq!(history.0.data_len(), history.0.data.len());
            prop_assert_eq!(history.0.readable.load(Ordering::Relaxed), 0);
            prop_assert_eq!(history.0.writing.load(Ordering::Relaxed), 0);

            // Split producer and consumer interface
            let (input, output) = history.split();

            // Check that they point to the same thing
            prop_assert_eq!(&*input.0 as *const _, &*output.0 as *const _);
        }

        #[test]
        fn writes_and_read(
            min_entries in num_entries(),
            write1: Vec<u32>,
            write2: Vec<u32>,
            read_size in num_entries(),
        ) {
            // Set up a history log
            let (mut input, output) = RTHistory::<u32>::new(min_entries).split();

            // Attempt a write, which will fail if and only if the input data is
            // larger than the history log's inner buffer.
            macro_rules! checked_write {
                ($write:expr, $input:expr) => {
                    if let Err(_) = panic::catch_unwind(AssertUnwindSafe(|| $input.write(&$write[..])))
                    {
                        prop_assert!($write.len() > $input.0.data_len());
                        return Ok(());
                    }
                    prop_assert!($write.len() <= $input.0.data_len());
                };
            }
            checked_write!(write1, input);

            // Check the final buffer state
            prop_assert!(input
                .0
                .data
                .iter()
                .take(write1.len())
                .zip(&write1)
                .all(|(a, &x)| a.load(Ordering::Relaxed) == x));
            prop_assert!(input
                .0
                .data
                .iter()
                .skip(write1.len())
                .all(|a| a.load(Ordering::Relaxed) == 0));
            prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), write1.len());
            prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), write1.len());

            // Perform another write, check state again
            checked_write!(write2, input);
            let new_readable = write1.len() + write2.len();
            let data_len = input.0.data_len();
            let overwritten = new_readable.saturating_sub(data_len);
            prop_assert!(input
                .0
                .data
                .iter()
                .take(overwritten)
                .zip(write2[write2.len() - overwritten..].iter())
                .all(|(a, &x2)| a.load(Ordering::Relaxed) == x2));
            prop_assert!(input
                .0
                .data
                .iter()
                .skip(overwritten)
                .take(write1.len().saturating_sub(overwritten))
                .zip(write1.iter().skip(overwritten))
                .all(|(a, &x1)| a.load(Ordering::Relaxed) == x1));
            prop_assert!(input
                .0
                .data
                .iter()
                .skip(write1.len())
                .take(write2.len())
                .zip(&write2)
                .all(|(a, &x2)| a.load(Ordering::Relaxed) == x2));
            prop_assert!(input
                .0
                .data
                .iter()
                .skip(new_readable)
                .all(|a| a.load(Ordering::Relaxed) == 0));
            prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), new_readable);
            prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), new_readable);

            // Back up current ring buffer state
            let data_backup = input
                .0
                .data
                .iter()
                .map(|a| a.load(Ordering::Relaxed))
                .collect::<Box<[_]>>();

            // Read some data back and make sure no overrun happened (it is
            // impossible in single-threaded code)
            let mut read = vec![0; read_size];
            match panic::catch_unwind(AssertUnwindSafe(|| output.read(&mut read[..]))) {
                Err(_) => {
                    prop_assert!(read.len() > output.0.data_len());
                    return Ok(());
                }
                Ok(result) => {
                    prop_assert!(read.len() <= output.0.data_len());
                    prop_assert_eq!(result, Ok(new_readable));
                }
            }

            // Make sure that the "read" did not alter the history data
            prop_assert!(input
                .0
                .data
                .iter()
                .zip(data_backup.iter().copied())
                .all(|(a, x)| a.load(Ordering::Relaxed) == x));
            prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), new_readable);
            prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), new_readable);

            // Check that the collected output is correct
            prop_assert!(read
                .iter()
                .rev()
                .zip(
                    write2
                        .iter()
                        .rev()
                        .chain(write1.iter().rev())
                        .chain(std::iter::repeat(&0))
                )
                .all(|(&x, &y)| x == y));
        }
    }

    // This test is ignored because it needs a special configuration:
    // - Be built in release mode
    // - Be run on a machine with low background load
    // - 2+ physical cores are needed to validate basic 1 producer / 1 consumer
    //   operation, and 3+ physical cores to validate multi-consumer operation.
    #[test]
    #[ignore]
    fn concurrent_test() {
        const PRODUCER_SIZE: usize = 1 << 3; // One 64B cache line of u64s
        const CONSUMER_SIZE: usize = PRODUCER_SIZE << 1; // To observe partial overruns
        const BUFFER_SIZE: usize = CONSUMER_SIZE << 2; // 4x the consumer size; 2x would overrun too often

        const NUM_ELEMS: usize = 1 << 31;

        let (mut input, output) = RTHistory::<u64>::new(BUFFER_SIZE).split();

        // The producer simply emits regularly increasing counter values, which
        // makes consumer validation easy: each value equals its associated
        // clock tick. It makes no attempt to pace itself, so depending on
        // relative thread scheduling the consumers will observe both buffer
        // underruns and overruns.
        let mut last_emitted = 0;
        let producer = move || {
            while last_emitted < NUM_ELEMS as u64 {
                let mut buf = [0; PRODUCER_SIZE];
                for dst in &mut buf {
                    last_emitted += 1;
                    *dst = last_emitted;
                }
                input.write(&buf[..]);
            }
        };

        // Consumers detect underruns and overruns, and assert that for data
        // which has not been corrupted by an overrun, value == clock holds.
        #[allow(clippy::declare_interior_mutable_const)]
        const XRUN_CTR_INIT: AtomicUsize = AtomicUsize::new(0);
        let underrun_ctrs = [XRUN_CTR_INIT; 2];
        let overrun_ctrs = [XRUN_CTR_INIT; 2];
        let gen_consumer = |idx| {
            let num_underruns: &AtomicUsize = &underrun_ctrs[idx];
            let num_overruns: &AtomicUsize = &overrun_ctrs[idx];
            let output = output.clone();
            move || {
                let mut last_clock = 0;
                let mut last_underrun = usize::MAX;
                let mut buf = [0; CONSUMER_SIZE];
                while last_clock < NUM_ELEMS {
                    // Fetch the latest batch from the producer, returning the
                    // clock and the valid subset of the data that has not been
                    // corrupted by a buffer overrun.
                    let (clock, valid) = match output.read(&mut buf[..]) {
                        // In the absence of an overrun, all data is valid
                        Ok(clock) => (clock, &buf[..]),

                        // When an overrun occurs...
                        Err(Overrun {
                            clock,
                            excess_entries,
                        }) => {
                            // Check that the number of excess entries makes sense
                            assert!(excess_entries > 0);
                            assert_eq!(excess_entries % PRODUCER_SIZE, 0);

                            // Keep track of the number of overruns (the increment
                            // does not need to be atomic because the value will
                            // only be read out at the end).
                            num_overruns
                                .store(num_overruns.load(Ordering::Relaxed) + 1, Ordering::Relaxed);

                            // Extract the valid subset of the data
                            if excess_entries < buf.len() {
                                (clock, &buf[excess_entries + 1..])
                            } else {
                                (clock, &[][..])
                            }
                        }
                    };

                    // Monitor clock progression
                    if clock == last_clock {
                        // An unchanging clock means a buffer underrun occurred;
                        // make sure we only record each of them once.
                        if clock != last_underrun {
                            num_underruns.store(
                                num_underruns.load(Ordering::Relaxed) + 1,
                                Ordering::Relaxed,
                            );
                            last_underrun = clock;
                        }
                    } else {
                        // The clock can only move forward
                        assert!(clock > last_clock);
                        last_clock = clock;
                    }

                    // Check that the value == clock property is honored
                    for (expected, &actual) in (1..=clock).rev().zip(valid.iter().rev()) {
                        assert_eq!(expected as u64, actual);
                    }
                }
            }
        };

        // Run the test
        testbench::concurrent_test_3(producer, gen_consumer(0), gen_consumer(1));

        // Check that a significant number of xruns were detected, but that
        // there was a significant number of "normal" runs too.
        const NUM_READOUTS: usize = NUM_ELEMS / CONSUMER_SIZE;
        underrun_ctrs
            .into_iter()
            .map(|a| a.load(Ordering::Relaxed))
            .enumerate()
            .for_each(|(idx, underrun_ctr)| {
                println!(
                    "consumer {}: {}/{} underruns",
                    idx, underrun_ctr, NUM_READOUTS
                );
                assert!(underrun_ctr > NUM_READOUTS / 10000);
                assert!(underrun_ctr < NUM_READOUTS / 5);
            });
        overrun_ctrs
            .into_iter()
            .map(|a| a.load(Ordering::Relaxed))
            .enumerate()
            .for_each(|(idx, overrun_ctr)| {
                println!(
                    "consumer {}: {}/{} overruns",
                    idx, overrun_ctr, NUM_READOUTS
                );
                assert!(overrun_ctr > NUM_READOUTS / 10000);
                assert!(overrun_ctr < NUM_READOUTS / 5);
            });
    }
}