use std::sync::Arc;
use tracing::{Instrument, Level, event, info_span};
use crate::jobs::{Job, JobsError, MemoryJobsStore};
use crate::llm::{
AcceptAllEventAt, DEFAULT_EXTRACTION_PROMPT, EventAtValidator, ExtractionOutput, LlmError, LlmKind, LlmProvider,
LlmRole, MAX_CONTENT_CHARS, build_extraction_content, parse_extraction,
};
use crate::memory::MemoryKind;
use crate::store::{MemoryStore, StoreError};
use super::ClientInner;
#[derive(Debug, thiserror::Error)]
pub(super) enum ExtractError {
#[error("source lookup failed: {0}")]
SourceLookup(#[from] StoreError),
#[error("llm call failed: {0}")]
LlmCall(LlmError),
#[error("llm output parse failed: {0}")]
Parse(LlmError),
#[error("persist failed: {0}")]
Persist(String),
}
impl ClientInner {
pub(super) async fn run_extract(self: &Arc<Self>, job: Job) -> Result<(), ExtractError> {
let span = info_span!("memoir.extraction", source_pid = %job.source_pid);
async move { self.run_extract_inner(job).await }.instrument(span).await
}
async fn run_extract_inner(self: &Arc<Self>, job: Job) -> Result<(), ExtractError> {
let source_pid = job.source_pid.clone();
event!(
name: "memoir.extraction.started",
Level::INFO,
source_pid = %source_pid,
"extraction started for {{source_pid}}",
);
let source = match self.store.recall(&source_pid).await {
Ok(memory) => memory,
Err(StoreError::NotFound(_)) => {
event!(
name: "memoir.extraction.source_missing",
Level::INFO,
source_pid = %source_pid,
"source memory absent for {{source_pid}} (cascade delete race); skipping",
);
return Ok(());
}
Err(err) => return Err(ExtractError::SourceLookup(err)),
};
self.re_extract_source(&source, None, job.id).await
}
#[cfg_attr(not(feature = "knowledge-graph"), allow(unused_variables))]
pub(super) async fn re_extract_source(
self: &Arc<Self>,
source: &crate::memory::Memory,
correction: Option<&str>,
caller_job_id: i64,
) -> Result<(), ExtractError> {
let source_pid = source.pid.clone();
let Some(provider) = self.llms.get(LlmRole::Extraction) else {
event!(
name: "memoir.extraction.skipped",
Level::WARN,
source_pid = %source_pid,
"no extraction llm configured; treating job as no-op",
);
return Ok(());
};
let content_len = source.content.len();
let reference = source.event_at.unwrap_or(source.created_at);
let extraction_content = build_extraction_content(reference, &source.content, correction);
let raw = match provider.extract(DEFAULT_EXTRACTION_PROMPT, &extraction_content).await {
Ok(raw) => raw,
Err(err) => {
event!(
name: "memoir.extraction.llm_failed",
Level::WARN,
source_pid = %source_pid,
provider = %provider.kind(),
model = %provider.model(),
content_len = content_len,
error.message = %err,
"extraction llm call failed for {{source_pid}}: {{error.message}}",
);
return Err(ExtractError::LlmCall(err));
}
};
let parsed: ExtractionOutput = parse_extraction(&raw).map_err(|err| {
event!(
name: "memoir.extraction.parse_failed",
Level::WARN,
source_pid = %source_pid,
provider = %provider.kind(),
content_len = content_len,
error.message = %err,
"extraction parse failed for {{source_pid}}: {{error.message}}",
);
ExtractError::Parse(err)
})?;
event!(
name: "memoir.extraction.parsed",
Level::INFO,
source_pid = %source_pid,
fact_count = parsed.facts.len(),
"extraction yielded {{fact_count}} fact(s) for {{source_pid}}",
);
if parsed.facts.is_empty() {
return Ok(());
}
let provider_kind = provider.kind();
let provider_model = provider.model().to_string();
let validator = AcceptAllEventAt;
let mut persisted_pids: Vec<String> = Vec::with_capacity(parsed.facts.len());
for fact in parsed.facts {
let content_chars = fact.content.chars().count();
if fact.content.is_empty() || content_chars > MAX_CONTENT_CHARS {
continue;
}
let metadata = build_semantic_metadata(provider_kind, &provider_model);
let event_at = fact
.event_at
.and_then(|candidate| validator.validate(reference, candidate));
let confidence = crate::memory::Confidence::from_unit_scale(fact.confidence);
let written = self
.store
.remember(crate::store::NewMemory {
scope: source.scope.clone(),
content: fact.content,
metadata,
kind: MemoryKind::Semantic,
source_pid: Some(source_pid.clone()),
event_at,
confidence,
})
.await
.map_err(|err| ExtractError::Persist(err.to_string()))?;
self.jobs
.enqueue(
crate::jobs::JobKind::Embed,
written.pid.clone(),
serde_json::json!({ "origin": "extraction" }),
)
.await
.map_err(|err: JobsError| ExtractError::Persist(err.to_string()))?;
if self.nli.is_some() {
self.jobs
.enqueue(
crate::jobs::JobKind::Categorize,
written.pid.clone(),
serde_json::json!({ "origin": "extraction" }),
)
.await
.map_err(|err: JobsError| ExtractError::Persist(err.to_string()))?;
}
persisted_pids.push(written.pid);
}
event!(
name: "memoir.extraction.persisted",
Level::INFO,
source_pid = %source_pid,
semantic_count = persisted_pids.len(),
"extraction persisted {{semantic_count}} semantic row(s) for {{source_pid}}",
);
#[cfg(feature = "knowledge-graph")]
if self.graph.is_some() {
self.jobs
.enqueue_synthesis_if_ready(&source_pid, caller_job_id)
.await
.map_err(|err: JobsError| ExtractError::Persist(err.to_string()))?;
}
Ok(())
}
}
/// Builds the JSON metadata attached to every semantic memory produced by
/// extraction.
///
/// The object has exactly three keys:
/// - `origin`: always `"extraction"`.
/// - `provider`: the provider kind rendered via its `AsRef<str>` impl.
/// - `model`: the model name.
fn build_semantic_metadata(provider: LlmKind, model: &str) -> serde_json::Value {
    let provider_name: &str = provider.as_ref();
    serde_json::json!({
        "origin": "extraction",
        "provider": provider_name,
        "model": model,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn should_build_semantic_metadata_with_expected_shape() {
        let metadata = build_semantic_metadata(LlmKind::Ollama, "llama3.2");
        let expected = [
            ("origin", "extraction"),
            ("provider", "ollama"),
            ("model", "llama3.2"),
        ];
        for (key, value) in expected {
            assert_eq!(metadata[key], value, "unexpected value for `{key}`");
        }
    }

    #[test]
    fn should_not_record_confidence_in_metadata() {
        let metadata = build_semantic_metadata(LlmKind::Ollama, "llama3.2");
        assert_eq!(metadata.get("confidence"), None);
    }

    #[test]
    fn should_extract_error_chain_via_from_store_error() {
        let converted = ExtractError::from(StoreError::NotFound("pid".to_string()));
        assert!(
            matches!(converted, ExtractError::SourceLookup(_)),
            "StoreError should convert into SourceLookup, got {converted:?}"
        );
    }
}