use std::collections::HashSet;
use std::sync::Arc;
use async_trait::async_trait;
use parking_lot::Mutex;
use thiserror::Error;
use super::mutation::{MemoryMutation, TenantId};
/// Errors returned by [`Applier`] implementations.
///
/// Every variant maps to a fixed metric label through
/// [`ApplyError::metric_label`].
#[derive(Debug, Clone, PartialEq, Error)]
pub enum ApplyError {
/// The apply path for the given mutation variant is not implemented yet.
///
/// `variant` names the mutation variant and `planned_pr` names the change
/// in which the apply path is planned to be wired up.
#[error("apply path for `{variant}` not yet wired (planned in {planned_pr})")]
NotYetWired {
variant: &'static str,
planned_pr: &'static str,
},
/// The `(tenant_id, log_index)` pair was already submitted to the applier.
///
/// This is the only variant for which [`ApplyError::is_idempotent_ok`]
/// returns `true`.
#[error("(tenant {tenant_id}, log_index {log_index}) already applied")]
AlreadyApplied { tenant_id: TenantId, log_index: u64 },
/// The engine failed while applying a mutation; `message` carries the detail.
///
/// NOTE(review): `LocalApplier` in this file never returns this variant —
/// presumably it is reserved for a real engine-backed applier; confirm.
#[error("engine apply failed: {message}")]
EngineFailure { message: String },
}
impl ApplyError {
pub fn metric_label(&self) -> &'static str {
match self {
ApplyError::NotYetWired { .. } => "not_yet_wired",
ApplyError::AlreadyApplied { .. } => "already_applied",
ApplyError::EngineFailure { .. } => "engine_failure",
}
}
pub fn is_idempotent_ok(&self) -> bool {
matches!(self, ApplyError::AlreadyApplied { .. })
}
}
/// Asynchronous interface for applying a [`MemoryMutation`] identified by a
/// tenant and a log index.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait Applier: Send + Sync {
/// Applies `mutation`, identified by the `(tenant_id, log_index)` pair.
///
/// # Errors
///
/// Returns an [`ApplyError`] when the mutation was not applied.
/// NOTE(review): the trait itself does not pin down which variant is used
/// when; `LocalApplier` reports a repeated pair as
/// [`ApplyError::AlreadyApplied`] — confirm other implementors must match.
async fn apply(
&self,
tenant_id: TenantId,
log_index: u64,
mutation: &MemoryMutation,
) -> Result<(), ApplyError>;
/// Returns the applied high watermark for `tenant_id`.
///
/// NOTE(review): presumably the highest log index applied for that tenant;
/// `LocalApplier` returns `0` when the tenant has no recorded index —
/// confirm whether that is part of the contract.
async fn applied_high_watermark(&self, tenant_id: TenantId) -> Result<u64, ApplyError>;
}
/// In-memory [`Applier`] that remembers every `(tenant, log_index)` pair it
/// has been asked to apply.
pub struct LocalApplier {
/// Every `(tenant, log_index)` pair passed to `apply` so far, guarded by a
/// mutex.
seen: Arc<Mutex<HashSet<(TenantId, u64)>>>,
}
impl Default for LocalApplier {
fn default() -> Self {
Self::new()
}
}
impl LocalApplier {
    /// Creates an applier with no recorded `(tenant, log_index)` pairs.
    pub fn new() -> Self {
        let seen = Arc::new(Mutex::new(HashSet::new()));
        Self { seen }
    }
}
#[async_trait]
impl Applier for LocalApplier {
    /// Records `(tenant_id, log_index)` and reports that applying is not wired yet.
    ///
    /// This implementation never returns `Ok`.
    ///
    /// # Errors
    ///
    /// * [`ApplyError::AlreadyApplied`] when the pair was recorded by an
    ///   earlier call.
    /// * [`ApplyError::NotYetWired`] on the first call for a pair, carrying
    ///   `mutation.variant_name()`.
    ///
    /// The pair is recorded *before* `NotYetWired` is returned, so repeating
    /// the same call yields `AlreadyApplied`.
    async fn apply(
        &self,
        tenant_id: TenantId,
        log_index: u64,
        mutation: &MemoryMutation,
    ) -> Result<(), ApplyError> {
        // The guard is a temporary of this statement, so the lock is released
        // before the result is built.
        let first_time = self.seen.lock().insert((tenant_id, log_index));

        if first_time {
            Err(ApplyError::NotYetWired {
                variant: mutation.variant_name(),
                planned_pr: "RFC 010 PR-6.4",
            })
        } else {
            Err(ApplyError::AlreadyApplied {
                tenant_id,
                log_index,
            })
        }
    }

