use std::sync::Mutex;
use chrono::{TimeZone, Utc};
use tempfile::TempDir;
use super::{ingest_chat, ingest_document, ingest_document_versioned, ingest_email_with_raw_refs};
use crate::memory::chunks::{
count_chunks, get_chunk_lifecycle_status, CHUNK_STATUS_PENDING_EXTRACTION,
};
use crate::memory::chunks::{get_chunk_raw_refs, RawRef};
use crate::memory::config::MemoryConfig;
use crate::memory::ingest::canonicalize::chat::{ChatBatch, ChatMessage};
use crate::memory::ingest::canonicalize::document::DocumentInput;
use crate::memory::ingest::canonicalize::email::{EmailMessage, EmailThread};
use crate::memory::ingest::types::{NullJobSink, TreeJobSink};
use crate::memory::score::ScoringConfig;
/// Test double for [`TreeJobSink`] that remembers every chunk id it was asked
/// to enqueue, so tests can inspect what the ingest pipeline scheduled.
#[derive(Default)]
struct RecordingJobSink {
    /// Chunk ids in the order `enqueue_extract` received them.
    ids: Mutex<Vec<String>>,
}
impl TreeJobSink for RecordingJobSink {
    /// Records `chunk_id` instead of scheduling real extraction work.
    ///
    /// Always returns `Ok(())`; panics only if the mutex is poisoned.
    fn enqueue_extract(&self, chunk_id: &str) -> anyhow::Result<()> {
        let mut recorded = self.ids.lock().unwrap();
        recorded.push(chunk_id.to_owned());
        Ok(())
    }
}
/// Builds a `MemoryConfig` rooted in a fresh temporary directory.
///
/// The `TempDir` is returned alongside the config so the caller keeps it
/// alive for the duration of the test. `tempfile` removes the directory when
/// the `TempDir` is dropped.
fn test_config() -> (TempDir, MemoryConfig) {
    let workspace = TempDir::new().unwrap();
    let config = MemoryConfig::new(workspace.path());
    (workspace, config)
}
/// A two-message `#eng` Slack batch with enough content to produce chunks.
///
/// The first message carries a `source_ref`; the second deliberately does not.
fn substantive_batch() -> ChatBatch {
    let from_alice = ChatMessage {
        author: "alice".into(),
        timestamp: Utc.timestamp_millis_opt(1_700_000_000_000).unwrap(),
        text: "We are planning to ship the Phoenix migration on Friday after reviewing the runbook and staging results. alice@example.com".into(),
        source_ref: Some("slack://m1".into()),
    };
    // Ten seconds after Alice's message.
    let from_bob = ChatMessage {
        author: "bob".into(),
        timestamp: Utc.timestamp_millis_opt(1_700_000_010_000).unwrap(),
        text: "Confirmed, I will handle the coordination and launch tracking tonight.".into(),
        source_ref: None,
    };
    ChatBatch {
        platform: "slack".into(),
        channel_label: "#eng".into(),
        messages: vec![from_alice, from_bob],
    }
}
/// A substantive chat batch is written as at least one chunk. Every chunk is
/// enqueued for extraction exactly once and left in the pending-extraction
/// lifecycle state.
#[tokio::test]
async fn ingest_chat_writes_chunks_and_enqueues_extract_jobs() {
    let (_tmp, cfg) = test_config();
    let sink = RecordingJobSink::default();
    let scoring = ScoringConfig::default_regex_only();
    let out = ingest_chat(
        &cfg,
        "slack:#eng",
        "alice",
        vec![],
        substantive_batch(),
        &sink,
        &scoring,
    )
    .await
    .unwrap();

    assert!(!out.already_ingested);
    assert!(out.chunks_written >= 1);
    assert_eq!(count_chunks(&cfg).unwrap(), out.chunks_written as u64);
    assert_eq!(out.chunk_ids.len(), out.chunks_written);
    assert_eq!(out.extract_jobs_enqueued, out.chunk_ids.len());

    // Snapshot the recorded ids so the lock is not held during the checks below.
    let enqueued = sink.ids.lock().unwrap().clone();
    assert_eq!(enqueued.len(), out.extract_jobs_enqueued);

    // Check every chunk, not just the first. A matching count alone would not
    // catch a chunk that was written but never enqueued.
    for chunk_id in &out.chunk_ids {
        assert!(
            enqueued.contains(chunk_id),
            "every written chunk must be enqueued for extraction"
        );
        assert_eq!(
            get_chunk_lifecycle_status(&cfg, chunk_id).unwrap(),
            Some(CHUNK_STATUS_PENDING_EXTRACTION.to_string())
        );
    }
}
/// Ingesting a chat batch with no messages writes nothing and schedules nothing.
#[tokio::test]
async fn ingest_chat_empty_batch_is_noop() {
    let (_tmp, cfg) = test_config();
    let scoring = ScoringConfig::default_regex_only();
    let sink = NullJobSink;
    let empty_batch = ChatBatch {
        platform: "slack".into(),
        channel_label: "#eng".into(),
        messages: Vec::new(),
    };

    let outcome = ingest_chat(
        &cfg,
        "slack:#eng",
        "alice",
        Vec::new(),
        empty_batch,
        &sink,
        &scoring,
    )
    .await
    .unwrap();

    assert_eq!(outcome.chunks_written, 0);
    assert_eq!(outcome.extract_jobs_enqueued, 0);
    assert_eq!(count_chunks(&cfg).unwrap(), 0);
}
/// Re-ingesting a document under an already-seen source id (`notion:abc`) is
/// short-circuited. The second call reports `already_ingested`, writes no
/// chunks, enqueues no extract jobs, and its mutated body never reaches the
/// chunk store.
#[tokio::test]
async fn second_document_ingest_with_same_source_id_is_short_circuited() {
    let (_tmp, cfg) = test_config();
    let sink = RecordingJobSink::default();
    let scoring = ScoringConfig::default_regex_only();
    let doc = DocumentInput {
        provider: "notion".into(),
        title: "Launch plan".into(),
        body: "Phoenix ships Friday after staging review. alice@example.com owns this.".into(),
        modified_at: Utc.timestamp_millis_opt(1_700_000_000_000).unwrap(),
        source_ref: Some("notion://page/abc".into()),
    };

    let first = ingest_document(
        &cfg,
        "notion:abc",
        "alice",
        vec![],
        doc.clone(),
        &sink,
        &scoring,
    )
    .await
    .unwrap();
    assert!(!first.already_ingested);
    assert!(first.chunks_written >= 1);
    // Snapshot the job count so we can show the second call adds no jobs.
    let jobs_after_first = sink.ids.lock().unwrap().len();

    // Same source id and metadata, different body: must still be rejected.
    let mutated = DocumentInput {
        body: "totally different content that should NOT make it into the tree".into(),
        ..doc
    };
    let second = ingest_document(
        &cfg,
        "notion:abc",
        "alice",
        vec![],
        mutated,
        &sink,
        &scoring,
    )
    .await
    .unwrap();

    assert!(second.already_ingested);
    assert_eq!(second.chunks_written, 0);
    assert!(second.chunk_ids.is_empty());
    assert_eq!(count_chunks(&cfg).unwrap(), first.chunks_written as u64);
    assert_eq!(
        sink.ids.lock().unwrap().len(),
        jobs_after_first,
        "a short-circuited ingest must not enqueue extract jobs"
    );
}
/// With versioned ingest, a second revision (`Some(2)`) of the same source id
/// is admitted rather than short-circuited, and it writes chunks.
#[tokio::test]
async fn versioned_document_admits_second_revision() {
    let (_tmp, cfg) = test_config();
    let sink = RecordingJobSink::default();
    let scoring = ScoringConfig::default_regex_only();

    let first_revision = DocumentInput {
        provider: "notion".into(),
        title: "Roadmap".into(),
        body: "Phoenix ships Friday. alice@example.com owns the rollout.".into(),
        modified_at: Utc.timestamp_millis_opt(1_700_000_000_000).unwrap(),
        source_ref: Some("notion://page/v".into()),
    };
    // Provider, title and source_ref are unchanged; only the body and the
    // modification time move forward.
    let second_revision = DocumentInput {
        body: "Phoenix now ships next Monday. bob@example.com owns the rollout.".into(),
        modified_at: Utc.timestamp_millis_opt(1_700_000_100_000).unwrap(),
        ..first_revision.clone()
    };

    let v1 = ingest_document_versioned(
        &cfg,
        "notion:v",
        "alice",
        vec![],
        first_revision,
        None,
        Some(1),
        &sink,
        &scoring,
    )
    .await
    .unwrap();
    assert!(!v1.already_ingested);
    assert!(v1.chunks_written >= 1);

    let v2 = ingest_document_versioned(
        &cfg,
        "notion:v",
        "alice",
        vec![],
        second_revision,
        None,
        Some(2),
        &sink,
        &scoring,
    )
    .await
    .unwrap();
    assert!(!v2.already_ingested, "a new version must be admitted");
    assert!(v2.chunks_written >= 1);
}
/// Every chunk written from an email thread carries the complete list of raw
/// archive refs supplied to `ingest_email_with_raw_refs`, in order and with
/// every field intact.
#[tokio::test]
async fn ingest_email_with_raw_refs_attaches_archive_refs_to_every_chunk() {
    let (_tmp, cfg) = test_config();
    let sink = RecordingJobSink::default();
    let scoring = ScoringConfig::default_regex_only();
    let thread = EmailThread {
        provider: "gmail".into(),
        thread_subject: "Launch review".into(),
        messages: vec![
            EmailMessage {
                from: "Alice Smith <alice@example.com>".into(),
                to: vec!["Bob Jones <bob@example.com>".into()],
                cc: vec![],
                subject: "Launch review".into(),
                sent_at: Utc.timestamp_millis_opt(1_700_000_000_000).unwrap(),
                body: "Alice sent the launch checklist to Bob. Kitchen is north of Garden.".into(),
                source_ref: Some("gmail://msg-1".into()),
                list_unsubscribe: None,
            },
            EmailMessage {
                from: "Bob Jones <bob@example.com>".into(),
                to: vec!["Alice Smith <alice@example.com>".into()],
                cc: vec![],
                subject: "Re: Launch review".into(),
                sent_at: Utc.timestamp_millis_opt(1_700_000_060_000).unwrap(),
                body: "Bob confirmed the checklist and added staging notes.".into(),
                source_ref: Some("gmail://msg-2".into()),
                list_unsubscribe: None,
            },
        ],
    };
    // One bounded ref (`end: Some`) and one open-ended ref (`end: None`), so
    // both shapes of `end` are exercised.
    let raw_refs = vec![
        RawRef {
            path: "raw/gmail/thread-1/message-1.md".into(),
            start: 0,
            end: Some(128),
        },
        RawRef {
            path: "raw/gmail/thread-1/message-2.md".into(),
            start: 128,
            end: None,
        },
    ];

    let out = ingest_email_with_raw_refs(
        &cfg,
        "gmail:thread-1",
        "alice",
        vec!["mock-email".into()],
        thread,
        raw_refs.clone(),
        &sink,
        &scoring,
    )
    .await
    .unwrap();

    assert!(!out.already_ingested);
    assert!(out.chunks_written >= 1);
    assert_eq!(out.extract_jobs_enqueued, out.chunk_ids.len());
    assert_eq!(sink.ids.lock().unwrap().len(), out.extract_jobs_enqueued);

    for chunk_id in &out.chunk_ids {
        let stored = get_chunk_raw_refs(&cfg, chunk_id)
            .unwrap()
            .expect("every ingested email chunk should have raw refs attached");
        assert_eq!(stored.len(), raw_refs.len());
        // Compare every field of every ref. Spot-checking one field per ref
        // would miss a swapped or truncated `start`/`end`.
        for (got, want) in stored.iter().zip(&raw_refs) {
            assert_eq!(got.path, want.path);
            assert_eq!(got.start, want.start);
            assert_eq!(got.end, want.end);
        }
    }
}