thread_counter/
lib.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! A lightweight, thread-safe library for counting and synchronizing concurrent
3//! operations.
4//!
5//! This crate provides a [`ThreadCounter`] type that can be used to keep track
6//! of the number of active threads or operations, and to synchronize the
7//! completion of these operations. It's particularly useful for scenarios where
8//! you need to wait for a group of tasks to complete before proceeding.
9//!
10//! ## Features
11//!
12//! - Thread-safe counting of active operations.
13//! - RAII-based automatic decrementing using [`Ticket`]s.
14//! - Ability to wait for all operations to complete, with optional timeout.
15//!
16//! ## Usage
17//!
18//! Here's a basic example of how to use the [`ThreadCounter`]:
19//!
20//! ```rust
21//! use std::{thread, time::Duration};
22//! use thread_counter::ThreadCounter;
23//!
24//! let counter = ThreadCounter::default();
25//!
26//! // Spawn some threads
27//! for _ in 0..5 {
28//! 	thread::spawn(move || {
29//! 		// Take a ticket, incrementing the counter.
30//! 		let ticket = counter.ticket();
31//! 		// Simulate some work
32//! 		thread::sleep(Duration::from_millis(100));
33//! 		// `ticket` is automatically dropped here, decrementing the counter
34//! 	});
35//! }
36//!
37//! // Wait for all threads to complete, timing out after 200ms.
38//! counter.wait(Duration::from_millis(200));
39//! println!("All threads have completed!");
40//! ```
41
42#![forbid(unsafe_code)]
43#![warn(
44	clippy::correctness,
45	clippy::suspicious,
46	clippy::complexity,
47	clippy::perf,
48	clippy::style
49)]
50#![allow(clippy::tabs_in_doc_comments)]
51
52use parking_lot::{Condvar, Mutex};
53use std::{ops::Deref, sync::Arc, time::Duration};
54
55/// A thread-safe counter for tracking the number of active threads or
56/// operations.
57///
58/// This struct provides a high-level interface for incrementing, decrementing,
59/// and waiting on a thread counter. It internally uses [`Arc`] to allow for
60/// cheap cloning and shared ownership.
61#[derive(Default, Clone)]
62pub struct ThreadCounter {
63	inner: Arc<RawThreadCounter>,
64}
65
66impl ThreadCounter {
67	/// Creates a new [`Ticket`] from this thread counter.
68	///
69	/// This method increments the thread count and returns a [`Ticket`] that
70	/// will automatically decrement the count when dropped.
71	///
72	/// # Returns
73	/// A new [`Ticket`] instance associated with this counter.
74	pub fn ticket(&self) -> Ticket {
75		self.increment();
76		Ticket {
77			counter: self.clone(),
78		}
79	}
80}
81
82impl Deref for ThreadCounter {
83	type Target = RawThreadCounter;
84
85	fn deref(&self) -> &Self::Target {
86		&self.inner
87	}
88}
89
90impl AsRef<RawThreadCounter> for ThreadCounter {
91	fn as_ref(&self) -> &RawThreadCounter {
92		&self.inner
93	}
94}
95
96/// The internal implementation of the thread counter.
97///
98/// This struct handles the actual counting and synchronization mechanisms.
99pub struct RawThreadCounter {
100	count: Mutex<usize>,
101	condvar: Condvar,
102}
103
104impl RawThreadCounter {
105	/// Increments the thread counter.
106	///
107	/// # Note
108	/// It's preferable to use [`ThreadCounter::ticket()`] instead, which
109	/// ensures that the count is automatically decremented when the ticket is
110	/// dropped.
111	pub fn increment(&self) {
112		let mut count = self.count.lock();
113		*count += 1;
114	}
115
116	/// Decrements the thread counter.
117	///
118	/// If the count reaches zero, it notifies all waiting threads.
119	///
120	/// # Note
121	/// It's preferable to use [`ThreadCounter::ticket()`] instead, which
122	/// ensures that the count is automatically decremented when the ticket is
123	/// dropped.
124	pub fn decrement(&self) {
125		let mut count = self.count.lock();
126		*count -= 1;
127		if *count == 0 {
128			self.condvar.notify_all();
129		}
130	}
131
132	/// Waits for the counter to reach zero, with an optional timeout.
133	///
134	/// # Arguments
135	/// * `timeout` - An optional duration to wait. If `None`, waits
136	///   indefinitely.
137	///
138	/// # Returns
139	/// * `true` if the count reached zero.
140	/// * `false` if the timeout was reached before the count reached zero.
141	pub fn wait(&self, timeout: impl Into<Option<Duration>>) -> bool {
142		let mut count = self.count.lock();
143		let condition = |count: &mut usize| *count > 0;
144		match timeout.into() {
145			Some(timeout) => !self
146				.condvar
147				.wait_while_for(&mut count, condition, timeout)
148				.timed_out(),
149			None => {
150				self.condvar.wait_while(&mut count, condition);
151				true
152			}
153		}
154	}
155}
156
157impl Default for RawThreadCounter {
158	fn default() -> Self {
159		Self {
160			count: Mutex::new(0),
161			condvar: Condvar::new(),
162		}
163	}
164}
165
166/// A RAII guard for automatically managing the thread count.
167///
168/// When a `Ticket` is created, it increments the associated thread counter.
169/// When the `Ticket` is dropped, it automatically decrements the counter.
170pub struct Ticket {
171	counter: ThreadCounter,
172}
173
174impl Drop for Ticket {
175	fn drop(&mut self) {
176		self.counter.decrement();
177	}
178}