smos_application/use_cases/finalize_session.rs
1//! `FinalizeSession` — session-end batch resolution pipeline (§5, §9).
2//!
3//! Drains a session's pending facts and resolves each one against the currently
4//! accepted pool via NLI: entailment merges into the existing fact, contradiction
5//! flags a bidirectional drift pair, neutral (or no candidate) promotes the
6//! pending fact through the validation gate. Resolution is **drift-priority**:
7//! a contradiction against a less-similar candidate must NOT be masked by a
8//! neutral/entailment hit on the top candidate, so the scan walks every
9//! candidate and only commits a merge after the full pass is contradiction-free.
10//!
11//! # Fail-open contract
12//!
13//! The use case NEVER raises on a per-fact failure (§9 known limitation
14//! "NLI backend unavailable graceful"): any NLI / save / mutation error is
15//! logged and the loop continues. Pending facts that could not be resolved
//! stay pending for the next session-end cycle. Only a failure to load the
//! pending or accepted pool propagates as `Err`; a session that owns no
//! pending facts returns `Ok(stats)` with `processed == 0`.
19//!
20//! # Session ownership — `source_sessions`, not `SessionState.pending_facts`
21//!
22//! Pending ownership is derived from `Fact.source_sessions`: every fact whose
23//! provenance list references `session_id` is in scope. The HTTP extraction
24//! path NEVER persists a `SessionState` row — it only mutates
25//! `fact.source_sessions` at extraction time — so reading
26//! `SessionState.pending_facts()` left real pending facts invisible to
27//! finalize (the operator-facing "24 pending facts but finalize says
28//! nothing to do" bug). `source_sessions` is the only durable provenance
29//! signal that survives the request path; this use case is the sole reader
30//! that drives resolution off it.
31//!
32//! The `memory_key` is supplied by the caller (CLI `--memory-key`, watcher
33//! reading `SessionState.memory_key()`) because `source_sessions` does NOT
34//! pin a namespace — the same `session_id` could in principle appear under
35//! multiple memory_keys (e.g. after a key migration), so the caller picks the
36//! scope. The CLI additionally exposes a discovery fallback
37//! (`FactRepository::list_memory_keys_for_session`) that iterates every key
38//! when the operator does not name one.
39//!
40//! # Session bookkeeping
41//!
//! `owned_ids` is snapshotted right after the pending pool is loaded, BEFORE
//! the resolution walk starts, so concurrent extraction appends (which race
//! the drain) survive: only the snapshotted ids are
44//! removed from `pending_facts` after finalize. Fresh pending ids appended by
45//! another flow during finalize are preserved for the next cycle. The
46//! `remove_pending_owned` cleanup is best-effort — a missing `SessionState`
47//! row (the common case on the HTTP path) makes it a no-op; a present row
48//! gets its bookkeeping cleared so the watcher does not re-schedule an idle
49//! session.
50//!
51//! See `smos-poc/smos/session_end.py::process_session_end` for the canonical
52//! Python reference; this implementation mirrors `_resolve_one`,
53//! `_apply_merge`, `_apply_conflict_flag`, and `_finalize_standalone`.
54
55use smos_domain::config::NliConfig;
56use smos_domain::config::{ConfidenceConfig, MergeConfig};
57use smos_domain::{Fact, FactContent, FactId, MemoryKey, NliResult, SessionId};
58
59use crate::errors::{ProviderError, UseCaseError};
60use crate::log_nonfatal;
61use crate::ports::{FactRepository, NliClassifier, SessionRepository};
62
63use outcome::FactOutcome;
64use scan::ScanState;
65
66pub mod merge;
67pub mod outcome;
68pub mod scan;
69
70#[cfg(test)]
71mod tests;
72
/// Aggregate outcome counters for one finalize run.
///
/// `FinalizeStats` is the wire shape the watcher (Slice-7) and the CLI
/// `--finalize` trigger surface to operators, so every field is `pub`. The
/// `rejected` counter overlaps with `merged` (every merge rejects the pending
/// twin) — both are reported because operators want to see "how many facts
/// left the pending pool by each exit".
///
/// Every field is a `String` or a `usize`, so the type is totally comparable:
/// `Eq` is derived alongside `PartialEq` so stats can be compared in
/// assertions and used wherever an `Eq` bound is required.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FinalizeStats {
    /// Id of the session that was finalized.
    pub session_id: String,
    /// Pending facts the use case attempted to resolve.
    pub processed: usize,
    /// Standalone facts promoted through the validation gate (may still be
    /// `Pending` if the validation gate rejected the promotion).
    pub finalized: usize,
    /// Pending facts merged into an existing accepted fact (entailment path).
    pub merged: usize,
    /// Pending facts whose strongest NLI verdict was a contradiction (drift).
    /// Both sides of the pair are flagged; status is unchanged.
    pub conflicts: usize,
    /// Pending facts marked `Rejected` after being absorbed into another fact.
    /// Equals `merged` after a clean run, but kept separate so a partial run
    /// (e.g. save failure between the merge save and the reject save) is
    /// visible to operators.
    pub rejected: usize,
}
100
101/// Borrow-style bundle of every dependency the finalize pipeline needs.
102///
103/// Built inline at the call site (the watcher in Slice-7, or the CLI
104/// `--finalize` trigger), dropped right after [`FinalizeSession::execute`]
105/// returns. References keep allocation to one borrow per call.
106pub struct FinalizeSession<'a, FR, SR, NC> {
107 pub facts: &'a FR,
108 pub sessions: &'a SR,
109 pub classifier: &'a NC,
110 pub confidence_cfg: &'a ConfidenceConfig,
111 pub nli_cfg: &'a NliConfig,
112 pub merge_cfg: &'a MergeConfig,
113}
114
115impl<'a, FR, SR, NC> FinalizeSession<'a, FR, SR, NC>
116where
117 FR: FactRepository,
118 SR: SessionRepository,
119 NC: NliClassifier,
120{
121 /// Resolve every pending fact owned by `session_id` within `memory_key`.
122 ///
123 /// Ownership is derived from `Fact.source_sessions` (see module docs):
124 /// every pending fact whose provenance list contains `session_id` is in
125 /// scope. Returns `Ok(stats)` even on per-fact failures; the only `Err`
126 /// paths are store catastrophes that prevent reading the pending or
127 /// accepted pools.
128 ///
129 /// `memory_key` scopes the namespace scan. Callers that already know the
130 /// namespace (the watcher reading `SessionState.memory_key()`, the CLI
131 /// with `--memory-key`) pass it directly; the CLI additionally exposes a
132 /// discovery fallback that iterates every key when the operator does not
133 /// name one.
134 pub async fn execute(
135 &self,
136 session_id: &SessionId,
137 memory_key: &MemoryKey,
138 ) -> Result<FinalizeStats, UseCaseError> {
139 let mut stats = FinalizeStats {
140 session_id: session_id.as_str().to_string(),
141 ..FinalizeStats::default()
142 };
143
144 // Step 1 — load the pending pool for this memory_key, then filter to
145 // the facts whose `source_sessions` references `session_id`. The
146 // HTTP extraction path never persists `SessionState`, so this is the
147 // only ownership signal that survives the request path. We do NOT
148 // consult `SessionState.pending_facts()` for ownership: a missing or
149 // empty session row must NOT mask real pending facts (the
150 // operator-facing "nothing to do" bug).
151 let all_pending = self.facts.list_pending(memory_key).await?;
152 let pending: Vec<Fact> = all_pending
153 .into_iter()
154 .filter(|f| f.source_sessions().iter().any(|s| s == session_id))
155 .collect();
156
157 if pending.is_empty() {
158 tracing::info!(
159 session = %session_id,
160 memory_key = %memory_key,
161 "finalize: no pending facts for session"
162 );
163 return Ok(stats);
164 }
165
166 // Step 2 — snapshot owned_ids BEFORE any await on the resolution
167 // walk. Concurrent extraction may save more pending facts carrying
168 // this session in `source_sessions` while we drain; those survive
169 // and are picked up by the next cycle (no leak, no double-resolve).
170 let owned_ids: Vec<FactId> = pending.iter().map(|f| f.id().clone()).collect();
171
172 let accepted = self.facts.list_accepted(memory_key).await?;
173 stats.processed = pending.len();
174 tracing::info!(
175 session = %session_id,
176 memory_key = %memory_key,
177 pending = pending.len(),
178 accepted = accepted.len(),
179 "finalizing session"
180 );
181
182 // Step 3 — drift-priority walk. The comparison pool grows as standalone
183 // facts are promoted (so a later pending fact can merge with one that
184 // was itself pending a moment ago); merges and conflicts consume the
185 // pending twin without growing the pool.
186 let mut comparison_pool: Vec<Fact> = accepted;
187 for fact in &pending {
188 let outcome = self.resolve_one(fact, &mut comparison_pool).await;
189 self.tally(&mut stats, outcome);
190 }
191
192 // Step 4 — bookkeeping cleanup. Only the originally-owned ids are
193 // removed; concurrent additions survive (see step 2 comment). This
194 // is best-effort: a missing `SessionState` (the common case on the
195 // HTTP extraction path) makes the call a no-op; a present row gets
196 // its bookkeeping cleared so the watcher does not re-schedule an
197 // idle session. A failure here is non-fatal — the session just
198 // re-drains on the next finalize, which is idempotent.
199 log_nonfatal!(
200 self.sessions
201 .remove_pending_owned(session_id, &owned_ids)
202 .await,
203 "session cleanup failed (non-fatal)"
204 );
205
206 tracing::info!(
207 session = %session_id,
208 processed = stats.processed,
209 finalized = stats.finalized,
210 merged = stats.merged,
211 conflicts = stats.conflicts,
212 skipped = stats.processed - stats.finalized - stats.merged - stats.conflicts,
213 "finalize complete"
214 );
215
216 Ok(stats)
217 }
218
219 /// Resolve one pending fact against the (growing) comparison pool.
220 ///
221 /// Drift-priority semantics (§9):
222 /// - Exact-match short-circuit returns entailment WITHOUT an NLI call.
223 /// - C3 guard skips pairs already flagged as conflicting (no double-flag).
224 /// - First contradiction wins immediately (flag + return). We do NOT
225 /// commit an earlier entailment candidate before the contradiction is
226 /// observed, because drift is a stronger signal than merge.
227 /// - First entailment candidate becomes the merge pick, but the scan
228 /// continues so a later less-similar candidate can still surface a
229 /// contradiction.
230 /// - Otherwise the pending fact is finalized standalone, carrying the
231 /// last observed (non-contradiction, non-entailment-merge) NLI verdict
232 /// for the `no_contradiction_bonus`.
233 async fn resolve_one(&self, pending: &Fact, pool: &mut Vec<Fact>) -> FactOutcome {
234 let candidates = pending.find_merge_candidates(pool, self.merge_cfg);
235 if candidates.is_empty() {
236 return self.finalize_standalone(pending, None, pool).await;
237 }
238
239 // Drift-priority scan state: the four mutable accumulators of the
240 // candidate walk (AAD-5) encapsulated in `ScanState`. The transition
241 // logic is 1:1 with the pre-R8 inline locals (`merge_pick`,
242 // `last_observed_nli`, `nli_observed`, and the `pool` borrow).
243 let mut scan = ScanState::new(pool);
244
245 for candidate in &candidates {
246 let existing = &candidate.fact;
247
248 // C3 guard — already-flagged conflict pair. Skip the (expensive)
249 // NLI call entirely; the conflict is already recorded. The pair
250 // still counts as "NLI observed" because the conflict was
251 // resolved by an earlier finalize cycle — without this, a
252 // pending twin of an already-flagged pair would be stuck in
253 // pending forever (every cycle would skip the same pair and
254 // report "NLI never observed").
255 if pending.conflicts_with().contains(existing.id())
256 || existing.conflicts_with().contains(pending.id())
257 {
258 scan.mark_nli_observed();
259 tracing::debug!(
260 pending = %pending.id(),
261 existing = %existing.id(),
262 "C3 guard: skip NLI for already-flagged conflict pair"
263 );
264 continue;
265 }
266
267 // Exact-match short-circuit — identical text is entailment by
268 // definition. Avoids DeBERTa's known quirk of returning `neutral`
269 // on identical pairs.
270 let nli = if FactContent::text_equals_normalized(existing.content(), pending.content())
271 {
272 scan.mark_nli_observed();
273 NliResult::exact_match_result()
274 } else {
275 match self
276 .classifier
277 .classify(existing.content(), pending.content())
278 .await
279 {
280 Ok(nli) if nli.available => {
281 // Real verdict from the NLI backend. An
282 // `available = false` reply (the backend's own
283 // graceful-degradation placeholder) is treated as
284 // Unavailable: skip pair, do NOT bump `nli_observed`
285 // — otherwise a permanently broken backend would
286 // silently promote facts without drift detection.
287 scan.mark_nli_observed();
288 nli
289 }
290 Ok(_unavailable) => {
291 tracing::warn!(
292 pending = %pending.id(),
293 existing = %existing.id(),
294 "NLI replied with available=false; leaving pending (skip pair)"
295 );
296 continue;
297 }
298 Err(ProviderError::Unavailable(msg)) => {
299 tracing::warn!(
300 pending = %pending.id(),
301 existing = %existing.id(),
302 error = %msg,
303 "NLI unavailable; leaving pending (skip pair)"
304 );
305 // Graceful: skip this pair, keep scanning. If every
306 // pair is unavailable the pending fact stays pending.
307 continue;
308 }
309 Err(other) => {
310 tracing::warn!(
311 pending = %pending.id(),
312 existing = %existing.id(),
313 error = %other,
314 "NLI error (non-fatal, skip pair)"
315 );
316 continue;
317 }
318 }
319 };
320
321 // Drift wins immediately — flag both sides bidirectionally and
322 // exit. We do NOT commit any earlier merge candidate.
323 if nli.is_contradiction(self.nli_cfg) {
324 return self
325 .apply_conflict_flag(pending, existing, scan.pool_mut())
326 .await;
327 }
328
329 if nli.is_entailment(self.nli_cfg) && !scan.has_merge_pick() {
330 scan.commit_merge_pick(existing.clone(), nli);
331 // Continue scanning: a later less-similar candidate may still
332 // contradict this pending fact (drift-priority walk).
333 } else {
334 scan.observe_other_verdict(nli);
335 }
336 }
337
338 if let Some((existing, nli)) = scan.take_merge_pick() {
339 return self
340 .apply_merge(pending, &existing, &nli, scan.pool_mut())
341 .await;
342 }
343
344 // The NLI backend never answered for any candidate → keep the fact
345 // pending. We have candidates but no NLI signal; promoting would
346 // silently mask a potential drift.
347 if !scan.nli_observed() {
348 tracing::info!(
349 pending = %pending.id(),
350 candidates = candidates.len(),
351 "NLI never observed for any candidate; leaving pending"
352 );
353 return FactOutcome::Skipped;
354 }
355
356 // No merge, no conflict — promote standalone. `last_observed_nli`
357 // (the strongest non-contradiction verdict we observed) feeds the
358 // `no_contradiction_bonus` in the confidence scorer.
359 let last_observed_nli = scan.take_last_observed_nli();
360 self.finalize_standalone(pending, last_observed_nli.as_ref(), scan.pool_mut())
361 .await
362 }
363
364 /// Fold a per-fact outcome into the running stats.
365 fn tally(&self, stats: &mut FinalizeStats, outcome: FactOutcome) {
366 match outcome {
367 FactOutcome::Finalized => stats.finalized += 1,
368 FactOutcome::Merged => {
369 stats.merged += 1;
370 stats.rejected += 1;
371 }
372 FactOutcome::Conflict => stats.conflicts += 1,
373 FactOutcome::Skipped => {
374 // Skipped facts stay pending — not tallied into any counter.
375 // Detectable via `processed - finalized - merged - conflicts`.
376 }
377 }
378 }
379}