use serde_json::{Map, Value};
use super::reliability::{
try_load_stateful_reliability, write_stateful_reliability_unlocked, StatefulDeadLetterRecord,
StatefulDeadLetterStatus, StatefulToolEffectRecord, STATEFUL_RELIABILITY_STORE_LOCK,
};
use tandem_types::TenantContext;
/// Marks a dead letter as having had a retry dispatched and persists the change.
///
/// The guard from `STATEFUL_RELIABILITY_STORE_LOCK` is held for the whole
/// load / modify / write cycle. The first dead letter whose id equals
/// `dead_letter_id`, that is visible to `tenant`, and whose status is
/// `RetryRequested` or `Retrying` is selected. Its status becomes `Retrying`,
/// `updated_at_ms` is set to `now_ms`, and its metadata is stamped with the
/// incremented (saturating) dispatch count, `now_ms`, and `backoff_ms`.
///
/// # Returns
///
/// `Ok(Some(record))` with a clone of the updated row, or `Ok(None)` when no
/// row matches (in which case nothing is written).
///
/// # Errors
///
/// Propagates any error from loading or writing the store at `path`.
pub async fn mark_dead_letter_retry_dispatched(
    path: &std::path::Path,
    tenant: &TenantContext,
    dead_letter_id: &str,
    backoff_ms: u64,
    now_ms: u64,
) -> anyhow::Result<Option<StatefulDeadLetterRecord>> {
    let _store_lock = STATEFUL_RELIABILITY_STORE_LOCK.lock().await;
    let mut reliability = try_load_stateful_reliability(path)?;

    let target = reliability.dead_letters.iter_mut().find(|candidate| {
        let retry_pending = matches!(
            candidate.status,
            StatefulDeadLetterStatus::RetryRequested | StatefulDeadLetterStatus::Retrying
        );
        candidate.dead_letter_id == dead_letter_id
            && candidate.visible_to_tenant(tenant)
            && retry_pending
    });
    let entry = match target {
        Some(entry) => entry,
        // Nothing eligible for this tenant: leave the store on disk untouched.
        None => return Ok(None),
    };

    // Read the previous count before the metadata is rewritten below.
    let dispatches_so_far = dead_letter_retry_dispatch_count(entry);
    stamp_dead_letter_retry_dispatch(
        &mut entry.metadata,
        dispatches_so_far.saturating_add(1),
        now_ms,
        backoff_ms,
    );
    entry.status = StatefulDeadLetterStatus::Retrying;
    entry.updated_at_ms = now_ms;

    // Clone first so the mutable borrow of the store ends before it is written.
    let snapshot = entry.clone();
    write_stateful_reliability_unlocked(path, &reliability).await?;
    Ok(Some(snapshot))
}
/// Writes the retry-dispatch bookkeeping keys into `metadata`.
///
/// If `metadata` already holds a JSON object, that object is reused and the
/// keys below are inserted (overwriting any existing values). If it holds any
/// other JSON value, that value is kept under `"previous_metadata"` in a new
/// object. If it is `None`, a new empty object is started.
///
/// Keys written, in order: `"retry_dispatch_count"`,
/// `"retry_dispatched_at_ms"`, `"retry_backoff_ms"`.
fn stamp_dead_letter_retry_dispatch(
    metadata: &mut Option<Value>,
    dispatch_count: u32,
    now_ms: u64,
    backoff_ms: u64,
) {
    let mut fields = match metadata.take() {
        None => Map::new(),
        Some(Value::Object(existing)) => existing,
        Some(non_object) => {
            // Keep the non-object payload instead of discarding it.
            let mut wrapper = Map::new();
            wrapper.insert("previous_metadata".to_string(), non_object);
            wrapper
        }
    };

    let stamps = [
        ("retry_dispatch_count", Value::from(dispatch_count)),
        ("retry_dispatched_at_ms", Value::from(now_ms)),
        ("retry_backoff_ms", Value::from(backoff_ms)),
    ];
    for (key, value) in stamps {
        fields.insert(key.to_string(), value);
    }

    *metadata = Some(Value::Object(fields));
}
pub fn dead_letter_retry_dispatched_at_ms(record: &StatefulDeadLetterRecord) -> Option<u64> {
record
.metadata
.as_ref()?
.get("retry_dispatched_at_ms")
.and_then(Value::as_u64)
}
/// Reads the `"retry_dispatch_count"` value from the record's metadata.
///
/// Returns `0` when the record has no metadata, the key is absent, the stored
/// value is not representable as a `u64`, or it does not fit in a `u32`.
pub fn dead_letter_retry_dispatch_count(record: &StatefulDeadLetterRecord) -> u32 {
    let stored = match record.metadata.as_ref() {
        Some(metadata) => metadata.get("retry_dispatch_count"),
        None => None,
    };
    match stored.and_then(Value::as_u64) {
        // Values that overflow `u32` fall back to zero, like a missing value.
        Some(raw) => u32::try_from(raw).unwrap_or(0),
        None => 0,
    }
}
/// Reports whether the record's metadata carries a complete
/// superseded-by-success marker, as judged by `metadata_superseded_by_success`.
pub fn dead_letter_superseded_by_success(record: &StatefulDeadLetterRecord) -> bool {
    let metadata = record.metadata.as_ref();
    metadata_superseded_by_success(metadata)
}
/// Stamps `metadata` with the superseded-by-success marker derived from `effect`.
///
/// If `metadata` already holds a JSON object, that object is reused. If it
/// holds any other JSON value, that value is kept under `"previous_metadata"`
/// in a new object. If it is `None`, a new empty object is started.
///
/// Keys written: `"superseded_by_success"` (`true`),
/// `"superseded_by_effect_id"` (from `effect.effect_id`),
/// `"superseded_at_ms"` (from `effect.updated_at_ms`), and, only when
/// `outbox_id` is `Some`, `"superseded_by_outbox_id"`.
pub(super) fn mark_reliability_row_superseded_by_success(
    metadata: &mut Option<Value>,
    effect: &StatefulToolEffectRecord,
    outbox_id: Option<&str>,
) {
    let mut fields = match metadata.take() {
        None => Map::new(),
        Some(Value::Object(existing)) => existing,
        Some(non_object) => {
            // Keep the non-object payload instead of discarding it.
            let mut wrapper = Map::new();
            wrapper.insert("previous_metadata".to_string(), non_object);
            wrapper
        }
    };

    let effect_id = Value::String(effect.effect_id.clone());
    let superseded_at = Value::Number(effect.updated_at_ms.into());

    fields.insert("superseded_by_success".to_string(), Value::Bool(true));
    fields.insert("superseded_by_effect_id".to_string(), effect_id);
    fields.insert("superseded_at_ms".to_string(), superseded_at);
    if let Some(id) = outbox_id {
        fields.insert(
            "superseded_by_outbox_id".to_string(),
            Value::String(id.to_owned()),
        );
    }

    *metadata = Some(Value::Object(fields));
}
/// Checks whether `metadata` contains a complete superseded-by-success marker.
///
/// Returns `true` only when all of the following hold:
/// - `"superseded_by_success"` is the boolean `true`;
/// - `"superseded_by_effect_id"` is a string that is non-empty after trimming;
/// - `"superseded_at_ms"` is a number representable as a `u64`.
///
/// Returns `false` when `metadata` is `None` or any check fails.
pub(super) fn metadata_superseded_by_success(metadata: Option<&Value>) -> bool {
    let meta = match metadata {
        Some(meta) => meta,
        None => return false,
    };

    let flagged = matches!(meta.get("superseded_by_success"), Some(Value::Bool(true)));
    if !flagged {
        return false;
    }

    let effect_id_present = meta
        .get("superseded_by_effect_id")
        .and_then(Value::as_str)
        .map_or(false, |id| !id.trim().is_empty());
    if !effect_id_present {
        return false;
    }

    meta.get("superseded_at_ms")
        .and_then(Value::as_u64)
        .is_some()
}