use std::sync::Arc;
use std::time::{Duration, SystemTime};
use aws_lambda_events::event::eventbridge::EventBridgeEvent;
use serde::Serialize;
use turul_a2a::push::{A2aPushDeliveryStore, PushDispatcher};
/// Summary of one scheduled recovery sweep.
///
/// Serialized with camelCase field names. All counters start at zero and the
/// error list starts empty (see the derived `Default`).
#[derive(Debug, Clone, Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct LambdaScheduledRecoveryResponse {
    /// Number of markers returned by the stale-pending-dispatch listing.
    pub stale_markers_found: usize,

    /// Number of those markers whose redispatch attempt returned `Ok`.
    pub stale_markers_recovered: usize,

    /// Number of those markers whose redispatch attempt returned an error.
    pub stale_markers_transient_errors: usize,

    /// Number of claims returned by the reclaimable-claims listing.
    pub reclaimable_claims_found: usize,

    /// Number of reclaimable claims that were handed to the dispatcher.
    pub reclaimable_claims_processed: usize,

    /// Sample of error messages collected during the sweep; the sweep stops
    /// adding messages once its sample limit is reached.
    pub errors: Vec<String>,
}
/// Tunables for a scheduled recovery sweep.
#[derive(Debug, Clone)]
pub struct LambdaScheduledRecoveryConfig {
    /// Age threshold for stale markers: the sweep subtracts this from the
    /// current time to obtain the cutoff passed to the stale-marker listing.
    pub stale_cutoff: Duration,

    /// Limit passed to the stale-marker listing for a single sweep.
    pub stale_markers_limit: usize,

    /// Limit passed to the reclaimable-claims listing for a single sweep.
    pub reclaimable_claims_limit: usize,
}
impl Default for LambdaScheduledRecoveryConfig {
    /// Returns the default settings: a ten-minute stale cutoff and a limit of
    /// 128 items for each of the two listings.
    fn default() -> Self {
        const DEFAULT_STALE_CUTOFF: Duration = Duration::from_secs(10 * 60);
        const DEFAULT_LIST_LIMIT: usize = 128;

        Self {
            stale_cutoff: DEFAULT_STALE_CUTOFF,
            stale_markers_limit: DEFAULT_LIST_LIMIT,
            reclaimable_claims_limit: DEFAULT_LIST_LIMIT,
        }
    }
}
/// Runs recovery sweeps for push deliveries when invoked on a schedule.
///
/// Cloning the handler clones the two `Arc` handles and the configuration.
#[derive(Clone)]
pub struct LambdaScheduledRecoveryHandler {
    /// Dispatcher used to redispatch stale markers and reclaimable claims.
    dispatcher: Arc<PushDispatcher>,

    /// Store queried for stale pending dispatches and reclaimable claims.
    delivery_store: Arc<dyn A2aPushDeliveryStore>,

    /// Cutoff and listing limits applied on each sweep.
    config: LambdaScheduledRecoveryConfig,
}
impl LambdaScheduledRecoveryHandler {
    /// Maximum number of messages kept in
    /// [`LambdaScheduledRecoveryResponse::errors`] for one sweep; messages
    /// offered after the list is full are discarded.
    pub const ERROR_SAMPLE_LIMIT: usize = 8;

    /// Creates a handler over the given dispatcher and delivery store, using
    /// [`LambdaScheduledRecoveryConfig::default`] for the sweep settings.
    pub fn new(
        dispatcher: Arc<PushDispatcher>,
        delivery_store: Arc<dyn A2aPushDeliveryStore>,
    ) -> Self {
        let config = LambdaScheduledRecoveryConfig::default();
        Self {
            dispatcher,
            delivery_store,
            config,
        }
    }

    /// Replaces the sweep configuration and returns the updated handler.
    pub fn with_config(mut self, config: LambdaScheduledRecoveryConfig) -> Self {
        self.config = config;
        self
    }

    /// Entry point for an EventBridge-triggered invocation.
    ///
    /// The event payload is not inspected; every call runs one sweep via
    /// [`Self::run_sweep`] and returns its summary.
    pub async fn handle_scheduled_event(
        &self,
        _event: EventBridgeEvent,
    ) -> LambdaScheduledRecoveryResponse {
        self.run_sweep().await
    }

    /// Runs one recovery sweep and returns a summary of what happened.
    ///
    /// The sweep has two stages, executed in order: stale pending-dispatch
    /// markers first, then reclaimable claims. A listing failure in one stage
    /// is logged and sampled into the response, and does not prevent the
    /// other stage from running.
    pub async fn run_sweep(&self) -> LambdaScheduledRecoveryResponse {
        let mut report = LambdaScheduledRecoveryResponse::default();
        self.sweep_stale_markers(&mut report).await;
        self.sweep_reclaimable_claims(&mut report).await;
        report
    }

    /// Stage one: lists stale pending-dispatch markers and redispatches each,
    /// recording found / recovered / errored counts in `report`.
    async fn sweep_stale_markers(&self, report: &mut LambdaScheduledRecoveryResponse) {
        // Fall back to the Unix epoch if subtracting the configured duration
        // from "now" is not representable.
        let cutoff = match SystemTime::now().checked_sub(self.config.stale_cutoff) {
            Some(instant) => instant,
            None => SystemTime::UNIX_EPOCH,
        };

        let markers = match self
            .delivery_store
            .list_stale_pending_dispatches(cutoff, self.config.stale_markers_limit)
            .await
        {
            Ok(found) => found,
            Err(e) => {
                tracing::error!(
                    target: "turul_a2a::lambda_scheduled_recovery_list_markers_failed",
                    error = %e,
                    "list_stale_pending_dispatches failed; stage skipped this tick"
                );
                self.sample_error(
                    &mut report.errors,
                    format!("list_stale_pending_dispatches: {e}"),
                );
                return;
            }
        };

        report.stale_markers_found = markers.len();
        for marker in markers {
            let outcome = self.dispatcher.try_redispatch_pending(marker).await;
            if let Err(e) = outcome {
                report.stale_markers_transient_errors += 1;
                self.sample_error(&mut report.errors, format!("marker: {e}"));
            } else {
                report.stale_markers_recovered += 1;
            }
        }
    }

    /// Stage two: lists reclaimable claims and hands each to the dispatcher,
    /// recording found / processed counts in `report`.
    async fn sweep_reclaimable_claims(&self, report: &mut LambdaScheduledRecoveryResponse) {
        let claims = match self
            .delivery_store
            .list_reclaimable_claims(self.config.reclaimable_claims_limit)
            .await
        {
            Ok(found) => found,
            Err(e) => {
                tracing::error!(
                    target: "turul_a2a::lambda_scheduled_recovery_list_claims_failed",
                    error = %e,
                    "list_reclaimable_claims failed; stage skipped this tick"
                );
                self.sample_error(
                    &mut report.errors,
                    format!("list_reclaimable_claims: {e}"),
                );
                return;
            }
        };

        report.reclaimable_claims_found = claims.len();
        for claim in claims {
            // Every claim handed to the dispatcher counts as processed.
            self.dispatcher.redispatch_one(claim).await;
            report.reclaimable_claims_processed += 1;
        }
    }

    /// Appends `msg` to `errors` unless [`Self::ERROR_SAMPLE_LIMIT`] messages
    /// are already stored, in which case `msg` is dropped.
    fn sample_error(&self, errors: &mut Vec<String>, msg: String) {
        if errors.len() >= Self::ERROR_SAMPLE_LIMIT {
            return;
        }
        errors.push(msg);
    }
}