use std::collections::HashMap;
use std::sync::Mutex;
use crate::{
CallbackResponse, FailureClass, LifecycleEventKind, LifecycleReceipt, PayloadReceipt,
PlacementOutcome, ReceiptStatus, RetryClass, SCHEMA_VERSION, ValidationError,
};
use super::negotiation::{NegotiatedPlan, PayloadPlacementDecision};
use super::seams::ReceiptEmitter;
/// Errors surfaced while synthesizing, validating, or storing a lifecycle
/// receipt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReceiptError {
/// The plan's event is `receipt.emitted`; per the `Display` text this is a
/// notification event that does not itself produce a receipt.
ReceiptEmittedNotEmittable,
/// An idempotency key was presented again with receipt content that differs
/// from what is already stored under it (`duplicate_id_conflict`).
Conflict { idempotency_key: String },
/// The receipt failed its own validation; carries the underlying error.
Invalid(ValidationError),
}
impl std::fmt::Display for ReceiptError {
    /// Writes the human-readable message for each error variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Invalid(e) => {
                f.write_fmt(format_args!("synthesized receipt failed validation: {e}"))
            }
            Self::Conflict { idempotency_key } => f.write_fmt(format_args!(
                "idempotency_key `{idempotency_key}` was reused with \
                 different receipt content (duplicate_id_conflict)"
            )),
            Self::ReceiptEmittedNotEmittable => f.write_str(
                "receipt.emitted is a notification event and does not itself \
                 produce a receipt",
            ),
        }
    }
}
// Empty impl: only the default `std::error::Error` methods are used, so
// `source()` is not overridden (the wrapped `ValidationError` is exposed only
// through the `Display` text).
impl std::error::Error for ReceiptError {}
impl From<ValidationError> for ReceiptError {
fn from(e: ValidationError) -> Self {
Self::Invalid(e)
}
}
/// Caller-supplied values that `synthesize_and_emit` copies into the receipt
/// it builds (they are not derived from the negotiated plan or the callback
/// response).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceiptContext {
/// Copied to the receipt and also used as part of the idempotency store key.
pub client_id: String,
/// Identifier assigned to the synthesized receipt.
pub receipt_id: String,
/// Optional link to a parent receipt; copied through unchanged.
pub parent_receipt_id: Option<String>,
/// Receipt timestamp — presumably Unix epoch seconds; confirm with callers.
pub at_epoch_s: u64,
/// Optional harness session identifier; copied through unchanged.
pub harness_session_id: Option<String>,
/// Optional harness run identifier. When present (and no stored receipt is
/// being replayed) it selects the counter used to allocate `sequence`.
pub harness_run_id: Option<String>,
/// Optional harness task identifier; copied through unchanged.
pub harness_task_id: Option<String>,
}
/// Hands out increasing sequence numbers, with one independent counter per
/// run id.
///
/// The counters live in a mutex-guarded map, so numbers can be allocated
/// through a shared `&SequenceGenerator`.
#[derive(Debug, Default)]
pub struct SequenceGenerator {
    counters: Mutex<HashMap<String, u64>>,
}

impl SequenceGenerator {
    /// Creates a generator that has not yet seen any run.
    pub fn new() -> Self {
        Self {
            counters: Mutex::new(HashMap::new()),
        }
    }

    /// Advances the counter for `run_id` and returns the new value.
    ///
    /// The first call for a given run returns `1`.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex has been poisoned.
    pub fn next(&self, run_id: &str) -> u64 {
        let mut counters = self.counters.lock().expect("sequence mutex poisoned");
        match counters.get_mut(run_id) {
            Some(current) => {
                *current += 1;
                *current
            }
            None => {
                // First sighting of this run: numbering starts at 1.
                counters.insert(run_id.to_owned(), 1);
                1
            }
        }
    }

