1use base64::Engine;
18
19use totalreclaw_core::blind;
20use totalreclaw_core::claims::{
21 Claim, MemoryClaimV1, MemoryScope, MemorySource, MemoryTypeV1, MemoryVolatility,
22 ResolutionAction, MEMORY_CLAIM_V1_SCHEMA_VERSION,
23};
24use totalreclaw_core::consolidation;
25use totalreclaw_core::contradiction;
26use totalreclaw_core::crypto;
27use totalreclaw_core::decision_log;
28use totalreclaw_core::fingerprint;
29use totalreclaw_core::lsh::LshHasher;
30use totalreclaw_core::store as core_store;
31
32use crate::embedding::EmbeddingProvider;
33use crate::relay::RelayClient;
34use crate::search;
35use crate::Result;
36
37pub async fn store_fact(
47 content: &str,
48 source: &str,
49 keys: &crypto::DerivedKeys,
50 lsh_hasher: &LshHasher,
51 embedding_provider: &dyn EmbeddingProvider,
52 relay: &RelayClient,
53 private_key: Option<&[u8; 32]>,
54) -> Result<String> {
55 let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
57
58 if let Ok(existing) =
61 search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
62 {
63 if let Some(dup) = existing {
64 let _ = store_tombstone(&dup.id, relay, private_key).await;
66 }
67 }
68
69 let embedding = embedding_provider.embed(content).await?;
71
72 if let Some(dup) = find_best_duplicate(content, &embedding, keys, relay).await {
74 let _ = store_tombstone(&dup.fact_id, relay, private_key).await;
76 }
77
78 let prepared = core_store::prepare_fact_with_decay_score(
80 content,
81 &keys.encryption_key,
82 &keys.dedup_key,
83 lsh_hasher,
84 &embedding,
85 1.0, source,
87 relay.wallet_address(),
88 "zeroclaw",
89 )
90 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
91
92 if let Some(pk) = private_key {
94 relay
95 .submit_fact_native(&prepared.protobuf_bytes, pk)
96 .await?;
97 } else {
98 relay.submit_protobuf(&prepared.protobuf_bytes).await?;
99 }
100
101 Ok(prepared.fact_id)
102}
103
104pub async fn store_fact_with_importance(
109 content: &str,
110 source: &str,
111 importance: f64,
112 keys: &crypto::DerivedKeys,
113 lsh_hasher: &LshHasher,
114 embedding_provider: &dyn EmbeddingProvider,
115 relay: &RelayClient,
116 private_key: Option<&[u8; 32]>,
117) -> Result<String> {
118 let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
120
121 if let Ok(existing) =
123 search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
124 {
125 if let Some(dup) = existing {
126 let _ = store_tombstone(&dup.id, relay, private_key).await;
127 }
128 }
129
130 let embedding = embedding_provider.embed(content).await?;
132
133 if let Some(dup) = find_best_duplicate(content, &embedding, keys, relay).await {
135 let _ = store_tombstone(&dup.fact_id, relay, private_key).await;
136 }
137
138 let prepared = core_store::prepare_fact(
140 content,
141 &keys.encryption_key,
142 &keys.dedup_key,
143 lsh_hasher,
144 &embedding,
145 importance,
146 source,
147 relay.wallet_address(),
148 "zeroclaw",
149 )
150 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
151
152 if let Some(pk) = private_key {
154 relay
155 .submit_fact_native(&prepared.protobuf_bytes, pk)
156 .await?;
157 } else {
158 relay.submit_protobuf(&prepared.protobuf_bytes).await?;
159 }
160
161 Ok(prepared.fact_id)
162}
163
164pub async fn store_fact_batch(
169 facts: &[(&str, &str)], keys: &crypto::DerivedKeys,
171 lsh_hasher: &LshHasher,
172 embedding_provider: &dyn EmbeddingProvider,
173 relay: &RelayClient,
174 private_key: &[u8; 32],
175) -> Result<Vec<String>> {
176 let mut prepared_facts = Vec::with_capacity(facts.len());
177
178 for (content, source) in facts {
179 let embedding = embedding_provider.embed(content).await?;
181
182 let prepared = core_store::prepare_fact_with_decay_score(
184 content,
185 &keys.encryption_key,
186 &keys.dedup_key,
187 lsh_hasher,
188 &embedding,
189 1.0,
190 source,
191 relay.wallet_address(),
192 "zeroclaw",
193 )
194 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
195
196 prepared_facts.push(prepared);
197 }
198
199 let protobuf_payloads: Vec<Vec<u8>> = prepared_facts
201 .iter()
202 .map(|p| p.protobuf_bytes.clone())
203 .collect();
204 let fact_ids: Vec<String> = prepared_facts.iter().map(|p| p.fact_id.clone()).collect();
205
206 relay
208 .submit_fact_batch_native(&protobuf_payloads, private_key)
209 .await?;
210
211 Ok(fact_ids)
212}
213
214pub async fn store_tombstone(
219 fact_id: &str,
220 relay: &RelayClient,
221 private_key: Option<&[u8; 32]>,
222) -> Result<()> {
223 let protobuf = core_store::prepare_tombstone(fact_id, relay.wallet_address());
224
225 if let Some(pk) = private_key {
226 relay.submit_fact_native(&protobuf, pk).await?;
227 } else {
228 relay.submit_protobuf(&protobuf).await?;
229 }
230 Ok(())
231}
232
233pub async fn store_tombstone_v1(
239 fact_id: &str,
240 relay: &RelayClient,
241 private_key: Option<&[u8; 32]>,
242) -> Result<()> {
243 let protobuf = core_store::prepare_tombstone_v1(fact_id, relay.wallet_address());
244
245 if let Some(pk) = private_key {
246 relay.submit_fact_native(&protobuf, pk).await?;
247 } else {
248 relay.submit_protobuf(&protobuf).await?;
249 }
250 Ok(())
251}
252
253#[derive(Debug, Clone)]
264pub struct V1StoreInput {
265 pub text: String,
267 pub memory_type: MemoryTypeV1,
270 pub source: MemorySource,
272 pub importance: u8,
274 pub scope: MemoryScope,
276 pub volatility: MemoryVolatility,
278 pub reasoning: Option<String>,
280}
281
282impl V1StoreInput {
283 pub fn new_claim(text: impl Into<String>, importance: u8) -> Self {
285 Self {
286 text: text.into(),
287 memory_type: MemoryTypeV1::Claim,
288 source: MemorySource::UserInferred,
289 importance,
290 scope: MemoryScope::Unspecified,
291 volatility: MemoryVolatility::Updatable,
292 reasoning: None,
293 }
294 }
295}
296
297pub fn build_memory_claim_v1(input: &V1StoreInput) -> MemoryClaimV1 {
303 MemoryClaimV1 {
304 id: uuid::Uuid::now_v7().to_string(),
305 text: input.text.clone(),
306 memory_type: input.memory_type,
307 source: input.source,
308 created_at: chrono::Utc::now()
309 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
310 schema_version: MEMORY_CLAIM_V1_SCHEMA_VERSION.to_string(),
311 scope: input.scope,
312 volatility: input.volatility,
313 entities: Vec::new(),
314 reasoning: input.reasoning.clone(),
315 expires_at: None,
316 importance: Some(input.importance),
317 confidence: None,
318 superseded_by: None,
319 pin_status: None,
322 }
323}
324
325pub async fn store_fact_v1(
339 input: &V1StoreInput,
340 keys: &crypto::DerivedKeys,
341 lsh_hasher: &LshHasher,
342 embedding_provider: &dyn EmbeddingProvider,
343 relay: &RelayClient,
344 private_key: Option<&[u8; 32]>,
345) -> Result<String> {
346 let claim = build_memory_claim_v1(input);
348
349 let envelope_json = serde_json::to_string(&claim)
351 .map_err(|e| crate::Error::Crypto(format!("v1 envelope serialize: {e}")))?;
352
353 let content_fp = fingerprint::generate_content_fingerprint(&claim.text, &keys.dedup_key);
355 if let Ok(Some(dup)) =
356 search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
357 {
358 let _ = store_tombstone_v1(&dup.id, relay, private_key).await;
360 }
361
362 let embedding = embedding_provider.embed(&claim.text).await?;
364
365 if let Some(dup) = find_best_duplicate(&claim.text, &embedding, keys, relay).await {
367 let _ = store_tombstone_v1(&dup.fact_id, relay, private_key).await;
368 }
369
370 let source_tag = format!("zeroclaw_v1_{}", v1_source_to_str(input.source));
374 let prepared = core_store::prepare_fact_v1(
375 &envelope_json,
376 &claim.text,
377 &keys.encryption_key,
378 &keys.dedup_key,
379 lsh_hasher,
380 &embedding,
381 input.importance as f64,
382 &source_tag,
383 relay.wallet_address(),
384 "zeroclaw",
385 )
386 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
387
388 if let Some(pk) = private_key {
390 relay.submit_fact_native(&prepared.protobuf_bytes, pk).await?;
391 } else {
392 relay.submit_protobuf(&prepared.protobuf_bytes).await?;
393 }
394
395 Ok(prepared.fact_id)
396}
397
398fn v1_source_to_str(src: MemorySource) -> &'static str {
400 match src {
401 MemorySource::User => "user",
402 MemorySource::UserInferred => "user-inferred",
403 MemorySource::Assistant => "assistant",
404 MemorySource::External => "external",
405 MemorySource::Derived => "derived",
406 }
407}
408
409async fn find_best_duplicate(
423 content: &str,
424 new_embedding: &[f32],
425 keys: &crypto::DerivedKeys,
426 relay: &RelayClient,
427) -> Option<consolidation::DupMatch> {
428 let trapdoors = blind::generate_blind_indices(content);
430 if trapdoors.is_empty() {
431 return None;
432 }
433
434 let candidates = search::search_candidates(
436 relay,
437 relay.wallet_address(),
438 &trapdoors,
439 consolidation::STORE_DEDUP_MAX_CANDIDATES,
440 )
441 .await
442 .ok()?;
443
444 let mut existing: Vec<(String, Vec<f32>)> = Vec::with_capacity(candidates.len());
446 for fact in &candidates {
447 let enc_emb = match &fact.encrypted_embedding {
448 Some(e) => e,
449 None => continue,
450 };
451 let b64 = match crypto::decrypt(enc_emb, &keys.encryption_key) {
452 Ok(b) => b,
453 Err(_) => continue,
454 };
455 let bytes = match base64::engine::general_purpose::STANDARD.decode(&b64) {
456 Ok(b) => b,
457 Err(_) => continue,
458 };
459 let emb: Vec<f32> = bytes
460 .chunks_exact(4)
461 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
462 .collect();
463 existing.push((fact.id.clone(), emb));
464 }
465
466 consolidation::find_best_near_duplicate(
467 new_embedding,
468 &existing,
469 consolidation::STORE_DEDUP_COSINE_THRESHOLD,
470 )
471}
472
473#[derive(Debug)]
479pub struct ContradictionStoreResult {
480 pub fact_id: String,
482 pub actions: Vec<ResolutionAction>,
484 pub decision_log_entries: Vec<decision_log::DecisionLogEntry>,
486}
487
488pub async fn store_claim_with_contradiction_check(
502 claim: &Claim,
503 claim_id: &str,
504 source: &str,
505 importance: f64,
506 keys: &crypto::DerivedKeys,
507 lsh_hasher: &LshHasher,
508 embedding_provider: &dyn EmbeddingProvider,
509 relay: &RelayClient,
510 private_key: Option<&[u8; 32]>,
511 weights: &contradiction::ResolutionWeights,
512 now_unix_seconds: i64,
513) -> Result<ContradictionStoreResult> {
514 let content = &claim.text;
515
516 let content_fp = fingerprint::generate_content_fingerprint(content, &keys.dedup_key);
518 if let Ok(Some(dup)) =
519 search::search_by_fingerprint(relay, relay.wallet_address(), &content_fp).await
520 {
521 let _ = store_tombstone(&dup.id, relay, private_key).await;
522 }
523
524 let embedding = embedding_provider.embed(content).await?;
526
527 let candidates = fetch_contradiction_candidates(
529 claim,
530 &embedding,
531 keys,
532 relay,
533 )
534 .await;
535
536 let actions = contradiction::resolve_with_candidates(
538 claim,
539 claim_id,
540 &embedding,
541 &candidates,
542 weights,
543 contradiction::DEFAULT_LOWER_THRESHOLD,
544 contradiction::DEFAULT_UPPER_THRESHOLD,
545 now_unix_seconds,
546 totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE,
547 );
548
549 let existing_claims_json: std::collections::HashMap<String, String> = candidates
551 .iter()
552 .filter_map(|(c, id, _)| {
553 serde_json::to_string(c).ok().map(|json| (id.clone(), json))
554 })
555 .collect();
556 let new_claim_json = serde_json::to_string(claim).unwrap_or_default();
557 let decision_log_entries = contradiction::build_decision_log_entries(
558 &actions,
559 &new_claim_json,
560 &existing_claims_json,
561 "active",
562 now_unix_seconds,
563 );
564
565 let mut should_store = true;
567 for action in &actions {
568 match action {
569 ResolutionAction::SupersedeExisting { existing_id, .. } => {
570 let _ = store_tombstone(existing_id, relay, private_key).await;
572 }
573 ResolutionAction::SkipNew { .. } => {
574 should_store = false;
576 break;
577 }
578 ResolutionAction::TieLeaveBoth { .. } | ResolutionAction::NoContradiction => {
579 }
581 }
582 }
583
584 if !should_store {
585 return Ok(ContradictionStoreResult {
586 fact_id: claim_id.to_string(),
587 actions,
588 decision_log_entries,
589 });
590 }
591
592 let prepared = core_store::prepare_fact(
594 content,
595 &keys.encryption_key,
596 &keys.dedup_key,
597 lsh_hasher,
598 &embedding,
599 importance,
600 source,
601 relay.wallet_address(),
602 "zeroclaw",
603 )
604 .map_err(|e| crate::Error::Crypto(e.to_string()))?;
605
606 if let Some(pk) = private_key {
607 relay
608 .submit_fact_native(&prepared.protobuf_bytes, pk)
609 .await?;
610 } else {
611 relay.submit_protobuf(&prepared.protobuf_bytes).await?;
612 }
613
614 Ok(ContradictionStoreResult {
615 fact_id: prepared.fact_id,
616 actions,
617 decision_log_entries,
618 })
619}
620
621async fn fetch_contradiction_candidates(
627 new_claim: &Claim,
628 _new_embedding: &[f32],
629 keys: &crypto::DerivedKeys,
630 relay: &RelayClient,
631) -> Vec<(Claim, String, Vec<f32>)> {
632 if new_claim.entities.is_empty() {
633 return Vec::new();
634 }
635
636 let mut trapdoors = Vec::new();
638 for entity in &new_claim.entities {
639 trapdoors.extend(blind::generate_blind_indices(&entity.name));
640 }
641 if trapdoors.is_empty() {
642 return Vec::new();
643 }
644
645 let facts = match search::search_candidates(
647 relay,
648 relay.wallet_address(),
649 &trapdoors,
650 decision_log::CONTRADICTION_CANDIDATE_CAP,
651 )
652 .await
653 {
654 Ok(f) => f,
655 Err(_) => return Vec::new(),
656 };
657
658 let mut candidates = Vec::new();
660 for fact in &facts {
661 let blob_b64 = match search::hex_blob_to_base64(&fact.encrypted_blob) {
663 Some(b) => b,
664 None => continue,
665 };
666 let decrypted = match crypto::decrypt(&blob_b64, &keys.encryption_key) {
667 Ok(t) => t,
668 Err(_) => continue,
669 };
670
671 let claim: Claim = if let Ok(c) = serde_json::from_str(&decrypted) {
674 c
675 } else if let Ok(obj) = serde_json::from_str::<serde_json::Value>(&decrypted) {
676 let text = obj.get("t").and_then(|v| v.as_str()).unwrap_or(&decrypted);
677 match serde_json::from_str(text) {
678 Ok(c) => c,
679 Err(_) => continue, }
681 } else {
682 continue;
683 };
684
685 let emb = fact
687 .encrypted_embedding
688 .as_deref()
689 .and_then(|e| crypto::decrypt(e, &keys.encryption_key).ok())
690 .and_then(|b64| {
691 base64::engine::general_purpose::STANDARD
692 .decode(&b64)
693 .ok()
694 })
695 .map(|bytes| {
696 bytes
697 .chunks_exact(4)
698 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
699 .collect::<Vec<f32>>()
700 })
701 .unwrap_or_default();
702
703 candidates.push((claim, fact.id.clone(), emb));
704 }
705
706 candidates
707}
708
#[cfg(test)]
mod tests {
    use super::*;
    use totalreclaw_core::claims::{ClaimCategory, ClaimStatus, EntityRef, EntityType};

