Skip to main content

qubit_dcl/double_checked/
double_checked_lock_executor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Double-Checked Lock Executor
11//!
12//! Provides a reusable executor for double-checked locking workflows.
13//!
14
15use std::{
16    any::Any,
17    fmt::Display,
18    marker::PhantomData,
19    panic::{self, AssertUnwindSafe},
20};
21
22use qubit_function::{
23    ArcRunnable, ArcTester, Callable, CallableWith, Runnable, RunnableWith, Tester,
24};
25
26use super::{
27    ExecutionContext, ExecutionLogger, ExecutionResult, ExecutorError,
28    executor_builder::ExecutorBuilder, executor_ready_builder::ExecutorReadyBuilder,
29};
30use crate::lock::Lock;
31
/// Reusable double-checked lock executor.
///
/// The executor owns the lock handle, condition tester, execution logger, and
/// optional prepare lifecycle callbacks. Each execution performs:
///
/// 1. A first condition check outside the lock.
/// 2. Optional prepare action.
/// 3. Write-lock acquisition.
/// 4. A second condition check inside the lock.
/// 5. The submitted task.
/// 6. Optional prepare commit or rollback after the lock is released.
///
/// If the first check fails, execution stops before the prepare action runs
/// and before the lock is taken. Once prepare has completed, commit runs only
/// when the task succeeds; every other outcome (second check not met, task
/// error, or captured panic) triggers the rollback callback if one is
/// configured.
///
/// The tester is intentionally run both outside and inside the lock. Any state
/// read by the first check must therefore use atomics or another synchronization
/// mechanism that is safe without this executor's lock.
///
/// # Type Parameters
///
/// * `L` - The lock type implementing [`Lock<T>`].
/// * `T` - The data type protected by the lock.
///
/// # Examples
///
/// Use [`DoubleCheckedLockExecutor::builder`] to attach a lock (for example
/// [`crate::ArcMutex`]), set a [`Tester`] with
/// [`ExecutorLockBuilder::when`](super::ExecutorLockBuilder::when), then call
/// [`Self::call`], [`Self::execute`], [`Self::call_with`], or
/// [`Self::execute_with`] on the built executor.
///
/// Panics from the tester, prepare callbacks, or task can be captured with
/// [`set_catch_panics`](Self::set_catch_panics), so rollback can still be
/// executed. Captured tester and task panics are reported as
/// [`super::ExecutorError::Panic`]; captured panics from the prepare, commit,
/// and rollback callbacks are reported through the corresponding prepare
/// failure result instead (for example
/// [`super::ExecutorError::PrepareFailed`]).
///
/// Cloned executors share their configured prepare callbacks. Concurrent calls
/// may therefore complete prepare in several threads before one call wins the
/// second condition check; calls that lose the second check run prepare rollback
/// if it is configured.
///
/// ```rust
/// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
///
/// use qubit_dcl::{ArcMutex, DoubleCheckedLockExecutor, Lock};
/// use qubit_dcl::double_checked::ExecutionResult;
///
/// fn main() {
///     let data = ArcMutex::new(10);
///     let skip = Arc::new(AtomicBool::new(false));
///
///     let executor = DoubleCheckedLockExecutor::builder()
///         .on(data.clone())
///         .when({
///             let skip = skip.clone();
///             move || !skip.load(Ordering::Acquire)
///         })
///         .build();
///
///     let updated = executor
///         .call_with(|value: &mut i32| {
///             *value += 5;
///             Ok::<i32, std::io::Error>(*value)
///         })
///         .get_result();
///
///     assert!(matches!(updated, ExecutionResult::Success(15)));
///     assert_eq!(data.read(|value| *value), 15);
///
///     skip.store(true, Ordering::Release);
///     let skipped = executor
///         .call_with(|value: &mut i32| {
///             *value += 1;
///             Ok::<i32, std::io::Error>(*value)
///         })
///         .get_result();
///
///     assert!(matches!(skipped, ExecutionResult::ConditionNotMet));
///     assert_eq!(data.read(|value| *value), 15);
/// }
/// ```
#[derive(Clone)]
pub struct DoubleCheckedLockExecutor<L = (), T = ()> {
    /// The lock protecting the target data. The task runs inside this lock's
    /// write section.
    lock: L,

    /// Condition checked before and after acquiring the lock.
    tester: ArcTester,

