qubit_dcl/double_checked/double_checked_lock_executor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Double-Checked Lock Executor
11//!
12//! Provides a reusable executor for double-checked locking workflows.
13//!
14
15use std::{
16 any::Any,
17 fmt::Display,
18 marker::PhantomData,
19 panic::{
20 self,
21 AssertUnwindSafe,
22 },
23};
24
25use qubit_function::{
26 ArcRunnable,
27 ArcTester,
28 Callable,
29 CallableWith,
30 Runnable,
31 RunnableWith,
32 Tester,
33};
34
35use super::{
36 ExecutionContext,
37 ExecutionLogger,
38 ExecutionResult,
39 ExecutorError,
40 executor_builder::ExecutorBuilder,
41 executor_ready_builder::ExecutorReadyBuilder,
42};
43use crate::lock::Lock;
44
45/// Reusable double-checked lock executor.
46///
47/// The executor owns the lock handle, condition tester, execution logger, and
48/// optional prepare lifecycle callbacks. Each execution performs:
49///
50/// 1. A first condition check outside the lock.
51/// 2. Optional prepare action.
52/// 3. Lock acquisition.
53/// 4. A second condition check inside the lock.
54/// 5. The submitted task.
55/// 6. Optional prepare commit or rollback after the lock is released.
56///
57/// The tester is intentionally run both outside and inside the lock. Any state
58/// read by the first check must therefore use atomics or another synchronization
59/// mechanism that is safe without this executor's lock.
60///
61/// # Type Parameters
62///
63/// * `L` - The lock type implementing [`Lock<T>`].
64/// * `T` - The data type protected by the lock.
65///
66/// # Examples
67///
68/// Use [`DoubleCheckedLockExecutor::builder`] to attach a lock (for example
69/// [`crate::ArcMutex`]), set a [`Tester`] with
70/// [`ExecutorLockBuilder::when`](super::ExecutorLockBuilder::when), then call
71/// [`Self::call`], [`Self::execute`], [`Self::call_with`], or
72/// [`Self::execute_with`] on the built executor.
73///
74/// Panics from the tester, prepare callbacks, or task can be captured with
75/// [`set_catch_panics`](Self::set_catch_panics) and reported as
76/// [`super::ExecutorError::Panic`], so rollback can still be executed.
77///
78/// Cloned executors share their configured prepare callbacks. Concurrent calls
79/// may therefore complete prepare in several threads before one call wins the
80/// second condition check; calls that lose the second check run prepare rollback
81/// if it is configured.
82///
83/// ```rust
84/// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
85///
86/// use qubit_dcl::{ArcMutex, DoubleCheckedLockExecutor, Lock};
87/// use qubit_dcl::double_checked::ExecutionResult;
88///
89/// fn main() {
90/// let data = ArcMutex::new(10);
91/// let skip = Arc::new(AtomicBool::new(false));
92///
93/// let executor = DoubleCheckedLockExecutor::builder()
94/// .on(data.clone())
95/// .when({
96/// let skip = skip.clone();
97/// move || !skip.load(Ordering::Acquire)
98/// })
99/// .build();
100///
101/// let updated = executor
102/// .call_with(|value: &mut i32| {
103/// *value += 5;
104/// Ok::<i32, std::io::Error>(*value)
105/// })
106/// .get_result();
107///
108/// assert!(matches!(updated, ExecutionResult::Success(15)));
109/// assert_eq!(data.read(|value| *value), 15);
110///
111/// skip.store(true, Ordering::Release);
112/// let skipped = executor
113/// .call_with(|value: &mut i32| {
114/// *value += 1;
115/// Ok::<i32, std::io::Error>(*value)
116/// })
117/// .get_result();
118///
119/// assert!(matches!(skipped, ExecutionResult::ConditionNotMet));
120/// assert_eq!(data.read(|value| *value), 15);
121/// }
122/// ```
123///
124#[derive(Clone)]
125pub struct DoubleCheckedLockExecutor<L = (), T = ()> {
126 /// The lock protecting the target data.
127 lock: L,
128
129 /// Condition checked before and after acquiring the lock.
130 tester: ArcTester,
131
132 /// Logger for unmet conditions and prepare lifecycle failures.
133 logger: ExecutionLogger,
134
135 /// Optional action executed after the first check and before locking.
136 prepare_action: Option<ArcRunnable<CallbackError>>,
137
138 /// Optional action executed when prepare must be rolled back.
139 rollback_prepare_action: Option<ArcRunnable<CallbackError>>,
140
141 /// Optional action executed when prepare should be committed.
142 commit_prepare_action: Option<ArcRunnable<CallbackError>>,
143
144 /// Whether panics from tester, callbacks, and task are captured as errors.
145 catch_panics: bool,
146
147 /// Carries the protected data type.
148 _phantom: PhantomData<fn() -> T>,
149}
150
// Implemented only for the `<(), ()>` instantiation, so
// `DoubleCheckedLockExecutor::builder()` can be called without naming a lock
// or data type up front.
impl DoubleCheckedLockExecutor<(), ()> {
    /// Creates a builder for a reusable double-checked lock executor.
    ///
    /// The builder is obtained from `ExecutorBuilder::default()`.
    ///
    /// # Returns
    ///
    /// A builder in the initial state. Attach a lock with
    /// [`ExecutorBuilder::on`], then configure a tester with
    /// [`ExecutorLockBuilder::when`](super::ExecutorLockBuilder::when).
    #[inline]
    pub fn builder() -> ExecutorBuilder {
        ExecutorBuilder::default()
    }
}
164
165impl<L, T> DoubleCheckedLockExecutor<L, T>
166where
167 L: Lock<T>,
168{
169 /// Assembles an executor from the ready builder state.
170 ///
171 /// # Parameters
172 ///
173 /// * `builder` - Ready builder carrying the lock, tester, logger, and
174 /// prepare lifecycle callbacks.
175 ///
176 /// # Returns
177 ///
178 /// A reusable executor containing the supplied builder state.
179 #[inline]
180 pub fn new(builder: ExecutorReadyBuilder<L, T>) -> Self {
181 Self {
182 lock: builder.lock,
183 tester: builder.tester,
184 logger: builder.logger,
185 prepare_action: builder.prepare_action,
186 rollback_prepare_action: builder.rollback_prepare_action,
187 commit_prepare_action: builder.commit_prepare_action,
188 catch_panics: builder.catch_panics,
189 _phantom: builder._phantom,
190 }
191 }
192
193 /// Executes a zero-argument callable while holding the write lock.
194 ///
195 /// Use [`Self::call_with`] when the task needs direct mutable access to the
196 /// protected data.
197 ///
198 /// # Parameters
199 ///
200 /// * `task` - The callable task to execute after both condition checks pass.
201 ///
202 /// # Returns
203 ///
204 /// An [`ExecutionContext`] containing success, unmet-condition, or failure
205 /// information.
206 #[inline]
207 pub fn call<C, R, E>(&self, task: C) -> ExecutionContext<R, E>
208 where
209 C: Callable<R, E>,
210 E: Display,
211 {
212 let mut task = task;
213 let result = self.execute_with_write_lock(move |_data| task.call());
214 ExecutionContext::new(result)
215 }
216
217 /// Executes a zero-argument runnable while holding the write lock.
218 ///
219 /// # Parameters
220 ///
221 /// * `task` - The runnable task to execute after both condition checks pass.
222 ///
223 /// # Returns
224 ///
225 /// An [`ExecutionContext`] containing success, unmet-condition, or failure
226 /// information.
227 #[inline]
228 pub fn execute<Rn, E>(&self, task: Rn) -> ExecutionContext<(), E>
229 where
230 Rn: Runnable<E>,
231 E: Display,
232 {
233 let mut task = task;
234 let result = self.execute_with_write_lock(move |_data| task.run());
235 ExecutionContext::new(result)
236 }
237
238 /// Executes a callable with mutable access to the protected data.
239 ///
240 /// # Parameters
241 ///
242 /// * `task` - The callable receiving `&mut T` after both condition checks
243 /// pass.
244 ///
245 /// # Returns
246 ///
247 /// An [`ExecutionContext`] containing success, unmet-condition, or failure
248 /// information.
249 #[inline]
250 pub fn call_with<C, R, E>(&self, task: C) -> ExecutionContext<R, E>
251 where
252 C: CallableWith<T, R, E>,
253 E: Display,
254 {
255 let mut task = task;
256 let result = self.execute_with_write_lock(move |data| task.call_with(data));
257 ExecutionContext::new(result)
258 }
259
260 /// Executes a runnable with mutable access to the protected data.
261 ///
262 /// # Parameters
263 ///
264 /// * `task` - The runnable receiving `&mut T` after both condition checks
265 /// pass.
266 ///
267 /// # Returns
268 ///
269 /// An [`ExecutionContext`] containing success, unmet-condition, or failure
270 /// information.
271 #[inline]
272 pub fn execute_with<Rn, E>(&self, task: Rn) -> ExecutionContext<(), E>
273 where
274 Rn: RunnableWith<T, E>,
275 E: Display,
276 {
277 let mut task = task;
278 let result = self.execute_with_write_lock(move |data| task.run_with(data));
279 ExecutionContext::new(result)
280 }
281
282 /// Enables or disables panic capture for tester, callbacks, and task
283 /// execution.
284 #[inline]
285 pub fn set_catch_panics(mut self, catch_panics: bool) -> Self {
286 self.catch_panics = catch_panics;
287 self
288 }
289
290 /// Deprecated alias for [`Self::set_catch_panics`].
291 #[deprecated(note = "Use `set_catch_panics` instead to align with setter naming.")]
292 #[inline]
293 pub fn with_catch_panics(self, catch_panics: bool) -> Self {
294 self.set_catch_panics(catch_panics)
295 }
296
/// Returns whether panic capture is enabled.
///
/// # Returns
///
/// `true` when tester, callback, and task invocations are run under panic
/// capture and reported as errors; `false` when panics propagate to the
/// caller.
#[inline]
pub fn catch_panics(&self) -> bool {
    self.catch_panics
}
302
303 /// Runs the configured double-checked sequence under a write lock.
304 ///
305 /// # Parameters
306 ///
307 /// * `task` - The task to run with mutable access after both condition
308 /// checks pass.
309 ///
310 /// # Returns
311 ///
312 /// The final execution result, including prepare finalization.
313 ///
314 /// # Errors
315 ///
316 /// Task errors are captured as [`ExecutionResult::Failed`] with
317 /// [`super::ExecutorError::TaskFailed`]. Prepare, commit, and rollback
318 /// failures are also captured in the returned [`ExecutionResult`] rather
319 /// than returned as a separate `Result`.
320 fn execute_with_write_lock<R, E, F>(&self, task: F) -> ExecutionResult<R, E>
321 where
322 E: Display,
323 F: FnOnce(&mut T) -> Result<R, E>,
324 {
325 let first_check = match self.try_run("tester", || self.tester.test()) {
326 Ok(v) => v,
327 Err(error) => {
328 return ExecutionResult::from_executor_error(ExecutorError::Panic(error));
329 }
330 };
331
332 if !first_check {
333 self.log_unmet_condition();
334 return ExecutionResult::unmet();
335 }
336
337 let prepare_completed = match self.run_prepare_action() {
338 Ok(completed) => completed,
339 Err(error) => {
340 return ExecutionResult::from_executor_error(ExecutorError::PrepareFailed(error));
341 }
342 };
343
344 let result = self.lock.write(|data| {
345 let passed = match self.try_run("tester", || self.tester.test()) {
346 Ok(v) => v,
347 Err(error) => {
348 return ExecutionResult::from_executor_error(ExecutorError::Panic(error));
349 }
350 };
351 if !passed {
352 return ExecutionResult::unmet();
353 }
354
355 match self.try_run("task", || task(data)) {
356 Ok(Ok(value)) => ExecutionResult::success(value),
357 Ok(Err(error)) => ExecutionResult::task_failed(error),
358 Err(error) => ExecutionResult::from_executor_error(ExecutorError::Panic(error)),
359 }
360 });
361
362 if result.is_unmet() {
363 self.log_unmet_condition();
364 }
365
366 if prepare_completed {
367 self.finalize_prepare(result)
368 } else {
369 result
370 }
371 }
372
373 /// Executes the optional prepare action.
374 ///
375 /// # Returns
376 ///
377 /// `Ok(true)` if prepare exists and succeeds, `Ok(false)` if no prepare
378 /// action is configured, or `Err(message)` if prepare fails.
379 fn run_prepare_action(&self) -> Result<bool, CallbackError> {
380 let Some(mut prepare_action) = self.prepare_action.clone() else {
381 return Ok(false);
382 };
383
384 match self.try_run("prepare", move || prepare_action.run()) {
385 Ok(Ok(_)) => Ok(true),
386 Ok(Err(error)) => {
387 self.logger.log_prepare_failed(&error);
388 Err(error)
389 }
390 Err(error) => {
391 self.logger.log_prepare_failed(&error);
392 Err(error)
393 }
394 }
395 }
396
397 /// Commits or rolls back a successfully completed prepare action.
398 ///
399 /// This method runs after the write lock has been released.
400 ///
401 /// # Parameters
402 ///
403 /// * `result` - Result produced by the condition check and task execution.
404 ///
405 /// # Returns
406 ///
407 /// `result` unchanged when no finalization action fails. Returns a failed
408 /// result when prepare commit or prepare rollback fails.
409 fn finalize_prepare<R, E>(&self, mut result: ExecutionResult<R, E>) -> ExecutionResult<R, E>
410 where
411 E: Display,
412 {
413 if result.is_success() {
414 if let Some(mut commit_prepare_action) = self.commit_prepare_action.clone() {
415 match self.try_run("prepare_commit", move || commit_prepare_action.run()) {
416 Ok(Ok(_)) => {}
417 Ok(Err(error)) => {
418 self.logger.log_prepare_commit_failed(&error);
419 result = ExecutionResult::from_executor_error(
420 ExecutorError::PrepareCommitFailed(error),
421 );
422 }
423 Err(error) => {
424 self.logger.log_prepare_commit_failed(&error);
425 result = ExecutionResult::from_executor_error(
426 ExecutorError::PrepareCommitFailed(error),
427 );
428 }
429 }
430 }
431 return result;
432 }
433
434 let original = if let ExecutionResult::Failed(error) = &result {
435 error.to_string()
436 } else {
437 "Condition not met".to_string()
438 };
439
440 if let Some(mut rollback_prepare_action) = self.rollback_prepare_action.clone() {
441 match self.try_run("prepare_rollback", move || rollback_prepare_action.run()) {
442 Ok(Ok(_)) => {}
443 Ok(Err(error)) => {
444 self.logger.log_prepare_rollback_failed(&error);
445 result = ExecutionResult::prepare_rollback_failed(original, error.message());
446 }
447 Err(error) => {
448 self.logger.log_prepare_rollback_failed(&error);
449 result = ExecutionResult::prepare_rollback_failed(original, error.message());
450 }
451 }
452 }
453 result
454 }
455
456 /// Runs a callback with optional panic capture.
457 fn try_run<R>(
458 &self,
459 callback_type: &'static str,
460 callback: impl FnOnce() -> R,
461 ) -> Result<R, CallbackError> {
462 if !self.catch_panics {
463 return Ok(callback());
464 }
465
466 match panic::catch_unwind(AssertUnwindSafe(callback)) {
467 Ok(result) => Ok(result),
468 Err(payload) => {
469 let message = panic_payload_to_message(&*payload);
470 Err(CallbackError::with_type(callback_type, message))
471 }
472 }
473 }
474
/// Logs that the double-checked condition was not met.
///
/// Forwards to the execution logger's unmet-condition hook.
fn log_unmet_condition(&self) {
    self.logger.log_unmet_condition();
}
479}
480
/// Module-local shorthand for the callback error type, used as the error of
/// the prepare lifecycle callbacks and for panics captured by the executor.
type CallbackError = super::callback_error::CallbackError;
482
/// Converts a panic payload into a printable message.
///
/// `&str` and `String` payloads (the types produced by `panic!` with a
/// literal or a formatted message) are returned as text. Any other payload
/// falls back to its `Debug` rendering.
fn panic_payload_to_message(payload: &(dyn Any + Send)) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| format!("{:?}", payload))
}
491}