    /// Returns the largest log index recorded for `tenant_id`, or `0` when
    /// nothing is recorded for that tenant.
    ///
    /// Always returns `Ok`. Every recorded pair, across all tenants, is
    /// visited while the lock is held. A result of `0` does not distinguish
    /// "no entries" from "only index 0 recorded".
    async fn applied_high_watermark(&self, tenant_id: TenantId) -> Result<u64, ApplyError> {
        let recorded = self.seen.lock();
        let highest = recorded
            .iter()
            .filter(|(tenant, _)| *tenant == tenant_id)
            .fold(0_u64, |best, (_, index)| best.max(*index));
        Ok(highest)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::mutation::OpId;
    use serde_json::json;

    /// Builds an `UpsertMemory` mutation for `rid` with fixed placeholder values.
    fn upsert_memory(rid: &str) -> MemoryMutation {
        MemoryMutation::UpsertMemory {
            rid: rid.to_owned(),
            text: "hello".into(),
            memory_type: "semantic".into(),
            importance: 0.5,
            valence: 0.0,
            half_life: 86400.0,
            namespace: "test".into(),
            certainty: 1.0,
            domain: "general".into(),
            source: "test".into(),
            emotional_state: None,
            embedding: None,
            extracted_entities: Vec::new(),
            created_at_unix_micros: None,
            embedding_model: None,
            metadata: json!({}),
        }
    }

    /// Builds a `TombstoneMemory` mutation for `rid` with no reason and a zero timestamp.
    fn tombstone(rid: &str) -> MemoryMutation {
        MemoryMutation::TombstoneMemory {
            rid: rid.to_owned(),
            reason: None,
            requested_at_unix_micros: 0,
        }
    }

    /// Reads the watermark for `tenant`, panicking if the applier reports an error.
    async fn watermark_of(applier: &LocalApplier, tenant: TenantId) -> u64 {
        applier.applied_high_watermark(tenant).await.unwrap()
    }

    /// The first submission of a pair fails with `NotYetWired`.
    #[tokio::test]
    async fn first_apply_returns_not_yet_wired() {
        let applier = LocalApplier::new();
        let mutation = upsert_memory("rid-1");

        let outcome = applier.apply(TenantId::new(1), 1, &mutation).await;
        let err = outcome.unwrap_err();

        assert!(matches!(err, ApplyError::NotYetWired { .. }));
        assert_eq!(err.metric_label(), "not_yet_wired");
    }

    /// Submitting the same pair twice yields `AlreadyApplied` the second time.
    #[tokio::test]
    async fn duplicate_apply_is_idempotent() {
        let applier = LocalApplier::new();
        let tenant = TenantId::new(1);
        let mutation = upsert_memory("rid-1");

        // Only the recording side effect of the first call matters here.
        let _ = applier.apply(tenant, 1, &mutation).await;
        let err = applier.apply(tenant, 1, &mutation).await.unwrap_err();

        assert!(matches!(err, ApplyError::AlreadyApplied { .. }));
        assert!(err.is_idempotent_ok());
    }

    /// Two log indices under one tenant are tracked independently.
    #[tokio::test]
    async fn different_log_index_same_tenant_is_not_duplicate() {
        let applier = LocalApplier::new();
        let tenant = TenantId::new(1);
        let first = upsert_memory("rid-1");
        let second = upsert_memory("rid-2");

        let first_err = applier.apply(tenant, 1, &first).await.unwrap_err();
        let second_err = applier.apply(tenant, 2, &second).await.unwrap_err();

        assert!(matches!(first_err, ApplyError::NotYetWired { .. }));
        assert!(matches!(second_err, ApplyError::NotYetWired { .. }));
    }

    /// The same log index under two tenants is tracked independently.
    #[tokio::test]
    async fn same_log_index_different_tenant_is_not_duplicate() {
        let applier = LocalApplier::new();
        let mutation = upsert_memory("rid-1");

        let first_err = applier
            .apply(TenantId::new(1), 1, &mutation)
            .await
            .unwrap_err();
        let second_err = applier
            .apply(TenantId::new(2), 1, &mutation)
            .await
            .unwrap_err();

        assert!(matches!(first_err, ApplyError::NotYetWired { .. }));
        assert!(matches!(second_err, ApplyError::NotYetWired { .. }));
    }

    /// The watermark is the per-tenant maximum and `0` for unknown tenants.
    #[tokio::test]
    async fn watermark_tracks_per_tenant_max() {
        let applier = LocalApplier::new();
        assert_eq!(watermark_of(&applier, TenantId::new(1)).await, 0);

        let mutation = upsert_memory("rid-1");
        let _ = applier.apply(TenantId::new(1), 7, &mutation).await;
        let _ = applier.apply(TenantId::new(1), 3, &mutation).await;
        let _ = applier.apply(TenantId::new(2), 9, &mutation).await;

        assert_eq!(watermark_of(&applier, TenantId::new(1)).await, 7);
        assert_eq!(watermark_of(&applier, TenantId::new(2)).await, 9);
        assert_eq!(watermark_of(&applier, TenantId::new(99)).await, 0);
    }

    /// A tombstone mutation surfaces its own variant name in `NotYetWired`.
    #[tokio::test]
    async fn tombstone_variant_routes_through_apply() {
        let applier = LocalApplier::new();
        let mutation = tombstone("rid-1");

        let err = applier
            .apply(TenantId::new(1), 1, &mutation)
            .await
            .unwrap_err();

        let variant = match err {
            ApplyError::NotYetWired { variant, .. } => variant,
            other => panic!("expected NotYetWired, got {other:?}"),
        };
        assert_eq!(variant, "TombstoneMemory");
    }

    /// Each variant keeps its exact metric label.
    #[test]
    fn apply_error_metric_labels_are_stable() {
        let cases = [
            (
                ApplyError::NotYetWired {
                    variant: "x",
                    planned_pr: "y",
                },
                "not_yet_wired",
            ),
            (
                ApplyError::AlreadyApplied {
                    tenant_id: TenantId::new(1),
                    log_index: 1,
                },
                "already_applied",
            ),
            (
                ApplyError::EngineFailure {
                    message: String::from("x"),
                },
                "engine_failure",
            ),
        ];

        for (error, label) in cases.iter() {
            assert_eq!(error.metric_label(), *label);
        }
    }

    /// Only `AlreadyApplied` counts as an idempotent success.
    #[test]
    fn is_idempotent_ok_classification() {
        let duplicate = ApplyError::AlreadyApplied {
            tenant_id: TenantId::new(1),
            log_index: 1,
        };
        let not_wired = ApplyError::NotYetWired {
            variant: "x",
            planned_pr: "y",
        };
        let failed = ApplyError::EngineFailure {
            message: String::from("x"),
        };

        assert!(duplicate.is_idempotent_ok());
        assert!(!not_wired.is_idempotent_ok());
        assert!(!failed.is_idempotent_ok());
    }

    // Never called by any test; as the name says, it only has to compile,
    // hence the `dead_code` allowance.
    #[allow(dead_code)]
    fn _op_id_compile_check() {
        let _ = OpId::new_random();
    }
}