1use smos_domain::config::NliConfig;
56use smos_domain::config::{ConfidenceConfig, MergeConfig};
57use smos_domain::enums::FactStatus;
58use smos_domain::{Fact, FactContent, FactId, MemoryKey, NliResult, SessionId};
59
60use crate::errors::{ProviderError, UseCaseError};
61use crate::ports::{FactRepository, NliClassifier, SessionRepository};
62
/// Counters reported by one run of `FinalizeSession::execute`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FinalizeStats {
    /// String form of the session id the run was executed for.
    pub session_id: String,
    /// Number of pending facts attributed to the session that were examined.
    pub processed: usize,
    /// Facts whose outcome was `FactOutcome::Finalized`.
    pub finalized: usize,
    /// Facts whose outcome was `FactOutcome::Merged`.
    pub merged: usize,
    /// Facts whose outcome was `FactOutcome::Conflict`.
    pub conflicts: usize,
    /// Incremented together with `merged` (one rejected pending twin per merge).
    pub rejected: usize,
}
90
/// Use case that resolves the pending facts of one session against the
/// accepted facts stored under the same memory key.
///
/// All collaborators are borrowed, so the value is cheap to build per call.
pub struct FinalizeSession<'a, FR, SR, NC> {
    /// Fact storage: lists pending/accepted facts and persists updates.
    pub facts: &'a FR,
    /// Session storage: used to drop processed ids from the pending list.
    pub sessions: &'a SR,
    /// NLI classifier consulted for each (existing, pending) content pair.
    pub classifier: &'a NC,
    /// Passed to `reclassify` and `set_status_and_confidence`.
    pub confidence_cfg: &'a ConfidenceConfig,
    /// Passed to `is_contradiction` / `is_entailment` on NLI results.
    pub nli_cfg: &'a NliConfig,
    /// Passed to `find_merge_candidates`.
    pub merge_cfg: &'a MergeConfig,
}
104
/// Result of resolving a single pending fact; consumed by `tally`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FactOutcome {
    /// The fact was reclassified on its own and saved.
    Finalized,
    /// The fact was merged into an existing fact.
    Merged,
    /// The fact was flagged as conflicting with an existing fact.
    Conflict,
    /// Nothing was counted: a required save failed, or no NLI verdict was
    /// observed for any candidate.
    Skipped,
}
125
126impl<'a, FR, SR, NC> FinalizeSession<'a, FR, SR, NC>
127where
128 FR: FactRepository,
129 SR: SessionRepository,
130 NC: NliClassifier,
131{
132 pub async fn execute(
146 &self,
147 session_id: &SessionId,
148 memory_key: &MemoryKey,
149 ) -> Result<FinalizeStats, UseCaseError> {
150 let mut stats = FinalizeStats {
151 session_id: session_id.as_str().to_string(),
152 ..FinalizeStats::default()
153 };
154
155 let all_pending = self.facts.list_pending(memory_key).await?;
163 let pending: Vec<Fact> = all_pending
164 .into_iter()
165 .filter(|f| f.source_sessions().iter().any(|s| s == session_id))
166 .collect();
167
168 if pending.is_empty() {
169 tracing::info!(
170 session = %session_id,
171 memory_key = %memory_key,
172 "finalize: no pending facts for session"
173 );
174 return Ok(stats);
175 }
176
177 let owned_ids: Vec<FactId> = pending.iter().map(|f| f.id().clone()).collect();
182
183 let accepted = self.facts.list_accepted(memory_key).await?;
184 stats.processed = pending.len();
185 tracing::info!(
186 session = %session_id,
187 memory_key = %memory_key,
188 pending = pending.len(),
189 accepted = accepted.len(),
190 "finalizing session"
191 );
192
193 let mut comparison_pool: Vec<Fact> = accepted;
198 for fact in &pending {
199 let outcome = self.resolve_one(fact, &mut comparison_pool).await;
200 self.tally(&mut stats, outcome);
201 }
202
203 if let Err(e) = self
211 .sessions
212 .remove_pending_owned(session_id, &owned_ids)
213 .await
214 {
215 tracing::warn!(error = %e, "session cleanup failed (non-fatal)");
216 }
217
218 tracing::info!(
219 session = %session_id,
220 processed = stats.processed,
221 finalized = stats.finalized,
222 merged = stats.merged,
223 conflicts = stats.conflicts,
224 skipped = stats.processed - stats.finalized - stats.merged - stats.conflicts,
225 "finalize complete"
226 );
227
228 Ok(stats)
229 }
230
    /// Decides what happens to one pending fact and applies that decision.
    ///
    /// Order of precedence while walking the merge candidates:
    /// 1. a contradiction returns immediately via `apply_conflict_flag`;
    /// 2. otherwise the first entailing candidate is merged after the walk;
    /// 3. otherwise, if at least one pair was observed, the fact is finalized
    ///    standalone with the last non-merge verdict;
    /// 4. otherwise (every pair was skipped) the fact is left as `Skipped`.
    ///
    /// With no candidates at all the fact is finalized standalone without NLI.
    async fn resolve_one(&self, pending: &Fact, pool: &mut Vec<Fact>) -> FactOutcome {
        let candidates = pending.find_merge_candidates(pool, self.merge_cfg);
        if candidates.is_empty() {
            return self.finalize_standalone(pending, None, pool).await;
        }

        // First entailing candidate, kept until the whole walk has finished so a
        // later contradiction can still win.
        let mut merge_pick: Option<(Fact, NliResult)> = None;
        // Most recent verdict that did not become the merge pick.
        let mut last_observed_nli: Option<NliResult> = None;
        // True once any pair produced a usable verdict (or hit the guard below).
        let mut nli_observed = false;

        for candidate in &candidates {
            let existing = &candidate.fact;

            // Pair already flagged as conflicting in either direction: counts as
            // observed, but the classifier is not consulted again.
            if pending.conflicts_with().contains(existing.id())
                || existing.conflicts_with().contains(pending.id())
            {
                nli_observed = true;
                tracing::debug!(
                    pending = %pending.id(),
                    existing = %existing.id(),
                    "C3 guard: skip NLI for already-flagged conflict pair"
                );
                continue;
            }

            // Normalized-equal texts get a synthetic exact-match verdict without
            // calling the classifier.
            let nli = if FactContent::text_equals_normalized(existing.content(), pending.content())
            {
                nli_observed = true;
                NliResult::exact_match_result()
            } else {
                match self
                    .classifier
                    .classify(existing.content(), pending.content())
                    .await
                {
                    Ok(nli) if nli.available => {
                        nli_observed = true;
                        nli
                    }
                    // Every remaining arm skips this pair without marking it
                    // observed.
                    Ok(_unavailable) => {
                        tracing::warn!(
                            pending = %pending.id(),
                            existing = %existing.id(),
                            "NLI replied with available=false; leaving pending (skip pair)"
                        );
                        continue;
                    }
                    Err(ProviderError::Unavailable(msg)) => {
                        tracing::warn!(
                            pending = %pending.id(),
                            existing = %existing.id(),
                            error = %msg,
                            "NLI unavailable; leaving pending (skip pair)"
                        );
                        continue;
                    }
                    Err(other) => {
                        tracing::warn!(
                            pending = %pending.id(),
                            existing = %existing.id(),
                            error = %other,
                            "NLI error (non-fatal, skip pair)"
                        );
                        continue;
                    }
                }
            };

            // A contradiction short-circuits the walk, discarding any merge pick.
            if nli.is_contradiction(self.nli_cfg) {
                return self.apply_conflict_flag(pending, existing, pool).await;
            }

            if nli.is_entailment(self.nli_cfg) && merge_pick.is_none() {
                merge_pick = Some((existing.clone(), nli));
            } else {
                last_observed_nli = Some(nli);
            }
        }

        if let Some((existing, nli)) = merge_pick {
            return self.apply_merge(pending, &existing, &nli, pool).await;
        }

        if !nli_observed {
            tracing::info!(
                pending = %pending.id(),
                candidates = candidates.len(),
                "NLI never observed for any candidate; leaving pending"
            );
            return FactOutcome::Skipped;
        }

        self.finalize_standalone(pending, last_observed_nli.as_ref(), pool)
            .await
    }
377
378 async fn apply_conflict_flag(
381 &self,
382 pending: &Fact,
383 existing: &Fact,
384 pool: &mut Vec<Fact>,
385 ) -> FactOutcome {
386 let mut existing_mut = existing.clone();
387 let mut pending_mut = pending.clone();
388 if let Err(e) = existing_mut.flag_conflict_bidirectional(&mut pending_mut) {
393 tracing::warn!(
394 existing = %existing_mut.id(),
395 pending = %pending_mut.id(),
396 error = %e,
397 "flag_conflict_bidirectional failed"
398 );
399 }
400 if let Err(e) = self.facts.save(&existing_mut).await {
401 tracing::warn!(fact = %existing_mut.id(), error = %e, "save existing after flag failed");
402 }
403 if let Err(e) = self.facts.save(&pending_mut).await {
404 tracing::warn!(fact = %pending_mut.id(), error = %e, "save pending after flag failed");
405 return FactOutcome::Skipped;
408 }
409 pool.push(pending.clone());
413 FactOutcome::Conflict
414 }
415
416 async fn apply_merge(
421 &self,
422 pending: &Fact,
423 existing: &Fact,
424 nli: &NliResult,
425 pool: &mut Vec<Fact>,
426 ) -> FactOutcome {
427 let mut existing_mut = existing.clone();
428 if let Err(e) = existing_mut.merge_into(pending) {
429 tracing::warn!(fact = %existing_mut.id(), error = %e, "merge_into failed");
430 }
431 if let Err(e) = existing_mut.reclassify(Some(nli), self.confidence_cfg) {
432 tracing::warn!(fact = %existing_mut.id(), error = %e, "reclassify(existing) failed");
433 }
434 if let Err(e) = self.facts.save(&existing_mut).await {
435 tracing::warn!(fact = %existing_mut.id(), error = %e, "save merged existing failed");
436 return FactOutcome::Skipped;
437 }
438
439 let mut pending_mut = pending.clone();
444 if let Err(e) = pending_mut.set_status_and_confidence(
445 FactStatus::Rejected,
446 pending_mut.confidence(),
447 self.confidence_cfg,
448 ) {
449 tracing::warn!(fact = %pending_mut.id(), error = %e, "reject pending twin failed");
450 } else if let Err(e) = self.facts.save(&pending_mut).await {
451 tracing::warn!(fact = %pending_mut.id(), error = %e, "save rejected pending failed");
452 }
453
454 pool.push(existing_mut);
457 FactOutcome::Merged
458 }
459
460 async fn finalize_standalone(
465 &self,
466 pending: &Fact,
467 nli: Option<&NliResult>,
468 pool: &mut Vec<Fact>,
469 ) -> FactOutcome {
470 let mut fact = pending.clone();
471 if let Err(e) = fact.reclassify(nli, self.confidence_cfg) {
472 tracing::warn!(fact = %fact.id(), error = %e, "reclassify(standalone) failed");
473 }
474 if let Err(e) = self.facts.save(&fact).await {
475 tracing::warn!(fact = %fact.id(), error = %e, "save standalone failed");
476 return FactOutcome::Skipped;
477 }
478 pool.push(fact);
482 FactOutcome::Finalized
483 }
484
485 fn tally(&self, stats: &mut FinalizeStats, outcome: FactOutcome) {
487 match outcome {
488 FactOutcome::Finalized => stats.finalized += 1,
489 FactOutcome::Merged => {
490 stats.merged += 1;
491 stats.rejected += 1;
492 }
493 FactOutcome::Conflict => stats.conflicts += 1,
494 FactOutcome::Skipped => {
495 }
498 }
499 }
500}
501
502#[cfg(test)]
503mod tests {
504 use super::*;
512 use std::collections::HashMap;
513 use std::sync::Mutex;
514
515 use smos_domain::config::{ConfidenceConfig, MergeConfig, NliConfig};
516 use smos_domain::enums::NliLabel;
517 use smos_domain::{
518 Embedding, FactStatus, MemoryKey, NliScores, SessionId, SessionState, Timestamp,
519 };
520
    /// Test double for `FactRepository`: facts live in a shared map keyed by
    /// the string form of their id.
    #[derive(Default, Clone)]
    struct InMemoryFacts {
        /// Shared so that clones of the repository observe the same facts.
        store: std::sync::Arc<Mutex<HashMap<String, Fact>>>,
    }
527 impl InMemoryFacts {
528 fn seed(&self, fact: Fact) {
529 self.store
530 .lock()
531 .unwrap()
532 .insert(fact.id().as_str().to_string(), fact);
533 }
534 fn get_clone(&self, id: &FactId) -> Option<Fact> {
535 self.store.lock().unwrap().get(id.as_str()).cloned()
536 }
537 }
538 impl FactRepository for InMemoryFacts {
539 async fn save(&self, fact: &Fact) -> Result<(), crate::errors::RepoError> {
540 self.store
541 .lock()
542 .unwrap()
543 .insert(fact.id().as_str().to_string(), fact.clone());
544 Ok(())
545 }
546 async fn get(
547 &self,
548 id: &FactId,
549 _mk: &MemoryKey,
550 ) -> Result<Option<Fact>, crate::errors::RepoError> {
551 Ok(self.get_clone(id))
552 }
553 async fn list_accepted(
554 &self,
555 _mk: &MemoryKey,
556 ) -> Result<Vec<Fact>, crate::errors::RepoError> {
557 Ok(self
558 .store
559 .lock()
560 .unwrap()
561 .values()
562 .filter(|f| f.status() == FactStatus::Accepted)
563 .cloned()
564 .collect())
565 }
566 async fn list_pending(
567 &self,
568 _mk: &MemoryKey,
569 ) -> Result<Vec<Fact>, crate::errors::RepoError> {
570 Ok(self
571 .store
572 .lock()
573 .unwrap()
574 .values()
575 .filter(|f| f.status() == FactStatus::Pending)
576 .cloned()
577 .collect())
578 }
579 async fn list_memory_keys_for_session(
580 &self,
581 session_id: &SessionId,
582 ) -> Result<Vec<MemoryKey>, crate::errors::RepoError> {
583 let mut out: Vec<MemoryKey> = Vec::new();
587 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
588 for fact in self.store.lock().unwrap().values() {
589 if !fact.source_sessions().iter().any(|s| s == session_id) {
590 continue;
591 }
592 let mk_str = fact.memory_key().as_str().to_string();
593 if seen.insert(mk_str) {
594 out.push(fact.memory_key().clone());
595 }
596 }
597 Ok(out)
598 }
599 async fn list_memory_keys(&self) -> Result<Vec<MemoryKey>, crate::errors::RepoError> {
600 let mut out: Vec<MemoryKey> = Vec::new();
601 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
602 for fact in self.store.lock().unwrap().values() {
603 let mk_str = fact.memory_key().as_str().to_string();
604 if seen.insert(mk_str) {
605 out.push(fact.memory_key().clone());
606 }
607 }
608 Ok(out)
609 }
610 async fn search_similar(
611 &self,
612 _e: Vec<f32>,
613 _mk: &MemoryKey,
614 _l: usize,
615 ) -> Result<Vec<crate::types::SearchHit>, crate::errors::RepoError> {
616 Ok(Vec::new())
617 }
618 async fn update_heat_batch(
619 &self,
620 _ids: &[FactId],
621 _mk: &MemoryKey,
622 _h: smos_domain::Heat,
623 _t: Timestamp,
624 ) -> Result<(), crate::errors::RepoError> {
625 Ok(())
626 }
627 }
628
    /// Test double for `SessionRepository`: session states live in a shared map
    /// keyed by the string form of the session id.
    #[derive(Default, Clone)]
    struct InMemorySessions {
        /// Shared so that clones of the repository observe the same sessions.
        sessions: std::sync::Arc<Mutex<HashMap<String, SessionState>>>,
    }
633 impl InMemorySessions {
634 fn seed(&self, state: SessionState) {
635 self.sessions
636 .lock()
637 .unwrap()
638 .insert(state.id().as_str().to_string(), state);
639 }
640 fn pending_of(&self, id: &SessionId) -> Vec<FactId> {
641 self.sessions
642 .lock()
643 .unwrap()
644 .get(id.as_str())
645 .map(|s| s.pending_facts().to_vec())
646 .unwrap_or_default()
647 }
648 }
649 impl SessionRepository for InMemorySessions {
650 async fn get_or_create(
651 &self,
652 id: &SessionId,
653 memory_key: &MemoryKey,
654 ) -> Result<SessionState, crate::errors::RepoError> {
655 Ok(self
656 .sessions
657 .lock()
658 .unwrap()
659 .entry(id.as_str().to_string())
660 .or_insert_with(|| {
661 SessionState::new(
662 id.clone(),
663 memory_key.clone(),
664 Timestamp::from_unix_secs(0).unwrap(),
665 )
666 })
667 .clone())
668 }
669 async fn collect_expired(
670 &self,
671 _t: std::time::Duration,
672 ) -> Result<Vec<(SessionId, SessionState)>, crate::errors::RepoError> {
673 Ok(Vec::new())
674 }
675 async fn snapshot_all(
676 &self,
677 ) -> Result<Vec<(SessionId, SessionState)>, crate::errors::RepoError> {
678 Ok(self
679 .sessions
680 .lock()
681 .unwrap()
682 .iter()
683 .map(|(k, v)| (SessionId::from_raw(k).unwrap(), v.clone()))
684 .collect())
685 }
686 async fn add_pending(
687 &self,
688 id: &SessionId,
689 fact_ids: &[FactId],
690 ) -> Result<(), crate::errors::RepoError> {
691 if let Some(state) = self.sessions.lock().unwrap().get_mut(id.as_str()) {
692 state.add_pending(fact_ids);
693 }
694 Ok(())
695 }
696 async fn remove_pending_owned(
697 &self,
698 id: &SessionId,
699 owned: &[FactId],
700 ) -> Result<(), crate::errors::RepoError> {
701 if let Some(state) = self.sessions.lock().unwrap().get_mut(id.as_str()) {
702 state.remove_owned(owned);
703 }
704 Ok(())
705 }
706 async fn clear_session(&self, id: &SessionId) -> Result<(), crate::errors::RepoError> {
707 self.sessions.lock().unwrap().remove(id.as_str());
708 Ok(())
709 }
710 async fn dedup_and_mark(
711 &self,
712 _id: &SessionId,
713 _mk: &MemoryKey,
714 candidates: &[FactId],
715 ) -> Result<Vec<FactId>, crate::errors::RepoError> {
716 Ok(candidates.to_vec())
717 }
718 async fn save(
719 &self,
720 id: &SessionId,
721 state: &SessionState,
722 ) -> Result<(), crate::errors::RepoError> {
723 self.sessions
724 .lock()
725 .unwrap()
726 .insert(id.as_str().to_string(), state.clone());
727 Ok(())
728 }
729 }
730
    /// Boxed callback mapping a (premise, hypothesis) pair to a scripted verdict.
    type NliResolver = Box<dyn Fn(&str, &str) -> Result<NliResult, ProviderError> + Send + Sync>;
736
    /// Test double for `NliClassifier` that records every call it receives.
    enum ScriptedNliClassifier {
        /// Replays `verdicts` front to back, one per call.
        Fifo {
            /// Remaining scripted verdicts.
            verdicts: Mutex<Vec<Result<NliResult, ProviderError>>>,
            /// (premise, hypothesis) of every call, in order.
            calls: Mutex<Vec<(String, String)>>,
        },
        /// Computes each verdict from the texts via `resolver`.
        Match {
            /// Callback producing the verdict for a pair.
            resolver: NliResolver,
            /// (premise, hypothesis) of every call, in order.
            calls: Mutex<Vec<(String, String)>>,
        },
    }
756 impl ScriptedNliClassifier {
757 fn new(verdicts: Vec<Result<NliResult, ProviderError>>) -> Self {
758 Self::Fifo {
759 verdicts: Mutex::new(verdicts),
760 calls: Mutex::new(Vec::new()),
761 }
762 }
763 fn matching<F>(resolver: F) -> Self
764 where
765 F: Fn(&str, &str) -> Result<NliResult, ProviderError> + Send + Sync + 'static,
766 {
767 Self::Match {
768 resolver: Box::new(resolver),
769 calls: Mutex::new(Vec::new()),
770 }
771 }
772 fn calls(&self) -> Vec<(String, String)> {
773 match self {
774 Self::Fifo { calls, .. } | Self::Match { calls, .. } => {
775 calls.lock().unwrap().clone()
776 }
777 }
778 }
779 }
780 impl NliClassifier for ScriptedNliClassifier {
781 async fn classify(
782 &self,
783 premise: &str,
784 hypothesis: &str,
785 ) -> Result<NliResult, ProviderError> {
786 match self {
787 Self::Fifo { verdicts, calls } => {
788 calls
789 .lock()
790 .unwrap()
791 .push((premise.to_string(), hypothesis.to_string()));
792 let mut queue = verdicts.lock().unwrap();
793 if queue.is_empty() {
794 Err(ProviderError::Unavailable("scripted queue empty".into()))
795 } else {
796 queue.remove(0)
797 }
798 }
799 Self::Match { resolver, calls } => {
800 calls
801 .lock()
802 .unwrap()
803 .push((premise.to_string(), hypothesis.to_string()));
804 resolver(premise, hypothesis)
805 }
806 }
807 }
808 }
809
810 fn neutral_available() -> NliResult {
814 NliResult {
815 label: NliLabel::Neutral,
816 scores: NliScores {
817 entailment: 0.2,
818 neutral: 0.7,
819 contradiction: 0.1,
820 },
821 available: true,
822 }
823 }
824
825 fn entailment_available() -> NliResult {
826 NliResult {
827 label: NliLabel::Entailment,
828 scores: NliScores {
829 entailment: 0.9,
830 neutral: 0.08,
831 contradiction: 0.02,
832 },
833 available: true,
834 }
835 }
836
837 fn contradiction_available() -> NliResult {
838 NliResult {
839 label: NliLabel::Contradiction,
840 scores: NliScores {
841 entailment: 0.05,
842 neutral: 0.1,
843 contradiction: 0.85,
844 },
845 available: true,
846 }
847 }
848
849 fn memory_key() -> MemoryKey {
852 MemoryKey::from_raw("origa").unwrap()
853 }
854 fn sid(n: u8) -> SessionId {
855 SessionId::from_raw(&format!("sess_{:012x}", n as u64)).unwrap()
856 }
857 fn ts() -> Timestamp {
858 Timestamp::from_unix_secs(1_700_000_000).unwrap()
859 }
860
861 fn pending(content: &str, embedding: Vec<f32>) -> Fact {
863 Fact::new_pending(
864 content,
865 memory_key(),
866 sid(1),
867 Embedding::new(embedding).unwrap(),
868 ts(),
869 ConfidenceConfig::default().base,
870 )
871 .unwrap()
872 }
873
874 fn accepted(content: &str, embedding: Vec<f32>) -> Fact {
877 let mut f = Fact::new_pending(
878 content,
879 memory_key(),
880 sid(2),
881 Embedding::new(embedding).unwrap(),
882 ts(),
883 ConfidenceConfig::default().base,
884 )
885 .unwrap();
886 f.set_status_and_confidence(
887 FactStatus::Accepted,
888 smos_domain::Confidence::new(0.9).unwrap(),
889 &ConfidenceConfig::default(),
890 )
891 .unwrap();
892 f
893 }
894
895 fn session_with_pending(owned: Vec<FactId>) -> SessionState {
897 let mut state = SessionState::new(sid(1), memory_key(), ts());
898 state.add_pending(&owned);
899 state
900 }
901
    /// Owns the configuration values that `build` lends to the use case.
    struct Fix {
        /// Lent as `FinalizeSession::confidence_cfg`.
        confidence_cfg: ConfidenceConfig,
        /// Lent as `FinalizeSession::nli_cfg`.
        nli_cfg: NliConfig,
        /// Lent as `FinalizeSession::merge_cfg`.
        merge_cfg: MergeConfig,
    }
910 impl Fix {
911 fn new() -> Self {
912 Self {
913 confidence_cfg: ConfidenceConfig::default(),
914 nli_cfg: NliConfig::default(),
915 merge_cfg: MergeConfig::default(),
916 }
917 }
918 }
919
920 fn build<'a>(
921 facts: &'a InMemoryFacts,
922 sessions: &'a InMemorySessions,
923 classifier: &'a ScriptedNliClassifier,
924 fix: &'a Fix,
925 ) -> FinalizeSession<'a, InMemoryFacts, InMemorySessions, ScriptedNliClassifier> {
926 FinalizeSession {
927 facts,
928 sessions,
929 classifier,
930 confidence_cfg: &fix.confidence_cfg,
931 nli_cfg: &fix.nli_cfg,
932 merge_cfg: &fix.merge_cfg,
933 }
934 }
935
936 #[tokio::test]
941 async fn execute_no_session_returns_empty_stats() {
942 let facts = InMemoryFacts::default();
943 let sessions = InMemorySessions::default();
944 let classifier = ScriptedNliClassifier::new(vec![]);
945 let fix = Fix::new();
946 let uc = build(&facts, &sessions, &classifier, &fix);
947
948 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
949 assert_eq!(stats.processed, 0);
950 assert_eq!(stats.finalized, 0);
951 assert!(classifier.calls().is_empty(), "no NLI call without pending");
952 }
953
954 #[tokio::test]
961 async fn execute_processes_pending_facts_even_when_session_state_is_absent() {
962 let facts = InMemoryFacts::default();
963 let sessions = InMemorySessions::default();
964 let fact = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
969 let fact_id = fact.id().clone();
970 facts.seed(fact);
971
972 let classifier = ScriptedNliClassifier::new(vec![]);
973 let fix = Fix::new();
974 let uc = build(&facts, &sessions, &classifier, &fix);
975
976 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
977 assert_eq!(
978 stats.processed, 1,
979 "missing SessionState must not mask the fact"
980 );
981 assert_eq!(stats.finalized, 1);
982 let finalized = facts.get_clone(&fact_id).expect("fact still present");
983 assert_eq!(finalized.status(), FactStatus::Pending);
984 }
985
986 #[tokio::test]
990 async fn execute_skips_pending_fact_owned_by_a_different_session() {
991 let facts = InMemoryFacts::default();
992 let sessions = InMemorySessions::default();
993 let fact = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
996 let fact_id = fact.id().clone();
997 facts.seed(fact);
998
999 let classifier = ScriptedNliClassifier::new(vec![]);
1000 let fix = Fix::new();
1001 let uc = build(&facts, &sessions, &classifier, &fix);
1002
1003 let stats = uc.execute(&sid(2), &memory_key()).await.unwrap();
1004 assert_eq!(stats.processed, 0);
1005 let untouched = facts.get_clone(&fact_id).expect("fact still present");
1007 assert_eq!(untouched.status(), FactStatus::Pending);
1008 }
1009
1010 #[tokio::test]
1011 async fn execute_empty_session_returns_empty_stats() {
1012 let facts = InMemoryFacts::default();
1013 let sessions = InMemorySessions::default();
1014 sessions.seed(SessionState::new(sid(1), memory_key(), ts()));
1015 let classifier = ScriptedNliClassifier::new(vec![]);
1016 let fix = Fix::new();
1017 let uc = build(&facts, &sessions, &classifier, &fix);
1018
1019 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1020 assert_eq!(stats.processed, 0);
1021 }
1022
1023 #[tokio::test]
1024 async fn execute_standalone_promotes_pending_fact_with_no_candidate() {
1025 let facts = InMemoryFacts::default();
1026 let sessions = InMemorySessions::default();
1027 let fact = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
1030 let fact_id = fact.id().clone();
1031 facts.seed(fact);
1032 sessions.seed(session_with_pending(vec![fact_id.clone()]));
1033
1034 let classifier = ScriptedNliClassifier::new(vec![]);
1035 let fix = Fix::new();
1036 let uc = build(&facts, &sessions, &classifier, &fix);
1037
1038 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1039 assert_eq!(stats.processed, 1);
1040 assert_eq!(stats.finalized, 1);
1041 assert_eq!(stats.merged, 0);
1042 assert_eq!(stats.conflicts, 0);
1043 let finalized = facts.get_clone(&fact_id).expect("fact still present");
1045 assert_eq!(finalized.status(), FactStatus::Pending);
1046 assert!(
1047 classifier.calls().is_empty(),
1048 "no NLI call without candidate"
1049 );
1050 assert!(
1051 sessions.pending_of(&sid(1)).is_empty(),
1052 "owned pending cleared"
1053 );
1054 }
1055
1056 #[tokio::test]
1057 async fn execute_entailment_merges_pending_into_existing() {
1058 let facts = InMemoryFacts::default();
1059 let sessions = InMemorySessions::default();
1060 let existing = accepted("ttl=10 prevents refresh loop", vec![1.0, 0.0, 0.0]);
1061 let existing_id = existing.id().clone();
1062 facts.seed(existing);
1063 let pending_fact = pending("ttl=10 stops the refresh loop", vec![1.0, 0.0, 0.0]);
1065 let pending_id = pending_fact.id().clone();
1066 facts.seed(pending_fact.clone());
1067 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1068
1069 let classifier = ScriptedNliClassifier::new(vec![Ok(entailment_available())]);
1070 let fix = Fix::new();
1071 let uc = build(&facts, &sessions, &classifier, &fix);
1072
1073 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1074 assert_eq!(stats.processed, 1);
1075 assert_eq!(stats.merged, 1);
1076 assert_eq!(stats.rejected, 1);
1077 assert_eq!(stats.finalized, 0);
1078
1079 let merged = facts.get_clone(&existing_id).expect("existing present");
1082 assert!(merged.source_sessions().distinct_count() >= 2);
1083 let twin = facts.get_clone(&pending_id).expect("pending present");
1085 assert_eq!(twin.status(), FactStatus::Rejected);
1086 assert!(
1087 sessions.pending_of(&sid(1)).is_empty(),
1088 "owned pending cleared"
1089 );
1090 }
1091
1092 #[tokio::test]
1093 async fn execute_contradiction_flags_bidirectional_conflict() {
1094 let facts = InMemoryFacts::default();
1095 let sessions = InMemorySessions::default();
1096 let existing = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
1097 let existing_id = existing.id().clone();
1098 facts.seed(existing);
1099 let pending_fact = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
1100 let pending_id = pending_fact.id().clone();
1101 facts.seed(pending_fact.clone());
1102 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1103
1104 let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
1105 let fix = Fix::new();
1106 let uc = build(&facts, &sessions, &classifier, &fix);
1107
1108 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1109 assert_eq!(stats.processed, 1);
1110 assert_eq!(stats.conflicts, 1);
1111 assert_eq!(stats.merged, 0);
1112 assert_eq!(stats.finalized, 0);
1113
1114 let existing_after = facts.get_clone(&existing_id).expect("existing present");
1116 let pending_after = facts.get_clone(&pending_id).expect("pending present");
1117 assert!(existing_after.conflicts_with().contains(&pending_id));
1118 assert!(pending_after.conflicts_with().contains(&existing_id));
1119 assert_eq!(existing_after.status(), FactStatus::Accepted);
1121 assert_eq!(pending_after.status(), FactStatus::Pending);
1122 assert!(existing_after.valid_until().is_none());
1124 assert!(pending_after.valid_until().is_none());
1125 }
1126
1127 #[tokio::test]
1132 async fn drift_priority_walk_contradiction_beats_earlier_neutral() {
1133 let facts = InMemoryFacts::default();
1134 let sessions = InMemorySessions::default();
1135 let closer = accepted("rust is memory safe", vec![1.0, 0.0, 0.0]);
1140 let closer_id = closer.id().clone();
1141 let farther = accepted("rust leaks memory everywhere", vec![0.9, 0.1, 0.0]);
1142 let farther_id = farther.id().clone();
1143 facts.seed(closer);
1144 facts.seed(farther);
1145 let pending_fact = pending("rust is memory safe language", vec![1.0, 0.0, 0.0]);
1146 let pending_id = pending_fact.id().clone();
1147 facts.seed(pending_fact.clone());
1148 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1149
1150 let classifier = ScriptedNliClassifier::new(vec![
1153 Ok(neutral_available()),
1154 Ok(contradiction_available()),
1155 ]);
1156 let fix = Fix::new();
1157 let uc = build(&facts, &sessions, &classifier, &fix);
1158
1159 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1160 assert_eq!(
1161 stats.conflicts, 1,
1162 "drift must win over the earlier neutral"
1163 );
1164 assert_eq!(stats.merged, 0, "no merge despite the neutral candidate");
1165
1166 let pending_after = facts.get_clone(&pending_id).expect("pending present");
1169 assert!(
1170 pending_after.conflicts_with().contains(&farther_id),
1171 "drift flag points to the contradicting candidate"
1172 );
1173 assert!(
1174 !pending_after.conflicts_with().contains(&closer_id),
1175 "no spurious drift flag on the neutral candidate"
1176 );
1177 }
1178
1179 #[tokio::test]
1180 async fn drift_priority_walk_keeps_merge_pick_but_still_scans_for_contradiction() {
1181 let facts = InMemoryFacts::default();
1184 let sessions = InMemorySessions::default();
1185 let entailed = accepted("the api runs on port 8080", vec![1.0, 0.0, 0.0]);
1186 let entailed_id = entailed.id().clone();
1187 let drift = accepted("the api runs on port 9090", vec![0.95, 0.05, 0.0]);
1188 let drift_id = drift.id().clone();
1189 facts.seed(entailed);
1190 facts.seed(drift);
1191 let pending_fact = pending("the api runs on port 8080 today", vec![1.0, 0.0, 0.0]);
1192 let pending_id = pending_fact.id().clone();
1193 facts.seed(pending_fact.clone());
1194 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1195
1196 let classifier = ScriptedNliClassifier::new(vec![
1197 Ok(entailment_available()),
1198 Ok(contradiction_available()),
1199 ]);
1200 let fix = Fix::new();
1201 let uc = build(&facts, &sessions, &classifier, &fix);
1202
1203 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1204 assert_eq!(stats.conflicts, 1);
1205 assert_eq!(stats.merged, 0);
1206 let entailed_after = facts.get_clone(&entailed_id).expect("entailed present");
1208 assert_eq!(
1209 entailed_after.source_sessions().distinct_count(),
1210 1,
1211 "merge not committed for the entailed candidate"
1212 );
1213 let drift_after = facts.get_clone(&drift_id).expect("drift present");
1215 assert!(drift_after.conflicts_with().contains(&pending_id));
1216 }
1217
1218 #[tokio::test]
1223 async fn c3_guard_skips_nli_for_already_flagged_conflict_pair() {
1224 let facts = InMemoryFacts::default();
1225 let sessions = InMemorySessions::default();
1226 let mut existing = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
1227 let mut pending_fact = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
1228 existing.flag_conflict(pending_fact.id().clone()).unwrap();
1230 pending_fact.flag_conflict(existing.id().clone()).unwrap();
1231 let existing_id = existing.id().clone();
1232 let pending_id = pending_fact.id().clone();
1233 facts.seed(existing);
1234 facts.seed(pending_fact.clone());
1235 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1236
1237 let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
1240 let fix = Fix::new();
1241 let uc = build(&facts, &sessions, &classifier, &fix);
1242
1243 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1244 assert_eq!(stats.processed, 1);
1245 assert_eq!(stats.finalized, 1);
1247 assert_eq!(stats.conflicts, 0);
1248 assert!(
1249 classifier.calls().is_empty(),
1250 "C3 guard must skip every sidecar call"
1251 );
1252 let existing_after = facts.get_clone(&existing_id).expect("existing present");
1254 assert_eq!(existing_after.conflicts_with().len(), 1);
1255 assert!(existing_after.conflicts_with().contains(&pending_id));
1256 let pending_after = facts.get_clone(&pending_id).expect("pending present");
1261 assert_eq!(pending_after.conflicts_with().len(), 1);
1262 assert!(
1263 pending_after.conflicts_with().contains(&existing_id),
1264 "pending twin must retain its pre-existing conflict flag"
1265 );
1266 }
1267
1268 #[tokio::test]
1281 async fn multi_contradiction_returns_after_first_drift() {
1282 let facts = InMemoryFacts::default();
1283 let sessions = InMemorySessions::default();
1284 let existing_a = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
1285 let existing_b = accepted("ttl=30 seconds", vec![0.95, 0.05, 0.0]);
1286 let a_id = existing_a.id().clone();
1287 let b_id = existing_b.id().clone();
1288 facts.seed(existing_a);
1289 facts.seed(existing_b);
1290 let pending_fact = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
1291 let pending_id = pending_fact.id().clone();
1292 facts.seed(pending_fact.clone());
1293 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1294
1295 let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
1299 let fix = Fix::new();
1300 let uc = build(&facts, &sessions, &classifier, &fix);
1301
1302 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1303 assert_eq!(stats.conflicts, 1);
1304 assert_eq!(stats.processed, 1);
1305 assert_eq!(
1306 classifier.calls().len(),
1307 1,
1308 "first contradiction must short-circuit; second candidate not visited"
1309 );
1310
1311 let pending_after = facts.get_clone(&pending_id).expect("pending present");
1315 assert_eq!(
1316 pending_after.conflicts_with().len(),
1317 1,
1318 "exactly one drift flag on the pending twin"
1319 );
1320 let flagged = pending_after
1322 .conflicts_with()
1323 .iter()
1324 .next()
1325 .expect("flag set");
1326 assert!(*flagged == a_id || *flagged == b_id);
1327 }
1328
1329 #[tokio::test]
1334 async fn exact_match_skips_sidecar_and_merges_identical_pair() {
1335 let facts = InMemoryFacts::default();
1336 let sessions = InMemorySessions::default();
1337 let existing = accepted("identical fact content", vec![1.0, 0.0, 0.0]);
1338 let existing_id = existing.id().clone();
1339 facts.seed(existing);
1340 let pending_fact = pending("IDENTICAL FACT CONTENT", vec![1.0, 0.0, 0.0]);
1347 let pending_id = pending_fact.id().clone();
1348 facts.seed(pending_fact.clone());
1349 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1350
1351 let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
1352 let fix = Fix::new();
1353 let uc = build(&facts, &sessions, &classifier, &fix);
1354
1355 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1356 assert_eq!(stats.merged, 1);
1359 assert_eq!(stats.conflicts, 0);
1360 assert!(
1361 classifier.calls().is_empty(),
1362 "exact-match must short-circuit before any sidecar call"
1363 );
1364 let merged = facts.get_clone(&existing_id).expect("existing present");
1365 assert!(merged.source_sessions().distinct_count() >= 2);
1366 }
1367
1368 #[tokio::test]
1373 async fn sidecar_unavailable_keeps_pending_fact_gracefully() {
1374 let facts = InMemoryFacts::default();
1375 let sessions = InMemorySessions::default();
1376 let existing = accepted("rust is memory safe", vec![1.0, 0.0, 0.0]);
1377 facts.seed(existing);
1378 let pending_fact = pending("rust guarantees memory safety", vec![1.0, 0.0, 0.0]);
1379 let pending_id = pending_fact.id().clone();
1380 facts.seed(pending_fact.clone());
1381 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1382
1383 let classifier = ScriptedNliClassifier::new(vec![Err(ProviderError::Unavailable(
1385 "sidecar crashed".into(),
1386 ))]);
1387 let fix = Fix::new();
1388 let uc = build(&facts, &sessions, &classifier, &fix);
1389
1390 let stats = uc
1391 .execute(&sid(1), &memory_key())
1392 .await
1393 .expect("graceful Ok");
1394 assert_eq!(stats.finalized, 0);
1396 assert_eq!(stats.merged, 0);
1397 assert_eq!(stats.conflicts, 0);
1398 let pending_after = facts.get_clone(&pending_id).expect("pending present");
1400 assert_eq!(pending_after.status(), FactStatus::Pending);
1401 assert!(pending_after.conflicts_with().is_empty());
1402 }
1403
1404 #[tokio::test]
1405 async fn sidecar_replies_available_false_keeps_pending_fact_gracefully() {
1406 let facts = InMemoryFacts::default();
1414 let sessions = InMemorySessions::default();
1415 let existing = accepted("rust is memory safe", vec![1.0, 0.0, 0.0]);
1416 facts.seed(existing);
1417 let pending_fact = pending("rust guarantees memory safety", vec![1.0, 0.0, 0.0]);
1418 let pending_id = pending_fact.id().clone();
1419 facts.seed(pending_fact.clone());
1420 sessions.seed(session_with_pending(vec![pending_id.clone()]));
1421
1422 let unavailable_verdict = NliResult {
1426 label: NliLabel::Neutral,
1427 scores: NliScores {
1428 entailment: 0.0,
1429 neutral: 1.0,
1430 contradiction: 0.0,
1431 },
1432 available: false,
1433 };
1434 let classifier = ScriptedNliClassifier::new(vec![Ok(unavailable_verdict)]);
1435 let fix = Fix::new();
1436 let uc = build(&facts, &sessions, &classifier, &fix);
1437
1438 let stats = uc
1439 .execute(&sid(1), &memory_key())
1440 .await
1441 .expect("graceful Ok");
1442 assert_eq!(stats.finalized, 0, "available=false must NOT promote");
1443 assert_eq!(stats.merged, 0);
1444 assert_eq!(stats.conflicts, 0);
1445 let pending_after = facts.get_clone(&pending_id).expect("pending present");
1446 assert_eq!(pending_after.status(), FactStatus::Pending);
1447 assert!(
1448 pending_after.conflicts_with().is_empty(),
1449 "no drift flag without a real verdict"
1450 );
1451 }
1452
1453 #[tokio::test]
1454 async fn batch_continues_after_single_pair_failure() {
1455 let facts = InMemoryFacts::default();
1456 let sessions = InMemorySessions::default();
1457 let existing = accepted("shared anchor fact here", vec![1.0, 0.0, 0.0]);
1461 facts.seed(existing);
1462 let p1 = pending("shared anchor fact here too", vec![1.0, 0.0, 0.0]);
1464 let p2 = pending("shared anchor fact but longer", vec![1.0, 0.0, 0.0]);
1466 let p3 = pending("totally unrelated pending fact", vec![0.0, 1.0, 0.0]);
1468 let p1_id = p1.id().clone();
1469 let p3_id = p3.id().clone();
1470 facts.seed(p1.clone());
1471 facts.seed(p2.clone());
1472 facts.seed(p3.clone());
1473 sessions.seed(session_with_pending(vec![
1474 p1.id().clone(),
1475 p2.id().clone(),
1476 p3.id().clone(),
1477 ]));
1478
1479 let classifier = ScriptedNliClassifier::matching(|_premise, hypothesis| match hypothesis {
1483 "shared anchor fact here too" => Err(ProviderError::Unavailable("transient".into())),
1484 "shared anchor fact but longer" => Ok(entailment_available()),
1485 other => Err(ProviderError::InvalidResponse(format!(
1486 "unexpected hypothesis: {other}"
1487 ))),
1488 });
1489 let fix = Fix::new();
1490 let uc = build(&facts, &sessions, &classifier, &fix);
1491
1492 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1493 assert_eq!(stats.processed, 3);
1496 assert_eq!(stats.merged, 1);
1497 assert_eq!(stats.finalized, 1);
1498 let p1_after = facts.get_clone(&p1_id).expect("p1 present");
1499 assert_eq!(p1_after.status(), FactStatus::Pending, "p1 stayed pending");
1500 let p3_after = facts.get_clone(&p3_id).expect("p3 present");
1501 assert_eq!(p3_after.status(), FactStatus::Pending);
1503 }
1504
1505 #[tokio::test]
1510 async fn finalize_clears_owned_pending_ids_after_drain() {
1511 let facts = InMemoryFacts::default();
1512 let sessions = InMemorySessions::default();
1513 let p1 = pending("first standalone pending fact", vec![1.0, 0.0, 0.0]);
1516 let p2 = pending("second standalone pending fact", vec![0.0, 1.0, 0.0]);
1517 let p1_id = p1.id().clone();
1518 let p2_id = p2.id().clone();
1519 facts.seed(p1);
1520 facts.seed(p2);
1521 sessions.seed(session_with_pending(vec![p1_id, p2_id]));
1522
1523 let classifier = ScriptedNliClassifier::new(vec![]);
1524 let fix = Fix::new();
1525 let uc = build(&facts, &sessions, &classifier, &fix);
1526
1527 let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1528 assert_eq!(stats.processed, 2);
1529 assert!(
1530 sessions.pending_of(&sid(1)).is_empty(),
1531 "owned pending ids cleared after finalize"
1532 );
1533 }
1534
#[test]
/// `FinalizeStats::default()` must yield zero for every counter and an empty
/// `session_id`.
///
/// This check is fully synchronous, so it runs as a plain `#[test]` and does
/// not start an async runtime.
fn stats_default_is_zeroed() {
    let stats = FinalizeStats::default();

    let counters = [
        ("processed", stats.processed),
        ("finalized", stats.finalized),
        ("merged", stats.merged),
        ("conflicts", stats.conflicts),
        ("rejected", stats.rejected),
    ];
    for (name, value) in counters {
        assert_eq!(value, 0, "{name} counter must default to zero");
    }
    assert!(stats.session_id.is_empty());
}
1549
1550 #[tokio::test]
1551 async fn stats_session_id_echoed_in_output() {
1552 let facts = InMemoryFacts::default();
1553 let sessions = InMemorySessions::default();
1554 sessions.seed(SessionState::new(sid(7), memory_key(), ts()));
1555 let classifier = ScriptedNliClassifier::new(vec![]);
1556 let fix = Fix::new();
1557 let uc = build(&facts, &sessions, &classifier, &fix);
1558
1559 let stats = uc.execute(&sid(7), &memory_key()).await.unwrap();
1560 assert_eq!(stats.session_id, sid(7).as_str());
1561 }
1562}