use crossbeam::queue::SegQueue;
use futures::sync::oneshot::{self, Canceled, Receiver, Sender};
use futures::{Async, Future, Poll};
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::atomic::{fence, AtomicUsize};
use std::sync::Arc;
use std::thread;
const READ_COUNT_MASK: usize = 0x00FFFFFF;
const WRITE_LOCKED: usize = 1 << 24;
const CONTENDED: usize = 1 << 25;
const PRINT_DEBUG: bool = false;
#[inline]
fn chill_out() {
thread::yield_now();
}
unsafe fn extract_lock<T, G: Guard<T>>(guard: G) -> QrwLock<T> {
let lock = ::std::ptr::read(guard.lock());
::std::mem::forget(guard);
lock
}
trait Guard<T>
where
Self: ::std::marker::Sized,
{
fn lock(&self) -> &QrwLock<T>;
unsafe fn forget(self) {
::std::mem::forget(self);
}
}
#[derive(Debug)]
pub struct ReadGuard<T> {
lock: QrwLock<T>,
}
impl<T> ReadGuard<T> {
pub fn upgrade(guard: ReadGuard<T>) -> FutureUpgrade<T> {
debug_assert!(guard.lock.read_count().unwrap() > 0);
match unsafe { guard.lock.upgrade_read_lock() } {
Ok(_) => {
if PRINT_DEBUG {
println!(
"ReadGuard::upgrade: Read lock is now upgraded (thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
unsafe { FutureUpgrade::new(extract_lock(guard), None) }
}
Err(rx) => {
if PRINT_DEBUG {
println!("ReadGuard::upgrade: Waiting for the read count to reach 1 (thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>"));
}
unsafe { FutureUpgrade::new(extract_lock(guard), Some(rx)) }
}
}
}
pub fn release(guard: ReadGuard<T>) -> QrwLock<T> {
unsafe {
guard.lock.release_read_lock();
extract_lock(guard)
}
}
}
impl<T> Deref for ReadGuard<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.inner.cell.get() }
}
}
impl<T> Drop for ReadGuard<T> {
fn drop(&mut self) {
unsafe { self.lock.release_read_lock() }
}
}
impl<T> Guard<T> for ReadGuard<T> {
fn lock(&self) -> &QrwLock<T> {
&self.lock
}
}
#[derive(Debug)]
pub struct WriteGuard<T> {
lock: QrwLock<T>,
}
impl<T> WriteGuard<T> {
pub fn downgrade(guard: WriteGuard<T>) -> ReadGuard<T> {
unsafe {
guard.lock.downgrade_write_lock();
ReadGuard {
lock: extract_lock(guard),
}
}
}
pub fn release(guard: WriteGuard<T>) -> QrwLock<T> {
unsafe {
guard.lock.release_write_lock();
extract_lock(guard)
}
}
}
impl<T> Deref for WriteGuard<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.inner.cell.get() }
}
}
impl<T> DerefMut for WriteGuard<T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.lock.inner.cell.get() }
}
}
impl<T> Drop for WriteGuard<T> {
fn drop(&mut self) {
unsafe { self.lock.release_write_lock() }
}
}
impl<T> Guard<T> for WriteGuard<T> {
fn lock(&self) -> &QrwLock<T> {
&self.lock
}
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct FutureUpgrade<T> {
lock: Option<QrwLock<T>>,
rx: Option<Receiver<()>>,
}
impl<T> FutureUpgrade<T> {
fn new(lock: QrwLock<T>, rx: Option<Receiver<()>>) -> FutureUpgrade<T> {
FutureUpgrade {
lock: Some(lock),
rx: rx,
}
}
#[inline]
pub fn wait(self) -> Result<WriteGuard<T>, Canceled> {
<Self as Future>::wait(self)
}
}
impl<T> Future for FutureUpgrade<T> {
type Item = WriteGuard<T>;
type Error = Canceled;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.lock.is_some() {
if self.rx.is_none() {
if PRINT_DEBUG {
println!(
"FutureUpgrade::poll: Uncontended. Upgrading. (thread: {})",
thread::current().name().unwrap_or("<unnamed>")
);
}
Ok(Async::Ready(WriteGuard {
lock: self.lock.take().unwrap(),
}))
} else {
unsafe { self.lock.as_ref().unwrap().process_queues() }
self.rx.as_mut().unwrap().poll().map(|res| {
res.map(|_| {
if PRINT_DEBUG {
println!(
"FutureUpgrade::poll: Ready. Upgrading. (thread: {})",
thread::current().name().unwrap_or("<unnamed>")
);
}
WriteGuard {
lock: self.lock.take().unwrap(),
}
})
})
}
} else {
panic!("FutureUpgrade::poll: Task already completed.");
}
}
}
impl<T> Drop for FutureUpgrade<T> {
fn drop(&mut self) {
if let Some(lock) = self.lock.take() {
match self.rx.take() {
Some(mut rx) => {
rx.close();
match rx.try_recv() {
Ok(status) => {
if status.is_some() {
unsafe { lock.release_write_lock() }
}
}
Err(_) => (),
}
}
None => unsafe { lock.release_write_lock() },
}
}
}
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct FutureReadGuard<T> {
lock: Option<QrwLock<T>>,
rx: Receiver<()>,
}
impl<T> FutureReadGuard<T> {
fn new(lock: QrwLock<T>, rx: Receiver<()>) -> FutureReadGuard<T> {
FutureReadGuard {
lock: Some(lock),
rx: rx,
}
}
#[inline]
pub fn wait(self) -> Result<ReadGuard<T>, Canceled> {
<Self as Future>::wait(self)
}
}
impl<T> Future for FutureReadGuard<T> {
type Item = ReadGuard<T>;
type Error = Canceled;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.lock.is_some() {
unsafe { self.lock.as_ref().unwrap().process_queues() }
self.rx.poll().map(|res| {
res.map(|_| {
if PRINT_DEBUG {
println!(
"FutureReadGuard::poll: ReadGuard acquired. (thread: {})",
thread::current().name().unwrap_or("<unnamed>")
);
}
ReadGuard {
lock: self.lock.take().unwrap(),
}
})
})
} else {
panic!("FutureReadGuard::poll: Task already completed.");
}
}
}
impl<T> Drop for FutureReadGuard<T> {
fn drop(&mut self) {
if let Some(lock) = self.lock.take() {
self.rx.close();
match self.rx.try_recv() {
Ok(status) => {
if status.is_some() {
unsafe { lock.release_read_lock() }
}
}
Err(_) => (),
}
}
}
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct FutureWriteGuard<T> {
lock: Option<QrwLock<T>>,
rx: Receiver<()>,
}
impl<T> FutureWriteGuard<T> {
fn new(lock: QrwLock<T>, rx: Receiver<()>) -> FutureWriteGuard<T> {
FutureWriteGuard {
lock: Some(lock),
rx: rx,
}
}
#[inline]
pub fn wait(self) -> Result<WriteGuard<T>, Canceled> {
<Self as Future>::wait(self)
}
}
impl<T> Future for FutureWriteGuard<T> {
type Item = WriteGuard<T>;
type Error = Canceled;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.lock.is_some() {
unsafe { self.lock.as_ref().unwrap().process_queues() }
self.rx.poll().map(|res| {
res.map(|_| {
if PRINT_DEBUG {
println!(
"FutureWriteGuard::poll: WriteGuard acquired. (thread: {})",
thread::current().name().unwrap_or("<unnamed>")
);
}
WriteGuard {
lock: self.lock.take().unwrap(),
}
})
})
} else {
panic!("FutureWriteGuard::poll: Task already completed.");
}
}
}
impl<T> Drop for FutureWriteGuard<T> {
fn drop(&mut self) {
if let Some(lock) = self.lock.take() {
self.rx.close();
match self.rx.try_recv() {
Ok(status) => {
if status.is_some() {
unsafe { lock.release_write_lock() }
}
}
Err(_) => (),
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestKind {
Read,
Write,
}
#[derive(Debug)]
pub struct QrwRequest {
tx: Sender<()>,
kind: RequestKind,
}
impl QrwRequest {
pub fn new(tx: Sender<()>, kind: RequestKind) -> QrwRequest {
QrwRequest { tx: tx, kind: kind }
}
}
#[derive(Debug)]
struct Inner<T> {
state: AtomicUsize,
cell: UnsafeCell<T>,
queue: SegQueue<QrwRequest>,
tip: UnsafeCell<Option<QrwRequest>>,
upgrade_queue: SegQueue<Sender<()>>,
}
impl<T> From<T> for Inner<T> {
#[inline]
fn from(val: T) -> Inner<T> {
Inner {
state: AtomicUsize::new(0),
cell: UnsafeCell::new(val),
queue: SegQueue::new(),
tip: UnsafeCell::new(None),
upgrade_queue: SegQueue::new(),
}
}
}
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
#[derive(Debug)]
pub struct QrwLock<T> {
inner: Arc<Inner<T>>,
}
impl<T> QrwLock<T> {
#[inline]
pub fn new(val: T) -> QrwLock<T> {
QrwLock {
inner: Arc::new(Inner::from(val)),
}
}
#[inline]
pub fn read(self) -> FutureReadGuard<T> {
if PRINT_DEBUG {
println!(
"Requesting read lock.(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
let (tx, rx) = oneshot::channel();
unsafe {
self.enqueue_lock_request(QrwRequest::new(tx, RequestKind::Read));
}
FutureReadGuard::new(self, rx)
}
#[inline]
pub fn write(self) -> FutureWriteGuard<T> {
if PRINT_DEBUG {
println!(
"Requesting write lock.(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
let (tx, rx) = oneshot::channel();
unsafe {
self.enqueue_lock_request(QrwRequest::new(tx, RequestKind::Write));
}
FutureWriteGuard::new(self, rx)
}
#[inline]
pub unsafe fn enqueue_lock_request(&self, req: QrwRequest) {
self.inner.queue.push(req);
}
#[inline]
pub fn get_mut(&mut self) -> Option<&mut T> {
Arc::get_mut(&mut self.inner).map(|inn| unsafe { &mut *inn.cell.get() })
}
#[inline]
pub fn as_ptr(&self) -> *const T {
self.inner.cell.get()
}
#[inline]
pub fn as_mut_ptr(&self) -> *mut T {
self.inner.cell.get()
}
#[inline]
fn pop_request(&self) -> Option<QrwRequest> {
debug_assert_eq!(self.inner.state.load(Acquire) & CONTENDED, CONTENDED);
if PRINT_DEBUG {
println!(
"Popping request from queue \
(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
unsafe {
::std::mem::replace(&mut *self.inner.tip.get(), self.inner.queue.pop().ok()).or_else(
|| {
if (*self.inner.tip.get()).is_some() {
self.pop_request()
} else {
None
}
},
)
}
}
#[inline]
fn peek_request_kind(&self) -> Option<RequestKind> {
debug_assert_eq!(self.inner.state.load(Acquire) & CONTENDED, CONTENDED);
unsafe {
if (*self.inner.tip.get()).is_none() {
::std::mem::replace(&mut *self.inner.tip.get(), self.inner.queue.pop().ok());
}
(*self.inner.tip.get()).as_ref().map(|req| req.kind)
}
}
#[inline]
fn fulfill_request(&self, mut state: usize) -> usize {
loop {
debug_assert_eq!(self.inner.state.load(Acquire) & CONTENDED, CONTENDED);
debug_assert_eq!(self.inner.state.load(Acquire) & WRITE_LOCKED, 0);
if let Some(req) = self.pop_request() {
if req.tx.send(()).is_ok() {
debug_assert_eq!(self.inner.state.load(Acquire) & WRITE_LOCKED, 0);
match req.kind {
RequestKind::Read => {
state += 1;
if PRINT_DEBUG {
println!(
"Locked for reading (state: {}) \
(thread: {}) ...",
state,
thread::current().name().unwrap_or("<unnamed>")
);
}
}
RequestKind::Write => {
debug_assert_eq!(state, 0);
state = WRITE_LOCKED;
if PRINT_DEBUG {
println!(
"Locked for writing (state: {}) \
(thread: {}) ...",
state,
thread::current().name().unwrap_or("<unnamed>")
);
}
break;
}
}
} else {
if PRINT_DEBUG {
println!(
"A requester has dropped \
(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
}
if let Some(RequestKind::Read) = self.peek_request_kind() {
debug_assert!(state != WRITE_LOCKED);
if PRINT_DEBUG {
println!(
"Next request kind is a read, popping next request...\
(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
continue;
} else {
break;
}
} else {
break;
}
}
state
}
#[inline]
fn read_count(&self) -> Option<u32> {
let state = self.inner.state.load(Acquire);
let read_count = state & READ_COUNT_MASK;
if state & READ_COUNT_MASK == read_count {
if PRINT_DEBUG {
println!(
"Read count: {} (thread: {}) ...",
read_count,
thread::current().name().unwrap_or("<unnamed>")
);
}
Some(read_count as u32)
} else {
None
}
}
#[inline(always)]
fn contend(&self) -> usize {
if PRINT_DEBUG {
println!(
"Processing state... (thread: {})",
thread::current().name().unwrap_or("<unnamed>")
);
}
let mut spins: u32 = 0;
loop {
let state = self.inner.state.fetch_or(CONTENDED, SeqCst);
if state & CONTENDED != 0 {
if spins >= 16 {
chill_out();
} else {
for _ in 0..(2 << spins) {
fence(SeqCst);
}
}
spins += 1;
} else {
return state;
}
}
}
fn process_upgrade_queue(&self) -> bool {
if PRINT_DEBUG {
println!(
"Processing upgrade queue... (thread: {})",
thread::current().name().unwrap_or("<unnamed>")
);
}
debug_assert!(self.inner.state.load(Acquire) == CONTENDED);
loop {
match self.inner.upgrade_queue.pop().ok() {
Some(tx) => match tx.send(()) {
Ok(_) => {
if PRINT_DEBUG {
println!(
"Upgrading to write lock. \
(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
return true;
}
Err(()) => {
if PRINT_DEBUG {
println!(
"Unable to upgrade: error completing oneshot \
(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
continue;
}
},
None => break,
}
}
false
}
#[inline]
pub unsafe fn process_queues(&self) {
if PRINT_DEBUG {
println!(
"Processing queue (thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
match self.contend() {
0 => {
if PRINT_DEBUG {
println!(
"Processing queue: Unlocked (thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
let new_state = if self.process_upgrade_queue() {
WRITE_LOCKED
} else {
self.fulfill_request(0)
};
self.inner.state.store(new_state, SeqCst);
}
WRITE_LOCKED => {
if PRINT_DEBUG {
println!(
"Processing queue: Write Locked (thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
self.inner.state.store(WRITE_LOCKED, SeqCst);
}
state => {
debug_assert!(self.inner.state.load(SeqCst) & CONTENDED != 0);
debug_assert!(state <= READ_COUNT_MASK);
if PRINT_DEBUG {
println!(
"Processing queue: Other {{ state: {}, peek: {:?} }} \
(thread: {}) ...",
state,
self.peek_request_kind(),
thread::current().name().unwrap_or("<unnamed>")
);
}
if self.peek_request_kind() == Some(RequestKind::Read) {
let new_state = self.fulfill_request(state);
self.inner.state.store(new_state, SeqCst);
} else {
self.inner.state.store(state, SeqCst);
}
}
}
}
fn enqueue_upgrade_request(&self, state: usize) -> Receiver<()> {
debug_assert!(state > 0 && state & CONTENDED == 0 && state & WRITE_LOCKED == 0);
let (tx, rx) = oneshot::channel();
self.inner.upgrade_queue.push(tx);
self.inner.state.store(state - 1, SeqCst);
rx
}
#[inline]
pub unsafe fn upgrade_read_lock(&self) -> Result<(), Receiver<()>> {
if PRINT_DEBUG {
println!(
"Attempting to upgrade reader to writer: (thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
match self.contend() {
0 => panic!("Unable to upgrade this QrwLock: no read locks."),
WRITE_LOCKED => panic!("Unable to upgrade this QrwLock: already write locked."),
state => {
debug_assert!(self.inner.state.load(SeqCst) & CONTENDED != 0);
debug_assert_eq!(state, self.read_count().unwrap() as usize);
if state == 1 {
if self.inner.upgrade_queue.is_empty() {
self.inner.state.store(WRITE_LOCKED, SeqCst);
Ok(())
} else {
Err(self.enqueue_upgrade_request(state))
}
} else {
Err(self.enqueue_upgrade_request(state))
}
}
}
}
#[inline]
pub unsafe fn downgrade_write_lock(&self) {
if PRINT_DEBUG {
println!(
"Attempting to downgrade write lock...(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
debug_assert_eq!(self.inner.state.load(SeqCst) & WRITE_LOCKED, WRITE_LOCKED);
match self.contend() {
0 => debug_assert!(false, "unreachable"),
WRITE_LOCKED => {
self.inner.state.store(1, SeqCst);
}
_state => debug_assert!(false, "unreachable"),
}
self.process_queues();
}
#[inline]
pub unsafe fn release_read_lock(&self) {
if PRINT_DEBUG {
println!(
"Releasing read lock...(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
debug_assert!(self.inner.state.load(SeqCst) & READ_COUNT_MASK != 0);
debug_assert_eq!(self.inner.state.load(SeqCst) & WRITE_LOCKED, 0);
match self.contend() {
0 => debug_assert!(false, "unreachable"),
WRITE_LOCKED => debug_assert!(false, "unreachable"),
state => {
debug_assert!(self.inner.state.load(SeqCst) & CONTENDED != 0);
assert!(state > 0 && state <= READ_COUNT_MASK);
let new_state = state - 1;
self.inner.state.store(new_state, SeqCst);
self.process_queues();
}
}
}
#[inline]
pub unsafe fn release_write_lock(&self) {
if PRINT_DEBUG {
println!(
"Releasing write lock...(thread: {}) ...",
thread::current().name().unwrap_or("<unnamed>")
);
}
debug_assert!({
let state = self.inner.state.load(SeqCst);
(state & CONTENDED == CONTENDED) == (state & WRITE_LOCKED != WRITE_LOCKED)
|| (state & CONTENDED == CONTENDED) && (state & WRITE_LOCKED == WRITE_LOCKED)
});
debug_assert!(self.inner.state.load(SeqCst) & READ_COUNT_MASK == 0);
match self.contend() {
0 => debug_assert!(false, "unreachable"),
WRITE_LOCKED => {
self.inner.state.store(0, SeqCst);
self.process_queues();
}
_state => debug_assert!(false, "unreachable"),
}
}
}
impl<T> From<T> for QrwLock<T> {
#[inline]
fn from(val: T) -> QrwLock<T> {
QrwLock::new(val)
}
}
impl<T> Clone for QrwLock<T> {
#[inline]
fn clone(&self) -> QrwLock<T> {
QrwLock {
inner: self.inner.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{future, Future};
use std::thread;
#[test]
fn simple() {
let lock = QrwLock::from(0i32);
let future_r0 = Box::new(lock.clone().read().and_then(|guard| {
assert_eq!(*guard, 0);
println!("val[r0]: {}", *guard);
ReadGuard::release(guard);
Ok(())
}));
let future_w0 = Box::new(lock.clone().write().and_then(|mut guard| {
*guard = 5;
println!("val is now: {}", *guard);
Ok(())
}));
let future_r1 = Box::new(lock.clone().read().and_then(|guard| {
assert_eq!(*guard, 5);
println!("val[r1]: {}", *guard);
Ok(())
}));
let future_r2 = Box::new(lock.clone().read().and_then(|guard| {
assert_eq!(*guard, 5);
println!("val[r2]: {}", *guard);
Ok(())
}));
let future_u0 = Box::new(lock.clone().read().and_then(|read_guard| {
println!("Upgrading read guard...");
ReadGuard::upgrade(read_guard).and_then(|mut write_guard| {
println!("Read guard upgraded.");
*write_guard = 6;
Ok(())
})
}));
let future_r3 = Box::new(lock.clone().read().and_then(|guard| {
assert_eq!(*guard, 5);
println!("val[r3]: {}", *guard);
Ok(())
}));
let futures: Vec<Box<Future<Item = (), Error = Canceled>>> = vec![
future_r0, future_w0, future_r1, future_r2, future_u0, future_r3,
];
future::join_all(futures).wait().unwrap();
let future_guard = lock.clone().read();
let guard = future_guard.wait().unwrap();
assert_eq!(*guard, 6);
}
#[test]
fn concurrent() {
let start_val = 0i32;
let lock = QrwLock::new(start_val);
let thread_count = 20;
let mut threads = Vec::with_capacity(thread_count);
for _i in 0..thread_count {
let future_write_guard = lock.clone().write();
let future_read_guard = lock.clone().read();
let future_write = future_write_guard.and_then(|mut guard| {
*guard += 1;
WriteGuard::downgrade(guard);
Ok(())
});
let future_read = future_read_guard.and_then(move |guard| {
Ok(guard)
});
threads.push(thread::spawn(|| {
future_write.join(future_read).wait().unwrap();
}));
}
for i in 0..thread_count {
let future_write_guard = lock.clone().write();
threads.push(
thread::Builder::new()
.name(format!("test_thread_{}", i))
.spawn(|| {
let mut guard = future_write_guard.wait().unwrap();
*guard -= 1
})
.unwrap(),
);
let future_read_guard = lock.clone().read();
threads.push(
thread::Builder::new()
.name(format!("test_thread_{}", i))
.spawn(|| {
let _val = *future_read_guard.wait().unwrap();
})
.unwrap(),
);
}
for thread in threads {
thread.join().unwrap();
}
let guard = lock.clone().read().wait().unwrap();
assert_eq!(*guard, start_val);
}
#[test]
fn read_write_thread_loop() {
let lock = QrwLock::new(vec![0usize; 1 << 13]);
let loop_count = 15;
let redundancy_count = 700;
let mut threads = Vec::with_capacity(loop_count * 2);
for i in 0..loop_count {
let future_write_guard = lock.clone().write();
let future_read_guard = lock.clone().read();
threads.push(
thread::Builder::new()
.name(format!("write_thread_{}", i))
.spawn(move || {
let mut guard = future_write_guard.wait().unwrap();
for _ in 0..redundancy_count {
for idx in guard.iter_mut() {
*idx += 1;
}
}
})
.unwrap(),
);
threads.push(
thread::Builder::new()
.name(format!("read_thread_{}", i))
.spawn(move || {
let guard = future_read_guard.wait().unwrap();
let expected_val = redundancy_count * (i + 1);
for idx in guard.iter() {
assert!(
*idx == expected_val,
"Lock data mismatch. \
{} expected, {} found.",
expected_val,
*idx
);
}
})
.unwrap(),
);
}
for thread in threads {
thread.join().unwrap();
}
let guard = lock.clone().read().wait().unwrap();
for idx in guard.iter() {
assert_eq!(*idx, loop_count * redundancy_count);
}
}
#[test]
fn multiple_upgrades() {
let lock = QrwLock::new(0usize);
let upgrade_count = 12;
let mut threads = Vec::with_capacity(upgrade_count * 2);
for i in 0..upgrade_count {
let future_read_guard_a = lock.clone().read();
let future_read_guard_b = lock.clone().read();
threads.push(
thread::Builder::new()
.name(format!("read_thread_{}", i))
.spawn(move || {
let _read_guard = future_read_guard_a.wait().expect("[0]");
::std::thread::sleep(::std::time::Duration::from_millis(500));
})
.expect("[1]"),
);
threads.push(
thread::Builder::new()
.name(format!("upgrade_thread_{}", i))
.spawn(move || {
let read_guard = future_read_guard_b.wait().expect("[2]");
let upgrade_guard = ReadGuard::upgrade(read_guard);
let mut write_guard = upgrade_guard
.wait()
.expect("Error waiting for upgrade guard");
*write_guard += 1;
})
.expect("[4]"),
);
}
for handle in threads {
let name = handle.thread().name().unwrap().to_owned();
handle
.join()
.expect(&format!("Error joining thread: {:?}", name));
}
let guard = lock.read().wait().expect("[6]");
assert_eq!(*guard, upgrade_count);
}
}