use tracing::{Instrument, Level, event, info_span};
use crate::embedding::EmbeddingModel;
use crate::memory::Memory;
use crate::store::{IndexStatus, MemoryStore};
use crate::vector::VectorIndex;
use super::ClientInner;
/// Result of a single embed-and-index attempt, as returned by
/// `ClientInner::embed_and_index`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum EmbedOutcome {
/// The embed step, the vector upsert, and the `IndexStatus::Indexed` status
/// update all succeeded.
Indexed,
/// One of those steps returned an error; the error was logged as a WARN event.
Failed,
}
impl ClientInner {
    /// Runs one embed job for the memory identified by `source_pid`.
    ///
    /// The memory is first recalled from the store. If the store reports
    /// `NotFound`, the job logs an INFO event and finishes successfully without
    /// doing any work. Otherwise the memory is handed to
    /// [`Self::embed_and_index`] inside a `memoir.embed` span.
    ///
    /// The [`EmbedOutcome`] of the embed step is intentionally discarded here:
    /// `embed_and_index` logs its own failures and records them in the store.
    ///
    /// # Errors
    ///
    /// Returns any `StoreError` from the recall other than `NotFound`.
    pub(super) async fn run_embed_job(
        &self,
        source_pid: &str,
    ) -> Result<(), crate::store::StoreError> {
        let memory = match self.store.recall(source_pid).await {
            Err(crate::store::StoreError::NotFound(_)) => {
                event!(
                    name: "memoir.embed.source_missing",
                    Level::INFO,
                    pid = %source_pid,
                    "source memory absent for {{pid}} (cascade delete race); skipping",
                );
                return Ok(());
            }
            // Success yields the memory; any other store error is propagated as-is.
            recalled => recalled?,
        };

        // The span must be built before `memory` is moved into the future.
        let span = info_span!("memoir.embed", pid = %memory.pid);
        let _ = self.embed_and_index(memory).instrument(span).await;
        Ok(())
    }

    /// Embeds `written`'s content, upserts the vector into the index, and marks
    /// the row as `IndexStatus::Indexed`.
    ///
    /// Each step that fails emits a WARN event and makes the function return
    /// [`EmbedOutcome::Failed`]:
    ///
    /// * embed or upsert failure: the row is additionally marked failed via
    ///   `record_failed`;
    /// * failure to set the `Indexed` status: no further status write is
    ///   attempted.
    ///
    /// Returns [`EmbedOutcome::Indexed`] only when all three steps succeed.
    pub(super) async fn embed_and_index(&self, written: Memory) -> EmbedOutcome {
        let pid = written.pid.as_str();

        // Step 1: compute the embedding for the memory's content.
        let embedding = match self.embedder.embed(&written.content).await {
            Ok(embedding) => embedding,
            Err(embed_err) => {
                event!(
                    name: "memoir.embed.embed_failed",
                    Level::WARN,
                    pid = %pid,
                    error.message = %embed_err,
                    "embed step failed for {{pid}}: {{error.message}}",
                );
                self.record_failed(pid).await;
                return EmbedOutcome::Failed;
            }
        };

        // Step 2: write the vector into the index.
        if let Err(upsert_err) = self.index.upsert(&written, embedding).await {
            event!(
                name: "memoir.embed.upsert_failed",
                Level::WARN,
                pid = %pid,
                error.message = %upsert_err,
                "vector upsert failed for {{pid}}: {{error.message}}",
            );
            self.record_failed(pid).await;
            return EmbedOutcome::Failed;
        }

        // Step 3: flip the row's status; the result of this write decides the outcome.
        match self.store.set_index_status(pid, IndexStatus::Indexed).await {
            Ok(_) => {
                event!(
                    name: "memoir.embed.success",
                    Level::INFO,
                    pid = %pid,
                    "{{pid}} indexed",
                );
                EmbedOutcome::Indexed
            }
            Err(status_err) => {
                event!(
                    name: "memoir.embed.index_status_failed",
                    Level::WARN,
                    pid = %pid,
                    error.message = %status_err,
                    "set_index_status(indexed) failed for {{pid}}: {{error.message}} — row stays pending until reconciliation",
                );
                EmbedOutcome::Failed
            }
        }
    }

    /// Best-effort attempt to mark `pid` as `IndexStatus::Failed` in the store.
    ///
    /// A failure of the status write itself is only logged (WARN); nothing is
    /// returned to the caller.
    async fn record_failed(&self, pid: &str) {
        let write_error = self
            .store
            .set_index_status(pid, IndexStatus::Failed)
            .await
            .err();

        if let Some(status_err) = write_error {
            event!(
                name: "memoir.embed.index_status_failed",
                Level::WARN,
                pid = %pid,
                error.message = %status_err,
                "set_index_status(failed) failed for {{pid}}: {{error.message}} — row stays pending until reconciliation",
            );
        }
    }
}