    /// Logger for unmet conditions and prepare lifecycle failures.
    logger: ExecutionLogger,

    /// Optional action executed after the first check and before locking.
    prepare_action: Option<ArcRunnable<CallbackError>>,

    /// Optional action executed when prepare must be rolled back, i.e. when
    /// prepare completed but the execution did not end in success.
    rollback_prepare_action: Option<ArcRunnable<CallbackError>>,

    /// Optional action executed when prepare should be committed, i.e. when
    /// prepare completed and the task succeeded.
    commit_prepare_action: Option<ArcRunnable<CallbackError>>,

    /// Whether panics from tester, callbacks, and task are captured as errors.
    /// When `false`, such panics unwind through the executor unchanged.
    catch_panics: bool,

    /// Carries the protected data type.
    ///
    /// `fn() -> T` is used instead of `T` so the marker is covariant in `T`
    /// without implying that the executor owns a `T`; the marker itself never
    /// restricts `Send`/`Sync` based on `T`.
    _phantom: PhantomData<fn() -> T>,
}
137
138impl DoubleCheckedLockExecutor<(), ()> {
139    /// Creates a builder for a reusable double-checked lock executor.
140    ///
141    /// # Returns
142    ///
143    /// A builder in the initial state. Attach a lock with
144    /// [`ExecutorBuilder::on`], then configure a tester with
145    /// [`ExecutorLockBuilder::when`](super::ExecutorLockBuilder::when).
146    #[inline]
147    pub fn builder() -> ExecutorBuilder {
148        ExecutorBuilder::default()
149    }
150}
151
152impl<L, T> DoubleCheckedLockExecutor<L, T>
153where
154    L: Lock<T>,
155{
156    /// Assembles an executor from the ready builder state.
157    ///
158    /// # Parameters
159    ///
160    /// * `builder` - Ready builder carrying the lock, tester, logger, and
161    ///   prepare lifecycle callbacks.
162    ///
163    /// # Returns
164    ///
165    /// A reusable executor containing the supplied builder state.
166    #[inline]
167    pub fn new(builder: ExecutorReadyBuilder<L, T>) -> Self {
168        Self {
169            lock: builder.lock,
170            tester: builder.tester,
171            logger: builder.logger,
172            prepare_action: builder.prepare_action,
173            rollback_prepare_action: builder.rollback_prepare_action,
174            commit_prepare_action: builder.commit_prepare_action,
175            catch_panics: builder.catch_panics,
176            _phantom: builder._phantom,
177        }
178    }
179
180    /// Executes a zero-argument callable while holding the write lock.
181    ///
182    /// Use [`Self::call_with`] when the task needs direct mutable access to the
183    /// protected data.
184    ///
185    /// # Parameters
186    ///
187    /// * `task` - The callable task to execute after both condition checks pass.
188    ///
189    /// # Returns
190    ///
191    /// An [`ExecutionContext`] containing success, unmet-condition, or failure
192    /// information.
193    #[inline]
194    pub fn call<C, R, E>(&self, task: C) -> ExecutionContext<R, E>
195    where
196        C: Callable<R, E>,
197        E: Display,
198    {
199        let mut task = task;
200        let result = self.execute_with_write_lock(move |_data| task.call());
201        ExecutionContext::new(result)
202    }
203
204    /// Executes a zero-argument runnable while holding the write lock.
205    ///
206    /// # Parameters
207    ///
208    /// * `task` - The runnable task to execute after both condition checks pass.
209    ///
210    /// # Returns
211    ///
212    /// An [`ExecutionContext`] containing success, unmet-condition, or failure
213    /// information.
214    #[inline]
215    pub fn execute<Rn, E>(&self, task: Rn) -> ExecutionContext<(), E>
216    where
217        Rn: Runnable<E>,
218        E: Display,
219    {
220        let mut task = task;
221        let result = self.execute_with_write_lock(move |_data| task.run());
222        ExecutionContext::new(result)
223    }
224
225    /// Executes a callable with mutable access to the protected data.
226    ///
227    /// # Parameters
228    ///
229    /// * `task` - The callable receiving `&mut T` after both condition checks
230    ///   pass.
231    ///
232    /// # Returns
233    ///
234    /// An [`ExecutionContext`] containing success, unmet-condition, or failure
235    /// information.
236    #[inline]
237    pub fn call_with<C, R, E>(&self, task: C) -> ExecutionContext<R, E>
238    where
239        C: CallableWith<T, R, E>,
240        E: Display,
241    {
242        let mut task = task;
243        let result = self.execute_with_write_lock(move |data| task.call_with(data));
244        ExecutionContext::new(result)
245    }
246
247    /// Executes a runnable with mutable access to the protected data.
248    ///
249    /// # Parameters
250    ///
251    /// * `task` - The runnable receiving `&mut T` after both condition checks
252    ///   pass.
253    ///
254    /// # Returns
255    ///
256    /// An [`ExecutionContext`] containing success, unmet-condition, or failure
257    /// information.
258    #[inline]
259    pub fn execute_with<Rn, E>(&self, task: Rn) -> ExecutionContext<(), E>
260    where
261        Rn: RunnableWith<T, E>,
262        E: Display,
263    {
264        let mut task = task;
265        let result = self.execute_with_write_lock(move |data| task.run_with(data));
266        ExecutionContext::new(result)
267    }
268
269    /// Enables or disables panic capture for tester, callbacks, and task
270    /// execution.
271    #[inline]
272    pub fn set_catch_panics(mut self, catch_panics: bool) -> Self {
273        self.catch_panics = catch_panics;
274        self
275    }
276
277    /// Deprecated alias for [`Self::set_catch_panics`].
278    #[deprecated(note = "Use `set_catch_panics` instead to align with setter naming.")]
279    #[inline]
280    pub fn with_catch_panics(self, catch_panics: bool) -> Self {
281        self.set_catch_panics(catch_panics)
282    }
283
284    /// Returns whether panic capture is enabled.
285    #[inline]
286    pub fn catch_panics(&self) -> bool {
287        self.catch_panics
288    }
289
290    /// Runs the configured double-checked sequence under a write lock.
291    ///
292    /// # Parameters
293    ///
294    /// * `task` - The task to run with mutable access after both condition
295    ///   checks pass.
296    ///
297    /// # Returns
298    ///
299    /// The final execution result, including prepare finalization.
300    ///
301    /// # Errors
302    ///
303    /// Task errors are captured as [`ExecutionResult::Failed`] with
304    /// [`super::ExecutorError::TaskFailed`]. Prepare, commit, and rollback
305    /// failures are also captured in the returned [`ExecutionResult`] rather
306    /// than returned as a separate `Result`.
307    fn execute_with_write_lock<R, E, F>(&self, task: F) -> ExecutionResult<R, E>
308    where
309        E: Display,
310        F: FnOnce(&mut T) -> Result<R, E>,
311    {
312        let first_check = match self.try_run("tester", || self.tester.test()) {
313            Ok(v) => v,
314            Err(error) => {
315                return ExecutionResult::from_executor_error(ExecutorError::Panic(error));
316            }
317        };
318
319        if !first_check {
320            self.log_unmet_condition();
321            return ExecutionResult::unmet();
322        }
323
324        let prepare_completed = match self.run_prepare_action() {
325            Ok(completed) => completed,
326            Err(error) => {
327                return ExecutionResult::from_executor_error(ExecutorError::PrepareFailed(error));
328            }
329        };
330
331        let result = self.lock.write(|data| {
332            let passed = match self.try_run("tester", || self.tester.test()) {
333                Ok(v) => v,
334                Err(error) => {
335                    return ExecutionResult::from_executor_error(ExecutorError::Panic(error));
336                }
337            };
338            if !passed {
339                return ExecutionResult::unmet();
340            }
341
342            match self.try_run("task", || task(data)) {
343                Ok(Ok(value)) => ExecutionResult::success(value),
344                Ok(Err(error)) => ExecutionResult::task_failed(error),
345                Err(error) => ExecutionResult::from_executor_error(ExecutorError::Panic(error)),
346            }
347        });
348
349        if result.is_unmet() {
350            self.log_unmet_condition();
351        }
352
353        if prepare_completed {
354            self.finalize_prepare(result)
355        } else {
356            result
357        }
358    }
359
360    /// Executes the optional prepare action.
361    ///
362    /// # Returns
363    ///
364    /// `Ok(true)` if prepare exists and succeeds, `Ok(false)` if no prepare
365    /// action is configured, or `Err(message)` if prepare fails.
366    fn run_prepare_action(&self) -> Result<bool, CallbackError> {
367        let Some(mut prepare_action) = self.prepare_action.clone() else {
368            return Ok(false);
369        };
370
371        match self.try_run("prepare", move || prepare_action.run()) {
372            Ok(Ok(_)) => Ok(true),
373            Ok(Err(error)) => {
374                self.logger.log_prepare_failed(&error);
375                Err(error)
376            }
377            Err(error) => {
378                self.logger.log_prepare_failed(&error);
379                Err(error)
380            }
381        }
382    }
383
384    /// Commits or rolls back a successfully completed prepare action.
385    ///
386    /// This method runs after the write lock has been released.
387    ///
388    /// # Parameters
389    ///
390    /// * `result` - Result produced by the condition check and task execution.
391    ///
392    /// # Returns
393    ///
394    /// `result` unchanged when no finalization action fails. Returns a failed
395    /// result when prepare commit or prepare rollback fails.
396    fn finalize_prepare<R, E>(&self, mut result: ExecutionResult<R, E>) -> ExecutionResult<R, E>
397    where
398        E: Display,
399    {
400        if result.is_success() {
401            if let Some(mut commit_prepare_action) = self.commit_prepare_action.clone() {
402                match self.try_run("prepare_commit", move || commit_prepare_action.run()) {
403                    Ok(Ok(_)) => {}
404                    Ok(Err(error)) => {
405                        self.logger.log_prepare_commit_failed(&error);
406                        result = ExecutionResult::from_executor_error(
407                            ExecutorError::PrepareCommitFailed(error),
408                        );
409                    }
410                    Err(error) => {
411                        self.logger.log_prepare_commit_failed(&error);
412                        result = ExecutionResult::from_executor_error(
413                            ExecutorError::PrepareCommitFailed(error),
414                        );
415                    }
416                }
417            }
418            return result;
419        }
420
421        let original = if let ExecutionResult::Failed(error) = &result {
422            error.to_string()
423        } else {
424            "Condition not met".to_string()
425        };
426
427        if let Some(mut rollback_prepare_action) = self.rollback_prepare_action.clone() {
428            match self.try_run("prepare_rollback", move || rollback_prepare_action.run()) {
429                Ok(Ok(_)) => {}
430                Ok(Err(error)) => {
431                    self.logger.log_prepare_rollback_failed(&error);
432                    result = ExecutionResult::prepare_rollback_failed(original, error.message());
433                }
434                Err(error) => {
435                    self.logger.log_prepare_rollback_failed(&error);
436                    result = ExecutionResult::prepare_rollback_failed(original, error.message());
437                }
438            }
439        }
440        result
441    }
442
443    /// Runs a callback with optional panic capture.
444    fn try_run<R>(
445        &self,
446        callback_type: &'static str,
447        callback: impl FnOnce() -> R,
448    ) -> Result<R, CallbackError> {
449        if !self.catch_panics {
450            return Ok(callback());
451        }
452
453        match panic::catch_unwind(AssertUnwindSafe(callback)) {
454            Ok(result) => Ok(result),
455            Err(payload) => {
456                let message = panic_payload_to_message(&*payload);
457                Err(CallbackError::with_type(callback_type, message))
458            }
459        }
460    }
461
462    /// Logs that the double-checked condition was not met.
463    fn log_unmet_condition(&self) {
464        self.logger.log_unmet_condition();
465    }
466}
467
/// Module-local shorthand for the error type returned by the prepare
/// lifecycle callbacks and used to describe captured panics.
type CallbackError = super::callback_error::CallbackError;
469
/// Extracts a human-readable message from a captured panic payload.
///
/// # Parameters
///
/// * `payload` - The payload carried by a caught panic.
///
/// # Returns
///
/// The panic message when the payload is a `&str` or a `String`; otherwise
/// the `Debug` rendering of the opaque payload, which does not reveal its
/// contents.
fn panic_payload_to_message(payload: &(dyn Any + Send)) -> String {
    // String-literal payloads are checked first, exactly as before.
    if let Some(text) = payload.downcast_ref::<&str>() {
        return String::from(*text);
    }

    match payload.downcast_ref::<String>() {
        Some(text) => text.clone(),
        None => format!("{:?}", payload),
    }
}