use smos_domain::config::NliConfig;
use smos_domain::config::{ConfidenceConfig, MergeConfig};
use smos_domain::{Fact, FactContent, FactId, MemoryKey, NliResult, SessionId};
use crate::errors::{ProviderError, UseCaseError};
use crate::log_nonfatal;
use crate::ports::{FactRepository, NliClassifier, SessionRepository};
use outcome::FactOutcome;
use scan::ScanState;
pub mod merge;
pub mod outcome;
pub mod scan;
#[cfg(test)]
mod tests;
/// Counters summarizing one finalize run over a session's pending facts.
///
/// A value is created at the start of `FinalizeSession::execute` (all counters
/// zero via `Default`) and returned to the caller once every pending fact of
/// the session has been resolved.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FinalizeStats {
/// String form of the session id the run was executed for.
pub session_id: String,
/// Number of pending facts attributed to the session that were examined.
pub processed: usize,
/// Facts whose resolution ended in `FactOutcome::Finalized`.
pub finalized: usize,
/// Facts whose resolution ended in `FactOutcome::Merged`.
pub merged: usize,
/// Facts whose resolution ended in `FactOutcome::Conflict`.
pub conflicts: usize,
/// Incremented together with `merged` for every `FactOutcome::Merged`.
// NOTE(review): presumably counts the pending fact that is discarded once it
// has been merged into an existing one — confirm against the merge module.
pub rejected: usize,
}
/// Use case that resolves the pending facts of one session against the facts
/// already accepted for a memory key.
///
/// All collaborators are borrowed for `'a`; the struct owns nothing. The trait
/// bounds on `FR`, `SR` and `NC` are placed on the `impl` block, not here.
pub struct FinalizeSession<'a, FR, SR, NC> {
/// Fact storage; queried for the pending and the accepted facts of a memory key.
pub facts: &'a FR,
/// Session storage; used after resolution to drop the session's pending ownership.
pub sessions: &'a SR,
/// NLI classifier invoked on (existing, pending) content pairs.
pub classifier: &'a NC,
/// Confidence settings.
// NOTE(review): not read by the methods in this part of the file; presumably
// consumed by helpers in the submodules — confirm.
pub confidence_cfg: &'a ConfidenceConfig,
/// Settings handed to the contradiction / entailment checks on an NLI verdict.
pub nli_cfg: &'a NliConfig,
/// Settings handed to the merge-candidate search.
pub merge_cfg: &'a MergeConfig,
}
impl<'a, FR, SR, NC> FinalizeSession<'a, FR, SR, NC>
where
FR: FactRepository,
SR: SessionRepository,
NC: NliClassifier,
{
/// Finalizes every pending fact that belongs to `session_id` under `memory_key`.
///
/// Steps:
/// 1. Load the pending facts for the memory key and keep only those whose
///    source sessions include `session_id`. With none left, return early with
///    zeroed counters.
/// 2. Load the accepted facts; they seed the comparison pool that each pending
///    fact is resolved against (the pool is handed on mutably, so resolving one
///    fact may affect what later facts are compared with).
/// 3. Resolve the facts one by one and tally each outcome.
/// 4. Ask the session repository to drop the session's ownership of all
///    examined fact ids; a failure there is only logged.
///
/// # Errors
///
/// Propagates a failure of either repository listing call, converted into
/// [`UseCaseError`] by `?`.
pub async fn execute(
    &self,
    session_id: &SessionId,
    memory_key: &MemoryKey,
) -> Result<FinalizeStats, UseCaseError> {
    let mut stats = FinalizeStats {
        session_id: session_id.as_str().to_string(),
        ..Default::default()
    };

    // Restrict the memory key's pending facts to the ones this session produced.
    let session_facts: Vec<Fact> = self
        .facts
        .list_pending(memory_key)
        .await?
        .into_iter()
        .filter(|fact| {
            fact.source_sessions()
                .iter()
                .any(|source| source == session_id)
        })
        .collect();

    if session_facts.is_empty() {
        tracing::info!(
            session = %session_id,
            memory_key = %memory_key,
            "finalize: no pending facts for session"
        );
        return Ok(stats);
    }

    // Remember every examined id up front; ownership is released for all of
    // them at the end, whatever the individual outcome was.
    let owned_ids: Vec<FactId> = session_facts
        .iter()
        .map(|fact| fact.id().clone())
        .collect();

    // Accepted facts form the initial comparison pool.
    let mut pool: Vec<Fact> = self.facts.list_accepted(memory_key).await?;
    stats.processed = session_facts.len();

    tracing::info!(
        session = %session_id,
        memory_key = %memory_key,
        pending = session_facts.len(),
        accepted = pool.len(),
        "finalizing session"
    );

    // Strictly sequential: each resolution may mutate the pool seen by the next.
    for fact in session_facts.iter() {
        let result = self.resolve_one(fact, &mut pool).await;
        self.tally(&mut stats, result);
    }

    // Best-effort cleanup: the run's result does not depend on it succeeding.
    log_nonfatal!(
        self.sessions
            .remove_pending_owned(session_id, &owned_ids)
            .await,
        "session cleanup failed (non-fatal)"
    );

    tracing::info!(
        session = %session_id,
        processed = stats.processed,
        finalized = stats.finalized,
        merged = stats.merged,
        conflicts = stats.conflicts,
        skipped = stats.processed - stats.finalized - stats.merged - stats.conflicts,
        "finalize complete"
    );

    Ok(stats)
}
/// Resolves one pending fact against the comparison pool and reports the outcome.
///
/// Flow, as implemented below:
/// 1. Ask the pending fact for merge candidates in `pool`. With no candidates
///    the fact is finalized standalone (no NLI verdict attached).
/// 2. Walk the candidates in order and obtain an NLI verdict per pair, or skip
///    the pair when no verdict can be obtained:
///    - a contradiction returns immediately through `apply_conflict_flag`,
///      even if an earlier candidate was already chosen as merge pick;
///    - the first entailment is remembered as the merge pick;
///    - every other verdict (including later entailments) is recorded via
///      `observe_other_verdict`.
/// 3. After the loop: a merge pick leads to `apply_merge`; if no pair ever
///    counted as "NLI observed" the fact is left pending (`Skipped`);
///    otherwise it is finalized standalone with the last observed verdict.
///
/// Classifier failures never abort the run; the affected pair is skipped.
async fn resolve_one(&self, pending: &Fact, pool: &mut Vec<Fact>) -> FactOutcome {
let candidates = pending.find_merge_candidates(pool, self.merge_cfg);
if candidates.is_empty() {
// Nothing comparable in the pool: no NLI needed.
return self.finalize_standalone(pending, None, pool).await;
}
// From here on the pool is accessed through the scan state (`scan.pool_mut()`).
let mut scan = ScanState::new(pool);
for candidate in &candidates {
let existing = &candidate.fact;
// Pair already flagged as conflicting on either side: do not call the
// classifier again, but count the pair as observed so the fact is not
// reported as `Skipped` because of it.
if pending.conflicts_with().contains(existing.id())
|| existing.conflicts_with().contains(pending.id())
{
scan.mark_nli_observed();
tracing::debug!(
pending = %pending.id(),
existing = %existing.id(),
"C3 guard: skip NLI for already-flagged conflict pair"
);
continue;
}
// Identical content after normalization short-circuits the classifier call
// and uses the canned exact-match verdict instead.
let nli = if FactContent::text_equals_normalized(existing.content(), pending.content())
{
scan.mark_nli_observed();
NliResult::exact_match_result()
} else {
match self
.classifier
.classify(existing.content(), pending.content())
.await
{
// Only a verdict flagged as available counts as an observation.
Ok(nli) if nli.available => {
scan.mark_nli_observed();
nli
}
// The classifier answered but declared itself unavailable: skip the
// pair without marking it observed.
Ok(_unavailable) => {
tracing::warn!(
pending = %pending.id(),
existing = %existing.id(),
"NLI replied with available=false; leaving pending (skip pair)"
);
continue;
}
// Provider reported unavailability: skip the pair, not observed.
Err(ProviderError::Unavailable(msg)) => {
tracing::warn!(
pending = %pending.id(),
existing = %existing.id(),
error = %msg,
"NLI unavailable; leaving pending (skip pair)"
);
continue;
}
// Any other provider error is handled the same way, with its own log line.
Err(other) => {
tracing::warn!(
pending = %pending.id(),
existing = %existing.id(),
error = %other,
"NLI error (non-fatal, skip pair)"
);
continue;
}
}
};
// A contradiction ends the scan at once; remaining candidates are not examined.
if nli.is_contradiction(self.nli_cfg) {
return self
.apply_conflict_flag(pending, existing, scan.pool_mut())
.await;
}
// Only the first entailing candidate becomes the merge pick; an entailment
// found after a pick exists falls into the `else` branch like any other
// verdict.
if nli.is_entailment(self.nli_cfg) && !scan.has_merge_pick() {
scan.commit_merge_pick(existing.clone(), nli);
} else {
scan.observe_other_verdict(nli);
}
}
// The merge is applied only after all candidates were scanned without a
// contradiction.
if let Some((existing, nli)) = scan.take_merge_pick() {
return self
.apply_merge(pending, &existing, &nli, scan.pool_mut())
.await;
}
// Every pair was skipped for lack of a verdict: leave the fact pending so it is
// neither finalized nor counted in any outcome counter.
if !scan.nli_observed() {
tracing::info!(
pending = %pending.id(),
candidates = candidates.len(),
"NLI never observed for any candidate; leaving pending"
);
return FactOutcome::Skipped;
}
// NOTE(review): presumably the most recent verdict passed to
// `observe_other_verdict`, and `None` when only guard-skipped pairs were
// observed — confirm in the `scan` module.
let last_observed_nli = scan.take_last_observed_nli();
self.finalize_standalone(pending, last_observed_nli.as_ref(), scan.pool_mut())
.await
}
/// Folds the outcome of one resolved fact into the run counters.
///
/// `Skipped` changes nothing; `Finalized` and `Conflict` bump their own
/// counter; `Merged` bumps both `merged` and `rejected`.
fn tally(&self, stats: &mut FinalizeStats, outcome: FactOutcome) {
    // Select the counter that belongs to the outcome, then bump it once below.
    let counter = match outcome {
        // A skipped fact stays pending and is not counted anywhere.
        FactOutcome::Skipped => return,
        FactOutcome::Finalized => &mut stats.finalized,
        FactOutcome::Conflict => &mut stats.conflicts,
        FactOutcome::Merged => {
            // A merge is additionally recorded in the rejected counter.
            stats.rejected += 1;
            &mut stats.merged
        }
    };
    *counter += 1;
}
}