#![allow(deprecated)]
use crate::error::MemoryError;
use crate::types::TraceId;
use serde::{Deserialize, Serialize};
pub use stack_ids::EnvelopeId;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportEnvelope {
pub envelope_id: EnvelopeId,
pub schema_version: String,
pub content_digest: String,
pub source_authority: String,
pub trace_id: Option<TraceId>,
pub namespace: String,
pub records: Vec<ImportRecord>,
}
impl ImportEnvelope {
pub fn validate(&self) -> Result<(), MemoryError> {
if self.envelope_id.is_empty() {
return Err(MemoryError::ImportInvalid {
reason: "envelope_id must not be empty".into(),
});
}
if self.schema_version.is_empty() {
return Err(MemoryError::ImportInvalid {
reason: "schema_version must not be empty".into(),
});
}
if self.content_digest.is_empty() {
return Err(MemoryError::ImportInvalid {
reason: "content_digest must not be empty".into(),
});
}
if self.source_authority.is_empty() {
return Err(MemoryError::ImportInvalid {
reason: "source_authority must not be empty".into(),
});
}
if self.namespace.is_empty() {
return Err(MemoryError::ImportInvalid {
reason: "namespace must not be empty".into(),
});
}
if self.records.is_empty() {
return Err(MemoryError::ImportInvalid {
reason: "envelope must contain at least one record".into(),
});
}
for (i, record) in self.records.iter().enumerate() {
record.validate().map_err(|e| MemoryError::ImportInvalid {
reason: format!("record[{i}]: {e}"),
})?;
}
Ok(())
}
pub(crate) fn dedupe_key(&self) -> (&str, &str, &str) {
(
self.envelope_id.as_str(),
&self.schema_version,
&self.content_digest,
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ImportRecord {
Fact {
content: String,
source: Option<String>,
metadata: Option<serde_json::Value>,
},
Episode {
document_id: String,
meta: crate::types::EpisodeMeta,
},
}
impl ImportRecord {
fn validate(&self) -> Result<(), MemoryError> {
match self {
ImportRecord::Fact { content, .. } => {
if content.is_empty() {
return Err(MemoryError::ImportInvalid {
reason: "fact content must not be empty".into(),
});
}
}
ImportRecord::Episode { document_id, .. } => {
if document_id.is_empty() {
return Err(MemoryError::ImportInvalid {
reason: "episode document_id must not be empty".into(),
});
}
}
}
Ok(())
}
pub fn content_text(&self) -> &str {
match self {
ImportRecord::Fact { content, .. } => content,
ImportRecord::Episode { meta, .. } => &meta.effect_type,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ImportStatus {
Complete,
AlreadyImported,
Aborted { reason: String },
}
impl ImportStatus {
pub fn as_str(&self) -> &str {
match self {
Self::Complete => "complete",
Self::AlreadyImported => "already_imported",
Self::Aborted { .. } => "aborted",
}
}
pub fn from_str_value(s: &str) -> Self {
match s {
"complete" => Self::Complete,
"already_imported" => Self::AlreadyImported,
s if s.starts_with("aborted:") => Self::Aborted {
reason: s.strip_prefix("aborted:").unwrap_or("").to_string(),
},
_ => Self::Aborted {
reason: format!("unknown status: {s}"),
},
}
}
pub(crate) fn to_db_string(&self) -> String {
match self {
Self::Complete => "complete".into(),
Self::AlreadyImported => "already_imported".into(),
Self::Aborted { reason } => format!("aborted:{reason}"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportReceipt {
pub envelope_id: EnvelopeId,
pub schema_version: String,
pub content_digest: String,
pub status: ImportStatus,
pub record_count: usize,
pub imported_at: String,
pub was_duplicate: bool,
pub trace_id: Option<TraceId>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ImportProjectionFreshness {
Current,
Stale { last_import_at: String },
Superseded { superseded_by: EnvelopeId },
ImportFailed { error: String, attempted_at: String },
NeverImported,
}
pub(crate) const MIGRATION_V10: &str = r#"
CREATE TABLE IF NOT EXISTS import_log (
envelope_id TEXT NOT NULL,
schema_version TEXT NOT NULL,
content_digest TEXT NOT NULL,
source_authority TEXT NOT NULL,
namespace TEXT NOT NULL,
trace_id TEXT,
record_count INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'complete',
imported_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (envelope_id, schema_version, content_digest)
);
CREATE INDEX IF NOT EXISTS idx_import_log_namespace ON import_log(namespace);
CREATE INDEX IF NOT EXISTS idx_import_log_imported_at ON import_log(imported_at DESC);
CREATE INDEX IF NOT EXISTS idx_import_log_source ON import_log(source_authority);
"#;
pub(crate) fn check_import_exists(
conn: &rusqlite::Connection,
envelope_id: &str,
schema_version: &str,
content_digest: &str,
) -> Result<bool, MemoryError> {
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM import_log
WHERE envelope_id = ?1 AND schema_version = ?2 AND content_digest = ?3",
rusqlite::params![envelope_id, schema_version, content_digest],
|row| row.get(0),
)
.unwrap_or(0);
Ok(count > 0)
}
pub(crate) fn insert_import_log(
tx: &rusqlite::Transaction<'_>,
envelope: &ImportEnvelope,
status: &ImportStatus,
record_count: usize,
) -> Result<(), MemoryError> {
tx.execute(
"INSERT OR REPLACE INTO import_log
(envelope_id, schema_version, content_digest, source_authority,
namespace, trace_id, record_count, status)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
rusqlite::params![
envelope.envelope_id.as_str(),
envelope.schema_version,
envelope.content_digest,
envelope.source_authority,
envelope.namespace,
envelope.trace_id.as_ref().map(|t| t.as_str()),
record_count as i64,
status.to_db_string(),
],
)?;
Ok(())
}
pub(crate) fn query_import_log(
conn: &rusqlite::Connection,
namespace: Option<&str>,
limit: usize,
) -> Result<Vec<ImportReceipt>, MemoryError> {
let (sql, params): (&str, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(ns) = namespace {
(
"SELECT envelope_id, schema_version, content_digest, status,
record_count, imported_at, trace_id
FROM import_log
WHERE namespace = ?1
ORDER BY imported_at DESC
LIMIT ?2",
vec![
Box::new(ns.to_string()) as Box<dyn rusqlite::types::ToSql>,
Box::new(limit as i64),
],
)
} else {
(
"SELECT envelope_id, schema_version, content_digest, status,
record_count, imported_at, trace_id
FROM import_log
ORDER BY imported_at DESC
LIMIT ?1",
vec![Box::new(limit as i64) as Box<dyn rusqlite::types::ToSql>],
)
};
let mut stmt = conn.prepare(sql)?;
let params_refs: Vec<&dyn rusqlite::types::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let rows = stmt
.query_map(params_refs.as_slice(), |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
row.get::<_, i64>(4)?,
row.get::<_, String>(5)?,
row.get::<_, Option<String>>(6)?,
))
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(rows
.into_iter()
.map(
|(
envelope_id,
schema_version,
content_digest,
status,
record_count,
imported_at,
trace_id,
)| {
let status_parsed = ImportStatus::from_str_value(&status);
let was_duplicate = matches!(status_parsed, ImportStatus::AlreadyImported);
ImportReceipt {
envelope_id: EnvelopeId(envelope_id),
schema_version,
content_digest,
status: status_parsed,
record_count: record_count as usize,
imported_at,
was_duplicate,
trace_id: trace_id.map(TraceId::new),
}
},
)
.collect())
}
pub(crate) fn last_import_at(
conn: &rusqlite::Connection,
namespace: &str,
) -> Result<Option<String>, MemoryError> {
let v10: Option<String> = conn
.query_row(
"SELECT imported_at FROM import_log
WHERE namespace = ?1 AND status = 'complete'
ORDER BY imported_at DESC LIMIT 1",
rusqlite::params![namespace],
|row| row.get(0),
)
.ok();
let v11: Option<String> = conn
.query_row(
"SELECT imported_at FROM projection_import_log
WHERE scope_namespace = ?1 AND status = 'complete'
ORDER BY imported_at DESC LIMIT 1",
rusqlite::params![namespace],
|row| row.get(0),
)
.ok();
let result = match (v10, v11) {
(Some(a), Some(b)) => Some(if a >= b { a } else { b }),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
};
Ok(result)
}