    /// Returns the last value handed out for `run_id` without advancing it,
    /// or `0` when the run has never been seen.
    ///
    /// # Panics
    ///
    /// Panics if the counter mutex has been poisoned.
    pub fn peek(&self, run_id: &str) -> u64 {
        let counters = self.counters.lock().expect("sequence mutex poisoned");
        counters.get(run_id).map_or(0, |current| *current)
    }
}
/// Composite lookup key for stored receipts: the caller's idempotency key is
/// scoped by client and adapter, so the same key string under a different
/// client or adapter addresses a different entry.
///
/// Derives `Hash` + `Eq` so it can serve as a `HashMap` key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct IdempotencyKey {
/// Client the receipt belongs to.
pub client_id: String,
/// Adapter the receipt belongs to.
pub adapter_id: String,
/// The idempotency key string itself.
pub idempotency_key: String,
}
/// Storage seam for receipts addressed by [`IdempotencyKey`].
///
/// Both methods take `&self`, so implementations need interior mutability
/// to record receipts.
pub trait IdempotencyStore {
/// Looks up the receipt recorded under `key`; `None` when nothing is stored.
fn get(&self, key: &IdempotencyKey) -> Option<LifecycleReceipt>;
/// Records `receipt` under `key` and reports what happened as a
/// [`StoreOutcome`].
///
/// NOTE(review): the trait does not pin the semantics down. The
/// `InMemoryIdempotencyStore` in this file returns `Replayed` when an equal
/// receipt is already stored and `ReceiptError::Conflict` when a different
/// one is — other implementations presumably need to match; confirm.
fn put(
&self,
key: &IdempotencyKey,
receipt: &LifecycleReceipt,
) -> Result<StoreOutcome, ReceiptError>;
}
/// Result of a successful [`IdempotencyStore::put`].
#[derive(Debug, Clone, PartialEq, Eq)]
// `Replayed` carries a whole `LifecycleReceipt` inline while `Inserted` is
// empty; presumably the size difference is accepted rather than boxing the
// receipt — confirm before changing.
#[allow(clippy::large_enum_variant)]
pub enum StoreOutcome {
/// No receipt existed under the key; the new one was stored.
Inserted,
/// A receipt already existed under the key; carries that stored receipt.
Replayed(LifecycleReceipt),
}
/// [`IdempotencyStore`] backed by a mutex-guarded `HashMap` held in process
/// memory. Nothing in this type removes entries once they are inserted.
#[derive(Debug, Default)]
pub struct InMemoryIdempotencyStore {
// Stored receipts, keyed by (client, adapter, idempotency key).
inner: Mutex<HashMap<IdempotencyKey, LifecycleReceipt>>,
}
impl InMemoryIdempotencyStore {
    /// Creates an empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of receipts currently stored.
    ///
    /// # Panics
    ///
    /// Panics if the store mutex has been poisoned.
    pub fn len(&self) -> usize {
        let receipts = self.inner.lock().expect("idem mutex poisoned");
        receipts.len()
    }

    /// `true` when no receipt has been stored.
    ///
    /// # Panics
    ///
    /// Panics if the store mutex has been poisoned.
    pub fn is_empty(&self) -> bool {
        let receipts = self.inner.lock().expect("idem mutex poisoned");
        receipts.is_empty()
    }
}
impl IdempotencyStore for InMemoryIdempotencyStore {
    /// Returns a clone of the receipt stored under `key`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the store mutex has been poisoned.
    fn get(&self, key: &IdempotencyKey) -> Option<LifecycleReceipt> {
        let receipts = self.inner.lock().expect("idem mutex poisoned");
        receipts.get(key).cloned()
    }

