use std::collections::HashSet;
use std::sync::Arc;
use async_trait::async_trait;
use parking_lot::Mutex;
use thiserror::Error;
use yantrikdb::YantrikDB;
use super::mutation::{MemoryMutation, TenantId};
#[derive(Debug, Clone, PartialEq, Error)]
pub enum ApplyError {
#[error("apply path for `{variant}` not yet wired (planned in {planned_pr})")]
NotYetWired {
variant: &'static str,
planned_pr: &'static str,
},
#[error("(tenant {tenant_id}, log_index {log_index}) already applied")]
AlreadyApplied { tenant_id: TenantId, log_index: u64 },
#[error("engine apply failed: {message}")]
EngineFailure { message: String },
}
impl ApplyError {
pub fn metric_label(&self) -> &'static str {
match self {
ApplyError::NotYetWired { .. } => "not_yet_wired",
ApplyError::AlreadyApplied { .. } => "already_applied",
ApplyError::EngineFailure { .. } => "engine_failure",
}
}
pub fn is_idempotent_ok(&self) -> bool {
matches!(self, ApplyError::AlreadyApplied { .. })
}
}
/// Asynchronous sink for committed mutations, keyed by tenant and log index.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait Applier: Send + Sync {
/// Applies `mutation` for the entry identified by `(tenant_id, log_index)`.
///
/// The implementations in this file return [`ApplyError::AlreadyApplied`]
/// when the same pair is submitted a second time.
async fn apply(
&self,
tenant_id: TenantId,
log_index: u64,
mutation: &MemoryMutation,
) -> Result<(), ApplyError>;
/// Reports the high watermark for `tenant_id`.
///
/// The implementations in this file return the largest log index they have
/// recorded for the tenant, or `0` when they have recorded none.
async fn applied_high_watermark(&self, tenant_id: TenantId) -> Result<u64, ApplyError>;
}
/// [`Applier`] that only tracks which `(tenant, log_index)` pairs it has seen.
///
/// Its `apply` never touches an engine: it returns `AlreadyApplied` for a
/// repeated pair and `NotYetWired` otherwise.
pub struct LocalApplier {
// Every `(tenant, log_index)` pair passed to `apply` so far.
seen: Arc<Mutex<HashSet<(TenantId, u64)>>>,
}
impl Default for LocalApplier {
fn default() -> Self {
Self::new()
}
}
impl LocalApplier {
pub fn new() -> Self {
Self {
seen: Arc::new(Mutex::new(HashSet::new())),
}
}
}
#[async_trait]
impl Applier for LocalApplier {
    /// Records `(tenant_id, log_index)` and always returns an error.
    ///
    /// # Errors
    /// - [`ApplyError::AlreadyApplied`] when the pair was recorded before.
    /// - [`ApplyError::NotYetWired`] otherwise, naming the mutation variant.
    async fn apply(
        &self,
        tenant_id: TenantId,
        log_index: u64,
        mutation: &MemoryMutation,
    ) -> Result<(), ApplyError> {
        // The lock guard is a temporary, released at the end of this statement.
        let first_sighting = self.seen.lock().insert((tenant_id, log_index));

        if first_sighting {
            Err(ApplyError::NotYetWired {
                variant: mutation.variant_name(),
                planned_pr: "RFC 010 PR-6.4",
            })
        } else {
            Err(ApplyError::AlreadyApplied {
                tenant_id,
                log_index,
            })
        }
    }

