1use base64::Engine;
18
19use totalreclaw_core::blind;
20use totalreclaw_core::claims::{
21 Claim, MemoryClaimV1, MemoryScope, MemorySource, MemoryTypeV1, MemoryVolatility,
22 ResolutionAction, MEMORY_CLAIM_V1_SCHEMA_VERSION,
23};
24use totalreclaw_core::consolidation;
25use totalreclaw_core::contradiction;
26use totalreclaw_core::crypto;
27use totalreclaw_core::decision_log;
28use totalreclaw_core::fingerprint;
29use totalreclaw_core::lsh::LshHasher;
30use totalreclaw_core::store as core_store;
31
32use crate::embedding::EmbeddingProvider;
33use crate::relay::RelayClient;
34use crate::search;
35use crate::Result;
36
37pub async fn store_fact(
47 content: &str,
48 source: &str,
49 keys: &crypto::DerivedKeys,
50 lsh_hasher: &LshHasher,
51 embedding_provider: &dyn EmbeddingProvider,
52 relay: &RelayClient,
53 private_key: Option<&[u8; 32]>,
54) -> Result<String> {
55 let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
57
58 if let Ok(existing) =
61 search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
62 {
63 if let Some(dup) = existing {
64 let _ = store_tombstone(&dup.id, relay, private_key).await;
66 }
67 }
68
69 let embedding = embedding_provider.embed(content).await?;
71
72 if let Some(dup) = find_best_duplicate(content, &embedding, keys, relay).await {
74 let _ = store_tombstone(&dup.fact_id, relay, private_key).await;
76 }
77
78 let prepared = core_store::prepare_fact_with_decay_score(
80 content,
81 &keys.encryption_key,
82 &keys.dedup_key,
83 lsh_hasher,
84 &embedding,
85 1.0, source,
87 relay.wallet_address(),
88 "zeroclaw",
89 )
90 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
91
92 if let Some(pk) = private_key {
94 relay
95 .submit_fact_native(&prepared.protobuf_bytes, pk)
96 .await?;
97 } else {
98 relay.submit_protobuf(&prepared.protobuf_bytes).await?;
99 }
100
101 Ok(prepared.fact_id)
102}
103
104pub async fn store_fact_with_importance(
109 content: &str,
110 source: &str,
111 importance: f64,
112 keys: &crypto::DerivedKeys,
113 lsh_hasher: &LshHasher,
114 embedding_provider: &dyn EmbeddingProvider,
115 relay: &RelayClient,
116 private_key: Option<&[u8; 32]>,
117) -> Result<String> {
118 let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
120
121 if let Ok(existing) =
123 search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
124 {
125 if let Some(dup) = existing {
126 let _ = store_tombstone(&dup.id, relay, private_key).await;
127 }
128 }
129
130 let embedding = embedding_provider.embed(content).await?;
132
133 if let Some(dup) = find_best_duplicate(content, &embedding, keys, relay).await {
135 let _ = store_tombstone(&dup.fact_id, relay, private_key).await;
136 }
137
138 let prepared = core_store::prepare_fact(
140 content,
141 &keys.encryption_key,
142 &keys.dedup_key,
143 lsh_hasher,
144 &embedding,
145 importance,
146 source,
147 relay.wallet_address(),
148 "zeroclaw",
149 )
150 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
151
152 if let Some(pk) = private_key {
154 relay
155 .submit_fact_native(&prepared.protobuf_bytes, pk)
156 .await?;
157 } else {
158 relay.submit_protobuf(&prepared.protobuf_bytes).await?;
159 }
160
161 Ok(prepared.fact_id)
162}
163
164pub async fn store_fact_batch(
169 facts: &[(&str, &str)], keys: &crypto::DerivedKeys,
171 lsh_hasher: &LshHasher,
172 embedding_provider: &dyn EmbeddingProvider,
173 relay: &RelayClient,
174 private_key: &[u8; 32],
175) -> Result<Vec<String>> {
176 let mut prepared_facts = Vec::with_capacity(facts.len());
177
178 for (content, source) in facts {
179 let embedding = embedding_provider.embed(content).await?;
181
182 let prepared = core_store::prepare_fact_with_decay_score(
184 content,
185 &keys.encryption_key,
186 &keys.dedup_key,
187 lsh_hasher,
188 &embedding,
189 1.0,
190 source,
191 relay.wallet_address(),
192 "zeroclaw",
193 )
194 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
195
196 prepared_facts.push(prepared);
197 }
198
199 let protobuf_payloads: Vec<Vec<u8>> = prepared_facts
201 .iter()
202 .map(|p| p.protobuf_bytes.clone())
203 .collect();
204 let fact_ids: Vec<String> = prepared_facts.iter().map(|p| p.fact_id.clone()).collect();
205
206 relay
208 .submit_fact_batch_native(&protobuf_payloads, private_key)
209 .await?;
210
211 Ok(fact_ids)
212}
213
214pub async fn store_tombstone(
219 fact_id: &str,
220 relay: &RelayClient,
221 private_key: Option<&[u8; 32]>,
222) -> Result<()> {
223 let protobuf = core_store::prepare_tombstone(fact_id, relay.wallet_address());
224
225 if let Some(pk) = private_key {
226 relay.submit_fact_native(&protobuf, pk).await?;
227 } else {
228 relay.submit_protobuf(&protobuf).await?;
229 }
230 Ok(())
231}
232
233pub async fn store_tombstone_v1(
239 fact_id: &str,
240 relay: &RelayClient,
241 private_key: Option<&[u8; 32]>,
242) -> Result<()> {
243 let protobuf = core_store::prepare_tombstone_v1(fact_id, relay.wallet_address());
244
245 if let Some(pk) = private_key {
246 relay.submit_fact_native(&protobuf, pk).await?;
247 } else {
248 relay.submit_protobuf(&protobuf).await?;
249 }
250 Ok(())
251}
252
253#[derive(Debug, Clone)]
264pub struct V1StoreInput {
265 pub text: String,
267 pub memory_type: MemoryTypeV1,
270 pub source: MemorySource,
272 pub importance: u8,
274 pub scope: MemoryScope,
276 pub volatility: MemoryVolatility,
278 pub reasoning: Option<String>,
280}
281
282impl V1StoreInput {
283 pub fn new_claim(text: impl Into<String>, importance: u8) -> Self {
285 Self {
286 text: text.into(),
287 memory_type: MemoryTypeV1::Claim,
288 source: MemorySource::UserInferred,
289 importance,
290 scope: MemoryScope::Unspecified,
291 volatility: MemoryVolatility::Updatable,
292 reasoning: None,
293 }
294 }
295}
296
297pub fn build_memory_claim_v1(input: &V1StoreInput) -> MemoryClaimV1 {
303 MemoryClaimV1 {
304 id: uuid::Uuid::now_v7().to_string(),
305 text: input.text.clone(),
306 memory_type: input.memory_type,
307 source: input.source,
308 created_at: chrono::Utc::now()
309 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
310 schema_version: MEMORY_CLAIM_V1_SCHEMA_VERSION.to_string(),
311 scope: input.scope,
312 volatility: input.volatility,
313 entities: Vec::new(),
314 reasoning: input.reasoning.clone(),
315 expires_at: None,
316 importance: Some(input.importance),
317 confidence: None,
318 superseded_by: None,
319 }
320}
321
322pub async fn store_fact_v1(
336 input: &V1StoreInput,
337 keys: &crypto::DerivedKeys,
338 lsh_hasher: &LshHasher,
339 embedding_provider: &dyn EmbeddingProvider,
340 relay: &RelayClient,
341 private_key: Option<&[u8; 32]>,
342) -> Result<String> {
343 let claim = build_memory_claim_v1(input);
345
346 let envelope_json = serde_json::to_string(&claim)
348 .map_err(|e| crate::Error::Crypto(format!("v1 envelope serialize: {e}")))?;
349
350 let content_fp = fingerprint::generate_content_fingerprint(&claim.text, &keys.dedup_key);
352 if let Ok(Some(dup)) =
353 search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
354 {
355 let _ = store_tombstone_v1(&dup.id, relay, private_key).await;
357 }
358
359 let embedding = embedding_provider.embed(&claim.text).await?;
361
362 if let Some(dup) = find_best_duplicate(&claim.text, &embedding, keys, relay).await {
364 let _ = store_tombstone_v1(&dup.fact_id, relay, private_key).await;
365 }
366
367 let source_tag = format!("zeroclaw_v1_{}", v1_source_to_str(input.source));
371 let prepared = core_store::prepare_fact_v1(
372 &envelope_json,
373 &claim.text,
374 &keys.encryption_key,
375 &keys.dedup_key,
376 lsh_hasher,
377 &embedding,
378 input.importance as f64,
379 &source_tag,
380 relay.wallet_address(),
381 "zeroclaw",
382 )
383 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
384
385 if let Some(pk) = private_key {
387 relay.submit_fact_native(&prepared.protobuf_bytes, pk).await?;
388 } else {
389 relay.submit_protobuf(&prepared.protobuf_bytes).await?;
390 }
391
392 Ok(prepared.fact_id)
393}
394
395fn v1_source_to_str(src: MemorySource) -> &'static str {
397 match src {
398 MemorySource::User => "user",
399 MemorySource::UserInferred => "user-inferred",
400 MemorySource::Assistant => "assistant",
401 MemorySource::External => "external",
402 MemorySource::Derived => "derived",
403 }
404}
405
406async fn find_best_duplicate(
420 content: &str,
421 new_embedding: &[f32],
422 keys: &crypto::DerivedKeys,
423 relay: &RelayClient,
424) -> Option<consolidation::DupMatch> {
425 let trapdoors = blind::generate_blind_indices(content);
427 if trapdoors.is_empty() {
428 return None;
429 }
430
431 let candidates = search::search_candidates(
433 relay,
434 relay.wallet_address(),
435 &trapdoors,
436 consolidation::STORE_DEDUP_MAX_CANDIDATES,
437 )
438 .await
439 .ok()?;
440
441 let mut existing: Vec<(String, Vec<f32>)> = Vec::with_capacity(candidates.len());
443 for fact in &candidates {
444 let enc_emb = match &fact.encrypted_embedding {
445 Some(e) => e,
446 None => continue,
447 };
448 let b64 = match crypto::decrypt(enc_emb, &keys.encryption_key) {
449 Ok(b) => b,
450 Err(_) => continue,
451 };
452 let bytes = match base64::engine::general_purpose::STANDARD.decode(&b64) {
453 Ok(b) => b,
454 Err(_) => continue,
455 };
456 let emb: Vec<f32> = bytes
457 .chunks_exact(4)
458 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
459 .collect();
460 existing.push((fact.id.clone(), emb));
461 }
462
463 consolidation::find_best_near_duplicate(
464 new_embedding,
465 &existing,
466 consolidation::STORE_DEDUP_COSINE_THRESHOLD,
467 )
468}
469
470#[derive(Debug)]
476pub struct ContradictionStoreResult {
477 pub fact_id: String,
479 pub actions: Vec<ResolutionAction>,
481 pub decision_log_entries: Vec<decision_log::DecisionLogEntry>,
483}
484
485pub async fn store_claim_with_contradiction_check(
499 claim: &Claim,
500 claim_id: &str,
501 source: &str,
502 importance: f64,
503 keys: &crypto::DerivedKeys,
504 lsh_hasher: &LshHasher,
505 embedding_provider: &dyn EmbeddingProvider,
506 relay: &RelayClient,
507 private_key: Option<&[u8; 32]>,
508 weights: &contradiction::ResolutionWeights,
509 now_unix_seconds: i64,
510) -> Result<ContradictionStoreResult> {
511 let content = &claim.text;
512
513 let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
515 if let Ok(Some(dup)) =
516 search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
517 {
518 let _ = store_tombstone(&dup.id, relay, private_key).await;
519 }
520
521 let embedding = embedding_provider.embed(content).await?;
523
524 let candidates = fetch_contradiction_candidates(
526 claim,
527 &embedding,
528 keys,
529 relay,
530 )
531 .await;
532
533 let actions = contradiction::resolve_with_candidates(
535 claim,
536 claim_id,
537 &embedding,
538 &candidates,
539 weights,
540 contradiction::DEFAULT_LOWER_THRESHOLD,
541 contradiction::DEFAULT_UPPER_THRESHOLD,
542 now_unix_seconds,
543 totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE,
544 );
545
546 let existing_claims_json: std::collections::HashMap<String, String> = candidates
548 .iter()
549 .filter_map(|(c, id, _)| {
550 serde_json::to_string(c).ok().map(|json| (id.clone(), json))
551 })
552 .collect();
553 let new_claim_json = serde_json::to_string(claim).unwrap_or_default();
554 let decision_log_entries = contradiction::build_decision_log_entries(
555 &actions,
556 &new_claim_json,
557 &existing_claims_json,
558 "active",
559 now_unix_seconds,
560 );
561
562 let mut should_store = true;
564 for action in &actions {
565 match action {
566 ResolutionAction::SupersedeExisting { existing_id, .. } => {
567 let _ = store_tombstone(existing_id, relay, private_key).await;
569 }
570 ResolutionAction::SkipNew { .. } => {
571 should_store = false;
573 break;
574 }
575 ResolutionAction::TieLeaveBoth { .. } | ResolutionAction::NoContradiction => {
576 }
578 }
579 }
580
581 if !should_store {
582 return Ok(ContradictionStoreResult {
583 fact_id: claim_id.to_string(),
584 actions,
585 decision_log_entries,
586 });
587 }
588
589 let prepared = core_store::prepare_fact(
591 content,
592 &keys.encryption_key,
593 &keys.dedup_key,
594 lsh_hasher,
595 &embedding,
596 importance,
597 source,
598 relay.wallet_address(),
599 "zeroclaw",
600 )
601 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
602
603 if let Some(pk) = private_key {
604 relay
605 .submit_fact_native(&prepared.protobuf_bytes, pk)
606 .await?;
607 } else {
608 relay.submit_protobuf(&prepared.protobuf_bytes).await?;
609 }
610
611 Ok(ContradictionStoreResult {
612 fact_id: prepared.fact_id,
613 actions,
614 decision_log_entries,
615 })
616}
617
618async fn fetch_contradiction_candidates(
624 new_claim: &Claim,
625 _new_embedding: &[f32],
626 keys: &crypto::DerivedKeys,
627 relay: &RelayClient,
628) -> Vec<(Claim, String, Vec<f32>)> {
629 if new_claim.entities.is_empty() {
630 return Vec::new();
631 }
632
633 let mut trapdoors = Vec::new();
635 for entity in &new_claim.entities {
636 trapdoors.extend(blind::generate_blind_indices(&entity.name));
637 }
638 if trapdoors.is_empty() {
639 return Vec::new();
640 }
641
642 let facts = match search::search_candidates(
644 relay,
645 relay.wallet_address(),
646 &trapdoors,
647 decision_log::CONTRADICTION_CANDIDATE_CAP,
648 )
649 .await
650 {
651 Ok(f) => f,
652 Err(_) => return Vec::new(),
653 };
654
655 let mut candidates = Vec::new();
657 for fact in &facts {
658 let blob_b64 = match search::hex_blob_to_base64(&fact.encrypted_blob) {
660 Some(b) => b,
661 None => continue,
662 };
663 let decrypted = match crypto::decrypt(&blob_b64, &keys.encryption_key) {
664 Ok(t) => t,
665 Err(_) => continue,
666 };
667
668 let claim: Claim = if let Ok(c) = serde_json::from_str(&decrypted) {
671 c
672 } else if let Ok(obj) = serde_json::from_str::<serde_json::Value>(&decrypted) {
673 let text = obj.get("t").and_then(|v| v.as_str()).unwrap_or(&decrypted);
674 match serde_json::from_str(text) {
675 Ok(c) => c,
676 Err(_) => continue, }
678 } else {
679 continue;
680 };
681
682 let emb = fact
684 .encrypted_embedding
685 .as_deref()
686 .and_then(|e| crypto::decrypt(e, &keys.encryption_key).ok())
687 .and_then(|b64| {
688 base64::engine::general_purpose::STANDARD
689 .decode(&b64)
690 .ok()
691 })
692 .map(|bytes| {
693 bytes
694 .chunks_exact(4)
695 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
696 .collect::<Vec<f32>>()
697 })
698 .unwrap_or_default();
699
700 candidates.push((claim, fact.id.clone(), emb));
701 }
702
703 candidates
704}
705
706#[cfg(test)]
707mod tests {
708 use super::*;
709
710 #[test]
711 fn test_store_dedup_threshold_matches_core() {
712 assert!(
714 (consolidation::STORE_DEDUP_COSINE_THRESHOLD - 0.85).abs() < 1e-10
715 );
716 }
717
718 #[test]
719 fn test_store_dedup_fetch_limit_matches_core() {
720 assert_eq!(consolidation::STORE_DEDUP_MAX_CANDIDATES, 50);
722 }
723
724 #[test]
725 fn test_find_best_near_duplicate_selects_highest() {
726 let new_emb: Vec<f32> = vec![1.0, 0.0, 0.0];
729 let existing = vec![
730 ("id_a".to_string(), vec![0.9, 0.1, 0.0]), ("id_b".to_string(), vec![0.99, 0.01, 0.0]), ];
733
734 let result =
735 consolidation::find_best_near_duplicate(&new_emb, &existing, 0.5);
736 assert!(result.is_some());
737 let dup = result.unwrap();
738 assert_eq!(dup.fact_id, "id_b");
739 assert!(dup.similarity > 0.99);
740 }
741
742 #[test]
743 fn test_find_best_near_duplicate_none_below_threshold() {
744 let new_emb: Vec<f32> = vec![1.0, 0.0, 0.0];
745 let existing = vec![
746 ("id_a".to_string(), vec![0.0, 1.0, 0.0]), ];
748
749 let result = consolidation::find_best_near_duplicate(
750 &new_emb,
751 &existing,
752 consolidation::STORE_DEDUP_COSINE_THRESHOLD,
753 );
754 assert!(result.is_none());
755 }
756
757 #[test]
758 fn test_importance_normalization() {
759 let importance: f64 = 8.0;
762 let decay_score = (importance / 10.0).clamp(0.0, 1.0);
763 assert!((decay_score - 0.8).abs() < 1e-10);
764
765 assert!((0.0_f64 / 10.0).clamp(0.0, 1.0) == 0.0);
767 assert!((10.0_f64 / 10.0).clamp(0.0, 1.0) == 1.0);
768 assert!((15.0_f64 / 10.0).clamp(0.0, 1.0) == 1.0); }
770
771 #[test]
776 fn test_core_claim_types_accessible() {
777 use totalreclaw_core::claims::{
778 Claim, ClaimCategory, ClaimStatus, EntityRef, EntityType,
779 };
780
781 let claim = Claim {
782 text: "Pedro uses ZeroClaw".to_string(),
783 category: ClaimCategory::Fact,
784 confidence: 0.9,
785 importance: 8,
786 corroboration_count: 1,
787 source_agent: "zeroclaw".to_string(),
788 source_conversation: None,
789 extracted_at: Some("2026-04-16T12:00:00Z".to_string()),
790 entities: vec![EntityRef {
791 name: "Pedro".to_string(),
792 entity_type: EntityType::Person,
793 role: Some("user".to_string()),
794 }],
795 supersedes: None,
796 superseded_by: None,
797 valid_from: None,
798 status: ClaimStatus::Active,
799 };
800 assert_eq!(claim.category, ClaimCategory::Fact);
801 assert!(!totalreclaw_core::claims::is_pinned_claim(&claim));
802 }
803
804 #[test]
805 fn test_pinned_claim_detection() {
806 use totalreclaw_core::claims::{Claim, ClaimCategory, ClaimStatus};
807
808 let mut claim = Claim {
809 text: "pinned fact".to_string(),
810 category: ClaimCategory::Fact,
811 confidence: 1.0,
812 importance: 10,
813 corroboration_count: 1,
814 source_agent: "totalreclaw_remember".to_string(),
815 source_conversation: None,
816 extracted_at: None,
817 entities: vec![],
818 supersedes: None,
819 superseded_by: None,
820 valid_from: None,
821 status: ClaimStatus::Active,
822 };
823 assert!(!totalreclaw_core::claims::is_pinned_claim(&claim));
824
825 claim.status = ClaimStatus::Pinned;
826 assert!(totalreclaw_core::claims::is_pinned_claim(&claim));
827 }
828
829 #[test]
830 fn test_resolve_with_candidates_no_entities() {
831 use totalreclaw_core::claims::{Claim, ClaimCategory, ClaimStatus};
832
833 let claim = Claim {
834 text: "no entities here".to_string(),
835 category: ClaimCategory::Fact,
836 confidence: 0.9,
837 importance: 7,
838 corroboration_count: 1,
839 source_agent: "zeroclaw".to_string(),
840 source_conversation: None,
841 extracted_at: None,
842 entities: vec![], supersedes: None,
844 superseded_by: None,
845 valid_from: None,
846 status: ClaimStatus::Active,
847 };
848
849 let emb = vec![1.0_f32; 3];
850 let weights = contradiction::default_weights();
851 let actions = contradiction::resolve_with_candidates(
852 &claim,
853 "new_id",
854 &emb,
855 &[], &weights,
857 contradiction::DEFAULT_LOWER_THRESHOLD,
858 contradiction::DEFAULT_UPPER_THRESHOLD,
859 1_776_384_000,
860 totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE,
861 );
862 assert!(actions.is_empty());
863 }
864
865 #[test]
866 fn test_decision_log_entry_round_trip() {
867 let entry = decision_log::DecisionLogEntry {
868 ts: 1_776_384_000,
869 entity_id: "ent123".to_string(),
870 new_claim_id: "0xnew".to_string(),
871 existing_claim_id: "0xold".to_string(),
872 similarity: 0.72,
873 action: "supersede_existing".to_string(),
874 reason: Some("new_wins".to_string()),
875 winner_score: Some(0.73),
876 loser_score: Some(0.40),
877 winner_components: None,
878 loser_components: None,
879 loser_claim_json: None,
880 mode: "active".to_string(),
881 };
882 let json = serde_json::to_string(&entry).unwrap();
883 let back: decision_log::DecisionLogEntry = serde_json::from_str(&json).unwrap();
884 assert_eq!(entry, back);
885 }
886
887 #[test]
888 fn test_contradiction_candidate_cap() {
889 assert_eq!(decision_log::CONTRADICTION_CANDIDATE_CAP, 20);
890 }
891
892 #[test]
893 fn test_default_weights() {
894 let w = contradiction::default_weights();
895 let sum = w.confidence + w.corroboration + w.recency + w.validation;
896 assert!((sum - 1.0).abs() < 1e-10, "weights should sum to 1.0");
897 }
898
899 #[test]
900 fn test_tie_zone_tolerance() {
901 assert!(
902 (totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE - 0.01).abs() < 1e-10,
903 "tie zone tolerance should be 0.01"
904 );
905 }
906}