pub mod curator;
use crate::models::ConfidenceSource;
use std::sync::Arc;
use chrono::Utc;
use rusqlite::{Connection, params};
use crate::identity::keypair::AgentKeypair;
use crate::models::{Memory, MemoryKind, MemoryLinkRelation, SourceSpan};
use crate::signed_events::{SignedEvent, append_signed_event, payload_hash};
use crate::storage as db;
use curator::Curator;
/// Tunable limits applied by `Atomiser` when it decomposes a source memory.
#[derive(Debug, Clone)]
pub struct AtomiserConfig {
    /// Token budget used when a caller passes `max_atom_tokens == 0`.
    pub default_max_atom_tokens: u32,
    /// Fewest atoms a decomposition must yield; fewer is reported as
    /// `AtomiseError::SourceTooSmall`.
    pub min_atoms_per_source: usize,
    /// Most atoms kept from one decomposition; any beyond this are dropped.
    pub max_atoms_per_source: usize,
    /// Retry count that `Atomiser::atomise_sync` hands to the curator.
    pub curator_max_retries: u32,
    /// Retry count exposed through `Atomiser::sync_curator_max_retries`.
    pub sync_curator_max_retries: u32,
}

impl Default for AtomiserConfig {
    /// Returns the stock limits: 200-token atoms, 2 to 10 atoms per source,
    /// 3 curator retries, and 1 retry for the sync-curator setting.
    fn default() -> Self {
        Self {
            default_max_atom_tokens: 200,
            min_atoms_per_source: 2,
            max_atoms_per_source: 10,
            curator_max_retries: 3,
            sync_curator_max_retries: 1,
        }
    }
}
/// Summary of a successful atomisation, as returned by the `Atomiser` entry points.
#[derive(Debug, Clone)]
pub struct AtomiseResult {
    /// Id of the memory that was decomposed.
    pub source_id: String,
    /// Ids of the atoms that were written, in the order they were written.
    pub atom_ids: Vec<String>,
    /// Number of atoms kept (after the per-source cap was applied).
    pub atom_count: usize,
    /// RFC 3339 timestamp recorded on the source when it was archived.
    pub archived_at: String,
}
/// Reasons an atomisation request can fail.
#[derive(Debug)]
pub enum AtomiseError {
    /// No memory exists for the requested source id.
    NotFound,
    /// The source already carries a positive atom count and `force` was not set.
    AlreadyAtomised {
        /// Id of the source that was already decomposed.
        source_id: String,
        /// Ids of the atoms currently linked to that source.
        existing_atom_ids: Vec<String>,
    },
    /// The configured feature tier is `Keyword`, which cannot run the curator.
    TierLocked,
    /// The curator could not produce a decomposition; carries its detail text.
    CuratorFailed(String),
    /// The source fits in the token budget, or the curator returned too few atoms.
    SourceTooSmall,
    /// Storage governance refused to write an atom; carries the refusal reason.
    GovernanceRefused(String),
    /// A signing failure; carries the detail text.
    SignerError(String),
    /// A storage-layer failure; carries the underlying error text.
    DbError(String),
    /// Entering this pass would take the depth beyond the compiled cap.
    DepthExceeded {
        /// Depth that the rejected pass would have run at.
        attempted: u32,
        /// The cap that was exceeded.
        cap: u32,
    },
}

impl std::fmt::Display for AtomiseError {
    /// Renders the human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fixed-text variants.
            Self::NotFound => write!(f, "atomise: source memory not found"),
            Self::TierLocked => write!(
                f,
                "atomise: feature tier is 'keyword' — atomisation requires curator LLM (smart/autonomous)",
            ),
            Self::SourceTooSmall => write!(
                f,
                "atomise: source body already at or under max_atom_tokens — no decomposition possible",
            ),
            // Variants that wrap a free-form detail string.
            Self::CuratorFailed(d) => write!(f, "atomise: curator failed: {d}"),
            Self::GovernanceRefused(d) => write!(f, "atomise: governance refused: {d}"),
            Self::SignerError(d) => write!(f, "atomise: signer error: {d}"),
            Self::DbError(d) => write!(f, "atomise: db error: {d}"),
            // Structured variants.
            Self::AlreadyAtomised {
                source_id,
                existing_atom_ids,
            } => {
                let atom_total = existing_atom_ids.len();
                write!(
                    f,
                    "atomise: source '{source_id}' already atomised into {} atoms",
                    atom_total
                )
            }
            Self::DepthExceeded { attempted, cap } => write!(
                f,
                "ATOMISATION_DEPTH_EXCEEDED: atomisation depth {attempted} would exceed \
                 compiled max_atomisation_depth {cap}"
            ),
        }
    }
}

