Skip to main content

turul_a2a_aws_lambda/
stream_recovery.rs

1//! DynamoDB Stream-triggered push recovery handler (ADR-013 §5.2 / §8).
2//!
3//! When a DynamoDB Stream is configured on the
4//! `a2a_push_pending_dispatches` table with `StreamViewType::NEW_IMAGE`,
5//! every committed marker row triggers a Lambda invocation. This
6//! handler iterates the batch, parses each INSERT record into a
7//! [`PendingDispatch`], and drives it through
8//! [`PushDispatcher::try_redispatch_pending`]. Transient failures are
9//! reported as `BatchItemFailures` so the Lambda service re-delivers
10//! the affected records; successes (including deleted-task cases) are
11//! acknowledged by omitting the record from `BatchItemFailures` in
12//! the response.
13//!
14//! ## Record classification
15//!
16//! | Record shape                              | Response                    |
17//! |-------------------------------------------|-----------------------------|
18//! | `INSERT` with well-formed NEW_IMAGE + OK  | success (no BatchItemFailure) |
19//! | `INSERT` with well-formed NEW_IMAGE, task was deleted | success (marker deleted by dispatcher) |
20//! | `INSERT` with transient storage error     | BatchItemFailure (record's SequenceNumber) |
21//! | `INSERT` with unparseable NEW_IMAGE       | BatchItemFailure (logged) |
22//! | `MODIFY` or `REMOVE`                      | skipped silently — marker either refreshed or already consumed |
23//! | Record with no SequenceNumber             | logged + skipped (cannot be retried) |
24//!
25//! Duplicate records are safe: [`PushDispatcher::try_redispatch_pending`]
26//! is idempotent under claim fencing. Two invocations
27//! targeting the same `(tenant, task_id, event_sequence)` resolve to
28//! at most one terminal claim row and at most one POST per config.
29
30use std::sync::Arc;
31
32use aws_lambda_events::event::dynamodb::Event as DynamoDbEvent;
33use aws_lambda_events::event::streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse};
34use turul_a2a::push::PushDispatcher;
35use turul_a2a::push::claim::PendingDispatch;
36
37/// Handler for DynamoDB Stream events on `a2a_push_pending_dispatches`.
38///
39/// Construct once per Lambda cold start; reuse across invocations. The
40/// handler holds an `Arc<PushDispatcher>` — the same dispatcher the
41/// request-Lambda builds when `push_delivery_store` is wired, so you
42/// can share one state bundle across both entry points.
43#[derive(Clone)]
44pub struct LambdaStreamRecoveryHandler {
45    dispatcher: Arc<PushDispatcher>,
46}
47
48impl LambdaStreamRecoveryHandler {
49    pub fn new(dispatcher: Arc<PushDispatcher>) -> Self {
50        Self { dispatcher }
51    }
52
53    /// Handle a batch of DynamoDB Stream records. Returns the
54    /// `DynamoDbEventResponse` with `BatchItemFailures` populated for
55    /// records whose redispatch hit a transient error or whose
56    /// NEW_IMAGE could not be parsed. Records with a missing
57    /// `SequenceNumber` are skipped with a log warning — they cannot
58    /// be surfaced in `BatchItemFailures` (the field is required by
59    /// the Lambda service to identify the record to retry).
60    pub async fn handle_stream_event(&self, event: DynamoDbEvent) -> DynamoDbEventResponse {
61        let mut failures: Vec<DynamoDbBatchItemFailure> = Vec::new();
62
63        for record in event.records {
64            // MODIFY / REMOVE are not dispatch triggers. MODIFY may
65            // appear when the dispatcher calls record_pending_dispatch
66            // to refresh recorded_at; REMOVE when the marker is
67            // consumed. Either way, the next commit or scheduler tick
68            // handles recovery if anything needs retry.
69            if record.event_name != "INSERT" {
70                continue;
71            }
72
73            let seq_number = record.change.sequence_number.clone();
74
75            match parse_pending_from_new_image(&record.change.new_image) {
76                Ok(pending) => {
77                    match self.dispatcher.try_redispatch_pending(pending).await {
78                        Ok(()) => {
79                            // Success: marker consumed (fan-out
80                            // completed or task was deleted).
81                        }
82                        Err(e) => {
83                            tracing::warn!(
84                                target: "turul_a2a::lambda_stream_recovery_transient",
85                                error = %e,
86                                sequence_number = ?seq_number,
87                                "stream redispatch returned transient error; \
88                                 surfacing BatchItemFailure"
89                            );
90                            push_failure(&mut failures, seq_number);
91                        }
92                    }
93                }
94                Err(parse_err) => {
95                    tracing::error!(
96                        target: "turul_a2a::lambda_stream_recovery_parse_error",
97                        error = %parse_err,
98                        sequence_number = ?seq_number,
99                        "failed to parse pending-dispatch NEW_IMAGE; \
100                         surfacing BatchItemFailure"
101                    );
102                    push_failure(&mut failures, seq_number);
103                }
104            }
105        }
106
107        let mut resp = DynamoDbEventResponse::default();
108        resp.batch_item_failures = failures;
109        resp
110    }
111}
112
113fn push_failure(failures: &mut Vec<DynamoDbBatchItemFailure>, seq: Option<String>) {
114    match seq {
115        Some(identifier) => {
116            let mut f = DynamoDbBatchItemFailure::default();
117            f.item_identifier = Some(identifier);
118            failures.push(f);
119        }
120        None => {
121            // Without a SequenceNumber we cannot tell Lambda which
122            // record to retry. Log loudly — this indicates a
123            // malformed event; operators should confirm the stream
124            // configuration.
125            tracing::error!(
126                target: "turul_a2a::lambda_stream_recovery_no_sequence_number",
127                "stream record missing SequenceNumber; cannot surface as \
128                 BatchItemFailure. Record will not be retried."
129            );
130        }
131    }
132}
133
134/// Reconstruct a [`PendingDispatch`] from a DynamoDB Stream NEW_IMAGE.
135///
136/// Expects the attributes the backend writes via
137/// `A2aAtomicStore::update_task_status_with_events` (ADR-013 §4.3 /
138/// `dynamodb::DynamoDbA2aStorage` marker Put):
139///
140/// - `tenant` (S)
141/// - `taskId` (S)
142/// - `owner` (S)
143/// - `eventSequence` (N)
144/// - `recordedAtMicros` (N)
145///
146/// A missing or wrong-typed attribute surfaces as a parse error;
147/// the caller turns that into a `BatchItemFailure` so operators can
148/// investigate the malformed record out-of-band.
149fn parse_pending_from_new_image(item: &serde_dynamo::Item) -> Result<PendingDispatch, ParseError> {
150    let tenant = string_attr(item, "tenant")?;
151    let task_id = string_attr(item, "taskId")?;
152    let owner = string_attr(item, "owner")?;
153    let event_sequence = number_attr::<u64>(item, "eventSequence")?;
154    let recorded_at_micros = number_attr::<i64>(item, "recordedAtMicros")?;
155
156    let recorded_at =
157        std::time::UNIX_EPOCH + std::time::Duration::from_micros(recorded_at_micros.max(0) as u64);
158
159    Ok(PendingDispatch::new(
160        tenant,
161        owner,
162        task_id,
163        event_sequence,
164        recorded_at,
165    ))
166}
167
168fn string_attr(item: &serde_dynamo::Item, key: &str) -> Result<String, ParseError> {
169    let raw = item
170        .get(key)
171        .ok_or_else(|| ParseError::MissingAttribute(key.to_string()))?;
172    match raw {
173        serde_dynamo::AttributeValue::S(s) => Ok(s.clone()),
174        _ => Err(ParseError::WrongType {
175            key: key.to_string(),
176            expected: "S",
177        }),
178    }
179}
180
181fn number_attr<T: std::str::FromStr>(
182    item: &serde_dynamo::Item,
183    key: &str,
184) -> Result<T, ParseError> {
185    let raw = item
186        .get(key)
187        .ok_or_else(|| ParseError::MissingAttribute(key.to_string()))?;
188    let n = match raw {
189        serde_dynamo::AttributeValue::N(s) => s,
190        _ => {
191            return Err(ParseError::WrongType {
192                key: key.to_string(),
193                expected: "N",
194            });
195        }
196    };
197    n.parse::<T>().map_err(|_| ParseError::NumberParse {
198        key: key.to_string(),
199        value: n.clone(),
200    })
201}
202
203#[derive(Debug, thiserror::Error)]
204enum ParseError {
205    #[error("NEW_IMAGE missing required attribute {0}")]
206    MissingAttribute(String),
207    #[error("NEW_IMAGE attribute {key} has wrong type (expected {expected})")]
208    WrongType { key: String, expected: &'static str },
209    #[error("NEW_IMAGE attribute {key} = {value:?} is not parseable as a number")]
210    NumberParse { key: String, value: String },
211}