    /// Stores `receipt` under `key` unless the key is already taken.
    ///
    /// * Key unused: the receipt is stored and `Inserted` is returned.
    /// * Key holds an equal receipt: nothing changes and `Replayed` is
    ///   returned with a clone of the stored receipt.
    /// * Key holds a different receipt: nothing changes.
    ///
    /// The lookup and the insert happen under a single lock acquisition.
    ///
    /// # Errors
    ///
    /// [`ReceiptError::Conflict`] when the key already holds a receipt that
    /// differs from `receipt`.
    ///
    /// # Panics
    ///
    /// Panics if the store mutex has been poisoned.
    fn put(
        &self,
        key: &IdempotencyKey,
        receipt: &LifecycleReceipt,
    ) -> Result<StoreOutcome, ReceiptError> {
        let mut receipts = self.inner.lock().expect("idem mutex poisoned");
        match receipts.get(key) {
            Some(prior) if prior == receipt => Ok(StoreOutcome::Replayed(prior.clone())),
            Some(_) => Err(ReceiptError::Conflict {
                idempotency_key: key.idempotency_key.clone(),
            }),
            None => {
                receipts.insert(key.clone(), receipt.clone());
                Ok(StoreOutcome::Inserted)
            }
        }
    }
}
/// Builds lifecycle receipts and records them in an [`IdempotencyStore`].
///
/// `S` is the store implementation; it defaults to
/// [`InMemoryIdempotencyStore`].
#[derive(Debug)]
pub struct LifeloopReceiptEmitter<S: IdempotencyStore = InMemoryIdempotencyStore> {
// Allocates per-run `sequence` values for newly synthesized receipts.
sequencer: SequenceGenerator,
// Where receipts carrying an idempotency key are looked up and recorded.
store: S,
}
impl LifeloopReceiptEmitter<InMemoryIdempotencyStore> {
    /// Creates an emitter with an empty in-memory store and fresh sequence
    /// counters.
    pub fn in_memory() -> Self {
        let sequencer = SequenceGenerator::default();
        let store = InMemoryIdempotencyStore::default();
        Self { sequencer, store }
    }
}
impl<S: IdempotencyStore> LifeloopReceiptEmitter<S> {
    /// Creates an emitter that records receipts in `store`, with fresh
    /// sequence counters.
    pub fn with_store(store: S) -> Self {
        let sequencer = SequenceGenerator::new();
        Self { sequencer, store }
    }

    /// Borrows the underlying idempotency store.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// Borrows the sequence generator used for new receipts.
    pub fn sequencer(&self) -> &SequenceGenerator {
        &self.sequencer
    }

