// turul_a2a_aws_lambda/scheduled_recovery.rs
//! EventBridge Scheduler-triggered push recovery handler
//! (ADR-013 §4.2 / §5.3).
//!
//! The scheduled worker is the **mandatory** backstop for Lambda push
//! recovery across all four backends. It exists because:
//!
//! - DynamoDB Streams (the [`crate::LambdaStreamRecoveryHandler`]
//!   trigger) have 24h retention and can be stalled by poison records.
//! - Non-DynamoDB backends have no equivalent stream. For SQLite /
//!   PostgreSQL / in-memory deployments this scheduler IS the
//!   recovery path.
//! - `tokio::spawn`ed dispatch continuations created inside a Lambda
//!   invocation are opportunistic at best (ADR-013 §4.4): the
//!   execution environment MAY be frozen between invocations.
//!   Correctness cannot depend on post-return completion.
//!
//! On every tick the handler:
//!
//! 1. Enumerates up to `stale_markers_limit` pending-dispatch markers
//!    whose `recorded_at` is older than `now() - stale_cutoff` via
//!    [`A2aPushDeliveryStore::list_stale_pending_dispatches`] and
//!    drives each through [`PushDispatcher::try_redispatch_pending`].
//! 2. Enumerates up to `reclaimable_claims_limit` expired-non-terminal
//!    claim rows via
//!    [`A2aPushDeliveryStore::list_reclaimable_claims`] and drives
//!    each through [`PushDispatcher::redispatch_one`].
//! 3. Returns a [`LambdaScheduledRecoveryResponse`] summary with
//!    per-stage counts and a capped list of error strings for
//!    telemetry.
//!
//! No in-process background loop is assumed — a single `.invoke()`
//! equals one sweep tick. Operators wire an EventBridge Scheduler rule
//! (default recommendation: every 5 minutes).

