use std::collections::HashSet;
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use tracing::{Level, event};
use crate::store::MemoryStore;
use crate::vector::VectorIndex;
use super::{Client, ClientError};
/// Default `failed_batch` limit handed to `store.find_failed` during the
/// failed-row retry pass.
pub const DEFAULT_FAILED_BATCH: usize = 100;

/// Default `scroll_page_size` handed to `index.list_pids_in_scope` while
/// listing the pids of each scope during the orphan-cleanup pass.
pub const DEFAULT_SCROLL_PAGE_SIZE: usize = 256;
/// Counters describing what one reconcile run actually did.
///
/// A pass that was not selected on the builder leaves its counters at `0`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ReconcileSummary {
    /// Number of failed rows fetched and re-submitted by the retry pass.
    pub failed_retried: usize,
    /// How many of those retries came back as `EmbedOutcome::Indexed`.
    pub failed_recovered: usize,
    /// Orphan pids successfully removed from the vector index; pids whose
    /// deletion failed are not counted.
    pub orphans_deleted: usize,
}
/// Configuration for a single reconcile run; `.await` it (it implements
/// [`IntoFuture`]) to execute the selected passes.
#[must_use = "reconcile() returns a builder that must be awaited"]
pub struct ReconcileBuilder<'a> {
    /// Client whose store and vector index are reconciled.
    client: &'a Client,
    /// Whether the failed-row retry pass runs.
    retry_failed: bool,
    /// Whether the orphan-cleanup pass runs.
    clean_orphans: bool,
    /// Limit handed to `store.find_failed` in the retry pass.
    failed_batch: usize,
    /// Page size handed to `index.list_pids_in_scope` in the orphan pass.
    scroll_page_size: usize,
}
impl<'a> ReconcileBuilder<'a> {
    /// Creates a builder with both passes enabled and the default sizes
    /// ([`DEFAULT_FAILED_BATCH`], [`DEFAULT_SCROLL_PAGE_SIZE`]).
    pub(super) fn new(client: &'a Client) -> Self {
        let (failed_batch, scroll_page_size) = (DEFAULT_FAILED_BATCH, DEFAULT_SCROLL_PAGE_SIZE);
        Self {
            client,
            retry_failed: true,
            clean_orphans: true,
            failed_batch,
            scroll_page_size,
        }
    }

    /// Runs only the failed-row retry pass (orphan cleanup disabled).
    ///
    /// Overrides any earlier `only_*` call; the last one wins.
    pub fn only_retry_failed(self) -> Self {
        Self {
            retry_failed: true,
            clean_orphans: false,
            ..self
        }
    }

    /// Runs only the orphan-cleanup pass (failed-row retry disabled).
    ///
    /// Overrides any earlier `only_*` call; the last one wins.
    pub fn only_clean_orphans(self) -> Self {
        Self {
            retry_failed: false,
            clean_orphans: true,
            ..self
        }
    }

    /// Sets the limit handed to `store.find_failed` in the retry pass.
    /// The value is stored as given; no clamping is applied.
    pub fn failed_batch(self, batch: usize) -> Self {
        Self {
            failed_batch: batch,
            ..self
        }
    }

    /// Sets the page size handed to `index.list_pids_in_scope` in the orphan
    /// pass. The value is stored as given; no clamping is applied.
    pub fn scroll_page_size(self, page_size: usize) -> Self {
        Self {
            scroll_page_size: page_size,
            ..self
        }
    }
}
/// Awaiting a [`ReconcileBuilder`] runs the reconcile with its configured options.
impl<'a> IntoFuture for ReconcileBuilder<'a> {
    type Output = Result<ReconcileSummary, ClientError>;
    // Boxed because the `async fn` future type cannot be named in an associated
    // type; `Send` keeps it usable inside other `Send` futures, and `'a` ties
    // it to the borrowed client.
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;

