Crate lf_queue
A lock-free multi-producer multi-consumer unbounded queue.
Examples
Single Producer - Single Consumer:
use lf_queue::Queue;

const COUNT: usize = 1_000;

let queue: Queue<usize> = Queue::new();

// Producer side: enqueue the values 0..COUNT from the current thread.
(0..COUNT).for_each(|value| {
    queue.push(value);
});

// Consumer side: every pop must yield the values in the order they were pushed.
(0..COUNT).for_each(|expected| {
    assert_eq!(expected, queue.pop().unwrap());
});

// Once everything has been consumed the queue reports empty.
assert!(queue.pop().is_none());
Multi Producer - Single Consumer:
use lf_queue::Queue;
use std::thread;

const COUNT: usize = 1_000;
const CONCURRENCY: usize = 4;

let queue: Queue<usize> = Queue::new();

// Start CONCURRENCY producer threads; each one pushes COUNT items through
// its own clone of the queue.
let mut producers = Vec::with_capacity(CONCURRENCY);
for _ in 0..CONCURRENCY {
    let producer_queue = queue.clone();
    producers.push(thread::spawn(move || {
        for item in 0..COUNT {
            producer_queue.push(item);
        }
    }));
}

// Wait until every producer has finished pushing.
for producer in producers {
    producer.join().unwrap();
}

// The single consumer must now find exactly COUNT * CONCURRENCY items.
let mut remaining = COUNT * CONCURRENCY;
while remaining > 0 {
    assert!(queue.pop().is_some());
    remaining -= 1;
}

assert!(queue.pop().is_none());
Single Producer - Multi Consumer:
use lf_queue::Queue;
use std::thread;

const COUNT: usize = 1_000;
const CONCURRENCY: usize = 4;

let queue: Queue<usize> = Queue::new();

// Single producer: pre-fill the queue so that each consumer can take COUNT items.
let total = COUNT * CONCURRENCY;
for item in 0..total {
    queue.push(item);
}

// Start CONCURRENCY consumer threads, each popping COUNT items through its
// own clone of the queue.
let mut consumers = Vec::with_capacity(CONCURRENCY);
for _ in 0..CONCURRENCY {
    let consumer_queue = queue.clone();
    consumers.push(thread::spawn(move || {
        for _ in 0..COUNT {
            // Retry until this consumer obtains an item.
            while consumer_queue.pop().is_none() {}
        }
    }));
}

for consumer in consumers {
    consumer.join().unwrap();
}

// All COUNT * CONCURRENCY items have been taken by the consumers.
assert!(queue.pop().is_none());
Multi Producer - Multi Consumer:
use lf_queue::Queue;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const COUNT: usize = 1_000;
const CONCURRENCY: usize = 4;

let queue: Queue<usize> = Queue::new();

// One counter per value in 0..COUNT, recording how many times that value
// was popped across all consumers.
let items = Arc::new((0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());

// Keep the handles of BOTH consumers and producers so that every thread can
// be joined before the results are checked.
let mut ths = Vec::with_capacity(2 * CONCURRENCY);
for _ in 0..CONCURRENCY {
    // Consumer: pops COUNT items, yielding while the queue is momentarily
    // empty, and counts each popped value.
    let q = queue.clone();
    let its = Arc::clone(&items);
    ths.push(thread::spawn(move || {
        for _ in 0..COUNT {
            let n = loop {
                if let Some(x) = q.pop() {
                    break x;
                } else {
                    thread::yield_now();
                }
            };
            its[n].fetch_add(1, Ordering::SeqCst);
        }
    }));

    // Producer: pushes the values 0..COUNT.
    let q = queue.clone();
    ths.push(thread::spawn(move || {
        for i in 0..COUNT {
            q.push(i);
        }
    }));
}

// Joining every thread (consumers included) guarantees that all counter
// updates have completed; no timing-based wait is needed.
for th in ths {
    th.join().unwrap();
}

// Each value was pushed once per producer, so it must have been popped
// exactly CONCURRENCY times in total.
for c in items.iter() {
    assert_eq!(c.load(Ordering::SeqCst), CONCURRENCY);
}

assert!(queue.pop().is_none());
Structs
Queue: A lock-free multi-producer multi-consumer unbounded queue.