    /// Builds a receipt from a negotiated plan and the callback's response,
    /// validates it, and records it when the plan carries an idempotency key.
    ///
    /// Steps, in order:
    /// 1. Reject plans whose event is `ReceiptEmitted`.
    /// 2. Derive status / failure class / retry class via `derive_status`.
    /// 3. Pick `sequence`: if a receipt is already stored under the plan's
    ///    idempotency key, reuse its sequence; otherwise allocate the next
    ///    number for `ctx.harness_run_id` (or `None` when there is no run id).
    /// 4. Assemble and validate the receipt.
    /// 5. With an idempotency key, `put` it in the store and return either
    ///    the new receipt (`Inserted`) or the stored one (`Replayed`).
    ///    Without a key the receipt is returned without touching the store.
    ///
    /// NOTE(review): the sequence number is allocated before validation and
    /// before `put`, so a receipt that fails either step still consumes one.
    /// NOTE(review): the `get` in step 3 and the `put` in step 5 are separate
    /// store calls, not one atomic operation.
    ///
    /// # Errors
    ///
    /// * [`ReceiptError::ReceiptEmittedNotEmittable`] for `ReceiptEmitted`
    ///   plans.
    /// * [`ReceiptError::Invalid`] when the assembled receipt fails
    ///   validation.
    /// * Whatever the store's `put` returns (for the in-memory store,
    ///   [`ReceiptError::Conflict`] on differing content).
    pub fn synthesize_and_emit(
        &self,
        negotiated: &NegotiatedPlan,
        response: &CallbackResponse,
        ctx: &ReceiptContext,
    ) -> Result<LifecycleReceipt, ReceiptError> {
        let plan = &negotiated.plan;
        if matches!(plan.event, LifecycleEventKind::ReceiptEmitted) {
            return Err(ReceiptError::ReceiptEmittedNotEmittable);
        }

        let (status, failure_class, retry_class) = derive_status(negotiated, response);

        // Built once and used for both the replay lookup and the final put.
        let store_key = plan.idempotency_key.as_deref().map(|idem| IdempotencyKey {
            client_id: ctx.client_id.clone(),
            adapter_id: plan.adapter.adapter_id.clone(),
            idempotency_key: idem.to_string(),
        });

        // A stored receipt keeps its original sequence so that a replay can
        // compare equal to it; only a first-time receipt advances the counter.
        let already_stored = store_key.as_ref().and_then(|key| self.store.get(key));
        let sequence = if let Some(prior) = already_stored {
            prior.sequence
        } else {
            ctx.harness_run_id
                .as_deref()
                .map(|run| self.sequencer.next(run))
        };

        let receipt = LifecycleReceipt {
            schema_version: SCHEMA_VERSION.to_string(),
            receipt_id: ctx.receipt_id.clone(),
            idempotency_key: plan.idempotency_key.clone(),
            client_id: ctx.client_id.clone(),
            adapter_id: plan.adapter.adapter_id.clone(),
            invocation_id: plan.invocation_id.clone(),
            event: plan.event,
            event_id: plan.event_id.clone(),
            sequence,
            parent_receipt_id: ctx.parent_receipt_id.clone(),
            integration_mode: plan.integration_mode,
            status,
            at_epoch_s: ctx.at_epoch_s,
            harness_session_id: ctx.harness_session_id.clone(),
            harness_run_id: ctx.harness_run_id.clone(),
            harness_task_id: ctx.harness_task_id.clone(),
            payload_receipts: payload_receipts_from(negotiated),
            telemetry_summary: serde_json::Map::new(),
            capability_degradations: negotiated.capability_degradations.clone(),
            failure_class,
            retry_class,
            warnings: merged_warnings(negotiated, response),
        };
        receipt.validate()?;

        match store_key {
            // No idempotency key: nothing to record.
            None => Ok(receipt),
            Some(key) => match self.store.put(&key, &receipt)? {
                StoreOutcome::Inserted => Ok(receipt),
                StoreOutcome::Replayed(prior) => Ok(prior),
            },
        }
    }
}
impl<S: IdempotencyStore> ReceiptEmitter for LifeloopReceiptEmitter<S> {
    type Error = ReceiptError;

