use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::atomisation::{AtomiseError, Atomiser};
use crate::models::Memory;
use crate::storage as db;
/// `tracing` target used by the deferred (background-thread) auto-atomisation path.
const AUTO_ATOMISE_TRACE_TARGET: &str = "pre_store.auto_atomise";

/// `tracing` target used by the synchronous (inline) auto-atomisation path.
const AUTO_ATOMISE_SYNC_TRACE_TARGET: &str = "pre_store.auto_atomise.sync";
/// Result of deciding whether to auto-atomise a stored memory in the
/// background: either why nothing happened, or what was handed to the worker.
#[derive(Debug, Clone)]
pub enum AutoAtomisationOutcome {
    /// Auto-atomisation was not attempted. `reason` is a short static tag
    /// such as `"dispatch_unset"` or `"policy_disabled"`.
    Skipped { reason: &'static str },
    /// The content's cl100k token count (`tokens`) did not exceed the
    /// namespace's configured `threshold`.
    UnderThreshold { tokens: usize, threshold: u32 },
    /// A background worker was started for `memory_id` in `namespace`.
    Enqueued { memory_id: String, namespace: String },
}
/// Wiring that the auto-atomisation entry points need, installed once per
/// process via `install_auto_atomise_dispatch`.
pub struct AutoAtomisationDispatch {
    /// Database file that the deferred worker reopens (via `db::open`) on its own thread.
    pub db_path: PathBuf,
    /// Atomiser shared by the deferred worker and the synchronous path.
    pub atomiser: Arc<Atomiser>,
}
impl std::fmt::Debug for AutoAtomisationDispatch {
    /// Renders `db_path` verbatim and a fixed placeholder for the atomiser.
    /// NOTE(review): presumably this is because `Atomiser` does not implement
    /// `Debug` (it wraps a boxed curator) — confirm before deriving.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const ATOMISER_PLACEHOLDER: &str = "<Arc<Atomiser>>";
        let mut out = f.debug_struct("AutoAtomisationDispatch");
        out.field("db_path", &self.db_path);
        out.field("atomiser", &ATOMISER_PLACEHOLDER);
        out.finish()
    }
}
/// Process-wide dispatch slot. It is written at most once by
/// `install_auto_atomise_dispatch`; both auto-atomisation entry points skip
/// their work while it is still empty.
pub static AUTO_ATOMISE_DISPATCH: std::sync::OnceLock<Arc<AutoAtomisationDispatch>> =
    std::sync::OnceLock::new();
