use std::collections::HashMap;
use futures::stream::{self, StreamExt};
use crate::dedup::{self, ResolvedEntity};
use crate::error::GraphError;
use crate::extract;
use crate::llm::LlmProvider;
use crate::types::*;
use crate::GraphMemory;
/// Maximum number of LLM extraction calls allowed in flight at once
/// (bounds `buffer_unordered` in `process_extraction`).
const LLM_CONCURRENCY: usize = 10;
/// Ingest a raw conversation archive: split it into chunks, store each
/// chunk as an episode, and — when an LLM provider is supplied — run
/// entity/relationship extraction over the same chunks.
///
/// Per-chunk episode failures are recorded in the report's `errors`
/// rather than aborting the whole ingestion.
pub async fn ingest_archive(
    gm: &GraphMemory,
    archive_text: &str,
    session_id: &str,
    log_number: Option<u32>,
    llm: Option<&dyn LlmProvider>,
) -> Result<IngestionReport, GraphError> {
    let mut report = IngestionReport::default();
    let chunks = extract::chunk_conversation(archive_text, 500);
    if chunks.is_empty() {
        return Ok(report);
    }
    for (idx, segment) in chunks.iter().enumerate() {
        let episode = NewEpisode {
            session_id: session_id.to_string(),
            abstract_text: build_episode_abstract(segment),
            overview: None,
            content: Some(segment.clone()),
            log_number,
        };
        if let Err(e) = gm.add_episode(episode).await {
            report.errors.push(format!("episode chunk {}: {}", idx, e));
        } else {
            report.episodes_created += 1;
        }
    }
    if let Some(provider) = llm {
        process_extraction(gm, &chunks, session_id, log_number, provider, &mut report).await?;
    }
    Ok(report)
}
/// Run LLM extraction over an archive without creating episode records.
///
/// Chunks the archive the same way as `ingest_archive` and feeds the
/// chunks straight into `process_extraction`; an empty archive yields a
/// default (all-zero) report.
pub async fn extract_from_archive(
    gm: &GraphMemory,
    archive_text: &str,
    session_id: &str,
    log_number: Option<u32>,
    llm: &dyn LlmProvider,
) -> Result<IngestionReport, GraphError> {
    let chunks = extract::chunk_conversation(archive_text, 500);
    let mut report = IngestionReport::default();
    if !chunks.is_empty() {
        process_extraction(gm, &chunks, session_id, log_number, llm, &mut report).await?;
    }
    Ok(report)
}
/// Extract entities and relationships from `chunks` via the LLM,
/// deduplicate entities against the graph, then insert relationships,
/// recording counts and per-item errors in `report`.
///
/// Extraction calls run concurrently (bounded by `LLM_CONCURRENCY`);
/// dedup resolution and relationship insertion run sequentially so the
/// name map is complete before relationship endpoints are rewritten.
async fn process_extraction(
    gm: &GraphMemory,
    chunks: &[String],
    session_id: &str,
    log_number: Option<u32>,
    llm: &dyn LlmProvider,
    report: &mut IngestionReport,
) -> Result<(), GraphError> {
    // Fan out extraction; each result carries its chunk index so errors
    // can be attributed even though completion order is arbitrary.
    let extraction_results: Vec<(usize, Result<ExtractionResult, GraphError>)> =
        stream::iter(chunks.iter().enumerate())
            .map(|(i, chunk)| async move {
                let result = extract::extract_from_chunk(llm, chunk, session_id, log_number).await;
                (i, result)
            })
            .buffer_unordered(LLM_CONCURRENCY)
            .collect()
            .await;
    let mut all_entities: Vec<ExtractedEntity> = Vec::new();
    let mut all_relationships: Vec<ExtractedRelationship> = Vec::new();
    for (i, result) in extraction_results {
        match result {
            Ok(extraction) => {
                all_entities.extend(extract::flatten_extraction(&extraction));
                all_relationships.extend(extraction.relationships);
            }
            Err(e) => {
                report.errors.push(format!("extraction chunk {}: {}", i, e));
            }
        }
    }
    let deduplicated = local_merge_entities(all_entities);
    // Maps extracted names to their canonical (resolved) names. Keys are
    // lowercased to match local_merge_entities' case-insensitive merge:
    // a relationship may reference a case variant whose entity was folded
    // into another candidate, and a case-sensitive lookup would miss it.
    let mut name_map: HashMap<String, String> = HashMap::new();
    for candidate in &deduplicated {
        match dedup::resolve_entity(gm, llm, candidate, session_id).await {
            Ok(ResolvedEntity::Created(entity)) => {
                name_map.insert(candidate.name.to_lowercase(), entity.name.clone());
                report.entities_created += 1;
            }
            Ok(ResolvedEntity::Merged(entity)) => {
                name_map.insert(candidate.name.to_lowercase(), entity.name.clone());
                report.entities_merged += 1;
            }
            Ok(ResolvedEntity::Skipped) => {
                name_map.insert(candidate.name.to_lowercase(), candidate.name.clone());
                report.entities_skipped += 1;
            }
            Err(e) => {
                report
                    .errors
                    .push(format!("dedup '{}': {}", candidate.name, e));
            }
        }
    }
    for rel in &all_relationships {
        // Rewrite endpoints to canonical names; fall back to the raw
        // extracted name when resolution produced no mapping.
        let from_name = name_map
            .get(&rel.source.to_lowercase())
            .unwrap_or(&rel.source);
        let to_name = name_map
            .get(&rel.target.to_lowercase())
            .unwrap_or(&rel.target);
        if relationship_exists(gm, from_name, to_name, &rel.rel_type).await {
            report.relationships_skipped += 1;
            continue;
        }
        let new_rel = NewRelationship {
            from_entity: from_name.clone(),
            to_entity: to_name.clone(),
            rel_type: rel.rel_type.clone(),
            description: rel.description.clone(),
            confidence: None,
            source: Some(session_id.to_string()),
        };
        match gm.add_relationship(new_rel).await {
            Ok(_) => report.relationships_created += 1,
            Err(e) => {
                report
                    .errors
                    .push(format!("relationship {} -> {}: {}", from_name, to_name, e));
            }
        }
    }
    Ok(())
}
/// Merge extracted entities that share a case-insensitive name, keeping
/// first-seen order. When two entities collide: the longer abstract
/// wins, overviews and contents are concatenated with blank lines, and
/// JSON attributes are shallow-merged via `merge_json`.
fn local_merge_entities(entities: Vec<ExtractedEntity>) -> Vec<ExtractedEntity> {
    let mut merged: Vec<ExtractedEntity> = Vec::new();
    let mut index_by_key: HashMap<String, usize> = HashMap::new();
    for incoming in entities {
        let key = incoming.name.to_lowercase();
        match index_by_key.get(&key) {
            None => {
                // First occurrence: record its position to merge into later.
                index_by_key.insert(key, merged.len());
                merged.push(incoming);
            }
            Some(&pos) => {
                let target = &mut merged[pos];
                // Prefer the more detailed (longer) abstract.
                if incoming.abstract_text.len() > target.abstract_text.len() {
                    target.abstract_text = incoming.abstract_text;
                }
                if let Some(extra) = incoming.overview {
                    target.overview = Some(match target.overview.take() {
                        Some(existing) => format!("{}\n\n{}", existing, extra),
                        None => extra,
                    });
                }
                if let Some(extra) = incoming.content {
                    target.content = Some(match target.content.take() {
                        Some(existing) => format!("{}\n\n{}", existing, extra),
                        None => extra,
                    });
                }
                if let Some(extra) = incoming.attributes {
                    target.attributes = Some(match target.attributes.take() {
                        Some(existing) => merge_json(&existing, &extra),
                        None => extra,
                    });
                }
            }
        }
    }
    merged
}
/// Shallow-merge two JSON values: when both are objects, keys from
/// `overlay` overwrite keys in `base`; for any other combination,
/// `overlay` replaces `base` entirely.
fn merge_json(base: &serde_json::Value, overlay: &serde_json::Value) -> serde_json::Value {
    if let (serde_json::Value::Object(b), serde_json::Value::Object(o)) = (base, overlay) {
        let mut combined = b.clone();
        combined.extend(o.iter().map(|(k, v)| (k.clone(), v.clone())));
        serde_json::Value::Object(combined)
    } else {
        overlay.clone()
    }
}
/// Build a short abstract for an episode: the first 200 characters of
/// the chunk, trimmed, with "..." appended when the chunk was truncated.
fn build_episode_abstract(chunk: &str) -> String {
    let prefix: String = chunk.chars().take(200).collect();
    // A strictly smaller byte length means the 200-char prefix did not
    // cover the whole chunk, i.e. truncation happened.
    let truncated = prefix.len() < chunk.len();
    let trimmed = prefix.trim();
    if truncated {
        format!("{}...", trimmed)
    } else {
        trimmed.to_string()
    }
}
/// Best-effort check for an existing outgoing relationship of
/// `rel_type` from `from_name` to `to_name`. Any lookup failure or a
/// missing target entity is treated as "does not exist", so ingestion
/// proceeds rather than aborting.
async fn relationship_exists(
    gm: &GraphMemory,
    from_name: &str,
    to_name: &str,
    rel_type: &str,
) -> bool {
    let rels = match gm.get_relationships(from_name, Direction::Outgoing).await {
        Ok(r) => r,
        Err(_) => return false,
    };
    let to_entity = match gm.get_entity(to_name).await {
        Ok(Some(e)) => e,
        _ => return false,
    };
    let target_id = to_entity.id_string();
    for rel in &rels {
        if rel.rel_type != rel_type {
            continue;
        }
        // `to_id` may be stored as a JSON string or some other JSON
        // value; normalize to a comparable string either way.
        let candidate = match &rel.to_id {
            serde_json::Value::String(s) => s.clone(),
            other => other.to_string(),
        };
        if candidate == target_id {
            return true;
        }
    }
    false
}
#[cfg(test)]
mod tests {
    use super::*;

    // A 500-char input must be cut to ~200 chars and gain an ellipsis.
    #[test]
    fn episode_abstract_truncates() {
        let input = "x".repeat(500);
        let result = build_episode_abstract(&input);
        assert!(result.len() < 210);
        assert!(result.ends_with("..."));
    }

    // Inputs under the limit pass through unmodified.
    #[test]
    fn episode_abstract_short_unchanged() {
        assert_eq!(build_episode_abstract("Hello world"), "Hello world");
    }
}