    /// Returns the largest log index recorded for `tenant_id`, or `0` when
    /// nothing has been recorded for that tenant.
    async fn applied_high_watermark(&self, tenant_id: TenantId) -> Result<u64, ApplyError> {
        let recorded = self.seen.lock();
        let mut high = 0u64;
        for &(tenant, index) in recorded.iter() {
            if tenant == tenant_id && index > high {
                high = index;
            }
        }
        Ok(high)
    }
}
/// Looks up the `YantrikDB` engine handle that serves a tenant.
///
/// Implementors must be `Send + Sync`.
pub trait EngineResolver: Send + Sync {
/// Returns a shared handle to the engine for `tenant_id`, or an
/// [`ApplyError`] when no engine can be provided. `EngineApplier::apply`
/// propagates that error to its caller.
fn resolve(&self, tenant_id: TenantId) -> Result<Arc<YantrikDB>, ApplyError>;
}
/// [`Applier`] that resolves a tenant's `YantrikDB` engine and applies
/// mutations to it.
pub struct EngineApplier {
// Source of the per-tenant engine handle.
resolver: Arc<dyn EngineResolver>,
// `(tenant, log_index)` pairs claimed by `apply`; also the data behind
// `applied_high_watermark`.
seen: Arc<Mutex<HashSet<(TenantId, u64)>>>,
}
impl EngineApplier {
pub fn new(resolver: Arc<dyn EngineResolver>) -> Self {
Self {
resolver,
seen: Arc::new(Mutex::new(HashSet::new())),
}
}
}
#[async_trait]
impl Applier for EngineApplier {
    /// Applies `mutation` at `log_index` to the engine resolved for
    /// `tenant_id`.
    ///
    /// The `(tenant_id, log_index)` pair is claimed in the in-memory set
    /// before any engine work so a concurrent duplicate is rejected. The
    /// engine call itself runs on the blocking thread pool.
    ///
    /// If the attempt ends in [`ApplyError::EngineFailure`] (resolver error,
    /// blocking-task join error, or engine error) the claim is released.
    /// Otherwise a failed entry would be reported as `AlreadyApplied` on
    /// retry — which [`ApplyError::is_idempotent_ok`] treats as success — and
    /// would be counted by [`Applier::applied_high_watermark`] although it
    /// never reached the engine. `NotYetWired` keeps the claim, matching
    /// `LocalApplier`, whose duplicate detection also covers unwired entries.
    ///
    /// # Errors
    /// - [`ApplyError::AlreadyApplied`] when the pair is already claimed.
    /// - Whatever error the resolver returns, unchanged.
    /// - [`ApplyError::EngineFailure`] when the blocking task cannot be joined
    ///   or the engine call fails.
    /// - [`ApplyError::NotYetWired`] for variants without an apply path.
    async fn apply(
        &self,
        tenant_id: TenantId,
        log_index: u64,
        mutation: &MemoryMutation,
    ) -> Result<(), ApplyError> {
        let key = (tenant_id, log_index);

        // The guard is a temporary dropped at the end of this statement, so
        // the lock is never held across an `.await`.
        let newly_claimed = self.seen.lock().insert(key);
        if !newly_claimed {
            return Err(ApplyError::AlreadyApplied {
                tenant_id,
                log_index,
            });
        }

        let outcome = match self.resolver.resolve(tenant_id) {
            Ok(engine) => {
                // The blocking closure must be `'static`, so it owns a copy.
                let mutation = mutation.clone();
                let joined = tokio::task::spawn_blocking(move || {
                    apply_to_engine(&engine, log_index, &mutation)
                })
                .await;
                match joined {
                    Ok(applied) => applied,
                    Err(e) => Err(ApplyError::EngineFailure {
                        message: format!("spawn_blocking join: {e}"),
                    }),
                }
            }
            Err(resolve_err) => Err(resolve_err),
        };

        // Release the claim when the engine was not updated, so the entry can
        // be retried and is not reported as applied.
        if matches!(outcome, Err(ApplyError::EngineFailure { .. })) {
            self.seen.lock().remove(&key);
        }
        outcome
    }

