Skip to main content

smos_application/use_cases/
finalize_session.rs

1//! `FinalizeSession` — session-end batch resolution pipeline (§5, §9).
2//!
3//! Drains a session's pending facts and resolves each one against the currently
4//! accepted pool via NLI: entailment merges into the existing fact, contradiction
5//! flags a bidirectional drift pair, neutral (or no candidate) promotes the
6//! pending fact through the validation gate. Resolution is **drift-priority**:
7//! a contradiction against a less-similar candidate must NOT be masked by a
8//! neutral/entailment hit on the top candidate, so the scan walks every
9//! candidate and only commits a merge after the full pass is contradiction-free.
10//!
11//! # Fail-open contract
12//!
13//! The use case NEVER raises on a per-fact failure (§9 known limitation
14//! "NLI backend unavailable graceful"): any NLI / save / mutation error is
15//! logged and the loop continues. Pending facts that could not be resolved
//! stay pending for the next session-end cycle. Only a failure to load the
//! pending or accepted pool propagates as `Err`; a session with no pending
//! facts returns `Ok(stats)` with `processed == 0`.
19//!
20//! # Session ownership — `source_sessions`, not `SessionState.pending_facts`
21//!
22//! Pending ownership is derived from `Fact.source_sessions`: every fact whose
23//! provenance list references `session_id` is in scope. The HTTP extraction
24//! path NEVER persists a `SessionState` row — it only mutates
25//! `fact.source_sessions` at extraction time — so reading
26//! `SessionState.pending_facts()` left real pending facts invisible to
27//! finalize (the operator-facing "24 pending facts but finalize says
28//! nothing to do" bug). `source_sessions` is the only durable provenance
29//! signal that survives the request path; this use case is the sole reader
30//! that drives resolution off it.
31//!
32//! The `memory_key` is supplied by the caller (CLI `--memory-key`, watcher
33//! reading `SessionState.memory_key()`) because `source_sessions` does NOT
34//! pin a namespace — the same `session_id` could in principle appear under
35//! multiple memory_keys (e.g. after a key migration), so the caller picks the
36//! scope. The CLI additionally exposes a discovery fallback
37//! (`FactRepository::list_memory_keys_for_session`) that iterates every key
38//! when the operator does not name one.
39//!
40//! # Session bookkeeping
41//!
//! `owned_ids` is snapshotted right after the pending pool is loaded and
//! filtered (before the accepted-pool load and the resolution walk) so
//! concurrent extraction appends (which race the drain) survive: only the
//! snapshotted ids are removed from `pending_facts` after finalize. Fresh
//! pending ids appended by
45//! another flow during finalize are preserved for the next cycle. The
46//! `remove_pending_owned` cleanup is best-effort — a missing `SessionState`
47//! row (the common case on the HTTP path) makes it a no-op; a present row
48//! gets its bookkeeping cleared so the watcher does not re-schedule an idle
49//! session.
50//!
51//! See `smos-poc/smos/session_end.py::process_session_end` for the canonical
52//! Python reference; this implementation mirrors `_resolve_one`,
53//! `_apply_merge`, `_apply_conflict_flag`, and `_finalize_standalone`.
54
55use smos_domain::config::NliConfig;
56use smos_domain::config::{ConfidenceConfig, MergeConfig};
57use smos_domain::enums::FactStatus;
58use smos_domain::{Fact, FactContent, FactId, MemoryKey, NliResult, SessionId};
59
60use crate::errors::{ProviderError, UseCaseError};
61use crate::ports::{FactRepository, NliClassifier, SessionRepository};
62
/// Aggregate outcome counters for one finalize run.
///
/// This is the shape reported to operators by the watcher and by the CLI
/// `--finalize` trigger, so every field is `pub`. All fields are plain
/// `String` / `usize` values, which is why the type can derive `Eq` in
/// addition to `PartialEq`.
///
/// Facts that were attempted but left unresolved are not counted anywhere;
/// they can be derived as `processed - (finalized + merged + conflicts)`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FinalizeStats {
    /// Id of the session that was finalized.
    pub session_id: String,
    /// Number of pending facts the run attempted to resolve.
    pub processed: usize,
    /// Facts that went down the standalone path and were saved. The fact may
    /// still be `Pending` afterwards if the validation gate did not promote
    /// it.
    pub finalized: usize,
    /// Pending facts merged into an existing fact (entailment path).
    pub merged: usize,
    /// Pending facts for which the scan hit a contradiction (drift). Both
    /// sides of the pair are flagged; status is unchanged.
    pub conflicts: usize,
    /// Pending twins counted as rejected after being absorbed by a merge.
    ///
    /// The tally step increments this together with `merged`, so the two
    /// counters are equal at the end of a run.
    ///
    /// NOTE(review): a failed reject/save of the pending twin after a
    /// successful merge save is only logged and is still counted here, so
    /// this field does not currently expose partial merges.
    pub rejected: usize,
}
90
91/// Borrow-style bundle of every dependency the finalize pipeline needs.
92///
93/// Built inline at the call site (the watcher in Slice-7, or the CLI
94/// `--finalize` trigger), dropped right after [`FinalizeSession::execute`]
95/// returns. References keep allocation to one borrow per call.
96pub struct FinalizeSession<'a, FR, SR, NC> {
97    pub facts: &'a FR,
98    pub sessions: &'a SR,
99    pub classifier: &'a NC,
100    pub confidence_cfg: &'a ConfidenceConfig,
101    pub nli_cfg: &'a NliConfig,
102    pub merge_cfg: &'a MergeConfig,
103}
104
/// How the resolution of a single pending fact ended.
///
/// Private to this use case; the `tally` step folds each value into
/// [`FinalizeStats`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FactOutcome {
    /// The fact went down the standalone path (no candidate, or no merge and
    /// no contradiction) and was saved after reclassification. Its resulting
    /// status is whatever the validation gate decided.
    Finalized,
    /// The fact was folded into an existing fact, and the pending twin was
    /// sent down the `Rejected` path.
    Merged,
    /// The fact contradicted an existing fact; the pair was flagged in both
    /// directions and neither status changed.
    Conflict,
    /// Resolution did not complete (no usable NLI verdict, a save failed, …).
    /// The fact remains `Pending` for the next cycle and is deliberately not
    /// counted, so operators can spot stuck facts via
    /// `processed - (finalized + merged + conflicts)`.
    Skipped,
}
125
126impl<'a, FR, SR, NC> FinalizeSession<'a, FR, SR, NC>
127where
128    FR: FactRepository,
129    SR: SessionRepository,
130    NC: NliClassifier,
131{
132    /// Resolve every pending fact owned by `session_id` within `memory_key`.
133    ///
134    /// Ownership is derived from `Fact.source_sessions` (see module docs):
135    /// every pending fact whose provenance list contains `session_id` is in
136    /// scope. Returns `Ok(stats)` even on per-fact failures; the only `Err`
137    /// paths are store catastrophes that prevent reading the pending or
138    /// accepted pools.
139    ///
140    /// `memory_key` scopes the namespace scan. Callers that already know the
141    /// namespace (the watcher reading `SessionState.memory_key()`, the CLI
142    /// with `--memory-key`) pass it directly; the CLI additionally exposes a
143    /// discovery fallback that iterates every key when the operator does not
144    /// name one.
145    pub async fn execute(
146        &self,
147        session_id: &SessionId,
148        memory_key: &MemoryKey,
149    ) -> Result<FinalizeStats, UseCaseError> {
150        let mut stats = FinalizeStats {
151            session_id: session_id.as_str().to_string(),
152            ..FinalizeStats::default()
153        };
154
155        // Step 1 — load the pending pool for this memory_key, then filter to
156        // the facts whose `source_sessions` references `session_id`. The
157        // HTTP extraction path never persists `SessionState`, so this is the
158        // only ownership signal that survives the request path. We do NOT
159        // consult `SessionState.pending_facts()` for ownership: a missing or
160        // empty session row must NOT mask real pending facts (the
161        // operator-facing "nothing to do" bug).
162        let all_pending = self.facts.list_pending(memory_key).await?;
163        let pending: Vec<Fact> = all_pending
164            .into_iter()
165            .filter(|f| f.source_sessions().iter().any(|s| s == session_id))
166            .collect();
167
168        if pending.is_empty() {
169            tracing::info!(
170                session = %session_id,
171                memory_key = %memory_key,
172                "finalize: no pending facts for session"
173            );
174            return Ok(stats);
175        }
176
177        // Step 2 — snapshot owned_ids BEFORE any await on the resolution
178        // walk. Concurrent extraction may save more pending facts carrying
179        // this session in `source_sessions` while we drain; those survive
180        // and are picked up by the next cycle (no leak, no double-resolve).
181        let owned_ids: Vec<FactId> = pending.iter().map(|f| f.id().clone()).collect();
182
183        let accepted = self.facts.list_accepted(memory_key).await?;
184        stats.processed = pending.len();
185        tracing::info!(
186            session = %session_id,
187            memory_key = %memory_key,
188            pending = pending.len(),
189            accepted = accepted.len(),
190            "finalizing session"
191        );
192
193        // Step 3 — drift-priority walk. The comparison pool grows as standalone
194        // facts are promoted (so a later pending fact can merge with one that
195        // was itself pending a moment ago); merges and conflicts consume the
196        // pending twin without growing the pool.
197        let mut comparison_pool: Vec<Fact> = accepted;
198        for fact in &pending {
199            let outcome = self.resolve_one(fact, &mut comparison_pool).await;
200            self.tally(&mut stats, outcome);
201        }
202
203        // Step 4 — bookkeeping cleanup. Only the originally-owned ids are
204        // removed; concurrent additions survive (see step 2 comment). This
205        // is best-effort: a missing `SessionState` (the common case on the
206        // HTTP extraction path) makes the call a no-op; a present row gets
207        // its bookkeeping cleared so the watcher does not re-schedule an
208        // idle session. A failure here is non-fatal — the session just
209        // re-drains on the next finalize, which is idempotent.
210        if let Err(e) = self
211            .sessions
212            .remove_pending_owned(session_id, &owned_ids)
213            .await
214        {
215            tracing::warn!(error = %e, "session cleanup failed (non-fatal)");
216        }
217
218        tracing::info!(
219            session = %session_id,
220            processed = stats.processed,
221            finalized = stats.finalized,
222            merged = stats.merged,
223            conflicts = stats.conflicts,
224            skipped = stats.processed - stats.finalized - stats.merged - stats.conflicts,
225            "finalize complete"
226        );
227
228        Ok(stats)
229    }
230
231    /// Resolve one pending fact against the (growing) comparison pool.
232    ///
233    /// Drift-priority semantics (§9):
234    /// - Exact-match short-circuit returns entailment WITHOUT an NLI call.
235    /// - C3 guard skips pairs already flagged as conflicting (no double-flag).
236    /// - First contradiction wins immediately (flag + return). We do NOT
237    ///   commit an earlier entailment candidate before the contradiction is
238    ///   observed, because drift is a stronger signal than merge.
239    /// - First entailment candidate becomes the merge pick, but the scan
240    ///   continues so a later less-similar candidate can still surface a
241    ///   contradiction.
242    /// - Otherwise the pending fact is finalized standalone, carrying the
243    ///   last observed (non-contradiction, non-entailment-merge) NLI verdict
244    ///   for the `no_contradiction_bonus`.
245    async fn resolve_one(&self, pending: &Fact, pool: &mut Vec<Fact>) -> FactOutcome {
246        let candidates = pending.find_merge_candidates(pool, self.merge_cfg);
247        if candidates.is_empty() {
248            return self.finalize_standalone(pending, None, pool).await;
249        }
250
251        // The first entailment candidate is the merge pick; we keep scanning
252        // so a later contradiction can override it.
253        let mut merge_pick: Option<(Fact, NliResult)> = None;
254        // Last non-merge NLI verdict — feeds the `no_contradiction_bonus` on
255        // the standalone path (POC `last_observed_nli`).
256        let mut last_observed_nli: Option<NliResult> = None;
257        // Did the NLI backend actually return ANY verdict for any candidate?
258        // When the backend is fully unreachable, we cannot tell whether a
259        // contradiction exists — keep the fact pending (graceful degradation,
260        // §9). The flag also flips on an exact-match short-circuit (which is
261        // a real verdict, just resolved locally).
262        let mut nli_observed = false;
263
264        for candidate in &candidates {
265            let existing = &candidate.fact;
266
267            // C3 guard — already-flagged conflict pair. Skip the (expensive)
268            // NLI call entirely; the conflict is already recorded. The pair
269            // still counts as "NLI observed" because the conflict was
270            // resolved by an earlier finalize cycle — without this, a
271            // pending twin of an already-flagged pair would be stuck in
272            // pending forever (every cycle would skip the same pair and
273            // report "NLI never observed").
274            if pending.conflicts_with().contains(existing.id())
275                || existing.conflicts_with().contains(pending.id())
276            {
277                nli_observed = true;
278                tracing::debug!(
279                    pending = %pending.id(),
280                    existing = %existing.id(),
281                    "C3 guard: skip NLI for already-flagged conflict pair"
282                );
283                continue;
284            }
285
286            // Exact-match short-circuit — identical text is entailment by
287            // definition. Avoids DeBERTa's known quirk of returning `neutral`
288            // on identical pairs.
289            let nli = if FactContent::text_equals_normalized(existing.content(), pending.content())
290            {
291                nli_observed = true;
292                NliResult::exact_match_result()
293            } else {
294                match self
295                    .classifier
296                    .classify(existing.content(), pending.content())
297                    .await
298                {
299                    Ok(nli) if nli.available => {
300                        // Real verdict from the NLI backend. An
301                        // `available = false` reply (the backend's own
302                        // graceful-degradation placeholder) is treated as
303                        // Unavailable: skip pair, do NOT bump `nli_observed`
304                        // — otherwise a permanently broken backend would
305                        // silently promote facts without drift detection.
306                        nli_observed = true;
307                        nli
308                    }
309                    Ok(_unavailable) => {
310                        tracing::warn!(
311                            pending = %pending.id(),
312                            existing = %existing.id(),
313                            "NLI replied with available=false; leaving pending (skip pair)"
314                        );
315                        continue;
316                    }
317                    Err(ProviderError::Unavailable(msg)) => {
318                        tracing::warn!(
319                            pending = %pending.id(),
320                            existing = %existing.id(),
321                            error = %msg,
322                            "NLI unavailable; leaving pending (skip pair)"
323                        );
324                        // Graceful: skip this pair, keep scanning. If every
325                        // pair is unavailable the pending fact stays pending.
326                        continue;
327                    }
328                    Err(other) => {
329                        tracing::warn!(
330                            pending = %pending.id(),
331                            existing = %existing.id(),
332                            error = %other,
333                            "NLI error (non-fatal, skip pair)"
334                        );
335                        continue;
336                    }
337                }
338            };
339
340            // Drift wins immediately — flag both sides bidirectionally and
341            // exit. We do NOT commit any earlier merge candidate.
342            if nli.is_contradiction(self.nli_cfg) {
343                return self.apply_conflict_flag(pending, existing, pool).await;
344            }
345
346            if nli.is_entailment(self.nli_cfg) && merge_pick.is_none() {
347                merge_pick = Some((existing.clone(), nli));
348                // Continue scanning: a later less-similar candidate may still
349                // contradict this pending fact (drift-priority walk).
350            } else {
351                last_observed_nli = Some(nli);
352            }
353        }
354
355        if let Some((existing, nli)) = merge_pick {
356            return self.apply_merge(pending, &existing, &nli, pool).await;
357        }
358
359        // The NLI backend never answered for any candidate → keep the fact
360        // pending. We have candidates but no NLI signal; promoting would
361        // silently mask a potential drift.
362        if !nli_observed {
363            tracing::info!(
364                pending = %pending.id(),
365                candidates = candidates.len(),
366                "NLI never observed for any candidate; leaving pending"
367            );
368            return FactOutcome::Skipped;
369        }
370
371        // No merge, no conflict — promote standalone. `last_observed_nli`
372        // (the strongest non-contradiction verdict we observed) feeds the
373        // `no_contradiction_bonus` in the confidence scorer.
374        self.finalize_standalone(pending, last_observed_nli.as_ref(), pool)
375            .await
376    }
377
378    /// Apply a bidirectional drift flag between `pending` and `existing`.
379    /// Status is unchanged on both sides (POC `_apply_conflict_flag`).
380    async fn apply_conflict_flag(
381        &self,
382        pending: &Fact,
383        existing: &Fact,
384        pool: &mut Vec<Fact>,
385    ) -> FactOutcome {
386        let mut existing_mut = existing.clone();
387        let mut pending_mut = pending.clone();
388        // Encapsulate the §5.2 invariant "both facts must carry the conflict
389        // link" in one call. The bidirectional helper short-circuits on the
390        // first failure; in this path `flag_conflict` cannot fail because
391        // `find_merge_candidates` already excluded self-matches.
392        if let Err(e) = existing_mut.flag_conflict_bidirectional(&mut pending_mut) {
393            tracing::warn!(
394                existing = %existing_mut.id(),
395                pending = %pending_mut.id(),
396                error = %e,
397                "flag_conflict_bidirectional failed"
398            );
399        }
400        if let Err(e) = self.facts.save(&existing_mut).await {
401            tracing::warn!(fact = %existing_mut.id(), error = %e, "save existing after flag failed");
402        }
403        if let Err(e) = self.facts.save(&pending_mut).await {
404            tracing::warn!(fact = %pending_mut.id(), error = %e, "save pending after flag failed");
405            // Pending twin failed to persist its flag — leave it pending so
406            // the next finalize re-attempts the same scan (idempotent).
407            return FactOutcome::Skipped;
408        }
409        // The pending twin stays pending (status unchanged). The pool does
410        // NOT grow — a flagged pair should not silently become a merge
411        // candidate for the next pending fact.
412        pool.push(pending.clone());
413        FactOutcome::Conflict
414    }
415
416    /// Merge `pending` into `existing`, then mark the pending twin `Rejected`
417    /// (POC `_apply_merge`). Source sessions and conflict flags are unioned
418    /// into the existing fact, then confidence is recomputed with the
419    /// entailment verdict (which carries the `no_contradiction_bonus`).
420    async fn apply_merge(
421        &self,
422        pending: &Fact,
423        existing: &Fact,
424        nli: &NliResult,
425        pool: &mut Vec<Fact>,
426    ) -> FactOutcome {
427        let mut existing_mut = existing.clone();
428        if let Err(e) = existing_mut.merge_into(pending) {
429            tracing::warn!(fact = %existing_mut.id(), error = %e, "merge_into failed");
430        }
431        if let Err(e) = existing_mut.reclassify(Some(nli), self.confidence_cfg) {
432            tracing::warn!(fact = %existing_mut.id(), error = %e, "reclassify(existing) failed");
433        }
434        if let Err(e) = self.facts.save(&existing_mut).await {
435            tracing::warn!(fact = %existing_mut.id(), error = %e, "save merged existing failed");
436            return FactOutcome::Skipped;
437        }
438
439        // Mark the pending twin Rejected so it stops appearing in pending
440        // listings. The `ConfidenceConfig` is forwarded so the validation
441        // gate's transition guards (`Pending → Rejected` is always allowed)
442        // can run; the confidence value itself is carried over unchanged.
443        let mut pending_mut = pending.clone();
444        if let Err(e) = pending_mut.set_status_and_confidence(
445            FactStatus::Rejected,
446            pending_mut.confidence(),
447            self.confidence_cfg,
448        ) {
449            tracing::warn!(fact = %pending_mut.id(), error = %e, "reject pending twin failed");
450        } else if let Err(e) = self.facts.save(&pending_mut).await {
451            tracing::warn!(fact = %pending_mut.id(), error = %e, "save rejected pending failed");
452        }
453
454        // The (updated) existing fact rejoins the pool so a later pending
455        // fact can merge with the unioned provenance.
456        pool.push(existing_mut);
457        FactOutcome::Merged
458    }
459
460    /// Promote a standalone pending fact through the validation gate.
461    /// `nli` is the strongest non-contradiction verdict observed during the
462    /// scan (or `None` when the scan had no candidate at all) and feeds the
463    /// `no_contradiction_bonus` in the confidence scorer.
464    async fn finalize_standalone(
465        &self,
466        pending: &Fact,
467        nli: Option<&NliResult>,
468        pool: &mut Vec<Fact>,
469    ) -> FactOutcome {
470        let mut fact = pending.clone();
471        if let Err(e) = fact.reclassify(nli, self.confidence_cfg) {
472            tracing::warn!(fact = %fact.id(), error = %e, "reclassify(standalone) failed");
473        }
474        if let Err(e) = self.facts.save(&fact).await {
475            tracing::warn!(fact = %fact.id(), error = %e, "save standalone failed");
476            return FactOutcome::Skipped;
477        }
478        // The promoted fact joins the comparison pool so a later pending
479        // fact can merge with it — even if the validation gate kept it
480        // `Pending` (it is still a candidate for the same-session twin).
481        pool.push(fact);
482        FactOutcome::Finalized
483    }
484
485    /// Fold a per-fact outcome into the running stats.
486    fn tally(&self, stats: &mut FinalizeStats, outcome: FactOutcome) {
487        match outcome {
488            FactOutcome::Finalized => stats.finalized += 1,
489            FactOutcome::Merged => {
490                stats.merged += 1;
491                stats.rejected += 1;
492            }
493            FactOutcome::Conflict => stats.conflicts += 1,
494            FactOutcome::Skipped => {
495                // Skipped facts stay pending — not tallied into any counter.
496                // Detectable via `processed - finalized - merged - conflicts`.
497            }
498        }
499    }
500}
501
502#[cfg(test)]
503mod tests {
504    //! Classicist unit tests for `FinalizeSession`.
505    //!
506    //! The fakes (`InMemoryFacts`, `InMemorySessions`, `ScriptedNliClassifier`)
507    //! are local to this module so the use case can be exercised without
508    //! spinning up SurrealDB or a native NLI backend. E2E coverage against a
509    //! real `SurrealStore` lives in `smos-adapters/tests/e2e_finalize.rs`.
510
511    use super::*;
512    use std::collections::HashMap;
513    use std::sync::Mutex;
514
515    use smos_domain::config::{ConfidenceConfig, MergeConfig, NliConfig};
516    use smos_domain::enums::NliLabel;
517    use smos_domain::{
518        Embedding, FactStatus, MemoryKey, NliScores, SessionId, SessionState, Timestamp,
519    };
520
521    // ---- Fakes (classicist style: in-memory state, scripted verdicts) ----
522
523    #[derive(Default, Clone)]
524    struct InMemoryFacts {
525        store: std::sync::Arc<Mutex<HashMap<String, Fact>>>,
526    }
527    impl InMemoryFacts {
528        fn seed(&self, fact: Fact) {
529            self.store
530                .lock()
531                .unwrap()
532                .insert(fact.id().as_str().to_string(), fact);
533        }
534        fn get_clone(&self, id: &FactId) -> Option<Fact> {
535            self.store.lock().unwrap().get(id.as_str()).cloned()
536        }
537    }
538    impl FactRepository for InMemoryFacts {
539        async fn save(&self, fact: &Fact) -> Result<(), crate::errors::RepoError> {
540            self.store
541                .lock()
542                .unwrap()
543                .insert(fact.id().as_str().to_string(), fact.clone());
544            Ok(())
545        }
546        async fn get(
547            &self,
548            id: &FactId,
549            _mk: &MemoryKey,
550        ) -> Result<Option<Fact>, crate::errors::RepoError> {
551            Ok(self.get_clone(id))
552        }
553        async fn list_accepted(
554            &self,
555            _mk: &MemoryKey,
556        ) -> Result<Vec<Fact>, crate::errors::RepoError> {
557            Ok(self
558                .store
559                .lock()
560                .unwrap()
561                .values()
562                .filter(|f| f.status() == FactStatus::Accepted)
563                .cloned()
564                .collect())
565        }
566        async fn list_pending(
567            &self,
568            _mk: &MemoryKey,
569        ) -> Result<Vec<Fact>, crate::errors::RepoError> {
570            Ok(self
571                .store
572                .lock()
573                .unwrap()
574                .values()
575                .filter(|f| f.status() == FactStatus::Pending)
576                .cloned()
577                .collect())
578        }
579        async fn list_memory_keys_for_session(
580            &self,
581            session_id: &SessionId,
582        ) -> Result<Vec<MemoryKey>, crate::errors::RepoError> {
583            // Mirrors the SurrealStore implementation: scan facts for
584            // `source_sessions` membership, dedupe the memory_keys in Rust
585            // (insertion order preserved so the test fixture is stable).
586            let mut out: Vec<MemoryKey> = Vec::new();
587            let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
588            for fact in self.store.lock().unwrap().values() {
589                if !fact.source_sessions().iter().any(|s| s == session_id) {
590                    continue;
591                }
592                let mk_str = fact.memory_key().as_str().to_string();
593                if seen.insert(mk_str) {
594                    out.push(fact.memory_key().clone());
595                }
596            }
597            Ok(out)
598        }
599        async fn list_memory_keys(&self) -> Result<Vec<MemoryKey>, crate::errors::RepoError> {
600            let mut out: Vec<MemoryKey> = Vec::new();
601            let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
602            for fact in self.store.lock().unwrap().values() {
603                let mk_str = fact.memory_key().as_str().to_string();
604                if seen.insert(mk_str) {
605                    out.push(fact.memory_key().clone());
606                }
607            }
608            Ok(out)
609        }
610        async fn search_similar(
611            &self,
612            _e: Vec<f32>,
613            _mk: &MemoryKey,
614            _l: usize,
615        ) -> Result<Vec<crate::types::SearchHit>, crate::errors::RepoError> {
616            Ok(Vec::new())
617        }
618        async fn update_heat_batch(
619            &self,
620            _ids: &[FactId],
621            _mk: &MemoryKey,
622            _h: smos_domain::Heat,
623            _t: Timestamp,
624        ) -> Result<(), crate::errors::RepoError> {
625            Ok(())
626        }
627    }
628
629    #[derive(Default, Clone)]
630    struct InMemorySessions {
631        sessions: std::sync::Arc<Mutex<HashMap<String, SessionState>>>,
632    }
633    impl InMemorySessions {
634        fn seed(&self, state: SessionState) {
635            self.sessions
636                .lock()
637                .unwrap()
638                .insert(state.id().as_str().to_string(), state);
639        }
640        fn pending_of(&self, id: &SessionId) -> Vec<FactId> {
641            self.sessions
642                .lock()
643                .unwrap()
644                .get(id.as_str())
645                .map(|s| s.pending_facts().to_vec())
646                .unwrap_or_default()
647        }
648    }
649    impl SessionRepository for InMemorySessions {
650        async fn get_or_create(
651            &self,
652            id: &SessionId,
653            memory_key: &MemoryKey,
654        ) -> Result<SessionState, crate::errors::RepoError> {
655            Ok(self
656                .sessions
657                .lock()
658                .unwrap()
659                .entry(id.as_str().to_string())
660                .or_insert_with(|| {
661                    SessionState::new(
662                        id.clone(),
663                        memory_key.clone(),
664                        Timestamp::from_unix_secs(0).unwrap(),
665                    )
666                })
667                .clone())
668        }
669        async fn collect_expired(
670            &self,
671            _t: std::time::Duration,
672        ) -> Result<Vec<(SessionId, SessionState)>, crate::errors::RepoError> {
673            Ok(Vec::new())
674        }
675        async fn snapshot_all(
676            &self,
677        ) -> Result<Vec<(SessionId, SessionState)>, crate::errors::RepoError> {
678            Ok(self
679                .sessions
680                .lock()
681                .unwrap()
682                .iter()
683                .map(|(k, v)| (SessionId::from_raw(k).unwrap(), v.clone()))
684                .collect())
685        }
686        async fn add_pending(
687            &self,
688            id: &SessionId,
689            fact_ids: &[FactId],
690        ) -> Result<(), crate::errors::RepoError> {
691            if let Some(state) = self.sessions.lock().unwrap().get_mut(id.as_str()) {
692                state.add_pending(fact_ids);
693            }
694            Ok(())
695        }
696        async fn remove_pending_owned(
697            &self,
698            id: &SessionId,
699            owned: &[FactId],
700        ) -> Result<(), crate::errors::RepoError> {
701            if let Some(state) = self.sessions.lock().unwrap().get_mut(id.as_str()) {
702                state.remove_owned(owned);
703            }
704            Ok(())
705        }
706        async fn clear_session(&self, id: &SessionId) -> Result<(), crate::errors::RepoError> {
707            self.sessions.lock().unwrap().remove(id.as_str());
708            Ok(())
709        }
710        async fn dedup_and_mark(
711            &self,
712            _id: &SessionId,
713            _mk: &MemoryKey,
714            candidates: &[FactId],
715        ) -> Result<Vec<FactId>, crate::errors::RepoError> {
716            Ok(candidates.to_vec())
717        }
718        async fn save(
719            &self,
720            id: &SessionId,
721            state: &SessionState,
722        ) -> Result<(), crate::errors::RepoError> {
723            self.sessions
724                .lock()
725                .unwrap()
726                .insert(id.as_str().to_string(), state.clone());
727            Ok(())
728        }
729    }
730
    /// Boxed closure type behind the matcher variant of [`ScriptedNliClassifier`].
    /// The closure is invoked with `(premise, hypothesis)` and returns the
    /// scripted verdict (or error) for that pair.
    ///
    /// Lifted into a `type` alias so clippy's `type_complexity` lint does not
    /// fire on the enum variant — the alias is also the right level of
    /// abstraction to give the contract a name.
    type NliResolver = Box<dyn Fn(&str, &str) -> Result<NliResult, ProviderError> + Send + Sync>;
736
737    /// Scripted NLI classifier with two modes:
738    /// - **FIFO**: each call pops the next verdict from the queue. Use when
739    ///   the test controls the call order (e.g. exactly one candidate).
740    /// - **Matcher**: each call dispatches to the closure supplied at build
741    ///   time. Use when pending iteration order is not deterministic
742    ///   (`HashMap` order) — the test keys verdicts on the candidate text.
743    ///
744    /// Both modes record every (premise, hypothesis) pair so tests can assert
745    /// on the exact set of pairs the use case asked about.
746    enum ScriptedNliClassifier {
747        Fifo {
748            verdicts: Mutex<Vec<Result<NliResult, ProviderError>>>,
749            calls: Mutex<Vec<(String, String)>>,
750        },
751        Match {
752            resolver: NliResolver,
753            calls: Mutex<Vec<(String, String)>>,
754        },
755    }
756    impl ScriptedNliClassifier {
757        fn new(verdicts: Vec<Result<NliResult, ProviderError>>) -> Self {
758            Self::Fifo {
759                verdicts: Mutex::new(verdicts),
760                calls: Mutex::new(Vec::new()),
761            }
762        }
763        fn matching<F>(resolver: F) -> Self
764        where
765            F: Fn(&str, &str) -> Result<NliResult, ProviderError> + Send + Sync + 'static,
766        {
767            Self::Match {
768                resolver: Box::new(resolver),
769                calls: Mutex::new(Vec::new()),
770            }
771        }
772        fn calls(&self) -> Vec<(String, String)> {
773            match self {
774                Self::Fifo { calls, .. } | Self::Match { calls, .. } => {
775                    calls.lock().unwrap().clone()
776                }
777            }
778        }
779    }
780    impl NliClassifier for ScriptedNliClassifier {
781        async fn classify(
782            &self,
783            premise: &str,
784            hypothesis: &str,
785        ) -> Result<NliResult, ProviderError> {
786            match self {
787                Self::Fifo { verdicts, calls } => {
788                    calls
789                        .lock()
790                        .unwrap()
791                        .push((premise.to_string(), hypothesis.to_string()));
792                    let mut queue = verdicts.lock().unwrap();
793                    if queue.is_empty() {
794                        Err(ProviderError::Unavailable("scripted queue empty".into()))
795                    } else {
796                        queue.remove(0)
797                    }
798                }
799                Self::Match { resolver, calls } => {
800                    calls
801                        .lock()
802                        .unwrap()
803                        .push((premise.to_string(), hypothesis.to_string()));
804                    resolver(premise, hypothesis)
805                }
806            }
807        }
808    }
809
810    /// NLI verdict that always returns `Neutral` (above the no-contradiction
811    /// threshold but below entailment). Used when tests do not care about the
812    /// specific label, only that the NLI backend was reachable.
813    fn neutral_available() -> NliResult {
814        NliResult {
815            label: NliLabel::Neutral,
816            scores: NliScores {
817                entailment: 0.2,
818                neutral: 0.7,
819                contradiction: 0.1,
820            },
821            available: true,
822        }
823    }
824
825    fn entailment_available() -> NliResult {
826        NliResult {
827            label: NliLabel::Entailment,
828            scores: NliScores {
829                entailment: 0.9,
830                neutral: 0.08,
831                contradiction: 0.02,
832            },
833            available: true,
834        }
835    }
836
837    fn contradiction_available() -> NliResult {
838        NliResult {
839            label: NliLabel::Contradiction,
840            scores: NliScores {
841                entailment: 0.05,
842                neutral: 0.1,
843                contradiction: 0.85,
844            },
845            available: true,
846        }
847    }
848
849    // ---- Fixtures ----
850
851    fn memory_key() -> MemoryKey {
852        MemoryKey::from_raw("origa").unwrap()
853    }
854    fn sid(n: u8) -> SessionId {
855        SessionId::from_raw(&format!("sess_{:012x}", n as u64)).unwrap()
856    }
857    fn ts() -> Timestamp {
858        Timestamp::from_unix_secs(1_700_000_000).unwrap()
859    }
860
861    /// Build a pending fact whose content-derived id is deterministic.
862    fn pending(content: &str, embedding: Vec<f32>) -> Fact {
863        Fact::new_pending(
864            content,
865            memory_key(),
866            sid(1),
867            Embedding::new(embedding).unwrap(),
868            ts(),
869            ConfidenceConfig::default().base,
870        )
871        .unwrap()
872    }
873
874    /// Build an accepted fact (single source, base confidence lifted above the
875    /// accept threshold via `set_status_and_confidence`).
876    fn accepted(content: &str, embedding: Vec<f32>) -> Fact {
877        let mut f = Fact::new_pending(
878            content,
879            memory_key(),
880            sid(2),
881            Embedding::new(embedding).unwrap(),
882            ts(),
883            ConfidenceConfig::default().base,
884        )
885        .unwrap();
886        f.set_status_and_confidence(
887            FactStatus::Accepted,
888            smos_domain::Confidence::new(0.9).unwrap(),
889            &ConfidenceConfig::default(),
890        )
891        .unwrap();
892        f
893    }
894
895    /// Build a session state carrying `owned` pending fact ids.
896    fn session_with_pending(owned: Vec<FactId>) -> SessionState {
897        let mut state = SessionState::new(sid(1), memory_key(), ts());
898        state.add_pending(&owned);
899        state
900    }
901
902    /// Shared fixture: confidence / NLI / merge configs owned by the test so
903    /// the returned use case can borrow them for its whole lifetime.
904    /// Mirrors the `Fix` pattern in `extract_facts_from_response`.
905    struct Fix {
906        confidence_cfg: ConfidenceConfig,
907        nli_cfg: NliConfig,
908        merge_cfg: MergeConfig,
909    }
910    impl Fix {
911        fn new() -> Self {
912            Self {
913                confidence_cfg: ConfidenceConfig::default(),
914                nli_cfg: NliConfig::default(),
915                merge_cfg: MergeConfig::default(),
916            }
917        }
918    }
919
920    fn build<'a>(
921        facts: &'a InMemoryFacts,
922        sessions: &'a InMemorySessions,
923        classifier: &'a ScriptedNliClassifier,
924        fix: &'a Fix,
925    ) -> FinalizeSession<'a, InMemoryFacts, InMemorySessions, ScriptedNliClassifier> {
926        FinalizeSession {
927            facts,
928            sessions,
929            classifier,
930            confidence_cfg: &fix.confidence_cfg,
931            nli_cfg: &fix.nli_cfg,
932            merge_cfg: &fix.merge_cfg,
933        }
934    }
935
936    // -----------------------------------------------------------------------
937    // Happy-path tests
938    // -----------------------------------------------------------------------
939
940    #[tokio::test]
941    async fn execute_no_session_returns_empty_stats() {
942        let facts = InMemoryFacts::default();
943        let sessions = InMemorySessions::default();
944        let classifier = ScriptedNliClassifier::new(vec![]);
945        let fix = Fix::new();
946        let uc = build(&facts, &sessions, &classifier, &fix);
947
948        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
949        assert_eq!(stats.processed, 0);
950        assert_eq!(stats.finalized, 0);
951        assert!(classifier.calls().is_empty(), "no NLI call without pending");
952    }
953
954    /// Regression guard for the operator-facing bug: HTTP extraction persists
955    /// `fact.source_sessions` but NEVER writes a `SessionState` row, so the
956    /// previous implementation (which read `SessionState.pending_facts()`
957    /// for ownership) reported "nothing to do" while 24 pending facts sat in
958    /// the store. The fix derives ownership from `source_sessions` instead,
959    /// so a missing SessionState must NOT mask real pending facts.
960    #[tokio::test]
961    async fn execute_processes_pending_facts_even_when_session_state_is_absent() {
962        let facts = InMemoryFacts::default();
963        let sessions = InMemorySessions::default();
964        // NO `sessions.seed(...)` — the HTTP path leaves SessionState empty.
965        // The pending fact still carries `source_sessions = [sid(1)]`
966        // (the `pending()` fixture sets it via `Fact::new_pending`), which
967        // is the only ownership signal the use case consults after the fix.
968        let fact = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
969        let fact_id = fact.id().clone();
970        facts.seed(fact);
971
972        let classifier = ScriptedNliClassifier::new(vec![]);
973        let fix = Fix::new();
974        let uc = build(&facts, &sessions, &classifier, &fix);
975
976        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
977        assert_eq!(
978            stats.processed, 1,
979            "missing SessionState must not mask the fact"
980        );
981        assert_eq!(stats.finalized, 1);
982        let finalized = facts.get_clone(&fact_id).expect("fact still present");
983        assert_eq!(finalized.status(), FactStatus::Pending);
984    }
985
986    /// A pending fact whose `source_sessions` does NOT contain the target
987    /// session is skipped — finalize is scoped to one session's ownership,
988    /// not to every pending fact in the namespace.
989    #[tokio::test]
990    async fn execute_skips_pending_fact_owned_by_a_different_session() {
991        let facts = InMemoryFacts::default();
992        let sessions = InMemorySessions::default();
993        // `pending()` fixture sets source_sessions = [sid(1)] — finalizing
994        // sid(2) must NOT pick it up.
995        let fact = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
996        let fact_id = fact.id().clone();
997        facts.seed(fact);
998
999        let classifier = ScriptedNliClassifier::new(vec![]);
1000        let fix = Fix::new();
1001        let uc = build(&facts, &sessions, &classifier, &fix);
1002
1003        let stats = uc.execute(&sid(2), &memory_key()).await.unwrap();
1004        assert_eq!(stats.processed, 0);
1005        // The fact survives untouched.
1006        let untouched = facts.get_clone(&fact_id).expect("fact still present");
1007        assert_eq!(untouched.status(), FactStatus::Pending);
1008    }
1009
1010    #[tokio::test]
1011    async fn execute_empty_session_returns_empty_stats() {
1012        let facts = InMemoryFacts::default();
1013        let sessions = InMemorySessions::default();
1014        sessions.seed(SessionState::new(sid(1), memory_key(), ts()));
1015        let classifier = ScriptedNliClassifier::new(vec![]);
1016        let fix = Fix::new();
1017        let uc = build(&facts, &sessions, &classifier, &fix);
1018
1019        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1020        assert_eq!(stats.processed, 0);
1021    }
1022
1023    #[tokio::test]
1024    async fn execute_standalone_promotes_pending_fact_with_no_candidate() {
1025        let facts = InMemoryFacts::default();
1026        let sessions = InMemorySessions::default();
1027        // Pending fact with a unique embedding → no candidate above the merge
1028        // threshold (no accepted fact exists at all).
1029        let fact = pending("user prefers rust over go", vec![1.0, 0.0, 0.0]);
1030        let fact_id = fact.id().clone();
1031        facts.seed(fact);
1032        sessions.seed(session_with_pending(vec![fact_id.clone()]));
1033
1034        let classifier = ScriptedNliClassifier::new(vec![]);
1035        let fix = Fix::new();
1036        let uc = build(&facts, &sessions, &classifier, &fix);
1037
1038        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1039        assert_eq!(stats.processed, 1);
1040        assert_eq!(stats.finalized, 1);
1041        assert_eq!(stats.merged, 0);
1042        assert_eq!(stats.conflicts, 0);
1043        // Single-source, base confidence (0.5) → Pending (validation gate).
1044        let finalized = facts.get_clone(&fact_id).expect("fact still present");
1045        assert_eq!(finalized.status(), FactStatus::Pending);
1046        assert!(
1047            classifier.calls().is_empty(),
1048            "no NLI call without candidate"
1049        );
1050        assert!(
1051            sessions.pending_of(&sid(1)).is_empty(),
1052            "owned pending cleared"
1053        );
1054    }
1055
1056    #[tokio::test]
1057    async fn execute_entailment_merges_pending_into_existing() {
1058        let facts = InMemoryFacts::default();
1059        let sessions = InMemorySessions::default();
1060        let existing = accepted("ttl=10 prevents refresh loop", vec![1.0, 0.0, 0.0]);
1061        let existing_id = existing.id().clone();
1062        facts.seed(existing);
1063        // Pending twin: identical embedding (cosine 1.0 ≥ 0.85 merge threshold).
1064        let pending_fact = pending("ttl=10 stops the refresh loop", vec![1.0, 0.0, 0.0]);
1065        let pending_id = pending_fact.id().clone();
1066        facts.seed(pending_fact.clone());
1067        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1068
1069        let classifier = ScriptedNliClassifier::new(vec![Ok(entailment_available())]);
1070        let fix = Fix::new();
1071        let uc = build(&facts, &sessions, &classifier, &fix);
1072
1073        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1074        assert_eq!(stats.processed, 1);
1075        assert_eq!(stats.merged, 1);
1076        assert_eq!(stats.rejected, 1);
1077        assert_eq!(stats.finalized, 0);
1078
1079        // Existing fact grew provenance (union of source sessions) and was
1080        // reclassified with the entailment verdict (no_contradiction_bonus).
1081        let merged = facts.get_clone(&existing_id).expect("existing present");
1082        assert!(merged.source_sessions().distinct_count() >= 2);
1083        // Pending twin was rejected.
1084        let twin = facts.get_clone(&pending_id).expect("pending present");
1085        assert_eq!(twin.status(), FactStatus::Rejected);
1086        assert!(
1087            sessions.pending_of(&sid(1)).is_empty(),
1088            "owned pending cleared"
1089        );
1090    }
1091
1092    #[tokio::test]
1093    async fn execute_contradiction_flags_bidirectional_conflict() {
1094        let facts = InMemoryFacts::default();
1095        let sessions = InMemorySessions::default();
1096        let existing = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
1097        let existing_id = existing.id().clone();
1098        facts.seed(existing);
1099        let pending_fact = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
1100        let pending_id = pending_fact.id().clone();
1101        facts.seed(pending_fact.clone());
1102        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1103
1104        let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
1105        let fix = Fix::new();
1106        let uc = build(&facts, &sessions, &classifier, &fix);
1107
1108        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1109        assert_eq!(stats.processed, 1);
1110        assert_eq!(stats.conflicts, 1);
1111        assert_eq!(stats.merged, 0);
1112        assert_eq!(stats.finalized, 0);
1113
1114        // Both sides carry the bidirectional conflict flag.
1115        let existing_after = facts.get_clone(&existing_id).expect("existing present");
1116        let pending_after = facts.get_clone(&pending_id).expect("pending present");
1117        assert!(existing_after.conflicts_with().contains(&pending_id));
1118        assert!(pending_after.conflicts_with().contains(&existing_id));
1119        // Status UNCHANGED on both sides (Accepted stays Accepted, Pending stays Pending).
1120        assert_eq!(existing_after.status(), FactStatus::Accepted);
1121        assert_eq!(pending_after.status(), FactStatus::Pending);
1122        // No valid_until tombstone on either side (drift is not a death).
1123        assert!(existing_after.valid_until().is_none());
1124        assert!(pending_after.valid_until().is_none());
1125    }
1126
1127    // -----------------------------------------------------------------------
1128    // Drift-priority walk
1129    // -----------------------------------------------------------------------
1130
1131    #[tokio::test]
1132    async fn drift_priority_walk_contradiction_beats_earlier_neutral() {
1133        let facts = InMemoryFacts::default();
1134        let sessions = InMemorySessions::default();
1135        // Two accepted facts, both above the cosine threshold. The closer
1136        // candidate ("similar") would yield `Neutral` — the use case must
1137        // keep scanning so the contradiction against the less-similar
1138        // candidate ("drift") still wins.
1139        let closer = accepted("rust is memory safe", vec![1.0, 0.0, 0.0]);
1140        let closer_id = closer.id().clone();
1141        let farther = accepted("rust leaks memory everywhere", vec![0.9, 0.1, 0.0]);
1142        let farther_id = farther.id().clone();
1143        facts.seed(closer);
1144        facts.seed(farther);
1145        let pending_fact = pending("rust is memory safe language", vec![1.0, 0.0, 0.0]);
1146        let pending_id = pending_fact.id().clone();
1147        facts.seed(pending_fact.clone());
1148        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1149
1150        // find_merge_candidates sorts by cosine descending, so the first NLI
1151        // call hits "closer" (Neutral), the second hits "farther" (Contradiction).
1152        let classifier = ScriptedNliClassifier::new(vec![
1153            Ok(neutral_available()),
1154            Ok(contradiction_available()),
1155        ]);
1156        let fix = Fix::new();
1157        let uc = build(&facts, &sessions, &classifier, &fix);
1158
1159        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1160        assert_eq!(
1161            stats.conflicts, 1,
1162            "drift must win over the earlier neutral"
1163        );
1164        assert_eq!(stats.merged, 0, "no merge despite the neutral candidate");
1165
1166        // The contradiction was flagged against "farther" (the contradicting
1167        // candidate), NOT against "closer".
1168        let pending_after = facts.get_clone(&pending_id).expect("pending present");
1169        assert!(
1170            pending_after.conflicts_with().contains(&farther_id),
1171            "drift flag points to the contradicting candidate"
1172        );
1173        assert!(
1174            !pending_after.conflicts_with().contains(&closer_id),
1175            "no spurious drift flag on the neutral candidate"
1176        );
1177    }
1178
1179    #[tokio::test]
1180    async fn drift_priority_walk_keeps_merge_pick_but_still_scans_for_contradiction() {
1181        // Entailment candidate first, contradiction second → contradiction wins,
1182        // the earlier merge pick is NOT committed.
1183        let facts = InMemoryFacts::default();
1184        let sessions = InMemorySessions::default();
1185        let entailed = accepted("the api runs on port 8080", vec![1.0, 0.0, 0.0]);
1186        let entailed_id = entailed.id().clone();
1187        let drift = accepted("the api runs on port 9090", vec![0.95, 0.05, 0.0]);
1188        let drift_id = drift.id().clone();
1189        facts.seed(entailed);
1190        facts.seed(drift);
1191        let pending_fact = pending("the api runs on port 8080 today", vec![1.0, 0.0, 0.0]);
1192        let pending_id = pending_fact.id().clone();
1193        facts.seed(pending_fact.clone());
1194        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1195
1196        let classifier = ScriptedNliClassifier::new(vec![
1197            Ok(entailment_available()),
1198            Ok(contradiction_available()),
1199        ]);
1200        let fix = Fix::new();
1201        let uc = build(&facts, &sessions, &classifier, &fix);
1202
1203        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1204        assert_eq!(stats.conflicts, 1);
1205        assert_eq!(stats.merged, 0);
1206        // The entailed candidate was NOT modified (no merge committed).
1207        let entailed_after = facts.get_clone(&entailed_id).expect("entailed present");
1208        assert_eq!(
1209            entailed_after.source_sessions().distinct_count(),
1210            1,
1211            "merge not committed for the entailed candidate"
1212        );
1213        // The drift candidate was flagged.
1214        let drift_after = facts.get_clone(&drift_id).expect("drift present");
1215        assert!(drift_after.conflicts_with().contains(&pending_id));
1216    }
1217
1218    // -----------------------------------------------------------------------
1219    // C3 guard — already-flagged pairs skip the sidecar
1220    // -----------------------------------------------------------------------
1221
1222    #[tokio::test]
1223    async fn c3_guard_skips_nli_for_already_flagged_conflict_pair() {
1224        let facts = InMemoryFacts::default();
1225        let sessions = InMemorySessions::default();
1226        let mut existing = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
1227        let mut pending_fact = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
1228        // Pre-flag the pair so the C3 guard fires before any sidecar call.
1229        existing.flag_conflict(pending_fact.id().clone()).unwrap();
1230        pending_fact.flag_conflict(existing.id().clone()).unwrap();
1231        let existing_id = existing.id().clone();
1232        let pending_id = pending_fact.id().clone();
1233        facts.seed(existing);
1234        facts.seed(pending_fact.clone());
1235        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1236
1237        // The classifier would have returned contradiction, but the C3 guard
1238        // must short-circuit before any call.
1239        let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
1240        let fix = Fix::new();
1241        let uc = build(&facts, &sessions, &classifier, &fix);
1242
1243        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1244        assert_eq!(stats.processed, 1);
1245        // The C3 guard skipped every candidate → standalone promotion.
1246        assert_eq!(stats.finalized, 1);
1247        assert_eq!(stats.conflicts, 0);
1248        assert!(
1249            classifier.calls().is_empty(),
1250            "C3 guard must skip every sidecar call"
1251        );
1252        // Existing flags UNCHANGED (no double-flag).
1253        let existing_after = facts.get_clone(&existing_id).expect("existing present");
1254        assert_eq!(existing_after.conflicts_with().len(), 1);
1255        assert!(existing_after.conflicts_with().contains(&pending_id));
1256        // Pending twin also keeps its pre-flagged conflict link — the C3
1257        // guard leaves both sides untouched, which is the contract that
1258        // keeps a re-finalized session idempotent (no spurious
1259        // double-flag, no leak of the conflict to fresh candidates).
1260        let pending_after = facts.get_clone(&pending_id).expect("pending present");
1261        assert_eq!(pending_after.conflicts_with().len(), 1);
1262        assert!(
1263            pending_after.conflicts_with().contains(&existing_id),
1264            "pending twin must retain its pre-existing conflict flag"
1265        );
1266    }
1267
1268    // -----------------------------------------------------------------------
1269    // Multi-contradiction — pending fact drifts against 2+ existing facts
1270    // -----------------------------------------------------------------------
1271
1272    /// A pending fact that contradicts MULTIPLE accepted facts flags every
1273    /// contradiction it finds. Drift-priority means the FIRST
1274    /// contradiction wins for the *outcome* (the loop returns
1275    /// `Conflict` on the first one), but `resolve_one` continues scanning
1276    /// only until the first contradiction — it does NOT keep flagging
1277    /// after the drift is observed. This test pins that semantics: the
1278    /// second contradicting candidate is NOT visited once the first
1279    /// contradiction has fired.
1280    #[tokio::test]
1281    async fn multi_contradiction_returns_after_first_drift() {
1282        let facts = InMemoryFacts::default();
1283        let sessions = InMemorySessions::default();
1284        let existing_a = accepted("ttl=60 seconds", vec![1.0, 0.0, 0.0]);
1285        let existing_b = accepted("ttl=30 seconds", vec![0.95, 0.05, 0.0]);
1286        let a_id = existing_a.id().clone();
1287        let b_id = existing_b.id().clone();
1288        facts.seed(existing_a);
1289        facts.seed(existing_b);
1290        let pending_fact = pending("ttl=10 seconds", vec![1.0, 0.0, 0.0]);
1291        let pending_id = pending_fact.id().clone();
1292        facts.seed(pending_fact.clone());
1293        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1294
1295        // First candidate returns contradiction → loop returns
1296        // immediately. The second verdict (also contradiction) is never
1297        // consumed.
1298        let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
1299        let fix = Fix::new();
1300        let uc = build(&facts, &sessions, &classifier, &fix);
1301
1302        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1303        assert_eq!(stats.conflicts, 1);
1304        assert_eq!(stats.processed, 1);
1305        assert_eq!(
1306            classifier.calls().len(),
1307            1,
1308            "first contradiction must short-circuit; second candidate not visited"
1309        );
1310
1311        // The pending twin carries exactly ONE drift flag (against
1312        // whichever candidate was visited first — the merge-candidate
1313        // order is deterministic via cosine).
1314        let pending_after = facts.get_clone(&pending_id).expect("pending present");
1315        assert_eq!(
1316            pending_after.conflicts_with().len(),
1317            1,
1318            "exactly one drift flag on the pending twin"
1319        );
1320        // Sanity: the flagged id is one of the two existing facts.
1321        let flagged = pending_after
1322            .conflicts_with()
1323            .iter()
1324            .next()
1325            .expect("flag set");
1326        assert!(*flagged == a_id || *flagged == b_id);
1327    }
1328
1329    // -----------------------------------------------------------------------
1330    // Exact-match short-circuit
1331    // -----------------------------------------------------------------------
1332
1333    #[tokio::test]
1334    async fn exact_match_skips_sidecar_and_merges_identical_pair() {
1335        let facts = InMemoryFacts::default();
1336        let sessions = InMemorySessions::default();
1337        let existing = accepted("identical fact content", vec![1.0, 0.0, 0.0]);
1338        let existing_id = existing.id().clone();
1339        facts.seed(existing);
1340        // Pending twin has the SAME content → exact-match short-circuit.
1341        // Note: FactId is content-derived, so two identical-content facts
1342        // share the same id. We bypass that here by seeding the pending twin
1343        // under a different content hash via the lowercase trick (POC normalises
1344        // case + whitespace, so "IDENTICAL FACT CONTENT" exact-matches the
1345        // existing lower-case form).
1346        let pending_fact = pending("IDENTICAL FACT CONTENT", vec![1.0, 0.0, 0.0]);
1347        let pending_id = pending_fact.id().clone();
1348        facts.seed(pending_fact.clone());
1349        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1350
1351        let classifier = ScriptedNliClassifier::new(vec![Ok(contradiction_available())]);
1352        let fix = Fix::new();
1353        let uc = build(&facts, &sessions, &classifier, &fix);
1354
1355        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1356        // Exact-match returns entailment immediately → merge committed. The
1357        // scripted contradiction verdict MUST NOT be consumed.
1358        assert_eq!(stats.merged, 1);
1359        assert_eq!(stats.conflicts, 0);
1360        assert!(
1361            classifier.calls().is_empty(),
1362            "exact-match must short-circuit before any sidecar call"
1363        );
1364        let merged = facts.get_clone(&existing_id).expect("existing present");
1365        assert!(merged.source_sessions().distinct_count() >= 2);
1366    }
1367
1368    // -----------------------------------------------------------------------
1369    // Graceful degradation
1370    // -----------------------------------------------------------------------
1371
1372    #[tokio::test]
1373    async fn sidecar_unavailable_keeps_pending_fact_gracefully() {
1374        let facts = InMemoryFacts::default();
1375        let sessions = InMemorySessions::default();
1376        let existing = accepted("rust is memory safe", vec![1.0, 0.0, 0.0]);
1377        facts.seed(existing);
1378        let pending_fact = pending("rust guarantees memory safety", vec![1.0, 0.0, 0.0]);
1379        let pending_id = pending_fact.id().clone();
1380        facts.seed(pending_fact.clone());
1381        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1382
1383        // Every NLI call is Unavailable — the use case must not raise.
1384        let classifier = ScriptedNliClassifier::new(vec![Err(ProviderError::Unavailable(
1385            "sidecar crashed".into(),
1386        ))]);
1387        let fix = Fix::new();
1388        let uc = build(&facts, &sessions, &classifier, &fix);
1389
1390        let stats = uc
1391            .execute(&sid(1), &memory_key())
1392            .await
1393            .expect("graceful Ok");
1394        // No outcome tallied (skip does not increment any counter).
1395        assert_eq!(stats.finalized, 0);
1396        assert_eq!(stats.merged, 0);
1397        assert_eq!(stats.conflicts, 0);
1398        // The pending fact survives unchanged.
1399        let pending_after = facts.get_clone(&pending_id).expect("pending present");
1400        assert_eq!(pending_after.status(), FactStatus::Pending);
1401        assert!(pending_after.conflicts_with().is_empty());
1402    }
1403
1404    #[tokio::test]
1405    async fn sidecar_replies_available_false_keeps_pending_fact_gracefully() {
1406        // The sidecar sometimes replies with its own graceful-degradation
1407        // placeholder (label=neutral, available=false) when the model raised
1408        // on a malformed input or the sidecar's stdout closed before the
1409        // reply landed. The use case must treat `available = false` exactly
1410        // like `Err(Unavailable)` — the pending fact stays pending so a
1411        // permanently broken sidecar cannot silently promote facts past the
1412        // drift-detection gate.
1413        let facts = InMemoryFacts::default();
1414        let sessions = InMemorySessions::default();
1415        let existing = accepted("rust is memory safe", vec![1.0, 0.0, 0.0]);
1416        facts.seed(existing);
1417        let pending_fact = pending("rust guarantees memory safety", vec![1.0, 0.0, 0.0]);
1418        let pending_id = pending_fact.id().clone();
1419        facts.seed(pending_fact.clone());
1420        sessions.seed(session_with_pending(vec![pending_id.clone()]));
1421
1422        // Reply shape mirrors the "classifier unavailable" verdict produced
1423        // by the NLI backend on a transport/runtime failure (see
1424        // `ProviderError::Unavailable` mapping in `NativeNliClassifier`).
1425        let unavailable_verdict = NliResult {
1426            label: NliLabel::Neutral,
1427            scores: NliScores {
1428                entailment: 0.0,
1429                neutral: 1.0,
1430                contradiction: 0.0,
1431            },
1432            available: false,
1433        };
1434        let classifier = ScriptedNliClassifier::new(vec![Ok(unavailable_verdict)]);
1435        let fix = Fix::new();
1436        let uc = build(&facts, &sessions, &classifier, &fix);
1437
1438        let stats = uc
1439            .execute(&sid(1), &memory_key())
1440            .await
1441            .expect("graceful Ok");
1442        assert_eq!(stats.finalized, 0, "available=false must NOT promote");
1443        assert_eq!(stats.merged, 0);
1444        assert_eq!(stats.conflicts, 0);
1445        let pending_after = facts.get_clone(&pending_id).expect("pending present");
1446        assert_eq!(pending_after.status(), FactStatus::Pending);
1447        assert!(
1448            pending_after.conflicts_with().is_empty(),
1449            "no drift flag without a real verdict"
1450        );
1451    }
1452
1453    #[tokio::test]
1454    async fn batch_continues_after_single_pair_failure() {
1455        let facts = InMemoryFacts::default();
1456        let sessions = InMemorySessions::default();
1457        // Three pending facts, two with candidates and one standalone. Each
1458        // candidate has a distinct content so the matcher can return a
1459        // deterministic verdict regardless of `HashMap` iteration order.
1460        let existing = accepted("shared anchor fact here", vec![1.0, 0.0, 0.0]);
1461        facts.seed(existing);
1462        // p1: similar but the matcher marks it as NLI-unavailable → skip pair.
1463        let p1 = pending("shared anchor fact here too", vec![1.0, 0.0, 0.0]);
1464        // p2: similar, matcher returns entailment → merge.
1465        let p2 = pending("shared anchor fact but longer", vec![1.0, 0.0, 0.0]);
1466        // p3: orthogonal embedding → no candidate → standalone promotion.
1467        let p3 = pending("totally unrelated pending fact", vec![0.0, 1.0, 0.0]);
1468        let p1_id = p1.id().clone();
1469        let p3_id = p3.id().clone();
1470        facts.seed(p1.clone());
1471        facts.seed(p2.clone());
1472        facts.seed(p3.clone());
1473        sessions.seed(session_with_pending(vec![
1474            p1.id().clone(),
1475            p2.id().clone(),
1476            p3.id().clone(),
1477        ]));
1478
1479        // Order-independent matcher: keyed on the hypothesis text (the pending
1480        // twin) so HashMap iteration order over the pending list does not
1481        // change the outcome.
1482        let classifier = ScriptedNliClassifier::matching(|_premise, hypothesis| match hypothesis {
1483            "shared anchor fact here too" => Err(ProviderError::Unavailable("transient".into())),
1484            "shared anchor fact but longer" => Ok(entailment_available()),
1485            other => Err(ProviderError::InvalidResponse(format!(
1486                "unexpected hypothesis: {other}"
1487            ))),
1488        });
1489        let fix = Fix::new();
1490        let uc = build(&facts, &sessions, &classifier, &fix);
1491
1492        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1493        // One merge (p2 → existing), one finalize (p3 standalone), one skip
1494        // (p1 stayed pending because the sidecar was unreachable).
1495        assert_eq!(stats.processed, 3);
1496        assert_eq!(stats.merged, 1);
1497        assert_eq!(stats.finalized, 1);
1498        let p1_after = facts.get_clone(&p1_id).expect("p1 present");
1499        assert_eq!(p1_after.status(), FactStatus::Pending, "p1 stayed pending");
1500        let p3_after = facts.get_clone(&p3_id).expect("p3 present");
1501        // p3 standalone: single source, base confidence → still Pending.
1502        assert_eq!(p3_after.status(), FactStatus::Pending);
1503    }
1504
1505    // -----------------------------------------------------------------------
1506    // Bookkeeping cleanup
1507    // -----------------------------------------------------------------------
1508
1509    #[tokio::test]
1510    async fn finalize_clears_owned_pending_ids_after_drain() {
1511        let facts = InMemoryFacts::default();
1512        let sessions = InMemorySessions::default();
1513        // Two pending facts, both owned by the session. After finalize the
1514        // session's pending list must be empty (both owned ids drained).
1515        let p1 = pending("first standalone pending fact", vec![1.0, 0.0, 0.0]);
1516        let p2 = pending("second standalone pending fact", vec![0.0, 1.0, 0.0]);
1517        let p1_id = p1.id().clone();
1518        let p2_id = p2.id().clone();
1519        facts.seed(p1);
1520        facts.seed(p2);
1521        sessions.seed(session_with_pending(vec![p1_id, p2_id]));
1522
1523        let classifier = ScriptedNliClassifier::new(vec![]);
1524        let fix = Fix::new();
1525        let uc = build(&facts, &sessions, &classifier, &fix);
1526
1527        let stats = uc.execute(&sid(1), &memory_key()).await.unwrap();
1528        assert_eq!(stats.processed, 2);
1529        assert!(
1530            sessions.pending_of(&sid(1)).is_empty(),
1531            "owned pending ids cleared after finalize"
1532        );
1533    }
1534
1535    // -----------------------------------------------------------------------
1536    // Stats contract
1537    // -----------------------------------------------------------------------
1538
// `FinalizeStats::default()` must start with every counter at zero and an
// empty session id.
//
// Deliberately a plain synchronous `#[test]`: nothing in this check awaits,
// so there is no reason to build a Tokio runtime just to read default
// field values.
#[test]
fn stats_default_is_zeroed() {
    let stats = FinalizeStats::default();

    // Outcome counters.
    assert_eq!(stats.processed, 0);
    assert_eq!(stats.finalized, 0);
    assert_eq!(stats.merged, 0);
    assert_eq!(stats.conflicts, 0);
    assert_eq!(stats.rejected, 0);

    // No session has been attached yet.
    assert!(stats.session_id.is_empty());
}
1549
1550    #[tokio::test]
1551    async fn stats_session_id_echoed_in_output() {
1552        let facts = InMemoryFacts::default();
1553        let sessions = InMemorySessions::default();
1554        sessions.seed(SessionState::new(sid(7), memory_key(), ts()));
1555        let classifier = ScriptedNliClassifier::new(vec![]);
1556        let fix = Fix::new();
1557        let uc = build(&facts, &sessions, &classifier, &fix);
1558
1559        let stats = uc.execute(&sid(7), &memory_key()).await.unwrap();
1560        assert_eq!(stats.session_id, sid(7).as_str());
1561    }
1562}