password_worker/
worker.rs

1use crate::Hasher;
2use rayon::ThreadPoolBuilder;
3use tokio::sync::oneshot;
4
5/// Errors that can occur in the `PasswordWorker`.
6#[derive(Debug, thiserror::Error)]
7pub enum PasswordWorkerError<H: Hasher> {
8    /// An error from the Hashing operation
9    #[error("Hashing error: {0}")]
10    Hashing(H::Error),
11    /// The worker thread must have died
12    #[error("Channel send error: {0}")]
13    ChannelSend(#[from] crossbeam_channel::SendError<WorkerCommand<H>>),
14    /// The worker thread must have died
15    #[error("Channel receive error: {0}")]
16    ChannelRecv(#[from] tokio::sync::oneshot::error::RecvError),
17    /// Couldn't create the rayon threadpool
18    #[error("ThreadPool build error: {0}")]
19    ThreadPool(#[from] rayon::ThreadPoolBuildError),
20}
21
22#[derive(Debug)]
23pub enum WorkerCommand<H: Hasher> {
24    Hash(
25        String,
26        H::Config,
27        oneshot::Sender<Result<String, PasswordWorkerError<H>>>,
28    ),
29    Verify(
30        String,
31        String,
32        oneshot::Sender<Result<bool, PasswordWorkerError<H>>>,
33    ),
34}
35
36/// A worker that handles password hashing and verification using a `rayon` thread pool
37/// and `crossbeam-channel`.
38///
39/// The `PasswordWorker` struct provides asynchronous password hashing and verification
40/// operations.
41#[derive(Debug, Clone)]
42pub struct PasswordWorker<H: Hasher> {
43    sender: crossbeam_channel::Sender<WorkerCommand<H>>,
44}
45
46impl<H: Hasher> PasswordWorker<H> {
47    /// Creates a new `PasswordWorker` with the given maximum number of threads.
48    ///
49    /// The `max_threads` parameter specifies the maximum number of threads the worker can use.
50    ///
51    /// # Examples
52    ///
53    /// ```
54    /// # #[tokio::main]
55    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
56    /// use password_worker::{Bcrypt, PasswordWorker};
57    ///
58    /// let max_threads = 4; // rayon thread pool max threads
59    /// let password_worker: PasswordWorker<Bcrypt> = PasswordWorker::new(max_threads)?;
60    /// # Ok(())
61    /// # }
62    /// ```
63    pub fn new(max_threads: usize) -> Result<Self, PasswordWorkerError<H>> {
64        let (sender, receiver) = crossbeam_channel::unbounded::<WorkerCommand<H>>();
65
66        let thread_pool = ThreadPoolBuilder::new().num_threads(max_threads).build()?;
67
68        std::thread::spawn(move || {
69            while let Ok(command) = receiver.recv() {
70                match command {
71                    WorkerCommand::Hash(password, cost, result_sender) => {
72                        let result = thread_pool.install(|| H::hash(&password, &cost));
73                        result_sender
74                            .send(result.map_err(PasswordWorkerError::Hashing))
75                            .ok()?;
76                    }
77                    WorkerCommand::Verify(password, hash, result_sender) => {
78                        let result = thread_pool.install(|| H::verify(&password, &hash));
79                        result_sender
80                            .send(result.map_err(PasswordWorkerError::Hashing))
81                            .ok()?;
82                    }
83                }
84            }
85            Some(())
86        });
87
88        Ok(PasswordWorker { sender })
89    }
90
91    /// Asynchronously hashes the given password using its hashing algorithm.
92    ///
93    /// # Example
94    ///
95    /// ```
96    /// # #[tokio::main]
97    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
98    /// use password_worker::{Bcrypt, BcryptConfig, PasswordWorker};
99    ///
100    /// let password = "hunter2";
101    /// let cost = 12; // bcrypt cost value
102    /// let max_threads = 4; // rayon thread pool max threads
103    /// let password_worker = PasswordWorker::<Bcrypt>::new(max_threads)?;
104    ///
105    /// let hashed_password = password_worker.hash(password, BcryptConfig { cost }).await?;
106    /// println!("Hashed password: {:?}", hashed_password);
107    /// # Ok(())
108    /// # }
109    /// ```
110    pub async fn hash(
111        &self,
112        password: impl Into<String>,
113        cost: H::Config,
114    ) -> Result<String, PasswordWorkerError<H>> {
115        let (tx, rx) = oneshot::channel();
116
117        self.sender
118            .send(WorkerCommand::Hash(password.into(), cost, tx))?;
119
120        rx.await?
121    }
122
123    /// Asynchronously verifies a password against a hash string.
124    ///
125    /// # Example
126    ///
127    /// ```
128    /// # #[tokio::main]
129    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
130    /// use password_worker::{Bcrypt, BcryptConfig, PasswordWorker};
131    ///
132    /// let password = "hunter2";
133    /// let cost = 12; // bcrypt cost value
134    /// let max_threads = 4; // rayon thread pool max threads
135    /// let password_worker = PasswordWorker::<Bcrypt>::new(max_threads)?;
136    /// let hashed_password = password_worker.hash(password, BcryptConfig { cost }).await?;
137    ///
138    /// let is_valid = password_worker.verify(password, hashed_password).await?;
139    /// println!("Verification result: {:?}", is_valid);
140    /// # Ok(())
141    /// # }
142    /// ```
143    pub async fn verify(
144        &self,
145        password: impl Into<String>,
146        hash: impl Into<String>,
147    ) -> Result<bool, PasswordWorkerError<H>> {
148        let (tx, rx) = oneshot::channel();
149
150        self.sender
151            .send(WorkerCommand::Verify(password.into(), hash.into(), tx))?;
152
153        rx.await?
154    }
155}