use std::sync::Arc;
use tracing::{Instrument, Level, event, info_span};
use crate::graph::{
CardinalityPolicy, CommitContext, EmbeddingEntityResolver, EmbeddingSynthesizer, FalkorEdgeCatalog,
FalkorEntityCatalog, GraphStore, SemanticFact, Synthesizer, TemporalEdgeResolver,
};
use crate::jobs::Job;
use crate::store::{MemoryStore, StoreError};
use super::ClientInner;
/// Relation labels handed to `CardinalityPolicy::with_single_valued` when the
/// temporal edge resolver is built for a synthesize job.
// NOTE(review): presumably these are relations for which a subject holds at most one
// current target, so a newer edge supersedes the older one — confirm against
// `CardinalityPolicy`.
const SINGLE_VALUED_RELATIONS: &[&str] = &["works at", "lives in", "located in", "reports to"];
/// Failure modes of a synthesize job, one variant per stage that can fail.
///
/// Every construction site in this file fills the payload with the underlying
/// error rendered through `to_string()`.
#[derive(Debug, thiserror::Error)]
pub(super) enum SynthesizeError {
/// Recalling the source memory failed for a reason other than
/// `StoreError::NotFound`, or listing its active semantic memories failed.
#[error("source lookup failed: {0}")]
SourceLookup(String),
/// Reading pending triples from, or clearing, the triple staging area failed.
#[error("triple staging access failed: {0}")]
Staging(String),
/// The synthesizer's `synthesize` call returned an error.
#[error("synthesis failed: {0}")]
Synthesis(String),
/// The graph store's `commit_triples` call returned an error.
#[error("graph commit failed: {0}")]
Commit(String),
}
impl ClientInner {
    /// Runs a synthesize job inside a `memoir.synthesize` tracing span.
    ///
    /// The span records the job's `source_pid`; the work itself is delegated to
    /// `run_synthesize_inner`.
    ///
    /// # Errors
    ///
    /// Propagates whatever `SynthesizeError` the inner routine returns.
    pub(super) async fn run_synthesize(self: &Arc<Self>, job: Job) -> Result<(), SynthesizeError> {
        // The span must be built before `job` is moved into the inner future.
        let span = info_span!("memoir.synthesize", source_pid = %job.source_pid);
        self.run_synthesize_inner(job).instrument(span).await
    }

    /// Reconciles the triples staged for `job.source_pid` with the source's active
    /// semantic facts and commits the result to the graph.
    ///
    /// The job is treated as a no-op (`Ok(())`) when no graph store is configured,
    /// when nothing is staged for the source, or when the store reports the source
    /// as `NotFound` (in that last case staging is cleared first). On the normal
    /// path staging is cleared only after the commit has succeeded.
    ///
    /// # Errors
    ///
    /// * `SynthesizeError::Staging` — reading or clearing staged triples failed.
    /// * `SynthesizeError::SourceLookup` — recalling the source or listing its
    ///   active semantic memories failed.
    /// * `SynthesizeError::Synthesis` — the synthesizer failed.
    /// * `SynthesizeError::Commit` — committing to the graph failed.
    async fn run_synthesize_inner(self: &Arc<Self>, job: Job) -> Result<(), SynthesizeError> {
        let source_pid = job.source_pid.clone();

        // Guard: without a graph store there is nowhere to commit to.
        let maybe_graph = self.graph.clone();
        let Some(graph) = maybe_graph else {
            event!(
                name: "memoir.synthesize.no_graph",
                Level::WARN,
                source_pid = %source_pid,
                "no graph store configured; treating job as no-op",
            );
            return Ok(());
        };

        // Guard: nothing staged for this source.
        let pending = self
            .triple_staging
            .take_pending(&source_pid)
            .await
            .map_err(|e| SynthesizeError::Staging(e.to_string()))?;
        let Some(staged) = pending else {
            event!(
                name: "memoir.synthesize.no_staged_triples",
                Level::WARN,
                source_pid = %source_pid,
                "no staged triples for source; treating job as no-op",
            );
            return Ok(());
        };

        // Guard: the source memory itself may be gone by now.
        let source = match self.store.recall(&source_pid).await {
            Err(StoreError::NotFound(_)) => {
                event!(
                    name: "memoir.synthesize.source_missing",
                    Level::WARN,
                    source_pid = %source_pid,
                    "episodic source vanished before synthesis; clearing staging and treating job as no-op",
                );
                self.triple_staging
                    .clear(&source_pid)
                    .await
                    .map_err(|e| SynthesizeError::Staging(e.to_string()))?;
                return Ok(());
            }
            Err(other) => return Err(SynthesizeError::SourceLookup(other.to_string())),
            Ok(found) => found,
        };

        // Active semantic memories of the source, reduced to their content.
        let active = self
            .store
            .active_semantics_for_source(&source_pid)
            .await
            .map_err(|e| SynthesizeError::SourceLookup(e.to_string()))?;
        let facts: Vec<_> = active
            .into_iter()
            .map(|memory| SemanticFact { content: memory.content })
            .collect();

        let synthesizer = EmbeddingSynthesizer::new(self.embedder.clone());
        let reconciled = synthesizer
            .synthesize(staged, &facts)
            .await
            .map_err(|e| SynthesizeError::Synthesis(e.to_string()))?;

        // Resolvers and context consumed by the graph commit.
        let entity_catalog = FalkorEntityCatalog::new(graph.clone());
        let entity_resolver = EmbeddingEntityResolver::new(self.embedder.clone(), entity_catalog);
        let edge_catalog = FalkorEdgeCatalog::new(graph.clone());
        let cardinality =
            CardinalityPolicy::with_single_valued(SINGLE_VALUED_RELATIONS.iter().copied());
        let edge_resolver = TemporalEdgeResolver::new(edge_catalog, cardinality);
        let commit_ctx = CommitContext {
            scope: source.scope.clone(),
            memory_pid: source.pid.clone(),
            // Fall back to the creation time when the source has no event time.
            valid_from: source.event_at.unwrap_or(source.created_at),
        };

        let committed = graph
            .commit_triples(
                &self.embedder,
                &entity_resolver,
                &edge_resolver,
                &commit_ctx,
                &reconciled,
            )
            .await
            .map_err(|e| SynthesizeError::Commit(e.to_string()))?;

        // Staging is released only once the commit has gone through.
        self.triple_staging
            .clear(&source_pid)
            .await
            .map_err(|e| SynthesizeError::Staging(e.to_string()))?;

        event!(
            name: "memoir.synthesize.committed",
            Level::DEBUG,
            source_pid = %source.pid,
            fact_count = facts.len(),
            committed,
            "synthesized and committed triples to the graph",
        );
        Ok(())
    }
}