use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use ipld_core::ipld::Ipld;
use mnem_core::id::{EdgeId, NodeId};
use mnem_core::objects::{Edge, Node};
use mnem_core::repo::Transaction;
use tracing::{debug, info_span};
use crate::chunk::{ChunkerKind, chunk as run_chunker};
use crate::error::Error;
use crate::extract::{EntityKind, EntitySpan, Extractor, RuleExtractor};
use crate::types::{Chunk, IngestConfig, IngestResult, Section, SourceKind};
/// Shared, reference-counted handle to a type-erased [`EmbedText`] implementation.
pub type EmbedderArc = Arc<dyn EmbedText>;
/// Text-to-embedding backend used during ingestion.
///
/// The `Send + Sync` supertraits allow an implementation to be stored behind
/// an [`EmbedderArc`].
pub trait EmbedText: Send + Sync {
/// Produces the embedding for `text`, or an [`Error`] if embedding fails.
fn embed_text(&self, text: &str) -> Result<mnem_core::objects::Embedding, Error>;
}
pub struct Ingester {
pub config: IngestConfig,
pub extractor: Box<dyn Extractor>,
pub embedder: Option<EmbedderArc>,
pub progress: Option<std::sync::Arc<dyn Fn() + Send + Sync>>,
}
/// Manual `Debug`: the trait objects held by [`Ingester`] are rendered as
/// fixed placeholder strings, while `config` uses its own `Debug` output.
impl std::fmt::Debug for Ingester {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the `Option` shape so `None` and `Some(..)` stay distinguishable.
        let embedder = self.embedder.as_ref().map(|_| "<dyn EmbedText>");
        let progress = self.progress.as_ref().map(|_| "<dyn Fn() + Send + Sync>");