    /// Baseline active `Fact` claim; individual tests override fields with struct-update
    /// syntax.
    fn claim_fixture(text: &str) -> Claim {
        Claim {
            text: text.to_string(),
            category: ClaimCategory::Fact,
            confidence: 0.9,
            importance: 7,
            corroboration_count: 1,
            source_agent: "zeroclaw".to_string(),
            source_conversation: None,
            extracted_at: None,
            entities: Vec::new(),
            supersedes: None,
            superseded_by: None,
            valid_from: None,
            status: ClaimStatus::Active,
        }
    }

    // Pins the store-time cosine threshold exported by core.
    #[test]
    fn test_store_dedup_threshold_matches_core() {
        let drift = (consolidation::STORE_DEDUP_COSINE_THRESHOLD - 0.85).abs();
        assert!(drift < 1e-10);
    }

    // Pins the store-time candidate fetch limit exported by core.
    #[test]
    fn test_store_dedup_fetch_limit_matches_core() {
        assert_eq!(consolidation::STORE_DEDUP_MAX_CANDIDATES, 50);
    }

    // Both stored vectors clear the 0.5 threshold; `id_b` points closer to the query.
    #[test]
    fn test_find_best_near_duplicate_selects_highest() {
        let query: Vec<f32> = vec![1.0, 0.0, 0.0];
        let stored = vec![
            ("id_a".to_string(), vec![0.9, 0.1, 0.0]),
            ("id_b".to_string(), vec![0.99, 0.01, 0.0]),
        ];

        let found = consolidation::find_best_near_duplicate(&query, &stored, 0.5);
        assert!(found.is_some());

        let best = found.unwrap();
        assert_eq!(best.fact_id, "id_b");
        assert!(best.similarity > 0.99);
    }

