thread_counter/lib.rs
1// SPDX-License-Identifier: MIT OR Apache-2.0
2//! A lightweight, thread-safe library for counting and synchronizing concurrent
3//! operations.
4//!
5//! This crate provides a [`ThreadCounter`] type that can be used to keep track
6//! of the number of active threads or operations, and to synchronize the
7//! completion of these operations. It's particularly useful for scenarios where
8//! you need to wait for a group of tasks to complete before proceeding.
9//!
10//! ## Features
11//!
12//! - Thread-safe counting of active operations.
13//! - RAII-based automatic decrementing using [`Ticket`]s.
14//! - Ability to wait for all operations to complete, with optional timeout.
15//!
16//! ## Usage
17//!
18//! Here's a basic example of how to use the [`ThreadCounter`]:
19//!
20//! ```rust
21//! use std::{thread, time::Duration};
22//! use thread_counter::ThreadCounter;
23//!
24//! let counter = ThreadCounter::default();
25//!
26//! // Spawn some threads
27//! for _ in 0..5 {
28//! thread::spawn(move || {
29//! // Take a ticket, incrementing the counter.
30//! let ticket = counter.ticket();
31//! // Simulate some work
32//! thread::sleep(Duration::from_millis(100));
33//! // `ticket` is automatically dropped here, decrementing the counter
34//! });
35//! }
36//!
37//! // Wait for all threads to complete, timing out after 200ms.
38//! counter.wait(Duration::from_millis(200));
39//! println!("All threads have completed!");
40//! ```
41
42#![forbid(unsafe_code)]
43#![warn(
44 clippy::correctness,
45 clippy::suspicious,
46 clippy::complexity,
47 clippy::perf,
48 clippy::style
49)]
50#![allow(clippy::tabs_in_doc_comments)]
51
52use parking_lot::{Condvar, Mutex};
53use std::{ops::Deref, sync::Arc, time::Duration};
54
55/// A thread-safe counter for tracking the number of active threads or
56/// operations.
57///
58/// This struct provides a high-level interface for incrementing, decrementing,
59/// and waiting on a thread counter. It internally uses [`Arc`] to allow for
60/// cheap cloning and shared ownership.
61#[derive(Default, Clone)]
62pub struct ThreadCounter {
63 inner: Arc<RawThreadCounter>,
64}
65
66impl ThreadCounter {
67 /// Creates a new [`Ticket`] from this thread counter.
68 ///
69 /// This method increments the thread count and returns a [`Ticket`] that
70 /// will automatically decrement the count when dropped.
71 ///
72 /// # Returns
73 /// A new [`Ticket`] instance associated with this counter.
74 pub fn ticket(&self) -> Ticket {
75 self.increment();
76 Ticket {
77 counter: self.clone(),
78 }
79 }
80}
81
82impl Deref for ThreadCounter {
83 type Target = RawThreadCounter;
84
85 fn deref(&self) -> &Self::Target {
86 &self.inner
87 }
88}
89
90impl AsRef<RawThreadCounter> for ThreadCounter {
91 fn as_ref(&self) -> &RawThreadCounter {
92 &self.inner
93 }
94}
95
96/// The internal implementation of the thread counter.
97///
98/// This struct handles the actual counting and synchronization mechanisms.
99pub struct RawThreadCounter {
100 count: Mutex<usize>,
101 condvar: Condvar,
102}
103
104impl RawThreadCounter {
105 /// Increments the thread counter.
106 ///
107 /// # Note
108 /// It's preferable to use [`ThreadCounter::ticket()`] instead, which
109 /// ensures that the count is automatically decremented when the ticket is
110 /// dropped.
111 pub fn increment(&self) {
112 let mut count = self.count.lock();
113 *count += 1;
114 }
115
116 /// Decrements the thread counter.
117 ///
118 /// If the count reaches zero, it notifies all waiting threads.
119 ///
120 /// # Note
121 /// It's preferable to use [`ThreadCounter::ticket()`] instead, which
122 /// ensures that the count is automatically decremented when the ticket is
123 /// dropped.
124 pub fn decrement(&self) {
125 let mut count = self.count.lock();
126 *count -= 1;
127 if *count == 0 {
128 self.condvar.notify_all();
129 }
130 }
131
132 /// Waits for the counter to reach zero, with an optional timeout.
133 ///
134 /// # Arguments
135 /// * `timeout` - An optional duration to wait. If `None`, waits
136 /// indefinitely.
137 ///
138 /// # Returns
139 /// * `true` if the count reached zero.
140 /// * `false` if the timeout was reached before the count reached zero.
141 pub fn wait(&self, timeout: impl Into<Option<Duration>>) -> bool {
142 let mut count = self.count.lock();
143 let condition = |count: &mut usize| *count > 0;
144 match timeout.into() {
145 Some(timeout) => !self
146 .condvar
147 .wait_while_for(&mut count, condition, timeout)
148 .timed_out(),
149 None => {
150 self.condvar.wait_while(&mut count, condition);
151 true
152 }
153 }
154 }
155}
156
157impl Default for RawThreadCounter {
158 fn default() -> Self {
159 Self {
160 count: Mutex::new(0),
161 condvar: Condvar::new(),
162 }
163 }
164}
165
166/// A RAII guard for automatically managing the thread count.
167///
168/// When a `Ticket` is created, it increments the associated thread counter.
169/// When the `Ticket` is dropped, it automatically decrements the counter.
170pub struct Ticket {
171 counter: ThreadCounter,
172}
173
174impl Drop for Ticket {
175 fn drop(&mut self) {
176 self.counter.decrement();
177 }
178}