use crate::error::Result;
use crate::metadata::{MetadataStore, MigrationStateRow};
use crate::summary::SummarySubject;
use crate::util::Clock;
const MIGRATION_NAME: &str = "summary_input_backfill_v1";
pub async fn migrate(metadata: &dyn MetadataStore, clock: &dyn Clock) -> Result<()> {
if let Some(state) = metadata.read_migration_state(MIGRATION_NAME).await?
&& state.status == "done"
{
return Ok(());
}
let now = clock.now_ms();
metadata
.upsert_migration_state(MigrationStateRow {
name: MIGRATION_NAME.to_string(),
status: "in_progress".into(),
cursor: None,
started_at_ms: now,
finished_at_ms: None,
})
.await?;
let summaries = metadata.list_all_summaries_for_backfill().await?;
for s in summaries {
for input in &s.inputs {
let (kind, id) = match input {
SummarySubject::Memory(r) => ("memory", r.id.to_string()),
SummarySubject::Partition(p) => ("partition", p.as_str().to_string()),
SummarySubject::Tenant => ("tenant", "<root>".to_string()),
};
metadata.insert_summary_input(&s.id, kind, &id).await?;
}
}
let now = clock.now_ms();
metadata
.upsert_migration_state(MigrationStateRow {
name: MIGRATION_NAME.to_string(),
status: "done".into(),
cursor: None,
started_at_ms: now,
finished_at_ms: Some(now),
})
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::audit::AuditOp;
use crate::content::ContentHash;
use crate::metadata::{AuditEntry, SqliteMetadata, SummaryRow};
use crate::partition::PartitionPath;
use crate::summary::SummaryId;
use crate::util::SystemClock;
#[tokio::test]
async fn migrate_is_idempotent_after_first_run() {
let m = SqliteMetadata::connect_memory().await.unwrap();
m.migrate().await.unwrap();
let clock = SystemClock;
migrate(&m, &clock).await.unwrap();
let state = m.read_migration_state(MIGRATION_NAME).await.unwrap();
assert_eq!(state.unwrap().status, "done");
migrate(&m, &clock).await.unwrap();
}
#[tokio::test]
async fn migrate_backfills_inputs_for_existing_summary() {
let m = SqliteMetadata::connect_memory().await.unwrap();
m.migrate().await.unwrap();
let path: PartitionPath = "user=alex".parse().unwrap();
m.ensure_partition_chain(&path, true, 1).await.unwrap();
let sid = SummaryId::generate();
let row = SummaryRow {
id: sid,
subject_kind: "partition".into(),
subject_path: Some(path.clone()),
subject_memory: None,
style: "compact".into(),
version: 1,
data_path: "x".into(),
content_hash: ContentHash([0; 32]),
bytes: 0,
summarizer_id: "x:1".into(),
inputs: vec![SummarySubject::Tenant],
superseded_by: None,
tombstoned: false,
created_at_ms: 1,
};
let audit = AuditEntry {
ts_ms: 1,
actor: None,
op: AuditOp::SummaryAttach,
partition_path: Some(path.clone()),
memory_id: None,
detail: serde_json::Value::Null,
};
m.insert_summary(crate::metadata::InsertSummaryRequest {
row,
audit,
embedding_blob: None,
content_for_index: String::new(),
parent_path: path.as_str().to_string(),
})
.await
.unwrap();
migrate(&m, &SystemClock).await.unwrap();
let citing = m.summaries_citing("tenant", "<root>").await.unwrap();
assert_eq!(citing, vec![sid]);
}
}