tandem-server 0.6.5

HTTP server for Tandem engine APIs
use serde_json::json;

use crate::automation_v2::types::*;

use super::{
    idempotency_fingerprint, AppState, IdempotencyKeyOutcome, IdempotencyKeyRecord,
    IdempotencyKeyStatus, IdempotencyReservation, IdempotencyReservationInput,
};

const WEBHOOK_PROVIDER_EVENT_IDEMPOTENCY_OPERATION: &str = "webhook.provider_event";
const WEBHOOK_BODY_DIGEST_IDEMPOTENCY_OPERATION: &str = "webhook.body_digest";

#[derive(Debug, Clone)]
pub(crate) struct AutomationWebhookIdempotencyClaim {
    pub(crate) operation: &'static str,
    pub(crate) key: String,
    pub(crate) key_kind: &'static str,
    request_fingerprint: String,
}

#[derive(Debug, Clone)]
pub(crate) struct AutomationWebhookReservedClaim {
    pub(crate) claim: AutomationWebhookIdempotencyClaim,
    pub(crate) record: IdempotencyKeyRecord,
}

#[derive(Debug, Clone)]
pub(crate) enum AutomationWebhookDedupeDecision {
    New {
        records: Vec<AutomationWebhookReservedClaim>,
    },
    Duplicate {
        primary_claim: AutomationWebhookIdempotencyClaim,
        primary_record: IdempotencyKeyRecord,
        reserved_records: Vec<AutomationWebhookReservedClaim>,
    },
    Conflict {
        primary_claim: AutomationWebhookIdempotencyClaim,
        primary_record: IdempotencyKeyRecord,
        reserved_records: Vec<AutomationWebhookReservedClaim>,
    },
}

impl AutomationWebhookDedupeDecision {
    pub(crate) fn accepted_records(&self) -> Vec<AutomationWebhookReservedClaim> {
        match self {
            Self::New { records } => records.clone(),
            Self::Duplicate {
                reserved_records, ..
            }
            | Self::Conflict {
                reserved_records, ..
            } => reserved_records.clone(),
        }
    }
}

impl AppState {
    pub(crate) async fn reserve_automation_webhook_dedupe(
        &self,
        trigger: &AutomationWebhookTriggerRecord,
        provider_event_id: Option<&String>,
        body_digest: &str,
        received_at_ms: u64,
    ) -> anyhow::Result<AutomationWebhookDedupeDecision> {
        let claims = automation_webhook_idempotency_claims(trigger, provider_event_id, body_digest);
        let mut reserved_records = Vec::new();
        for claim in claims {
            let reservation = self
                .reserve_idempotency_key(IdempotencyReservationInput {
                    tenant_context: trigger.tenant_context.clone(),
                    operation: claim.operation.to_string(),
                    key: claim.key.clone(),
                    owner: format!("automation_webhook:{}", trigger.trigger_id),
                    request_fingerprint: claim.request_fingerprint.clone(),
                    first_seen_event_id: provider_event_id.cloned(),
                    now_ms: received_at_ms,
                    expires_at_ms: None,
                })
                .await?;
            match reservation {
                IdempotencyReservation::Reserved(record) => {
                    reserved_records.push(AutomationWebhookReservedClaim {
                        claim: claim.clone(),
                        record,
                    });
                }
                IdempotencyReservation::Duplicate(record) => {
                    if is_recoverable_reserved_record(&record) {
                        reserved_records.push(AutomationWebhookReservedClaim {
                            claim: claim.clone(),
                            record,
                        });
                        continue;
                    }
                    return Ok(AutomationWebhookDedupeDecision::Duplicate {
                        primary_claim: claim,
                        primary_record: record,
                        reserved_records,
                    });
                }
                IdempotencyReservation::Conflict(record) => {
                    return Ok(AutomationWebhookDedupeDecision::Conflict {
                        primary_claim: claim,
                        primary_record: record,
                        reserved_records,
                    });
                }
            }
        }
        Ok(AutomationWebhookDedupeDecision::New {
            records: reserved_records,
        })
    }

