use cortex_core::{
compose_policy_outcomes, validate_summary_spans, EpisodeId, PolicyContribution, PolicyDecision,
PolicyOutcome, SourceAuthority, SummarySpan, TraceId,
};
use rusqlite::{params, OptionalExtension, Row};
use serde_json::Value;
use crate::{Pool, StoreError, StoreResult};
/// Rule id of the source-event-lineage contributor.
///
/// Both `EpisodeRepo::insert` and `EpisodeRepo::insert_with_summary_spans` refuse to
/// write unless the supplied policy decision lists a contribution with this id.
pub const INSERT_SOURCE_EVENT_LINEAGE_RULE_ID: &str = "episode.insert.source_event_lineage";
/// Rule id of the redaction-status contributor; required by both insert paths.
pub const INSERT_REDACTION_STATUS_RULE_ID: &str = "episode.insert.redaction_status";
/// Rule id of the summary-span proof contributor; required only by
/// `EpisodeRepo::insert_with_summary_spans`.
pub const INSERT_SUMMARY_SPAN_PROOF_RULE_ID: &str = "episode.summary_span_proof";
// Builds, at compile time, the shared `SELECT` statement over the `episodes` table.
//
// `$where_clause` must be a string literal; `concat!` splices it in right after
// `FROM episodes ` and appends a terminating `;`.
//
// The column list is read back positionally (indices 0..=9) by `episode_row`, so the
// order of columns here and the indices there have to change together.
macro_rules! episode_select_sql {
($where_clause:literal) => {
concat!(
"SELECT id, trace_id, source_events_json, summary, domains_json, entities_json,
candidate_meaning, extracted_by_json, confidence, status
FROM episodes ",
$where_clause,
";"
)
};
}
/// Typed, in-memory form of one row of the `episodes` table.
///
/// The `*_json` fields are held as parsed `serde_json::Value`s; the repository
/// serializes them to text on insert and parses them back on read.
#[derive(Debug, Clone, PartialEq)]
pub struct EpisodeRecord {
/// Episode identifier; persisted via its `to_string()` form.
pub id: EpisodeId,
/// Identifier of the trace the episode belongs to; `list_by_trace` filters on it.
pub trace_id: TraceId,
/// Source-event lineage. The insert paths reject the record when this is an
/// empty JSON array.
pub source_events_json: Value,
/// Summary text; `insert_with_summary_spans` validates the supplied spans against it.
pub summary: String,
/// JSON payload stored in the `domains_json` column (schema not enforced here).
pub domains_json: Value,
/// JSON payload stored in the `entities_json` column (schema not enforced here).
pub entities_json: Value,
/// Optional text stored in the nullable `candidate_meaning` column.
pub candidate_meaning: Option<String>,
/// JSON payload stored in the `extracted_by_json` column (schema not enforced here).
pub extracted_by_json: Value,
/// Confidence value, stored as given; this module performs no range check on it.
pub confidence: f64,
/// Status string, stored as given; this module performs no validation on it.
pub status: String,
}
/// Repository for reading and writing `episodes` rows through a borrowed `Pool`.
#[derive(Debug)]
pub struct EpisodeRepo<'a> {
// Borrowed database handle; every query in this repository goes through it.
pool: &'a Pool,
}
impl<'a> EpisodeRepo<'a> {
#[must_use]
pub const fn new(pool: &'a Pool) -> Self {
Self { pool }
}
pub fn insert(&self, episode: &EpisodeRecord, policy: &PolicyDecision) -> StoreResult<()> {
require_policy_final_outcome(policy, "episode.insert")?;
require_contributor_rule(policy, INSERT_SOURCE_EVENT_LINEAGE_RULE_ID)?;
require_contributor_rule(policy, INSERT_REDACTION_STATUS_RULE_ID)?;
if json_array_empty(&episode.source_events_json) {
return Err(StoreError::Validation(
"episode requires source event lineage".into(),
));
}
self.pool.execute(
"INSERT INTO episodes (
id, trace_id, source_events_json, summary, domains_json, entities_json,
candidate_meaning, extracted_by_json, confidence, status
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10);",
params![
episode.id.to_string(),
episode.trace_id.to_string(),
serde_json::to_string(&episode.source_events_json)?,
episode.summary,
serde_json::to_string(&episode.domains_json)?,
serde_json::to_string(&episode.entities_json)?,
episode.candidate_meaning,
serde_json::to_string(&episode.extracted_by_json)?,
episode.confidence,
episode.status,
],
)?;
Ok(())
}
pub fn insert_with_summary_spans(
&self,
episode: &EpisodeRecord,
summary_spans: &[SummarySpan],
policy: &PolicyDecision,
) -> StoreResult<()> {
require_policy_final_outcome(policy, "episode.insert_with_summary_spans")?;
require_contributor_rule(policy, INSERT_SOURCE_EVENT_LINEAGE_RULE_ID)?;
require_contributor_rule(policy, INSERT_REDACTION_STATUS_RULE_ID)?;
require_contributor_rule(policy, INSERT_SUMMARY_SPAN_PROOF_RULE_ID)?;
if json_array_empty(&episode.source_events_json) {
return Err(StoreError::Validation(
"episode requires source event lineage".into(),
));
}
validate_summary_spans(&episode.summary, summary_spans, |_| {
SourceAuthority::Derived
})
.map_err(|err| {
StoreError::Validation(format!(
"episode summary_spans_json failed {}",
err.invariant()
))
})?;
self.pool.execute(
"INSERT INTO episodes (
id, trace_id, source_events_json, summary, domains_json, entities_json,
candidate_meaning, extracted_by_json, confidence, status, summary_spans_json
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11);",
params![
episode.id.to_string(),
episode.trace_id.to_string(),
serde_json::to_string(&episode.source_events_json)?,
episode.summary,
serde_json::to_string(&episode.domains_json)?,
serde_json::to_string(&episode.entities_json)?,
episode.candidate_meaning,
serde_json::to_string(&episode.extracted_by_json)?,
episode.confidence,
episode.status,
serde_json::to_string(summary_spans)?,
],
)?;
Ok(())
}
pub fn get_by_id(&self, id: &EpisodeId) -> StoreResult<Option<EpisodeRecord>> {
let row = self
.pool
.query_row(
episode_select_sql!("WHERE id = ?1"),
params![id.to_string()],
episode_row,
)
.optional()?;
row.map(TryInto::try_into).transpose()
}
pub fn list_by_trace(&self, trace_id: &TraceId) -> StoreResult<Vec<EpisodeRecord>> {
let mut stmt = self
.pool
.prepare(episode_select_sql!("WHERE trace_id = ?1 ORDER BY id"))?;
let rows = stmt.query_map(params![trace_id.to_string()], episode_row)?;
let mut episodes = Vec::new();
for row in rows {
episodes.push(row?.try_into()?);
}
Ok(episodes)
}
}
// Raw column values of one `episodes` row, exactly as read from the database.
//
// Ids and JSON payloads are still plain strings at this stage; the
// `TryFrom<EpisodeRow>` impl for `EpisodeRecord` parses them into typed values.
#[derive(Debug)]
struct EpisodeRow {
// Unparsed episode id.
id: String,
// Unparsed trace id.
trace_id: String,
// Serialized JSON text.
source_events_json: String,
summary: String,
// Serialized JSON text.
domains_json: String,
// Serialized JSON text.
entities_json: String,
// `None` when the column is NULL.
candidate_meaning: Option<String>,
// Serialized JSON text.
extracted_by_json: String,
confidence: f64,
status: String,
}
/// Reads one result row produced by `episode_select_sql!` into an `EpisodeRow`.
///
/// Columns are fetched by position, so the indices below must follow the column
/// order of that `SELECT`.
///
/// # Errors
///
/// Returns the `rusqlite` error of the first column that cannot be read.
fn episode_row(row: &Row<'_>) -> rusqlite::Result<EpisodeRow> {
    let id: String = row.get(0)?;
    let trace_id: String = row.get(1)?;
    let source_events_json: String = row.get(2)?;
    let summary: String = row.get(3)?;
    let domains_json: String = row.get(4)?;
    let entities_json: String = row.get(5)?;
    let candidate_meaning: Option<String> = row.get(6)?;
    let extracted_by_json: String = row.get(7)?;
    let confidence: f64 = row.get(8)?;
    let status: String = row.get(9)?;

    Ok(EpisodeRow {
        id,
        trace_id,
        source_events_json,
        summary,
        domains_json,
        entities_json,
        candidate_meaning,
        extracted_by_json,
        confidence,
        status,
    })
}
impl TryFrom<EpisodeRow> for EpisodeRecord {
type Error = StoreError;
fn try_from(row: EpisodeRow) -> StoreResult<Self> {
Ok(Self {
id: row.id.parse()?,
trace_id: row.trace_id.parse()?,
source_events_json: serde_json::from_str(&row.source_events_json)?,
summary: row.summary,
domains_json: serde_json::from_str(&row.domains_json)?,
entities_json: serde_json::from_str(&row.entities_json)?,
candidate_meaning: row.candidate_meaning,
extracted_by_json: serde_json::from_str(&row.extracted_by_json)?,
confidence: row.confidence,
status: row.status,
})
}
}
/// Reports whether `value` fails to provide at least one array element.
///
/// Returns `true` for an empty JSON array and also for every non-array value
/// (`null`, object, string, number, bool). Treating non-arrays as "empty" keeps
/// guards built on this helper from being bypassed by a payload that is simply not
/// an array: previously only a literal `[]` was flagged, so `null` slipped through.
fn json_array_empty(value: &Value) -> bool {
    match value.as_array() {
        Some(items) => items.is_empty(),
        // Not an array at all: there are no elements to speak of.
        None => true,
    }
}
/// Checks that the composed policy outcome permits an episode mutation.
///
/// `Allow`, `Warn` and `BreakGlass` pass; `Quarantine` and `Reject` block.
/// `surface` names the calling operation and is only used in the error message.
///
/// # Errors
///
/// Returns `StoreError::Validation` when the outcome blocks the mutation.
fn require_policy_final_outcome(policy: &PolicyDecision, surface: &str) -> StoreResult<()> {
    // Exhaustive on purpose: a new `PolicyOutcome` variant must be classified here.
    let permitted = match policy.final_outcome {
        PolicyOutcome::Quarantine | PolicyOutcome::Reject => false,
        PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass => true,
    };
    if permitted {
        return Ok(());
    }
    Err(StoreError::Validation(format!(
        "{surface} preflight: composed policy outcome {:?} blocks episode mutation",
        policy.final_outcome,
    )))
}
/// Checks that `policy` lists a contribution whose rule id equals `rule_id`.
///
/// Both the `contributing` and the `discarded` contributions are searched, so a
/// rule that was composed but discarded still satisfies the requirement.
///
/// # Errors
///
/// Returns `StoreError::Validation` when neither list contains the rule.
fn require_contributor_rule(policy: &PolicyDecision, rule_id: &str) -> StoreResult<()> {
    let listed = policy
        .contributing
        .iter()
        .any(|entry| entry.rule_id.as_str() == rule_id)
        || policy
            .discarded
            .iter()
            .any(|entry| entry.rule_id.as_str() == rule_id);
    if !listed {
        return Err(StoreError::Validation(format!(
            "policy decision missing required contributor `{rule_id}`; caller skipped ADR 0026 composition",
        )));
    }
    Ok(())
}
/// Test fixture: composes a policy decision from two `Allow` contributions, one for
/// each rule id that `EpisodeRepo::insert` requires.
///
/// The second argument to `compose_policy_outcomes` is `None`.
///
/// # Panics
///
/// Panics if `PolicyContribution::new` rejects one of the static contributions.
#[must_use]
pub fn insert_policy_decision_test_allow() -> PolicyDecision {
    let lineage = PolicyContribution::new(
        INSERT_SOURCE_EVENT_LINEAGE_RULE_ID,
        PolicyOutcome::Allow,
        "test fixture: episode source-event lineage validated",
    )
    .expect("static test contribution is valid");
    let redaction = PolicyContribution::new(
        INSERT_REDACTION_STATUS_RULE_ID,
        PolicyOutcome::Allow,
        "test fixture: episode redaction status asserted",
    )
    .expect("static test contribution is valid");

    compose_policy_outcomes(vec![lineage, redaction], None)
}
/// Test fixture: composes a policy decision from three `Allow` contributions, one
/// for each rule id that `EpisodeRepo::insert_with_summary_spans` requires.
///
/// The second argument to `compose_policy_outcomes` is `None`.
///
/// # Panics
///
/// Panics if `PolicyContribution::new` rejects one of the static contributions.
#[must_use]
pub fn insert_with_summary_spans_policy_decision_test_allow() -> PolicyDecision {
    let lineage = PolicyContribution::new(
        INSERT_SOURCE_EVENT_LINEAGE_RULE_ID,
        PolicyOutcome::Allow,
        "test fixture: episode source-event lineage validated",
    )
    .expect("static test contribution is valid");
    let redaction = PolicyContribution::new(
        INSERT_REDACTION_STATUS_RULE_ID,
        PolicyOutcome::Allow,
        "test fixture: episode redaction status asserted",
    )
    .expect("static test contribution is valid");
    let span_proof = PolicyContribution::new(
        INSERT_SUMMARY_SPAN_PROOF_RULE_ID,
        PolicyOutcome::Allow,
        "test fixture: episode summary-span proof composed",
    )
    .expect("static test contribution is valid");

    compose_policy_outcomes(vec![lineage, redaction, span_proof], None)
}