use std::collections::HashSet;
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use tracing::{Level, event};
use crate::memory::Scope;
use crate::store::MemoryStore;
use crate::vector::VectorIndex;
use super::{Client, ClientError};
/// Default for the builder's `failed_batch` setting, the value passed to the
/// store's `find_failed` call during the retry phase of a reconcile run.
pub const DEFAULT_FAILED_BATCH: usize = 100;
/// Default for the builder's `scroll_page_size` setting, the page size passed
/// to the vector index's `list_pids_in_scope` call during the orphan sweep.
pub const DEFAULT_SCROLL_PAGE_SIZE: usize = 256;
/// Default for the builder's `rebuild_page_size` setting, used as the
/// timeline `limit` while paging memories for a graph rebuild.
pub const DEFAULT_REBUILD_PAGE_SIZE: usize = 256;
/// Counters produced by one reconcile run.
///
/// All fields start at zero (via `Default`); a phase that does not run leaves
/// its counters untouched.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReconcileSummary {
/// Number of rows returned by the store's `find_failed` query and handed to
/// the embed-and-index retry.
pub failed_retried: usize,
/// Number of retried rows whose embed-and-index attempt reported
/// `EmbedOutcome::Indexed`.
pub failed_recovered: usize,
/// Number of pids deleted from the vector index because they were absent from
/// the store's indexed pids. Pids whose delete call failed are not counted.
pub orphans_deleted: usize,
/// Number of distinct memory pids for which the graph rebuild enqueued a
/// relational-extract job. Only set when the `knowledge-graph` feature is
/// enabled and a rebuild scope was requested.
pub graph_rebuild_enqueued: usize,
}
/// Builder describing which reconcile phases to run and with what sizes.
///
/// Nothing happens until the builder is awaited; awaiting it (through its
/// `IntoFuture` impl) runs the selected phases and yields a
/// `ReconcileSummary`.
#[must_use = "reconcile() returns a builder that must be awaited"]
pub struct ReconcileBuilder<'a> {
/// Client whose `inner` handle (store, index, jobs, graph) the run uses.
client: &'a Client,
/// Whether the failed-row retry phase runs.
retry_failed: bool,
/// Whether the orphan sweep over the vector index runs.
clean_orphans: bool,
/// Value passed to the store's `find_failed` call in the retry phase.
failed_batch: usize,
/// Page size passed to the index's `list_pids_in_scope` call.
scroll_page_size: usize,
/// Scope whose graph should be rebuilt; `None` means no rebuild. Only read
/// when the `knowledge-graph` feature is enabled.
rebuild_scope: Option<Scope>,
/// Timeline page size used while paging memories for the graph rebuild.
rebuild_page_size: usize,
}
impl<'a> ReconcileBuilder<'a> {
    /// Creates a builder with both the failed-row retry and the orphan sweep
    /// enabled, the `DEFAULT_*` batch/page sizes, and no graph rebuild.
    pub(super) fn new(client: &'a Client) -> Self {
        Self {
            client,
            retry_failed: true,
            clean_orphans: true,
            failed_batch: DEFAULT_FAILED_BATCH,
            scroll_page_size: DEFAULT_SCROLL_PAGE_SIZE,
            rebuild_scope: None,
            rebuild_page_size: DEFAULT_REBUILD_PAGE_SIZE,
        }
    }

    /// Enables the failed-row retry and disables the orphan sweep.
    pub fn only_retry_failed(self) -> Self {
        Self {
            retry_failed: true,
            clean_orphans: false,
            ..self
        }
    }

    /// Enables the orphan sweep and disables the failed-row retry.
    pub fn only_clean_orphans(self) -> Self {
        Self {
            retry_failed: false,
            clean_orphans: true,
            ..self
        }
    }

    /// Sets the value passed to the store's `find_failed` call.
    pub fn failed_batch(self, batch: usize) -> Self {
        Self {
            failed_batch: batch,
            ..self
        }
    }

    /// Sets the page size passed to the index's `list_pids_in_scope` call.
    pub fn scroll_page_size(self, page_size: usize) -> Self {
        Self {
            scroll_page_size: page_size,
            ..self
        }
    }

    /// Requests a graph rebuild for `scope` as part of the run.
    ///
    /// This does not change which of the other phases are enabled.
    #[cfg(feature = "knowledge-graph")]
    pub fn rebuild_graph(self, scope: Scope) -> Self {
        Self {
            rebuild_scope: Some(scope),
            ..self
        }
    }

