Skip to main content

smos_application/use_cases/
finalize_session.rs

1//! `FinalizeSession` — session-end batch resolution pipeline (§5, §9).
2//!
3//! Drains a session's pending facts and resolves each one against the currently
4//! accepted pool via NLI: entailment merges into the existing fact, contradiction
5//! flags a bidirectional drift pair, neutral (or no candidate) promotes the
6//! pending fact through the validation gate. Resolution is **drift-priority**:
7//! a contradiction against a less-similar candidate must NOT be masked by a
8//! neutral/entailment hit on the top candidate, so the scan walks every
9//! candidate and only commits a merge after the full pass is contradiction-free.
10//!
11//! # Fail-open contract
12//!
13//! The use case NEVER raises on a per-fact failure (§9 known limitation
14//! "NLI backend unavailable graceful"): any NLI / save / mutation error is
15//! logged and the loop continues. Pending facts that could not be resolved
16//! stay pending for the next session-end cycle. Only a failure to load the
17//! pending or accepted pool propagates as `Err` (both loads use `?`); in that
18//! case no `FinalizeStats` is returned.
19//!
20//! # Session ownership — `source_sessions`, not `SessionState.pending_facts`
21//!
22//! Pending ownership is derived from `Fact.source_sessions`: every fact whose
23//! provenance list references `session_id` is in scope. The HTTP extraction
24//! path NEVER persists a `SessionState` row — it only mutates
25//! `fact.source_sessions` at extraction time — so reading
26//! `SessionState.pending_facts()` left real pending facts invisible to
27//! finalize (the operator-facing "24 pending facts but finalize says
28//! nothing to do" bug). `source_sessions` is the only durable provenance
29//! signal that survives the request path; this use case is the sole reader
30//! that drives resolution off it.
31//!
32//! The `memory_key` is supplied by the caller (CLI `--memory-key`, watcher
33//! reading `SessionState.memory_key()`) because `source_sessions` does NOT
34//! pin a namespace — the same `session_id` could in principle appear under
35//! multiple memory_keys (e.g. after a key migration), so the caller picks the
36//! scope. The CLI additionally exposes a discovery fallback
37//! (`FactRepository::list_memory_keys_for_session`) that iterates every key
38//! when the operator does not name one.
39//!
40//! # Session bookkeeping
41//!
42//! `owned_ids` is snapshotted once the pending pool is loaded, BEFORE the resolution
43//! walk, so concurrent appends (which race the drain) survive: only the snapshotted ids are
44//! removed from `pending_facts` after finalize. Fresh pending ids appended by
45//! another flow during finalize are preserved for the next cycle. The
46//! `remove_pending_owned` cleanup is best-effort — a missing `SessionState`
47//! row (the common case on the HTTP path) makes it a no-op; a present row
48//! gets its bookkeeping cleared so the watcher does not re-schedule an idle
49//! session.
50//!
51//! See `smos-poc/smos/session_end.py::process_session_end` for the canonical
52//! Python reference; this implementation mirrors `_resolve_one`,
53//! `_apply_merge`, `_apply_conflict_flag`, and `_finalize_standalone`.
54
55use smos_domain::config::NliConfig;
56use smos_domain::config::{ConfidenceConfig, MergeConfig};
57use smos_domain::{Fact, FactContent, FactId, MemoryKey, NliResult, SessionId};
58
59use crate::errors::{ProviderError, UseCaseError};
60use crate::log_nonfatal;
61use crate::ports::{FactRepository, NliClassifier, SessionRepository};
62
63use outcome::FactOutcome;
64use scan::ScanState;
65
66pub mod merge;
67pub mod outcome;
68pub mod scan;
69
70#[cfg(test)]
71mod tests;
72
/// Aggregate outcome counters for one finalize run.
///
/// `FinalizeStats` is the wire shape the watcher (Slice-7) and the CLI
/// `--finalize` trigger surface to operators, so every field is `pub`. The
/// `rejected` counter overlaps with `merged` (every merge rejects the pending
/// twin) — both are reported because operators want to see "how many facts
/// left the pending pool by each exit".
///
/// Facts that were attempted but stayed pending are not counted separately;
/// they are `processed - finalized - merged - conflicts`.
///
/// All fields are plain `String` / `usize`, so the type is `Eq` as well as
/// `PartialEq` and can be compared exactly in tests and callers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FinalizeStats {
    /// Id of the session that was finalized.
    pub session_id: String,
    /// Pending facts the use case attempted to resolve.
    pub processed: usize,
    /// Standalone facts promoted through the validation gate (may still be
    /// `Pending` if the validation gate rejected the promotion).
    pub finalized: usize,
    /// Pending facts merged into an existing accepted fact (entailment path).
    pub merged: usize,
    /// Pending facts for which the candidate scan hit a contradiction (drift).
    /// Both sides of the pair are flagged; status is unchanged.
    pub conflicts: usize,
    /// Pending facts marked `Rejected` after being absorbed into another fact.
    /// Equals `merged` after a clean run, but kept separate so a partial run
    /// (e.g. save failure between the merge save and the reject save) is
    /// visible to operators.
    pub rejected: usize,
}
100
/// Borrow-style bundle of every dependency the finalize pipeline needs.
///
/// Built inline at the call site (the watcher in Slice-7, or the CLI
/// `--finalize` trigger), dropped right after [`FinalizeSession::execute`]
/// returns. Every field is a shared reference, so building the bundle copies
/// nothing; the caller keeps ownership of the repositories, the classifier
/// and the configs.
pub struct FinalizeSession<'a, FR, SR, NC> {
    /// Fact store; `execute` reads the pending and accepted pools from it
    /// (`list_pending`, `list_accepted`).
    pub facts: &'a FR,
    /// Session store; `execute` uses it only for the best-effort
    /// `remove_pending_owned` cleanup.
    pub sessions: &'a SR,
    /// NLI backend; `classify` is called on each (existing, pending) content
    /// pair that is neither an exact match nor an already-flagged conflict.
    pub classifier: &'a NC,
    /// Confidence configuration. NOTE(review): not read in this file —
    /// presumably consumed by the promotion/merge helpers in the submodules;
    /// confirm.
    pub confidence_cfg: &'a ConfidenceConfig,
    /// Configuration handed to `NliResult::is_contradiction` /
    /// `NliResult::is_entailment` when a verdict is interpreted.
    pub nli_cfg: &'a NliConfig,
    /// Configuration handed to `Fact::find_merge_candidates` when the
    /// comparison pool is searched.
    pub merge_cfg: &'a MergeConfig,
}
114
115impl<'a, FR, SR, NC> FinalizeSession<'a, FR, SR, NC>
116where
117    FR: FactRepository,
118    SR: SessionRepository,
119    NC: NliClassifier,
120{
121    /// Resolve every pending fact owned by `session_id` within `memory_key`.
122    ///
123    /// Ownership is derived from `Fact.source_sessions` (see module docs):
124    /// every pending fact whose provenance list contains `session_id` is in
125    /// scope. Returns `Ok(stats)` even on per-fact failures; the only `Err`
126    /// paths are store catastrophes that prevent reading the pending or
127    /// accepted pools.
128    ///
129    /// `memory_key` scopes the namespace scan. Callers that already know the
130    /// namespace (the watcher reading `SessionState.memory_key()`, the CLI
131    /// with `--memory-key`) pass it directly; the CLI additionally exposes a
132    /// discovery fallback that iterates every key when the operator does not
133    /// name one.
134    pub async fn execute(
135        &self,
136        session_id: &SessionId,
137        memory_key: &MemoryKey,
138    ) -> Result<FinalizeStats, UseCaseError> {
139        let mut stats = FinalizeStats {
140            session_id: session_id.as_str().to_string(),
141            ..FinalizeStats::default()
142        };
143
144        // Step 1 — load the pending pool for this memory_key, then filter to
145        // the facts whose `source_sessions` references `session_id`. The
146        // HTTP extraction path never persists `SessionState`, so this is the
147        // only ownership signal that survives the request path. We do NOT
148        // consult `SessionState.pending_facts()` for ownership: a missing or
149        // empty session row must NOT mask real pending facts (the
150        // operator-facing "nothing to do" bug).
151        let all_pending = self.facts.list_pending(memory_key).await?;
152        let pending: Vec<Fact> = all_pending
153            .into_iter()
154            .filter(|f| f.source_sessions().iter().any(|s| s == session_id))
155            .collect();
156
157        if pending.is_empty() {
158            tracing::info!(
159                session = %session_id,
160                memory_key = %memory_key,
161                "finalize: no pending facts for session"
162            );
163            return Ok(stats);
164        }
165
166        // Step 2 — snapshot owned_ids BEFORE any await on the resolution
167        // walk. Concurrent extraction may save more pending facts carrying
168        // this session in `source_sessions` while we drain; those survive
169        // and are picked up by the next cycle (no leak, no double-resolve).
170        let owned_ids: Vec<FactId> = pending.iter().map(|f| f.id().clone()).collect();
171
172        let accepted = self.facts.list_accepted(memory_key).await?;
173        stats.processed = pending.len();
174        tracing::info!(
175            session = %session_id,
176            memory_key = %memory_key,
177            pending = pending.len(),
178            accepted = accepted.len(),
179            "finalizing session"
180        );
181
182        // Step 3 — drift-priority walk. The comparison pool grows as standalone
183        // facts are promoted (so a later pending fact can merge with one that
184        // was itself pending a moment ago); merges and conflicts consume the
185        // pending twin without growing the pool.
186        let mut comparison_pool: Vec<Fact> = accepted;
187        for fact in &pending {
188            let outcome = self.resolve_one(fact, &mut comparison_pool).await;
189            self.tally(&mut stats, outcome);
190        }
191
192        // Step 4 — bookkeeping cleanup. Only the originally-owned ids are
193        // removed; concurrent additions survive (see step 2 comment). This
194        // is best-effort: a missing `SessionState` (the common case on the
195        // HTTP extraction path) makes the call a no-op; a present row gets
196        // its bookkeeping cleared so the watcher does not re-schedule an
197        // idle session. A failure here is non-fatal — the session just
198        // re-drains on the next finalize, which is idempotent.
199        log_nonfatal!(
200            self.sessions
201                .remove_pending_owned(session_id, &owned_ids)
202                .await,
203            "session cleanup failed (non-fatal)"
204        );
205
206        tracing::info!(
207            session = %session_id,
208            processed = stats.processed,
209            finalized = stats.finalized,
210            merged = stats.merged,
211            conflicts = stats.conflicts,
212            skipped = stats.processed - stats.finalized - stats.merged - stats.conflicts,
213            "finalize complete"
214        );
215
216        Ok(stats)
217    }
218
219    /// Resolve one pending fact against the (growing) comparison pool.
220    ///
221    /// Drift-priority semantics (§9):
222    /// - Exact-match short-circuit returns entailment WITHOUT an NLI call.
223    /// - C3 guard skips pairs already flagged as conflicting (no double-flag).
224    /// - First contradiction wins immediately (flag + return). We do NOT
225    ///   commit an earlier entailment candidate before the contradiction is
226    ///   observed, because drift is a stronger signal than merge.
227    /// - First entailment candidate becomes the merge pick, but the scan
228    ///   continues so a later less-similar candidate can still surface a
229    ///   contradiction.
230    /// - Otherwise the pending fact is finalized standalone, carrying the
231    ///   last observed (non-contradiction, non-entailment-merge) NLI verdict
232    ///   for the `no_contradiction_bonus`.
233    async fn resolve_one(&self, pending: &Fact, pool: &mut Vec<Fact>) -> FactOutcome {
234        let candidates = pending.find_merge_candidates(pool, self.merge_cfg);
235        if candidates.is_empty() {
236            return self.finalize_standalone(pending, None, pool).await;
237        }
238
239        // Drift-priority scan state: the four mutable accumulators of the
240        // candidate walk (AAD-5) encapsulated in `ScanState`. The transition
241        // logic is 1:1 with the pre-R8 inline locals (`merge_pick`,
242        // `last_observed_nli`, `nli_observed`, and the `pool` borrow).
243        let mut scan = ScanState::new(pool);
244
245        for candidate in &candidates {
246            let existing = &candidate.fact;
247
248            // C3 guard — already-flagged conflict pair. Skip the (expensive)
249            // NLI call entirely; the conflict is already recorded. The pair
250            // still counts as "NLI observed" because the conflict was
251            // resolved by an earlier finalize cycle — without this, a
252            // pending twin of an already-flagged pair would be stuck in
253            // pending forever (every cycle would skip the same pair and
254            // report "NLI never observed").
255            if pending.conflicts_with().contains(existing.id())
256                || existing.conflicts_with().contains(pending.id())
257            {
258                scan.mark_nli_observed();
259                tracing::debug!(
260                    pending = %pending.id(),
261                    existing = %existing.id(),
262                    "C3 guard: skip NLI for already-flagged conflict pair"
263                );
264                continue;
265            }
266
267            // Exact-match short-circuit — identical text is entailment by
268            // definition. Avoids DeBERTa's known quirk of returning `neutral`
269            // on identical pairs.
270            let nli = if FactContent::text_equals_normalized(existing.content(), pending.content())
271            {
272                scan.mark_nli_observed();
273                NliResult::exact_match_result()
274            } else {
275                match self
276                    .classifier
277                    .classify(existing.content(), pending.content())
278                    .await
279                {
280                    Ok(nli) if nli.available => {
281                        // Real verdict from the NLI backend. An
282                        // `available = false` reply (the backend's own
283                        // graceful-degradation placeholder) is treated as
284                        // Unavailable: skip pair, do NOT bump `nli_observed`
285                        // — otherwise a permanently broken backend would
286                        // silently promote facts without drift detection.
287                        scan.mark_nli_observed();
288                        nli
289                    }
290                    Ok(_unavailable) => {
291                        tracing::warn!(
292                            pending = %pending.id(),
293                            existing = %existing.id(),
294                            "NLI replied with available=false; leaving pending (skip pair)"
295                        );
296                        continue;
297                    }
298                    Err(ProviderError::Unavailable(msg)) => {
299                        tracing::warn!(
300                            pending = %pending.id(),
301                            existing = %existing.id(),
302                            error = %msg,
303                            "NLI unavailable; leaving pending (skip pair)"
304                        );
305                        // Graceful: skip this pair, keep scanning. If every
306                        // pair is unavailable the pending fact stays pending.
307                        continue;
308                    }
309                    Err(other) => {
310                        tracing::warn!(
311                            pending = %pending.id(),
312                            existing = %existing.id(),
313                            error = %other,
314                            "NLI error (non-fatal, skip pair)"
315                        );
316                        continue;
317                    }
318                }
319            };
320
321            // Drift wins immediately — flag both sides bidirectionally and
322            // exit. We do NOT commit any earlier merge candidate.
323            if nli.is_contradiction(self.nli_cfg) {
324                return self
325                    .apply_conflict_flag(pending, existing, scan.pool_mut())
326                    .await;
327            }
328
329            if nli.is_entailment(self.nli_cfg) && !scan.has_merge_pick() {
330                scan.commit_merge_pick(existing.clone(), nli);
331                // Continue scanning: a later less-similar candidate may still
332                // contradict this pending fact (drift-priority walk).
333            } else {
334                scan.observe_other_verdict(nli);
335            }
336        }
337
338        if let Some((existing, nli)) = scan.take_merge_pick() {
339            return self
340                .apply_merge(pending, &existing, &nli, scan.pool_mut())
341                .await;
342        }
343
344        // The NLI backend never answered for any candidate → keep the fact
345        // pending. We have candidates but no NLI signal; promoting would
346        // silently mask a potential drift.
347        if !scan.nli_observed() {
348            tracing::info!(
349                pending = %pending.id(),
350                candidates = candidates.len(),
351                "NLI never observed for any candidate; leaving pending"
352            );
353            return FactOutcome::Skipped;
354        }
355
356        // No merge, no conflict — promote standalone. `last_observed_nli`
357        // (the strongest non-contradiction verdict we observed) feeds the
358        // `no_contradiction_bonus` in the confidence scorer.
359        let last_observed_nli = scan.take_last_observed_nli();
360        self.finalize_standalone(pending, last_observed_nli.as_ref(), scan.pool_mut())
361            .await
362    }
363
364    /// Fold a per-fact outcome into the running stats.
365    fn tally(&self, stats: &mut FinalizeStats, outcome: FactOutcome) {
366        match outcome {
367            FactOutcome::Finalized => stats.finalized += 1,
368            FactOutcome::Merged => {
369                stats.merged += 1;
370                stats.rejected += 1;
371            }
372            FactOutcome::Conflict => stats.conflicts += 1,
373            FactOutcome::Skipped => {
374                // Skipped facts stay pending — not tallied into any counter.
375                // Detectable via `processed - finalized - merged - conflicts`.
376            }
377        }
378    }
379}