use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
/// # [ShardedQueue]
///
/// [ShardedQueue] is a concurrent queue highly specialized for the use cases of
/// some schedulers and of [NonBlockingMutex]
///
/// [ShardedQueue] is a lightweight concurrent queue which uses spin locking and fights lock
/// contention with sharding
///
/// Notice that while it may seem that FIFO order is guaranteed, it is not:
/// multiple producers may trigger long resizes of all but the last of some very large
/// shards, then, once those resizes have had time to finish, one producer may trigger a
/// long resize of the last shard while all other threads start to consume or produce and
/// eventually spin on that last shard, with no guarantee of which thread acquires the
/// spin lock first. So we can't even guarantee that
/// [ShardedQueue::pop_front_or_spin] will acquire its lock before
/// [ShardedQueue::push_back] on the first attempt
///
/// Notice that this queue doesn't track its length, since the logic for
/// incrementing/decrementing the length may change depending on the use case, as may the
/// logic around the length going from 1 to 0 or back (in some cases, like
/// [NonBlockingMutex], we don't even add an action to the queue when the count reaches 1,
/// but run it immediately on the same thread), or even going negative (which optimizes
/// some hot paths in some schedulers, since it is cheaper to restore the count to a
/// correct state than to enforce that it can never go negative)
/// # Examples
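/// A minimal sketch of basic, single-threaded usage
/// (only the methods defined in this file are used):
/// ```rust
/// use sharded_queue::ShardedQueue;
///
/// // Up to 2 threads are expected to operate on the queue concurrently
/// let sharded_queue: ShardedQueue<usize> = ShardedQueue::new(2);
///
/// sharded_queue.push_back(1);
/// sharded_queue.push_back(2);
///
/// // With a single thread there is no contention,
/// // so the items come back in the order they were pushed
/// assert_eq!(sharded_queue.pop_front_or_spin(), 1);
/// assert_eq!(sharded_queue.pop_front_or_spin(), 2);
/// ```
///
/// A more involved example, building the [NonBlockingMutex] mentioned above on top of
/// [ShardedQueue]: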
/// ```rust
/// use sharded_queue::ShardedQueue;
/// use std::cell::UnsafeCell;
/// use std::fmt::{Debug, Display, Formatter};
/// use std::marker::PhantomData;
/// use std::ops::{Deref, DerefMut};
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// pub struct NonBlockingMutex<'captured_variables, State: ?Sized> {
/// task_count: AtomicUsize,
/// task_queue: ShardedQueue<Box<dyn FnOnce(MutexGuard<State>) + Send + 'captured_variables>>,
/// unsafe_state: UnsafeCell<State>,
/// }
///
/// impl<'captured_variables, State> NonBlockingMutex<'captured_variables, State> {
/// #[inline]
/// pub fn new(max_concurrent_thread_count: usize, state: State) -> Self {
/// Self {
/// task_count: Default::default(),
/// task_queue: ShardedQueue::new(max_concurrent_thread_count),
/// unsafe_state: UnsafeCell::new(state),
/// }
/// }
///
///     /// Please don't forget that the order of execution is not guaranteed. Atomicity of
///     /// the operations is guaranteed, but the order can be arbitrary
/// #[inline]
/// pub fn run_if_first_or_schedule_on_first(
/// &self,
/// run_with_state: impl FnOnce(MutexGuard<State>) + Send + 'captured_variables,
/// ) {
///         if self.task_count.fetch_add(1, Ordering::Acquire) != 0 {
///             self.task_queue.push_back(Box::new(run_with_state));
///         } else {
///             // If we were the first to acquire the lock, the run should be executed
///             // immediately and the run loop started
///             run_with_state(unsafe { MutexGuard::new(self) });
///             // Note that if `fetch_sub` != 1
///             // => some thread entered the first `if` block in this method
///             // => [ShardedQueue::push_back] is guaranteed to be called
///             // => [ShardedQueue::pop_front_or_spin] will not deadlock while it spins until it gets an item
///             //
///             // Notice that we run the action first, and only then decrement the count
///             // with releasing (pushing) memory changes, even if it looks otherwise
///             while self.task_count.fetch_sub(1, Ordering::Release) != 1 {
///                 self.task_queue.pop_front_or_spin()(unsafe { MutexGuard::new(self) });
///             }
///         }
/// }
/// }
///
/// /// [Send], [Sync], and [MutexGuard] logic was taken from [std::sync::Mutex]
/// /// and [std::sync::MutexGuard]
/// ///
/// /// These are the only places where `State: Send` matters; all other
/// /// functionality works fine on a single thread
/// unsafe impl<'captured_variables, State: Send> Send
/// for NonBlockingMutex<'captured_variables, State>
/// {
/// }
/// unsafe impl<'captured_variables, State: Send> Sync
/// for NonBlockingMutex<'captured_variables, State>
/// {
/// }
///
/// /// Code was mostly taken from [std::sync::MutexGuard]; it is expected to prevent
/// /// [State] from moving out of the synchronized execution loop
/// pub struct MutexGuard<
/// 'captured_variables,
/// 'non_blocking_mutex_ref,
/// State: ?Sized + 'non_blocking_mutex_ref,
/// > {
/// non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
///     /// This field ensures that [MutexGuard] implements [Send] and [Sync] in the same
///     /// cases as [std::sync::MutexGuard] and protects [State] from escaping the
///     /// synchronized execution loop
///     ///
///     /// todo: remove when the following error is no longer relevant:
///     /// negative trait bounds are not yet fully implemented; use marker types for now [E0658]
/// _phantom_unsend: PhantomData<std::sync::MutexGuard<'non_blocking_mutex_ref, State>>,
/// }
///
/// // todo: uncomment when the following error is no longer relevant:
/// // negative trait bounds are not yet fully implemented; use marker types for now [E0658]
/// // impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> !Send
/// // for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// // {
/// // }
/// unsafe impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Sync> Sync
/// for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized>
/// MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
/// unsafe fn new(
/// non_blocking_mutex: &'non_blocking_mutex_ref NonBlockingMutex<'captured_variables, State>,
/// ) -> Self {
/// Self {
/// non_blocking_mutex,
/// _phantom_unsend: PhantomData,
/// }
/// }
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> Deref
/// for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
/// type Target = State;
///
/// #[inline]
/// fn deref(&self) -> &State {
/// unsafe { &*self.non_blocking_mutex.unsafe_state.get() }
/// }
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> DerefMut
/// for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
/// #[inline]
/// fn deref_mut(&mut self) -> &mut State {
/// unsafe { &mut *self.non_blocking_mutex.unsafe_state.get() }
/// }
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Debug> Debug
/// for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
/// #[inline]
/// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
/// Debug::fmt(&**self, f)
/// }
/// }
///
/// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized + Display> Display
/// for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
/// {
/// #[inline]
/// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
/// (**self).fmt(f)
/// }
/// }
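///
/// // A minimal usage sketch of the `NonBlockingMutex` defined above (illustrative
/// // only): the first caller runs its closure immediately; closures scheduled by
/// // concurrent callers would be queued and run by that same first thread
/// let non_blocking_mutex = NonBlockingMutex::new(4, 0usize);
/// non_blocking_mutex.run_if_first_or_schedule_on_first(|mut state| {
///     *state += 1;
/// });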
///
/// ```
///
/// The doctests below check that [ShardedQueue] is [Send] and [Sync] when `Item` is
/// [Send], that it is neither when `Item` is not [Send] (like [std::rc::Rc]), and that
/// [VecDeque] passes the same checks:
/// ```
/// use sharded_queue::ShardedQueue;
///
/// fn is_send<T: Send>(_t: T) {}
///
/// let sharded_queue: ShardedQueue<usize> = ShardedQueue::new(1);
///
/// is_send(sharded_queue);
/// ```
///
/// ```
/// use sharded_queue::ShardedQueue;
///
/// fn is_sync<T: Sync>(_t: T) {}
///
/// let sharded_queue: ShardedQueue<usize> = ShardedQueue::new(1);
///
/// is_sync(sharded_queue);
/// ```
///
/// ```compile_fail
/// use std::rc::Rc;
/// use sharded_queue::ShardedQueue;
///
/// fn is_send<T: Send>(_t: T) {}
///
/// let sharded_queue: ShardedQueue<Rc<usize>> = ShardedQueue::new(1);
///
/// is_send(sharded_queue);
/// ```
///
/// ```compile_fail
/// use std::rc::Rc;
/// use sharded_queue::ShardedQueue;
///
/// fn is_sync<T: Sync>(_t: T) {}
///
/// let sharded_queue: ShardedQueue<Rc<usize>> = ShardedQueue::new(1);
///
/// is_sync(sharded_queue);
/// ```
///
/// ```
/// use std::collections::VecDeque;
///
/// fn is_send<T: Send>(_t: T) {}
///
/// let vec_deque: VecDeque<usize> = VecDeque::new();
///
/// is_send(vec_deque);
/// ```
///
/// ```
/// use std::collections::VecDeque;
///
/// fn is_sync<T: Sync>(_t: T) {}
///
/// let vec_deque: VecDeque<usize> = VecDeque::new();
///
/// is_sync(vec_deque);
/// ```
///
/// ```compile_fail
/// use std::collections::VecDeque;
/// use std::rc::Rc;
///
/// fn is_send<T: Send>(_t: T) {}
///
/// let vec_deque: VecDeque<Rc<usize>> = VecDeque::new();
///
/// is_send(vec_deque);
/// ```
///
/// ```compile_fail
/// use std::collections::VecDeque;
/// use std::rc::Rc;
///
/// fn is_sync<T: Sync>(_t: T) {}
///
/// let vec_deque: VecDeque<Rc<usize>> = VecDeque::new();
///
/// is_sync(vec_deque);
/// ```
pub struct ShardedQueue<Item> {
    modulo_number: usize,
    mutexed_queue_shard_list: Vec<Mutex<VecDeque<Item>>>,
    head_index: AtomicUsize,
    tail_index: AtomicUsize,
}

impl<Item> ShardedQueue<Item> {
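    /// Creates a [ShardedQueue] whose shard count is the smallest power of two greater
    /// than or equal to `max_concurrent_thread_count`, so that threads operating on the
    /// queue concurrently are unlikely to contend for the same shard
    ///
    /// A minimal sketch (only the methods defined in this file are used):
    /// ```rust
    /// use sharded_queue::ShardedQueue;
    ///
    /// // For example, up to 8 threads are expected to hit the queue at once
    /// let sharded_queue: ShardedQueue<String> = ShardedQueue::new(8);
    ///
    /// sharded_queue.push_back(String::from("first"));
    /// assert_eq!(sharded_queue.pop_front_or_spin(), "first");
    /// ```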
#[inline]
    pub fn new(max_concurrent_thread_count: usize) -> Self {
        // `queue_count` should be at least `max_concurrent_thread_count`
        // and a power of 2 (to keep the modulo cheap)
        let queue_count = max_concurrent_thread_count.next_power_of_two();
        Self {
            // Store the modulo mask, knowing that x % 2^n == x & (2^n - 1)
            modulo_number: queue_count - 1,
            mutexed_queue_shard_list: (0..queue_count)
                .map(|_| Mutex::new(VecDeque::<Item>::new()))
                .collect(),
            head_index: Default::default(),
            tail_index: Default::default(),
        }
    }
    /// Note that it will spin until an item is added => it can spin for a very long time
    /// if [ShardedQueue::pop_front_or_spin] is called without a guarantee that
    /// [ShardedQueue::push_back] will be called
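    ///
    /// A minimal sketch of the cross-thread call pattern (assuming only `std` and the
    /// methods defined in this file): the consumer spins until the producer thread
    /// pushes an item
    /// ```rust
    /// use sharded_queue::ShardedQueue;
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// let sharded_queue = Arc::new(ShardedQueue::<usize>::new(2));
    /// let producer_queue = sharded_queue.clone();
    ///
    /// // The consumer may start spinning before the producer pushes,
    /// // but it is guaranteed to get the item eventually
    /// let consumer = thread::spawn(move || sharded_queue.pop_front_or_spin());
    /// thread::spawn(move || producer_queue.push_back(42));
    ///
    /// assert_eq!(consumer.join().unwrap(), 42);
    /// ```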
#[inline]
    pub fn pop_front_or_spin(&self) -> Item {
        let queue_index = self.head_index.fetch_add(1, Ordering::Relaxed) & self.modulo_number;
        let mutexed_queue_shard =
            unsafe { self.mutexed_queue_shard_list.get_unchecked(queue_index) };
        // Note that since we already incremented `head_index`,
        // it is important to complete the operation. If we tried
        // to implement `try_remove_first`, we would need to somehow
        // rebalance `head_index`, complicating the logic and
        // introducing overhead, and even if we needed that,
        // we could just check the length outside before calling this method
        loop {
            // If we acquired the lock after [ShardedQueue::push_back] and have an item,
            // we can return the item, otherwise we should unlock and restart
            // spinning, giving [ShardedQueue::push_back] a chance to lock
            if let Ok(mut queue_shard) = mutexed_queue_shard.try_lock() {
                if let Some(item) = queue_shard.pop_front() {
                    return item;
                }
            }
        }
    }
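    /// Adds an item to the back of one of the shards, spinning on [Mutex::try_lock]
    /// until the shard lock is acquired
    ///
    /// A minimal sketch of concurrent producers (assuming only `std` and the methods
    /// defined in this file):
    /// ```rust
    /// use sharded_queue::ShardedQueue;
    /// use std::thread;
    ///
    /// let sharded_queue = ShardedQueue::<usize>::new(4);
    /// let queue_ref = &sharded_queue;
    ///
    /// thread::scope(|scope| {
    ///     // Each producer gets a distinct `tail_index`, so pushes are spread across shards
    ///     for item in 0..4_usize {
    ///         scope.spawn(move || queue_ref.push_back(item));
    ///     }
    /// });
    ///
    /// // All 4 items were pushed; order across shards is not guaranteed
    /// for _ in 0..4 {
    ///     sharded_queue.pop_front_or_spin();
    /// }
    /// ```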
#[inline]
    pub fn push_back(&self, item: Item) {
        let queue_index = self.tail_index.fetch_add(1, Ordering::Relaxed) & self.modulo_number;
        let mutexed_queue_shard =
            unsafe { self.mutexed_queue_shard_list.get_unchecked(queue_index) };
        loop {
            // We can not use [Mutex::lock] instead of [Mutex::try_lock],
            // because it may block the thread, which we want to avoid
            if let Ok(mut queue_shard) = mutexed_queue_shard.try_lock() {
                queue_shard.push_back(item);
                break;
            }
        }
    }
}