35use std::sync::Arc;
36use std::time::{Duration, SystemTime};
37
38use aws_lambda_events::event::eventbridge::EventBridgeEvent;
39use serde::Serialize;
40use turul_a2a::push::{A2aPushDeliveryStore, PushDispatcher};
41
/// Summary returned from a single scheduled-recovery tick.
///
/// Emitted as the Lambda response so operators can wire CloudWatch
/// metrics off it (e.g. alert when `stale_markers_transient_errors`
/// exceeds a threshold). Field shape is camelCase to match AWS Lambda
/// response conventions.
///
/// When the stage-1 listing succeeds, the sweep maintains the
/// invariant `stale_markers_found == stale_markers_recovered +
/// stale_markers_transient_errors`.
#[derive(Debug, Clone, Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct LambdaScheduledRecoveryResponse {
    /// Total stale markers read in this tick (pre-redispatch).
    pub stale_markers_found: usize,
    /// Markers whose redispatch completed (fan-out ran or task
    /// deleted).
    pub stale_markers_recovered: usize,
    /// Markers whose redispatch hit a transient storage error. The
    /// markers are retained for the next tick.
    pub stale_markers_transient_errors: usize,
    /// Reclaimable claims read in this tick.
    pub reclaimable_claims_found: usize,
    /// Claims redispatched (fire-and-forget — error signal isn't
    /// plumbed back through `redispatch_one`).
    pub reclaimable_claims_processed: usize,
    /// First few error strings surfaced during the sweep. Capped at
    /// [`LambdaScheduledRecoveryHandler::ERROR_SAMPLE_LIMIT`] to keep
    /// the response body bounded.
    pub errors: Vec<String>,
}
69
/// Configuration for a scheduled-recovery sweep.
///
/// The limits bound the work done per tick so a single invocation
/// stays well inside Lambda's execution window; anything left over is
/// picked up on the next scheduled tick.
#[derive(Debug, Clone)]
pub struct LambdaScheduledRecoveryConfig {
    /// A marker is "stale" when `now() - marker.recorded_at >= stale_cutoff`.
    /// Must be larger than the worst-case fresh-dispatch latency so
    /// in-flight dispatches are not preempted. Recommended default
    /// is the runtime `push_claim_expiry` value (10 min default).
    pub stale_cutoff: Duration,
    /// Max markers processed per tick.
    pub stale_markers_limit: usize,
    /// Max reclaimable claims processed per tick.
    pub reclaimable_claims_limit: usize,
}
83
84impl Default for LambdaScheduledRecoveryConfig {
85 fn default() -> Self {
86 Self {
87 stale_cutoff: Duration::from_secs(10 * 60),
88 stale_markers_limit: 128,
89 reclaimable_claims_limit: 128,
90 }
91 }
92}
93
/// Handler for scheduled push-recovery ticks.
///
/// Construct once per Lambda cold start; invoke per scheduled event.
/// `Clone` is cheap: both collaborators sit behind `Arc` and the
/// config is a small value struct.
#[derive(Clone)]
pub struct LambdaScheduledRecoveryHandler {
    // Drives the actual redispatch of markers and claims.
    dispatcher: Arc<PushDispatcher>,
    // Enumerates stale pending-dispatch markers and reclaimable claims.
    delivery_store: Arc<dyn A2aPushDeliveryStore>,
    // Per-sweep tuning; see [`LambdaScheduledRecoveryConfig`].
    config: LambdaScheduledRecoveryConfig,
}
103
104impl LambdaScheduledRecoveryHandler {
105 /// Cap on `errors` in a single response so Lambda doesn't return
106 /// megabytes of stringified errors under a persistent outage.
107 pub const ERROR_SAMPLE_LIMIT: usize = 8;
108
109 pub fn new(
110 dispatcher: Arc<PushDispatcher>,
111 delivery_store: Arc<dyn A2aPushDeliveryStore>,
112 ) -> Self {
113 Self {
114 dispatcher,
115 delivery_store,
116 config: LambdaScheduledRecoveryConfig::default(),
117 }
118 }
119
120 pub fn with_config(mut self, config: LambdaScheduledRecoveryConfig) -> Self {
121 self.config = config;
122 self
123 }
124
125 /// Run one recovery sweep. The `_event` argument is accepted so
126 /// the signature matches `lambda_runtime::service_fn`; the
127 /// EventBridge payload is not inspected — each invocation is one
128 /// tick regardless of cron metadata.
129 pub async fn handle_scheduled_event(
130 &self,
131 _event: EventBridgeEvent,
132 ) -> LambdaScheduledRecoveryResponse {
133 self.run_sweep().await
134 }
135
136 /// Testable entry point. Runs the full sweep and returns a
137 /// summary. External callers almost always want
138 /// [`Self::handle_scheduled_event`]; this method is exposed so
139 /// unit tests can exercise the recovery logic without
140 /// constructing an `EventBridgeEvent`.
141 pub async fn run_sweep(&self) -> LambdaScheduledRecoveryResponse {
142 let mut response = LambdaScheduledRecoveryResponse::default();
143
144 // Stage 1: stale pending-dispatch markers. The cutoff is
145 // `now() - stale_cutoff`; markers older than this are
146 // eligible for redispatch.
147 let cutoff = SystemTime::now()
148 .checked_sub(self.config.stale_cutoff)
149 .unwrap_or(SystemTime::UNIX_EPOCH);
150 match self
151 .delivery_store
152 .list_stale_pending_dispatches(cutoff, self.config.stale_markers_limit)
153 .await
154 {
155 Ok(markers) => {
156 response.stale_markers_found = markers.len();
157 for marker in markers {
158 match self.dispatcher.try_redispatch_pending(marker).await {
159 Ok(()) => response.stale_markers_recovered += 1,
160 Err(e) => {
161 response.stale_markers_transient_errors += 1;
162 self.sample_error(&mut response.errors, format!("marker: {e}"));
163 }
164 }
165 }
166 }
167 Err(e) => {
168 tracing::error!(
169 target: "turul_a2a::lambda_scheduled_recovery_list_markers_failed",
170 error = %e,
171 "list_stale_pending_dispatches failed; stage skipped this tick"
172 );
173 self.sample_error(
174 &mut response.errors,
175 format!("list_stale_pending_dispatches: {e}"),
176 );
177 }
178 }
179
180 // Stage 2: reclaimable claims — non-terminal claim rows whose
181 // expiry has passed. `redispatch_one` is fire-and-forget on
182 // the dispatcher side; we count every enumerated row as
183 // "processed" and rely on claim fencing for correctness.
184 match self
185 .delivery_store
186 .list_reclaimable_claims(self.config.reclaimable_claims_limit)
187 .await
188 {
189 Ok(claims) => {
190 response.reclaimable_claims_found = claims.len();
191 for claim in claims {
192 self.dispatcher.redispatch_one(claim).await;
193 response.reclaimable_claims_processed += 1;
194 }
195 }
196 Err(e) => {
197 tracing::error!(
198 target: "turul_a2a::lambda_scheduled_recovery_list_claims_failed",
199 error = %e,
200 "list_reclaimable_claims failed; stage skipped this tick"
201 );
202 self.sample_error(
203 &mut response.errors,
204 format!("list_reclaimable_claims: {e}"),
205 );
206 }
207 }
208
209 response
210 }
211
212 fn sample_error(&self, errors: &mut Vec<String>, msg: String) {
213 if errors.len() < Self::ERROR_SAMPLE_LIMIT {
214 errors.push(msg);
215 }
216 }
217}