    // An orthogonal vector must not be reported as a near-duplicate.
    #[test]
    fn test_find_best_near_duplicate_none_below_threshold() {
        let query: Vec<f32> = vec![1.0, 0.0, 0.0];
        let stored = vec![("id_a".to_string(), vec![0.0, 1.0, 0.0])];

        let found = consolidation::find_best_near_duplicate(
            &query,
            &stored,
            consolidation::STORE_DEDUP_COSINE_THRESHOLD,
        );
        assert!(found.is_none());
    }

    // Checks the importance/10 arithmetic locally; no crate code is called here.
    #[test]
    fn test_importance_normalization() {
        let normalize = |importance: f64| (importance / 10.0).clamp(0.0, 1.0);

        let decay_score = normalize(8.0);
        assert!((decay_score - 0.8).abs() < 1e-10);

        assert!(normalize(0.0) == 0.0);
        assert!(normalize(10.0) == 1.0);
        assert!(normalize(15.0) == 1.0);
    }

    // Builds a fully populated claim using the core claim types.
    #[test]
    fn test_core_claim_types_accessible() {
        let pedro = EntityRef {
            name: "Pedro".to_string(),
            entity_type: EntityType::Person,
            role: Some("user".to_string()),
        };
        let claim = Claim {
            importance: 8,
            extracted_at: Some("2026-04-16T12:00:00Z".to_string()),
            entities: vec![pedro],
            ..claim_fixture("Pedro uses ZeroClaw")
        };

        assert_eq!(claim.category, ClaimCategory::Fact);
        assert!(!totalreclaw_core::claims::is_pinned_claim(&claim));
    }

