use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use sha2::{Digest, Sha256};
use hydracache::{CacheInvalidation, CacheKeyBuilder, HydraCache};
use hydracache_core::CacheCodec;
use crate::{DbCacheError, Result};
pub type InvalidationTargetHash = [u8; 32];
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum InvalidationIntent {
Key {
key: String,
},
Tag {
tag: String,
},
Entity {
entity: String,
key: String,
},
Collection {
collection: String,
},
Flush,
}
impl InvalidationIntent {
pub fn key(key: impl Into<String>) -> Self {
Self::Key { key: key.into() }
}
pub fn tag(tag: impl Into<String>) -> Self {
Self::Tag { tag: tag.into() }
}
pub fn entity(entity: impl Into<String>, key: impl Into<String>) -> Self {
Self::Entity {
entity: entity.into(),
key: key.into(),
}
}
pub fn collection(collection: impl Into<String>) -> Self {
Self::Collection {
collection: collection.into(),
}
}
pub fn flush() -> Self {
Self::Flush
}
pub fn kind(&self) -> &'static str {
match self {
Self::Key { .. } => "key",
Self::Tag { .. } => "tag",
Self::Entity { .. } => "entity",
Self::Collection { .. } => "collection",
Self::Flush => "flush",
}
}
pub fn value(&self) -> Option<&str> {
match self {
Self::Key { key } | Self::Entity { key, .. } => Some(key),
Self::Tag { tag } => Some(tag),
Self::Collection { collection } => Some(collection),
Self::Flush => None,
}
}
pub fn target_hash(&self) -> InvalidationTargetHash {
let mut hasher = Sha256::new();
write_hash_part(&mut hasher, b"hydracache-invalidation-intent-v1");
write_hash_part(&mut hasher, self.kind().as_bytes());
match self {
Self::Key { key } => write_hash_part(&mut hasher, key.as_bytes()),
Self::Tag { tag } => write_hash_part(&mut hasher, tag.as_bytes()),
Self::Entity { entity, key } => {
write_hash_part(&mut hasher, entity.as_bytes());
write_hash_part(&mut hasher, key.as_bytes());
}
Self::Collection { collection } => {
write_hash_part(&mut hasher, collection.as_bytes());
}
Self::Flush => {}
}
hasher.finalize().into()
}
pub fn target_hash_hex(&self) -> String {
hex_encode(&self.target_hash())
}
pub fn to_cache_invalidation(&self) -> CacheInvalidation {
match self {
Self::Key { key } => CacheInvalidation::key(key.clone()),
Self::Tag { tag } => CacheInvalidation::tag(tag.clone()),
Self::Entity { entity, key } => CacheInvalidation::tag(entity_tag(entity, key)),
Self::Collection { collection } => CacheInvalidation::tag(collection_tag(collection)),
Self::Flush => CacheInvalidation::flush(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvalidationIntentBatch {
reason: String,
intents: Vec<InvalidationIntent>,
}
impl InvalidationIntentBatch {
pub fn new(reason: impl Into<String>) -> Self {
Self {
reason: reason.into(),
intents: Vec::new(),
}
}
pub fn reason(&self) -> &str {
&self.reason
}
pub fn intents(&self) -> &[InvalidationIntent] {
&self.intents
}
pub fn is_empty(&self) -> bool {
self.intents.is_empty()
}
pub fn len(&self) -> usize {
self.intents.len()
}
pub fn intent(mut self, intent: InvalidationIntent) -> Self {
self.intents.push(intent);
self
}
pub fn invalidate_key(self, key: impl Into<String>) -> Self {
self.intent(InvalidationIntent::key(key))
}
pub fn invalidate_tag(self, tag: impl Into<String>) -> Self {
self.intent(InvalidationIntent::tag(tag))
}
pub fn invalidate_entity(self, entity: impl Into<String>, key: impl Into<String>) -> Self {
self.intent(InvalidationIntent::entity(entity, key))
}
pub fn invalidate_collection(self, collection: impl Into<String>) -> Self {
self.intent(InvalidationIntent::collection(collection))
}
pub fn flush(self) -> Self {
self.intent(InvalidationIntent::flush())
}
}
impl Default for InvalidationIntentBatch {
fn default() -> Self {
Self::new("")
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CommitPosition(String);
impl CommitPosition {
pub fn new(value: impl Into<String>) -> Self {
Self(value.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn into_string(self) -> String {
self.0
}
}
impl From<String> for CommitPosition {
fn from(value: String) -> Self {
Self::new(value)
}
}
impl From<&str> for CommitPosition {
fn from(value: &str) -> Self {
Self::new(value)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum OutboxState {
Pending,
Published,
Dead,
}
impl OutboxState {
pub fn as_str(self) -> &'static str {
match self {
Self::Pending => "pending",
Self::Published => "published",
Self::Dead => "dead",
}
}
pub fn from_storage(value: &str) -> Option<Self> {
match value {
"pending" => Some(Self::Pending),
"published" => Some(Self::Published),
"dead" => Some(Self::Dead),
_ => None,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutboxRow {
pub id: String,
pub namespace: String,
pub commit_position: CommitPosition,
pub target_hash: String,
pub intent: InvalidationIntent,
pub reason: String,
pub created_at_ms: u64,
pub available_at_ms: u64,
pub claimed_at_ms: Option<u64>,
pub claim_owner: Option<String>,
pub published_at_ms: Option<u64>,
pub attempts: u32,
pub state: OutboxState,
pub last_error: Option<String>,
}
impl OutboxRow {
pub fn is_published(&self) -> bool {
self.state == OutboxState::Published
}
pub fn is_dead_lettered(&self) -> bool {
self.state == OutboxState::Dead
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct OutboxStatus {
pub pending: u64,
pub oldest_pending_age_ms: u64,
pub dead_lettered: u64,
pub last_published_at_ms: Option<u64>,
pub failed_attempts: u64,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct OutboxWorkerDiagnostics {
pub iterations: u64,
pub claimed: u64,
pub published: u64,
pub retried: u64,
pub dead_lettered: u64,
}
#[derive(Debug, Default)]
struct OutboxWorkerCounters {
iterations: AtomicU64,
claimed: AtomicU64,
published: AtomicU64,
retried: AtomicU64,
dead_lettered: AtomicU64,
}
impl OutboxWorkerCounters {
fn snapshot(&self) -> OutboxWorkerDiagnostics {
OutboxWorkerDiagnostics {
iterations: self.iterations.load(Ordering::Relaxed),
claimed: self.claimed.load(Ordering::Relaxed),
published: self.published.load(Ordering::Relaxed),
retried: self.retried.load(Ordering::Relaxed),
dead_lettered: self.dead_lettered.load(Ordering::Relaxed),
}
}
fn record(&self, report: OutboxPublishReport) {
self.iterations.fetch_add(1, Ordering::Relaxed);
self.claimed
.fetch_add(report.claimed as u64, Ordering::Relaxed);
self.published
.fetch_add(report.published as u64, Ordering::Relaxed);
self.retried
.fetch_add(report.retried as u64, Ordering::Relaxed);
self.dead_lettered
.fetch_add(report.dead_lettered as u64, Ordering::Relaxed);
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ConsistencyMode {
NoWait,
Local,
BestEffort,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvalidationReceipt {
namespace: String,
commit_position: CommitPosition,
created_at_ms: u64,
}
impl InvalidationReceipt {
pub fn new(namespace: impl Into<String>, commit_position: CommitPosition) -> Self {
Self {
namespace: namespace.into(),
commit_position,
created_at_ms: now_ms(),
}
}
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn commit_position(&self) -> &CommitPosition {
&self.commit_position
}
pub fn created_at_ms(&self) -> u64 {
self.created_at_ms
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvalidationWaitOutcome {
pub mode: ConsistencyMode,
pub satisfied: bool,
pub degraded: bool,
pub timed_out: bool,
pub pending: u64,
pub elapsed_ms: u64,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct InvalidationWaitDiagnostics {
pub waits: u64,
pub satisfied: u64,
pub timed_out: u64,
pub degraded: u64,
}
#[derive(Debug, Default)]
struct InvalidationWaitCounters {
waits: AtomicU64,
satisfied: AtomicU64,
timed_out: AtomicU64,
degraded: AtomicU64,
}
impl InvalidationWaitCounters {
fn snapshot(&self) -> InvalidationWaitDiagnostics {
InvalidationWaitDiagnostics {
waits: self.waits.load(Ordering::Relaxed),
satisfied: self.satisfied.load(Ordering::Relaxed),
timed_out: self.timed_out.load(Ordering::Relaxed),
degraded: self.degraded.load(Ordering::Relaxed),
}
}
fn record(&self, outcome: InvalidationWaitOutcome) {
self.waits.fetch_add(1, Ordering::Relaxed);
if outcome.satisfied {
self.satisfied.fetch_add(1, Ordering::Relaxed);
}
if outcome.timed_out {
self.timed_out.fetch_add(1, Ordering::Relaxed);
}
if outcome.degraded {
self.degraded.fetch_add(1, Ordering::Relaxed);
}
}
}
#[derive(Debug, Clone)]
pub struct InvalidationWait {
mode: ConsistencyMode,
timeout: Duration,
poll_interval: Duration,
counters: Arc<InvalidationWaitCounters>,
}
impl InvalidationWait {
pub fn no_wait() -> Self {
Self {
mode: ConsistencyMode::NoWait,
timeout: Duration::ZERO,
poll_interval: Duration::from_millis(10),
counters: Arc::default(),
}
}
pub fn local(timeout: Duration) -> Self {
Self {
mode: ConsistencyMode::Local,
timeout,
poll_interval: Duration::from_millis(10),
counters: Arc::default(),
}
}
pub fn best_effort(timeout: Duration) -> Self {
Self {
mode: ConsistencyMode::BestEffort,
timeout,
poll_interval: Duration::from_millis(10),
counters: Arc::default(),
}
}
pub fn poll_interval(mut self, poll_interval: Duration) -> Self {
self.poll_interval = poll_interval;
self
}
pub fn mode(&self) -> ConsistencyMode {
self.mode
}
pub fn diagnostics(&self) -> InvalidationWaitDiagnostics {
self.counters.snapshot()
}
pub async fn wait<O>(
&self,
outbox: &O,
receipt: &InvalidationReceipt,
) -> Result<InvalidationWaitOutcome>
where
O: InvalidationOutbox,
{
let start = tokio::time::Instant::now();
if self.mode == ConsistencyMode::NoWait {
let outcome = InvalidationWaitOutcome {
mode: self.mode,
satisfied: true,
degraded: false,
timed_out: false,
pending: 0,
elapsed_ms: elapsed_ms(start),
};
self.counters.record(outcome);
return Ok(outcome);
}
loop {
let status = outbox.status(receipt.namespace()).await?;
if status.pending == 0 {
let outcome = InvalidationWaitOutcome {
mode: self.mode,
satisfied: true,
degraded: false,
timed_out: false,
pending: 0,
elapsed_ms: elapsed_ms(start),
};
self.counters.record(outcome);
return Ok(outcome);
}
let elapsed = start.elapsed();
if elapsed >= self.timeout {
let outcome = InvalidationWaitOutcome {
mode: self.mode,
satisfied: false,
degraded: true,
timed_out: true,
pending: status.pending,
elapsed_ms: duration_ms(elapsed),
};
self.counters.record(outcome);
return Ok(outcome);
}
let remaining = self.timeout.saturating_sub(elapsed);
tokio::time::sleep(self.poll_interval.min(remaining)).await;
}
}
}
#[async_trait]
pub trait InvalidationOutbox: fmt::Debug + Send + Sync + 'static {
async fn enqueue(
&self,
namespace: &str,
commit_position: &CommitPosition,
batch: &InvalidationIntentBatch,
) -> Result<usize>;
async fn claim(
&self,
namespace: &str,
owner: &str,
limit: usize,
claim_ttl: Duration,
) -> Result<Vec<OutboxRow>>;
async fn mark_published(&self, ids: &[String]) -> Result<()>;
async fn mark_failed(&self, id: &str, error: &str, backoff: Duration, dead: bool)
-> Result<()>;
async fn reset_dead_letters(&self, namespace: &str) -> Result<u64>;
async fn status(&self, namespace: &str) -> Result<OutboxStatus>;
}
#[derive(Clone, Default)]
pub struct InMemoryInvalidationOutbox {
inner: Arc<Mutex<InMemoryOutboxInner>>,
}
impl InMemoryInvalidationOutbox {
pub fn new() -> Self {
Self::default()
}
pub fn row(&self, id: &str) -> Option<OutboxRow> {
self.inner.lock().ok()?.rows.get(id).cloned()
}
pub fn rows(&self) -> Vec<OutboxRow> {
self.inner
.lock()
.map(|inner| inner.rows.values().cloned().collect())
.unwrap_or_default()
}
}
impl fmt::Debug for InMemoryInvalidationOutbox {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("InMemoryInvalidationOutbox")
.field("rows", &self.rows().len())
.finish()
}
}
#[derive(Default)]
struct InMemoryOutboxInner {
rows: BTreeMap<String, OutboxRow>,
}
#[async_trait]
impl InvalidationOutbox for InMemoryInvalidationOutbox {
async fn enqueue(
&self,
namespace: &str,
commit_position: &CommitPosition,
batch: &InvalidationIntentBatch,
) -> Result<usize> {
let mut inner = self.lock_inner()?;
let now = now_ms();
let mut inserted = 0;
for intent in batch.intents() {
let target_hash = intent.target_hash_hex();
let id = outbox_row_id(namespace, commit_position.as_str(), &target_hash);
if inner.rows.contains_key(&id) {
continue;
}
inner.rows.insert(
id.clone(),
OutboxRow {
id,
namespace: namespace.to_owned(),
commit_position: commit_position.clone(),
target_hash,
intent: intent.clone(),
reason: batch.reason().to_owned(),
created_at_ms: now,
available_at_ms: now,
claimed_at_ms: None,
claim_owner: None,
published_at_ms: None,
attempts: 0,
state: OutboxState::Pending,
last_error: None,
},
);
inserted += 1;
}
Ok(inserted)
}
async fn claim(
&self,
namespace: &str,
owner: &str,
limit: usize,
claim_ttl: Duration,
) -> Result<Vec<OutboxRow>> {
if limit == 0 {
return Ok(Vec::new());
}
let mut inner = self.lock_inner()?;
let now = now_ms();
let claim_ttl_ms = duration_ms(claim_ttl);
let mut candidates = inner
.rows
.values()
.filter(|row| {
row.namespace == namespace
&& row.state == OutboxState::Pending
&& row.available_at_ms <= now
&& claim_is_available(row, now, claim_ttl_ms)
})
.map(|row| (row.available_at_ms, row.created_at_ms, row.id.clone()))
.collect::<Vec<_>>();
candidates.sort();
let ids = candidates
.into_iter()
.take(limit)
.map(|(_, _, id)| id)
.collect::<Vec<_>>();
let mut claimed = Vec::with_capacity(ids.len());
for id in ids {
if let Some(row) = inner.rows.get_mut(&id) {
row.claimed_at_ms = Some(now);
row.claim_owner = Some(owner.to_owned());
claimed.push(row.clone());
}
}
Ok(claimed)
}
async fn mark_published(&self, ids: &[String]) -> Result<()> {
let mut inner = self.lock_inner()?;
let now = now_ms();
for id in ids {
let Some(row) = inner.rows.get_mut(id) else {
continue;
};
row.state = OutboxState::Published;
row.published_at_ms = Some(now);
row.claimed_at_ms = None;
row.claim_owner = None;
row.last_error = None;
}
Ok(())
}
async fn mark_failed(
&self,
id: &str,
error: &str,
backoff: Duration,
dead: bool,
) -> Result<()> {
let mut inner = self.lock_inner()?;
let Some(row) = inner.rows.get_mut(id) else {
return Ok(());
};
let now = now_ms();
row.attempts = row.attempts.saturating_add(1);
row.last_error = Some(error.to_owned());
row.claimed_at_ms = None;
row.claim_owner = None;
if dead {
row.state = OutboxState::Dead;
} else {
row.state = OutboxState::Pending;
row.available_at_ms = now.saturating_add(duration_ms(backoff));
}
Ok(())
}
async fn reset_dead_letters(&self, namespace: &str) -> Result<u64> {
let mut inner = self.lock_inner()?;
let now = now_ms();
let mut reset = 0;
for row in inner.rows.values_mut() {
if row.namespace == namespace && row.state == OutboxState::Dead {
row.state = OutboxState::Pending;
row.available_at_ms = now;
row.claimed_at_ms = None;
row.claim_owner = None;
row.attempts = 0;
row.last_error = None;
reset += 1;
}
}
Ok(reset)
}
async fn status(&self, namespace: &str) -> Result<OutboxStatus> {
let inner = self.lock_inner()?;
let now = now_ms();
let mut status = OutboxStatus::default();
let mut oldest_pending = None::<u64>;
for row in inner.rows.values().filter(|row| row.namespace == namespace) {
status.failed_attempts += u64::from(row.attempts);
match row.state {
OutboxState::Pending => {
status.pending += 1;
oldest_pending = Some(
oldest_pending
.map_or(row.created_at_ms, |oldest| oldest.min(row.created_at_ms)),
);
}
OutboxState::Published => {
status.last_published_at_ms =
match (status.last_published_at_ms, row.published_at_ms) {
(Some(current), Some(candidate)) => Some(current.max(candidate)),
(None, Some(candidate)) => Some(candidate),
(current, None) => current,
};
}
OutboxState::Dead => {
status.dead_lettered += 1;
}
}
}
status.oldest_pending_age_ms = oldest_pending
.map(|created_at| now.saturating_sub(created_at))
.unwrap_or_default();
Ok(status)
}
}
impl InMemoryInvalidationOutbox {
fn lock_inner(&self) -> Result<std::sync::MutexGuard<'_, InMemoryOutboxInner>> {
self.inner
.lock()
.map_err(|_| backend_error("in-memory invalidation outbox mutex was poisoned"))
}
}
#[async_trait]
pub trait InvalidationApplier: Send + Sync + 'static {
async fn apply_invalidation(&self, intent: &InvalidationIntent) -> hydracache::CacheResult<()>;
}
#[async_trait]
impl<C> InvalidationApplier for HydraCache<C>
where
C: CacheCodec,
{
async fn apply_invalidation(&self, intent: &InvalidationIntent) -> hydracache::CacheResult<()> {
match intent.to_cache_invalidation() {
CacheInvalidation::Key { key } => {
self.invalidate_key(&key).await?;
}
CacheInvalidation::Tag { tag } => {
self.invalidate_tag(&tag).await?;
}
CacheInvalidation::Flush => {
self.flush().await?;
}
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct OutboxPublishReport {
pub claimed: usize,
pub published: usize,
pub retried: usize,
pub dead_lettered: usize,
}
#[derive(Debug, Clone)]
pub struct InvalidationOutboxWorker<O, A> {
outbox: O,
applier: A,
namespace: String,
owner: String,
batch_size: usize,
claim_ttl: Duration,
backoff: Duration,
max_attempts: u32,
counters: Arc<OutboxWorkerCounters>,
}
impl<O, A> InvalidationOutboxWorker<O, A> {
pub fn new(outbox: O, applier: A, namespace: impl Into<String>) -> Self {
Self {
outbox,
applier,
namespace: namespace.into(),
owner: "hydracache-outbox-worker".to_owned(),
batch_size: 64,
claim_ttl: Duration::from_secs(30),
backoff: Duration::from_secs(1),
max_attempts: 5,
counters: Arc::default(),
}
}
pub fn owner(mut self, owner: impl Into<String>) -> Self {
self.owner = owner.into();
self
}
pub fn batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size.max(1);
self
}
pub fn claim_ttl(mut self, claim_ttl: Duration) -> Self {
self.claim_ttl = claim_ttl;
self
}
pub fn backoff(mut self, backoff: Duration) -> Self {
self.backoff = backoff;
self
}
pub fn max_attempts(mut self, max_attempts: u32) -> Self {
self.max_attempts = max_attempts.max(1);
self
}
pub fn diagnostics(&self) -> OutboxWorkerDiagnostics {
self.counters.snapshot()
}
pub async fn run_once(&self) -> Result<OutboxPublishReport>
where
O: InvalidationOutbox,
A: InvalidationApplier,
{
let rows = self
.outbox
.claim(
&self.namespace,
&self.owner,
self.batch_size,
self.claim_ttl,
)
.await?;
let mut report = OutboxPublishReport {
claimed: rows.len(),
..OutboxPublishReport::default()
};
for row in rows {
match self.applier.apply_invalidation(&row.intent).await {
Ok(()) => {
self.outbox
.mark_published(std::slice::from_ref(&row.id))
.await?;
report.published += 1;
}
Err(error) => {
let dead = row.attempts.saturating_add(1) >= self.max_attempts;
self.outbox
.mark_failed(&row.id, &error.to_string(), self.backoff, dead)
.await?;
if dead {
report.dead_lettered += 1;
} else {
report.retried += 1;
}
}
}
}
self.counters.record(report);
Ok(report)
}
pub async fn reset_dead_letters(&self) -> Result<u64>
where
O: InvalidationOutbox,
{
self.outbox.reset_dead_letters(&self.namespace).await
}
}
fn write_hash_part(hasher: &mut Sha256, bytes: &[u8]) {
hasher.update((bytes.len() as u64).to_be_bytes());
hasher.update(bytes);
}
fn entity_tag(entity: &str, key: &str) -> String {
CacheKeyBuilder::new()
.segment(entity)
.segment(key)
.build_string()
}
fn collection_tag(collection: &str) -> String {
CacheKeyBuilder::from_segment(collection).build_string()
}
fn hex_encode(bytes: &[u8; 32]) -> String {
const HEX: &[u8; 16] = b"0123456789abcdef";
let mut out = String::with_capacity(bytes.len() * 2);
for byte in bytes {
out.push(HEX[(byte >> 4) as usize] as char);
out.push(HEX[(byte & 0x0f) as usize] as char);
}
out
}
fn outbox_row_id(namespace: &str, commit_position: &str, target_hash: &str) -> String {
let mut hasher = Sha256::new();
write_hash_part(&mut hasher, b"hydracache-outbox-row-id-v1");
write_hash_part(&mut hasher, namespace.as_bytes());
write_hash_part(&mut hasher, commit_position.as_bytes());
write_hash_part(&mut hasher, target_hash.as_bytes());
hex_encode(&hasher.finalize().into())
}
fn claim_is_available(row: &OutboxRow, now: u64, claim_ttl_ms: u64) -> bool {
match row.claimed_at_ms {
Some(claimed_at) => claimed_at.saturating_add(claim_ttl_ms) <= now,
None => true,
}
}
fn duration_ms(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
fn elapsed_ms(start: tokio::time::Instant) -> u64 {
duration_ms(start.elapsed())
}
fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
.try_into()
.unwrap_or(u64::MAX)
}
fn backend_error(message: impl Into<String>) -> DbCacheError {
hydracache::CacheError::Backend(message.into()).into()
}
#[cfg(test)]
mod tests {
use super::{
CommitPosition, InMemoryInvalidationOutbox, InvalidationIntent, InvalidationIntentBatch,
InvalidationOutbox, OutboxState,
};
#[test]
fn intent_target_hash_is_stable() {
let first = InvalidationIntent::entity("user", "42").target_hash();
let second = InvalidationIntent::entity("user", "42").target_hash();
assert_eq!(first, second);
assert_eq!(
InvalidationIntent::entity("user", "42")
.target_hash_hex()
.len(),
64
);
}
#[test]
fn intent_target_hash_distinguishes_kind_and_length_prefixed_parts() {
let key = InvalidationIntent::key("tenant:7/users");
let tag = InvalidationIntent::tag("tenant:7/users");
let entity = InvalidationIntent::entity("tenant:7", "users");
let collection = InvalidationIntent::collection("tenant:7:users");
assert_ne!(key.target_hash(), tag.target_hash());
assert_ne!(key.target_hash(), entity.target_hash());
assert_ne!(tag.target_hash(), collection.target_hash());
}
#[test]
fn intent_to_cache_invalidation_maps_each_kind() {
let key = InvalidationIntent::key("db:user:42").to_cache_invalidation();
assert_eq!(key.key_value(), Some("db:user:42"));
let tag = InvalidationIntent::tag("users").to_cache_invalidation();
assert_eq!(tag.tag_value(), Some("users"));
let entity = InvalidationIntent::entity("account:user", "42%beta").to_cache_invalidation();
assert_eq!(entity.tag_value(), Some("account%3Auser:42%25beta"));
let collection = InvalidationIntent::collection("users:active").to_cache_invalidation();
assert_eq!(collection.tag_value(), Some("users%3Aactive"));
assert!(InvalidationIntent::flush()
.to_cache_invalidation()
.is_flush());
}
#[test]
fn intent_batch_preserves_reason_and_order() {
let batch = InvalidationIntentBatch::new("user-write")
.invalidate_key("db:user:42")
.invalidate_tag("users")
.invalidate_entity("user", "42")
.invalidate_collection("users:active")
.flush();
assert_eq!(batch.reason(), "user-write");
assert_eq!(batch.len(), 5);
assert_eq!(batch.intents()[0].kind(), "key");
assert_eq!(batch.intents()[4].kind(), "flush");
}
#[test]
fn commit_position_wraps_database_identity() {
let position = CommitPosition::new("pg:123");
assert_eq!(position.as_str(), "pg:123");
assert_eq!(position.clone().into_string(), "pg:123");
assert_eq!(CommitPosition::from("pg:123"), position);
}
#[tokio::test]
async fn in_memory_outbox_enqueue_is_idempotent_for_same_commit_and_target() {
let outbox = InMemoryInvalidationOutbox::new();
let commit = CommitPosition::new("sqlite:1");
let batch = InvalidationIntentBatch::new("write").invalidate_tag("users");
assert_eq!(outbox.enqueue("db", &commit, &batch).await.unwrap(), 1);
assert_eq!(outbox.enqueue("db", &commit, &batch).await.unwrap(), 0);
let rows = outbox.rows();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].state, OutboxState::Pending);
assert_eq!(rows[0].namespace, "db");
}
}