use std::sync::Arc;
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, warn};
use crate::db;
use crate::error::Result;
use crate::models::dispute::InitiatorRole;
use crate::models::mediation::{EscalationTrigger, Flag};
use crate::models::reasoning::{
ClassificationRequest, ClassificationResponse, ReasoningContext, SuggestedAction,
};
use crate::prompts::PromptBundle;
use crate::reasoning::ReasoningProvider;
/// Confidence cutoff for the low-confidence gate in `classify_to_decision`:
/// a classification whose confidence is strictly below this value is
/// escalated as `LowConfidence` unless the ask-clarification bypass applies.
const LOW_CONFIDENCE_THRESHOLD: f64 = 0.5;
/// Outcome of applying the mediation policy to a model classification.
///
/// Produced by `classify_to_decision` and returned (directly or wrapped) by
/// the public entry points of this module.
#[derive(Debug, Clone, PartialEq)]
pub enum PolicyDecision {
/// Send a clarifying question to the parties; the buyer and seller texts
/// are carried separately and copied from the model's suggested action.
AskClarification {
/// Clarification text for the buyer.
buyer_text: String,
/// Clarification text for the seller.
seller_text: String,
},
/// Proceed to a summary; carries the label and confidence of the
/// classification that led to this decision.
Summarize {
classification: crate::models::mediation::ClassificationLabel,
confidence: f64,
},
/// Hand the case off, tagged with the trigger that caused the escalation.
Escalate(EscalationTrigger),
}
/// Runs the opening classification for a mediation session and maps the
/// model's answer to a [`PolicyDecision`].
///
/// The request carries an empty transcript and a zeroed reasoning context.
/// On success the decision is derived with `classify_to_decision` for
/// `PolicyRound::Initial`, and the rationale plus a classification event are
/// persisted through `persist_classification_audit` before returning.
///
/// If the provider call fails, a `ReasoningCallFailed` event is recorded on a
/// best-effort basis (a failure to record it is only logged) and the function
/// returns `Ok(PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable))`.
///
/// # Errors
///
/// Propagates errors from `current_ts_secs` and from persisting the audit
/// rows.
// Every argument is an independent collaborator or identifier, hence the allow.
#[allow(clippy::too_many_arguments)]
pub async fn initial_classification(
    conn: &Arc<AsyncMutex<rusqlite::Connection>>,
    session_id: &str,
    dispute_id: &str,
    initiator_role: InitiatorRole,
    prompt_bundle: &Arc<PromptBundle>,
    reasoning: &dyn ReasoningProvider,
    provider_name: &str,
    model_name: &str,
) -> Result<PolicyDecision> {
    // Nothing has happened in the session yet: no rounds, no prior result.
    let opening_context = ReasoningContext {
        round_count: 0,
        last_classification: None,
        last_confidence: None,
    };
    let request = ClassificationRequest {
        session_id: session_id.to_owned(),
        dispute_id: dispute_id.to_owned(),
        initiator_role,
        prompt_bundle: Arc::clone(prompt_bundle),
        transcript: Vec::new(),
        context: opening_context,
    };

    let response = match reasoning.classify(request).await {
        Ok(response) => response,
        Err(classify_err) => {
            let failed_at = current_ts_secs()?;
            let failure_payload = serde_json::json!({
                "provider": provider_name,
                "model": model_name,
                "attempt_count": 1,
                "error_category": reasoning_error_category(&classify_err),
            })
            .to_string();

            // Recording the failure is best-effort: keep only the error (if
            // any) and release the connection lock before logging.
            let record_failure = {
                let db_guard = conn.lock().await;
                db::mediation_events::record_event(
                    &db_guard,
                    db::mediation_events::MediationEventKind::ReasoningCallFailed,
                    Some(session_id),
                    &failure_payload,
                    None,
                    Some(&prompt_bundle.id),
                    Some(&prompt_bundle.policy_hash),
                    failed_at,
                )
                .err()
            };
            if let Some(db_err) = record_failure {
                warn!(
                    session_id = %session_id,
                    error = %db_err,
                    "failed to record reasoning_call_failed event"
                );
            }

            warn!(
                session_id = %session_id,
                error = %classify_err,
                "reasoning.classify failed on initial classification; escalating as reasoning_unavailable"
            );
            return Ok(PolicyDecision::Escalate(
                EscalationTrigger::ReasoningUnavailable,
            ));
        }
    };

    let outcome = classify_to_decision(&response, PolicyRound::Initial);
    let audit_id = persist_classification_audit(
        conn,
        session_id,
        prompt_bundle,
        provider_name,
        model_name,
        &response,
    )
    .await?;

    debug!(
        session_id = %session_id,
        classification = %response.classification,
        confidence = response.confidence,
        rationale_id = %audit_id,
        decision = ?outcome,
        "initial classification persisted"
    );
    Ok(outcome)
}
/// Applies the policy to an already-obtained mid-session classification.
///
/// The decision is derived with `classify_to_decision` for
/// `PolicyRound::MidSession { followup_number }`; the rationale and a
/// classification event are then persisted via
/// `persist_classification_audit` before the decision is returned.
///
/// # Errors
///
/// Propagates any error from persisting the audit rows.
// Kept to match the sibling entry points, which carry the same allow.
#[allow(clippy::too_many_arguments)]
pub async fn evaluate(
    conn: &Arc<AsyncMutex<rusqlite::Connection>>,
    session_id: &str,
    prompt_bundle: &Arc<PromptBundle>,
    provider_name: &str,
    model_name: &str,
    classification: ClassificationResponse,
    followup_number: u32,
) -> Result<PolicyDecision> {
    let round = PolicyRound::MidSession { followup_number };
    let outcome = classify_to_decision(&classification, round);

    let audit_id = persist_classification_audit(
        conn,
        session_id,
        prompt_bundle,
        provider_name,
        model_name,
        &classification,
    )
    .await?;

    debug!(
        session_id = %session_id,
        classification = %classification.classification,
        confidence = classification.confidence,
        rationale_id = %audit_id,
        decision = ?outcome,
        "evaluate: mid-session classification persisted"
    );
    Ok(outcome)
}
/// Point in a session at which a classification is being evaluated; it
/// decides whether the low-confidence clarification bypass in
/// `classify_to_decision` is available.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PolicyRound {
/// The opening classification (used by `initial_classification` and
/// `classify_for_start`); always inside the bypass window.
Initial,
/// A later classification; inside the bypass window only while
/// `followup_number <= EARLY_MIDSESSION_BYPASS_FOLLOWUPS`.
MidSession { followup_number: u32 },
}
/// Highest mid-session `followup_number` (inclusive) for which a
/// low-confidence `AskClarification` suggestion is still passed through by
/// `classify_to_decision` instead of being escalated as `LowConfidence`.
pub const EARLY_MIDSESSION_BYPASS_FOLLOWUPS: u32 = 3;
pub(crate) fn classify_to_decision(
classification: &ClassificationResponse,
round: PolicyRound,
) -> PolicyDecision {
if classification.flags.contains(&Flag::FraudRisk) {
return PolicyDecision::Escalate(EscalationTrigger::FraudIndicator);
}
if classification.flags.contains(&Flag::ConflictingClaims) {
return PolicyDecision::Escalate(EscalationTrigger::ConflictingClaims);
}
if classification
.flags
.contains(&Flag::AuthorityBoundaryAttempt)
{
return PolicyDecision::Escalate(EscalationTrigger::AuthorityBoundaryAttempt);
}
let in_bypass_window = match round {
PolicyRound::Initial => true,
PolicyRound::MidSession { followup_number } => {
followup_number <= EARLY_MIDSESSION_BYPASS_FOLLOWUPS
}
};
let passthrough_low_confidence_ask_clarification = in_bypass_window
&& matches!(
classification.suggested_action,
SuggestedAction::AskClarification { .. }
);
if classification.confidence < LOW_CONFIDENCE_THRESHOLD
&& !passthrough_low_confidence_ask_clarification
{
return PolicyDecision::Escalate(EscalationTrigger::LowConfidence);
}
if let SuggestedAction::Escalate(_) = &classification.suggested_action {
return PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable);
}
match &classification.suggested_action {
SuggestedAction::AskClarification {
buyer_text,
seller_text,
} => {
if buyer_text.trim().is_empty() || seller_text.trim().is_empty() {
return PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable);
}
PolicyDecision::AskClarification {
buyer_text: buyer_text.clone(),
seller_text: seller_text.clone(),
}
}
SuggestedAction::Summarize => {
use crate::models::mediation::ClassificationLabel;
match classification.classification {
ClassificationLabel::CoordinationFailureResolvable => PolicyDecision::Summarize {
classification: classification.classification,
confidence: classification.confidence,
},
_ => PolicyDecision::Escalate(EscalationTrigger::InvalidModelOutput),
}
}
SuggestedAction::Escalate(_) => {
PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable)
}
}
}
/// Data kept from a dispute-scoped classification so that
/// `record_classification_for_session` can later attach the stored rationale
/// to a session and emit the classification event.
#[derive(Debug, Clone)]
pub struct RationaleAudit {
/// Identifier returned by `db::rationales::insert_rationale`.
pub rationale_id: String,
/// Label of the classification the rationale belongs to.
pub classification: crate::models::mediation::ClassificationLabel,
/// Confidence reported with that classification.
pub confidence: f64,
}
/// Result of `classify_for_start`: the policy decision plus, when the model
/// call succeeded, the audit data for the rationale that was stored.
#[derive(Debug, Clone)]
pub struct ClassifyForStartOutcome {
/// Decision derived from the classification (or the escalation produced
/// when the reasoning call failed).
pub decision: PolicyDecision,
/// `None` when the reasoning call failed and no rationale was stored.
pub rationale_audit: Option<RationaleAudit>,
}
/// Classifies a dispute before a session id is available and returns the
/// decision together with the audit data for the stored rationale.
///
/// The request reuses `dispute_id` as its `session_id`, with an empty
/// transcript and a zeroed reasoning context. On success the rationale is
/// inserted with no session id, and the returned [`RationaleAudit`] carries
/// its id, label and confidence.
///
/// If the provider call fails, a `ReasoningCallFailed` event with no session
/// id is recorded on a best-effort basis (a failure to record it is only
/// logged) and the outcome is an `Escalate(ReasoningUnavailable)` decision
/// with `rationale_audit: None`.
///
/// # Errors
///
/// Propagates errors from `current_ts_secs` and from inserting the rationale.
// Kept to match the sibling entry points, which carry the same allow.
#[allow(clippy::too_many_arguments)]
pub async fn classify_for_start(
    conn: &Arc<AsyncMutex<rusqlite::Connection>>,
    dispute_id: &str,
    initiator_role: InitiatorRole,
    prompt_bundle: &Arc<PromptBundle>,
    reasoning: &dyn ReasoningProvider,
    provider_name: &str,
    model_name: &str,
) -> Result<ClassifyForStartOutcome> {
    let opening_context = ReasoningContext {
        round_count: 0,
        last_classification: None,
        last_confidence: None,
    };
    let request = ClassificationRequest {
        session_id: dispute_id.to_owned(),
        dispute_id: dispute_id.to_owned(),
        initiator_role,
        prompt_bundle: Arc::clone(prompt_bundle),
        transcript: Vec::new(),
        context: opening_context,
    };

    let response = match reasoning.classify(request).await {
        Ok(response) => response,
        Err(classify_err) => {
            let failed_at = current_ts_secs()?;
            let failure_payload = serde_json::json!({
                "dispute_id": dispute_id,
                "provider": provider_name,
                "model": model_name,
                "attempt_count": 1,
                "error_category": reasoning_error_category(&classify_err),
            })
            .to_string();

            // Best-effort audit write; only the error (if any) leaves the
            // locked scope.
            let record_failure = {
                let db_guard = conn.lock().await;
                db::mediation_events::record_event(
                    &db_guard,
                    db::mediation_events::MediationEventKind::ReasoningCallFailed,
                    None,
                    &failure_payload,
                    None,
                    Some(&prompt_bundle.id),
                    Some(&prompt_bundle.policy_hash),
                    failed_at,
                )
                .err()
            };
            if let Some(db_err) = record_failure {
                warn!(
                    dispute_id = %dispute_id,
                    error = %db_err,
                    "failed to record dispute-scoped reasoning_call_failed event"
                );
            }

            warn!(
                dispute_id = %dispute_id,
                error = %classify_err,
                "classify_for_start: reasoning.classify failed; escalating as reasoning_unavailable"
            );
            return Ok(ClassifyForStartOutcome {
                decision: PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable),
                rationale_audit: None,
            });
        }
    };

    let outcome = classify_to_decision(&response, PolicyRound::Initial);
    let stored_at = current_ts_secs()?;

    // The rationale is stored without a session id at this point.
    let rationale_id = {
        let db_guard = conn.lock().await;
        db::rationales::insert_rationale(
            &db_guard,
            None,
            provider_name,
            model_name,
            &prompt_bundle.id,
            &prompt_bundle.policy_hash,
            &response.rationale.0,
            stored_at,
        )?
    };

    let audit = RationaleAudit {
        rationale_id,
        classification: response.classification,
        confidence: response.confidence,
    };
    debug!(
        dispute_id = %dispute_id,
        classification = %response.classification,
        confidence = response.confidence,
        rationale_id = %audit.rationale_id,
        decision = ?outcome,
        "classify_for_start: rationale persisted dispute-scoped"
    );

    Ok(ClassifyForStartOutcome {
        decision: outcome,
        rationale_audit: Some(audit),
    })
}
pub async fn record_classification_for_session(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
session_id: &str,
audit: &RationaleAudit,
prompt_bundle: &Arc<PromptBundle>,
) -> Result<()> {
let now = current_ts_secs()?;
let mut guard = conn.lock().await;
let tx = guard.transaction()?;
tx.execute(
"UPDATE reasoning_rationales
SET session_id = ?1
WHERE rationale_id = ?2 AND session_id IS NULL",
rusqlite::params![session_id, &audit.rationale_id],
)?;
db::mediation_events::record_classification_produced(
&tx,
session_id,
&audit.rationale_id,
&audit.classification.to_string(),
audit.confidence,
Some(&prompt_bundle.id),
Some(&prompt_bundle.policy_hash),
now,
)?;
tx.commit()?;
debug!(
session_id = %session_id,
rationale_id = %audit.rationale_id,
classification = %audit.classification,
confidence = audit.confidence,
"record_classification_for_session: rationale rebound + classification_produced emitted"
);
Ok(())
}
/// Stores the rationale for a classification and emits the matching
/// classification-produced event, both inside a single transaction.
///
/// Returns the id of the inserted rationale row.
///
/// # Errors
///
/// Propagates errors from `current_ts_secs`, from opening or committing the
/// transaction, and from either database write. Nothing is committed if any
/// step fails.
async fn persist_classification_audit(
    conn: &Arc<AsyncMutex<rusqlite::Connection>>,
    session_id: &str,
    prompt_bundle: &Arc<PromptBundle>,
    provider_name: &str,
    model_name: &str,
    classification: &ClassificationResponse,
) -> Result<String> {
    let recorded_at = current_ts_secs()?;
    let label = classification.classification.to_string();

    let mut db_guard = conn.lock().await;
    let txn = db_guard.transaction()?;

    // The rationale row comes first because the event references its id.
    let rationale_id = db::rationales::insert_rationale(
        &txn,
        Some(session_id),
        provider_name,
        model_name,
        &prompt_bundle.id,
        &prompt_bundle.policy_hash,
        &classification.rationale.0,
        recorded_at,
    )?;
    db::mediation_events::record_classification_produced(
        &txn,
        session_id,
        &rationale_id,
        &label,
        classification.confidence,
        Some(&prompt_bundle.id),
        Some(&prompt_bundle.policy_hash),
        recorded_at,
    )?;

    txn.commit()?;
    Ok(rationale_id)
}
fn reasoning_error_category(err: &crate::models::reasoning::ReasoningError) -> &'static str {
use crate::models::reasoning::ReasoningError::*;
match err {
Unreachable(_) => "unreachable",
Timeout => "timeout",
MalformedResponse(_) => "malformed_response",
AuthorityBoundaryViolation(_) => "authority_boundary_violation",
Other(_) => "unknown",
}
}
use super::current_ts_secs;
// Unit tests for the policy layer. Each test builds a fresh in-memory
// database, feeds a scripted classification through `initial_classification`
// or `evaluate`, and asserts on the resulting decision (and, for the
// happy-path tests, on the persisted audit rows).
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use std::sync::Mutex as SyncMutex;
use crate::db::migrations::run_migrations;
use crate::db::open_in_memory;
use crate::models::mediation::{ClassificationLabel, Flag};
use crate::models::reasoning::{
ClassificationResponse, EscalationReason, RationaleText, ReasoningError, SummaryRequest,
SummaryResponse,
};
use crate::prompts::PromptBundle;
/// Prompt bundle whose id and policy hash match the session row seeded by
/// `fresh_conn`.
fn test_bundle() -> Arc<PromptBundle> {
Arc::new(PromptBundle {
id: "phase3-default".into(),
policy_hash: "test-policy-hash".into(),
system: "sys".into(),
classification: "cls".into(),
escalation: "esc".into(),
mediation_style: "style".into(),
message_templates: "tpl".into(),
})
}
/// Baseline response: high confidence, no flags, asks for clarification
/// with distinct buyer and seller texts. Tests mutate the field under test.
fn base_response() -> ClassificationResponse {
ClassificationResponse {
classification: ClassificationLabel::CoordinationFailureResolvable,
confidence: 0.9,
suggested_action: SuggestedAction::AskClarification {
buyer_text: "please confirm X (buyer)".into(),
seller_text: "please confirm X (seller)".into(),
},
rationale: RationaleText("rationale body".into()),
flags: Vec::new(),
}
}
/// Reasoning provider that hands out one pre-scripted `classify` result.
struct ScriptedProvider {
// Holds the single scripted result; `classify` takes it out on first use.
next: SyncMutex<Option<std::result::Result<ClassificationResponse, ReasoningError>>>,
}
impl ScriptedProvider {
/// Provider whose single `classify` call succeeds with `response`.
fn ok(response: ClassificationResponse) -> Self {
Self {
next: SyncMutex::new(Some(Ok(response))),
}
}
/// Provider whose single `classify` call fails with `err`.
fn err(err: ReasoningError) -> Self {
Self {
next: SyncMutex::new(Some(Err(err))),
}
}
}
#[async_trait]
impl ReasoningProvider for ScriptedProvider {
// Returns the scripted result once; a second call panics.
async fn classify(
&self,
_request: ClassificationRequest,
) -> std::result::Result<ClassificationResponse, ReasoningError> {
self.next
.lock()
.unwrap()
.take()
.expect("classify called twice; scripted provider only has one entry")
}
// The policy tests never summarize, so reaching this is a test bug.
async fn summarize(
&self,
_request: SummaryRequest,
) -> std::result::Result<SummaryResponse, ReasoningError> {
panic!("summarize not expected in policy tests")
}
async fn health_check(&self) -> std::result::Result<(), ReasoningError> {
Ok(())
}
}
/// In-memory database with migrations applied and seeded with one dispute
/// (`d1`) and one mediation session (`sess-policy`) for that dispute.
fn fresh_conn() -> Arc<AsyncMutex<rusqlite::Connection>> {
let mut conn = open_in_memory().unwrap();
run_migrations(&mut conn).unwrap();
conn.execute(
"INSERT INTO disputes (
dispute_id, event_id, mostro_pubkey, initiator_role,
dispute_status, event_timestamp, detected_at, lifecycle_state
) VALUES ('d1', 'e1', 'm1', 'buyer', 'initiated', 1, 2, 'notified')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO mediation_sessions (
session_id, dispute_id, state, round_count,
prompt_bundle_id, policy_hash,
started_at, last_transition_at
) VALUES ('sess-policy', 'd1', 'awaiting_response', 0,
'phase3-default', 'test-policy-hash', 100, 100)",
[],
)
.unwrap();
Arc::new(AsyncMutex::new(conn))
}
/// Calls `initial_classification` against the seeded session and dispute
/// with the test bundle and fixed provider/model names.
async fn run_initial(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
provider: &dyn ReasoningProvider,
) -> Result<PolicyDecision> {
let bundle = test_bundle();
initial_classification(
conn,
"sess-policy",
"d1",
InitiatorRole::Buyer,
&bundle,
provider,
"openai",
"gpt-test",
)
.await
}
/// A `FraudRisk` flag escalates even though the base response is otherwise
/// a valid high-confidence clarification.
#[tokio::test]
async fn fraud_risk_flag_escalates_regardless_of_action() {
let conn = fresh_conn();
let mut resp = base_response();
resp.flags = vec![Flag::FraudRisk];
let provider = ScriptedProvider::ok(resp);
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::FraudIndicator)
);
}
/// The low-confidence bypass covers only `AskClarification`; a
/// low-confidence `Summarize` on the initial round still escalates.
#[tokio::test]
async fn low_confidence_with_summarize_escalates_on_initial() {
let conn = fresh_conn();
let mut resp = base_response();
resp.confidence = 0.3;
resp.suggested_action = SuggestedAction::Summarize;
let provider = ScriptedProvider::ok(resp);
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::LowConfidence)
);
}
/// On the initial round a low-confidence `AskClarification` is passed
/// through unchanged.
#[tokio::test]
async fn initial_round_passes_low_confidence_ask_clarification() {
let conn = fresh_conn();
let mut resp = base_response();
resp.confidence = 0.3; let provider = ScriptedProvider::ok(resp);
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::AskClarification {
buyer_text: "please confirm X (buyer)".into(),
seller_text: "please confirm X (seller)".into(),
}
);
}
/// A model-suggested `Escalate` action is reported as
/// `ReasoningUnavailable`.
#[tokio::test]
async fn model_suggested_escalate_maps_to_reasoning_unavailable() {
let conn = fresh_conn();
let mut resp = base_response();
resp.suggested_action = SuggestedAction::Escalate(EscalationReason("model says so".into()));
let provider = ScriptedProvider::ok(resp);
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable)
);
}
/// A provider error escalates as `ReasoningUnavailable` and stores no
/// rationale row.
#[tokio::test]
async fn provider_unreachable_error_escalates_reasoning_unavailable() {
let conn = fresh_conn();
let provider = ScriptedProvider::err(ReasoningError::Unreachable("network".into()));
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable)
);
let count: i64 = {
let guard = conn.lock().await;
guard
.query_row("SELECT COUNT(*) FROM reasoning_rationales", [], |r| {
r.get(0)
})
.unwrap()
};
assert_eq!(count, 0);
}
/// The unmodified base response yields `AskClarification` and persists
/// exactly one rationale row and one `classification_produced` event.
#[tokio::test]
async fn happy_path_returns_ask_clarification_and_persists_audit() {
let conn = fresh_conn();
let provider = ScriptedProvider::ok(base_response());
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::AskClarification {
buyer_text: "please confirm X (buyer)".into(),
seller_text: "please confirm X (seller)".into(),
}
);
let (rat_count, evt_count): (i64, i64) = {
let guard = conn.lock().await;
let rat = guard
.query_row("SELECT COUNT(*) FROM reasoning_rationales", [], |r| {
r.get(0)
})
.unwrap();
let evt = guard
.query_row(
"SELECT COUNT(*) FROM mediation_events
WHERE session_id='sess-policy' AND kind='classification_produced'",
[],
|r| r.get(0),
)
.unwrap();
(rat, evt)
};
assert_eq!(rat_count, 1, "rationale audit row expected");
assert_eq!(evt_count, 1, "classification_produced event expected");
}
/// An `AuthorityBoundaryAttempt` flag replaces the suggested clarification
/// with an escalation.
#[tokio::test]
async fn authority_boundary_flag_suppresses_and_escalates() {
let conn = fresh_conn();
let mut resp = base_response();
resp.flags = vec![Flag::AuthorityBoundaryAttempt];
let provider = ScriptedProvider::ok(resp);
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::AuthorityBoundaryAttempt)
);
}
/// Flag checks run before the confidence gate, so the flag's trigger is
/// reported rather than `LowConfidence`.
#[tokio::test]
async fn authority_boundary_flag_dominates_low_confidence() {
let conn = fresh_conn();
let mut resp = base_response();
resp.confidence = 0.2;
resp.flags = vec![Flag::AuthorityBoundaryAttempt];
let provider = ScriptedProvider::ok(resp);
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::AuthorityBoundaryAttempt)
);
}
/// A whitespace-only buyer text escalates as `ReasoningUnavailable`.
#[tokio::test]
async fn empty_buyer_clarification_text_escalates_as_malformed() {
let conn = fresh_conn();
let mut resp = base_response();
resp.suggested_action = SuggestedAction::AskClarification {
buyer_text: " \n\t".into(),
seller_text: "please confirm X (seller)".into(),
};
let provider = ScriptedProvider::ok(resp);
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable)
);
}
/// An empty seller text escalates as `ReasoningUnavailable`.
#[tokio::test]
async fn empty_seller_clarification_text_escalates_as_malformed() {
let conn = fresh_conn();
let mut resp = base_response();
resp.suggested_action = SuggestedAction::AskClarification {
buyer_text: "please confirm X (buyer)".into(),
seller_text: "".into(),
};
let provider = ScriptedProvider::ok(resp);
let decision = run_initial(&conn, &provider).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::ReasoningUnavailable)
);
}
/// Calls `evaluate` at the first follow-up number outside the bypass
/// window, so the low-confidence gate is fully active.
async fn run_evaluate(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
classification: ClassificationResponse,
) -> Result<PolicyDecision> {
run_evaluate_at(conn, classification, EARLY_MIDSESSION_BYPASS_FOLLOWUPS + 1).await
}
/// Calls `evaluate` against the seeded session at an explicit
/// `followup_number`.
async fn run_evaluate_at(
conn: &Arc<AsyncMutex<rusqlite::Connection>>,
classification: ClassificationResponse,
followup_number: u32,
) -> Result<PolicyDecision> {
let bundle = test_bundle();
evaluate(
conn,
"sess-policy",
&bundle,
"openai",
"gpt-test",
classification,
followup_number,
)
.await
}
/// Mid-session: a `FraudRisk` flag escalates as `FraudIndicator`.
#[tokio::test]
async fn evaluate_fraud_flag_escalates() {
let conn = fresh_conn();
let mut resp = base_response();
resp.flags = vec![Flag::FraudRisk];
let decision = run_evaluate(&conn, resp).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::FraudIndicator)
);
}
/// Mid-session: an `AuthorityBoundaryAttempt` flag escalates.
#[tokio::test]
async fn evaluate_authority_boundary_escalates() {
let conn = fresh_conn();
let mut resp = base_response();
resp.flags = vec![Flag::AuthorityBoundaryAttempt];
let decision = run_evaluate(&conn, resp).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::AuthorityBoundaryAttempt)
);
}
/// Buyer and seller texts are carried into the decision as separate,
/// unmodified strings.
#[tokio::test]
async fn evaluate_passes_buyer_and_seller_texts_through_distinctly() {
let conn = fresh_conn();
let mut resp = base_response();
resp.suggested_action = SuggestedAction::AskClarification {
buyer_text: "BUYER-ONLY-QUESTION".into(),
seller_text: "SELLER-ONLY-QUESTION".into(),
};
let decision = run_evaluate(&conn, resp).await.unwrap();
match decision {
PolicyDecision::AskClarification {
buyer_text,
seller_text,
} => {
assert_eq!(buyer_text, "BUYER-ONLY-QUESTION");
assert_eq!(seller_text, "SELLER-ONLY-QUESTION");
assert_ne!(
buyer_text, seller_text,
"per-party texts must survive as separate strings"
);
}
other => panic!("expected AskClarification, got {other:?}"),
}
}
/// Outside the bypass window a low-confidence clarification escalates.
#[tokio::test]
async fn evaluate_low_confidence_escalates() {
let conn = fresh_conn();
let mut resp = base_response();
resp.confidence = 0.3;
let decision = run_evaluate(&conn, resp).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::LowConfidence)
);
}
/// At follow-up 1 (inside the bypass window) a low-confidence
/// clarification is passed through.
#[tokio::test]
async fn evaluate_low_confidence_ask_clarification_bypasses_in_early_mid_session() {
let conn = fresh_conn();
let mut resp = base_response();
resp.confidence = 0.3; let decision = run_evaluate_at(&conn, resp, 1).await.unwrap();
assert_eq!(
decision,
PolicyDecision::AskClarification {
buyer_text: "please confirm X (buyer)".into(),
seller_text: "please confirm X (seller)".into(),
}
);
}
/// The bypass window is inclusive: the boundary follow-up still bypasses.
#[tokio::test]
async fn evaluate_low_confidence_ask_clarification_bypasses_at_boundary() {
let conn = fresh_conn();
let mut resp = base_response();
resp.confidence = 0.3;
let decision = run_evaluate_at(&conn, resp, EARLY_MIDSESSION_BYPASS_FOLLOWUPS)
.await
.unwrap();
assert_eq!(
decision,
PolicyDecision::AskClarification {
buyer_text: "please confirm X (buyer)".into(),
seller_text: "please confirm X (seller)".into(),
}
);
}
/// One follow-up past the boundary, the same response escalates.
#[tokio::test]
async fn evaluate_low_confidence_ask_clarification_past_bypass_escalates() {
let conn = fresh_conn();
let mut resp = base_response();
resp.confidence = 0.3;
let decision = run_evaluate_at(&conn, resp, EARLY_MIDSESSION_BYPASS_FOLLOWUPS + 1)
.await
.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::LowConfidence)
);
}
/// Inside the bypass window a low-confidence `Summarize` still escalates;
/// the bypass applies to `AskClarification` only.
#[tokio::test]
async fn evaluate_low_confidence_summarize_escalates_inside_bypass_window() {
let conn = fresh_conn();
let mut resp = base_response();
resp.confidence = 0.3;
resp.suggested_action = SuggestedAction::Summarize;
let decision = run_evaluate_at(&conn, resp, 1).await.unwrap();
assert_eq!(
decision,
PolicyDecision::Escalate(EscalationTrigger::LowConfidence)
);
}
/// `evaluate` with the base response yields `AskClarification` and
/// persists exactly one rationale row and one `classification_produced`
/// event.
#[tokio::test]
async fn evaluate_happy_path_persists_audit() {
let conn = fresh_conn();
let decision = run_evaluate(&conn, base_response()).await.unwrap();
assert_eq!(
decision,
PolicyDecision::AskClarification {
buyer_text: "please confirm X (buyer)".into(),
seller_text: "please confirm X (seller)".into(),
}
);
let (rat_count, evt_count): (i64, i64) = {
let guard = conn.lock().await;
let rat = guard
.query_row("SELECT COUNT(*) FROM reasoning_rationales", [], |r| {
r.get(0)
})
.unwrap();
let evt = guard
.query_row(
"SELECT COUNT(*) FROM mediation_events
WHERE session_id='sess-policy' AND kind='classification_produced'",
[],
|r| r.get(0),
)
.unwrap();
(rat, evt)
};
assert_eq!(rat_count, 1, "rationale audit row expected");
assert_eq!(evt_count, 1, "classification_produced event expected");
}
}