    // `is_pinned_claim` follows the claim's status.
    #[test]
    fn test_pinned_claim_detection() {
        let mut claim = Claim {
            confidence: 1.0,
            importance: 10,
            source_agent: "totalreclaw_remember".to_string(),
            ..claim_fixture("pinned fact")
        };
        assert!(!totalreclaw_core::claims::is_pinned_claim(&claim));

        claim.status = ClaimStatus::Pinned;
        assert!(totalreclaw_core::claims::is_pinned_claim(&claim));
    }

    // A claim without entities, resolved against no candidates, yields no actions.
    #[test]
    fn test_resolve_with_candidates_no_entities() {
        let claim = claim_fixture("no entities here");
        let embedding = vec![1.0_f32; 3];
        let weights = contradiction::default_weights();

        let actions = contradiction::resolve_with_candidates(
            &claim,
            "new_id",
            &embedding,
            &[],
            &weights,
            contradiction::DEFAULT_LOWER_THRESHOLD,
            contradiction::DEFAULT_UPPER_THRESHOLD,
            1_776_384_000,
            totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE,
        );
        assert!(actions.is_empty());
    }

    // A decision-log entry survives a JSON round trip unchanged.
    #[test]
    fn test_decision_log_entry_round_trip() {
        let original = decision_log::DecisionLogEntry {
            ts: 1_776_384_000,
            entity_id: "ent123".to_string(),
            new_claim_id: "0xnew".to_string(),
            existing_claim_id: "0xold".to_string(),
            similarity: 0.72,
            action: "supersede_existing".to_string(),
            reason: Some("new_wins".to_string()),
            winner_score: Some(0.73),
            loser_score: Some(0.40),
            winner_components: None,
            loser_components: None,
            loser_claim_json: None,
            mode: "active".to_string(),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: decision_log::DecisionLogEntry = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    // Pins the contradiction candidate cap exported by core.
    #[test]
    fn test_contradiction_candidate_cap() {
        assert_eq!(decision_log::CONTRADICTION_CANDIDATE_CAP, 20);
    }

    // The default resolution weights form a convex combination.
    #[test]
    fn test_default_weights() {
        let w = contradiction::default_weights();
        let total = w.confidence + w.corroboration + w.recency + w.validation;
        assert!((total - 1.0).abs() < 1e-10, "weights should sum to 1.0");
    }

    // Pins the tie-zone tolerance exported by core.
    #[test]
    fn test_tie_zone_tolerance() {
        let drift = (totalreclaw_core::claims::TIE_ZONE_SCORE_TOLERANCE - 0.01).abs();
        assert!(drift < 1e-10, "tie zone tolerance should be 0.01");
    }
}