use crate::{HookSchemaVersion, InvalidationOutbox, OutboxStatus, Result};
/// Thresholds that decide when an observed outbox status counts as drift.
///
/// The values are consumed by `OutboxLag::reasons`, which compares them
/// against the fields of an `OutboxStatus`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OutboxLagPolicy {
/// Largest tolerated number of pending rows; a pending count strictly
/// greater than this is reported as drift.
pub max_pending_rows: u64,
/// Largest tolerated age (milliseconds) of the oldest pending row; only
/// evaluated when at least one row is pending, and only a strictly greater
/// age is reported.
pub max_oldest_pending_age_ms: u64,
/// When `true`, any non-zero dead-letter count is reported as drift.
pub fail_on_dead_letters: bool,
}
impl Default for OutboxLagPolicy {
fn default() -> Self {
Self {
max_pending_rows: 0,
max_oldest_pending_age_ms: 0,
fail_on_dead_letters: true,
}
}
}
/// An observed outbox status paired with the policy used to judge it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OutboxLag {
/// Outbox counters being evaluated (pending rows, oldest pending age,
/// dead-lettered rows are the fields read by `reasons`).
pub status: OutboxStatus,
/// Thresholds the status is compared against.
pub policy: OutboxLagPolicy,
}
impl OutboxLag {
pub fn new(status: OutboxStatus, policy: OutboxLagPolicy) -> Self {
Self { status, policy }
}
pub fn is_clean(&self) -> bool {
self.reasons().is_empty()
}
pub fn reasons(&self) -> Vec<DriftReason> {
let mut reasons = Vec::new();
if self.status.pending > self.policy.max_pending_rows {
reasons.push(DriftReason::OutboxPendingRows {
pending: self.status.pending,
max_pending_rows: self.policy.max_pending_rows,
});
}
if self.status.pending > 0
&& self.status.oldest_pending_age_ms > self.policy.max_oldest_pending_age_ms
{
reasons.push(DriftReason::OutboxOldestPendingAge {
oldest_pending_age_ms: self.status.oldest_pending_age_ms,
max_oldest_pending_age_ms: self.policy.max_oldest_pending_age_ms,
});
}
if self.policy.fail_on_dead_letters && self.status.dead_lettered > 0 {
reasons.push(DriftReason::OutboxDeadLetters {
dead_lettered: self.status.dead_lettered,
});
}
reasons
}
}
/// Comparison input for hook schema drift: what should be installed versus
/// what was actually found.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HookDrift {
/// Hook schema the caller expects to be installed.
pub expected: HookSchemaVersion,
/// Hook schema that was found, or `None` when nothing is installed
/// (reported as `DriftReason::HookSchemaMissing`).
pub installed: Option<HookSchemaVersion>,
}
impl HookDrift {
pub fn new(expected: HookSchemaVersion, installed: Option<HookSchemaVersion>) -> Self {
Self {
expected,
installed,
}
}
pub fn missing(expected: HookSchemaVersion) -> Self {
Self::new(expected, None)
}
pub fn is_clean(&self) -> bool {
self.reasons().is_empty()
}
pub fn reasons(&self) -> Vec<DriftReason> {
let Some(installed) = &self.installed else {
return vec![DriftReason::HookSchemaMissing {
artifact: self.expected.artifact.clone(),
expected_table: self.expected.table.clone(),
}];
};
let mut reasons = Vec::new();
if installed.artifact != self.expected.artifact {
reasons.push(DriftReason::HookArtifactMismatch {
expected: self.expected.artifact.clone(),
installed: installed.artifact.clone(),
});
}
if installed.version != self.expected.version {
reasons.push(DriftReason::HookVersionMismatch {
expected: self.expected.version,
installed: installed.version,
});
}
if installed.table != self.expected.table {
reasons.push(DriftReason::HookTableMismatch {
expected: self.expected.table.clone(),
installed: installed.table.clone(),
});
}
if installed.dialect != self.expected.dialect {
reasons.push(DriftReason::HookDialectMismatch {
expected: self.expected.dialect.to_string(),
installed: installed.dialect.to_string(),
});
}
reasons
}
}
/// Optional CDC offset lag signal that can be attached to a
/// `ReconciliationReport` via `with_cdc_offset`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcOffsetLag {
/// Identifier of the source the lag was measured for.
pub source: String,
/// Measured lag; the unit is not specified in this file.
pub lag: u64,
}
/// Optional generation comparison that can be attached to a
/// `ReconciliationReport` via `with_generations`.
///
/// `ReconciliationReport::status` reports a mismatch only when `expected`
/// and `observed` differ.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GenerationDrift {
/// Identifier of the source the generations belong to.
pub source: String,
/// Generation the caller expects.
pub expected: u64,
/// Generation that was actually observed.
pub observed: u64,
}
/// Bundle of reconciliation signals that `status` folds into a single
/// `DriftStatus`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReconciliationReport {
/// Outbox backlog evaluation; always present.
pub outbox_lag: OutboxLag,
/// Hook schema comparison; always present.
pub hook_drift: HookDrift,
/// Optional CDC offset lag; `None` unless set with `with_cdc_offset`.
pub cdc_offset: Option<CdcOffsetLag>,
/// Optional generation comparison; `None` unless set with `with_generations`.
pub generations: Option<GenerationDrift>,
}
impl ReconciliationReport {
    /// Creates a report from the two mandatory signals; the optional CDC
    /// offset and generation signals start out unset.
    pub fn new(outbox_lag: OutboxLag, hook_drift: HookDrift) -> Self {
        Self {
            outbox_lag,
            hook_drift,
            cdc_offset: None,
            generations: None,
        }
    }

