Skip to main content

qubit_cas/executor/
cas_executor.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! CAS executor implementation.
11
12use std::marker::PhantomData;
13use std::panic::{AssertUnwindSafe, catch_unwind};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16
17use qubit_atomic::AtomicRef;
18use qubit_error::BoxError;
19use qubit_function::{Consumer, Function};
20use qubit_retry::{
21    AttemptFailure, AttemptFailureDecision, Retry, RetryContext, RetryError, RetryOptions,
22};
23
24use crate::cas_decision::CasDecision;
25use crate::cas_outcome::CasOutcome;
26use crate::cas_success::CasSuccess;
27use crate::error::{CasAttemptFailure, CasError, CasErrorKind};
28use crate::event::{CasContext, CasEvent, CasHooks};
29use crate::observability::{
30    CasAlert, CasObservabilityConfig, CasObservabilityMode, ListenerPanicPolicy,
31};
32use crate::options::CasTimeoutPolicy;
33use crate::report::{CasExecutionOutcome, CasExecutionReport, CasReportBuilder};
34use crate::strategy::CasStrategy;
35
36use super::cas_builder::CasBuilder;
37
38/// Executor for retry-aware compare-and-swap workflows.
39#[derive(Debug, Clone)]
40pub struct CasExecutor<T, E = BoxError> {
41    /// Immutable retry options shared by every execution.
42    options: RetryOptions,
43    /// Optional timeout for each async CAS attempt.
44    attempt_timeout: Option<Duration>,
45    /// Policy used when one async attempt times out.
46    timeout_policy: CasTimeoutPolicy,
47    /// Observability settings shared by executions.
48    observability: CasObservabilityConfig,
49    /// Marker preserving `T` and `E`.
50    marker: PhantomData<fn() -> (T, E)>,
51}
52
/// Result of one successful attempt, before the retry context is attached.
enum AttemptSuccess<T, R> {
    /// The compare-and-swap write went through.
    Updated {
        /// Snapshot that was replaced by the write.
        previous: Arc<T>,
        /// Value installed by the write.
        current: Arc<T>,
        /// Output returned by the operation.
        output: R,
    },
    /// The operation completed successfully without writing.
    Finished {
        /// Snapshot the operation observed.
        current: Arc<T>,
        /// Output returned by the operation.
        output: R,
    },
}
64
65/// Snapshot of retry-layer limits plus terminal outcome for finalizing [`CasReportBuilder`].
66struct CasReportFinishContext {
67    attempts_total: u32,
68    max_attempts: u32,
69    max_operation_elapsed: Option<Duration>,
70    max_total_elapsed: Option<Duration>,
71    outcome: CasExecutionOutcome,
72}
73
74impl CasReportFinishContext {
75    #[inline]
76    fn new(
77        attempts_total: u32,
78        max_attempts: u32,
79        max_operation_elapsed: Option<Duration>,
80        max_total_elapsed: Option<Duration>,
81        outcome: CasExecutionOutcome,
82    ) -> Self {
83        Self {
84            attempts_total,
85            max_attempts,
86            max_operation_elapsed,
87            max_total_elapsed,
88            outcome,
89        }
90    }
91}
92
93impl<T, E> CasExecutor<T, E> {
94    /// Creates a CAS builder.
95    ///
96    /// # Returns
97    /// A builder configured with default retry settings.
98    #[inline]
99    pub fn builder() -> CasBuilder<T, E> {
100        CasBuilder::new()
101    }
102
103    /// Creates an executor from retry options.
104    ///
105    /// # Parameters
106    /// - `options`: Retry options to validate and install.
107    ///
108    /// # Returns
109    /// A configured executor using the default timeout policy.
110    ///
111    /// # Errors
112    /// Returns the retry-layer validation error when `options` are invalid.
113    pub fn from_options(options: RetryOptions) -> Result<Self, qubit_retry::RetryConfigError> {
114        Self::builder().options(options).build()
115    }
116
117    /// Creates an executor tuned for low-latency workloads.
118    ///
119    /// # Returns
120    /// A configured executor. The built-in strategy is always valid.
121    pub fn latency_first() -> Self {
122        Self::builder()
123            .build_latency_first()
124            .expect("latency-first CAS strategy must be valid")
125    }
126
127    /// Creates an executor tuned for hot-contention workloads.
128    ///
129    /// # Returns
130    /// A configured executor. The built-in strategy is always valid.
131    pub fn contention_adaptive() -> Self {
132        Self::builder()
133            .build_contention_adaptive()
134            .expect("contention-adaptive CAS strategy must be valid")
135    }
136
137    /// Creates an executor tuned for reliability-first workloads.
138    ///
139    /// # Returns
140    /// A configured executor. The built-in strategy is always valid.
141    pub fn reliability_first() -> Self {
142        Self::builder()
143            .build_reliability_first()
144            .expect("reliability-first CAS strategy must be valid")
145    }
146
147    /// Creates an executor from a built-in strategy.
148    ///
149    /// # Parameters
150    /// - `strategy`: Strategy to install.
151    ///
152    /// # Returns
153    /// A configured executor. Built-in strategies are always valid.
154    pub fn with_strategy(strategy: CasStrategy) -> Self {
155        Self::builder()
156            .strategy(strategy)
157            .build()
158            .expect("built-in CAS strategy must be valid")
159    }
160
161    /// Creates one executor from validated parts.
162    ///
163    /// # Parameters
164    /// - `options`: Validated retry options.
165    /// - `attempt_timeout`: Optional async attempt timeout.
166    /// - `timeout_policy`: Policy used when one attempt exceeds the timeout.
167    /// - `observability`: Observability settings shared by executions.
168    ///
169    /// # Returns
170    /// A configured executor.
171    #[inline]
172    pub(crate) fn new(
173        options: RetryOptions,
174        attempt_timeout: Option<Duration>,
175        timeout_policy: CasTimeoutPolicy,
176        observability: CasObservabilityConfig,
177    ) -> Self {
178        Self {
179            options,
180            attempt_timeout,
181            timeout_policy,
182            observability,
183            marker: PhantomData,
184        }
185    }
186
187    /// Returns the immutable retry options used by this executor.
188    ///
189    /// # Returns
190    /// Shared retry options.
191    #[inline]
192    pub fn options(&self) -> &RetryOptions {
193        &self.options
194    }
195
196    /// Returns the configured async attempt timeout.
197    ///
198    /// # Returns
199    /// `Some(Duration)` when async attempts have a timeout.
200    #[inline]
201    pub fn attempt_timeout(&self) -> Option<Duration> {
202        self.attempt_timeout
203    }
204
205    /// Returns the timeout policy.
206    ///
207    /// # Returns
208    /// Policy used when an async attempt exceeds the timeout.
209    #[inline]
210    pub fn timeout_policy(&self) -> CasTimeoutPolicy {
211        self.timeout_policy
212    }
213
214    /// Returns observability settings used by this executor.
215    ///
216    /// # Returns
217    /// Shared observability configuration.
218    #[inline]
219    pub fn observability(&self) -> &CasObservabilityConfig {
220        &self.observability
221    }
222
223    /// Executes one synchronous CAS operation.
224    ///
225    /// # Parameters
226    /// - `state`: Shared atomic state container.
227    /// - `operation`: Pure operation that inspects the current state and returns
228    ///   a CAS decision.
229    ///
230    /// # Returns
231    /// A terminal result together with the execution report.
232    pub fn execute<R, O>(&self, state: &AtomicRef<T>, operation: O) -> CasOutcome<T, R, E>
233    where
234        T: 'static,
235        E: 'static,
236        O: Function<T, CasDecision<T, R, E>>,
237    {
238        self.execute_with_hooks(state, operation, CasHooks::new())
239    }
240
241    /// Executes one synchronous CAS operation with lifecycle hooks.
242    ///
243    /// # Parameters
244    /// - `state`: Shared atomic state container.
245    /// - `operation`: Pure operation that inspects the current state and returns
246    ///   a CAS decision.
247    /// - `hooks`: Per-execution hook registrations.
248    ///
249    /// # Returns
250    /// A terminal result together with the execution report.
251    pub fn execute_with_hooks<R, O>(
252        &self,
253        state: &AtomicRef<T>,
254        operation: O,
255        hooks: CasHooks,
256    ) -> CasOutcome<T, R, E>
257    where
258        T: 'static,
259        E: 'static,
260        O: Function<T, CasDecision<T, R, E>>,
261    {
262        let success_context = Arc::new(Mutex::new(None));
263        let report_builder = Arc::new(Mutex::new(CasReportBuilder::start()));
264        self.emit_started(&hooks, &report_builder);
265        let retry = self.build_retry(
266            &hooks,
267            Arc::clone(&success_context),
268            Arc::clone(&report_builder),
269        );
270        let attempt = retry.run(|| self.run_sync_attempt(state, &operation));
271        self.finish_execution(attempt, hooks, success_context, report_builder)
272    }
273
274    /// Executes one asynchronous CAS operation.
275    ///
276    /// # Parameters
277    /// - `state`: Shared atomic state container.
278    /// - `operation`: Async operation factory receiving one state snapshot.
279    ///
280    /// # Returns
281    /// A terminal result together with the execution report.
282    #[cfg(feature = "tokio")]
283    pub async fn execute_async<R, O, Fut>(
284        &self,
285        state: &AtomicRef<T>,
286        operation: O,
287    ) -> CasOutcome<T, R, E>
288    where
289        T: 'static,
290        E: 'static,
291        O: Fn(Arc<T>) -> Fut,
292        Fut: std::future::Future<Output = CasDecision<T, R, E>>,
293    {
294        self.execute_async_with_hooks(state, operation, CasHooks::new())
295            .await
296    }
297
298    /// Executes one asynchronous CAS operation with lifecycle hooks.
299    ///
300    /// # Parameters
301    /// - `state`: Shared atomic state container.
302    /// - `operation`: Async operation factory receiving one state snapshot.
303    /// - `hooks`: Per-execution hook registrations.
304    ///
305    /// # Returns
306    /// A terminal result together with the execution report.
307    #[cfg(feature = "tokio")]
308    pub async fn execute_async_with_hooks<R, O, Fut>(
309        &self,
310        state: &AtomicRef<T>,
311        operation: O,
312        hooks: CasHooks,
313    ) -> CasOutcome<T, R, E>
314    where
315        T: 'static,
316        E: 'static,
317        O: Fn(Arc<T>) -> Fut,
318        Fut: std::future::Future<Output = CasDecision<T, R, E>>,
319    {
320        let success_context = Arc::new(Mutex::new(None));
321        let report_builder = Arc::new(Mutex::new(CasReportBuilder::start()));
322        self.emit_started(&hooks, &report_builder);
323        let retry = self.build_retry(
324            &hooks,
325            Arc::clone(&success_context),
326            Arc::clone(&report_builder),
327        );
328        let attempt = retry
329            .run_async(|| self.run_async_attempt(state, &operation))
330            .await;
331        self.finish_execution(attempt, hooks, success_context, report_builder)
332    }
333
334    /// Builds one retry policy for a single CAS execution.
335    ///
336    /// # Parameters
337    /// - `hooks`: Hook registrations for the current execution.
338    /// - `success_context`: Shared slot used to capture the retry success
339    ///   context.
340    ///
341    /// # Returns
342    /// A retry policy configured for one CAS execution.
343    fn build_retry(
344        &self,
345        hooks: &CasHooks,
346        success_context: Arc<Mutex<Option<RetryContext>>>,
347        report_builder: Arc<Mutex<CasReportBuilder>>,
348    ) -> Retry<CasAttemptFailure<T, E>>
349    where
350        T: 'static,
351        E: 'static,
352    {
353        let event_hook = hooks.event_hook();
354        let timeout_policy = self.timeout_policy;
355        let attempt_timeout = self.attempt_timeout;
356        let observability = self.observability.clone();
357
358        let mut builder = Retry::<CasAttemptFailure<T, E>>::builder()
359            .options(self.options.clone())
360            .on_success(move |context: &RetryContext| {
361                *success_context
362                    .lock()
363                    .expect("CAS success context slot should be lockable") = Some(*context);
364            })
365            .on_failure(
366                move |failure: &AttemptFailure<CasAttemptFailure<T, E>>, context: &RetryContext| {
367                    let failure = match failure {
368                        AttemptFailure::Panic(_) | AttemptFailure::Executor(_) => {
369                            return AttemptFailureDecision::UseDefault;
370                        }
371                        AttemptFailure::Error(failure) => failure,
372                        AttemptFailure::Timeout => {
373                            unreachable!("CAS executor manages async timeouts explicitly")
374                        }
375                    };
376                    let cas_context = CasContext::new(context, attempt_timeout);
377                    {
378                        let mut report = report_builder
379                            .lock()
380                            .expect("CAS report builder should be lockable");
381                        match failure {
382                            CasAttemptFailure::Conflict { .. } => report.record_conflict(),
383                            CasAttemptFailure::Retry { .. } => report.record_retry_error(),
384                            CasAttemptFailure::Abort { .. } => report.record_abort(),
385                            CasAttemptFailure::Timeout { .. } => report.record_timeout(),
386                        }
387                    }
388                    if Self::should_emit_events(&observability, &event_hook) {
389                        Self::dispatch_event(
390                            &observability,
391                            event_hook
392                                .as_ref()
393                                .expect("event hook should exist when events are emitted"),
394                            CasEvent::AttemptFailed {
395                                context: cas_context,
396                                kind: Self::failure_kind(failure),
397                            },
398                        );
399                    }
400                    match failure {
401                        CasAttemptFailure::Conflict { .. } | CasAttemptFailure::Retry { .. } => {
402                            if Self::should_emit_events(&observability, &event_hook) {
403                                Self::dispatch_event(
404                                    &observability,
405                                    event_hook
406                                        .as_ref()
407                                        .expect("event hook should exist when events are emitted"),
408                                    CasEvent::RetryRequested {
409                                        context: cas_context,
410                                    },
411                                );
412                            }
413                            AttemptFailureDecision::Retry
414                        }
415                        CasAttemptFailure::Abort { .. } => AttemptFailureDecision::Abort,
416                        CasAttemptFailure::Timeout { .. } => match timeout_policy {
417                            CasTimeoutPolicy::Retry => {
418                                if Self::should_emit_events(&observability, &event_hook) {
419                                    Self::dispatch_event(
420                                        &observability,
421                                        event_hook.as_ref().expect(
422                                            "event hook should exist when events are emitted",
423                                        ),
424                                        CasEvent::RetryRequested {
425                                            context: cas_context,
426                                        },
427                                    );
428                                }
429                                AttemptFailureDecision::Retry
430                            }
431                            CasTimeoutPolicy::Abort => AttemptFailureDecision::Abort,
432                        },
433                    }
434                },
435            );
436
437        if self.observability.listener_panic_policy() == ListenerPanicPolicy::Isolate {
438            builder = builder.isolate_listener_panics();
439        }
440        builder
441            .build()
442            .expect("validated CAS executor configuration must build retry policy")
443    }
444
445    /// Runs one synchronous CAS attempt.
446    ///
447    /// # Parameters
448    /// - `state`: Shared atomic state container.
449    /// - `operation`: Pure operation over the current state snapshot.
450    ///
451    /// # Returns
452    /// An attempt success or one attempt failure.
453    fn run_sync_attempt<R, O>(
454        &self,
455        state: &AtomicRef<T>,
456        operation: &O,
457    ) -> Result<AttemptSuccess<T, R>, CasAttemptFailure<T, E>>
458    where
459        O: Function<T, CasDecision<T, R, E>>,
460    {
461        let current = state.load();
462        match operation.apply(current.as_ref()) {
463            CasDecision::Update { next, output } => {
464                match state.compare_set(&current, Arc::clone(&next)) {
465                    Ok(()) => Ok(AttemptSuccess::Updated {
466                        previous: current,
467                        current: next,
468                        output,
469                    }),
470                    Err(actual) => Err(CasAttemptFailure::conflict(actual)),
471                }
472            }
473            CasDecision::Finish { output } => Ok(AttemptSuccess::Finished { current, output }),
474            CasDecision::Retry(error) => Err(CasAttemptFailure::retry(current, error)),
475            CasDecision::Abort(error) => Err(CasAttemptFailure::abort(current, error)),
476        }
477    }
478
479    /// Runs one asynchronous CAS attempt.
480    ///
481    /// # Parameters
482    /// - `state`: Shared atomic state container.
483    /// - `operation`: Async operation factory over one state snapshot.
484    ///
485    /// # Returns
486    /// An attempt success or one attempt failure.
487    #[cfg(feature = "tokio")]
488    async fn run_async_attempt<R, O, Fut>(
489        &self,
490        state: &AtomicRef<T>,
491        operation: &O,
492    ) -> Result<AttemptSuccess<T, R>, CasAttemptFailure<T, E>>
493    where
494        O: Fn(Arc<T>) -> Fut,
495        Fut: std::future::Future<Output = CasDecision<T, R, E>>,
496    {
497        let current = state.load();
498        let decision = if let Some(timeout) = self.attempt_timeout {
499            match tokio::time::timeout(timeout, operation(Arc::clone(&current))).await {
500                Ok(decision) => decision,
501                Err(_) => return Err(CasAttemptFailure::timeout(current)),
502            }
503        } else {
504            operation(Arc::clone(&current)).await
505        };
506
507        match decision {
508            CasDecision::Update { next, output } => {
509                match state.compare_set(&current, Arc::clone(&next)) {
510                    Ok(()) => Ok(AttemptSuccess::Updated {
511                        previous: current,
512                        current: next,
513                        output,
514                    }),
515                    Err(actual) => Err(CasAttemptFailure::conflict(actual)),
516                }
517            }
518            CasDecision::Finish { output } => Ok(AttemptSuccess::Finished { current, output }),
519            CasDecision::Retry(error) => Err(CasAttemptFailure::retry(current, error)),
520            CasDecision::Abort(error) => Err(CasAttemptFailure::abort(current, error)),
521        }
522    }
523
524    /// Finalizes one retry execution into the public CAS result type.
525    ///
526    /// # Parameters
527    /// - `attempt`: Retry-layer terminal success or error.
528    /// - `hooks`: Hook registrations for the current execution.
529    /// - `success_context`: Shared slot storing the success context.
530    ///
531    /// # Returns
532    /// Public CAS success or error.
533    fn finish_execution<R>(
534        &self,
535        attempt: Result<AttemptSuccess<T, R>, RetryError<CasAttemptFailure<T, E>>>,
536        hooks: CasHooks,
537        success_context: Arc<Mutex<Option<RetryContext>>>,
538        report_builder: Arc<Mutex<CasReportBuilder>>,
539    ) -> CasOutcome<T, R, E>
540    where
541        T: 'static,
542        E: 'static,
543    {
544        match attempt {
545            Ok(success) => {
546                let context = success_context
547                    .lock()
548                    .expect("CAS success context slot should be lockable")
549                    .take()
550                    .expect("retry success hook must capture CAS success context");
551                let attempts_total = context.attempt();
552                let max_attempts = context.max_attempts();
553                let max_operation_elapsed = context.max_operation_elapsed();
554                let max_total_elapsed = context.max_total_elapsed();
555                let outcome = match success {
556                    AttemptSuccess::Updated { .. } => CasExecutionOutcome::SuccessUpdated,
557                    AttemptSuccess::Finished { .. } => CasExecutionOutcome::SuccessFinished,
558                };
559                let success = self.enrich_success(success, context);
560                let report = self.finish_report(
561                    &hooks,
562                    report_builder,
563                    CasReportFinishContext::new(
564                        attempts_total,
565                        max_attempts,
566                        max_operation_elapsed,
567                        max_total_elapsed,
568                        outcome,
569                    ),
570                );
571                CasOutcome::new(Ok(success), report)
572            }
573            Err(error) => {
574                let error = CasError::new(error, self.attempt_timeout);
575                let context = error.context();
576                let outcome = Self::error_outcome(error.kind());
577                let report = self.finish_report(
578                    &hooks,
579                    report_builder,
580                    CasReportFinishContext::new(
581                        context.attempt(),
582                        context.max_attempts(),
583                        context.max_operation_elapsed(),
584                        context.max_total_elapsed(),
585                        outcome,
586                    ),
587                );
588                CasOutcome::new(Err(error), report)
589            }
590        }
591    }
592
593    /// Enriches one attempt success with the final CAS context.
594    ///
595    /// # Parameters
596    /// - `success`: Attempt success payload.
597    /// - `context`: Retry success context captured by the retry layer.
598    ///
599    /// # Returns
600    /// Public CAS success value with context attached.
601    fn enrich_success<R>(
602        &self,
603        success: AttemptSuccess<T, R>,
604        context: RetryContext,
605    ) -> CasSuccess<T, R> {
606        let context = CasContext::new(&context, self.attempt_timeout);
607        match success {
608            AttemptSuccess::Updated {
609                previous,
610                current,
611                output,
612            } => CasSuccess::updated(previous, current, output, context),
613            AttemptSuccess::Finished { current, output } => {
614                CasSuccess::finished(current, output, context)
615            }
616        }
617    }
618
619    /// Emits the execution-started event when event streaming is enabled.
620    ///
621    /// # Parameters
622    /// - `hooks`: Per-execution hooks (checked for event hook presence).
623    /// - `report_builder`: Used to obtain the start instant for the event.
624    fn emit_started(&self, hooks: &CasHooks, report_builder: &Arc<Mutex<CasReportBuilder>>)
625    where
626        T: 'static,
627        E: 'static,
628    {
629        if hooks.event_hook().is_none()
630            || self.observability.mode() == CasObservabilityMode::ReportOnly
631        {
632            return;
633        }
634        let started_at = report_builder
635            .lock()
636            .expect("CAS report builder should be lockable")
637            .started_at();
638        let event_hook = hooks.event_hook();
639        Self::dispatch_event(
640            &self.observability,
641            event_hook
642                .as_ref()
643                .expect("event hook should exist when events are emitted"),
644            CasEvent::ExecutionStarted { started_at },
645        );
646    }
647
648    /// Finishes and emits one execution report (and optional alert).
649    ///
650    /// Locks the report builder, finalizes the report, emits the
651    /// `ExecutionFinished` event if enabled, and dispatches a contention alert
652    /// if the mode and thresholds warrant it.
653    ///
654    /// # Parameters
655    /// - `hooks`: Used for event and alert dispatching.
656    /// - `report_builder`: Accumulator to finalize.
657    /// - `ctx`: Retry limits and terminal outcome for the report.
658    ///
659    /// # Returns
660    /// The finalized [`CasExecutionReport`].
661    fn finish_report(
662        &self,
663        hooks: &CasHooks,
664        report_builder: Arc<Mutex<CasReportBuilder>>,
665        ctx: CasReportFinishContext,
666    ) -> CasExecutionReport
667    where
668        T: 'static,
669        E: 'static,
670    {
671        let report = report_builder
672            .lock()
673            .expect("CAS report builder should be lockable")
674            .finish(
675                ctx.attempts_total,
676                ctx.max_attempts,
677                ctx.max_operation_elapsed,
678                ctx.max_total_elapsed,
679                ctx.outcome,
680            );
681        let event_hook = hooks.event_hook();
682        if Self::should_emit_events(&self.observability, &event_hook) {
683            Self::dispatch_event(
684                &self.observability,
685                event_hook
686                    .as_ref()
687                    .expect("event hook should exist when events are emitted"),
688                CasEvent::ExecutionFinished {
689                    report: report.clone(),
690                },
691            );
692        }
693        if self.observability.mode() == CasObservabilityMode::EventStreamWithAlert
694            && let Some(thresholds) = self.observability.contention_thresholds()
695            && report.is_contention_hot(&thresholds)
696        {
697            Self::dispatch_alert(
698                &self.observability,
699                &hooks.alert_hook(),
700                CasAlert::contention(report.clone(), thresholds),
701            );
702        }
703        report
704    }
705
706    /// Converts a terminal error kind into a report outcome.
707    ///
708    /// # Parameters
709    /// - `kind`: The high-level [`CasErrorKind`].
710    ///
711    /// # Returns
712    /// Corresponding [`CasExecutionOutcome`] variant for the report.
713    #[inline]
714    fn error_outcome(kind: CasErrorKind) -> CasExecutionOutcome {
715        match kind {
716            CasErrorKind::Abort => CasExecutionOutcome::ErrorAbort,
717            CasErrorKind::Conflict => CasExecutionOutcome::ErrorConflictExhausted,
718            CasErrorKind::RetryExhausted => CasExecutionOutcome::ErrorRetryExhausted,
719            CasErrorKind::AttemptTimeout => CasExecutionOutcome::ErrorAttemptTimeout,
720            CasErrorKind::MaxOperationElapsedExceeded => {
721                CasExecutionOutcome::ErrorMaxOperationElapsedExceeded
722            }
723            CasErrorKind::MaxTotalElapsedExceeded => {
724                CasExecutionOutcome::ErrorMaxTotalElapsedExceeded
725            }
726        }
727    }
728
729    /// Converts one attempt failure into its lightweight event kind.
730    ///
731    /// # Parameters
732    /// - `failure`: The [`CasAttemptFailure`] to classify.
733    ///
734    /// # Returns
735    /// The [`CasAttemptFailureKind`] for event emission.
736    #[inline]
737    fn failure_kind(failure: &CasAttemptFailure<T, E>) -> crate::error::CasAttemptFailureKind {
738        failure.kind()
739    }
740
741    /// Dispatches one lifecycle event if event streaming is enabled.
742    fn dispatch_event(
743        observability: &CasObservabilityConfig,
744        hook: &crate::event::CasEventHook,
745        event: CasEvent,
746    ) where
747        T: 'static,
748        E: 'static,
749    {
750        match observability.listener_panic_policy() {
751            ListenerPanicPolicy::Propagate => hook.accept(&event),
752            ListenerPanicPolicy::Isolate => {
753                let _ = catch_unwind(AssertUnwindSafe(|| hook.accept(&event)));
754            }
755        }
756    }
757
758    /// Returns whether lifecycle event construction and dispatch are needed.
759    #[inline]
760    fn should_emit_events(
761        observability: &CasObservabilityConfig,
762        hook: &Option<crate::event::CasEventHook>,
763    ) -> bool {
764        observability.mode() != CasObservabilityMode::ReportOnly && hook.is_some()
765    }
766
767    /// Dispatches one alert if an alert listener is registered.
768    fn dispatch_alert(
769        observability: &CasObservabilityConfig,
770        hook: &Option<crate::event::CasAlertHook>,
771        alert: CasAlert,
772    ) {
773        if let Some(hook) = hook {
774            match observability.listener_panic_policy() {
775                ListenerPanicPolicy::Propagate => hook.accept(&alert),
776                ListenerPanicPolicy::Isolate => {
777                    let _ = catch_unwind(AssertUnwindSafe(|| hook.accept(&alert)));
778                }
779            }
780        }
781    }
782}