Skip to main content

qubit_dcl/double_checked/
double_checked_lock_executor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! # Double-Checked Lock Executor
10//!
11//! Provides a reusable executor for double-checked locking workflows.
12//!
13//! # Author
14//!
15//! Haixing Hu
16
17use std::{
18    any::Any,
19    fmt::Display,
20    marker::PhantomData,
21    panic::{self, AssertUnwindSafe},
22};
23
24use qubit_function::{
25    ArcRunnable, ArcTester, Callable, CallableWith, Runnable, RunnableWith, Tester,
26};
27
28use super::{
29    ExecutionContext, ExecutionLogger, ExecutionResult, ExecutorError,
30    executor_builder::ExecutorBuilder, executor_ready_builder::ExecutorReadyBuilder,
31};
32use crate::lock::Lock;
33
34/// Reusable double-checked lock executor.
35///
36/// The executor owns the lock handle, condition tester, execution logger, and
37/// optional prepare lifecycle callbacks. Each execution performs:
38///
39/// 1. A first condition check outside the lock.
40/// 2. Optional prepare action.
41/// 3. Lock acquisition.
42/// 4. A second condition check inside the lock.
43/// 5. The submitted task.
44/// 6. Optional prepare commit or rollback after the lock is released.
45///
46/// The tester is intentionally run both outside and inside the lock. Any state
47/// read by the first check must therefore use atomics or another synchronization
48/// mechanism that is safe without this executor's lock.
49///
50/// # Type Parameters
51///
52/// * `L` - The lock type implementing [`Lock<T>`].
53/// * `T` - The data type protected by the lock.
54///
55/// # Examples
56///
57/// Use [`DoubleCheckedLockExecutor::builder`] to attach a lock (for example
58/// [`crate::ArcMutex`]), set a [`Tester`] with
59/// [`ExecutorLockBuilder::when`](super::ExecutorLockBuilder::when), then call
60/// [`Self::call`], [`Self::execute`], [`Self::call_with`], or
61/// [`Self::execute_with`] on the built executor.
62///
63/// Panics from the tester, prepare callbacks, or task can be captured with
64/// [`set_catch_panics`](Self::set_catch_panics) and reported as
65/// [`super::ExecutorError::Panic`], so rollback can still be executed.
66///
67/// Cloned executors share their configured prepare callbacks. Concurrent calls
68/// may therefore complete prepare in several threads before one call wins the
69/// second condition check; calls that lose the second check run prepare rollback
70/// if it is configured.
71///
72/// ```rust
73/// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
74///
75/// use qubit_dcl::{ArcMutex, DoubleCheckedLockExecutor, Lock};
76/// use qubit_dcl::double_checked::ExecutionResult;
77///
78/// fn main() {
79///     let data = ArcMutex::new(10);
80///     let skip = Arc::new(AtomicBool::new(false));
81///
82///     let executor = DoubleCheckedLockExecutor::builder()
83///         .on(data.clone())
84///         .when({
85///             let skip = skip.clone();
86///             move || !skip.load(Ordering::Acquire)
87///         })
88///         .build();
89///
90///     let updated = executor
91///         .call_with(|value: &mut i32| {
92///             *value += 5;
93///             Ok::<i32, std::io::Error>(*value)
94///         })
95///         .get_result();
96///
97///     assert!(matches!(updated, ExecutionResult::Success(15)));
98///     assert_eq!(data.read(|value| *value), 15);
99///
100///     skip.store(true, Ordering::Release);
101///     let skipped = executor
102///         .call_with(|value: &mut i32| {
103///             *value += 1;
104///             Ok::<i32, std::io::Error>(*value)
105///         })
106///         .get_result();
107///
108///     assert!(matches!(skipped, ExecutionResult::ConditionNotMet));
109///     assert_eq!(data.read(|value| *value), 15);
110/// }
111/// ```
112///
113/// # Author
114///
115/// Haixing Hu
116#[derive(Clone)]
117pub struct DoubleCheckedLockExecutor<L = (), T = ()> {
118    /// The lock protecting the target data.
119    lock: L,
120
121    /// Condition checked before and after acquiring the lock.
122    tester: ArcTester,
123
124    /// Logger for unmet conditions and prepare lifecycle failures.
125    logger: ExecutionLogger,
126
127    /// Optional action executed after the first check and before locking.
128    prepare_action: Option<ArcRunnable<CallbackError>>,
129
130    /// Optional action executed when prepare must be rolled back.
131    rollback_prepare_action: Option<ArcRunnable<CallbackError>>,
132
133    /// Optional action executed when prepare should be committed.
134    commit_prepare_action: Option<ArcRunnable<CallbackError>>,
135
136    /// Whether panics from tester, callbacks, and task are captured as errors.
137    catch_panics: bool,
138
139    /// Carries the protected data type.
140    _phantom: PhantomData<fn() -> T>,
141}
142
143impl DoubleCheckedLockExecutor<(), ()> {
144    /// Creates a builder for a reusable double-checked lock executor.
145    ///
146    /// # Returns
147    ///
148    /// A builder in the initial state. Attach a lock with
149    /// [`ExecutorBuilder::on`], then configure a tester with
150    /// [`ExecutorLockBuilder::when`](super::ExecutorLockBuilder::when).
151    #[inline]
152    pub fn builder() -> ExecutorBuilder {
153        ExecutorBuilder::default()
154    }
155}
156
157impl<L, T> DoubleCheckedLockExecutor<L, T>
158where
159    L: Lock<T>,
160{
161    /// Assembles an executor from the ready builder state.
162    ///
163    /// # Parameters
164    ///
165    /// * `builder` - Ready builder carrying the lock, tester, logger, and
166    ///   prepare lifecycle callbacks.
167    ///
168    /// # Returns
169    ///
170    /// A reusable executor containing the supplied builder state.
171    #[inline]
172    pub fn new(builder: ExecutorReadyBuilder<L, T>) -> Self {
173        Self {
174            lock: builder.lock,
175            tester: builder.tester,
176            logger: builder.logger,
177            prepare_action: builder.prepare_action,
178            rollback_prepare_action: builder.rollback_prepare_action,
179            commit_prepare_action: builder.commit_prepare_action,
180            catch_panics: builder.catch_panics,
181            _phantom: builder._phantom,
182        }
183    }
184
185    /// Executes a zero-argument callable while holding the write lock.
186    ///
187    /// Use [`Self::call_with`] when the task needs direct mutable access to the
188    /// protected data.
189    ///
190    /// # Parameters
191    ///
192    /// * `task` - The callable task to execute after both condition checks pass.
193    ///
194    /// # Returns
195    ///
196    /// An [`ExecutionContext`] containing success, unmet-condition, or failure
197    /// information.
198    #[inline]
199    pub fn call<C, R, E>(&self, task: C) -> ExecutionContext<R, E>
200    where
201        C: Callable<R, E>,
202        E: Display,
203    {
204        let mut task = task;
205        let result = self.execute_with_write_lock(move |_data| task.call());
206        ExecutionContext::new(result)
207    }
208
209    /// Executes a zero-argument runnable while holding the write lock.
210    ///
211    /// # Parameters
212    ///
213    /// * `task` - The runnable task to execute after both condition checks pass.
214    ///
215    /// # Returns
216    ///
217    /// An [`ExecutionContext`] containing success, unmet-condition, or failure
218    /// information.
219    #[inline]
220    pub fn execute<Rn, E>(&self, task: Rn) -> ExecutionContext<(), E>
221    where
222        Rn: Runnable<E>,
223        E: Display,
224    {
225        let mut task = task;
226        let result = self.execute_with_write_lock(move |_data| task.run());
227        ExecutionContext::new(result)
228    }
229
230    /// Executes a callable with mutable access to the protected data.
231    ///
232    /// # Parameters
233    ///
234    /// * `task` - The callable receiving `&mut T` after both condition checks
235    ///   pass.
236    ///
237    /// # Returns
238    ///
239    /// An [`ExecutionContext`] containing success, unmet-condition, or failure
240    /// information.
241    #[inline]
242    pub fn call_with<C, R, E>(&self, task: C) -> ExecutionContext<R, E>
243    where
244        C: CallableWith<T, R, E>,
245        E: Display,
246    {
247        let mut task = task;
248        let result = self.execute_with_write_lock(move |data| task.call_with(data));
249        ExecutionContext::new(result)
250    }
251
252    /// Executes a runnable with mutable access to the protected data.
253    ///
254    /// # Parameters
255    ///
256    /// * `task` - The runnable receiving `&mut T` after both condition checks
257    ///   pass.
258    ///
259    /// # Returns
260    ///
261    /// An [`ExecutionContext`] containing success, unmet-condition, or failure
262    /// information.
263    #[inline]
264    pub fn execute_with<Rn, E>(&self, task: Rn) -> ExecutionContext<(), E>
265    where
266        Rn: RunnableWith<T, E>,
267        E: Display,
268    {
269        let mut task = task;
270        let result = self.execute_with_write_lock(move |data| task.run_with(data));
271        ExecutionContext::new(result)
272    }
273
274    /// Enables or disables panic capture for tester, callbacks, and task
275    /// execution.
276    #[inline]
277    pub fn set_catch_panics(mut self, catch_panics: bool) -> Self {
278        self.catch_panics = catch_panics;
279        self
280    }
281
282    /// Deprecated alias for [`Self::set_catch_panics`].
283    #[deprecated(note = "Use `set_catch_panics` instead to align with setter naming.")]
284    #[inline]
285    pub fn with_catch_panics(self, catch_panics: bool) -> Self {
286        self.set_catch_panics(catch_panics)
287    }
288
289    /// Returns whether panic capture is enabled.
290    #[inline]
291    pub fn catch_panics(&self) -> bool {
292        self.catch_panics
293    }
294
295    /// Runs the configured double-checked sequence under a write lock.
296    ///
297    /// # Parameters
298    ///
299    /// * `task` - The task to run with mutable access after both condition
300    ///   checks pass.
301    ///
302    /// # Returns
303    ///
304    /// The final execution result, including prepare finalization.
305    ///
306    /// # Errors
307    ///
308    /// Task errors are captured as [`ExecutionResult::Failed`] with
309    /// [`super::ExecutorError::TaskFailed`]. Prepare, commit, and rollback
310    /// failures are also captured in the returned [`ExecutionResult`] rather
311    /// than returned as a separate `Result`.
312    fn execute_with_write_lock<R, E, F>(&self, task: F) -> ExecutionResult<R, E>
313    where
314        E: Display,
315        F: FnOnce(&mut T) -> Result<R, E>,
316    {
317        let first_check = match self.try_run("tester", || self.tester.test()) {
318            Ok(v) => v,
319            Err(error) => {
320                return ExecutionResult::from_executor_error(ExecutorError::Panic(error));
321            }
322        };
323
324        if !first_check {
325            self.log_unmet_condition();
326            return ExecutionResult::unmet();
327        }
328
329        let prepare_completed = match self.run_prepare_action() {
330            Ok(completed) => completed,
331            Err(error) => {
332                return ExecutionResult::from_executor_error(ExecutorError::PrepareFailed(error));
333            }
334        };
335
336        let result = self.lock.write(|data| {
337            let passed = match self.try_run("tester", || self.tester.test()) {
338                Ok(v) => v,
339                Err(error) => {
340                    return ExecutionResult::from_executor_error(ExecutorError::Panic(error));
341                }
342            };
343            if !passed {
344                return ExecutionResult::unmet();
345            }
346
347            match self.try_run("task", || task(data)) {
348                Ok(Ok(value)) => ExecutionResult::success(value),
349                Ok(Err(error)) => ExecutionResult::task_failed(error),
350                Err(error) => ExecutionResult::from_executor_error(ExecutorError::Panic(error)),
351            }
352        });
353
354        if result.is_unmet() {
355            self.log_unmet_condition();
356        }
357
358        if prepare_completed {
359            self.finalize_prepare(result)
360        } else {
361            result
362        }
363    }
364
365    /// Executes the optional prepare action.
366    ///
367    /// # Returns
368    ///
369    /// `Ok(true)` if prepare exists and succeeds, `Ok(false)` if no prepare
370    /// action is configured, or `Err(message)` if prepare fails.
371    fn run_prepare_action(&self) -> Result<bool, CallbackError> {
372        let Some(mut prepare_action) = self.prepare_action.clone() else {
373            return Ok(false);
374        };
375
376        match self.try_run("prepare", move || prepare_action.run()) {
377            Ok(Ok(_)) => Ok(true),
378            Ok(Err(error)) => {
379                self.logger.log_prepare_failed(&error);
380                Err(error)
381            }
382            Err(error) => {
383                self.logger.log_prepare_failed(&error);
384                Err(error)
385            }
386        }
387    }
388
389    /// Commits or rolls back a successfully completed prepare action.
390    ///
391    /// This method runs after the write lock has been released.
392    ///
393    /// # Parameters
394    ///
395    /// * `result` - Result produced by the condition check and task execution.
396    ///
397    /// # Returns
398    ///
399    /// `result` unchanged when no finalization action fails. Returns a failed
400    /// result when prepare commit or prepare rollback fails.
401    fn finalize_prepare<R, E>(&self, mut result: ExecutionResult<R, E>) -> ExecutionResult<R, E>
402    where
403        E: Display,
404    {
405        if result.is_success() {
406            if let Some(mut commit_prepare_action) = self.commit_prepare_action.clone() {
407                match self.try_run("prepare_commit", move || commit_prepare_action.run()) {
408                    Ok(Ok(_)) => {}
409                    Ok(Err(error)) => {
410                        self.logger.log_prepare_commit_failed(&error);
411                        result = ExecutionResult::from_executor_error(
412                            ExecutorError::PrepareCommitFailed(error),
413                        );
414                    }
415                    Err(error) => {
416                        self.logger.log_prepare_commit_failed(&error);
417                        result = ExecutionResult::from_executor_error(
418                            ExecutorError::PrepareCommitFailed(error),
419                        );
420                    }
421                }
422            }
423            return result;
424        }
425
426        let original = if let ExecutionResult::Failed(error) = &result {
427            error.to_string()
428        } else {
429            "Condition not met".to_string()
430        };
431
432        if let Some(mut rollback_prepare_action) = self.rollback_prepare_action.clone() {
433            match self.try_run("prepare_rollback", move || rollback_prepare_action.run()) {
434                Ok(Ok(_)) => {}
435                Ok(Err(error)) => {
436                    self.logger.log_prepare_rollback_failed(&error);
437                    result = ExecutionResult::prepare_rollback_failed(original, error.message());
438                }
439                Err(error) => {
440                    self.logger.log_prepare_rollback_failed(&error);
441                    result = ExecutionResult::prepare_rollback_failed(original, error.message());
442                }
443            }
444        }
445        result
446    }
447
448    /// Runs a callback with optional panic capture.
449    fn try_run<R>(
450        &self,
451        callback_type: &'static str,
452        callback: impl FnOnce() -> R,
453    ) -> Result<R, CallbackError> {
454        if !self.catch_panics {
455            return Ok(callback());
456        }
457
458        match panic::catch_unwind(AssertUnwindSafe(callback)) {
459            Ok(result) => Ok(result),
460            Err(payload) => {
461                let message = panic_payload_to_message(&*payload);
462                Err(CallbackError::with_type(callback_type, message))
463            }
464        }
465    }
466
467    /// Logs that the double-checked condition was not met.
468    fn log_unmet_condition(&self) {
469        self.logger.log_unmet_condition();
470    }
471}
472
473type CallbackError = super::executor_error::CallbackError;
474
475fn panic_payload_to_message(payload: &(dyn Any + Send)) -> String {
476    if let Some(message) = payload.downcast_ref::<&str>() {
477        (*message).to_string()
478    } else if let Some(message) = payload.downcast_ref::<String>() {
479        message.to_string()
480    } else {
481        format!("{:?}", payload)
482    }
483}