impl std::error::Error for AtomiseError {}
/// Deepest atomisation pass allowed on one thread. `Atomiser::atomise_sync_with_retries`
/// fails with `AtomiseError::DepthExceeded` once the entered depth is greater than this.
pub const MAX_ATOMISATION_DEPTH: u32 = 3;
/// NOTE(review): presumably the lower bound for a caller-supplied `max_atom_tokens`; nothing in
/// the visible part of this file reads it — confirm against callers.
pub const MIN_ATOM_TOKENS: u32 = 50;
/// NOTE(review): presumably the upper bound for a caller-supplied `max_atom_tokens`; nothing in
/// the visible part of this file reads it — confirm against callers.
pub const MAX_ATOM_TOKENS: u32 = 1000;
/// NOTE(review): same value as `AtomiserConfig::default().default_max_atom_tokens` (200), but the
/// two are not linked in code — confirm they are meant to stay in sync.
pub const DEFAULT_ATOM_TOKENS: u32 = 200;
thread_local! {
    /// Per-thread count of atomisation passes that are currently entered.
    static ATOMISATION_DEPTH: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
}

/// Returns the atomisation depth recorded for the calling thread (0 when no pass is active).
#[must_use]
pub fn current_atomisation_depth() -> u32 {
    ATOMISATION_DEPTH.with(std::cell::Cell::get)
}

/// Scope guard handed out by [`enter_atomisation_pass`].
///
/// Dropping it writes back the depth that was current when the pass was entered,
/// rather than decrementing whatever value is current at drop time.
pub struct AtomisationDepthGuard {
    /// Depth observed immediately before the pass was entered.
    prior: u32,
}

impl Drop for AtomisationDepthGuard {
    fn drop(&mut self) {
        let restore_to = self.prior;
        ATOMISATION_DEPTH.with(|depth| depth.set(restore_to));
    }
}