    /// Sets the timeline page size used while paging memories for the graph
    /// rebuild.
    #[cfg(feature = "knowledge-graph")]
    pub fn rebuild_page_size(self, page_size: usize) -> Self {
        Self {
            rebuild_page_size: page_size,
            ..self
        }
    }
}
/// Lets a `ReconcileBuilder` be `.await`ed directly: awaiting it runs the
/// reconcile and resolves to the run's `ReconcileSummary` or a `ClientError`.
impl<'a> IntoFuture for ReconcileBuilder<'a> {
    type Output = Result<ReconcileSummary, ClientError>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;

    /// Consumes the builder and returns the `execute` future, boxed and
    /// pinned.
    fn into_future(self) -> Self::IntoFuture {
        let run = execute(self);
        Box::pin(run)
    }
}
/// Runs the phases selected on `builder` and returns the accumulated counters.
///
/// Phases, in order:
///
/// 1. **Retry failed** (if `retry_failed`): calls the store's `find_failed`
///    with `failed_batch`, re-runs `embed_and_index` on every returned row and
///    counts the rows that come back as `EmbedOutcome::Indexed`.
/// 2. **Clean orphans** (if `clean_orphans`): for every scope listed by the
///    store, deletes from the vector index every pid that is not among the
///    store's indexed pids for that scope.
/// 3. **Graph rebuild** (only with the `knowledge-graph` feature and a rebuild
///    scope): delegates to `rebuild_graph`.
///
/// # Errors
///
/// Propagates errors from `find_failed`, `list_scopes`, `list_pids_in_scope`,
/// `indexed_pids_in_scope` and `rebuild_graph`. A failing `delete_by_pids` is
/// deliberately not fatal: it is logged at `WARN` and the sweep moves on to
/// the next scope.
async fn execute(builder: ReconcileBuilder<'_>) -> Result<ReconcileSummary, ClientError> {
    let ReconcileBuilder {
        client,
        retry_failed,
        clean_orphans,
        failed_batch,
        scroll_page_size,
        rebuild_scope,
        rebuild_page_size,
    } = builder;
    // Without the feature the rebuild settings are never read; touch them so
    // the destructuring above does not produce unused-variable warnings.
    #[cfg(not(feature = "knowledge-graph"))]
    let _ = (&rebuild_scope, rebuild_page_size);

    let inner = client.inner.clone();
    let mut summary = ReconcileSummary::default();

    if retry_failed {
        let rows = inner.store.find_failed(failed_batch).await?;
        summary.failed_retried = rows.len();
        for row in rows {
            if matches!(
                inner.embed_and_index(row).await,
                super::embed::EmbedOutcome::Indexed
            ) {
                summary.failed_recovered += 1;
            }
        }
        event!(
            name: "memoir.reconcile.retry_failed_complete",
            Level::INFO,
            retried = summary.failed_retried,
            recovered = summary.failed_recovered,
            "retried {{retried}}, recovered {{recovered}}",
        );
    }

    if clean_orphans {
        let scopes = inner.store.list_scopes().await?;
        for scope in scopes {
            // Snapshot the index BEFORE the store. With the opposite order,
            // any memory that finished indexing between the two reads shows
            // up in the index listing but not in the (older) store snapshot
            // and would be deleted as an "orphan" even though it is live.
            // Reading the store last means the store snapshot is at least as
            // fresh as the index listing it is compared against.
            // NOTE(review): this assumes concurrent writers may index
            // memories while a reconcile runs — confirm against the write
            // path.
            let index_pids = inner
                .index
                .list_pids_in_scope(scope.clone(), scroll_page_size)
                .await?;
            let stored_pids: HashSet<String> = inner
                .store
                .indexed_pids_in_scope(&scope)
                .await?
                .into_iter()
                .collect();

            let orphans: Vec<&str> = index_pids
                .iter()
                .filter(|pid| !stored_pids.contains(pid.as_str()))
                .map(String::as_str)
                .collect();
            if orphans.is_empty() {
                continue;
            }

            let count = orphans.len();
            // Best effort: a failed delete is only logged, so one bad scope
            // does not abort the sweep.
            if let Err(err) = inner.index.delete_by_pids(&orphans).await {
                event!(
                    name: "memoir.reconcile.orphan_delete_failed",
                    Level::WARN,
                    count = count,
                    error.message = %err,
                    "orphan delete failed for {{count}} pid(s): {{error.message}} — will retry on next sweep",
                );
            } else {
                summary.orphans_deleted += count;
            }
        }
        event!(
            name: "memoir.reconcile.orphans_complete",
            Level::INFO,
            deleted = summary.orphans_deleted,
            "deleted {{deleted}} orphans",
        );
    }

    #[cfg(feature = "knowledge-graph")]
    if let Some(scope) = rebuild_scope {
        summary.graph_rebuild_enqueued = rebuild_graph(&inner, scope, rebuild_page_size).await?;
    }

    Ok(summary)
}
/// Wipes the graph for `scope` and enqueues one relational-extract job per
/// episodic memory in that scope, returning the number of jobs enqueued.
///
/// Returns `Ok(0)` without doing anything when the client has no graph
/// configured. A failed wipe is logged at `WARN` and the rebuild continues
/// over the existing graph.
///
/// Memories are read through the store's `timeline` in pages of `page_size`
/// (superseded memories included). A `page_size` of zero is treated as one:
/// with a zero limit the loop could never make progress, yet the graph would
/// already have been wiped.
///
/// # Errors
///
/// Propagates errors from the store's `timeline` and from the job queue's
/// `enqueue`.
#[cfg(feature = "knowledge-graph")]
async fn rebuild_graph(
    inner: &std::sync::Arc<super::ClientInner>,
    scope: Scope,
    page_size: usize,
) -> Result<usize, ClientError> {
    use crate::graph::GraphStore;
    use crate::jobs::{JobKind, MemoryJobsStore};
    use crate::memory::KindSelector;
    use crate::store::TimelineParams;

    let Some(graph) = inner.graph.as_deref() else {
        return Ok(0);
    };
    if let Err(err) = graph.forget_scope(&scope).await {
        event!(
            name: "memoir.reconcile.rebuild_wipe_failed",
            Level::WARN,
            error.message = %err,
            "graph wipe before rebuild failed: {{error.message}} — rebuilding over existing graph (commit is idempotent)",
        );
    }

    // Guard against a zero page size: `limit` would stay 0 even after
    // doubling, so the scan below could not advance.
    let page_size = page_size.max(1);

    // Pids already enqueued; pages may overlap (see the cursor handling
    // below), so this keeps every memory to a single job.
    let mut seen = HashSet::new();
    let mut cursor = None;
    let mut limit = page_size;
    loop {
        let params = TimelineParams {
            kinds: KindSelector {
                episodic: true,
                semantic: false,
            },
            created_before: cursor,
            include_superseded: true,
            limit,
            ..TimelineParams::default()
        };
        let page = inner.store.timeline(scope.clone(), params).await?;
        if page.is_empty() {
            break;
        }

        let page_len = page.len();
        let oldest = page.last().map(|memory| memory.created_at);
        for memory in page {
            if !seen.insert(memory.pid.clone()) {
                continue;
            }
            inner
                .jobs
                .enqueue(
                    JobKind::RelationalExtract,
                    memory.pid,
                    serde_json::json!({ "origin": "reconcile" }),
                )
                .await?;
        }

        // NOTE(review): the +1µs bump reads as if `created_before` is an
        // exclusive bound and pages are ordered newest-first, so the next
        // page re-includes rows sharing the last row's timestamp — confirm
        // against `MemoryStore::timeline`.
        let next_cursor = oldest.map(|ts| ts + chrono::Duration::microseconds(1));
        if page_len < limit {
            // Short page: nothing further to fetch.
            break;
        }
        if next_cursor == cursor {
            // A full page that did not move the cursor: widen the page until
            // it reaches past the rows pinned at this timestamp. Saturate
            // instead of overflowing (panic in debug, wrap to 0 in release).
            limit = limit.saturating_mul(2);
            continue;
        }
        cursor = next_cursor;
        limit = page_size;
    }

    let enqueued = seen.len();
    event!(
        name: "memoir.reconcile.rebuild_complete",
        Level::INFO,
        enqueued = enqueued,
        "graph rebuild enqueued {{enqueued}} relational-extract job(s)",
    );
    Ok(enqueued)
}