turul_a2a_aws_lambda/stream_recovery.rs
1//! DynamoDB Stream-triggered push recovery handler (ADR-013 §5.2 / §8).
2//!
3//! When a DynamoDB Stream is configured on the
4//! `a2a_push_pending_dispatches` table with `StreamViewType::NEW_IMAGE`,
5//! every committed marker row triggers a Lambda invocation. This
6//! handler iterates the batch, parses each INSERT record into a
7//! [`PendingDispatch`], and drives it through
//! [`PushDispatcher::try_redispatch_pending`]. Transient failures are
//! reported as `BatchItemFailures` so the Lambda service re-delivers
//! the affected records; successes (including deleted-task cases) are
//! acknowledged by leaving the record out of `BatchItemFailures`
//! entirely.
13//!
14//! ## Record classification
15//!
16//! | Record shape | Response |
17//! |-------------------------------------------|-----------------------------|
18//! | `INSERT` with well-formed NEW_IMAGE + OK | success (no BatchItemFailure) |
19//! | `INSERT` with well-formed NEW_IMAGE, task was deleted | success (marker deleted by dispatcher) |
20//! | `INSERT` with transient storage error | BatchItemFailure (record's SequenceNumber) |
21//! | `INSERT` with unparseable NEW_IMAGE | BatchItemFailure (logged) |
22//! | `MODIFY` or `REMOVE` | skipped silently — marker either refreshed or already consumed |
//! | Failing record with no SequenceNumber | logged at error level + dropped (cannot be surfaced for retry) |
24//!
25//! Duplicate records are safe: [`PushDispatcher::try_redispatch_pending`]
26//! is idempotent under claim fencing. Two invocations
27//! targeting the same `(tenant, task_id, event_sequence)` resolve to
28//! at most one terminal claim row and at most one POST per config.
29
30use std::sync::Arc;
31
32use aws_lambda_events::event::dynamodb::Event as DynamoDbEvent;
33use aws_lambda_events::event::streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse};
34use turul_a2a::push::PushDispatcher;
35use turul_a2a::push::claim::PendingDispatch;
36
37/// Handler for DynamoDB Stream events on `a2a_push_pending_dispatches`.
38///
39/// Construct once per Lambda cold start; reuse across invocations. The
40/// handler holds an `Arc<PushDispatcher>` — the same dispatcher the
41/// request-Lambda builds when `push_delivery_store` is wired, so you
42/// can share one state bundle across both entry points.
43#[derive(Clone)]
44pub struct LambdaStreamRecoveryHandler {
45 dispatcher: Arc<PushDispatcher>,
46}
47
48impl LambdaStreamRecoveryHandler {
49 pub fn new(dispatcher: Arc<PushDispatcher>) -> Self {
50 Self { dispatcher }
51 }
52
53 /// Handle a batch of DynamoDB Stream records. Returns the
54 /// `DynamoDbEventResponse` with `BatchItemFailures` populated for
55 /// records whose redispatch hit a transient error or whose
56 /// NEW_IMAGE could not be parsed. Records with a missing
57 /// `SequenceNumber` are skipped with a log warning — they cannot
58 /// be surfaced in `BatchItemFailures` (the field is required by
59 /// the Lambda service to identify the record to retry).
60 pub async fn handle_stream_event(&self, event: DynamoDbEvent) -> DynamoDbEventResponse {
61 let mut failures: Vec<DynamoDbBatchItemFailure> = Vec::new();
62
63 for record in event.records {
64 // MODIFY / REMOVE are not dispatch triggers. MODIFY may
65 // appear when the dispatcher calls record_pending_dispatch
66 // to refresh recorded_at; REMOVE when the marker is
67 // consumed. Either way, the next commit or scheduler tick
68 // handles recovery if anything needs retry.
69 if record.event_name != "INSERT" {
70 continue;
71 }
72
73 let seq_number = record.change.sequence_number.clone();
74
75 match parse_pending_from_new_image(&record.change.new_image) {
76 Ok(pending) => {
77 match self.dispatcher.try_redispatch_pending(pending).await {
78 Ok(()) => {
79 // Success: marker consumed (fan-out
80 // completed or task was deleted).
81 }
82 Err(e) => {
83 tracing::warn!(
84 target: "turul_a2a::lambda_stream_recovery_transient",
85 error = %e,
86 sequence_number = ?seq_number,
87 "stream redispatch returned transient error; \
88 surfacing BatchItemFailure"
89 );
90 push_failure(&mut failures, seq_number);
91 }
92 }
93 }
94 Err(parse_err) => {
95 tracing::error!(
96 target: "turul_a2a::lambda_stream_recovery_parse_error",
97 error = %parse_err,
98 sequence_number = ?seq_number,
99 "failed to parse pending-dispatch NEW_IMAGE; \
100 surfacing BatchItemFailure"
101 );
102 push_failure(&mut failures, seq_number);
103 }
104 }
105 }
106
107 let mut resp = DynamoDbEventResponse::default();
108 resp.batch_item_failures = failures;
109 resp
110 }
111}
112
113fn push_failure(failures: &mut Vec<DynamoDbBatchItemFailure>, seq: Option<String>) {
114 match seq {
115 Some(identifier) => {
116 let mut f = DynamoDbBatchItemFailure::default();
117 f.item_identifier = Some(identifier);
118 failures.push(f);
119 }
120 None => {
121 // Without a SequenceNumber we cannot tell Lambda which
122 // record to retry. Log loudly — this indicates a
123 // malformed event; operators should confirm the stream
124 // configuration.
125 tracing::error!(
126 target: "turul_a2a::lambda_stream_recovery_no_sequence_number",
127 "stream record missing SequenceNumber; cannot surface as \
128 BatchItemFailure. Record will not be retried."
129 );
130 }
131 }
132}
133
134/// Reconstruct a [`PendingDispatch`] from a DynamoDB Stream NEW_IMAGE.
135///
136/// Expects the attributes the backend writes via
137/// `A2aAtomicStore::update_task_status_with_events` (ADR-013 §4.3 /
138/// `dynamodb::DynamoDbA2aStorage` marker Put):
139///
140/// - `tenant` (S)
141/// - `taskId` (S)
142/// - `owner` (S)
143/// - `eventSequence` (N)
144/// - `recordedAtMicros` (N)
145///
146/// A missing or wrong-typed attribute surfaces as a parse error;
147/// the caller turns that into a `BatchItemFailure` so operators can
148/// investigate the malformed record out-of-band.
149fn parse_pending_from_new_image(item: &serde_dynamo::Item) -> Result<PendingDispatch, ParseError> {
150 let tenant = string_attr(item, "tenant")?;
151 let task_id = string_attr(item, "taskId")?;
152 let owner = string_attr(item, "owner")?;
153 let event_sequence = number_attr::<u64>(item, "eventSequence")?;
154 let recorded_at_micros = number_attr::<i64>(item, "recordedAtMicros")?;
155
156 let recorded_at =
157 std::time::UNIX_EPOCH + std::time::Duration::from_micros(recorded_at_micros.max(0) as u64);
158
159 Ok(PendingDispatch::new(
160 tenant,
161 owner,
162 task_id,
163 event_sequence,
164 recorded_at,
165 ))
166}
167
168fn string_attr(item: &serde_dynamo::Item, key: &str) -> Result<String, ParseError> {
169 let raw = item
170 .get(key)
171 .ok_or_else(|| ParseError::MissingAttribute(key.to_string()))?;
172 match raw {
173 serde_dynamo::AttributeValue::S(s) => Ok(s.clone()),
174 _ => Err(ParseError::WrongType {
175 key: key.to_string(),
176 expected: "S",
177 }),
178 }
179}
180
181fn number_attr<T: std::str::FromStr>(
182 item: &serde_dynamo::Item,
183 key: &str,
184) -> Result<T, ParseError> {
185 let raw = item
186 .get(key)
187 .ok_or_else(|| ParseError::MissingAttribute(key.to_string()))?;
188 let n = match raw {
189 serde_dynamo::AttributeValue::N(s) => s,
190 _ => {
191 return Err(ParseError::WrongType {
192 key: key.to_string(),
193 expected: "N",
194 });
195 }
196 };
197 n.parse::<T>().map_err(|_| ParseError::NumberParse {
198 key: key.to_string(),
199 value: n.clone(),
200 })
201}
202
203#[derive(Debug, thiserror::Error)]
204enum ParseError {
205 #[error("NEW_IMAGE missing required attribute {0}")]
206 MissingAttribute(String),
207 #[error("NEW_IMAGE attribute {key} has wrong type (expected {expected})")]
208 WrongType { key: String, expected: &'static str },
209 #[error("NEW_IMAGE attribute {key} = {value:?} is not parseable as a number")]
210 NumberParse { key: String, value: String },
211}