/// Enters one atomisation pass on the calling thread.
///
/// Returns the depth of the pass just entered (previous depth plus one, saturating
/// at `u32::MAX`) together with the guard that restores the previous depth on drop.
#[must_use]
pub fn enter_atomisation_pass() -> (u32, AtomisationDepthGuard) {
    ATOMISATION_DEPTH.with(|depth| {
        let prior = depth.get();
        let entered = prior.saturating_add(1);
        depth.set(entered);
        (entered, AtomisationDepthGuard { prior })
    })
}
/// Decomposes a stored source memory into smaller "atom" memories using a curator,
/// then archives the source and appends an `atomisation_complete` signed event.
pub struct Atomiser {
/// Curator whose `decompose` turns the source content into atoms.
curator: Box<dyn Curator>,
/// Optional keypair passed on when creating links and emitting the completion event;
/// without a key that can sign, the event is recorded as unsigned.
keypair: Option<Arc<AgentKeypair>>,
/// Limits applied to each atomisation (default budget, atom-count bounds, retries).
config: AtomiserConfig,
/// Feature tier; requests are rejected with `AtomiseError::TierLocked` when this is `Keyword`.
tier: crate::config::FeatureTier,
/// Model label written into the completion event payload; starts as "unknown".
curator_model: String,
}
impl Atomiser {
pub fn new(
curator: Box<dyn Curator>,
keypair: Option<Arc<AgentKeypair>>,
config: AtomiserConfig,
tier: crate::config::FeatureTier,
) -> Self {
Self {
curator,
keypair,
config,
tier,
curator_model: "unknown".to_string(),
}
}
#[must_use]
pub fn with_curator_model(mut self, curator_model: impl Into<String>) -> Self {
let resolved = curator_model.into();
if !resolved.trim().is_empty() {
self.curator_model = resolved;
}
self
}
#[must_use]
pub fn curator_model(&self) -> &str {
&self.curator_model
}
#[must_use]
pub fn sync_curator_max_retries(&self) -> u32 {
self.config.sync_curator_max_retries
}
pub async fn atomise(
&self,
conn: &Connection,
source_id: &str,
max_atom_tokens: u32,
force: bool,
calling_agent_id: &str,
) -> Result<AtomiseResult, AtomiseError> {
self.atomise_sync(conn, source_id, max_atom_tokens, force, calling_agent_id)
}
pub fn atomise_sync(
&self,
conn: &Connection,
source_id: &str,
max_atom_tokens: u32,
force: bool,
calling_agent_id: &str,
) -> Result<AtomiseResult, AtomiseError> {
self.atomise_sync_with_retries(
conn,
source_id,
max_atom_tokens,
force,
calling_agent_id,
self.config.curator_max_retries,
)
}
pub fn atomise_sync_with_retries(
&self,
conn: &Connection,
source_id: &str,
max_atom_tokens: u32,
force: bool,
calling_agent_id: &str,
max_retries: u32,
) -> Result<AtomiseResult, AtomiseError> {
let (_depth, _depth_guard) = enter_atomisation_pass();
if _depth > MAX_ATOMISATION_DEPTH {
tracing::warn!(
target: "atomisation",
source_id = %source_id,
attempted = _depth,
cap = MAX_ATOMISATION_DEPTH,
"atomisation.depth_exceeded",
);
return Err(AtomiseError::DepthExceeded {
attempted: _depth,
cap: MAX_ATOMISATION_DEPTH,
});
}
if self.tier == crate::config::FeatureTier::Keyword {
return Err(AtomiseError::TierLocked);
}
let budget = if max_atom_tokens == 0 {
self.config.default_max_atom_tokens
} else {
max_atom_tokens
};
let source = db::get(conn, source_id)
.map_err(|e| AtomiseError::DbError(e.to_string()))?
.ok_or(AtomiseError::NotFound)?;
if !force {
if let Some(atomised_into) = read_atomised_into(conn, source_id)
.map_err(|e| AtomiseError::DbError(e.to_string()))?
{
if atomised_into > 0 {
let existing = list_atoms_of(conn, source_id)
.map_err(|e| AtomiseError::DbError(e.to_string()))?;
return Err(AtomiseError::AlreadyAtomised {
source_id: source_id.to_string(),
existing_atom_ids: existing,
});
}
}
}
let source_tokens = db::count_tokens_cl100k(&source.content);
if source_tokens <= budget as usize {
return Err(AtomiseError::SourceTooSmall);
}
let atoms = self
.curator
.decompose(&source.content, budget, max_retries)
.map_err(|e| match e {
curator::CuratorError::LlmUnavailable(d)
| curator::CuratorError::MalformedResponse(d) => AtomiseError::CuratorFailed(d),
})?;
if atoms.is_empty() {
return Err(AtomiseError::SourceTooSmall);
}
let atom_count = atoms.len().min(self.config.max_atoms_per_source);
if atom_count < self.config.min_atoms_per_source {
return Err(AtomiseError::SourceTooSmall);
}
let atoms: Vec<curator::Atom> = atoms.into_iter().take(atom_count).collect();
let mut atom_ids: Vec<String> = Vec::with_capacity(atom_count);
let mut search_cursor: usize = 0;
for (idx, atom) in atoms.iter().enumerate() {
let span = compute_atom_span(&source.content, &atom.text, &mut search_cursor);
let atom_id = write_atom(
conn,
&source,
atom,
span,
calling_agent_id,
self.keypair.as_deref(),
)
.map_err(|e| {
if let Some(refusal) = e.downcast_ref::<crate::storage::GovernanceRefusal>() {
AtomiseError::GovernanceRefused(format!("atom[{idx}]: {}", refusal.reason))
} else {
AtomiseError::DbError(format!("atom[{idx}]: {e}"))
}
})?;
atom_ids.push(atom_id);
}
let archived_at = Utc::now().to_rfc3339();
let atom_count_i64 = i64::try_from(atom_count).unwrap_or(i64::MAX);
archive_source(conn, source_id, atom_count_i64, &archived_at)
.map_err(|e| AtomiseError::DbError(e.to_string()))?;
emit_atomisation_complete_event(
conn,
source_id,
&atom_ids,
atom_count,
calling_agent_id,
&archived_at,
self.keypair.as_deref(),
&self.curator_model,
)
.map_err(|e| AtomiseError::DbError(e.to_string()))?;
Ok(AtomiseResult {
source_id: source_id.to_string(),
atom_ids,
atom_count,
archived_at,
})
}
}
/// Reads the `atomised_into` column for memory `id`.
///
/// Returns `Ok(None)` both when the column is NULL and when no row matches `id`;
/// any other database error is propagated.
fn read_atomised_into(conn: &Connection, id: &str) -> anyhow::Result<Option<i64>> {
    let lookup = conn.query_row(
        "SELECT atomised_into FROM memories WHERE id = ?1",
        params![id],
        |row| row.get::<_, Option<i64>>(0),
    );
    match lookup {
        // A missing row is treated the same as "never atomised".
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        other => other.map_err(Into::into),
    }
}
/// Lists the ids of all memories whose `atom_of` column equals `source_id`,
/// ordered by `created_at` and then `id`, both ascending.
///
/// Stops at, and returns, the first row-level or statement-level error.
fn list_atoms_of(conn: &Connection, source_id: &str) -> anyhow::Result<Vec<String>> {
    let mut stmt =
        conn.prepare("SELECT id FROM memories WHERE atom_of = ?1 ORDER BY created_at ASC, id ASC")?;
    let mut atom_ids = Vec::new();
    for row in stmt.query_map(params![source_id], |row| row.get::<_, String>(0))? {
        atom_ids.push(row?);
    }
    Ok(atom_ids)
}
/// Persists one atom derived from `source` and links it back to the source.
///
/// The atom copies the source's tier, namespace, tags, priority, confidence,
/// reflection depth and citations. Its metadata is the source's metadata object
/// (or an empty object when the source metadata is not a JSON object) with
/// `agent_id` set to `calling_agent_id` and `atom_source_id` set to the source id.
/// After the insert, the row's `atom_of` column is set and a `DerivesFrom` link
/// from the atom to the source is created with `keypair`.
///
/// Returns the id reported by the storage insert, which is the id used for the
/// `atom_of` update and the link.
///
/// # Errors
/// Propagates any error from the insert, the `atom_of` update or the link creation.
fn write_atom(
    conn: &Connection,
    source: &Memory,
    atom: &curator::Atom,
    span: Option<SourceSpan>,
    calling_agent_id: &str,
    keypair: Option<&AgentKeypair>,
) -> anyhow::Result<String> {
    let now = Utc::now().to_rfc3339();
    let atom_id = uuid::Uuid::new_v4().to_string();

    // Title: the first 50 characters of the atom (trimmed) plus the first 8 characters
    // of the generated id; falls back to the source title when that snippet is empty.
    let short_id = &atom_id[..8];
    let snippet: String = atom.text.chars().take(50).collect();
    let snippet = snippet.trim();
    let title = if snippet.is_empty() {
        format!("[atom] {} #{}", source.title, short_id)
    } else {
        format!("[atom] {} ({})", snippet, short_id)
    };

    let mut metadata = source
        .metadata
        .as_object()
        .cloned()
        .unwrap_or_default();
    for (key, value) in [
        ("agent_id", calling_agent_id),
        ("atom_source_id", source.id.as_str()),
    ] {
        metadata.insert(
            key.to_string(),
            serde_json::Value::String(value.to_string()),
        );
    }

    let mem = Memory {
        id: atom_id.clone(),
        tier: source.tier.clone(),
        namespace: source.namespace.clone(),
        title,
        content: atom.text.clone(),
        tags: source.tags.clone(),
        priority: source.priority,
        confidence: source.confidence,
        source: "atomiser".to_string(),
        access_count: 0,
        created_at: now.clone(),
        updated_at: now,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::Value::Object(metadata),
        reflection_depth: source.reflection_depth,
        memory_kind: MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: source.citations.clone(),
        source_uri: Some(format!("doc:{}", source.id)),
        source_span: span,
        confidence_source: ConfidenceSource::CuratorDerived,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    // Everything below uses the id returned by the insert, not the generated one.
    let stored_id = db::insert(conn, &mem)?;
    conn.execute(
        "UPDATE memories SET atom_of = ?1 WHERE id = ?2",
        params![source.id, stored_id],
    )?;
    db::create_link_signed(
        conn,
        &stored_id,
        &source.id,
        MemoryLinkRelation::DerivesFrom.as_str(),
        keypair,
    )?;
    Ok(stored_id)
}
/// Marks the source memory as atomised inside one immediate transaction.
///
/// Sets `atomised_into` to `atom_count`, sets `updated_at` to `archived_at`, and
/// rewrites `metadata` as the existing metadata object plus the archived-at key.
///
/// NOTE(review): if the metadata cannot be read, is NULL, or does not parse as a
/// JSON object, the merge starts from an empty object, so the previous metadata is
/// replaced rather than preserved.
///
/// # Errors
/// Returns an error when the transaction cannot be begun or committed, or when
/// the update fails; a failed update triggers a best-effort rollback first.
fn archive_source(
    conn: &Connection,
    source_id: &str,
    atom_count: i64,
    archived_at: &str,
) -> anyhow::Result<()> {
    conn.execute_batch(crate::storage::connection::SQL_BEGIN_IMMEDIATE)?;

    let apply = || -> anyhow::Result<()> {
        // Read failures and NULL both collapse to "no stored metadata".
        let stored: Option<String> = conn
            .query_row(
                "SELECT metadata FROM memories WHERE id = ?1",
                params![source_id],
                |row| row.get::<_, Option<String>>(0),
            )
            .unwrap_or(None);
        let existing_metadata_str = stored.unwrap_or_else(|| "{}".to_string());

        let mut meta: serde_json::Map<String, serde_json::Value> =
            serde_json::from_str(&existing_metadata_str).unwrap_or_default();
        meta.insert(
            crate::models::field_names::ATOMISATION_ARCHIVED_AT.to_string(),
            serde_json::Value::String(archived_at.to_string()),
        );
        let merged = serde_json::Value::Object(meta).to_string();

        conn.execute(
            "UPDATE memories SET atomised_into = ?1, metadata = ?2, updated_at = ?3 \
             WHERE id = ?4",
            params![atom_count, merged, archived_at, source_id],
        )?;
        Ok(())
    };

    if let Err(failure) = apply() {
        // Rollback is best-effort; the original failure is what the caller sees.
        let _ = conn.execute_batch(crate::storage::connection::SQL_ROLLBACK);
        return Err(failure);
    }
    conn.execute_batch(crate::storage::connection::SQL_COMMIT)?;
    Ok(())
}
/// Appends an `atomisation_complete` signed event describing a finished atomisation.
///
/// The JSON payload (source id, atom ids, atom count, calling agent, timestamp and
/// curator model) is serialised to bytes; the event stores the hash of those bytes.
/// When `keypair` is present and can sign, the bytes are signed and the event is
/// marked self-signed; otherwise it carries no signature and is marked unsigned.
///
/// # Errors
/// Returns an error when the payload cannot be serialised or the event cannot be appended.
///
/// # Panics
/// Panics if the keypair reports `can_sign()` but has no private key.
fn emit_atomisation_complete_event(
    conn: &Connection,
    source_id: &str,
    atom_ids: &[String],
    atom_count: usize,
    calling_agent_id: &str,
    archived_at: &str,
    keypair: Option<&AgentKeypair>,
    curator_model: &str,
) -> anyhow::Result<()> {
    use ed25519_dalek::Signer;

    let payload = serde_json::json!({
        "event_type": "atomisation_complete",
        "source_id": source_id,
        "atom_ids": atom_ids,
        (crate::models::field_names::ATOM_COUNT): atom_count,
        "calling_agent_id": calling_agent_id,
        "atomisation_timestamp": archived_at,
        "curator_model": curator_model,
    });
    let payload_bytes = serde_json::to_vec(&payload)?;

    let (signature, attest_level) = match keypair.filter(|candidate| candidate.can_sign()) {
        Some(signer) => {
            let signing_key = signer.private.as_ref().expect("can_sign() checked");
            let raw_signature = signing_key.sign(&payload_bytes).to_bytes().to_vec();
            (
                Some(raw_signature),
                crate::models::AttestLevel::SelfSigned.as_str(),
            )
        }
        None => (None, crate::models::AttestLevel::Unsigned.as_str()),
    };

    let event = SignedEvent {
        id: uuid::Uuid::new_v4().to_string(),
        agent_id: calling_agent_id.to_string(),
        event_type: crate::signed_events::event_types::ATOMISATION_COMPLETE.to_string(),
        payload_hash: payload_hash(&payload_bytes),
        signature,
        attest_level: attest_level.to_string(),
        timestamp: Utc::now().to_rfc3339(),
        ..SignedEvent::default()
    };
    append_signed_event(conn, &event)?;
    Ok(())
}
/// Locates the trimmed `atom_text` verbatim inside `source_body`.
///
/// The search starts at `*cursor` (moved back to a char boundary if needed) and,
/// if nothing is found from there, falls back to searching the whole body. On a
/// hit, returns the byte range of the match and moves `*cursor` to just past the
/// first character of the match. Returns `None`, leaving `*cursor` untouched,
/// when the trimmed text is empty or does not occur in the body.
fn compute_atom_span(source_body: &str, atom_text: &str, cursor: &mut usize) -> Option<SourceSpan> {
    let needle = atom_text.trim();
    if needle.is_empty() {
        return None;
    }

    let body_len = source_body.len();
    let from = floor_char_boundary(source_body, *cursor);
    // `from` is at most `body_len`; an empty tail cannot contain the non-empty needle.
    let forward_hit = source_body[from..].find(needle).map(|off| from + off);
    let start = forward_hit.or_else(|| source_body.find(needle))?;
    let end = (start + needle.len()).min(body_len);

    // Advance by one character only, so a later atom may still match text that
    // overlaps this one.
    *cursor = source_body[start..]
        .chars()
        .next()
        .map_or(body_len, |first| start + first.len_utf8());

    Some(SourceSpan { start, end })
}
/// Returns the largest char-boundary byte offset in `s` that is `<= index`.
///
/// An `index` at or beyond the end of `s` yields `s.len()`.
fn floor_char_boundary(s: &str, index: usize) -> usize {
    if index >= s.len() {
        return s.len();
    }
    // Offset 0 is always a boundary, so the search cannot come up empty.
    (0..=index)
        .rev()
        .find(|&offset| s.is_char_boundary(offset))
        .unwrap_or(0)
}
#[cfg(test)]
mod tests {
    //! Unit tests for the pure parts of the atomiser: configuration defaults, error
    //! rendering, the thread-local depth guard, and source-span location.
    use super::*;

    /// Every default, including the sync-curator retry count, is pinned.
    #[test]
    fn config_defaults_match_brief() {
        let c = AtomiserConfig::default();
        assert_eq!(c.default_max_atom_tokens, 200);
        assert_eq!(c.min_atoms_per_source, 2);
        assert_eq!(c.max_atoms_per_source, 10);
        assert_eq!(c.curator_max_retries, 3);
        assert_eq!(c.sync_curator_max_retries, 1);
    }

    /// Every variant renders a non-empty message.
    #[test]
    fn atomise_error_display_shapes() {
        for e in [
            AtomiseError::NotFound,
            AtomiseError::AlreadyAtomised {
                source_id: "src".into(),
                existing_atom_ids: vec!["a".into(), "b".into()],
            },
            AtomiseError::TierLocked,
            AtomiseError::CuratorFailed("bad json".into()),
            AtomiseError::SourceTooSmall,
            AtomiseError::GovernanceRefused("policy".into()),
            AtomiseError::SignerError("no key".into()),
            AtomiseError::DbError("io".into()),
            AtomiseError::DepthExceeded {
                attempted: 4,
                cap: MAX_ATOMISATION_DEPTH,
            },
        ] {
            let s = format!("{e}");
            assert!(!s.is_empty());
        }
    }

    /// The atomisation cap is 3 and equals the synthesis cap.
    #[test]
    fn max_atomisation_depth_matches_substrate_recursive_primitive_cap() {
        assert_eq!(MAX_ATOMISATION_DEPTH, 3);
        assert_eq!(
            MAX_ATOMISATION_DEPTH,
            crate::synthesis::MAX_SYNTHESIS_DEPTH,
            "atomisation cap must match synthesis cap"
        );
    }

    /// Nested passes count up, and each guard restores the previous depth on drop.
    #[test]
    fn atomisation_depth_guard_increments_and_decrements() {
        assert_eq!(current_atomisation_depth(), 0);
        {
            let (d1, _g1) = enter_atomisation_pass();
            assert_eq!(d1, 1);
            assert_eq!(current_atomisation_depth(), 1);
            {
                let (d2, _g2) = enter_atomisation_pass();
                assert_eq!(d2, 2);
                assert_eq!(current_atomisation_depth(), 2);
                {
                    let (d3, _g3) = enter_atomisation_pass();
                    assert_eq!(d3, 3);
                    {
                        let (d4, _g4) = enter_atomisation_pass();
                        assert_eq!(d4, 4);
                        assert!(d4 > MAX_ATOMISATION_DEPTH, "depth=4 exceeds cap=3");
                    }
                    assert_eq!(current_atomisation_depth(), 3);
                }
                assert_eq!(current_atomisation_depth(), 2);
            }
            assert_eq!(current_atomisation_depth(), 1);
        }
        assert_eq!(current_atomisation_depth(), 0);
    }

    /// Text that does not occur verbatim yields `None` and leaves the cursor alone.
    #[test]
    fn compute_atom_span_paraphrase_fallback_returns_none() {
        let body = "The deployment manifest pins the image digest explicitly.";
        let mut cursor = 0_usize;
        let got = compute_atom_span(
            body,
            "Curator paraphrased this sentence entirely.",
            &mut cursor,
        );
        assert!(
            got.is_none(),
            "paraphrase miss must return None, got {got:?}"
        );
        assert_eq!(cursor, 0);
    }

    /// Spans and the cursor always land on char boundaries in multi-byte text.
    #[test]
    fn compute_atom_span_multibyte_utf8_stays_on_char_boundary() {
        let body = "Café 中文 🦀 statement that follows the emoji.";
        let mut cursor = 0_usize;
        let span = compute_atom_span(body, "中文", &mut cursor)
            .expect("multi-byte needle should be found verbatim");
        assert!(body.is_char_boundary(span.start));
        assert!(body.is_char_boundary(span.end));
        assert_eq!(&body[span.start..span.end], "中文");
        assert!(cursor > span.start);
        assert!(
            body.is_char_boundary(cursor),
            "cursor={cursor} mid-codepoint"
        );
        let span2 = compute_atom_span(body, "🦀", &mut cursor)
            .expect("emoji needle should be found after CJK");
        assert!(body.is_char_boundary(span2.start));
        assert!(body.is_char_boundary(span2.end));
        assert_eq!(&body[span2.start..span2.end], "🦀");
        let span3 = compute_atom_span(body, "statement that follows the emoji.", &mut cursor)
            .expect("ascii needle should still be found");
        assert_eq!(
            &body[span3.start..span3.end],
            "statement that follows the emoji."
        );
    }

    /// A cursor past the end of the body falls back to a whole-body search.
    #[test]
    fn compute_atom_span_cursor_clamps_to_body_length() {
        let body = "short body";
        let mut cursor = 1_000_usize;
        let span = compute_atom_span(body, "body", &mut cursor).expect("fallback whole-body");
        assert_eq!(&body[span.start..span.end], "body");
    }

    /// A cursor that points into the middle of a code point is realigned, not sliced at.
    #[test]
    fn compute_atom_span_stale_cursor_realigns_to_boundary() {
        // 'é' occupies bytes 3..5, so byte 4 is not a char boundary.
        let body = "Café statement";
        let mut cursor = 4_usize;
        assert!(!body.is_char_boundary(cursor));
        let span = compute_atom_span(body, "statement", &mut cursor)
            .expect("needle after the stale cursor should still be found");
        assert_eq!(&body[span.start..span.end], "statement");
        assert!(cursor > span.start);
        assert!(
            body.is_char_boundary(cursor),
            "cursor={cursor} mid-codepoint"
        );
    }

    /// Offsets inside a code point floor to its first byte; large offsets clamp to the length.
    #[test]
    fn floor_char_boundary_walks_back_to_codepoint_start() {
        let s = "Café 中文";
        assert_eq!(floor_char_boundary(s, 0), 0);
        assert_eq!(floor_char_boundary(s, 4), 3);
        assert_eq!(floor_char_boundary(s, 9999), s.len());
    }
}