    fn into_future(self) -> Self::IntoFuture {
        let run = execute(self);
        Box::pin(run)
    }
}
/// Runs the reconcile passes selected on `builder` and reports what was done.
///
/// Two independent passes, each gated by a builder flag:
///
/// 1. **Retry failed** (`retry_failed`): fetches rows via
///    `store.find_failed(failed_batch)`, re-runs `embed_and_index` on each one
///    in turn, and counts those whose outcome is `EmbedOutcome::Indexed`.
/// 2. **Clean orphans** (`clean_orphans`): for every scope from
///    `store.list_scopes`, deletes from the vector index every pid the index
///    lists for that scope but the store does not report as indexed.
///
/// # Errors
///
/// Propagates (via `?`) the first error from `find_failed`, `list_scopes`,
/// `list_pids_in_scope`, or `indexed_pids_in_scope`. The run stops there and
/// the partial summary is discarded. A failed `delete_by_pids` is *not* an
/// error: it is logged at WARN, and those pids are left for the next sweep.
async fn execute(builder: ReconcileBuilder<'_>) -> Result<ReconcileSummary, ClientError> {
    let ReconcileBuilder {
        client,
        retry_failed,
        clean_orphans,
        failed_batch,
        scroll_page_size,
    } = builder;
    let inner = client.inner.clone();
    let mut summary = ReconcileSummary::default();

    if retry_failed {
        let rows = inner.store.find_failed(failed_batch).await?;
        summary.failed_retried = rows.len();
        for row in rows {
            // Only an `Indexed` outcome counts as recovered; any other outcome
            // is simply not counted here.
            if matches!(
                inner.embed_and_index(row).await,
                super::embed::EmbedOutcome::Indexed
            ) {
                summary.failed_recovered += 1;
            }
        }
        event!(
            name: "memoir.reconcile.retry_failed_complete",
            Level::INFO,
            retried = summary.failed_retried,
            recovered = summary.failed_recovered,
            "retried {{retried}}, recovered {{recovered}}",
        );
    }

    if clean_orphans {
        let scopes = inner.store.list_scopes().await?;
        for scope in scopes {
            // Snapshot the index BEFORE the store. A pid that finishes indexing
            // between the two reads then appears in the later store snapshot
            // and is kept. Reading the store first would miss such a pid, and
            // the subsequent index listing would flag it as an orphan and
            // delete a live vector.
            // NOTE(review): a pid whose vector is already in the index but whose
            // row is not yet reported as indexed (in-flight write) can still be
            // flagged; closing that window needs coordination with
            // `embed_and_index`.
            let index_pids = inner
                .index
                .list_pids_in_scope(scope.clone(), scroll_page_size)
                .await?;
            // Nothing in the index for this scope means there is nothing to
            // delete, so skip the store round trip entirely.
            if index_pids.is_empty() {
                continue;
            }
            let postgres_pids: HashSet<String> = inner
                .store
                .indexed_pids_in_scope(&scope)
                .await?
                .into_iter()
                .collect();
            let orphans: Vec<&str> = index_pids
                .iter()
                .filter(|pid| !postgres_pids.contains(pid.as_str()))
                .map(String::as_str)
                .collect();
            if orphans.is_empty() {
                continue;
            }
            let count = orphans.len();
            // Best-effort: a failed delete is logged and retried on the next
            // sweep rather than aborting the remaining scopes.
            if let Err(err) = inner.index.delete_by_pids(&orphans).await {
                event!(
                    name: "memoir.reconcile.orphan_delete_failed",
                    Level::WARN,
                    count = count,
                    error.message = %err,
                    "orphan delete failed for {{count}} pid(s): {{error.message}} — will retry on next sweep",
                );
            } else {
                summary.orphans_deleted += count;
            }
        }
        event!(
            name: "memoir.reconcile.orphans_complete",
            Level::INFO,
            deleted = summary.orphans_deleted,
            "deleted {{deleted}} orphans",
        );
    }

    Ok(summary)
}