        let mut out = f.debug_struct("Ingester");
        out.field("config", &self.config);
        out.field("extractor", &"<dyn Extractor>");
        out.field("embedder", &embedder);
        out.field("progress", &progress);
        out.finish()
    }
}
impl Ingester {
#[must_use]
pub fn new(config: IngestConfig) -> Self {
Self {
config,
extractor: Box::new(RuleExtractor::default()),
embedder: None,
progress: None,
}
}
#[must_use]
pub fn with_extractor(mut self, ext: Box<dyn Extractor>) -> Self {
self.extractor = ext;
self
}
#[must_use]
pub fn with_embedder(mut self, embedder: EmbedderArc) -> Self {
self.embedder = Some(embedder);
self
}
#[must_use]
pub fn with_progress(mut self, cb: std::sync::Arc<dyn Fn() + Send + Sync>) -> Self {
self.progress = Some(cb);
self
}
/// Guesses the [`SourceKind`] of a file from its extension.
///
/// Matching is ASCII-case-insensitive: `md`/`markdown` map to Markdown,
/// `pdf` to Pdf, and `json`/`jsonl` to Conversation. Anything else maps to
/// Text, including a missing extension or one that is not valid UTF-8.
#[must_use]
pub fn source_kind_for_path(path: &std::path::Path) -> SourceKind {
    let lowered = path
        .extension()
        .and_then(|os| os.to_str())
        .map(str::to_ascii_lowercase);
    match lowered.as_deref() {
        Some("md" | "markdown") => SourceKind::Markdown,
        Some("pdf") => SourceKind::Pdf,
        Some("json" | "jsonl") => SourceKind::Conversation,
        _ => SourceKind::Text,
    }
}
pub fn ingest(
&self,
tx: &mut Transaction,
bytes: &[u8],
kind: SourceKind,
) -> Result<IngestResult, Error> {
let started = Instant::now();
let _span = info_span!("mnem_ingest.run", ?kind).entered();
let sections = parse(bytes, kind)?;
let chunker: ChunkerKind = self.config.chunker.clone();
let chunks = run_chunker(§ions, &chunker);
debug!(
n_sections = sections.len(),
n_chunks = chunks.len(),
"parse + chunk done"
);
self.extractor.prepare(§ions)?;
let created_at_micros = now_micros();
let source_kind_str = source_kind_str(kind);
let doc_id = NodeId::new_v7();
let mut doc =
Node::new(doc_id, self.config.ntype.clone()).with_summary(doc_summary(§ions));
doc.props.insert(
"mnem:created_at".into(),
Ipld::Integer(i128::from(created_at_micros)),
);
doc.props.insert(
"mnem:source_kind".into(),
Ipld::String(source_kind_str.to_string()),
);
tx.add_node(&doc).map_err(Error::commit)?;
let mut node_count: u64 = 1;
let mut relation_count: u64 = 0;
let mut entity_registry: BTreeMap<(EntityKind, String), NodeId> = BTreeMap::new();
for (chunk_idx, c) in chunks.iter().enumerate() {
let chunk_id = self.commit_chunk(tx, c, doc_id, created_at_micros, source_kind_str)?;
node_count += 1;
if let Some(cb) = &self.progress {
cb();
}
debug!(chunk = chunk_idx, "chunk committed");
let mut ents_for_chunk: Vec<(EntitySpan, NodeId)> = Vec::new();
for section in sections.iter().filter(|s| section_in_chunk(s, c)) {
let ents = self.extractor.extract_entities(section);
for e in ents {
let key = (e.kind, canonical(&e.text));
let ent_id = if let Some(existing) = entity_registry.get(&key) {
*existing
} else {
let id = NodeId::new_v7();
let mut n = Node::new(id, e.kind.ntype()).with_summary(e.text.clone());
n.props.insert(
"mnem:created_at".into(),
Ipld::Integer(i128::from(created_at_micros)),
);
n.props
.insert("canonical".into(), Ipld::String(key.1.clone()));
tx.add_node(&n).map_err(Error::commit)?;
node_count += 1;
entity_registry.insert(key, id);
id
};
let mention = Edge::new(EdgeId::new_v7(), "chunk_mentions", chunk_id, ent_id);
tx.add_edge(&mention).map_err(Error::commit)?;
ents_for_chunk.push((e, ent_id));
}
let section_ents: Vec<EntitySpan> =
ents_for_chunk.iter().map(|(e, _)| e.clone()).collect();
let rels = self.extractor.extract_relations(§ion_ents, section);
for r in rels {
let subj_id = ents_for_chunk[r.subject_span].1;
let obj_id = ents_for_chunk[r.object_span].1;
let rel_edge = Edge::new(EdgeId::new_v7(), r.kind.clone(), subj_id, obj_id);
tx.add_edge(&rel_edge).map_err(Error::commit)?;
relation_count += 1;
}
}
}
let elapsed_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
let entity_count = u64::try_from(entity_registry.len()).unwrap_or(u64::MAX);
let chunk_count = u64::try_from(chunks.len()).unwrap_or(u64::MAX);
Ok(IngestResult {
commit_cid: None,
node_count,
chunk_count,
entity_count,
relation_count,
elapsed_ms,
})
}
fn commit_chunk(
&self,
tx: &mut Transaction,
c: &Chunk,
doc_id: NodeId,
created_at: i64,
source_kind: &'static str,
) -> Result<NodeId, Error> {
let id = NodeId::new_v7();
let summary = short_summary(&c.text);
let mut node = Node::new(id, "Chunk").with_summary(summary);
node.content = Some(bytes::Bytes::copy_from_slice(c.text.as_bytes()));
node.props.insert(
"mnem:created_at".into(),
Ipld::Integer(i128::from(created_at)),
);
node.props.insert(
"mnem:source_kind".into(),
Ipld::String(source_kind.to_string()),
);
node.props.insert(
"mnem:section_path".into(),
Ipld::List(
c.section_path
.iter()
.map(|s| Ipld::String(s.clone()))
.collect(),
),
);
node.props.insert(
"tokens_estimate".into(),
Ipld::Integer(i128::from(c.tokens_estimate)),
);
let pending_emb = if let Some(embedder) = &self.embedder {
Some(embedder.embed_text(&c.text)?)
} else {
None
};
let chunk_cid = tx.add_node(&node).map_err(Error::commit)?;
if let Some(emb) = pending_emb {
let model = emb.model.clone();
tx.set_embedding(chunk_cid, model, emb)
.map_err(Error::commit)?;
}
let edge = Edge::new(EdgeId::new_v7(), "chunk_of", id, doc_id);
tx.add_edge(&edge).map_err(Error::commit)?;
Ok(id)
}
}
fn parse(bytes: &[u8], kind: SourceKind) -> Result<Vec<Section>, Error> {
match kind {
SourceKind::Markdown => {
let s = std::str::from_utf8(bytes).map_err(|e| Error::ParseFailed {
what: "markdown".into(),
detail: e.to_string(),
})?;
crate::md::parse_markdown(s)
}
SourceKind::Text => {
let s = std::str::from_utf8(bytes).map_err(|e| Error::ParseFailed {
what: "text".into(),
detail: e.to_string(),
})?;
crate::text::parse_text(s)
}
SourceKind::Pdf => crate::pdf::parse_pdf(bytes),
SourceKind::Conversation => crate::conversation::parse_conversation(bytes),
}
}
fn section_in_chunk(section: &Section, chunk: &Chunk) -> bool {
match (§ion.heading, chunk.section_path.last()) {
(Some(h), Some(last)) => h == last,
(None, _) => true,
_ => false,
}
}
fn doc_summary(sections: &[Section]) -> String {
for s in sections {
let trimmed = s.text.trim();
if !trimmed.is_empty() {
return short_summary(trimmed);
}
}
"(empty)".into()
}
/// Trims `text` and caps it at 200 bytes.
///
/// Text that fits is returned unchanged (after trimming). Longer text is cut
/// at the last UTF-8 character boundary at or below 200 bytes and suffixed
/// with `…`.
fn short_summary(text: &str) -> String {
    const MAX_BYTES: usize = 200;

    let body = text.trim();
    if body.len() <= MAX_BYTES {
        return body.to_string();
    }

    // Offset 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=MAX_BYTES)
        .rev()
        .find(|&i| body.is_char_boundary(i))
        .unwrap_or(0);

    let mut out = String::with_capacity(cut + '…'.len_utf8());
    out.push_str(&body[..cut]);
    out.push('…');
    out
}
/// Canonical form used to deduplicate entities: surrounding whitespace
/// removed, then lowercased.
fn canonical(s: &str) -> String {
    let trimmed = s.trim();
    trimmed.to_lowercase()
}
/// Stable lowercase label for a [`SourceKind`], stored in the
/// `mnem:source_kind` node property.
const fn source_kind_str(kind: SourceKind) -> &'static str {
    let label = match kind {
        SourceKind::Markdown => "markdown",
        SourceKind::Text => "text",
        SourceKind::Pdf => "pdf",
        SourceKind::Conversation => "conversation",
    };
    label
}
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` if the value does not fit in an `i64`.
fn now_micros() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_micros()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use mnem_core::objects::{Dtype, Embedding};
    use mnem_core::repo::ReadonlyRepo;
    use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};
    use std::sync::Arc as StdArc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Fresh repository backed entirely by in-memory stores.
    fn test_repo() -> ReadonlyRepo {
        let bs = StdArc::new(MemoryBlockstore::new());
        let op_heads = StdArc::new(MemoryOpHeadsStore::new());
        ReadonlyRepo::init(bs, op_heads).expect("init repo")
    }

    /// Embedder double: returns a fixed 384-dimensional f32 vector and counts
    /// how many times it was asked to embed.
    #[derive(Default)]
    struct StubEmbedder {
        calls: AtomicUsize,
    }

    impl EmbedText for StubEmbedder {
        fn embed_text(&self, _text: &str) -> Result<Embedding, Error> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            let v: Vec<f32> = (0..384)
                .map(|i| f32::from(i16::try_from(i % 256).unwrap_or(0)) * 0.01)
                .collect();
            // Serialise as little-endian f32 bytes.
            let mut buf = Vec::with_capacity(v.len() * 4);
            for x in v {
                buf.extend_from_slice(&x.to_le_bytes());
            }
            Ok(Embedding {
                model: "stub:test".into(),
                dtype: Dtype::F32,
                dim: 384,
                vector: Bytes::from(buf),
            })
        }
    }

    #[test]
    fn ingest_markdown_produces_doc_and_chunks() {
        let repo = test_repo();
        let mut tx = repo.start_transaction();
        let ing = Ingester::new(IngestConfig::default());
        let md = "# Phase-B5c\n\nAlice Johnson joined Acme Corp on 2026-04-24.\n\nSee https://example.com for details.";
        let result = ing
            .ingest(&mut tx, md.as_bytes(), SourceKind::Markdown)
            .expect("ingest ok");
        assert!(result.chunk_count >= 1, "got {result:?}");
        assert!(result.node_count >= 2, "expected doc + chunks + entities");
        assert!(result.entity_count >= 1, "expected at least one entity");
    }

    #[test]
    fn ingest_text_respects_embedder() {
        let repo = test_repo();
        let mut tx = repo.start_transaction();
        // Keep our own handle so the call counter can be read afterwards.
        let embedder = StdArc::new(StubEmbedder::default());
        let shared: EmbedderArc = embedder.clone();
        let ing = Ingester::new(IngestConfig::default()).with_embedder(shared);
        let body = "Plain body. Alice Johnson met Bob Lee at Acme Corp.";
        let result = ing
            .ingest(&mut tx, body.as_bytes(), SourceKind::Text)
            .expect("ingest ok");
        assert!(result.node_count >= 2);
        assert!(result.chunk_count >= 1);
        let calls = u64::try_from(embedder.calls.load(Ordering::SeqCst))
            .expect("call count fits in u64");
        assert_eq!(
            calls, result.chunk_count,
            "embedder must run exactly once per chunk"
        );
    }

    #[test]
    fn source_kind_for_path_maps_extensions() {
        use std::path::Path;
        assert_eq!(
            Ingester::source_kind_for_path(Path::new("/x/y.md")),
            SourceKind::Markdown
        );
        assert_eq!(
            Ingester::source_kind_for_path(Path::new("y.MARKDOWN")),
            SourceKind::Markdown
        );
        assert_eq!(
            Ingester::source_kind_for_path(Path::new("book.pdf")),
            SourceKind::Pdf
        );
        assert_eq!(
            Ingester::source_kind_for_path(Path::new("chat.json")),
            SourceKind::Conversation
        );
        assert_eq!(
            Ingester::source_kind_for_path(Path::new("notes.txt")),
            SourceKind::Text
        );
        assert_eq!(
            Ingester::source_kind_for_path(Path::new("noext")),
            SourceKind::Text
        );
    }

    #[test]
    fn ingest_is_deterministic_in_counts() {
        let md = "# H\n\nBob Lee visited https://foo.io on 2026-04-24.";
        let repo1 = test_repo();
        let mut tx1 = repo1.start_transaction();
        let r1 = Ingester::new(IngestConfig::default())
            .ingest(&mut tx1, md.as_bytes(), SourceKind::Markdown)
            .unwrap();
        let repo2 = test_repo();
        let mut tx2 = repo2.start_transaction();
        let r2 = Ingester::new(IngestConfig::default())
            .ingest(&mut tx2, md.as_bytes(), SourceKind::Markdown)
            .unwrap();
        assert_eq!(r1.chunk_count, r2.chunk_count);
        assert_eq!(r1.entity_count, r2.entity_count);
        assert_eq!(r1.relation_count, r2.relation_count);
    }

    #[test]
    fn short_summary_truncates_on_char_boundary() {
        assert_eq!(short_summary("  hi  "), "hi");
        // Byte 200 falls inside a two-byte char, so the cut backs off to 199.
        let long = format!("a{}", "é".repeat(150));
        let expected = format!("a{}…", "é".repeat(99));
        assert_eq!(short_summary(&long), expected);
    }
}