    /// Builds a report by reading the outbox status for `namespace` and
    /// evaluating it against `policy`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `outbox.status(namespace)`.
    pub async fn from_outbox<O>(
        outbox: &O,
        namespace: &str,
        hook_drift: HookDrift,
        policy: OutboxLagPolicy,
    ) -> Result<Self>
    where
        O: InvalidationOutbox,
    {
        let status = outbox.status(namespace).await?;
        Ok(Self::new(OutboxLag::new(status, policy), hook_drift))
    }

    /// Attaches a CDC offset lag measurement, replacing any previous one.
    pub fn with_cdc_offset(mut self, cdc_offset: CdcOffsetLag) -> Self {
        self.cdc_offset = Some(cdc_offset);
        self
    }

    /// Attaches a generation comparison, replacing any previous one.
    pub fn with_generations(mut self, generations: GenerationDrift) -> Self {
        self.generations = Some(generations);
        self
    }

    /// Folds every attached signal into a single [`DriftStatus`].
    ///
    /// Reasons are collected in the order outbox lag, hook drift, CDC offset
    /// lag, generation mismatch. The result is `DriftStatus::Clean` only when
    /// no signal produced a reason.
    pub fn status(&self) -> DriftStatus {
        let mut reasons = self.outbox_lag.reasons();
        reasons.extend(self.hook_drift.reasons());

        if let Some(cdc_offset) = &self.cdc_offset {
            // An attached measurement with zero lag is not drift; only
            // report it when there is actual lag, mirroring how the
            // generation signal is only reported on a real mismatch.
            if cdc_offset.lag > 0 {
                reasons.push(DriftReason::CdcOffsetLag {
                    source: cdc_offset.source.clone(),
                    lag: cdc_offset.lag,
                });
            }
        }

        if let Some(generation) = &self.generations {
            if generation.expected != generation.observed {
                reasons.push(DriftReason::GenerationMismatch {
                    source: generation.source.clone(),
                    expected: generation.expected,
                    observed: generation.observed,
                });
            }
        }

        if reasons.is_empty() {
            DriftStatus::Clean
        } else {
            DriftStatus::Drift(reasons)
        }
    }

