1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
// This file is part of lock-free-multi-producer-single-consumer-ring-buffer. It is subject to the license terms in the COPYRIGHT file found in the top-level directory of this distribution and at https://raw.githubusercontent.com/lemonrock/lock-free-multi-producer-single-consumer-ring-buffer/master/COPYRIGHT. No part of lock-free-multi-producer-single-consumer-ring-buffer, including this file, may be copied, modified, propagated, or distributed except according to the terms contained in the COPYRIGHT file. // Copyright © 2018 The developers of lock-free-multi-producer-single-consumer-ring-buffer. See the COPYRIGHT file in the top-level directory of this distribution and at https://raw.githubusercontent.com/lemonrock/lock-free-multi-producer-single-consumer-ring-buffer/master/COPYRIGHT. #![allow(non_upper_case_globals)] #![deny(missing_docs)] #![feature(allocator_api, core_intrinsics)] //! # lock-free-multi-producer-single-consumer-ring-buffer //! //! ## Usage //! //! ``` //! extern crate lock_free_multi_produce_single_consume_ring_buffer; //! //! use ::lock_free_multi_produce_single_consume_ring_buffer::*; //! //! let (ring_buffer_consumer, ring_buffer_producers) = RingBuffer::new(capacity, number_of_producers); //! //! // For each producer thread. //! let ring_buffer_producer = ring_buffer_producers.get(0); //! //! let result = ring_buffer_producer.acquire(length); //! // result is `None` if length was too much; try a shorter length. //! //! let slice_guard = result.unwrap(); //! //! // Dereferences to a slice. //! // slice_guard[0] = some_value; //! //! // Produce (relinquishes the slice). //! drop(slice_guard); //! //! ring_buffer_producer.produce(); //! //! // For each consumer thread. //! let slice_guard = ring_buffer_consumer.consume(); //! //! // Iterate, move out, etc. //! println!("should be `some_value`", slice_guard.move_out(slice_guard.len())[0]); //! //! // Releases the slice so producers can now use it. //! drop(slice_guard); //! ``` //! //! Once all the producers and the consumer are dropped then the memory underlying the ring buffer is freed and any unconsumed items in it are safely `Drop`ped. //! //! //! ## The following documentation is originally "Copyright (c) 2016-2017 Mindaugas Rasiukevicius <rmind at noxt eu>". //! //! Atomic multi-producer single-consumer ring buffer, which supports contiguous range operations and which can be conveniently used for message passing. //! //! There are three offsets ("think of clock hands"):- //! //! * `NEXT`: marks the beginning of the available space, //! * `WRITTEN`: the point up to which the data is actually written. //! * Observed `READY`: point up to which data is ready to be written. //! //! //! ### Producers //! //! Observe and save the 'next' offset, then request `N` bytes from the ring buffer by atomically advancing the `next` offset. //! Once the data is written into the "reserved" buffer space, the thread clears the saved value; these observed values are used to compute the `ready` offset. //! //! //! ### Consumer //! //! Writes the data between `written` and `ready` offsets and updates the `written` value. //! The consumer thread scans for the lowest seen value by the producers. //! //! //! ### Key invariant //! //! Producers cannot go beyond the `written` offset //! //! Producers are not allowed to catch up with the consumer. //! //! Only the consumer is allowed to catch up with the producer, ie set the `written` offset to be equal to the `next` offset. //! //! //! ### Wrap-around //! //! If the producer cannot acquire the requested length due to little available space at the end of the buffer, then it will wraparound. //! //! The `WrapLockBit` in `next` offset is used to lock the `end` offset. //! //! There is an ABA problem if one producer stalls while a pair of producer and consumer would both successfully wrap-around and set the `next` offset to the stale value of the first producer, thus letting it to perform a successful compare-and-swap (CAS) violating the invariant. //! A counter in the `next` offset (masked by `WrapCounter`) is used to prevent from this problem. //! It is incremented on wraparounds. //! //! The same ABA problem could also cause a stale `ready` offset, which could be observed by the consumer. //! The algorithm sets `WrapLockBit` in the `seen` value before advancing the `next` and clears this bit after the successful advancing; this ensures that only the stable `ready` observed by the consumer. #[macro_use] extern crate likely; use ::std::alloc::Alloc; use ::std::alloc::Global; use ::std::alloc::Layout; use ::std::cell::Cell; use ::std::cell::UnsafeCell; use ::std::cmp::min; use ::std::cmp::max; use ::std::intrinsics::atomic_cxchgweak; use ::std::marker::PhantomData; use ::std::mem::align_of; use ::std::mem::size_of; use ::std::mem::transmute_copy; use ::std::mem::uninitialized; use ::std::ops::Deref; use ::std::ops::DerefMut; use ::std::ptr::drop_in_place; use ::std::ptr::NonNull; use ::std::ptr::write; use ::std::slice::from_raw_parts; use ::std::slice::from_raw_parts_mut; use ::std::sync::atomic::fence; use ::std::sync::atomic::Ordering::SeqCst; use ::std::sync::atomic::spin_loop_hint; use std::sync::Arc; include!("fence_stores.rs"); include!("RingBuffer.rs"); include!("RingBufferConsumer.rs"); include!("RingBufferConsumerGuard.rs"); include!("RingBufferInner.rs"); include!("RingBufferInnerHeader.rs"); include!("RingBufferInnerDropHandler.rs"); include!("RingBufferOffset.rs"); include!("RingBufferProducer.rs"); include!("RingBufferProducerGuard.rs"); include!("RingBufferProducerInner.rs"); include!("SpinLockBackOff.rs"); include!("VolatileRingBufferOffset.rs");