/// Installs the process-wide auto-atomisation dispatch.
///
/// # Errors
///
/// If a dispatch was already installed, the existing one is kept and the
/// newly wrapped `dispatch` is handed back as `Err`.
pub fn install_auto_atomise_dispatch(
    dispatch: AutoAtomisationDispatch,
) -> Result<(), Arc<AutoAtomisationDispatch>> {
    let shared = Arc::new(dispatch);
    AUTO_ATOMISE_DISPATCH.set(shared)
}
#[must_use]
pub fn maybe_enqueue_auto_atomise(
conn: &rusqlite::Connection,
memory: &Memory,
actual_id: &str,
calling_agent_id: &str,
) -> AutoAtomisationOutcome {
let Some(dispatch) = AUTO_ATOMISE_DISPATCH.get() else {
return AutoAtomisationOutcome::Skipped {
reason: "dispatch_unset",
};
};
let policy = db::resolve_governance_policy(conn, &memory.namespace).unwrap_or_default();
if !policy.effective_auto_atomise() {
return AutoAtomisationOutcome::Skipped {
reason: "policy_disabled",
};
}
let threshold = policy.effective_auto_atomise_threshold_cl100k();
let tokens = db::count_tokens_cl100k(&memory.content);
if tokens <= threshold as usize {
return AutoAtomisationOutcome::UnderThreshold { tokens, threshold };
}
let max_atom_tokens = policy.effective_auto_atomise_max_atom_tokens();
let dispatch_for_thread = Arc::clone(dispatch);
let memory_id = actual_id.to_string();
let namespace = memory.namespace.clone();
let agent_id = calling_agent_id.to_string();
std::thread::spawn(move || {
run_deferred_atomise(
&dispatch_for_thread.db_path,
&dispatch_for_thread.atomiser,
&memory_id,
max_atom_tokens,
&agent_id,
);
});
AutoAtomisationOutcome::Enqueued {
memory_id: actual_id.to_string(),
namespace,
}
}
/// Runs auto-atomisation inline on the caller's connection and reports the
/// result as a short static tag.
///
/// Possible tags:
/// - `"skipped_dispatch_unset"`: no dispatch is installed, so nothing is done.
/// - `"skipped_under_threshold"`: the content's cl100k token count is at or
///   below the namespace policy threshold.
/// - `"atomised"`: the atomiser split the memory.
/// - `"skipped_source_too_small"` / `"skipped_already_atomised"`: the
///   atomiser declined with the corresponding error.
/// - `"failed"`: any other atomiser error, logged at error level.
///
/// The retry budget is the policy's `effective_auto_atomise_max_retries()`
/// when set, otherwise the atomiser's `sync_curator_max_retries()`.
///
/// NOTE(review): unlike `maybe_enqueue_auto_atomise`, this function does not
/// check `effective_auto_atomise()`. Presumably the caller only selects the
/// synchronous mode for opted-in namespaces — confirm at the call site.
#[must_use]
pub fn run_synchronous_auto_atomise(
    conn: &rusqlite::Connection,
    memory: &Memory,
    actual_id: &str,
    calling_agent_id: &str,
) -> &'static str {
    let dispatch = match AUTO_ATOMISE_DISPATCH.get() {
        Some(installed) => installed,
        None => {
            tracing::info!(
                target: AUTO_ATOMISE_SYNC_TRACE_TARGET,
                "synchronous-mode dispatch unset for memory={}; substrate stays quiet",
                actual_id
            );
            return "skipped_dispatch_unset";
        }
    };

    let policy = db::resolve_governance_policy(conn, &memory.namespace).unwrap_or_default();
    let limit = policy.effective_auto_atomise_threshold_cl100k();
    let token_count = db::count_tokens_cl100k(&memory.content);
    if token_count <= limit as usize {
        return "skipped_under_threshold";
    }

    let atom_budget = policy.effective_auto_atomise_max_atom_tokens();
    let retry_budget = policy
        .effective_auto_atomise_max_retries()
        .unwrap_or(dispatch.atomiser.sync_curator_max_retries());

    let attempt = dispatch.atomiser.atomise_sync_with_retries(
        conn,
        actual_id,
        atom_budget,
        false,
        calling_agent_id,
        retry_budget,
    );

    // Expected refusals and hard failures each exit early with their own tag.
    // Only a successful split falls through to the final "atomised" tag.
    let done = match attempt {
        Ok(done) => done,
        Err(AtomiseError::SourceTooSmall) => {
            tracing::info!(
                target: AUTO_ATOMISE_SYNC_TRACE_TARGET,
                "synchronous-atomise skipped: source={} body too small",
                actual_id
            );
            return "skipped_source_too_small";
        }
        Err(AtomiseError::AlreadyAtomised { .. }) => {
            tracing::info!(
                target: AUTO_ATOMISE_SYNC_TRACE_TARGET,
                "synchronous-atomise skipped: source={} already atomised",
                actual_id
            );
            return "skipped_already_atomised";
        }
        Err(other) => {
            tracing::error!(
                target: AUTO_ATOMISE_SYNC_TRACE_TARGET,
                "synchronous-atomise failed for source={}: {:?}",
                actual_id,
                other
            );
            return "failed";
        }
    };

    tracing::info!(
        target: AUTO_ATOMISE_SYNC_TRACE_TARGET,
        "synchronous-atomise succeeded: source={} atoms={}",
        done.source_id,
        done.atom_count
    );
    "atomised"
}
pub fn run_deferred_atomise(
db_path: &Path,
atomiser: &Atomiser,
memory_id: &str,
max_atom_tokens: u32,
calling_agent_id: &str,
) {
std::thread::sleep(std::time::Duration::from_millis(100));
let conn = match db::open(db_path) {
Ok(c) => c,
Err(e) => {
tracing::error!(
target: AUTO_ATOMISE_TRACE_TARGET,
"worker: failed to open db at {} for memory={}: {}",
db_path.display(),
memory_id,
e
);
return;
}
};
match atomiser.atomise_sync(&conn, memory_id, max_atom_tokens, false, calling_agent_id) {
Ok(result) => {
tracing::info!(
target: AUTO_ATOMISE_TRACE_TARGET,
"auto-atomisation succeeded: source={} atoms={}",
result.source_id,
result.atom_count
);
}
Err(AtomiseError::AlreadyAtomised {
source_id,
existing_atom_ids,
}) => {
tracing::info!(
target: AUTO_ATOMISE_TRACE_TARGET,
"auto-atomisation skipped (race): source={} already split into {} atoms",
source_id,
existing_atom_ids.len()
);
}
Err(AtomiseError::SourceTooSmall) => {
tracing::warn!(
target: AUTO_ATOMISE_TRACE_TARGET,
"auto-atomisation skipped: source={} body fits within max_atom_tokens (curator returned no atoms)",
memory_id
);
}
Err(AtomiseError::CuratorFailed(reason)) => {
tracing::error!(
target: AUTO_ATOMISE_TRACE_TARGET,
"auto-atomisation curator failed for source={}: {} — operator may retry with `memory_atomise`",
memory_id,
reason
);
}
Err(AtomiseError::TierLocked) => {
tracing::info!(
target: AUTO_ATOMISE_TRACE_TARGET,
"auto-atomisation skipped: source={} tier_locked (keyword feature tier)",
memory_id
);
}
Err(AtomiseError::NotFound) => {
tracing::info!(
target: AUTO_ATOMISE_TRACE_TARGET,
"auto-atomisation skipped: source={} not found (raced with delete?)",
memory_id
);
}
Err(e) => {
tracing::error!(
target: AUTO_ATOMISE_TRACE_TARGET,
"auto-atomisation failed for source={}: {:?} (full context: {})",
memory_id,
e,
e
);
}
}
}
/// Test-only accessor: returns a new handle to the installed dispatch, or
/// `None` if no dispatch has been installed yet.
#[cfg(test)]
pub fn _test_only_take_dispatch() -> Option<Arc<AutoAtomisationDispatch>> {
    AUTO_ATOMISE_DISPATCH.get().map(Arc::clone)
}
#[cfg(test)]
mod tests {
    // Tests for the auto-atomisation dispatch.
    //
    // NOTE: `AUTO_ATOMISE_DISPATCH` is a process-wide `OnceLock`, and these tests
    // share one process. The first test to reach `ensure_dispatch_installed`
    // fixes the dispatch (including its `db_path`) for every later test. Tests
    // that never install a dispatch may still observe one. This is why several
    // assertions below accept more than one outcome or tag.
    use super::*;
    use crate::models::{
        ApproverType, AtomisationPolicy, CorePolicy, GovernanceLevel, GovernancePolicy, Tier,
    };
    use chrono::Utc;
    use rusqlite::Connection;
    use tempfile::TempDir;