    pub(crate) async fn complete_automation_webhook_idempotency_records(
        &self,
        records: &[AutomationWebhookReservedClaim],
        delivery: &AutomationWebhookDeliveryRecord,
        outcome_kind: &'static str,
        now_ms: u64,
    ) -> anyhow::Result<()> {
        let outcome = idempotency_outcome_for_webhook_delivery(delivery, outcome_kind, now_ms);
        for reserved in records {
            self.complete_idempotency_key(
                &delivery.tenant_context,
                reserved.claim.operation,
                &reserved.claim.key,
                outcome.clone(),
                now_ms,
            )
            .await?;
        }
        Ok(())
    }
}

fn is_recoverable_reserved_record(record: &IdempotencyKeyRecord) -> bool {
    matches!(record.status, IdempotencyKeyStatus::Reserved) && record.outcome.is_none()
}

pub(crate) fn idempotency_outcome_ref(
    record: &IdempotencyKeyRecord,
) -> (Option<String>, Option<String>) {
    let Some(outcome) = record.outcome.as_ref() else {
        return (None, None);
    };
    (
        outcome
            .primary_ref_kind
            .as_deref()
            .filter(|kind| *kind == "delivery")
            .and(outcome.primary_ref_id.clone()),
        outcome
            .secondary_ref_kind
            .as_deref()
            .filter(|kind| *kind == "run")
            .and(outcome.secondary_ref_id.clone()),
    )
}

fn automation_webhook_idempotency_claims(
    trigger: &AutomationWebhookTriggerRecord,
    provider_event_id: Option<&String>,
    body_digest: &str,
) -> Vec<AutomationWebhookIdempotencyClaim> {
    let mut claims = Vec::new();
    if let Some(provider_event_id) = provider_event_id
        .map(|event_id| event_id.trim())
        .filter(|event_id| !event_id.is_empty())
    {
        claims.push(AutomationWebhookIdempotencyClaim {
            operation: WEBHOOK_PROVIDER_EVENT_IDEMPOTENCY_OPERATION,
            key: format!(
                "trigger:{}:provider_event:{provider_event_id}",
                trigger.trigger_id
            ),
            key_kind: "provider_event_id",
            request_fingerprint: idempotency_fingerprint(&[
                &trigger.trigger_id,
                "provider_event_id",
                provider_event_id,
                body_digest,
            ]),
        });
    }
    claims.push(AutomationWebhookIdempotencyClaim {
        operation: WEBHOOK_BODY_DIGEST_IDEMPOTENCY_OPERATION,
        key: format!("trigger:{}:body:{body_digest}", trigger.trigger_id),
        key_kind: "body_digest",
        request_fingerprint: idempotency_fingerprint(&[
            &trigger.trigger_id,
            "body_digest",
            body_digest,
        ]),
    });
    claims
}

fn idempotency_outcome_for_webhook_delivery(
    delivery: &AutomationWebhookDeliveryRecord,
    outcome_kind: &'static str,
    completed_at_ms: u64,
) -> IdempotencyKeyOutcome {
    IdempotencyKeyOutcome {
        outcome_kind: outcome_kind.to_string(),
        completed_at_ms,
        primary_ref_kind: Some("delivery".to_string()),
        primary_ref_id: delivery
            .duplicate_of_delivery_id
            .clone()
            .or_else(|| Some(delivery.delivery_id.clone())),
        secondary_ref_kind: delivery
            .duplicate_of_run_id
            .as_ref()
            .or(delivery.queued_run_id.as_ref())
            .or(delivery.woken_run_id.as_ref())
            .map(|_| "run".to_string()),
        secondary_ref_id: delivery
            .duplicate_of_run_id
            .clone()
            .or_else(|| delivery.queued_run_id.clone())
            .or_else(|| delivery.woken_run_id.clone()),
        details: json!({
            "delivery_id": delivery.delivery_id,
            "dedupe_result": delivery.dedupe_result,
            "dedupe_reason_code": delivery.dedupe_reason_code,
            "woken_wait_id": delivery.woken_wait_id,
        }),
    }
}