rt_history/
lib.rs

//! An RT-safe history log with error checking.
//!
//! This is a bounded wait-free thread synchronization primitive which allows
//! you to record the time evolution of some data on one thread and be able to
//! query the last N data points on other threads.
//!
//! By definition of wait-free synchronization, the producer and consumer
//! threads cannot wait for each other, so in bad conditions (too small a buffer
//! size, too low a thread's priority, too loaded a system...), two race
//! conditions can occur:
//!
//! - The producer can be too fast with respect to consumers, and overwrite
//!   historical data which a consumer was still in the process of reading.
//!   This buffer overrun scenario is reported, along with the degree of overrun
//!   that occurred, which can be used to guide system parameter adjustments so
//!   that the error stops occurring.
//! - The producer can be too slow with respect to consumers, and fail to write
//!   a sufficient amount of new data between two consumer readouts. The
//!   precise definition of this buffer underrun error is workload-dependent, so
//!   we provide the tool needed to detect it, namely the timestamp of the
//!   latest data point, but we do not handle it ourselves.
//!
//! ```
//! # use rt_history::RTHistory;
//! let (mut input, output) = RTHistory::<u8>::new(8).split();
//!
//! let in_buf = [1, 2, 3];
//! input.write(&in_buf[..]);
//!
//! let mut out_buf = [0; 3];
//! assert_eq!(output.read(&mut out_buf[..]), Ok(3));
//! assert_eq!(in_buf, out_buf);
//! ```

#![deny(missing_docs)]

use atomic::{Atomic, Ordering};
use bytemuck::NoUninit;
use std::sync::Arc;
use thiserror::Error;

/// Re-export of the bytemuck version used by this crate
pub use bytemuck;

/// Data that is shared between the producer and the consumer
struct SharedState<T: NoUninit + Sync> {
    /// Circular buffer
    data: Box<[Atomic<T>]>,

    /// data.len() will always be a power of 2, with this exponent, and the
    /// compiler can use this knowledge to compute modulo operations much faster
    data_len_pow2: u32,

    /// Timestamp of the last entry that has been published
    readable: Atomic<usize>,

    /// Timestamp of the last entry that has been or is being overwritten
    writing: Atomic<usize>,
}
//
impl<T: NoUninit + Sync> SharedState<T> {
    /// Length of the inner circular buffer
    #[inline]
    fn data_len(&self) -> usize {
        let data_len = 1 << self.data_len_pow2;
        debug_assert_eq!(self.data.len(), data_len);
        data_len
    }
}

/// Producer interface to the history log
pub struct Input<T: NoUninit + Sync>(Arc<SharedState<T>>);
//
impl<T: NoUninit + Sync> Input<T> {
    /// Query the history capacity
    ///
    /// Once this number of entries has been recorded, subsequent writes will
    /// overwrite old entries in a FIFO manner.
    ///
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }

    /// Record new entries into the history log
    ///
    /// `input` must be no longer than the history's capacity.
    ///
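    /// As an illustration, here is a minimal sketch of how older entries get
    /// overwritten once the history is full (the `u8` payload and the capacity
    /// of 4 are arbitrary choices for this example):
    ///
    /// ```
    /// # use rt_history::RTHistory;
    /// let (mut input, output) = RTHistory::<u8>::new(4).split();
    /// input.write(&[1, 2, 3, 4]);
    /// input.write(&[5, 6]); // The two oldest entries (1 and 2) are overwritten
    ///
    /// let mut buf = [0; 4];
    /// assert_eq!(output.read(&mut buf[..]), Ok(6));
    /// assert_eq!(buf, [3, 4, 5, 6]);
    /// ```
    ///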
    pub fn write(&mut self, input: &[T]) {
        // Check that the request makes sense
        let data_len = self.0.data_len();
        assert!(
            input.len() <= data_len,
            "History is shorter than provided input"
        );

        // Notify the consumer that we are writing new data
        let old_writing = self.0.writing.load(Ordering::Relaxed);
        let new_writing = old_writing.wrapping_add(input.len());
        self.0.writing.store(new_writing, Ordering::Relaxed);

        // Make sure that this notification is not reordered after data writes
        atomic::fence(Ordering::Release);

        // Wrap old and new writing timestamps into circular buffer range
        let old_writing_idx = old_writing % data_len;
        let new_writing_idx = new_writing % data_len;

        // Perform the data writes
        let first_output_len = input.len().min(data_len - old_writing_idx);
        let first_output = &self.0.data[old_writing_idx..old_writing_idx + first_output_len];
        let (first_input, second_input) = input.split_at(first_output_len);
        for (src, dst) in first_input.iter().zip(first_output.iter()) {
            // NOTE: This compiles to a memcpy with N-bit granularity. More
            //       performance can be obtained by using a regular memcpy,
            //       which will fully leverage wide hardware instructions like
            //       REP MOVS and SIMD load/stores. But unfortunately, racing
            //       memcpys are also UB in the Rust memory model. So we'd need
            //       to defer to the hardware memory model for this, which means
            //       volatile memcpy, which is nightly-only.
            dst.store(*src, Ordering::Relaxed);
        }
        if first_output_len < input.len() {
            debug_assert!(old_writing_idx >= new_writing_idx);
            debug_assert_eq!(new_writing_idx, second_input.len());
            let second_output = &self.0.data[..new_writing_idx];
            for (src, dst) in second_input.iter().zip(second_output.iter()) {
                // NOTE: See above
                dst.store(*src, Ordering::Relaxed);
            }
        }

        // Notify the consumer that new data has been published, and make sure
        // that this write is ordered after the previous data writes
        if cfg!(debug_assertions) {
            assert_eq!(self.0.readable.load(Ordering::Relaxed), old_writing);
        }
        self.0.readable.store(new_writing, Ordering::Release);
    }
}

/// Number of entries that the producer has written so far (with wraparound)
///
/// Differences between two consecutive readouts of this counter can be used to
/// tell how many new entries arrived between two readouts, and thus to detect
/// buffer underruns where the producer wrote too little new data. How little
/// is too little is workload-dependent, so we do not implement this logic
/// ourselves.
///
/// This counter may wrap around from time to time (every couple of hours for a
/// mono audio stream on a 32-bit CPU), so use wrapping_sub() when computing
/// deltas.
///
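/// A minimal sketch of underrun detection based on clock deltas; the
/// `MIN_NEW_ENTRIES` threshold is a hypothetical, workload-specific choice:
///
/// ```
/// # use rt_history::{Clock, RTHistory};
/// # const MIN_NEW_ENTRIES: Clock = 1;
/// # let (mut input, output) = RTHistory::<u8>::new(8).split();
/// # input.write(&[1, 2, 3]);
/// # let mut buf = [0; 2];
/// let mut last_clock: Clock = 0;
/// // ...on every readout...
/// let clock = output.read(&mut buf[..]).expect("no overrun in this example");
/// if clock.wrapping_sub(last_clock) < MIN_NEW_ENTRIES {
///     // Buffer underrun: the producer did not write enough new data
/// }
/// last_clock = clock;
/// # assert_eq!(last_clock, 3);
/// ```
///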
pub type Clock = usize;

/// Indication that a buffer overrun occurred
///
/// This means that the producer wrote over the data that the consumer was in
/// the process of reading. Possible fixes include making the buffer larger,
/// speeding up the consumer, and slowing down the producer, in order of
/// decreasing preference.
///
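/// As a rough illustration, `excess_entries` can be fed back into the sizing
/// of the next history log; `current_capacity` below is a hypothetical value
/// taken from your own setup code:
///
/// ```
/// # use rt_history::Overrun;
/// # let overrun = Overrun { clock: 1000, excess_entries: 48 };
/// # let current_capacity: usize = 64;
/// // Grow the history by at least the observed excess; RTHistory::new() will
/// // round the result up to the next power of two anyway.
/// let next_capacity = current_capacity + overrun.excess_entries;
/// assert!(next_capacity > current_capacity);
/// ```
///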
#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)]
#[error("A buffer overrun occurred while reading history of entry {clock}, writer overwrote {excess_entries} entries")]
pub struct Overrun {
    /// Number of entries that were fully written by the producer at the time
    /// when the consumer started reading out data
    pub clock: Clock,

    /// Number of early history entries that were overwritten by the producer
    /// while the consumer was in the process of reading out data.
    ///
    /// This can be higher than the number of entries which the consumer
    /// requested. The intent is to provide a quantitative indication of how
    /// late the consumer is with respect to the producer, so that the buffer
    /// size can be increased by a matching amount if need be.
    ///
    pub excess_entries: Clock,
}

/// Consumer interface to the history log
#[derive(Clone)]
pub struct Output<T: NoUninit + Sync>(Arc<SharedState<T>>);
//
impl<T: NoUninit + Sync> Output<T> {
    /// Query the history capacity
    ///
    /// Once this number of entries has been recorded, subsequent writes will
    /// overwrite old entries in a FIFO manner.
    ///
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }

    /// Read the last N entries, with timestamp and error checking
    ///
    /// `output` must be no longer than the history's capacity.
    ///
    /// On successful readout, this method returns the timestamp of the latest
    /// entry, which can be used to check how many entries were written by the
    /// producer since the previous readout.
    ///
    /// If the producer overwrote some of the entries that were being read (a
    /// scenario known as a buffer overrun), this error is reported, along with
    /// an indication of how late the consumer is with respect to the producer.
    ///
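    /// A minimal sketch of a readout that reacts to overruns; merely logging
    /// and discarding the corrupted readout, as done here, is only one
    /// possible policy:
    ///
    /// ```
    /// # use rt_history::{Overrun, RTHistory};
    /// # let (mut input, output) = RTHistory::<u8>::new(8).split();
    /// # input.write(&[1, 2, 3]);
    /// let mut buf = [0; 4];
    /// match output.read(&mut buf[..]) {
    ///     Ok(clock) => {
    ///         // All of `buf` is valid, and `clock` timestamps its last entry
    ///         println!("latest entry has timestamp {clock}");
    ///     }
    ///     Err(Overrun { clock, excess_entries }) => {
    ///         // The oldest entries of `buf` were overwritten while being read;
    ///         // a buffer larger by about `excess_entries` would have avoided this
    ///         eprintln!("overrun at clock {clock}: {excess_entries} excess entries");
    ///     }
    /// }
    /// ```
    ///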
    pub fn read(&self, output: &mut [T]) -> Result<Clock, Overrun> {
        // Check that the request makes sense
        let data_len = self.0.data_len();
        assert!(
            output.len() <= data_len,
            "History is shorter than requested output"
        );

        // Check the timestamp of the last published data point, and make sure
        // this read is ordered before subsequent data reads
        let last_readable = self.0.readable.load(Ordering::Acquire);

        // Deduce the timestamp of the first data point that we're interested in
        let first_readable = last_readable.wrapping_sub(output.len());

        // Wrap old and new timestamps into circular buffer range
        let last_readable_idx = last_readable % data_len;
        let first_readable_idx = first_readable % data_len;

        // Perform the data reads
        let output_len = output.len();
        let first_input_len = output_len.min(data_len - first_readable_idx);
        let first_input = &self.0.data[first_readable_idx..first_readable_idx + first_input_len];
        let (first_output, second_output) = output.split_at_mut(first_input_len);
        for (src, dst) in first_input.iter().zip(first_output.iter_mut()) {
            // NOTE: See write()
            *dst = src.load(Ordering::Relaxed);
        }
        if first_input_len < output_len {
            debug_assert!(first_readable_idx >= last_readable_idx);
            debug_assert_eq!(last_readable_idx, second_output.len());
            let second_input = &self.0.data[..last_readable_idx];
            for (src, dst) in second_input.iter().zip(second_output.iter_mut()) {
                // NOTE: See write()
                *dst = src.load(Ordering::Relaxed);
            }
        }

        // Make sure the producer did not concurrently overwrite our data.
        // This overrun check must be ordered after the previous data reads.
        atomic::fence(Ordering::Acquire);
        let last_writing = self.0.writing.load(Ordering::Relaxed);
        let excess_entries = last_writing
            .wrapping_sub(first_readable)
            .saturating_sub(data_len);

        // Produce final result
        if excess_entries > 0 {
            Err(Overrun {
                clock: last_readable,
                excess_entries,
            })
        } else {
            Ok(last_readable)
        }
    }
}

/// A realtime-safe (bounded wait-free) history log
pub struct RTHistory<T: NoUninit + Sync>(Arc<SharedState<T>>);
//
impl<T: NoUninit + Default + Sync> RTHistory<T> {
    /// Build a history log that can hold a certain number of entries
    ///
    /// To avoid data corruption (buffer overruns), the log must be
    /// significantly larger than the size of the largest read to be performed
    /// by the consumer. If you are not tight on RAM, a safe starting point is
    /// to make it three times as large.
    ///
    /// For efficiency reasons, the actual history buffer size will be rounded
    /// up to the next power of two.
    ///
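    /// For instance, following the "three times the largest read" guideline
    /// above, a consumer that reads at most 300 entries at a time could use
    /// the following setup (the numbers are purely illustrative):
    ///
    /// ```
    /// # use rt_history::RTHistory;
    /// // 3 * 300 = 900 requested entries, rounded up to a capacity of 1024
    /// let (_input, output) = RTHistory::<f32>::new(3 * 300).split();
    /// assert_eq!(output.capacity(), 1024);
    /// ```
    ///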
    pub fn new(min_entries: usize) -> Self {
        assert!(
            Atomic::<T>::is_lock_free(),
            "Cannot build a lock-free history log if type T does not have lock-free atomics"
        );
        let data_len = min_entries.next_power_of_two();
        let data_len_pow2 = data_len.trailing_zeros();
        Self(Arc::new(SharedState {
            data: std::iter::repeat_with(Atomic::<T>::default)
                .take(data_len)
                .collect(),
            data_len_pow2,
            readable: Atomic::<usize>::new(0),
            writing: Atomic::<usize>::new(0),
        }))
    }
}
//
impl<T: NoUninit + Sync> RTHistory<T> {
    /// Query the history capacity
    ///
    /// Once this number of entries has been recorded, subsequent writes will
    /// overwrite old entries in a FIFO manner.
    ///
    pub fn capacity(&self) -> usize {
        self.0.data_len()
    }

    /// Split the history log into a producer and consumer interface
    pub fn split(self) -> (Input<T>, Output<T>) {
        (Input(self.0.clone()), Output(self.0))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;
    use std::{
        panic::{self, AssertUnwindSafe},
        sync::atomic::AtomicUsize,
    };

    // Number of entries in a history buffer/query
    //
    // Can't go arbitrarily high, otherwise the test will take forever to run or
    // exhaust all available RAM in trying to do so...
    fn num_entries() -> impl Strategy<Value = usize> {
        0usize..(1024 * 1024)
    }

    proptest! {
        #[test]
        fn new_split_clock(min_entries in num_entries()) {
            // Check initial state
            let history = RTHistory::<f32>::new(min_entries);
            prop_assert!(history
                .0
                .data
                .iter()
                .map(|x| x.load(Ordering::Relaxed))
                .all(|f| f == 0.0));
            prop_assert_eq!(history.0.data_len(), min_entries.next_power_of_two());
            prop_assert_eq!(history.0.data_len(), history.0.data.len());
            prop_assert_eq!(history.0.readable.load(Ordering::Relaxed), 0);
            prop_assert_eq!(history.0.writing.load(Ordering::Relaxed), 0);

            // Split producer and consumer interface
            let (input, output) = history.split();

            // Check that they point to the same thing
            prop_assert_eq!(&*input.0 as *const _, &*output.0 as *const _);
        }

        #[test]
        fn writes_and_read(
            min_entries in num_entries(),
            write1: Vec<u32>,
            write2: Vec<u32>,
            read_size in num_entries(),
        ) {
            // Set up a history log
            let (mut input, output) = RTHistory::<u32>::new(min_entries).split();

            // Attempt a write, which will fail if and only if the input data is
            // larger than the history log's inner buffer.
            macro_rules! checked_write {
                ($write:expr, $input:expr) => {
                    if let Err(_) = panic::catch_unwind(AssertUnwindSafe(|| $input.write(&$write[..])))
                    {
                        prop_assert!($write.len() > $input.0.data_len());
                        return Ok(());
                    }
                    prop_assert!($write.len() <= $input.0.data_len());
                };
            }
            checked_write!(write1, input);

            // Check the final buffer state
            prop_assert!(input
                .0
                .data
                .iter()
                .take(write1.len())
                .zip(&write1)
                .all(|(a, &x)| a.load(Ordering::Relaxed) == x));
            prop_assert!(input
                .0
                .data
                .iter()
                .skip(write1.len())
                .all(|a| a.load(Ordering::Relaxed) == 0));
            prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), write1.len());
            prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), write1.len());

            // Perform another write, check state again
            checked_write!(write2, input);
            let new_readable = write1.len() + write2.len();
            let data_len = input.0.data_len();
            let overwritten = new_readable.saturating_sub(data_len);
            prop_assert!(input
                .0
                .data
                .iter()
                .take(overwritten)
                .zip(write2[write2.len() - overwritten..].iter())
                .all(|(a, &x2)| a.load(Ordering::Relaxed) == x2));
            prop_assert!(input
                .0
                .data
                .iter()
                .skip(overwritten)
                .take(write1.len().saturating_sub(overwritten))
                .zip(write1.iter().skip(overwritten))
                .all(|(a, &x1)| a.load(Ordering::Relaxed) == x1));
            prop_assert!(input
                .0
                .data
                .iter()
                .skip(write1.len())
                .take(write2.len())
                .zip(&write2)
                .all(|(a, &x2)| a.load(Ordering::Relaxed) == x2));
            prop_assert!(input
                .0
                .data
                .iter()
                .skip(new_readable)
                .all(|a| a.load(Ordering::Relaxed) == 0));
            prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), new_readable);
            prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), new_readable);

            // Back up current ring buffer state
            let data_backup = input
                .0
                .data
                .iter()
                .map(|a| a.load(Ordering::Relaxed))
                .collect::<Box<[_]>>();

            // Read some data back and make sure no overrun happened (overruns
            // are impossible in single-threaded code)
            let mut read = vec![0; read_size];
            match panic::catch_unwind(AssertUnwindSafe(|| output.read(&mut read[..]))) {
                Err(_) => {
                    prop_assert!(read.len() > output.0.data_len());
                    return Ok(());
                }
                Ok(result) => {
                    prop_assert!(read.len() <= output.0.data_len());
                    prop_assert_eq!(result, Ok(new_readable));
                }
            }

            // Make sure that the "read" did not alter the history data
            prop_assert!(input
                .0
                .data
                .iter()
                .zip(data_backup.iter().copied())
                .all(|(a, x)| a.load(Ordering::Relaxed) == x));
            prop_assert_eq!(input.0.readable.load(Ordering::Relaxed), new_readable);
            prop_assert_eq!(input.0.writing.load(Ordering::Relaxed), new_readable);

            // Check that the collected output is correct
            prop_assert!(read
                .iter()
                .rev()
                .zip(
                    write2
                        .iter()
                        .rev()
                        .chain(write1.iter().rev())
                        .chain(std::iter::repeat(&0))
                )
                .all(|(&x, &y)| x == y));
        }
    }

    // This test is ignored because it needs a special configuration:
    // - Be built in release mode
    // - Be run on a machine with low background load
    // - 2+ physical cores are needed to validate basic 1 producer / 1 consumer
    //   operation, and 3+ physical cores to validate multi-consumer operation.
    #[test]
    #[ignore]
    fn concurrent_test() {
        const PRODUCER_SIZE: usize = 1 << 3; // One 64B cache line
        const CONSUMER_SIZE: usize = PRODUCER_SIZE << 1; // To observe partial overrun
        const BUFFER_SIZE: usize = CONSUMER_SIZE << 2; // 4x consumer size (2x causes too many overruns)

        const NUM_ELEMS: usize = 1 << 31;

        let (mut input, output) = RTHistory::<u64>::new(BUFFER_SIZE).split();

        // The producer simply emits regularly increasing counter values. This
        // makes consumer validation easy, as it means value == associated clock.
        let mut last_emitted = 0;
        let producer = move || {
            while last_emitted < NUM_ELEMS as u64 {
                let mut buf = [0; PRODUCER_SIZE];
                for dst in &mut buf {
                    last_emitted += 1;
                    *dst = last_emitted;
                }
                input.write(&buf[..]);
            }
        };

        // Consumers detect underruns and overruns, and assert that for data
        // which has not been corrupted by an overrun, value == clock holds.
        #[allow(clippy::declare_interior_mutable_const)]
        const XRUN_CTR_INIT: AtomicUsize = AtomicUsize::new(0);
        let underrun_ctrs = [XRUN_CTR_INIT; 2];
        let overrun_ctrs = [XRUN_CTR_INIT; 2];
        let gen_consumer = |idx| {
            let num_underruns: &AtomicUsize = &underrun_ctrs[idx];
            let num_overruns: &AtomicUsize = &overrun_ctrs[idx];
            let output = output.clone();
            move || {
                let mut last_clock = 0;
                let mut last_underrun = usize::MAX;
                let mut buf = [0; CONSUMER_SIZE];
                while last_clock < NUM_ELEMS {
                    // Fetch the latest batch from the producer, returning the
                    // clock and the valid subset of data that hasn't been
                    // corrupted by a buffer overrun.
                    let (clock, valid) = match output.read(&mut buf[..]) {
                        // In the absence of an overrun, all data is valid
                        Ok(clock) => (clock, &buf[..]),

                        // When an overrun occurs...
                        Err(Overrun {
                            clock,
                            excess_entries,
                        }) => {
                            // Check that the number of excess entries makes sense
                            assert!(excess_entries > 0);
                            assert_eq!(excess_entries % PRODUCER_SIZE, 0);

                            // Keep track of the number of overruns (the increment
                            // does not need to be atomic because the value will
                            // only be read out at the end).
                            num_overruns
                                .store(num_overruns.load(Ordering::Relaxed) + 1, Ordering::Relaxed);

                            // Extract the valid subset of data
                            if excess_entries < buf.len() {
                                (clock, &buf[excess_entries + 1..])
                            } else {
                                (clock, &[][..])
                            }
                        }
                    };

                    // Monitor clock progression
                    if clock == last_clock {
                        // An unchanging clock means a buffer underrun occurred;
                        // make sure we only record each of them once.
                        if clock != last_underrun {
                            num_underruns.store(
                                num_underruns.load(Ordering::Relaxed) + 1,
                                Ordering::Relaxed,
                            );
                            last_underrun = clock;
                        }
                    } else {
                        // The clock can only move forward
                        assert!(clock > last_clock);
                        last_clock = clock;
                    }

                    // Check that the value == clock property is honored
                    for (expected, &actual) in (1..=clock).rev().zip(valid.iter().rev()) {
                        assert_eq!(expected as u64, actual);
                    }
                }
            }
        };

        // Run the test
        testbench::concurrent_test_3(producer, gen_consumer(0), gen_consumer(1));

        // Check that a significant number of xruns were detected, but that
        // there was a significant number of "normal" readouts too.
        const NUM_READOUTS: usize = NUM_ELEMS / CONSUMER_SIZE;
        underrun_ctrs
            .into_iter()
            .map(|a| a.load(Ordering::Relaxed))
            .enumerate()
            .for_each(|(idx, underrun_ctr)| {
                println!(
                    "consumer {}: {}/{} underruns",
                    idx, underrun_ctr, NUM_READOUTS
                );
                assert!(underrun_ctr > NUM_READOUTS / 10000);
                assert!(underrun_ctr < NUM_READOUTS / 5);
            });
        overrun_ctrs
            .into_iter()
            .map(|a| a.load(Ordering::Relaxed))
            .enumerate()
            .for_each(|(idx, overrun_ctr)| {
                println!(
                    "consumer {}: {}/{} overruns",
                    idx, overrun_ctr, NUM_READOUTS
                );
                assert!(overrun_ctr > NUM_READOUTS / 10000);
                assert!(overrun_ctr < NUM_READOUTS / 5);
            });
    }
}