use base64::Engine;
use totalreclaw_core::blind;
use totalreclaw_core::claims::{
Claim, MemoryClaimV1, MemoryScope, MemorySource, MemoryTypeV1, MemoryVolatility,
ResolutionAction, MEMORY_CLAIM_V1_SCHEMA_VERSION,
};
use totalreclaw_core::consolidation;
use totalreclaw_core::contradiction;
use totalreclaw_core::crypto;
use totalreclaw_core::decision_log;
use totalreclaw_core::fingerprint;
use totalreclaw_core::lsh::LshHasher;
use totalreclaw_core::store as core_store;
use crate::embedding::EmbeddingProvider;
use crate::relay::RelayClient;
use crate::search;
use crate::Result;
pub async fn store_fact(
content: &str,
source: &str,
keys: &crypto::DerivedKeys,
lsh_hasher: &LshHasher,
embedding_provider: &dyn EmbeddingProvider,
relay: &RelayClient,
private_key: Option<&[u8; 32]>,
) -> Result<String> {
let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
if let Ok(existing) =
search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
{
if let Some(dup) = existing {
let _ = store_tombstone(&dup.id, relay, private_key).await;
}
}
let embedding = embedding_provider.embed(content).await?;
if let Some(dup) = find_best_duplicate(content, &embedding, keys, relay).await {
let _ = store_tombstone(&dup.fact_id, relay, private_key).await;
}
let prepared = core_store::prepare_fact_with_decay_score(
content,
&keys.encryption_key,
&keys.dedup_key,
lsh_hasher,
&embedding,
1.0, source,
relay.wallet_address(),
"zeroclaw",
)
.map_err(|e| crate::Error::Crypto(e.to_string()))?;
if let Some(pk) = private_key {
relay
.submit_fact_native(&prepared.protobuf_bytes, pk)
.await?;
} else {
relay.submit_protobuf(&prepared.protobuf_bytes).await?;
}
Ok(prepared.fact_id)
}
pub async fn store_fact_with_importance(
content: &str,
source: &str,
importance: f64,
keys: &crypto::DerivedKeys,
lsh_hasher: &LshHasher,
embedding_provider: &dyn EmbeddingProvider,
relay: &RelayClient,
private_key: Option<&[u8; 32]>,
) -> Result<String> {
let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
if let Ok(existing) =
search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
{
if let Some(dup) = existing {
let _ = store_tombstone(&dup.id, relay, private_key).await;
}
}
let embedding = embedding_provider.embed(content).await?;
if let Some(dup) = find_best_duplicate(content, &embedding, keys, relay).await {
let _ = store_tombstone(&dup.fact_id, relay, private_key).await;
}
let prepared = core_store::prepare_fact(
content,
&keys.encryption_key,
&keys.dedup_key,
lsh_hasher,
&embedding,
importance,
source,
relay.wallet_address(),
"zeroclaw",
)
.map_err(|e| crate::Error::Crypto(e.to_string()))?;
if let Some(pk) = private_key {
relay
.submit_fact_native(&prepared.protobuf_bytes, pk)
.await?;
} else {
relay.submit_protobuf(&prepared.protobuf_bytes).await?;
}
Ok(prepared.fact_id)
}
/// Prepares every `(content, source)` pair in `facts` and submits them to the
/// relay as one native batch, returning the new fact ids in input order.
///
/// Each fact is embedded and prepared with a decay score of `1.0`. Unlike
/// [`store_fact`], this function performs no fingerprint or near-duplicate
/// dedup before storing.
///
/// An empty `facts` slice returns `Ok(vec![])` without contacting the relay.
///
/// # Errors
/// Fails on the first embedding or preparation error (preparation errors are
/// reported as `crate::Error::Crypto`), in which case nothing is submitted, or
/// if the batch submission itself fails.
pub async fn store_fact_batch(
    facts: &[(&str, &str)],
    keys: &crypto::DerivedKeys,
    lsh_hasher: &LshHasher,
    embedding_provider: &dyn EmbeddingProvider,
    relay: &RelayClient,
    private_key: &[u8; 32],
) -> Result<Vec<String>> {
    // Nothing to store: avoid submitting an empty batch.
    if facts.is_empty() {
        return Ok(Vec::new());
    }

    let mut fact_ids: Vec<String> = Vec::with_capacity(facts.len());
    let mut protobuf_payloads: Vec<Vec<u8>> = Vec::with_capacity(facts.len());

    for (content, source) in facts {
        let embedding = embedding_provider.embed(content).await?;
        let prepared = core_store::prepare_fact_with_decay_score(
            content,
            &keys.encryption_key,
            &keys.dedup_key,
            lsh_hasher,
            &embedding,
            1.0,
            source,
            relay.wallet_address(),
            "zeroclaw",
        )
        .map_err(|e| crate::Error::Crypto(e.to_string()))?;

        // Move the id and payload out of the prepared fact instead of cloning
        // them; nothing else from `prepared` is needed afterwards.
        fact_ids.push(prepared.fact_id);
        protobuf_payloads.push(prepared.protobuf_bytes);
    }

    relay
        .submit_fact_batch_native(&protobuf_payloads, private_key)
        .await?;
    Ok(fact_ids)
}
/// Builds a tombstone for `fact_id` (via `core_store::prepare_tombstone`) for
/// the relay's wallet address and submits it.
///
/// With `private_key` set the payload goes through
/// `RelayClient::submit_fact_native`; otherwise through
/// `RelayClient::submit_protobuf`.
///
/// # Errors
/// Propagates any error returned by the relay submission.
pub async fn store_tombstone(
    fact_id: &str,
    relay: &RelayClient,
    private_key: Option<&[u8; 32]>,
) -> Result<()> {
    let payload = core_store::prepare_tombstone(fact_id, relay.wallet_address());
    match private_key {
        Some(pk) => {
            relay.submit_fact_native(&payload, pk).await?;
        }
        None => {
            relay.submit_protobuf(&payload).await?;
        }
    }
    Ok(())
}
/// Builds a v1 tombstone for `fact_id` (via `core_store::prepare_tombstone_v1`)
/// for the relay's wallet address and submits it.
///
/// With `private_key` set the payload goes through
/// `RelayClient::submit_fact_native`; otherwise through
/// `RelayClient::submit_protobuf`.
///
/// # Errors
/// Propagates any error returned by the relay submission.
pub async fn store_tombstone_v1(
    fact_id: &str,
    relay: &RelayClient,
    private_key: Option<&[u8; 32]>,
) -> Result<()> {
    let payload = core_store::prepare_tombstone_v1(fact_id, relay.wallet_address());
    match private_key {
        Some(pk) => {
            relay.submit_fact_native(&payload, pk).await?;
        }
        None => {
            relay.submit_protobuf(&payload).await?;
        }
    }
    Ok(())
}
/// Caller-supplied fields for storing a v1 memory claim.
///
/// `build_memory_claim_v1` copies these values into a `MemoryClaimV1`; the
/// remaining envelope fields (id, timestamps, schema version, ...) are filled
/// in there.
#[derive(Debug, Clone)]
pub struct V1StoreInput {
/// Claim text. Also the text that `store_fact_v1` fingerprints and embeds.
pub text: String,
/// Memory type recorded on the claim.
pub memory_type: MemoryTypeV1,
/// Provenance of the memory; `store_fact_v1` also derives its source tag from it.
pub source: MemorySource,
/// Importance recorded on the claim and passed (as `f64`) to the core when
/// preparing the fact. NOTE(review): presumably a 0-10 scale — confirm.
pub importance: u8,
/// Scope recorded on the claim.
pub scope: MemoryScope,
/// Volatility recorded on the claim.
pub volatility: MemoryVolatility,
/// Optional free-form reasoning recorded on the claim.
pub reasoning: Option<String>,
}
impl V1StoreInput {
    /// Creates an input for a plain claim with the given `text` and
    /// `importance`.
    ///
    /// The other fields get fixed defaults: type `Claim`, source
    /// `UserInferred`, scope `Unspecified`, volatility `Updatable`, and no
    /// reasoning.
    pub fn new_claim(text: impl Into<String>, importance: u8) -> Self {
        let text = text.into();
        Self {
            text,
            importance,
            memory_type: MemoryTypeV1::Claim,
            source: MemorySource::UserInferred,
            scope: MemoryScope::Unspecified,
            volatility: MemoryVolatility::Updatable,
            reasoning: None,
        }
    }
}
/// Builds a `MemoryClaimV1` envelope from `input`.
///
/// Generated here: a fresh UUIDv7 string as `id`, the current UTC time as
/// `created_at` (RFC 3339, millisecond precision, `Z` suffix) and the current
/// schema version. `text`, `memory_type`, `source`, `scope`, `volatility`,
/// `reasoning` and `importance` are copied from `input`. `entities` starts
/// empty and every remaining optional field is `None`.
pub fn build_memory_claim_v1(input: &V1StoreInput) -> MemoryClaimV1 {
    let id = uuid::Uuid::now_v7().to_string();
    let created_at = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
    let schema_version = MEMORY_CLAIM_V1_SCHEMA_VERSION.to_string();

    MemoryClaimV1 {
        // Generated for this claim.
        id,
        created_at,
        schema_version,
        // Copied from the caller's input.
        text: input.text.clone(),
        memory_type: input.memory_type,
        source: input.source,
        scope: input.scope,
        volatility: input.volatility,
        reasoning: input.reasoning.clone(),
        importance: Some(input.importance),
        // Not populated at creation time.
        entities: Vec::new(),
        expires_at: None,
        confidence: None,
        superseded_by: None,
        pin_status: None,
    }
}
pub async fn store_fact_v1(
input: &V1StoreInput,
keys: &crypto::DerivedKeys,
lsh_hasher: &LshHasher,
embedding_provider: &dyn EmbeddingProvider,
relay: &RelayClient,
private_key: Option<&[u8; 32]>,
) -> Result<String> {
let claim = build_memory_claim_v1(input);
let envelope_json = serde_json::to_string(&claim)
.map_err(|e| crate::Error::Crypto(format!("v1 envelope serialize: {e}")))?;
let content_fp = fingerprint::generate_content_fingerprint(&claim.text, &keys.dedup_key);
if let Ok(Some(dup)) =
search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
{
let _ = store_tombstone_v1(&dup.id, relay, private_key).await;
}
let embedding = embedding_provider.embed(&claim.text).await?;
if let Some(dup) = find_best_duplicate(&claim.text, &embedding, keys, relay).await {
let _ = store_tombstone_v1(&dup.fact_id, relay, private_key).await;
}
let source_tag = format!("zeroclaw_v1_{}", v1_source_to_str(input.source));
let prepared = core_store::prepare_fact_v1(
&envelope_json,
&claim.text,
&keys.encryption_key,
&keys.dedup_key,
lsh_hasher,
&embedding,
input.importance as f64,
&source_tag,
relay.wallet_address(),
"zeroclaw",
)
.map_err(|e| crate::Error::Crypto(e.to_string()))?;
if let Some(pk) = private_key {
relay.submit_fact_native(&prepared.protobuf_bytes, pk).await?;
} else {
relay.submit_protobuf(&prepared.protobuf_bytes).await?;
}
Ok(prepared.fact_id)
}
/// Maps a `MemorySource` to the lowercase tag used as the suffix of the v1
/// source tag (`zeroclaw_v1_<tag>`).
fn v1_source_to_str(src: MemorySource) -> &'static str {
    // Exhaustive on purpose: adding a variant must fail to compile here.
    let tag: &'static str = match src {
        MemorySource::Assistant => "assistant",
        MemorySource::Derived => "derived",
        MemorySource::External => "external",
        MemorySource::User => "user",
        MemorySource::UserInferred => "user-inferred",
    };
    tag
}
/// Searches the relay for the stored fact whose embedding is the closest
/// near-duplicate of `new_embedding`.
///
/// Blind indices generated from `content` are used to fetch up to
/// `consolidation::STORE_DEDUP_MAX_CANDIDATES` candidates. Each candidate's
/// encrypted embedding is decrypted, base64-decoded and read as little-endian
/// `f32` values; candidates with no embedding, or whose embedding fails to
/// decrypt or decode, are skipped. The selection itself is delegated to
/// `consolidation::find_best_near_duplicate` with
/// `consolidation::STORE_DEDUP_COSINE_THRESHOLD`.
///
/// Returns `None` when no blind indices are produced, when the candidate
/// search fails, or when the core finds no match.
async fn find_best_duplicate(
    content: &str,
    new_embedding: &[f32],
    keys: &crypto::DerivedKeys,
    relay: &RelayClient,
) -> Option<consolidation::DupMatch> {
    let trapdoors = blind::generate_blind_indices(content);
    if trapdoors.is_empty() {
        return None;
    }

    let candidates = match search::search_candidates(
        relay,
        relay.wallet_address(),
        &trapdoors,
        consolidation::STORE_DEDUP_MAX_CANDIDATES,
    )
    .await
    {
        Ok(found) => found,
        Err(_) => return None,
    };

    let existing: Vec<(String, Vec<f32>)> = candidates
        .iter()
        .filter_map(|fact| {
            let encrypted = fact.encrypted_embedding.as_ref()?;
            let b64 = crypto::decrypt(encrypted, &keys.encryption_key).ok()?;
            let raw = base64::engine::general_purpose::STANDARD
                .decode(&b64)
                .ok()?;
            // Trailing bytes that do not fill a whole 4-byte group are ignored.
            let vector: Vec<f32> = raw
                .chunks_exact(4)
                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                .collect();
            Some((fact.id.clone(), vector))
        })
        .collect();

    consolidation::find_best_near_duplicate(
        new_embedding,
        &existing,
        consolidation::STORE_DEDUP_COSINE_THRESHOLD,
    )
}
/// Outcome of `store_claim_with_contradiction_check`.
#[derive(Debug)]
pub struct ContradictionStoreResult {
/// Id of the newly stored fact, or the caller-supplied claim id when the new
/// claim was skipped and nothing was stored.
pub fact_id: String,
/// Resolution actions returned by `contradiction::resolve_with_candidates`.
pub actions: Vec<ResolutionAction>,
/// Decision-log entries built from `actions` by
/// `contradiction::build_decision_log_entries`.
pub decision_log_entries: Vec<decision_log::DecisionLogEntry>,
}
/// Stores `claim` after resolving contradictions against existing claims that
/// share one of its entities.
///
/// The steps run in this order:
/// 1. Tombstone an existing fact with the same content fingerprint
///    (best-effort; lookup and tombstone errors are ignored).
/// 2. Embed the claim text and fetch contradiction candidates.
/// 3. Ask the core to resolve the new claim against the candidates and build
///    the decision-log entries for the resulting actions (mode `"active"`).
/// 4. If any action is `SkipNew`, return without storing or tombstoning
///    anything further; `fact_id` in the result is then `claim_id`.
/// 5. Otherwise tombstone every fact named by a `SupersedeExisting` action
///    (best-effort), then prepare and submit the new fact.
///
/// Superseded facts are only tombstoned once it is known that the new claim
/// will be written. Deciding this up front makes the outcome independent of
/// the order of `actions` and avoids deleting an existing fact whose
/// replacement is then skipped.
///
/// NOTE(review): the exact-fingerprint duplicate in step 1 is still
/// tombstoned before the skip decision is known — confirm that is intended.
///
/// The returned `actions` and `decision_log_entries` always describe the
/// core's full resolution, including `SupersedeExisting` actions that were
/// not applied because the new claim was skipped.
///
/// # Errors
/// Fails if embedding fails, if preparation fails (reported as
/// `crate::Error::Crypto`), or if the final submission fails.
pub async fn store_claim_with_contradiction_check(
    claim: &Claim,
    claim_id: &str,
    source: &str,
    importance: f64,
    keys: &crypto::DerivedKeys,
    lsh_hasher: &LshHasher,
    embedding_provider: &dyn EmbeddingProvider,
    relay: &RelayClient,
    private_key: Option<&[u8; 32]>,
    weights: &contradiction::ResolutionWeights,
    now_unix_seconds: i64,
) -> Result<ContradictionStoreResult> {
    let content = &claim.text;

    // Exact-content dedup: a lookup error is treated the same as "no match".
    let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
    if let Ok(Some(dup)) =
        search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
    {
        let _ = store_tombstone(&dup.id, relay, private_key).await;
    }

    let embedding = embedding_provider.embed(content).await?;
    let candidates = fetch_contradiction_candidates(claim, &embedding, keys, relay).await;

    let actions = contradiction::resolve_with_candidates(
        claim,
        claim_id,
        &embedding,
        &candidates,
        weights,
        contradiction::DEFAULT_LOWER_THRESHOLD,
        contradiction::DEFAULT_UPPER_THRESHOLD,
        now_unix_seconds,
        totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE,
    );

    // Candidates that fail to serialize are simply left out of the log input.
    let existing_claims_json: std::collections::HashMap<String, String> = candidates
        .iter()
        .filter_map(|(c, id, _)| {
            serde_json::to_string(c).ok().map(|json| (id.clone(), json))
        })
        .collect();
    // Best-effort: a serialization failure yields an empty string here.
    let new_claim_json = serde_json::to_string(claim).unwrap_or_default();
    let decision_log_entries = contradiction::build_decision_log_entries(
        &actions,
        &new_claim_json,
        &existing_claims_json,
        "active",
        now_unix_seconds,
    );

    // Decide skip-vs-store before touching any existing fact, so that a
    // `SupersedeExisting` listed ahead of a `SkipNew` cannot delete a fact
    // whose replacement is never written.
    let skip_new = actions
        .iter()
        .any(|action| matches!(action, ResolutionAction::SkipNew { .. }));
    if skip_new {
        return Ok(ContradictionStoreResult {
            fact_id: claim_id.to_string(),
            actions,
            decision_log_entries,
        });
    }

    for action in &actions {
        // Exhaustive on purpose: a new action variant must be handled here.
        match action {
            ResolutionAction::SupersedeExisting { existing_id, .. } => {
                let _ = store_tombstone(existing_id, relay, private_key).await;
            }
            ResolutionAction::SkipNew { .. }
            | ResolutionAction::TieLeaveBoth { .. }
            | ResolutionAction::NoContradiction => {}
        }
    }

    let prepared = core_store::prepare_fact(
        content,
        &keys.encryption_key,
        &keys.dedup_key,
        lsh_hasher,
        &embedding,
        importance,
        source,
        relay.wallet_address(),
        "zeroclaw",
    )
    .map_err(|e| crate::Error::Crypto(e.to_string()))?;

    if let Some(pk) = private_key {
        relay
            .submit_fact_native(&prepared.protobuf_bytes, pk)
            .await?;
    } else {
        relay.submit_protobuf(&prepared.protobuf_bytes).await?;
    }

    Ok(ContradictionStoreResult {
        fact_id: prepared.fact_id,
        actions,
        decision_log_entries,
    })
}
/// Fetches existing claims that share an entity with `new_claim`, returned as
/// `(claim, fact id, embedding)` triples.
///
/// Blind indices are generated from every entity name and used to fetch up to
/// `decision_log::CONTRADICTION_CANDIDATE_CAP` facts. A fact is kept only if
/// its blob converts to base64, decrypts, and parses as a `Claim` — either
/// directly, or from the string stored under key `"t"` of a JSON wrapper.
/// A fact with a missing or undecodable embedding is kept with an empty
/// vector.
///
/// Returns an empty list when the claim has no entities, when no blind
/// indices are produced, or when the candidate search fails.
/// `_new_embedding` is currently unused.
async fn fetch_contradiction_candidates(
    new_claim: &Claim,
    _new_embedding: &[f32],
    keys: &crypto::DerivedKeys,
    relay: &RelayClient,
) -> Vec<(Claim, String, Vec<f32>)> {
    if new_claim.entities.is_empty() {
        return Vec::new();
    }

    let mut trapdoors = Vec::new();
    for entity in new_claim.entities.iter() {
        trapdoors.extend(blind::generate_blind_indices(&entity.name));
    }
    if trapdoors.is_empty() {
        return Vec::new();
    }

    let search_result = search::search_candidates(
        relay,
        relay.wallet_address(),
        &trapdoors,
        decision_log::CONTRADICTION_CANDIDATE_CAP,
    )
    .await;
    let facts = match search_result {
        Ok(found) => found,
        Err(_) => return Vec::new(),
    };

    facts
        .iter()
        .filter_map(|fact| {
            let blob_b64 = search::hex_blob_to_base64(&fact.encrypted_blob)?;
            let plaintext = crypto::decrypt(&blob_b64, &keys.encryption_key).ok()?;

            let claim: Claim = match serde_json::from_str(&plaintext) {
                Ok(parsed) => parsed,
                Err(_) => {
                    // Fallback: the payload may be a JSON wrapper carrying the
                    // claim JSON as a string under "t".
                    let wrapper = serde_json::from_str::<serde_json::Value>(&plaintext).ok()?;
                    let inner = wrapper
                        .get("t")
                        .and_then(|v| v.as_str())
                        .unwrap_or(&plaintext);
                    serde_json::from_str(inner).ok()?
                }
            };

            // An unavailable embedding does not drop the candidate.
            let vector = fact
                .encrypted_embedding
                .as_deref()
                .and_then(|enc| crypto::decrypt(enc, &keys.encryption_key).ok())
                .and_then(|b64| {
                    base64::engine::general_purpose::STANDARD
                        .decode(&b64)
                        .ok()
                })
                .map(|raw| {
                    raw.chunks_exact(4)
                        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
                        .collect::<Vec<f32>>()
                })
                .unwrap_or_default();

            Some((claim, fact.id.clone(), vector))
        })
        .collect()
}
// Unit tests. Most of these pin constants and behaviors of `totalreclaw_core`
// that this module depends on; none of them contacts a relay.
#[cfg(test)]
mod tests {
use super::*;
// Pins the core's near-duplicate cosine threshold at 0.85.
#[test]
fn test_store_dedup_threshold_matches_core() {
assert!(
(consolidation::STORE_DEDUP_COSINE_THRESHOLD - 0.85).abs() < 1e-10
);
}
// Pins the core's near-duplicate candidate fetch limit at 50.
#[test]
fn test_store_dedup_fetch_limit_matches_core() {
assert_eq!(consolidation::STORE_DEDUP_MAX_CANDIDATES, 50);
}
// With two candidates above the threshold, the more similar one is returned.
#[test]
fn test_find_best_near_duplicate_selects_highest() {
let new_emb: Vec<f32> = vec![1.0, 0.0, 0.0];
let existing = vec![
("id_a".to_string(), vec![0.9, 0.1, 0.0]), ("id_b".to_string(), vec![0.99, 0.01, 0.0]), ];
let result =
consolidation::find_best_near_duplicate(&new_emb, &existing, 0.5);
assert!(result.is_some());
let dup = result.unwrap();
assert_eq!(dup.fact_id, "id_b");
assert!(dup.similarity > 0.99);
}
// An orthogonal candidate is below the store threshold, so nothing matches.
#[test]
fn test_find_best_near_duplicate_none_below_threshold() {
let new_emb: Vec<f32> = vec![1.0, 0.0, 0.0];
let existing = vec![
("id_a".to_string(), vec![0.0, 1.0, 0.0]), ];
let result = consolidation::find_best_near_duplicate(
&new_emb,
&existing,
consolidation::STORE_DEDUP_COSINE_THRESHOLD,
);
assert!(result.is_none());
}
// Checks the arithmetic `importance / 10` clamped to [0, 1].
// NOTE(review): this exercises local arithmetic only; it does not call any
// function from this module or from the core.
#[test]
fn test_importance_normalization() {
let importance: f64 = 8.0;
let decay_score = (importance / 10.0).clamp(0.0, 1.0);
assert!((decay_score - 0.8).abs() < 1e-10);
assert!((0.0_f64 / 10.0).clamp(0.0, 1.0) == 0.0);
assert!((10.0_f64 / 10.0).clamp(0.0, 1.0) == 1.0);
assert!((15.0_f64 / 10.0).clamp(0.0, 1.0) == 1.0); }
// The core claim types can be constructed from here, and an `Active` claim is
// not reported as pinned.
#[test]
fn test_core_claim_types_accessible() {
use totalreclaw_core::claims::{
Claim, ClaimCategory, ClaimStatus, EntityRef, EntityType,
};
let claim = Claim {
text: "Pedro uses ZeroClaw".to_string(),
category: ClaimCategory::Fact,
confidence: 0.9,
importance: 8,
corroboration_count: 1,
source_agent: "zeroclaw".to_string(),
source_conversation: None,
extracted_at: Some("2026-04-16T12:00:00Z".to_string()),
entities: vec![EntityRef {
name: "Pedro".to_string(),
entity_type: EntityType::Person,
role: Some("user".to_string()),
}],
supersedes: None,
superseded_by: None,
valid_from: None,
status: ClaimStatus::Active,
};
assert_eq!(claim.category, ClaimCategory::Fact);
assert!(!totalreclaw_core::claims::is_pinned_claim(&claim));
}
// `is_pinned_claim` flips from false to true when the status becomes `Pinned`.
#[test]
fn test_pinned_claim_detection() {
use totalreclaw_core::claims::{Claim, ClaimCategory, ClaimStatus};
let mut claim = Claim {
text: "pinned fact".to_string(),
category: ClaimCategory::Fact,
confidence: 1.0,
importance: 10,
corroboration_count: 1,
source_agent: "totalreclaw_remember".to_string(),
source_conversation: None,
extracted_at: None,
entities: vec![],
supersedes: None,
superseded_by: None,
valid_from: None,
status: ClaimStatus::Active,
};
assert!(!totalreclaw_core::claims::is_pinned_claim(&claim));
claim.status = ClaimStatus::Pinned;
assert!(totalreclaw_core::claims::is_pinned_claim(&claim));
}
// Resolving a claim without entities against an empty candidate slice yields
// no actions.
#[test]
fn test_resolve_with_candidates_no_entities() {
use totalreclaw_core::claims::{Claim, ClaimCategory, ClaimStatus};
let claim = Claim {
text: "no entities here".to_string(),
category: ClaimCategory::Fact,
confidence: 0.9,
importance: 7,
corroboration_count: 1,
source_agent: "zeroclaw".to_string(),
source_conversation: None,
extracted_at: None,
entities: vec![], supersedes: None,
superseded_by: None,
valid_from: None,
status: ClaimStatus::Active,
};
let emb = vec![1.0_f32; 3];
let weights = contradiction::default_weights();
let actions = contradiction::resolve_with_candidates(
&claim,
"new_id",
&emb,
&[], &weights,
contradiction::DEFAULT_LOWER_THRESHOLD,
contradiction::DEFAULT_UPPER_THRESHOLD,
1_776_384_000,
totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE,
);
assert!(actions.is_empty());
}
// A decision-log entry survives a JSON serialize/deserialize round trip.
#[test]
fn test_decision_log_entry_round_trip() {
let entry = decision_log::DecisionLogEntry {
ts: 1_776_384_000,
entity_id: "ent123".to_string(),
new_claim_id: "0xnew".to_string(),
existing_claim_id: "0xold".to_string(),
similarity: 0.72,
action: "supersede_existing".to_string(),
reason: Some("new_wins".to_string()),
winner_score: Some(0.73),
loser_score: Some(0.40),
winner_components: None,
loser_components: None,
loser_claim_json: None,
mode: "active".to_string(),
};
let json = serde_json::to_string(&entry).unwrap();
let back: decision_log::DecisionLogEntry = serde_json::from_str(&json).unwrap();
assert_eq!(entry, back);
}
// Pins the core's contradiction candidate cap at 20.
#[test]
fn test_contradiction_candidate_cap() {
assert_eq!(decision_log::CONTRADICTION_CANDIDATE_CAP, 20);
}
// The four default resolution weights sum to 1.0.
#[test]
fn test_default_weights() {
let w = contradiction::default_weights();
let sum = w.confidence + w.corroboration + w.recency + w.validation;
assert!((sum - 1.0).abs() < 1e-10, "weights should sum to 1.0");
}
// Pins the core's tie-zone score tolerance at 0.01.
#[test]
fn test_tie_zone_tolerance() {
assert!(
(totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE - 0.01).abs() < 1e-10,
"tie zone tolerance should be 0.01"
);
}
}