Skip to main content

turul_a2a_aws_lambda/
scheduled_recovery.rs

1//! EventBridge Scheduler-triggered push recovery handler
2//! (ADR-013 §4.2 / §5.3).
3//!
4//! The scheduled worker is the **mandatory** backstop for Lambda push
5//! recovery across all four backends. It exists because:
6//!
7//! - DynamoDB Streams (the [`crate::LambdaStreamRecoveryHandler`]
8//!   trigger) have 24h retention and can be stalled by poison records.
9//! - Non-DynamoDB backends have no equivalent stream. For SQLite /
10//!   PostgreSQL / in-memory deployments this scheduler IS the
11//!   recovery path.
12//! - `tokio::spawn`ed dispatch continuations created inside a Lambda
13//!   invocation are opportunistic at best (ADR-013 §4.4): the
14//!   execution environment MAY be frozen between invocations.
15//!   Correctness cannot depend on post-return completion.
16//!
17//! On every tick the handler:
18//!
19//! 1. Enumerates up to `stale_markers_limit` pending-dispatch markers
20//!    whose `recorded_at` is older than `now() - stale_cutoff` via
21//!    [`A2aPushDeliveryStore::list_stale_pending_dispatches`] and
22//!    drives each through [`PushDispatcher::try_redispatch_pending`].
23//! 2. Enumerates up to `reclaimable_claims_limit` expired-non-terminal
24//!    claim rows via
25//!    [`A2aPushDeliveryStore::list_reclaimable_claims`] and drives
26//!    each through [`PushDispatcher::redispatch_one`].
27//! 3. Returns a [`LambdaScheduledRecoveryResponse`] summary with
28//!    per-stage counts and a capped list of error strings for
29//!    telemetry.
30//!
31//! No in-process background loop is assumed — a single `.invoke()`
32//! equals one sweep tick. Operators wire an EventBridge Scheduler rule
33//! (default recommendation: every 5 minutes).
34
35use std::sync::Arc;
36use std::time::{Duration, SystemTime};
37
38use aws_lambda_events::event::eventbridge::EventBridgeEvent;
39use serde::Serialize;
40use turul_a2a::push::{A2aPushDeliveryStore, PushDispatcher};
41
42/// Summary returned from a single scheduled-recovery tick.
43///
44/// Emitted as the Lambda response so operators can wire CloudWatch
45/// metrics off it (e.g. alert when `stale_markers_transient_errors`
46/// exceeds a threshold). Field shape is camelCase to match AWS Lambda
47/// response conventions.
48#[derive(Debug, Clone, Serialize, Default)]
49#[serde(rename_all = "camelCase")]
50pub struct LambdaScheduledRecoveryResponse {
51    /// Total stale markers read in this tick (pre-redispatch).
52    pub stale_markers_found: usize,
53    /// Markers whose redispatch completed (fan-out ran or task
54    /// deleted).
55    pub stale_markers_recovered: usize,
56    /// Markers whose redispatch hit a transient storage error. The
57    /// markers are retained for the next tick.
58    pub stale_markers_transient_errors: usize,
59    /// Reclaimable claims read in this tick.
60    pub reclaimable_claims_found: usize,
61    /// Claims redispatched (fire-and-forget — error signal isn't
62    /// plumbed back through `redispatch_one`).
63    pub reclaimable_claims_processed: usize,
64    /// First few error strings surfaced during the sweep. Capped at
65    /// [`LambdaScheduledRecoveryHandler::ERROR_SAMPLE_LIMIT`] to keep
66    /// the response body bounded.
67    pub errors: Vec<String>,
68}
69
/// Tunables for a single scheduled-recovery sweep.
#[derive(Clone, Debug)]
pub struct LambdaScheduledRecoveryConfig {
    /// Minimum age of a pending-dispatch marker before it counts as
    /// stale, i.e. `now() - marker.recorded_at >= stale_cutoff`.
    ///
    /// Choose a value above the worst-case latency of a fresh
    /// dispatch; otherwise the sweep may preempt dispatches that are
    /// still in flight. The recommended value is the runtime
    /// `push_claim_expiry` setting (10 minutes by default).
    pub stale_cutoff: Duration,
    /// Upper bound on stale markers handled in one tick.
    pub stale_markers_limit: usize,
    /// Upper bound on reclaimable claims handled in one tick.
    pub reclaimable_claims_limit: usize,
}
83
84impl Default for LambdaScheduledRecoveryConfig {
85    fn default() -> Self {
86        Self {
87            stale_cutoff: Duration::from_secs(10 * 60),
88            stale_markers_limit: 128,
89            reclaimable_claims_limit: 128,
90        }
91    }
92}
93
94/// Handler for scheduled push-recovery ticks.
95///
96/// Construct once per Lambda cold start; invoke per scheduled event.
97#[derive(Clone)]
98pub struct LambdaScheduledRecoveryHandler {
99    dispatcher: Arc<PushDispatcher>,
100    delivery_store: Arc<dyn A2aPushDeliveryStore>,
101    config: LambdaScheduledRecoveryConfig,
102}
103
104impl LambdaScheduledRecoveryHandler {
105    /// Cap on `errors` in a single response so Lambda doesn't return
106    /// megabytes of stringified errors under a persistent outage.
107    pub const ERROR_SAMPLE_LIMIT: usize = 8;
108
109    pub fn new(
110        dispatcher: Arc<PushDispatcher>,
111        delivery_store: Arc<dyn A2aPushDeliveryStore>,
112    ) -> Self {
113        Self {
114            dispatcher,
115            delivery_store,
116            config: LambdaScheduledRecoveryConfig::default(),
117        }
118    }
119
120    pub fn with_config(mut self, config: LambdaScheduledRecoveryConfig) -> Self {
121        self.config = config;
122        self
123    }
124
125    /// Run one recovery sweep. The `_event` argument is accepted so
126    /// the signature matches `lambda_runtime::service_fn`; the
127    /// EventBridge payload is not inspected — each invocation is one
128    /// tick regardless of cron metadata.
129    pub async fn handle_scheduled_event(
130        &self,
131        _event: EventBridgeEvent,
132    ) -> LambdaScheduledRecoveryResponse {
133        self.run_sweep().await
134    }
135
136    /// Testable entry point. Runs the full sweep and returns a
137    /// summary. External callers almost always want
138    /// [`Self::handle_scheduled_event`]; this method is exposed so
139    /// unit tests can exercise the recovery logic without
140    /// constructing an `EventBridgeEvent`.
141    pub async fn run_sweep(&self) -> LambdaScheduledRecoveryResponse {
142        let mut response = LambdaScheduledRecoveryResponse::default();
143
144        // Stage 1: stale pending-dispatch markers. The cutoff is
145        // `now() - stale_cutoff`; markers older than this are
146        // eligible for redispatch.
147        let cutoff = SystemTime::now()
148            .checked_sub(self.config.stale_cutoff)
149            .unwrap_or(SystemTime::UNIX_EPOCH);
150        match self
151            .delivery_store
152            .list_stale_pending_dispatches(cutoff, self.config.stale_markers_limit)
153            .await
154        {
155            Ok(markers) => {
156                response.stale_markers_found = markers.len();
157                for marker in markers {
158                    match self.dispatcher.try_redispatch_pending(marker).await {
159                        Ok(()) => response.stale_markers_recovered += 1,
160                        Err(e) => {
161                            response.stale_markers_transient_errors += 1;
162                            self.sample_error(&mut response.errors, format!("marker: {e}"));
163                        }
164                    }
165                }
166            }
167            Err(e) => {
168                tracing::error!(
169                    target: "turul_a2a::lambda_scheduled_recovery_list_markers_failed",
170                    error = %e,
171                    "list_stale_pending_dispatches failed; stage skipped this tick"
172                );
173                self.sample_error(
174                    &mut response.errors,
175                    format!("list_stale_pending_dispatches: {e}"),
176                );
177            }
178        }
179
180        // Stage 2: reclaimable claims — non-terminal claim rows whose
181        // expiry has passed. `redispatch_one` is fire-and-forget on
182        // the dispatcher side; we count every enumerated row as
183        // "processed" and rely on claim fencing for correctness.
184        match self
185            .delivery_store
186            .list_reclaimable_claims(self.config.reclaimable_claims_limit)
187            .await
188        {
189            Ok(claims) => {
190                response.reclaimable_claims_found = claims.len();
191                for claim in claims {
192                    self.dispatcher.redispatch_one(claim).await;
193                    response.reclaimable_claims_processed += 1;
194                }
195            }
196            Err(e) => {
197                tracing::error!(
198                    target: "turul_a2a::lambda_scheduled_recovery_list_claims_failed",
199                    error = %e,
200                    "list_reclaimable_claims failed; stage skipped this tick"
201                );
202                self.sample_error(
203                    &mut response.errors,
204                    format!("list_reclaimable_claims: {e}"),
205                );
206            }
207        }
208
209        response
210    }
211
212    fn sample_error(&self, errors: &mut Vec<String>, msg: String) {
213        if errors.len() < Self::ERROR_SAMPLE_LIMIT {
214            errors.push(msg);
215        }
216    }
217}