    // Opens an on-disk database inside a fresh temp directory. The `TempDir`
    // is returned so the caller keeps the directory alive for the whole test.
    fn fresh_db() -> (Connection, TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("ai-memory.db");
        let conn = db::open(&path).unwrap();
        (conn, dir, path)
    }

    // Builds (but does not insert) a mid-tier `Memory` in namespace `ns` with
    // the given body and a unique title.
    fn make_memory(ns: &str, content: &str) -> Memory {
        let now = Utc::now().to_rfc3339();
        Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: Tier::Mid,
            namespace: ns.to_string(),
            title: format!("title-{}", uuid::Uuid::new_v4().simple()),
            content: content.to_string(),
            created_at: now.clone(),
            updated_at: now,
            metadata: serde_json::json!({"agent_id": "ai:test"}),
            ..Default::default()
        }
    }

    // Makes `policy` the governance standard for namespace `ns`. The policy is
    // serialised into the metadata of a long-tier memory, which is inserted and
    // then registered with `db::set_namespace_standard`.
    fn seed_policy(conn: &Connection, ns: &str, policy: GovernancePolicy) {
        let now = Utc::now().to_rfc3339();
        let gov_metadata = serde_json::json!({
            "agent_id": "ai:test",
            "governance": serde_json::to_value(&policy).unwrap(),
        });
        let std_mem = Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: Tier::Long,
            namespace: ns.to_string(),
            title: format!("__standard_{ns}"),
            content: "standard".into(),
            created_at: now.clone(),
            updated_at: now,
            metadata: gov_metadata,
            ..Default::default()
        };
        let std_id = db::insert(conn, &std_mem).unwrap();
        db::set_namespace_standard(conn, ns, &std_id, None).unwrap();
    }

    // A permissive policy that opts into auto-atomisation with deliberately
    // small limits (threshold 50 tokens, atoms of at most 20 tokens), so test
    // bodies cross the threshold cheaply.
    fn opt_in_policy() -> GovernancePolicy {
        GovernancePolicy {
            core: CorePolicy {
                write: GovernanceLevel::Any,
                promote: GovernanceLevel::Any,
                delete: GovernanceLevel::Owner,
                approver: ApproverType::Human,
                inherit: true,
                max_reflection_depth: None,
            },
            atomisation: AtomisationPolicy {
                auto_atomise: Some(true),
                auto_atomise_threshold_cl100k: Some(50),
                auto_atomise_max_atom_tokens: Some(20),
                auto_atomise_max_retries: None,
                auto_atomise_mode: None,
            },
            ..Default::default()
        }
    }

    // Smoke test: every outcome variant has a non-empty `Debug` rendering.
    #[test]
    fn outcome_variants_render_with_debug() {
        for o in [
            AutoAtomisationOutcome::Skipped {
                reason: "policy_disabled",
            },
            AutoAtomisationOutcome::UnderThreshold {
                tokens: 100,
                threshold: 500,
            },
            AutoAtomisationOutcome::Enqueued {
                memory_id: "m-1".into(),
                namespace: "ns".into(),
            },
        ] {
            let s = format!("{o:?}");
            assert!(!s.is_empty());
        }
    }

    // With no namespace policy the call must end in `Skipped`. "policy_disabled"
    // is accepted as well as "dispatch_unset" because another test may already
    // have installed the global dispatch.
    #[test]
    fn dispatch_unset_short_circuits_to_skipped() {
        let (conn, _dir, _path) = fresh_db();
        let mem = make_memory("any-ns", "any body");
        let outcome = maybe_enqueue_auto_atomise(&conn, &mem, &mem.id, "ai:test");
        match outcome {
            AutoAtomisationOutcome::Skipped { reason } => {
                assert!(
                    reason == "dispatch_unset" || reason == "policy_disabled",
                    "unexpected skip reason: {reason}"
                );
            }
            _ => panic!("expected Skipped on empty/unconfigured dispatch, got {outcome:?}"),
        }
    }

    // Without a namespace standard the default policy applies: auto-atomise
    // off, threshold 500, max atom tokens 200.
    #[test]
    fn policy_resolution_returns_default_when_no_standard() {
        let (conn, _dir, _path) = fresh_db();
        let policy = db::resolve_governance_policy(&conn, "fresh-ns").unwrap_or_default();
        assert!(!policy.effective_auto_atomise());
        assert_eq!(policy.effective_auto_atomise_threshold_cl100k(), 500);
        assert_eq!(policy.effective_auto_atomise_max_atom_tokens(), 200);
    }

    // A seeded standard is picked up, and its atomisation settings override
    // the defaults.
    #[test]
    fn policy_resolution_picks_up_opt_in() {
        let (conn, _dir, _path) = fresh_db();
        seed_policy(&conn, "opt-in-ns", opt_in_policy());
        let policy = db::resolve_governance_policy(&conn, "opt-in-ns").unwrap_or_default();
        assert!(policy.effective_auto_atomise());
        assert_eq!(policy.effective_auto_atomise_threshold_cl100k(), 50);
        assert_eq!(policy.effective_auto_atomise_max_atom_tokens(), 20);
    }

    // The hand-written `Debug` impl prints the path and a fixed placeholder
    // for the atomiser.
    #[test]
    fn dispatch_struct_debug_formatter_renders_redacted_atomiser() {
        use crate::atomisation::AtomiserConfig;
        use crate::atomisation::curator::{Atom, Curator, CuratorError};
        use crate::config::FeatureTier;
        // Curator that always fails. It exists only so an `Atomiser` can be built.
        struct NoopCurator;
        impl Curator for NoopCurator {
            fn decompose(
                &self,
                _body: &str,
                _max_atom_tokens: u32,
                _max_retries: u32,
            ) -> Result<Vec<Atom>, CuratorError> {
                Err(CuratorError::LlmUnavailable("noop".to_string()))
            }
        }
        let atomiser = Arc::new(crate::atomisation::Atomiser::new(
            Box::new(NoopCurator),
            None,
            AtomiserConfig::default(),
            FeatureTier::Smart,
        ));
        let dispatch = AutoAtomisationDispatch {
            db_path: PathBuf::from("/var/.ai-memory-non-existent-for-debug-fmt.db"),
            atomiser,
        };
        let s = format!("{dispatch:?}");
        assert!(s.contains("AutoAtomisationDispatch"));
        assert!(s.contains("<Arc<Atomiser>>"));
        assert!(s.contains("non-existent-for-debug-fmt"));
    }

    // Only checks that one of the known tags comes back. The actual tag depends
    // on whether another test has installed the global dispatch.
    #[test]
    fn run_synchronous_auto_atomise_short_circuits_when_dispatch_unset() {
        let (conn, _dir, _path) = fresh_db();
        let mem = make_memory("sync-noop-ns", "short body");
        let tag = run_synchronous_auto_atomise(&conn, &mem, &mem.id, "ai:test");
        let known: &[&str] = &[
            "skipped_dispatch_unset",
            "skipped_under_threshold",
            "atomised",
            "skipped_source_too_small",
            "skipped_already_atomised",
            "failed",
        ];
        assert!(
            known.contains(&tag),
            "unexpected sync auto-atomise tag: {tag}"
        );
    }

    // A two-character body never crosses the default threshold (500), so the
    // result is either the unset-dispatch skip or the under-threshold skip.
    #[test]
    fn run_synchronous_auto_atomise_short_body_under_threshold() {
        let (conn, _dir, _path) = fresh_db();
        let mem = make_memory("sync-short-ns", "hi");
        let tag = run_synchronous_auto_atomise(&conn, &mem, &mem.id, "ai:test");
        assert!(
            matches!(tag, "skipped_dispatch_unset" | "skipped_under_threshold"),
            "unexpected tag: {tag}"
        );
    }

    #[test]
    fn test_only_take_dispatch_does_not_panic() {
        let _ = _test_only_take_dispatch();
    }

    use crate::atomisation::curator::{Atom, Curator, CuratorError};
    use crate::atomisation::{Atomiser, AtomiserConfig};
    use crate::config::FeatureTier;
    use std::sync::Mutex as StdMutex;

    // Scripted curator. Each `decompose` call removes and returns the front
    // queued response. Once the queue is empty it returns `LlmUnavailable`.
    struct SeqCurator {
        responses: StdMutex<Vec<Result<Vec<Atom>, CuratorError>>>,
    }
    impl SeqCurator {
        fn new(responses: Vec<Result<Vec<Atom>, CuratorError>>) -> Self {
            Self {
                responses: StdMutex::new(responses),
            }
        }
    }
    impl Curator for SeqCurator {
        fn decompose(
            &self,
            _body: &str,
            _max_atom_tokens: u32,
            _max_retries: u32,
        ) -> Result<Vec<Atom>, CuratorError> {
            let mut rs = self.responses.lock().unwrap();
            if rs.is_empty() {
                return Err(CuratorError::LlmUnavailable("seq exhausted".into()));
            }
            rs.remove(0)
        }
    }

    // Builds an `Atomiser` around `curator` with the default config, `None`
    // for the second constructor argument, and the given feature tier.
    fn atomiser_with(curator: Box<dyn Curator>, tier: FeatureTier) -> Atomiser {
        Atomiser::new(curator, None, AtomiserConfig::default(), tier)
    }

    // Inserts a memory whose body is a short sentence repeated 400 times, far
    // above the small token limits used in these tests, and returns its id.
    fn seed_big_memory(conn: &Connection, ns: &str) -> String {
        let body = "sentence number that adds tokens. ".repeat(400);
        let mem = make_memory(ns, &body);
        db::insert(conn, &mem).unwrap()
    }

    // The db path sits under a regular file, so `db::open` should fail. The
    // worker must log and return rather than panic.
    #[test]
    fn run_deferred_atomise_db_open_failure_is_swallowed() {
        let (_conn, dir, _path) = fresh_db();
        let file_as_parent = dir.path().join("not-a-dir");
        std::fs::write(&file_as_parent, b"x").unwrap();
        let bad_path = file_as_parent.join("child.db");
        let atomiser = atomiser_with(Box::new(SeqCurator::new(vec![])), FeatureTier::Smart);
        run_deferred_atomise(&bad_path, &atomiser, "mem-x", 200, "ai:test");
    }

    // Unknown id: exercises the not-found arm. Only proves that no panic occurs.
    #[test]
    fn run_deferred_atomise_not_found_arm() {
        let (_conn, _dir, path) = fresh_db();
        let atomiser = atomiser_with(Box::new(SeqCurator::new(vec![])), FeatureTier::Smart);
        run_deferred_atomise(&path, &atomiser, "no-such-id", 200, "ai:test");
    }

    // A tiny body with max_atom_tokens = 200 presumably triggers
    // `SourceTooSmall`. Only proves that no panic occurs.
    #[test]
    fn run_deferred_atomise_source_too_small_arm() {
        let (conn, _dir, path) = fresh_db();
        let mem = make_memory("ns-small", "tiny");
        let id = db::insert(&conn, &mem).unwrap();
        // Close the seeding connection; the worker opens its own.
        drop(conn);
        let atomiser = atomiser_with(Box::new(SeqCurator::new(vec![])), FeatureTier::Smart);
        run_deferred_atomise(&path, &atomiser, &id, 200, "ai:test");
    }

    // The curator reports the LLM as unavailable. Presumably this surfaces as
    // `AtomiseError::CuratorFailed`. Only proves that no panic occurs.
    #[test]
    fn run_deferred_atomise_curator_failed_arm() {
        let (conn, _dir, path) = fresh_db();
        let id = seed_big_memory(&conn, "ns-curfail");
        drop(conn);
        let atomiser = atomiser_with(
            Box::new(SeqCurator::new(vec![Err(CuratorError::LlmUnavailable(
                "down".into(),
            ))])),
            FeatureTier::Smart,
        );
        run_deferred_atomise(&path, &atomiser, &id, 50, "ai:test");
    }

    // The keyword feature tier presumably yields `TierLocked`. Only proves
    // that no panic occurs.
    #[test]
    fn run_deferred_atomise_tier_locked_arm() {
        let (conn, _dir, path) = fresh_db();
        let id = seed_big_memory(&conn, "ns-tier");
        drop(conn);
        let atomiser = atomiser_with(Box::new(SeqCurator::new(vec![])), FeatureTier::Keyword);
        run_deferred_atomise(&path, &atomiser, &id, 50, "ai:test");
    }

    // Happy path: the curator returns two atoms, and the test checks that at
    // least two rows with `atom_of = id` were written.
    #[test]
    fn run_deferred_atomise_success_arm() {
        let (conn, _dir, path) = fresh_db();
        let id = seed_big_memory(&conn, "ns-ok");
        drop(conn);
        let atomiser = atomiser_with(
            Box::new(SeqCurator::new(vec![Ok(vec![
                Atom {
                    text: "first atomic proposition".into(),
                },
                Atom {
                    text: "second atomic proposition".into(),
                },
            ])])),
            FeatureTier::Smart,
        );
        run_deferred_atomise(&path, &atomiser, &id, 50, "ai:test");
        let conn2 = db::open(&path).unwrap();
        let atom_count: i64 = conn2
            .query_row(
                "SELECT COUNT(*) FROM memories WHERE atom_of = ?1",
                rusqlite::params![&id],
                |r| r.get(0),
            )
            .unwrap_or(0);
        assert!(atom_count >= 2, "expected atoms written, got {atom_count}");
    }

    // Installs a global dispatch (a curator that returns two atoms once)
    // unless one is already installed. The install result is ignored because
    // a concurrently running test may win the race. The first installer's
    // `db_path` is the one every later test's dispatch will use.
    fn ensure_dispatch_installed(db_path: &std::path::Path) {
        if AUTO_ATOMISE_DISPATCH.get().is_some() {
            return;
        }
        let atomiser = Arc::new(atomiser_with(
            Box::new(SeqCurator::new(vec![Ok(vec![
                Atom {
                    text: "enqueue atom one".into(),
                },
                Atom {
                    text: "enqueue atom two".into(),
                },
            ])])),
            FeatureTier::Smart,
        ));
        let _ = install_auto_atomise_dispatch(AutoAtomisationDispatch {
            db_path: db_path.to_path_buf(),
            atomiser,
        });
    }

    // Opted-in namespace plus an over-threshold body must produce `Enqueued`
    // with the inserted id and the namespace.
    #[test]
    fn maybe_enqueue_drives_policy_threshold_and_enqueue_arms() {
        let (conn, _dir, path) = fresh_db();
        ensure_dispatch_installed(&path);
        seed_policy(&conn, "enqueue-ns", opt_in_policy());
        let big = make_memory("enqueue-ns", &"proposition token padding. ".repeat(400));
        let id = db::insert(&conn, &big).unwrap();
        let outcome = maybe_enqueue_auto_atomise(&conn, &big, &id, "ai:test");
        match outcome {
            AutoAtomisationOutcome::Enqueued {
                memory_id,
                namespace,
            } => {
                assert_eq!(memory_id, id);
                assert_eq!(namespace, "enqueue-ns");
            }
            other => panic!("expected Enqueued for opt-in over-threshold ns, got {other:?}"),
        }
        // Presumably gives the detached worker (which sleeps 100 ms first) time
        // to run before this test's temp dir is dropped.
        std::thread::sleep(std::time::Duration::from_millis(250));
    }

    // Opted-in namespace with a tiny body: `UnderThreshold` reports the
    // policy's threshold of 50.
    #[test]
    fn maybe_enqueue_under_threshold_with_policy_enabled() {
        let (conn, _dir, path) = fresh_db();
        ensure_dispatch_installed(&path);
        seed_policy(&conn, "under-ns", opt_in_policy());
        let small = make_memory("under-ns", "hi");
        let id = db::insert(&conn, &small).unwrap();
        let outcome = maybe_enqueue_auto_atomise(&conn, &small, &id, "ai:test");
        match outcome {
            AutoAtomisationOutcome::UnderThreshold { threshold, .. } => {
                assert_eq!(threshold, 50, "opt_in_policy threshold");
            }
            other => panic!("expected UnderThreshold, got {other:?}"),
        }
    }

    // Drives the synchronous path through the atomiser. The shared global
    // dispatch's scripted curator may already have been consumed by another
    // test, so any known tag is accepted.
    #[test]
    fn run_synchronous_auto_atomise_full_atomise_path() {
        let (conn, _dir, path) = fresh_db();
        ensure_dispatch_installed(&path);
        seed_policy(&conn, "sync-full-ns", opt_in_policy());
        let big = make_memory("sync-full-ns", &"proposition padding here. ".repeat(400));
        let id = db::insert(&conn, &big).unwrap();
        let tag = run_synchronous_auto_atomise(&conn, &big, &id, "ai:test");
        let known: &[&str] = &[
            "atomised",
            "skipped_source_too_small",
            "skipped_already_atomised",
            "failed",
            "skipped_under_threshold",
            "skipped_dispatch_unset",
        ];
        assert!(known.contains(&tag), "unexpected sync tag: {tag}");
    }
}