use smos_domain::config::NliConfig;
use smos_domain::config::{ConfidenceConfig, MergeConfig};
use smos_domain::enums::FactStatus;
use smos_domain::{Fact, FactContent, FactId, MemoryKey, NliResult, SessionId};
use crate::errors::{ProviderError, UseCaseError};
use crate::ports::{FactRepository, NliClassifier, SessionRepository};
/// Counters reported by a single `FinalizeSession::execute` run.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FinalizeStats {
/// String form of the session id passed to `execute`.
pub session_id: String,
/// Number of pending facts selected for the session.
pub processed: usize,
/// Facts whose outcome was `FactOutcome::Finalized`.
pub finalized: usize,
/// Facts whose outcome was `FactOutcome::Merged`.
pub merged: usize,
/// Facts whose outcome was `FactOutcome::Conflict`.
pub conflicts: usize,
/// Incremented together with `merged`: a merge marks the pending twin as rejected.
pub rejected: usize,
}
/// Use case that resolves the pending facts of one session against the comparison pool.
///
/// All collaborators are borrowed, so the use case itself owns no state.
pub struct FinalizeSession<'a, FR, SR, NC> {
/// Fact store: used to list pending/accepted facts and to save updated facts.
pub facts: &'a FR,
/// Session store: used to drop the owned pending ids once the batch is processed.
pub sessions: &'a SR,
/// NLI classifier consulted for each (existing, pending) content pair.
pub classifier: &'a NC,
/// Passed to `reclassify` and `set_status_and_confidence`.
pub confidence_cfg: &'a ConfidenceConfig,
/// Passed to `is_contradiction` / `is_entailment` when reading an NLI verdict.
pub nli_cfg: &'a NliConfig,
/// Passed to `find_merge_candidates` when selecting comparison candidates.
pub merge_cfg: &'a MergeConfig,
}
/// Per-fact result of the resolution step; `tally` folds it into `FinalizeStats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FactOutcome {
/// The fact was reclassified and saved on its own.
Finalized,
/// The fact was merged into an existing fact and its pending twin rejected.
Merged,
/// The fact was flagged as conflicting with an existing fact.
Conflict,
/// Nothing was counted for this fact (e.g. a save failed or no NLI verdict was observed).
Skipped,
}
impl<'a, FR, SR, NC> FinalizeSession<'a, FR, SR, NC>
where
FR: FactRepository,
SR: SessionRepository,
NC: NliClassifier,
{
/// Finalizes every pending fact under `memory_key` that lists `session_id` among its
/// source sessions.
///
/// Each selected fact is resolved against a comparison pool seeded with the accepted
/// facts; the pool grows as facts of the same batch are processed. Afterwards the ids
/// of all selected facts are removed from the session's pending list.
///
/// # Errors
/// Returns an error only when listing pending or accepted facts fails. Per-fact
/// failures and a failed session cleanup are logged and do not abort the run.
pub async fn execute(
    &self,
    session_id: &SessionId,
    memory_key: &MemoryKey,
) -> Result<FinalizeStats, UseCaseError> {
    let mut stats = FinalizeStats {
        session_id: session_id.as_str().to_string(),
        ..FinalizeStats::default()
    };

    // Narrow the pending list down to the facts this session contributed to.
    let mut session_facts: Vec<Fact> = self.facts.list_pending(memory_key).await?;
    session_facts.retain(|fact| {
        fact.source_sessions()
            .iter()
            .any(|owner| owner == session_id)
    });
    if session_facts.is_empty() {
        tracing::info!(
            session = %session_id,
            memory_key = %memory_key,
            "finalize: no pending facts for session"
        );
        return Ok(stats);
    }

    stats.processed = session_facts.len();
    let owned_ids: Vec<FactId> = session_facts
        .iter()
        .map(|fact| fact.id().clone())
        .collect();

    // The accepted facts are the initial comparison pool.
    let mut comparison_pool: Vec<Fact> = self.facts.list_accepted(memory_key).await?;
    tracing::info!(
        session = %session_id,
        memory_key = %memory_key,
        pending = session_facts.len(),
        accepted = comparison_pool.len(),
        "finalizing session"
    );

    for fact in session_facts.iter() {
        let outcome = self.resolve_one(fact, &mut comparison_pool).await;
        self.tally(&mut stats, outcome);
    }

    // Cleanup is best-effort: a failure is reported but does not fail the run.
    let cleanup = self
        .sessions
        .remove_pending_owned(session_id, &owned_ids)
        .await;
    if let Err(e) = cleanup {
        tracing::warn!(error = %e, "session cleanup failed (non-fatal)");
    }

    let skipped = stats.processed - stats.finalized - stats.merged - stats.conflicts;
    tracing::info!(
        session = %session_id,
        processed = stats.processed,
        finalized = stats.finalized,
        merged = stats.merged,
        conflicts = stats.conflicts,
        skipped = skipped,
        "finalize complete"
    );
    Ok(stats)
}
/// Decides the outcome for one pending fact against the current comparison pool.
///
/// Flow, as implemented below:
/// 1. No merge candidates: finalize the fact standalone without an NLI verdict.
/// 2. Otherwise walk the candidates in the order `find_merge_candidates` returned them:
///    - a pair already flagged as conflicting (in either direction) is skipped
///      without calling the classifier;
///    - contents that are equal after normalization use the exact-match verdict,
///      again without calling the classifier;
///    - a classifier error or an `available == false` reply skips the pair;
///    - the first contradiction returns immediately through `apply_conflict_flag`;
///    - the first entailment is remembered as the merge pick, and the walk continues
///      so a later contradiction can still win.
/// 3. After the walk: merge when a pick exists; return `Skipped` when no verdict was
///    ever observed; otherwise finalize standalone with the last non-picked verdict.
async fn resolve_one(&self, pending: &Fact, pool: &mut Vec<Fact>) -> FactOutcome {
let candidates = pending.find_merge_candidates(pool, self.merge_cfg);
if candidates.is_empty() {
return self.finalize_standalone(pending, None, pool).await;
}
// First entailing candidate and its verdict, if any.
let mut merge_pick: Option<(Fact, NliResult)> = None;
// Most recent verdict that did not become the merge pick.
let mut last_observed_nli: Option<NliResult> = None;
// True once any pair yielded a usable verdict (or was skipped by the guard below).
let mut nli_observed = false;
for candidate in &candidates {
let existing = &candidate.fact;
if pending.conflicts_with().contains(existing.id())
|| existing.conflicts_with().contains(pending.id())
{
// An already-flagged pair counts as observed, so the fact is not left
// pending merely because its only candidates are known conflicts.
nli_observed = true;
tracing::debug!(
pending = %pending.id(),
existing = %existing.id(),
"C3 guard: skip NLI for already-flagged conflict pair"
);
continue;
}
let nli = if FactContent::text_equals_normalized(existing.content(), pending.content())
{
// Identical text after normalization: no classifier round-trip needed.
nli_observed = true;
NliResult::exact_match_result()
} else {
match self
.classifier
.classify(existing.content(), pending.content())
.await
{
Ok(nli) if nli.available => {
nli_observed = true;
nli
}
// Every failure mode below skips this pair only; `nli_observed` is untouched.
Ok(_unavailable) => {
tracing::warn!(
pending = %pending.id(),
existing = %existing.id(),
"NLI replied with available=false; leaving pending (skip pair)"
);
continue;
}
Err(ProviderError::Unavailable(msg)) => {
tracing::warn!(
pending = %pending.id(),
existing = %existing.id(),
error = %msg,
"NLI unavailable; leaving pending (skip pair)"
);
continue;
}
Err(other) => {
tracing::warn!(
pending = %pending.id(),
existing = %existing.id(),
error = %other,
"NLI error (non-fatal, skip pair)"
);
continue;
}
}
};
// A contradiction wins immediately, even over an earlier merge pick.
if nli.is_contradiction(self.nli_cfg) {
return self.apply_conflict_flag(pending, existing, pool).await;
}
if nli.is_entailment(self.nli_cfg) && merge_pick.is_none() {
merge_pick = Some((existing.clone(), nli));
} else {
last_observed_nli = Some(nli);
}
}
if let Some((existing, nli)) = merge_pick {
return self.apply_merge(pending, &existing, &nli, pool).await;
}
if !nli_observed {
// Every pair was skipped due to classifier failures: leave the fact untouched.
tracing::info!(
pending = %pending.id(),
candidates = candidates.len(),
"NLI never observed for any candidate; leaving pending"
);
return FactOutcome::Skipped;
}
self.finalize_standalone(pending, last_observed_nli.as_ref(), pool)
.await
}
/// Flags `pending` and `existing` as conflicting with each other and persists both.
///
/// Works on clones of both facts:
/// 1. `flag_conflict_bidirectional` is applied; a failure is logged and processing
///    continues (best-effort).
/// 2. The flagged `existing` is saved. A failed save is logged only; on success every
///    pool snapshot with the same id is refreshed.
/// 3. The flagged `pending` is saved. A failed save returns [`FactOutcome::Skipped`].
/// 4. The flagged `pending` is appended to the pool.
///
/// The pool must mirror what was persisted. Later facts of the same batch clone pool
/// entries before mutating and saving them, so a stale snapshot (without the flag set
/// here) would silently overwrite the stored conflict flag.
async fn apply_conflict_flag(
    &self,
    pending: &Fact,
    existing: &Fact,
    pool: &mut Vec<Fact>,
) -> FactOutcome {
    let mut existing_mut = existing.clone();
    let mut pending_mut = pending.clone();
    if let Err(e) = existing_mut.flag_conflict_bidirectional(&mut pending_mut) {
        tracing::warn!(
            existing = %existing_mut.id(),
            pending = %pending_mut.id(),
            error = %e,
            "flag_conflict_bidirectional failed"
        );
    }
    match self.facts.save(&existing_mut).await {
        Ok(_) => {
            // Keep the pool in sync with the store for the flagged existing fact.
            for snapshot in pool
                .iter_mut()
                .filter(|snapshot| snapshot.id() == existing_mut.id())
            {
                *snapshot = existing_mut.clone();
            }
        }
        Err(e) => {
            tracing::warn!(fact = %existing_mut.id(), error = %e, "save existing after flag failed");
        }
    }
    if let Err(e) = self.facts.save(&pending_mut).await {
        tracing::warn!(fact = %pending_mut.id(), error = %e, "save pending after flag failed");
        return FactOutcome::Skipped;
    }
    // Push the flagged copy (what was saved), not the pre-flag input.
    pool.push(pending_mut);
    FactOutcome::Conflict
}
/// Folds `pending` into `existing`, persists the result and rejects the pending twin.
///
/// Steps:
/// 1. Clone `existing`, apply `merge_into(pending)` and `reclassify` with the NLI
///    verdict. A failure of either is logged and processing continues.
/// 2. Save the merged fact. A failed save returns [`FactOutcome::Skipped`] and leaves
///    `pending` untouched.
/// 3. Mark a clone of `pending` as `Rejected` (keeping its confidence) and save it.
///    Failures here are only logged.
/// 4. Refresh the pool so later facts of the batch compare against the merged state.
///
/// The pool update replaces every snapshot carrying the merged fact's id and only
/// appends when none exists. Appending unconditionally would leave the pre-merge
/// snapshot in the pool; a later merge or conflict could pick that stale copy, save
/// it, and overwrite the merge persisted here.
async fn apply_merge(
    &self,
    pending: &Fact,
    existing: &Fact,
    nli: &NliResult,
    pool: &mut Vec<Fact>,
) -> FactOutcome {
    let mut existing_mut = existing.clone();
    if let Err(e) = existing_mut.merge_into(pending) {
        tracing::warn!(fact = %existing_mut.id(), error = %e, "merge_into failed");
    }
    if let Err(e) = existing_mut.reclassify(Some(nli), self.confidence_cfg) {
        tracing::warn!(fact = %existing_mut.id(), error = %e, "reclassify(existing) failed");
    }
    if let Err(e) = self.facts.save(&existing_mut).await {
        tracing::warn!(fact = %existing_mut.id(), error = %e, "save merged existing failed");
        return FactOutcome::Skipped;
    }

    let mut pending_mut = pending.clone();
    if let Err(e) = pending_mut.set_status_and_confidence(
        FactStatus::Rejected,
        pending_mut.confidence(),
        self.confidence_cfg,
    ) {
        tracing::warn!(fact = %pending_mut.id(), error = %e, "reject pending twin failed");
    } else if let Err(e) = self.facts.save(&pending_mut).await {
        tracing::warn!(fact = %pending_mut.id(), error = %e, "save rejected pending failed");
    }

    // Replace stale snapshots of the merged fact instead of adding a duplicate.
    let mut refreshed = false;
    for snapshot in pool
        .iter_mut()
        .filter(|snapshot| snapshot.id() == existing_mut.id())
    {
        *snapshot = existing_mut.clone();
        refreshed = true;
    }
    if !refreshed {
        pool.push(existing_mut);
    }
    FactOutcome::Merged
}
async fn finalize_standalone(
&self,
pending: &Fact,
nli: Option<&NliResult>,
pool: &mut Vec<Fact>,
) -> FactOutcome {
let mut fact = pending.clone();
if let Err(e) = fact.reclassify(nli, self.confidence_cfg) {
tracing::warn!(fact = %fact.id(), error = %e, "reclassify(standalone) failed");
}
if let Err(e) = self.facts.save(&fact).await {
tracing::warn!(fact = %fact.id(), error = %e, "save standalone failed");
return FactOutcome::Skipped;
}
pool.push(fact);
FactOutcome::Finalized
}
/// Folds one per-fact outcome into the running counters.
///
/// `Skipped` changes nothing; `Merged` also bumps `rejected`.
fn tally(&self, stats: &mut FinalizeStats, outcome: FactOutcome) {
    let counter = match outcome {
        FactOutcome::Skipped => return,
        FactOutcome::Finalized => &mut stats.finalized,
        FactOutcome::Conflict => &mut stats.conflicts,
        FactOutcome::Merged => {
            stats.rejected += 1;
            &mut stats.merged
        }
    };
    *counter += 1;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::sync::Mutex;
use smos_domain::config::{ConfidenceConfig, MergeConfig, NliConfig};
use smos_domain::enums::NliLabel;
use smos_domain::{
Embedding, FactStatus, MemoryKey, NliScores, SessionId, SessionState, Timestamp,
};
/// Test double for the fact store: a shared map from fact-id string to `Fact`.
#[derive(Default, Clone)]
struct InMemoryFacts {
// Shared behind `Arc<Mutex<..>>`, so clones of this struct see the same map.
store: std::sync::Arc<Mutex<HashMap<String, Fact>>>,
}
impl InMemoryFacts {
    /// Inserts `fact` under its id string, replacing any earlier entry.
    fn seed(&self, fact: Fact) {
        let key = fact.id().as_str().to_string();
        let mut guard = self.store.lock().unwrap();
        guard.insert(key, fact);
    }

    /// Returns a clone of the stored fact with the given id, if present.
    fn get_clone(&self, id: &FactId) -> Option<Fact> {
        let guard = self.store.lock().unwrap();
        guard.get(id.as_str()).cloned()
    }
}
/// `FactRepository` over the in-memory map. The memory-key arguments of `get`,
/// `list_accepted` and `list_pending` are ignored.
impl FactRepository for InMemoryFacts {
    /// Stores a clone of `fact` under its id string.
    async fn save(&self, fact: &Fact) -> Result<(), crate::errors::RepoError> {
        let key = fact.id().as_str().to_string();
        let mut guard = self.store.lock().unwrap();
        guard.insert(key, fact.clone());
        Ok(())
    }

    /// Looks a fact up by id.
    async fn get(
        &self,
        id: &FactId,
        _mk: &MemoryKey,
    ) -> Result<Option<Fact>, crate::errors::RepoError> {
        let found = self.get_clone(id);
        Ok(found)
    }

    /// Clones every stored fact whose status is `Accepted`.
    async fn list_accepted(
        &self,
        _mk: &MemoryKey,
    ) -> Result<Vec<Fact>, crate::errors::RepoError> {
        let guard = self.store.lock().unwrap();
        let accepted: Vec<Fact> = guard
            .values()
            .filter(|f| f.status() == FactStatus::Accepted)
            .cloned()
            .collect();
        Ok(accepted)
    }

    /// Clones every stored fact whose status is `Pending`.
    async fn list_pending(
        &self,
        _mk: &MemoryKey,
    ) -> Result<Vec<Fact>, crate::errors::RepoError> {
        let guard = self.store.lock().unwrap();
        let pending: Vec<Fact> = guard
            .values()
            .filter(|f| f.status() == FactStatus::Pending)
            .cloned()
            .collect();
        Ok(pending)
    }

    /// Distinct memory keys of the facts that list `session_id` as a source session.
    async fn list_memory_keys_for_session(
        &self,
        session_id: &SessionId,
    ) -> Result<Vec<MemoryKey>, crate::errors::RepoError> {
        let guard = self.store.lock().unwrap();
        let mut seen = std::collections::HashSet::<String>::new();
        let mut keys: Vec<MemoryKey> = Vec::new();
        let owned_by_session = guard
            .values()
            .filter(|f| f.source_sessions().iter().any(|s| s == session_id));
        for fact in owned_by_session {
            // `insert` returns true only the first time a key string is seen.
            if seen.insert(fact.memory_key().as_str().to_string()) {
                keys.push(fact.memory_key().clone());
            }
        }
        Ok(keys)
    }

    /// Distinct memory keys across all stored facts.
    async fn list_memory_keys(&self) -> Result<Vec<MemoryKey>, crate::errors::RepoError> {
        let guard = self.store.lock().unwrap();
        let mut seen = std::collections::HashSet::<String>::new();
        let mut keys: Vec<MemoryKey> = Vec::new();
        for fact in guard.values() {
            if seen.insert(fact.memory_key().as_str().to_string()) {
                keys.push(fact.memory_key().clone());
            }
        }
        Ok(keys)
    }

    /// Similarity search is not simulated: always returns no hits.
    async fn search_similar(
        &self,
        _e: Vec<f32>,
        _mk: &MemoryKey,
        _l: usize,
    ) -> Result<Vec<crate::types::SearchHit>, crate::errors::RepoError> {
        Ok(vec![])
    }

    /// Heat updates are accepted and ignored.
    async fn update_heat_batch(
        &self,
        _ids: &[FactId],
        _mk: &MemoryKey,
        _h: smos_domain::Heat,
        _t: Timestamp,
    ) -> Result<(), crate::errors::RepoError> {
        Ok(())
    }
}
/// Test double for the session store: a shared map from session-id string to `SessionState`.
#[derive(Default, Clone)]
struct InMemorySessions {
// Shared behind `Arc<Mutex<..>>`, so clones of this struct see the same map.
sessions: std::sync::Arc<Mutex<HashMap<String, SessionState>>>,
}
impl InMemorySessions {
    /// Inserts `state` under its own id string, replacing any earlier entry.
    fn seed(&self, state: SessionState) {
        let key = state.id().as_str().to_string();
        let mut guard = self.sessions.lock().unwrap();
        guard.insert(key, state);
    }

    /// Pending fact ids recorded for session `id`; empty when the session is unknown.
    fn pending_of(&self, id: &SessionId) -> Vec<FactId> {
        let guard = self.sessions.lock().unwrap();
        let ids = match guard.get(id.as_str()) {
            Some(state) => state.pending_facts().to_vec(),
            None => Vec::new(),
        };
        ids
    }
}
/// `SessionRepository` over the in-memory map.
impl SessionRepository for InMemorySessions {
    /// Returns a clone of the stored state, creating one (timestamp 0) when absent.
    async fn get_or_create(
        &self,
        id: &SessionId,
        memory_key: &MemoryKey,
    ) -> Result<SessionState, crate::errors::RepoError> {
        let mut guard = self.sessions.lock().unwrap();
        let state = guard.entry(id.as_str().to_string()).or_insert_with(|| {
            SessionState::new(
                id.clone(),
                memory_key.clone(),
                Timestamp::from_unix_secs(0).unwrap(),
            )
        });
        let snapshot = state.clone();
        Ok(snapshot)
    }

    /// Expiry is not simulated: always returns no sessions.
    async fn collect_expired(
        &self,
        _t: std::time::Duration,
    ) -> Result<Vec<(SessionId, SessionState)>, crate::errors::RepoError> {
        Ok(vec![])
    }

    /// Clones every stored session together with its parsed id.
    async fn snapshot_all(
        &self,
    ) -> Result<Vec<(SessionId, SessionState)>, crate::errors::RepoError> {
        let guard = self.sessions.lock().unwrap();
        let mut all = Vec::new();
        for (k, v) in guard.iter() {
            all.push((SessionId::from_raw(k).unwrap(), v.clone()));
        }
        Ok(all)
    }

    /// Adds `fact_ids` to the session's pending list; unknown sessions are ignored.
    async fn add_pending(
        &self,
        id: &SessionId,
        fact_ids: &[FactId],
    ) -> Result<(), crate::errors::RepoError> {
        let mut guard = self.sessions.lock().unwrap();
        if let Some(state) = guard.get_mut(id.as_str()) {
            state.add_pending(fact_ids);
        }
        Ok(())
    }

    /// Removes `owned` from the session's pending list; unknown sessions are ignored.
    async fn remove_pending_owned(
        &self,
        id: &SessionId,
        owned: &[FactId],
    ) -> Result<(), crate::errors::RepoError> {
        let mut guard = self.sessions.lock().unwrap();
        if let Some(state) = guard.get_mut(id.as_str()) {
            state.remove_owned(owned);
        }
        Ok(())
    }

    /// Drops the session entry entirely.
    async fn clear_session(&self, id: &SessionId) -> Result<(), crate::errors::RepoError> {
        let mut guard = self.sessions.lock().unwrap();
        guard.remove(id.as_str());
        Ok(())
    }

    /// Deduplication is not simulated: every candidate is returned unchanged.
    async fn dedup_and_mark(
        &self,
        _id: &SessionId,
        _mk: &MemoryKey,
        candidates: &[FactId],
    ) -> Result<Vec<FactId>, crate::errors::RepoError> {
        let kept = candidates.to_vec();
        Ok(kept)
    }

    /// Stores a clone of `state` under `id`, replacing any earlier entry.
    async fn save(
        &self,
        id: &SessionId,
        state: &SessionState,
    ) -> Result<(), crate::errors::RepoError> {
        let key = id.as_str().to_string();
        let mut guard = self.sessions.lock().unwrap();
        guard.insert(key, state.clone());
        Ok(())
    }
}
/// Callback that maps a (premise, hypothesis) pair to a scripted classifier reply.
type NliResolver = Box<dyn Fn(&str, &str) -> Result<NliResult, ProviderError> + Send + Sync>;
/// Scripted NLI classifier for tests; both variants record every call they receive.
enum ScriptedNliClassifier {
/// Replies with the queued verdicts in order, one per call.
Fifo {
verdicts: Mutex<Vec<Result<NliResult, ProviderError>>>,
calls: Mutex<Vec<(String, String)>>,
},
/// Replies by delegating to `resolver`, so the answer depends on the inputs
/// rather than on call order.
Match {
resolver: NliResolver,
calls: Mutex<Vec<(String, String)>>,
},
}
impl ScriptedNliClassifier {
fn new(verdicts: Vec<Result<NliResult, ProviderError>>) -> Self {
Self::Fifo {
verdicts: Mutex::new(verdicts),
calls: Mutex::new(Vec::new()),
}
}
fn matching<F>(resolver: F) -> Self
where
F: Fn(&str, &str) -> Result<NliResult, ProviderError> + Send + Sync + 'static,
{
Self::Match {
resolver: Box::new(resolver),
calls: Mutex::new(Vec::new()),
}
}
fn calls(&self) -> Vec<(String, String)> {
match self {
Self::Fifo { calls, .. } | Self::Match { calls, .. } => {
calls.lock().unwrap().clone()
}
}
}
}
impl NliClassifier for ScriptedNliClassifier {
    /// Records the call, then replies from the script.
    ///
    /// `Fifo` pops the front verdict and reports `Unavailable` once the queue is
    /// empty; `Match` delegates to the resolver.
    async fn classify(
        &self,
        premise: &str,
        hypothesis: &str,
    ) -> Result<NliResult, ProviderError> {
        let recorded = (premise.to_string(), hypothesis.to_string());
        match self {
            Self::Match { resolver, calls } => {
                calls.lock().unwrap().push(recorded);
                resolver(premise, hypothesis)
            }
            Self::Fifo { verdicts, calls } => {
                calls.lock().unwrap().push(recorded);
                let mut queue = verdicts.lock().unwrap();
                if queue.is_empty() {
                    return Err(ProviderError::Unavailable("scripted queue empty".into()));
                }
                queue.remove(0)
            }
        }
    }
}
/// Available verdict labelled `Neutral` (scores 0.2 / 0.7 / 0.1).
fn neutral_available() -> NliResult {
    let scores = NliScores {
        entailment: 0.2,
        neutral: 0.7,
        contradiction: 0.1,
    };
    NliResult {
        label: NliLabel::Neutral,
        scores,
        available: true,
    }
}
/// Available verdict labelled `Entailment` (scores 0.9 / 0.08 / 0.02).
fn entailment_available() -> NliResult {
    let scores = NliScores {
        entailment: 0.9,
        neutral: 0.08,
        contradiction: 0.02,
    };
    NliResult {
        label: NliLabel::Entailment,
        scores,
        available: true,
    }
}
/// Available verdict labelled `Contradiction` (scores 0.05 / 0.1 / 0.85).
fn contradiction_available() -> NliResult {
    let scores = NliScores {
        entailment: 0.05,
        neutral: 0.1,
        contradiction: 0.85,
    };
    NliResult {
        label: NliLabel::Contradiction,
        scores,
        available: true,
    }
}
/// Memory key shared by every fixture in this module.
fn memory_key() -> MemoryKey {
    let raw = "origa";
    MemoryKey::from_raw(raw).unwrap()
}
/// Session id of the form `sess_` followed by `n` as 12 zero-padded hex digits.
fn sid(n: u8) -> SessionId {
    let raw = format!("sess_{:012x}", u64::from(n));
    SessionId::from_raw(&raw).unwrap()
}
/// Fixed timestamp used by the fixtures.
fn ts() -> Timestamp {
    let unix_secs = 1_700_000_000;
    Timestamp::from_unix_secs(unix_secs).unwrap()
}
/// Builds a pending fact owned by session 1 with the default base confidence.
fn pending(content: &str, embedding: Vec<f32>) -> Fact {
    let vector = Embedding::new(embedding).unwrap();
    let base_confidence = ConfidenceConfig::default().base;
    let built = Fact::new_pending(content, memory_key(), sid(1), vector, ts(), base_confidence);
    built.unwrap()
}
/// Builds a fact owned by session 2 and moves it to `Accepted` with confidence 0.9.
fn accepted(content: &str, embedding: Vec<f32>) -> Fact {
    let cfg = ConfidenceConfig::default();
    let vector = Embedding::new(embedding).unwrap();
    let mut fact = Fact::new_pending(
        content,
        memory_key(),
        sid(2),
        vector,
        ts(),
        ConfidenceConfig::default().base,
    )
    .unwrap();
    let confidence = smos_domain::Confidence::new(0.9).unwrap();
    fact.set_status_and_confidence(FactStatus::Accepted, confidence, &cfg)
        .unwrap();
    fact
}
/// Session 1 state whose pending list holds `owned`.
fn session_with_pending(owned: Vec<FactId>) -> SessionState {
    let mut session = SessionState::new(sid(1), memory_key(), ts());
    session.add_pending(owned.as_slice());
    session
}
/// Owns the three configs so `build` can hand out references to them.
struct Fix {
confidence_cfg: ConfidenceConfig,
nli_cfg: NliConfig,
merge_cfg: MergeConfig,
}
impl Fix {
    /// All three configs at their `Default` values.
    fn new() -> Self {
        let confidence_cfg = ConfidenceConfig::default();
        let nli_cfg = NliConfig::default();
        let merge_cfg = MergeConfig::default();
        Self {
            confidence_cfg,
            nli_cfg,
            merge_cfg,
        }
    }
}
fn build<'a>(
facts: &'a InMemoryFacts,
sessions: &'a InMemorySessions,
classifier: &'a ScriptedNliClassifier,
fix: &'a Fix,
) -> FinalizeSession<'a, InMemoryFacts, InMemorySessions, ScriptedNliClassifier> {
FinalizeSession {
facts,
sessions,
classifier,
confidence_cfg: &fix.confidence_cfg,
nli_cfg: &fix.nli_cfg,
merge_cfg: &fix.merge_cfg,
}
}
/// With empty stores, `execute` reports zero work and never calls the classifier.
#[tokio::test]
async fn execute_no_session_returns_empty_stats() {
    let classifier = ScriptedNliClassifier::new(vec![]);
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(1), &memory_key()).await.unwrap();

    assert_eq!(stats.processed, 0);
    assert_eq!(stats.finalized, 0);
    assert!(classifier.calls().is_empty(), "no NLI call without pending");
}
/// A pending fact is picked up from the fact store even though no `SessionState`
/// was seeded for its session.
#[tokio::test]
async fn execute_processes_pending_facts_even_when_session_state_is_absent() {
    let classifier = ScriptedNliClassifier::new(vec![]);
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    let orphan = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
    let orphan_id = orphan.id().clone();
    facts.seed(orphan);

    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(1), &memory_key()).await.unwrap();

    assert_eq!(
        stats.processed, 1,
        "missing SessionState must not mask the fact"
    );
    assert_eq!(stats.finalized, 1);
    let stored = facts.get_clone(&orphan_id).expect("fact still present");
    assert_eq!(stored.status(), FactStatus::Pending);
}
/// Finalizing session 2 leaves a pending fact owned by session 1 untouched.
#[tokio::test]
async fn execute_skips_pending_fact_owned_by_a_different_session() {
    let classifier = ScriptedNliClassifier::new(vec![]);
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    let foreign = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
    let foreign_id = foreign.id().clone();
    facts.seed(foreign);

    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(2), &memory_key()).await.unwrap();

    assert_eq!(stats.processed, 0);
    let stored = facts.get_clone(&foreign_id).expect("fact still present");
    assert_eq!(stored.status(), FactStatus::Pending);
}
/// A seeded session without any pending facts processes nothing.
#[tokio::test]
async fn execute_empty_session_returns_empty_stats() {
    let classifier = ScriptedNliClassifier::new(vec![]);
    let fix = Fix::new();
    let facts = InMemoryFacts::default();
    let sessions = InMemorySessions::default();
    let empty_session = SessionState::new(sid(1), memory_key(), ts());
    sessions.seed(empty_session);

    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(1), &memory_key()).await.unwrap();

    assert_eq!(stats.processed, 0);
}
/// A lone pending fact (empty comparison pool) is finalized standalone without any
/// classifier call, and the session's owned pending ids are cleared.
#[tokio::test]
async fn execute_standalone_promotes_pending_fact_with_no_candidate() {
    let classifier = ScriptedNliClassifier::new(vec![]);
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    let lone = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
    let lone_id = lone.id().clone();
    facts.seed(lone);
    sessions.seed(session_with_pending(vec![lone_id.clone()]));

    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(1), &memory_key()).await.unwrap();

    assert_eq!(stats.processed, 1);
    assert_eq!(stats.finalized, 1);
    assert_eq!(stats.merged, 0);
    assert_eq!(stats.conflicts, 0);
    let stored = facts.get_clone(&lone_id).expect("fact still present");
    assert_eq!(stored.status(), FactStatus::Pending);
    assert!(
        classifier.calls().is_empty(),
        "no NLI call without candidate"
    );
    assert!(
        sessions.pending_of(&sid(1)).is_empty(),
        "owned pending cleared"
    );
}
/// An entailment verdict merges the pending fact into the accepted one and marks the
/// pending twin as rejected.
#[tokio::test]
async fn execute_entailment_merges_pending_into_existing() {
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    let anchor = accepted("ttl=10 prevents refresh loop", vec![1.0, 0.0, 0.0]);
    let anchor_id = anchor.id().clone();
    facts.seed(anchor);
    let incoming = pending("ttl=10 stops the refresh loop", vec![1.0, 0.0, 0.0]);
    let incoming_id = incoming.id().clone();
    facts.seed(incoming);
    sessions.seed(session_with_pending(vec![incoming_id.clone()]));

    let classifier = ScriptedNliClassifier::new(vec![Ok(entailment_available())]);
    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(1), &memory_key()).await.unwrap();

    assert_eq!(stats.processed, 1);
    assert_eq!(stats.merged, 1);
    assert_eq!(stats.rejected, 1);
    assert_eq!(stats.finalized, 0);
    let anchor_after = facts.get_clone(&anchor_id).expect("existing present");
    assert!(anchor_after.source_sessions().distinct_count() >= 2);
    let incoming_after = facts.get_clone(&incoming_id).expect("pending present");
    assert_eq!(incoming_after.status(), FactStatus::Rejected);
    assert!(
        sessions.pending_of(&sid(1)).is_empty(),
        "owned pending cleared"
    );
}
/// A contradiction verdict flags both facts against each other and leaves their
/// status and validity window unchanged.
#[tokio::test]
async fn execute_contradiction_flags_bidirectional_conflict() {
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    let anchor = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
    let anchor_id = anchor.id().clone();
    facts.seed(anchor);
    let incoming = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
    let incoming_id = incoming.id().clone();
    facts.seed(incoming);
    sessions.seed(session_with_pending(vec![incoming_id.clone()]));

    let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(1), &memory_key()).await.unwrap();

    assert_eq!(stats.processed, 1);
    assert_eq!(stats.conflicts, 1);
    assert_eq!(stats.merged, 0);
    assert_eq!(stats.finalized, 0);

    let anchor_after = facts.get_clone(&anchor_id).expect("existing present");
    let incoming_after = facts.get_clone(&incoming_id).expect("pending present");
    assert!(anchor_after.conflicts_with().contains(&incoming_id));
    assert!(incoming_after.conflicts_with().contains(&anchor_id));
    assert_eq!(anchor_after.status(), FactStatus::Accepted);
    assert_eq!(incoming_after.status(), FactStatus::Pending);
    assert!(anchor_after.valid_until().is_none());
    assert!(incoming_after.valid_until().is_none());
}
/// A contradiction on a later candidate wins over a neutral verdict on an earlier one.
///
/// The FIFO script answers the first classifier call with neutral and the second with
/// contradiction. The assertions therefore rely on `closer` being classified before
/// `farther` — presumably `find_merge_candidates` orders by embedding similarity
/// (TODO confirm against the domain crate).
#[tokio::test]
async fn drift_priority_walk_contradiction_beats_earlier_neutral() {
let facts = InMemoryFacts::default();
let sessions = InMemorySessions::default();
let closer = accepted("rust is memory safe", vec![1.0, 0.0, 0.0]);
let closer_id = closer.id().clone();
let farther = accepted("rust leaks memory everywhere", vec![0.9, 0.1, 0.0]);
let farther_id = farther.id().clone();
facts.seed(closer);
facts.seed(farther);
let pending_fact = pending("rust is memory safe language", vec![1.0, 0.0, 0.0]);
let pending_id = pending_fact.id().clone();
facts.seed(pending_fact.clone());
sessions.seed(session_with_pending(vec![pending_id.clone()]));
let classifier = ScriptedNliClassifier::new(vec![
Ok(neutral_available()),
Ok(contradiction_available()),
]);
let fix = Fix::new();
let uc = build(&facts, &sessions, &classifier, &fix);
let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
assert_eq!(
stats.conflicts, 1,
"drift must win over the earlier neutral"
);
assert_eq!(stats.merged, 0, "no merge despite the neutral candidate");
let pending_after = facts.get_clone(&pending_id).expect("pending present");
assert!(
pending_after.conflicts_with().contains(&farther_id),
"drift flag points to the contradicting candidate"
);
assert!(
!pending_after.conflicts_with().contains(&closer_id),
"no spurious drift flag on the neutral candidate"
);
}
/// An entailment on the first candidate does not stop the walk: a contradiction on the
/// second candidate wins and the merge is never committed.
///
/// The FIFO script replies entailment, then contradiction, so the test relies on the
/// `entailed` candidate being classified first (candidate order comes from
/// `find_merge_candidates` — TODO confirm its ordering rule).
#[tokio::test]
async fn drift_priority_walk_keeps_merge_pick_but_still_scans_for_contradiction() {
let facts = InMemoryFacts::default();
let sessions = InMemorySessions::default();
let entailed = accepted("the api runs on port 8080", vec![1.0, 0.0, 0.0]);
let entailed_id = entailed.id().clone();
let drift = accepted("the api runs on port 9090", vec![0.95, 0.05, 0.0]);
let drift_id = drift.id().clone();
facts.seed(entailed);
facts.seed(drift);
let pending_fact = pending("the api runs on port 8080 today", vec![1.0, 0.0, 0.0]);
let pending_id = pending_fact.id().clone();
facts.seed(pending_fact.clone());
sessions.seed(session_with_pending(vec![pending_id.clone()]));
let classifier = ScriptedNliClassifier::new(vec![
Ok(entailment_available()),
Ok(contradiction_available()),
]);
let fix = Fix::new();
let uc = build(&facts, &sessions, &classifier, &fix);
let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
assert_eq!(stats.conflicts, 1);
assert_eq!(stats.merged, 0);
let entailed_after = facts.get_clone(&entailed_id).expect("entailed present");
// Still a single source session: the pending fact was not merged in.
assert_eq!(
entailed_after.source_sessions().distinct_count(),
1,
"merge not committed for the entailed candidate"
);
let drift_after = facts.get_clone(&drift_id).expect("drift present");
assert!(drift_after.conflicts_with().contains(&pending_id));
}
/// A pair that is already flagged as conflicting in both directions is not sent to the
/// classifier again; the pending fact is finalized standalone and both flags survive.
#[tokio::test]
async fn c3_guard_skips_nli_for_already_flagged_conflict_pair() {
let facts = InMemoryFacts::default();
let sessions = InMemorySessions::default();
let mut existing = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
let mut pending_fact = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
// Pre-flag the pair in both directions before seeding.
existing.flag_conflict(pending_fact.id().clone()).unwrap();
pending_fact.flag_conflict(existing.id().clone()).unwrap();
let existing_id = existing.id().clone();
let pending_id = pending_fact.id().clone();
facts.seed(existing);
facts.seed(pending_fact.clone());
sessions.seed(session_with_pending(vec![pending_id.clone()]));
// The scripted contradiction must stay unused: the guard skips the call.
let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
let fix = Fix::new();
let uc = build(&facts, &sessions, &classifier, &fix);
let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
assert_eq!(stats.processed, 1);
assert_eq!(stats.finalized, 1);
assert_eq!(stats.conflicts, 0);
assert!(
classifier.calls().is_empty(),
"C3 guard must skip every sidecar call"
);
let existing_after = facts.get_clone(&existing_id).expect("existing present");
assert_eq!(existing_after.conflicts_with().len(), 1);
assert!(existing_after.conflicts_with().contains(&pending_id));
let pending_after = facts.get_clone(&pending_id).expect("pending present");
assert_eq!(pending_after.conflicts_with().len(), 1);
assert!(
pending_after.conflicts_with().contains(&existing_id),
"pending twin must retain its pre-existing conflict flag"
);
}
/// The first contradiction ends the candidate walk: only one classifier call is made
/// and only one conflict flag is set, even though two candidates exist.
#[tokio::test]
async fn multi_contradiction_returns_after_first_drift() {
let facts = InMemoryFacts::default();
let sessions = InMemorySessions::default();
let existing_a = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
let existing_b = accepted("ttl=30 seconds", vec![0.95, 0.05, 0.0]);
let a_id = existing_a.id().clone();
let b_id = existing_b.id().clone();
facts.seed(existing_a);
facts.seed(existing_b);
let pending_fact = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
let pending_id = pending_fact.id().clone();
facts.seed(pending_fact.clone());
sessions.seed(session_with_pending(vec![pending_id.clone()]));
// Only one verdict is scripted; a second call would hit the empty-queue error.
let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
let fix = Fix::new();
let uc = build(&facts, &sessions, &classifier, &fix);
let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
assert_eq!(stats.conflicts, 1);
assert_eq!(stats.processed, 1);
assert_eq!(
classifier.calls().len(),
1,
"first contradiction must short-circuit; second candidate not visited"
);
let pending_after = facts.get_clone(&pending_id).expect("pending present");
assert_eq!(
pending_after.conflicts_with().len(),
1,
"exactly one drift flag on the pending twin"
);
let flagged = pending_after
.conflicts_with()
.iter()
.next()
.expect("flag set");
// Either candidate may have been visited first, so accept both ids.
assert!(*flagged == a_id || *flagged == b_id);
}
/// Contents that differ only by letter case are merged without any classifier call.
#[tokio::test]
async fn exact_match_skips_sidecar_and_merges_identical_pair() {
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    let anchor = accepted("identical fact content", vec![1.0, 0.0, 0.0]);
    let anchor_id = anchor.id().clone();
    facts.seed(anchor);
    let incoming = pending("IDENTICAL FACT CONTENT", vec![1.0, 0.0, 0.0]);
    let incoming_id = incoming.id().clone();
    facts.seed(incoming);
    sessions.seed(session_with_pending(vec![incoming_id.clone()]));

    // The scripted contradiction must stay unused.
    let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(1), &memory_key()).await.unwrap();

    assert_eq!(stats.merged, 1);
    assert_eq!(stats.conflicts, 0);
    assert!(
        classifier.calls().is_empty(),
        "exact-match must short-circuit before any sidecar call"
    );
    let anchor_after = facts.get_clone(&anchor_id).expect("existing present");
    assert!(anchor_after.source_sessions().distinct_count() >= 2);
}
/// When the classifier reports `Unavailable`, `execute` still succeeds and the fact
/// stays pending with no conflict flag.
#[tokio::test]
async fn sidecar_unavailable_keeps_pending_fact_gracefully() {
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    facts.seed(accepted("rust is memory safe", vec![1.0, 0.0, 0.0]));
    let incoming = pending("rust guarantees memory safety", vec![1.0, 0.0, 0.0]);
    let incoming_id = incoming.id().clone();
    facts.seed(incoming);
    sessions.seed(session_with_pending(vec![incoming_id.clone()]));

    let outage = Err(ProviderError::Unavailable("sidecar crashed".into()));
    let classifier = ScriptedNliClassifier::new(vec![outage]);
    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case
        .execute(&sid(1), &memory_key())
        .await
        .expect("graceful Ok");

    assert_eq!(stats.finalized, 0);
    assert_eq!(stats.merged, 0);
    assert_eq!(stats.conflicts, 0);
    let incoming_after = facts.get_clone(&incoming_id).expect("pending present");
    assert_eq!(incoming_after.status(), FactStatus::Pending);
    assert!(incoming_after.conflicts_with().is_empty());
}
/// An `Ok` reply with `available: false` is treated like no verdict: the fact stays
/// pending and nothing is counted.
#[tokio::test]
async fn sidecar_replies_available_false_keeps_pending_fact_gracefully() {
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    facts.seed(accepted("rust is memory safe", vec![1.0, 0.0, 0.0]));
    let incoming = pending("rust guarantees memory safety", vec![1.0, 0.0, 0.0]);
    let incoming_id = incoming.id().clone();
    facts.seed(incoming);
    sessions.seed(session_with_pending(vec![incoming_id.clone()]));

    let scores = NliScores {
        entailment: 0.0,
        neutral: 1.0,
        contradiction: 0.0,
    };
    let unavailable_verdict = NliResult {
        label: NliLabel::Neutral,
        scores,
        available: false,
    };
    let classifier = ScriptedNliClassifier::new(vec![Ok(unavailable_verdict)]);
    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case
        .execute(&sid(1), &memory_key())
        .await
        .expect("graceful Ok");

    assert_eq!(stats.finalized, 0, "available=false must NOT promote");
    assert_eq!(stats.merged, 0);
    assert_eq!(stats.conflicts, 0);
    let incoming_after = facts.get_clone(&incoming_id).expect("pending present");
    assert_eq!(incoming_after.status(), FactStatus::Pending);
    assert!(
        incoming_after.conflicts_with().is_empty(),
        "no drift flag without a real verdict"
    );
}
/// A classifier failure for one pending fact does not stop the rest of the batch.
///
/// The resolver keys its reply on the hypothesis text, so the result does not depend
/// on the order in which the three pending facts are processed: p1 gets a transient
/// outage, p2 gets an entailment, and p3 is expected to have no candidate at all
/// (any classifier call for it would return `InvalidResponse`).
#[tokio::test]
async fn batch_continues_after_single_pair_failure() {
let facts = InMemoryFacts::default();
let sessions = InMemorySessions::default();
let existing = accepted("shared anchor fact here", vec![1.0, 0.0, 0.0]);
facts.seed(existing);
let p1 = pending("shared anchor fact here too", vec![1.0, 0.0, 0.0]);
let p2 = pending("shared anchor fact but longer", vec![1.0, 0.0, 0.0]);
let p3 = pending("totally unrelated pending fact", vec![0.0, 1.0, 0.0]);
let p1_id = p1.id().clone();
let p3_id = p3.id().clone();
facts.seed(p1.clone());
facts.seed(p2.clone());
facts.seed(p3.clone());
sessions.seed(session_with_pending(vec![
p1.id().clone(),
p2.id().clone(),
p3.id().clone(),
]));
let classifier = ScriptedNliClassifier::matching(|_premise, hypothesis| match hypothesis {
"shared anchor fact here too" => Err(ProviderError::Unavailable("transient".into())),
"shared anchor fact but longer" => Ok(entailment_available()),
other => Err(ProviderError::InvalidResponse(format!(
"unexpected hypothesis: {other}"
))),
});
let fix = Fix::new();
let uc = build(&facts, &sessions, &classifier, &fix);
let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
assert_eq!(stats.processed, 3);
// p2 merged, p3 finalized standalone, p1 skipped.
assert_eq!(stats.merged, 1);
assert_eq!(stats.finalized, 1);
let p1_after = facts.get_clone(&p1_id).expect("p1 present");
assert_eq!(p1_after.status(), FactStatus::Pending, "p1 stayed pending");
let p3_after = facts.get_clone(&p3_id).expect("p3 present");
assert_eq!(p3_after.status(), FactStatus::Pending);
}
/// After finalizing two standalone facts, the session's pending list is empty.
#[tokio::test]
async fn finalize_clears_owned_pending_ids_after_drain() {
    let classifier = ScriptedNliClassifier::new(vec![]);
    let fix = Fix::new();
    let sessions = InMemorySessions::default();
    let facts = InMemoryFacts::default();

    let first = pending("first standalone pending fact", vec![1.0, 0.0, 0.0]);
    let second = pending("second standalone pending fact", vec![0.0, 1.0, 0.0]);
    let owned = vec![first.id().clone(), second.id().clone()];
    facts.seed(first);
    facts.seed(second);
    sessions.seed(session_with_pending(owned));

    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(1), &memory_key()).await.unwrap();

    assert_eq!(stats.processed, 2);
    assert!(
        sessions.pending_of(&sid(1)).is_empty(),
        "owned pending ids cleared after finalize"
    );
}
/// `FinalizeStats::default()` starts with every counter at zero and an empty session id.
///
/// Plain synchronous test: nothing here awaits, so no async runtime is spun up.
#[test]
fn stats_default_is_zeroed() {
    let stats = FinalizeStats::default();
    assert_eq!(stats.processed, 0);
    assert_eq!(stats.finalized, 0);
    assert_eq!(stats.merged, 0);
    assert_eq!(stats.conflicts, 0);
    assert_eq!(stats.rejected, 0);
    assert!(stats.session_id.is_empty());
}
/// The returned stats carry the string form of the session id passed to `execute`.
#[tokio::test]
async fn stats_session_id_echoed_in_output() {
    let classifier = ScriptedNliClassifier::new(vec![]);
    let fix = Fix::new();
    let facts = InMemoryFacts::default();
    let sessions = InMemorySessions::default();
    let session = SessionState::new(sid(7), memory_key(), ts());
    sessions.seed(session);

    let use_case = build(&facts, &sessions, &classifier, &fix);
    let stats = use_case.execute(&sid(7), &memory_key()).await.unwrap();

    assert_eq!(stats.session_id, sid(7).as_str());
}
}