use std::sync::Arc;
use tracing::{Instrument, Level, event, info_span};
use crate::graph::{LlmExtractor, TripleExtractor};
use crate::jobs::{Job, MemoryJobsStore};
use crate::llm::LlmRole;
use crate::store::{MemoryStore, StoreError};
use super::ClientInner;
#[derive(Debug, thiserror::Error)]
pub(super) enum RelationalExtractError {
#[error("source lookup failed: {0}")]
SourceLookup(String),
#[error("triple extraction failed: {0}")]
Extraction(String),
#[error("relational staging failed: {0}")]
Staging(String),
}
impl ClientInner {
pub(super) async fn run_relational_extract(self: &Arc<Self>, job: Job) -> Result<(), RelationalExtractError> {
let span = info_span!("memoir.relational", source_pid = %job.source_pid);
async move { self.run_relational_extract_inner(job).await }
.instrument(span)
.await
}
async fn run_relational_extract_inner(self: &Arc<Self>, job: Job) -> Result<(), RelationalExtractError> {
let pid = job.source_pid.clone();
let source = match self.store.recall(&pid).await {
Ok(source) => source,
Err(StoreError::NotFound(_)) => {
event!(
name: "memoir.relational.source_missing",
Level::WARN,
source_pid = %pid,
"episodic source vanished before relational extract; treating job as no-op",
);
return Ok(());
}
Err(err) => return Err(RelationalExtractError::SourceLookup(err.to_string())),
};
let Some(provider) = self.llms.get(LlmRole::Relational) else {
event!(
name: "memoir.relational.skipped",
Level::WARN,
source_pid = %pid,
"no relational llm configured; treating job as no-op",
);
return Ok(());
};
let extractor = LlmExtractor::new(provider.clone());
let triples = extractor
.extract(&source.content)
.await
.map_err(|err| RelationalExtractError::Extraction(err.to_string()))?;
self.triple_staging
.stage(&source.pid, &triples)
.await
.map_err(|err| RelationalExtractError::Staging(err.to_string()))?;
self.jobs
.enqueue_synthesis_if_ready(&source.pid, job.id)
.await
.map_err(|err| RelationalExtractError::Staging(err.to_string()))?;
event!(
name: "memoir.relational.staged",
Level::DEBUG,
source_pid = %source.pid,
triple_count = triples.len(),
"staged relational triples for synthesis",
);
Ok(())
}
}