    /// Validates an already-built receipt and, when it carries an idempotency
    /// key, records it in the store. The store's `Inserted` / `Replayed`
    /// outcome is discarded.
    ///
    /// # Errors
    ///
    /// [`ReceiptError::Invalid`] when validation fails, or whatever error the
    /// store's `put` returns.
    fn emit(&self, receipt: &LifecycleReceipt) -> Result<(), Self::Error> {
        receipt.validate()?;

        let idem = match receipt.idempotency_key.as_deref() {
            Some(idem) => idem,
            // Receipts without an idempotency key are not stored.
            None => return Ok(()),
        };
        let key = IdempotencyKey {
            client_id: receipt.client_id.clone(),
            adapter_id: receipt.adapter_id.clone(),
            idempotency_key: idem.to_string(),
        };
        self.store.put(&key, receipt).map(|_outcome| ())
    }
}
/// Decides the receipt's status, failure class, and retry class from the
/// negotiation outcome and the callback's response.
///
/// Precedence:
/// * `Unsupported` — always `Failed`, using the negotiated failure class
///   (default `CapabilityUnsupported`) and that class's default retry.
/// * `RequiresOperator` — always `Failed` with `OperatorRequired`.
/// * `Degraded` — `Failed` if the callback reported failure, otherwise
///   `Degraded`. Placement decisions and other callback statuses are not
///   consulted on this path.
/// * `Satisfied` — the first failed placement decision wins; otherwise the
///   callback's own status is passed through.
///
/// A callback failure uses the response's failure class (default
/// `InternalError`) and its retry class (default: the class's default retry).
/// Failure and retry classes are `None` for every non-`Failed` status.
fn derive_status(
    negotiated: &NegotiatedPlan,
    response: &CallbackResponse,
) -> (ReceiptStatus, Option<FailureClass>, Option<RetryClass>) {
    use crate::NegotiationOutcome as NO;

    // Failure whose retry class is always the class's default.
    let failed_with = |class: FailureClass| {
        (
            ReceiptStatus::Failed,
            Some(class),
            Some(class.default_retry()),
        )
    };
    // Failure reported by the callback; the response may override the retry.
    let callback_failure = || {
        let class = response
            .failure_class
            .unwrap_or(FailureClass::InternalError);
        let retry = response.retry_class.unwrap_or(class.default_retry());
        (ReceiptStatus::Failed, Some(class), Some(retry))
    };

    match negotiated.outcome {
        NO::Unsupported => failed_with(
            negotiated
                .failure_class
                .unwrap_or(FailureClass::CapabilityUnsupported),
        ),
        NO::RequiresOperator => failed_with(FailureClass::OperatorRequired),
        NO::Degraded if matches!(response.status, ReceiptStatus::Failed) => callback_failure(),
        NO::Degraded => (ReceiptStatus::Degraded, None, None),
        NO::Satisfied => {
            // A failed placement outranks whatever the callback reported.
            for decision in negotiated.placement_decisions.iter() {
                if let PayloadPlacementDecision::Failed { failure_class, .. } = decision {
                    return failed_with(*failure_class);
                }
            }
            match response.status {
                ReceiptStatus::Failed => callback_failure(),
                ReceiptStatus::Delivered => (ReceiptStatus::Delivered, None, None),
                ReceiptStatus::Degraded => (ReceiptStatus::Degraded, None, None),
                ReceiptStatus::Observed => (ReceiptStatus::Observed, None, None),
                ReceiptStatus::Skipped => (ReceiptStatus::Skipped, None, None),
            }
        }
    }
}
/// Produces one [`PayloadReceipt`] per placement decision, in decision order.
///
/// * `Chosen` decisions are reported as `Delivered` at the chosen placement.
/// * `Failed` decisions are reported as `Failed` at the placement of the
///   first rejection, falling back to `ReceiptOnly` when no rejection is
///   recorded.
fn payload_receipts_from(negotiated: &NegotiatedPlan) -> Vec<PayloadReceipt> {
    negotiated
        .placement_decisions
        .iter()
        .map(|decision| {
            // The two variants differ only in how placement and status are
            // obtained; everything else is copied straight across.
            let (payload_id, payload_kind, byte_size, content_digest, placement, status) =
                match decision {
                    PayloadPlacementDecision::Chosen {
                        payload_id,
                        payload_kind,
                        byte_size,
                        content_digest,
                        chosen,
                        ..
                    } => (
                        payload_id,
                        payload_kind,
                        byte_size,
                        content_digest,
                        *chosen,
                        PlacementOutcome::Delivered,
                    ),
                    PayloadPlacementDecision::Failed {
                        payload_id,
                        payload_kind,
                        byte_size,
                        content_digest,
                        rejected,
                        ..
                    } => {
                        let attempted = match rejected.first() {
                            Some(first_rejection) => first_rejection.placement(),
                            None => crate::PlacementClass::ReceiptOnly,
                        };
                        (
                            payload_id,
                            payload_kind,
                            byte_size,
                            content_digest,
                            attempted,
                            PlacementOutcome::Failed,
                        )
                    }
                };
            PayloadReceipt {
                payload_id: payload_id.clone(),
                payload_kind: payload_kind.clone(),
                placement,
                status,
                byte_size: *byte_size,
                content_digest: content_digest.clone(),
            }
        })
        .collect()
}
/// Returns the negotiation warnings followed by the callback's warnings,
/// each cloned; neither input is modified and nothing is de-duplicated.
fn merged_warnings(
    negotiated: &NegotiatedPlan,
    response: &CallbackResponse,
) -> Vec<crate::Warning> {
    negotiated
        .warnings
        .iter()
        .chain(response.warnings.iter())
        .cloned()
        .collect()
}