use anyhow::{anyhow, bail, Result};
use chrono::Utc;
use crate::memory::chunks::{
self, chunk_markdown, claim_source_ingest_tx, get_chunk_lifecycle_status, is_source_ingested,
set_chunk_lifecycle_status, set_chunk_raw_refs, upsert_chunks, with_connection, ChunkerInput,
ChunkerOptions, SourceKind, CHUNK_STATUS_PENDING_EXTRACTION,
};
use crate::memory::config::MemoryConfig;
use crate::memory::score::{persist_score, score_chunks_fast, ScoringConfig};
use crate::memory::store::content;
use super::canonicalize::chat::{self, ChatBatch};
use super::canonicalize::document::{self, DocumentInput};
use super::canonicalize::email::{self, EmailThread};
use super::canonicalize::CanonicalisedSource;
use super::types::{IngestOptions, IngestSummary, TreeJobSink};
/// Runs the shared ingest pipeline over an already-canonicalised source.
///
/// Steps, in order:
/// 1. Chunk the canonical markdown with default chunker options. An empty
///    chunk list short-circuits to [`IngestSummary::empty`].
/// 2. For `SourceKind::Document` only, claim the source under a gate key
///    (`source_id`, or `source_id@version` when `opts.gate_version_ms` is
///    set). If the claim is not obtained the call short-circuits to
///    [`IngestSummary::already_ingested`].
/// 3. Stage the chunk content under the content root, snapshot every chunk's
///    lifecycle status as it was *before* the upsert, then upsert the chunks.
/// 4. Attach `opts.raw_refs` to every chunk when they are provided.
/// 5. Score the chunks. For each chunk whose prior status was absent or
///    still pending extraction: persist the score, mark the chunk as pending
///    extraction and enqueue an extract job on `sink`.
///
/// In the returned summary, `chunks_dropped` counts every score result with
/// `kept == false`, whether or not that chunk was (re)processed in step 5.
///
/// # Errors
/// Propagates any failure from claiming, staging, status lookup, upserting,
/// scoring, score persistence, status updates or job enqueueing, and fails
/// when the scorer returns a different number of results than chunks.
pub async fn ingest_canonical(
    config: &MemoryConfig,
    source_id: &str,
    canonical: CanonicalisedSource,
    sink: &dyn TreeJobSink,
    scoring: &ScoringConfig,
    opts: &IngestOptions,
) -> Result<IngestSummary> {
    let source_kind = canonical.metadata.source_kind;
    let chunker_input = ChunkerInput {
        source_kind,
        source_id: source_id.to_string(),
        markdown: canonical.markdown,
        metadata: canonical.metadata,
    };
    let chunks = chunk_markdown(&chunker_input, &ChunkerOptions::default());
    if chunks.is_empty() {
        return Ok(IngestSummary::empty(source_id));
    }

    // Only document sources go through the claim gate.
    if source_kind == SourceKind::Document {
        let gate_key = if let Some(version) = opts.gate_version_ms {
            format!("{source_id}@{version}")
        } else {
            source_id.to_string()
        };
        let won_claim = with_connection(config, |conn| {
            let tx = conn.unchecked_transaction()?;
            let now_ms = Utc::now().timestamp_millis();
            let outcome = claim_source_ingest_tx(&tx, source_kind, &gate_key, now_ms)?;
            tx.commit()?;
            Ok(outcome)
        })?;
        if !won_claim {
            return Ok(IngestSummary::already_ingested(source_id));
        }
    }

    let content_root = chunks::content_root(config);
    content::stage_chunks(&content_root, &chunks)
        .map_err(|e| anyhow!("stage_chunks failed: {e}"))?;

    // Snapshot lifecycle statuses before the upsert so the loop below decides
    // on what was stored prior to this call.
    let prior_statuses: Vec<Option<String>> = chunks
        .iter()
        .map(|chunk| get_chunk_lifecycle_status(config, &chunk.id))
        .collect::<Result<_, _>>()?;

    let chunks_written = upsert_chunks(config, &chunks)?;

    if let Some(raw_refs) = &opts.raw_refs {
        for chunk in &chunks {
            set_chunk_raw_refs(config, &chunk.id, raw_refs)?;
        }
    }

    let scores = score_chunks_fast(&chunks, scoring).await?;
    if scores.len() != chunks.len() {
        bail!(
            "scorer length mismatch: chunks={} scores={}",
            chunks.len(),
            scores.len()
        );
    }

    let chunk_ids: Vec<_> = chunks.iter().map(|chunk| chunk.id.clone()).collect();
    let chunks_dropped = scores.iter().filter(|score| !score.kept).count();

    let mut extract_jobs_enqueued = 0usize;
    for ((chunk, score), previous) in chunks
        .iter()
        .zip(scores.iter())
        .zip(prior_statuses.iter())
    {
        // Chunks that already moved past pending extraction are left alone.
        // NOTE(review): chunks whose score has `kept == false` are still
        // persisted and enqueued here — presumably intentional; confirm.
        let awaiting_extraction = matches!(
            previous.as_deref(),
            None | Some(CHUNK_STATUS_PENDING_EXTRACTION)
        );
        if awaiting_extraction {
            let ts_ms = chunk.metadata.timestamp.timestamp_millis();
            persist_score(config, score, ts_ms, None)?;
            set_chunk_lifecycle_status(config, &chunk.id, CHUNK_STATUS_PENDING_EXTRACTION)?;
            sink.enqueue_extract(&chunk.id)?;
            extract_jobs_enqueued += 1;
        }
    }

    Ok(IngestSummary {
        source_id: source_id.to_string(),
        chunks_written,
        chunks_dropped,
        chunk_ids,
        extract_jobs_enqueued,
        already_ingested: false,
    })
}
#[allow(clippy::too_many_arguments)]
pub async fn ingest_chat(
config: &MemoryConfig,
source_id: &str,
owner: &str,
tags: Vec<String>,
batch: ChatBatch,
sink: &dyn TreeJobSink,
scoring: &ScoringConfig,
) -> Result<IngestSummary> {
let canonical =
match chat::canonicalise(source_id, owner, &tags, batch).map_err(anyhow::Error::msg)? {
Some(c) => c,
None => return Ok(IngestSummary::empty(source_id)),
};
ingest_canonical(
config,
source_id,
canonical,
sink,
scoring,
&IngestOptions::default(),
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn ingest_email(
config: &MemoryConfig,
source_id: &str,
owner: &str,
tags: Vec<String>,
thread: EmailThread,
sink: &dyn TreeJobSink,
scoring: &ScoringConfig,
) -> Result<IngestSummary> {
let canonical =
match email::canonicalise(source_id, owner, &tags, thread).map_err(anyhow::Error::msg)? {
Some(c) => c,
None => return Ok(IngestSummary::empty(source_id)),
};
ingest_canonical(
config,
source_id,
canonical,
sink,
scoring,
&IngestOptions::default(),
)
.await
}
#[allow(clippy::too_many_arguments)]
pub async fn ingest_email_with_raw_refs(
config: &MemoryConfig,
source_id: &str,
owner: &str,
tags: Vec<String>,
thread: EmailThread,
raw_refs: Vec<crate::memory::chunks::RawRef>,
sink: &dyn TreeJobSink,
scoring: &ScoringConfig,
) -> Result<IngestSummary> {
let canonical =
match email::canonicalise(source_id, owner, &tags, thread).map_err(anyhow::Error::msg)? {
Some(c) => c,
None => return Ok(IngestSummary::empty(source_id)),
};
let opts = IngestOptions {
gate_version_ms: None,
raw_refs: Some(raw_refs),
};
ingest_canonical(config, source_id, canonical, sink, scoring, &opts).await
}
/// Ingests a document with neither a path scope nor a version.
///
/// Thin wrapper that forwards to [`ingest_document_versioned`] with both
/// optional arguments set to `None`.
///
/// # Errors
/// Propagates whatever [`ingest_document_versioned`] returns.
#[allow(clippy::too_many_arguments)]
pub async fn ingest_document(
    config: &MemoryConfig,
    source_id: &str,
    owner: &str,
    tags: Vec<String>,
    doc: DocumentInput,
    sink: &dyn TreeJobSink,
    scoring: &ScoringConfig,
) -> Result<IngestSummary> {
    let no_path_scope: Option<String> = None;
    let no_version: Option<i64> = None;
    ingest_document_versioned(
        config,
        source_id,
        owner,
        tags,
        doc,
        no_path_scope,
        no_version,
        sink,
        scoring,
    )
    .await
}
/// Ingests a document with an optional path scope and no version.
///
/// Thin wrapper that forwards to [`ingest_document_versioned`] with the
/// version argument set to `None`.
///
/// # Errors
/// Propagates whatever [`ingest_document_versioned`] returns.
#[allow(clippy::too_many_arguments)]
pub async fn ingest_document_with_scope(
    config: &MemoryConfig,
    source_id: &str,
    owner: &str,
    tags: Vec<String>,
    doc: DocumentInput,
    path_scope: Option<String>,
    sink: &dyn TreeJobSink,
    scoring: &ScoringConfig,
) -> Result<IngestSummary> {
    let no_version: Option<i64> = None;
    ingest_document_versioned(
        config,
        source_id,
        owner,
        tags,
        doc,
        path_scope,
        no_version,
        sink,
        scoring,
    )
    .await
}
#[allow(clippy::too_many_arguments)]
pub async fn ingest_document_versioned(
config: &MemoryConfig,
source_id: &str,
owner: &str,
tags: Vec<String>,
doc: DocumentInput,
path_scope: Option<String>,
version_ms: Option<i64>,
sink: &dyn TreeJobSink,
scoring: &ScoringConfig,
) -> Result<IngestSummary> {
let gate_key = match version_ms {
Some(v) => format!("{source_id}@{v}"),
None => source_id.to_string(),
};
if is_source_ingested(config, SourceKind::Document, &gate_key)? {
return Ok(IngestSummary::already_ingested(source_id));
}
let canonical = match document::canonicalise(source_id, owner, &tags, doc, path_scope)
.map_err(anyhow::Error::msg)?
{
Some(c) => c,
None => return Ok(IngestSummary::empty(source_id)),
};
let opts = IngestOptions {
gate_version_ms: version_ms,
raw_refs: None,
};
ingest_canonical(config, source_id, canonical, sink, scoring, &opts).await
}
// Unit tests for this module are kept in the sibling file `pipeline_tests.rs`
// and are compiled only for test builds.
#[cfg(test)]
#[path = "pipeline_tests.rs"]
mod tests;