use std::sync::Arc;
use aws_lambda_events::event::dynamodb::Event as DynamoDbEvent;
use aws_lambda_events::event::streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse};
use turul_a2a::push::PushDispatcher;
use turul_a2a::push::claim::PendingDispatch;
/// Handler that feeds DynamoDB stream records back into a shared
/// [`PushDispatcher`].
///
/// The dispatcher is held behind an `Arc`, so cloning the handler only
/// clones the pointer and every clone shares the same dispatcher.
#[derive(Clone)]
pub struct LambdaStreamRecoveryHandler {
/// Dispatcher used to re-dispatch each pending entry parsed from the stream.
dispatcher: Arc<PushDispatcher>,
}
impl LambdaStreamRecoveryHandler {
pub fn new(dispatcher: Arc<PushDispatcher>) -> Self {
Self { dispatcher }
}
pub async fn handle_stream_event(&self, event: DynamoDbEvent) -> DynamoDbEventResponse {
let mut failures: Vec<DynamoDbBatchItemFailure> = Vec::new();
for record in event.records {
if record.event_name != "INSERT" {
continue;
}
let seq_number = record.change.sequence_number.clone();
match parse_pending_from_new_image(&record.change.new_image) {
Ok(pending) => {
match self.dispatcher.try_redispatch_pending(pending).await {
Ok(()) => {
}
Err(e) => {
tracing::warn!(
target: "turul_a2a::lambda_stream_recovery_transient",
error = %e,
sequence_number = ?seq_number,
"stream redispatch returned transient error; \
surfacing BatchItemFailure"
);
push_failure(&mut failures, seq_number);
}
}
}
Err(parse_err) => {
tracing::error!(
target: "turul_a2a::lambda_stream_recovery_parse_error",
error = %parse_err,
sequence_number = ?seq_number,
"failed to parse pending-dispatch NEW_IMAGE; \
surfacing BatchItemFailure"
);
push_failure(&mut failures, seq_number);
}
}
}
let mut resp = DynamoDbEventResponse::default();
resp.batch_item_failures = failures;
resp
}
}
/// Appends a batch-item failure identified by `seq` to `failures`.
///
/// When `seq` is `None` there is no identifier to report, so an error is
/// logged and `failures` is left untouched.
fn push_failure(failures: &mut Vec<DynamoDbBatchItemFailure>, seq: Option<String>) {
    let Some(identifier) = seq else {
        tracing::error!(
            target: "turul_a2a::lambda_stream_recovery_no_sequence_number",
            "stream record missing SequenceNumber; cannot surface as \
             BatchItemFailure. Record will not be retried."
        );
        return;
    };

    let mut failure = DynamoDbBatchItemFailure::default();
    failure.item_identifier = Some(identifier);
    failures.push(failure);
}
/// Builds a [`PendingDispatch`] from the attributes of a stream record image.
///
/// Reads the string attributes `tenant`, `taskId` and `owner`, and the
/// numeric attributes `eventSequence` (as `u64`) and `recordedAtMicros`
/// (as `i64`), in that order; the first missing, mistyped or unparseable
/// attribute is returned as a [`ParseError`].
fn parse_pending_from_new_image(item: &serde_dynamo::Item) -> Result<PendingDispatch, ParseError> {
    use std::time::{Duration, UNIX_EPOCH};

    let tenant = string_attr(item, "tenant")?;
    let task_id = string_attr(item, "taskId")?;
    let owner = string_attr(item, "owner")?;
    let event_sequence: u64 = number_attr(item, "eventSequence")?;
    let recorded_at_micros: i64 = number_attr(item, "recordedAtMicros")?;

    // Negative timestamps are clamped to the epoch; after the clamp the
    // value is non-negative, so `unsigned_abs` is a lossless conversion.
    let micros_since_epoch = recorded_at_micros.max(0).unsigned_abs();
    let recorded_at = UNIX_EPOCH + Duration::from_micros(micros_since_epoch);

    let pending = PendingDispatch::new(tenant, owner, task_id, event_sequence, recorded_at);
    Ok(pending)
}
/// Returns a copy of the string (`S`) attribute stored under `key`.
///
/// Fails with [`ParseError::MissingAttribute`] when `key` is absent and with
/// [`ParseError::WrongType`] when the attribute is present but not an `S`.
fn string_attr(item: &serde_dynamo::Item, key: &str) -> Result<String, ParseError> {
    match item.get(key) {
        None => Err(ParseError::MissingAttribute(key.to_string())),
        Some(serde_dynamo::AttributeValue::S(text)) => Ok(text.clone()),
        Some(_) => Err(ParseError::WrongType {
            key: key.to_string(),
            expected: "S",
        }),
    }
}
/// Parses the numeric (`N`) attribute stored under `key` into `T`.
///
/// Fails with [`ParseError::MissingAttribute`] when `key` is absent, with
/// [`ParseError::WrongType`] when the attribute is not an `N`, and with
/// [`ParseError::NumberParse`] when its text does not parse as `T`.
fn number_attr<T: std::str::FromStr>(
    item: &serde_dynamo::Item,
    key: &str,
) -> Result<T, ParseError> {
    let digits = match item.get(key) {
        Some(serde_dynamo::AttributeValue::N(digits)) => digits,
        Some(_) => {
            return Err(ParseError::WrongType {
                key: key.to_string(),
                expected: "N",
            });
        }
        None => return Err(ParseError::MissingAttribute(key.to_string())),
    };

    match digits.parse::<T>() {
        Ok(number) => Ok(number),
        // The underlying parse error is discarded; the offending text is
        // carried in the returned error instead.
        Err(_) => Err(ParseError::NumberParse {
            key: key.to_string(),
            value: digits.clone(),
        }),
    }
}
/// Reasons a stream record's new image could not be turned into a pending
/// dispatch.
#[derive(Debug, thiserror::Error)]
enum ParseError {
/// The named attribute was not present in the image.
#[error("NEW_IMAGE missing required attribute {0}")]
MissingAttribute(String),
/// The attribute `key` was present but was not of the `expected`
/// attribute-value kind (`"S"` or `"N"` in this file).
#[error("NEW_IMAGE attribute {key} has wrong type (expected {expected})")]
WrongType { key: String, expected: &'static str },
/// The attribute `key` was an `N`, but its text `value` did not parse as
/// the requested numeric type.
#[error("NEW_IMAGE attribute {key} = {value:?} is not parseable as a number")]
NumberParse { key: String, value: String },
}