    /// Returns `true` when [`ReconciliationReport::status`] is `Clean`.
    pub fn is_clean(&self) -> bool {
        matches!(self.status(), DriftStatus::Clean)
    }
}
/// Overall outcome of a reconciliation report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DriftStatus {
/// No signal produced a drift reason.
Clean,
/// At least one drift reason was collected; `ReconciliationReport::status`
/// only builds this variant with a non-empty list.
Drift(Vec<DriftReason>),
}
/// A single, specific way in which observed state deviates from what is
/// expected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DriftReason {
/// The pending-row count exceeded the policy limit.
OutboxPendingRows { pending: u64, max_pending_rows: u64 },
/// Rows were pending and the oldest one exceeded the policy's age limit
/// (milliseconds).
OutboxOldestPendingAge {
oldest_pending_age_ms: u64,
max_oldest_pending_age_ms: u64,
},
/// Dead-lettered rows exist and the policy treats them as drift.
OutboxDeadLetters { dead_lettered: u64 },
/// No installed hook schema was found for the expected artifact/table.
HookSchemaMissing {
artifact: String,
expected_table: String,
},
/// Installed hook artifact differs from the expected one.
HookArtifactMismatch { expected: String, installed: String },
/// Installed hook schema version differs from the expected one.
HookVersionMismatch { expected: i64, installed: i64 },
/// Installed hook table differs from the expected one.
HookTableMismatch { expected: String, installed: String },
/// Installed hook dialect differs from the expected one; both sides are
/// rendered with `to_string()`.
HookDialectMismatch { expected: String, installed: String },
/// CDC offset lag recorded on a report for the named source.
CdcOffsetLag { source: String, lag: u64 },
/// Expected and observed generations differ for the named source.
GenerationMismatch {
source: String,
expected: u64,
observed: u64,
},
}
/// Reads the installed hook schema row from SQLite and compares it with
/// `expected`.
///
/// The function queries `hydracache_hook_schema` for the row whose `artifact`
/// equals `crate::HOOK_SCHEMA_ARTIFACT`. A missing table or a missing row is
/// treated as "nothing installed" (`installed == None`). The dialect of a
/// found row is always recorded as `HookDialect::Sqlite`.
///
/// # Errors
///
/// Returns a `CacheError::Backend` error when the query fails for any reason
/// other than the table being absent, or when a column of the returned row is
/// missing or cannot be decoded into the expected type.
#[cfg(feature = "sqlx-outbox")]
pub async fn sqlite_hook_drift(
    pool: &sqlx::SqlitePool,
    expected: HookSchemaVersion,
) -> Result<HookDrift> {
    use sqlx::Row;

    // Shared formatting for every backend failure raised by this function.
    fn backend_error(error: sqlx::Error) -> hydracache::CacheError {
        hydracache::CacheError::Backend(format!("sqlite hook reconciliation error: {error}"))
    }

    let row = match sqlx::query(
        "select artifact, version, table_name from hydracache_hook_schema where artifact = ?",
    )
    .bind(crate::HOOK_SCHEMA_ARTIFACT)
    .fetch_optional(pool)
    .await
    {
        Ok(row) => row,
        // An absent table simply means the hooks were never installed.
        Err(error) if is_sqlite_missing_table(&error) => None,
        Err(error) => return Err(backend_error(error).into()),
    };

    // `Row::get` panics on a missing column or a decode failure; a drifted or
    // hand-edited schema table must surface as an error, not a crash.
    let decoded = row
        .map(|row| -> std::result::Result<HookSchemaVersion, sqlx::Error> {
            Ok(HookSchemaVersion {
                artifact: row.try_get("artifact")?,
                version: row.try_get("version")?,
                table: row.try_get("table_name")?,
                dialect: crate::HookDialect::Sqlite,
            })
        })
        .transpose();

    let installed = match decoded {
        Ok(installed) => installed,
        Err(error) => return Err(backend_error(error).into()),
    };
    Ok(HookDrift::new(expected, installed))
}
/// Reports whether `error` renders as SQLite's "no such table" message for
/// the `hydracache_hook_schema` table.
///
/// Detection is based on the error's display text.
#[cfg(feature = "sqlx-outbox")]
fn is_sqlite_missing_table(error: &sqlx::Error) -> bool {
    const MISSING_TABLE_MESSAGE: &str = "no such table: hydracache_hook_schema";

    let rendered = error.to_string();
    rendered.contains(MISSING_TABLE_MESSAGE)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{HookDialect, OutboxStatus, HOOK_SCHEMA_ARTIFACT};

    /// Hook schema the fixtures treat as the expected installation.
    fn expected_hook() -> HookSchemaVersion {
        let artifact = HOOK_SCHEMA_ARTIFACT.to_owned();
        let table = "users".to_owned();
        HookSchemaVersion {
            artifact,
            version: 1,
            table,
            dialect: HookDialect::Sqlite,
        }
    }

    /// Returns `true` when any collected reason satisfies `predicate`.
    fn has_reason(reasons: &[DriftReason], predicate: impl Fn(&DriftReason) -> bool) -> bool {
        reasons.iter().any(|reason| predicate(reason))
    }

    #[test]
    fn reconcile_report_status_is_clean_for_matching_signals() {
        // Default status against the default policy, and an installed hook
        // identical to the expected one, leave nothing to report.
        let wanted = expected_hook();
        let report = ReconciliationReport::new(
            OutboxLag::new(OutboxStatus::default(), OutboxLagPolicy::default()),
            HookDrift::new(wanted.clone(), Some(wanted)),
        );

        assert_eq!(report.status(), DriftStatus::Clean);
        assert!(report.is_clean());
    }

    #[test]
    fn reconcile_report_status_collects_mandatory_reasons() {
        let backlog = OutboxStatus {
            pending: 3,
            dead_lettered: 1,
            ..OutboxStatus::default()
        };
        let report = ReconciliationReport::new(
            OutboxLag::new(backlog, OutboxLagPolicy::default()),
            HookDrift::missing(expected_hook()),
        );

        let reasons = match report.status() {
            DriftStatus::Drift(reasons) => reasons,
            _ => panic!("expected drift"),
        };

        assert!(has_reason(&reasons, |reason| matches!(
            reason,
            DriftReason::OutboxPendingRows { pending: 3, .. }
        )));
        assert!(has_reason(&reasons, |reason| matches!(
            reason,
            DriftReason::OutboxDeadLetters { dead_lettered: 1 }
        )));
        assert!(has_reason(&reasons, |reason| matches!(
            reason,
            DriftReason::HookSchemaMissing { .. }
        )));
    }
}