# orx-concurrent-queue
A high-performance, convenient, thread-safe queue that can grow and shrink concurrently, with `push`, `extend`, `pop` and `pull` capabilities.
## Examples
The following example demonstrates basic usage of [`ConcurrentQueue`](https://docs.rs/orx-concurrent-queue/latest/orx_concurrent_queue/struct.ConcurrentQueue.html) within a synchronous program. Note that the `push`, `extend`, `pop` and `pull` methods can all be called with a shared reference `&self`. This makes the queue convenient to use in a concurrent program.
```rust
use orx_concurrent_queue::ConcurrentQueue;
let queue = ConcurrentQueue::new();
queue.push(0); // [0]
queue.push(1); // [0, 1]
let x = queue.pop(); // [1]
assert_eq!(x, Some(0));
queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
assert_eq!(x, vec![1, 2, 3, 4]);
assert_eq!(queue.len(), 2);
let vec = queue.into_inner();
assert_eq!(vec, vec![5, 6]);
```
The following example demonstrates the main purpose of the concurrent queue: to simultaneously push to and pop from the queue. This enables a parallel program where tasks can be handled by multiple threads, while at the same time, new tasks can be created and dynamically added to the queue.
In the following example, the queue is created with three pre-populated tasks. Every task might lead to new tasks. These new tasks are added to the back of the queue, where they are popped later and may in turn add further tasks.
```rust
use orx_concurrent_queue::ConcurrentQueue;
use std::sync::atomic::{AtomicUsize, Ordering};
struct Task {
    micros: usize,
}
impl Task {
    fn perform(&self) {
        use std::{thread::sleep, time::Duration};
        sleep(Duration::from_micros(self.micros as u64));
    }
    fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
        let range = match self.micros < 5 {
            true => 0..0,
            false => 0..self.micros,
        };
        range.rev().take(5).map(|micros| Self { micros })
    }
}
let queue = ConcurrentQueue::new();
// pre-populate with 3 tasks
for micros in [10, 15, 10] {
    queue.push(Task { micros });
}
// count total number of performed tasks
let num_performed_tasks = AtomicUsize::new(queue.len());
let num_threads = 8;
std::thread::scope(|s| {
    for _ in 0..num_threads {
        s.spawn(|| {
            // keep popping a task from front of the queue
            // as long as the queue is not empty
            while let Some(task) = queue.pop() {
                // create children tasks, add to back
                queue.extend(task.child_tasks());
                // perform the popped task
                task.perform();
                _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
            }
        });
    }
});
assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
```
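The value `5046` in the final assertion can be checked without any threads. The following is a minimal sequential sketch that uses only the standard library and mirrors the `child_tasks` rule from the example above. The helper names `tasks_performed` and `expected_count` are hypothetical and are not part of the crate.

```rust
/// Number of tasks performed for a root task with the given `micros`,
/// counting the root itself and all of its descendants.
/// (Hypothetical helper for illustration; not part of the crate.)
fn tasks_performed(micros: usize) -> usize {
    // Mirrors `child_tasks`: no children when `micros < 5`, otherwise
    // the five values micros-1, micros-2, ..., micros-5.
    let children = match micros < 5 {
        true => 0..0,
        false => 0..micros,
    };
    1 + children.rev().take(5).map(tasks_performed).sum::<usize>()
}

/// Final counter value for a queue pre-populated with `initial` tasks.
/// (Hypothetical helper for illustration; not part of the crate.)
fn expected_count(initial: &[usize]) -> usize {
    // The counter in the example starts at the initial queue length
    // and is then incremented once per performed task.
    initial.len() + initial.iter().map(|&m| tasks_performed(m)).sum::<usize>()
}

fn main() {
    assert_eq!(tasks_performed(10), 161);
    assert_eq!(tasks_performed(15), 4721);
    assert_eq!(expected_count(&[10, 15, 10]), 5046);
}
```

Since every popped task is performed exactly once regardless of which thread pops it, the concurrent run arrives at the same count as this sequential one.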
## Contributing
Contributions are welcome! If you notice an error, have a question or think something could be improved, please open an [issue](https://github.com/orxfun/orx-concurrent-queue/issues/new) or create a PR.
## License
Dual-licensed under [Apache 2.0](LICENSE-APACHE) or [MIT](LICENSE-MIT).