    /// Returns the largest log index currently claimed for `tenant_id`, or
    /// `0` when there is none.
    async fn applied_high_watermark(&self, tenant_id: TenantId) -> Result<u64, ApplyError> {
        let seen = self.seen.lock();
        Ok(seen
            .iter()
            .filter(|(tenant, _)| *tenant == tenant_id)
            .map(|&(_, index)| index)
            .max()
            .unwrap_or(0))
    }
}
/// Dispatches one mutation to the matching `YantrikDB` write call.
///
/// `log_index` is forwarded to every engine call as `Some(log_index)`.
/// Variants without an engine call are rejected with
/// [`ApplyError::NotYetWired`].
///
/// # Errors
/// - [`ApplyError::EngineFailure`] when an `UpsertMemory` carries no
///   embedding, or when the engine call returns an error (the message names
///   the call and the affected id).
/// - [`ApplyError::NotYetWired`] for `UpdateMemoryPatch`, `PurgeMemory` and
///   `TenantConfigPatch`.
fn apply_to_engine(
    engine: &YantrikDB,
    log_index: u64,
    mutation: &MemoryMutation,
) -> Result<(), ApplyError> {
    // Wraps an already formatted message in `EngineFailure`.
    fn engine_failure(message: String) -> ApplyError {
        ApplyError::EngineFailure { message }
    }

    // Rejection for variants that have no engine call yet.
    fn not_yet_wired(variant: &'static str, planned_pr: &'static str) -> Result<(), ApplyError> {
        Err(ApplyError::NotYetWired {
            variant,
            planned_pr,
        })
    }

    match mutation {
        MemoryMutation::UpdateMemoryPatch { .. } => {
            not_yet_wired("UpdateMemoryPatch", "RFC 011-A correct semantics")
        }
        MemoryMutation::PurgeMemory { .. } => not_yet_wired("PurgeMemory", "RFC 011 PR-3"),
        MemoryMutation::TenantConfigPatch { .. } => {
            not_yet_wired("TenantConfigPatch", "RFC 021 PR-2")
        }

        MemoryMutation::UpsertMemory {
            rid,
            text,
            memory_type,
            importance,
            valence,
            half_life,
            namespace,
            certainty,
            domain,
            source,
            emotional_state,
            embedding,
            metadata,
            extracted_entities,
            created_at_unix_micros,
            embedding_model,
        } => {
            // The embedding must already be present; this function never
            // computes one.
            let vector = match embedding.as_deref() {
                Some(vector) => vector,
                None => {
                    return Err(engine_failure(format!(
                        "UpsertMemory rid={} missing embedding — leader did not materialize \
                         before commit (PR 6.2 contract violation)",
                        rid
                    )));
                }
            };
            let entity_names: Vec<&str> = extracted_entities
                .iter()
                .map(|name| name.as_str())
                .collect();
            // Absent optional fields fall back to fixed defaults.
            let created_at = created_at_unix_micros.unwrap_or(0);
            let model_name = embedding_model.as_deref().unwrap_or("default");

            engine
                .record_with_rid(
                    rid,
                    text,
                    memory_type,
                    *importance,
                    *valence,
                    *half_life,
                    metadata,
                    vector,
                    namespace,
                    *certainty,
                    domain,
                    source,
                    emotional_state.as_deref(),
                    created_at,
                    &entity_names,
                    model_name,
                    Some(log_index),
                )
                .map(|_| ())
                .map_err(|e| engine_failure(format!("record_with_rid({rid}): {e}")))
        }

        MemoryMutation::TombstoneMemory {
            rid,
            reason,
            requested_at_unix_micros,
            namespace,
        } => {
            // An empty namespace is logged and then passed through as "".
            if namespace.is_empty() {
                tracing::info!(
                    rid,
                    log_index,
                    "TombstoneMemory legacy v1.0/v1.1 payload — namespace empty, engine will resolve from row"
                );
            }
            engine
                .tombstone_with_rid(
                    rid,
                    namespace.as_str(),
                    reason.as_deref(),
                    *requested_at_unix_micros,
                    Some(log_index),
                )
                .map(|_| ())
                .map_err(|e| engine_failure(format!("tombstone_with_rid({rid}): {e}")))
        }

        MemoryMutation::UpsertEntityEdge {
            edge_id,
            src,
            dst,
            rel_type,
            weight,
            namespace,
        } => engine
            .upsert_entity_edge_with_id(
                edge_id,
                src,
                dst,
                rel_type,
                *weight,
                namespace,
                0,
                Some(log_index),
            )
            .map(|_| ())
            .map_err(|e| engine_failure(format!("upsert_entity_edge_with_id({edge_id}): {e}"))),

        MemoryMutation::DeleteEntityEdge {
            edge_id,
            namespace,
            requested_at_unix_micros,
        } => {
            // An empty namespace is logged and then passed through as "".
            if namespace.is_empty() {
                tracing::info!(
                    edge_id,
                    log_index,
                    "DeleteEntityEdge legacy v1.0/v1.1 payload — namespace empty"
                );
            }
            engine
                .delete_entity_edge_with_id(
                    edge_id,
                    namespace.as_str(),
                    *requested_at_unix_micros,
                    Some(log_index),
                )
                .map(|_| ())
                .map_err(|e| engine_failure(format!("delete_entity_edge_with_id({edge_id}): {e}")))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commit::mutation::OpId;
    use serde_json::json;

    /// `UpsertMemory` fixture with no embedding, entities, timestamp or model.
    fn upsert_memory(rid: &str) -> MemoryMutation {
        MemoryMutation::UpsertMemory {
            rid: rid.to_owned(),
            text: "hello".into(),
            memory_type: "semantic".into(),
            importance: 0.5,
            valence: 0.0,
            half_life: 86400.0,
            namespace: "test".into(),
            certainty: 1.0,
            domain: "general".into(),
            source: "test".into(),
            emotional_state: None,
            embedding: None,
            metadata: json!({}),
            extracted_entities: Vec::new(),
            created_at_unix_micros: None,
            embedding_model: None,
        }
    }

    /// `TombstoneMemory` fixture with an empty namespace and no reason.
    fn tombstone(rid: &str) -> MemoryMutation {
        MemoryMutation::TombstoneMemory {
            rid: rid.to_owned(),
            reason: None,
            requested_at_unix_micros: 0,
            namespace: String::new(),
        }
    }

    /// Applies through `applier` and returns the resulting error.
    async fn apply_err(
        applier: &LocalApplier,
        tenant_id: TenantId,
        log_index: u64,
        mutation: &MemoryMutation,
    ) -> ApplyError {
        applier
            .apply(tenant_id, log_index, mutation)
            .await
            .unwrap_err()
    }

    /// Reads the high watermark for `tenant_id`, panicking on error.
    async fn watermark(applier: &LocalApplier, tenant_id: TenantId) -> u64 {
        applier.applied_high_watermark(tenant_id).await.unwrap()
    }

    #[tokio::test]
    async fn first_apply_returns_not_yet_wired() {
        let applier = LocalApplier::new();
        let mutation = upsert_memory("rid-1");

        let err = apply_err(&applier, TenantId::new(1), 1, &mutation).await;

        assert!(matches!(err, ApplyError::NotYetWired { .. }));
        assert_eq!(err.metric_label(), "not_yet_wired");
    }

    #[tokio::test]
    async fn duplicate_apply_is_idempotent() {
        let applier = LocalApplier::new();
        let mutation = upsert_memory("rid-1");

        // The outcome of the first attempt is irrelevant here.
        let _first = applier.apply(TenantId::new(1), 1, &mutation).await;
        let second = apply_err(&applier, TenantId::new(1), 1, &mutation).await;

        assert!(matches!(second, ApplyError::AlreadyApplied { .. }));
        assert!(second.is_idempotent_ok());
    }

    #[tokio::test]
    async fn different_log_index_same_tenant_is_not_duplicate() {
        let applier = LocalApplier::new();
        let first_mutation = upsert_memory("rid-1");
        let second_mutation = upsert_memory("rid-2");

        let first = apply_err(&applier, TenantId::new(1), 1, &first_mutation).await;
        let second = apply_err(&applier, TenantId::new(1), 2, &second_mutation).await;

        assert!(matches!(first, ApplyError::NotYetWired { .. }));
        assert!(matches!(second, ApplyError::NotYetWired { .. }));
    }

    #[tokio::test]
    async fn same_log_index_different_tenant_is_not_duplicate() {
        let applier = LocalApplier::new();
        let mutation = upsert_memory("rid-1");

        let first = apply_err(&applier, TenantId::new(1), 1, &mutation).await;
        let second = apply_err(&applier, TenantId::new(2), 1, &mutation).await;

        assert!(matches!(first, ApplyError::NotYetWired { .. }));
        assert!(matches!(second, ApplyError::NotYetWired { .. }));
    }

    #[tokio::test]
    async fn watermark_tracks_per_tenant_max() {
        let applier = LocalApplier::new();
        assert_eq!(watermark(&applier, TenantId::new(1)).await, 0);

        let mutation = upsert_memory("rid-1");
        // Out-of-order indices for tenant 1, plus one entry for tenant 2.
        let _ = applier.apply(TenantId::new(1), 7, &mutation).await;
        let _ = applier.apply(TenantId::new(1), 3, &mutation).await;
        let _ = applier.apply(TenantId::new(2), 9, &mutation).await;

        let expectations = [
            (TenantId::new(1), 7),
            (TenantId::new(2), 9),
            (TenantId::new(99), 0),
        ];
        for (tenant_id, expected) in expectations {
            assert_eq!(watermark(&applier, tenant_id).await, expected);
        }
    }

    #[tokio::test]
    async fn tombstone_variant_routes_through_apply() {
        let applier = LocalApplier::new();
        let mutation = tombstone("rid-1");

        let err = apply_err(&applier, TenantId::new(1), 1, &mutation).await;

        if let ApplyError::NotYetWired { variant, .. } = err {
            assert_eq!(variant, "TombstoneMemory");
        } else {
            let other = err;
            panic!("expected NotYetWired, got {other:?}");
        }
    }

    #[test]
    fn apply_error_metric_labels_are_stable() {
        let not_wired = ApplyError::NotYetWired {
            variant: "x",
            planned_pr: "y",
        };
        let duplicate = ApplyError::AlreadyApplied {
            tenant_id: TenantId::new(1),
            log_index: 1,
        };
        let engine = ApplyError::EngineFailure {
            message: "x".into(),
        };

        assert_eq!(not_wired.metric_label(), "not_yet_wired");
        assert_eq!(duplicate.metric_label(), "already_applied");
        assert_eq!(engine.metric_label(), "engine_failure");
    }

    #[test]
    fn is_idempotent_ok_classification() {
        let duplicate = ApplyError::AlreadyApplied {
            tenant_id: TenantId::new(1),
            log_index: 1,
        };
        let not_wired = ApplyError::NotYetWired {
            variant: "x",
            planned_pr: "y",
        };
        let engine = ApplyError::EngineFailure {
            message: "x".into(),
        };

        assert!(duplicate.is_idempotent_ok());
        assert!(!not_wired.is_idempotent_ok());
        assert!(!engine.is_idempotent_ok());
    }

    // Never called; it only has to compile, which keeps the `OpId` import used.
    #[allow(dead_code)]
    fn _op_id_compile_check() {
        let _ = OpId::new_random();
    }
}