use async_trait::async_trait;
use serde::Deserialize;
use serde::Serialize;
use crate::id::{AgentId, SnapshotId};
use crate::meta::{RestoreReport, SnapshotMeta};
use crate::retention::RetentionTickReport;
pub const LIFECYCLE_SUBJECT_PREFIX: &str = "nexo.memory.snapshot";
pub const MUTATION_SUBJECT_PREFIX: &str = "nexo.memory.mutated";
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LifecycleEvent {
Created(SnapshotMeta),
Restored(RestoreReport),
Deleted {
agent_id: AgentId,
tenant: String,
snapshot_id: SnapshotId,
ts_ms: i64,
},
Gc {
ts_ms: i64,
report: RetentionTickReport,
},
}
impl LifecycleEvent {
pub fn subject(&self) -> String {
self.subject_with_prefix(LIFECYCLE_SUBJECT_PREFIX)
}
pub fn subject_with_prefix(&self, prefix: &str) -> String {
let (agent_id, kind) = match self {
LifecycleEvent::Created(m) => (m.agent_id.as_str(), "created"),
LifecycleEvent::Restored(r) => (r.agent_id.as_str(), "restored"),
LifecycleEvent::Deleted { agent_id, .. } => (agent_id.as_str(), "deleted"),
LifecycleEvent::Gc { .. } => ("_all", "gc"),
};
format!("{prefix}.{agent_id}.{kind}")
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MutationScope {
SqliteLongTerm,
SqliteVector,
SqliteConcepts,
SqliteCompactions,
Git,
MemoryFile,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MutationOp {
Insert,
Update,
Delete,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MutationEvent {
pub agent_id: AgentId,
pub tenant: String,
pub scope: MutationScope,
pub op: MutationOp,
pub key: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub old_hash: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub new_hash: Option<String>,
pub ts_ms: i64,
}
impl MutationEvent {
pub fn subject(&self) -> String {
self.subject_with_prefix(MUTATION_SUBJECT_PREFIX)
}
pub fn subject_with_prefix(&self, prefix: &str) -> String {
format!("{prefix}.{}", self.agent_id)
}
}
#[async_trait]
pub trait EventPublisher: Send + Sync + 'static {
async fn publish_lifecycle(&self, event: LifecycleEvent);
async fn publish_mutation(&self, event: MutationEvent);
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopPublisher;
#[async_trait]
impl EventPublisher for NoopPublisher {
async fn publish_lifecycle(&self, _event: LifecycleEvent) {}
async fn publish_mutation(&self, _event: MutationEvent) {}
}
#[derive(Default)]
pub struct RecordingPublisher {
pub lifecycle: parking_lot_compat::Mutex<Vec<LifecycleEvent>>,
pub mutation: parking_lot_compat::Mutex<Vec<MutationEvent>>,
}
#[async_trait]
impl EventPublisher for RecordingPublisher {
async fn publish_lifecycle(&self, event: LifecycleEvent) {
self.lifecycle.lock().push(event);
}
async fn publish_mutation(&self, event: MutationEvent) {
self.mutation.lock().push(event);
}
}
mod parking_lot_compat {
use std::sync::{LockResult, Mutex as StdMutex, MutexGuard};
pub struct Mutex<T>(StdMutex<T>);
impl<T: Default> Default for Mutex<T> {
fn default() -> Self {
Self(StdMutex::new(T::default()))
}
}
impl<T> Mutex<T> {
pub fn lock(&self) -> MutexGuard<'_, T> {
tolerate(self.0.lock())
}
}
fn tolerate<T>(r: LockResult<T>) -> T {
match r {
Ok(g) => g,
Err(p) => p.into_inner(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::id::SnapshotId;
use crate::manifest::SchemaVersions;
use std::path::PathBuf;
use std::sync::Arc;
fn sample_meta() -> SnapshotMeta {
SnapshotMeta {
id: SnapshotId::new(),
agent_id: "ana".into(),
tenant: "default".into(),
label: None,
created_at_ms: 1_700_000_000_000,
bundle_path: PathBuf::from("/tmp/x.tar.zst"),
bundle_size_bytes: 0,
bundle_sha256: String::new(),
git_oid: None,
schema_versions: SchemaVersions::CURRENT,
encrypted: false,
redactions_applied: false,
}
}
#[test]
fn lifecycle_subjects_include_agent_id_and_kind() {
let m = sample_meta();
assert_eq!(
LifecycleEvent::Created(m.clone()).subject(),
"nexo.memory.snapshot.ana.created"
);
assert_eq!(
LifecycleEvent::Deleted {
agent_id: "ana".into(),
tenant: "default".into(),
snapshot_id: m.id,
ts_ms: 0,
}
.subject(),
"nexo.memory.snapshot.ana.deleted"
);
assert_eq!(
LifecycleEvent::Gc {
ts_ms: 0,
report: Default::default(),
}
.subject(),
"nexo.memory.snapshot._all.gc"
);
}
#[test]
fn lifecycle_subject_with_prefix_overrides_default() {
let m = sample_meta();
assert_eq!(
LifecycleEvent::Created(m).subject_with_prefix("shard.alpha.snap"),
"shard.alpha.snap.ana.created"
);
assert_eq!(
LifecycleEvent::Gc {
ts_ms: 0,
report: Default::default(),
}
.subject_with_prefix("shard.alpha.snap"),
"shard.alpha.snap._all.gc"
);
}
#[test]
fn mutation_subject_with_prefix_overrides_default() {
let e = MutationEvent {
agent_id: "ana".into(),
tenant: "default".into(),
scope: MutationScope::SqliteLongTerm,
op: MutationOp::Insert,
key: "row".into(),
old_hash: None,
new_hash: None,
ts_ms: 0,
};
assert_eq!(
e.subject_with_prefix("shard.alpha.mut"),
"shard.alpha.mut.ana"
);
}
#[test]
fn mutation_subject_carries_agent_id() {
let e = MutationEvent {
agent_id: "ana".into(),
tenant: "default".into(),
scope: MutationScope::SqliteLongTerm,
op: MutationOp::Insert,
key: "row-1".into(),
old_hash: None,
new_hash: Some("ab".repeat(32)),
ts_ms: 0,
};
assert_eq!(e.subject(), "nexo.memory.mutated.ana");
}
#[test]
fn mutation_event_round_trips_via_json() {
let e = MutationEvent {
agent_id: "ana".into(),
tenant: "default".into(),
scope: MutationScope::Git,
op: MutationOp::Update,
key: "deadbeef".into(),
old_hash: None,
new_hash: None,
ts_ms: 17,
};
let s = serde_json::to_string(&e).unwrap();
assert!(s.contains("\"scope\":\"git\""));
assert!(s.contains("\"op\":\"update\""));
let back: MutationEvent = serde_json::from_str(&s).unwrap();
assert_eq!(back.key, e.key);
assert_eq!(back.ts_ms, e.ts_ms);
}
#[tokio::test]
async fn noop_publisher_silently_drops_both_event_kinds() {
let p: Arc<dyn EventPublisher> = Arc::new(NoopPublisher);
p.publish_lifecycle(LifecycleEvent::Created(sample_meta()))
.await;
p.publish_mutation(MutationEvent {
agent_id: "ana".into(),
tenant: "default".into(),
scope: MutationScope::SqliteVector,
op: MutationOp::Delete,
key: "k".into(),
old_hash: None,
new_hash: None,
ts_ms: 0,
})
.await;
}
#[tokio::test]
async fn recording_publisher_keeps_every_event() {
let p = Arc::new(RecordingPublisher::default());
let pa: Arc<dyn EventPublisher> = p.clone();
pa.publish_lifecycle(LifecycleEvent::Created(sample_meta()))
.await;
pa.publish_lifecycle(LifecycleEvent::Gc {
ts_ms: 0,
report: Default::default(),
})
.await;
pa.publish_mutation(MutationEvent {
agent_id: "ana".into(),
tenant: "default".into(),
scope: MutationScope::SqliteConcepts,
op: MutationOp::Insert,
key: "k".into(),
old_hash: None,
new_hash: None,
ts_ms: 0,
})
.await;
assert_eq!(p.lifecycle.lock().len(), 2);
assert_eq!(p.mutation.lock().len(), 1);
}
#[test]
fn dyn_publisher_can_be_held_as_arc() {
let _p: Arc<dyn EventPublisher> = Arc::new(NoopPublisher);
}
}