use std::time::Duration;
use smos_domain::chat::ToolCall;
use smos_domain::config::{ConfidenceConfig, ExtractionConfig};
use smos_domain::{Embedding, Fact, FactId, MemoryKey, SessionId};
use crate::errors::{ProviderError, UseCaseError};
use crate::helpers::noise_filter;
use crate::ports::{
Clock, Delay, EmbeddingProvider, FactRepository, LlmExtractor, SessionRepository,
};
/// Minimum number of characters (counted after trimming) that the cleaned
/// input must contain before `execute` calls the extractor.
pub const MIN_INPUT_CHARS: usize = 15;
/// Upper bound on extractor calls made by `extract_with_retries` for one input.
const EXTRACTION_ATTEMPTS: u32 = 3;
/// Waits looked up by zero-based attempt index in `maybe_sleep`; an attempt
/// index without an entry here gets no wait.
const BACKOFF: [Duration; 2] = [Duration::from_secs(1), Duration::from_secs(2)];
/// Use case that extracts facts from a response and persists the new ones.
///
/// All collaborators are borrowed, so the value is a lightweight bundle of
/// references plus one feature flag.
pub struct ExtractFactsFromResponse<'a, FR, SR, EP, LE, C, D> {
/// Fact store used for id lookup, the dedup search, and saving.
pub facts: &'a FR,
/// Session store; receives the ids of newly created facts via `add_pending`.
pub sessions: &'a SR,
/// Produces the embeddings for the extracted fact strings (`embed_batch`).
pub embedder: &'a EP,
/// Extractor that turns the input text and tool calls into raw fact strings.
pub extractor: &'a LE,
/// Source of the timestamp passed to `Fact::new_pending`.
pub clock: &'a C,
/// Awaited between extraction attempts for the `BACKOFF` durations.
pub delay: &'a D,
/// Supplies `base` for new pending facts and is passed to `confirm_cross_session`.
pub confidence_cfg: &'a ConfidenceConfig,
/// Supplies `dedup_cosine_threshold` for the semantic dedup check.
pub extraction_cfg: &'a ExtractionConfig,
/// When `false`, `execute` returns `Ok(0)` without doing any work.
pub enable_response_extraction: bool,
}
impl<'a, FR, SR, EP, LE, C, D> ExtractFactsFromResponse<'a, FR, SR, EP, LE, C, D>
where
FR: FactRepository,
SR: SessionRepository,
EP: EmbeddingProvider,
LE: LlmExtractor,
C: Clock,
D: Delay,
{
/// Runs fact extraction for one response and stores the resulting new facts.
///
/// The input handed to the extractor is the noise-filtered `content` with the
/// rendered `tool_calls` appended. Returns the number of newly created pending
/// facts. `Ok(0)` is returned without calling the extractor when extraction is
/// disabled or when the trimmed input has fewer than [`MIN_INPUT_CHARS`]
/// characters.
///
/// # Errors
///
/// Propagates errors from extraction, embedding, fact persistence, and the
/// session repository.
pub async fn execute(
    &self,
    content: &str,
    tool_calls: &[ToolCall],
    memory_key: &MemoryKey,
    session_id: &SessionId,
) -> Result<usize, UseCaseError> {
    if !self.enable_response_extraction {
        return Ok(0);
    }

    let mut input = noise_filter::clean(content);
    input.push_str(&format_tool_calls(tool_calls));

    // The gate counts characters of the trimmed input, which can differ from
    // the byte length of the untrimmed buffer; log both so the skip message
    // shows the number that was actually compared.
    let char_count = input.trim().chars().count();
    if char_count < MIN_INPUT_CHARS {
        tracing::debug!(
            len = input.len(),
            chars = char_count,
            "extraction skipped: input below MIN_INPUT_CHARS"
        );
        return Ok(0);
    }

    let raw_facts = self.extract_with_retries(&input, tool_calls).await?;
    if raw_facts.is_empty() {
        return Ok(0);
    }

    let new_ids = self
        .persist_facts(&raw_facts, memory_key, session_id)
        .await?;
    // Only facts created by this call are registered on the session.
    if !new_ids.is_empty() {
        self.sessions.add_pending(session_id, &new_ids).await?;
    }
    Ok(new_ids.len())
}
async fn extract_with_retries(
&self,
input: &str,
tool_calls: &[ToolCall],
) -> Result<Vec<String>, UseCaseError> {
for attempt in 0..EXTRACTION_ATTEMPTS {
match self.extractor.extract_facts(input, tool_calls).await {
Ok(facts) if !facts.is_empty() => return Ok(facts),
Ok(_) => self.maybe_sleep(attempt).await,
Err(ProviderError::Unavailable(msg)) => {
tracing::warn!(error = %msg, "extractor unavailable; skipping (graceful)");
return Ok(Vec::new());
}
Err(e) if attempt + 1 < EXTRACTION_ATTEMPTS => {
tracing::warn!(attempt = attempt + 1, error = %e, "extraction failed; retrying");
self.maybe_sleep(attempt).await;
}
Err(e) => return Err(e.into()),
}
}
Ok(Vec::new())
}
/// Waits for the `BACKOFF` entry of `attempt`; does nothing when the attempt
/// index has no entry.
async fn maybe_sleep(&self, attempt: u32) {
    let Some(&pause) = BACKOFF.get(attempt as usize) else {
        return;
    };
    self.delay.delay(pause).await;
}
/// Embeds all raw facts in one batch and persists them one by one.
///
/// Facts whose embedding slot is `None` are skipped. Returns the ids of the
/// facts that were newly created; facts that only confirmed an existing one
/// contribute nothing.
async fn persist_facts(
    &self,
    raw_facts: &[String],
    memory_key: &MemoryKey,
    session_id: &SessionId,
) -> Result<Vec<FactId>, UseCaseError> {
    let texts: Vec<&str> = raw_facts.iter().map(|fact| fact.as_str()).collect();
    let embeddings = self.embedder.embed_batch(&texts).await?;

    let mut created = Vec::new();
    for (raw, embedding) in raw_facts.iter().zip(embeddings) {
        if let Some(vector) = embedding {
            let outcome = self
                .persist_one_fact(raw, vector, memory_key, session_id)
                .await?;
            // `Some(id)` marks a new fact, `None` a confirmation.
            created.extend(outcome);
        }
    }
    Ok(created)
}
/// Persists a single raw fact, deduplicating against stored facts first.
///
/// 1. If a fact with the content-derived id already exists, it is confirmed
///    for this session and `None` is returned.
/// 2. Otherwise the closest dedup hit is examined; when `1.0 - distance`
///    reaches `dedup_cosine_threshold` and the hit can be loaded, that fact is
///    confirmed and `None` is returned.
/// 3. Otherwise a new pending fact is saved and its id is returned.
async fn persist_one_fact(
    &self,
    raw: &str,
    vector: Vec<f32>,
    memory_key: &MemoryKey,
    session_id: &SessionId,
) -> Result<Option<FactId>, UseCaseError> {
    let fact_id = FactId::from_content(raw);

    // Exact match on the content-derived id.
    if let Some(mut known) = self.facts.get(&fact_id, memory_key).await? {
        self.confirm_and_save(&mut known, session_id).await?;
        return Ok(None);
    }

    // Semantic match against the single nearest stored fact.
    let nearest = self
        .facts
        .search_for_dedup(vector.clone(), memory_key, 1)
        .await?
        .into_iter()
        .next();
    if let Some(hit) = nearest {
        match hit.metadata.distance.map(|distance| 1.0 - distance) {
            Some(similarity) if similarity >= self.extraction_cfg.dedup_cosine_threshold => {
                // The stored fact is only loaded once the threshold is met.
                if let Some(mut matched) = self.facts.get(&hit.id, memory_key).await? {
                    tracing::debug!(
                        raw = raw,
                        similarity = similarity,
                        matched_id = %hit.id,
                        "semantic dedup: rephrased fact matched an existing one"
                    );
                    self.confirm_and_save(&mut matched, session_id).await?;
                    return Ok(None);
                }
            }
            Some(_) => {}
            None => {
                tracing::warn!(
                    raw = raw,
                    matched_id = %hit.id,
                    "semantic dedup hit carried no distance; skipping Layer 2 \
                     (create new pending fact instead)"
                );
            }
        }
    }

    // No duplicate found: store a new pending fact.
    let embedding = Embedding::new(vector)?;
    let pending = Fact::new_pending(
        raw,
        memory_key.clone(),
        session_id.clone(),
        embedding,
        self.clock.now(),
        self.confidence_cfg.base,
    )?;
    self.facts.save(&pending).await?;
    Ok(Some(fact_id))
}
/// Applies a cross-session confirmation to `fact` and saves it when
/// `confirm_cross_session` returns `true`.
// NOTE(review): the boolean presumably signals that the fact changed — confirm
// against `Fact::confirm_cross_session`.
async fn confirm_and_save(
    &self,
    fact: &mut Fact,
    session_id: &SessionId,
) -> Result<(), UseCaseError> {
    let should_save = fact.confirm_cross_session(session_id, self.confidence_cfg)?;
    if !should_save {
        return Ok(());
    }
    self.facts.save(fact).await?;
    Ok(())
}
}
/// Renders tool calls as a text section that can be appended to the
/// extraction input.
///
/// Returns an empty string for an empty slice. Otherwise the result starts
/// with a blank line and a `Tool calls:` header, followed by one
/// `- name(arguments)` line per call.
pub fn format_tool_calls(tool_calls: &[ToolCall]) -> String {
    use std::fmt::Write as _;

    if tool_calls.is_empty() {
        return String::new();
    }
    let mut out = String::from("\n\nTool calls:");
    for call in tool_calls {
        // Format directly into `out` instead of building a temporary `String`
        // per call. Like `format!`, this panics only if a `Display` impl fails.
        write!(out, "\n- {}({})", call.name, call.arguments)
            .expect("writing to a String cannot fail");
    }
    out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{SearchHit, SearchHitMetadata};
use smos_domain::{FactStatus, Heat, Timestamp};
use std::collections::HashMap;
use std::sync::Mutex;
/// Test clock that always reports the wrapped timestamp.
#[derive(Clone)]
struct FixedClock(Timestamp);
impl Clock for FixedClock {
fn now(&self) -> Timestamp {
self.0
}
}
/// Test delay that returns immediately, so retry backoff costs no time.
#[derive(Clone, Copy)]
struct NoOpDelay;
impl Delay for NoOpDelay {
async fn delay(&self, _duration: Duration) {}
}
/// Test extractor that replays a fixed list of results and counts its calls.
struct ScriptedExtractor {
// Remaining scripted results; the front entry is consumed on each call.
results: Mutex<Vec<Result<Vec<String>, ProviderError>>>,
// Number of `extract_facts` calls made so far.
calls: Mutex<u32>,
}
impl ScriptedExtractor {
    /// Creates an extractor that hands out `results` in order, one per call.
    fn new(results: Vec<Result<Vec<String>, ProviderError>>) -> Self {
        let results = Mutex::new(results);
        let calls = Mutex::new(0);
        Self { results, calls }
    }

    /// Number of `extract_facts` calls observed so far.
    fn call_count(&self) -> u32 {
        let calls = self.calls.lock().unwrap();
        *calls
    }
}
impl LlmExtractor for ScriptedExtractor {
    /// Counts the call and returns the next scripted result; once the script
    /// is exhausted every call yields an empty fact list. The arguments are
    /// ignored.
    async fn extract_facts(
        &self,
        _content: &str,
        _tool_calls: &[ToolCall],
    ) -> Result<Vec<String>, ProviderError> {
        *self.calls.lock().unwrap() += 1;

        let mut script = self.results.lock().unwrap();
        if script.is_empty() {
            return Ok(Vec::new());
        }
        script.remove(0)
    }
}
/// Test embedder that returns a clone of the same vector for every text.
struct ConstantEmbedder(Vec<f32>);
impl EmbeddingProvider for ConstantEmbedder {
async fn embed(&self, _text: &str) -> Result<Option<Vec<f32>>, ProviderError> {
Ok(Some(self.0.clone()))
}
}
/// Test embedder that logs every embedded text and derives a deterministic
/// vector from it.
struct RecordingEmbedder {
    // Shared log of the texts passed to `embed`.
    calls: std::sync::Arc<Mutex<Vec<String>>>,
}

impl RecordingEmbedder {
    /// Returns the embedder together with a handle to its shared call log.
    fn new() -> (Self, std::sync::Arc<Mutex<Vec<String>>>) {
        let shared = std::sync::Arc::new(Mutex::new(Vec::new()));
        let embedder = Self {
            calls: std::sync::Arc::clone(&shared),
        };
        (embedder, shared)
    }

    /// Builds a 1024-element vector that is all zeros except for a single
    /// `1.0` whose position comes from a byte-wise hash of `text`.
    fn vector_for(text: &str) -> Vec<f32> {
        let mut hash = 0u64;
        for byte in text.bytes() {
            hash = hash.wrapping_mul(31).wrapping_add(u64::from(byte));
        }
        let mut one_hot = vec![0.0; 1024];
        one_hot[(hash as usize) % 1024] = 1.0;
        one_hot
    }
}
impl EmbeddingProvider for RecordingEmbedder {
    /// Logs `text` in the shared call log and returns its deterministic vector.
    async fn embed(&self, text: &str) -> Result<Option<Vec<f32>>, ProviderError> {
        self.calls.lock().unwrap().push(text.to_owned());
        let vector = Self::vector_for(text);
        Ok(Some(vector))
    }
}
/// In-memory fact repository for tests.
#[derive(Default, Clone)]
struct InMemoryFacts {
// Saved facts keyed by the string form of their id.
store: std::sync::Arc<Mutex<HashMap<String, Fact>>>,
// Hits that `search_for_dedup` returns, whatever the query is.
dedup_hits: std::sync::Arc<Mutex<Vec<SearchHit>>>,
}
impl InMemoryFacts {
/// Replaces the hits returned by subsequent `search_for_dedup` calls.
fn script_dedup_hits(&self, hits: Vec<SearchHit>) {
*self.dedup_hits.lock().unwrap() = hits;
}
}
/// Repository behaviour used by the tests: `save` and `get` operate on the
/// in-memory map, `search_for_dedup` replays the scripted hits, and every other
/// method is a stub that returns an empty result.
impl FactRepository for InMemoryFacts {
// Inserts (or overwrites) a clone of the fact under its id string.
async fn save(&self, fact: &Fact) -> Result<(), crate::errors::RepoError> {
self.store
.lock()
.unwrap()
.insert(fact.id().as_str().to_string(), fact.clone());
Ok(())
}
// Looks the fact up by id only; the memory key is ignored.
async fn get(
&self,
id: &FactId,
_mk: &MemoryKey,
) -> Result<Option<Fact>, crate::errors::RepoError> {
Ok(self.store.lock().unwrap().get(id.as_str()).cloned())
}
// Stub: always empty.
async fn list_accepted(
&self,
_mk: &MemoryKey,
) -> Result<Vec<Fact>, crate::errors::RepoError> {
Ok(Vec::new())
}
// Stub: always empty.
async fn list_pending(
&self,
_mk: &MemoryKey,
) -> Result<Vec<Fact>, crate::errors::RepoError> {
Ok(Vec::new())
}
// Stub: always empty.
async fn list_memory_keys_for_session(
&self,
_session_id: &SessionId,
) -> Result<Vec<MemoryKey>, crate::errors::RepoError> {
Ok(Vec::new())
}
// Stub: always empty.
async fn list_memory_keys(&self) -> Result<Vec<MemoryKey>, crate::errors::RepoError> {
Ok(Vec::new())
}
// Stub: always empty.
async fn search_similar(
&self,
_e: Vec<f32>,
_mk: &MemoryKey,
_l: usize,
) -> Result<Vec<SearchHit>, crate::errors::RepoError> {
Ok(Vec::new())
}
// Returns a clone of the scripted hits; embedding, key and limit are ignored.
async fn search_for_dedup(
&self,
_e: Vec<f32>,
_mk: &MemoryKey,
_l: usize,
) -> Result<Vec<SearchHit>, crate::errors::RepoError> {
Ok(self.dedup_hits.lock().unwrap().clone())
}
// Stub: accepts the update and does nothing.
async fn update_heat_batch(
&self,
_ids: &[FactId],
_mk: &MemoryKey,
_h: Heat,
_t: Timestamp,
) -> Result<(), crate::errors::RepoError> {
Ok(())
}
}
/// Test session repository that records the fact ids passed to `add_pending`.
#[derive(Default, Clone)]
struct RecordingSessions {
// Every id received by `add_pending`, in call order, across all sessions.
pending: std::sync::Arc<Mutex<Vec<FactId>>>,
}
/// Only `add_pending` has behaviour; the remaining methods are stubs, and
/// `get_or_create` panics because extraction is not expected to call it.
impl SessionRepository for RecordingSessions {
// Appends the ids to the shared log; the session id is ignored.
async fn add_pending(
&self,
_id: &SessionId,
fact_ids: &[FactId],
) -> Result<(), crate::errors::RepoError> {
self.pending
.lock()
.unwrap()
.extend(fact_ids.iter().cloned());
Ok(())
}
// Panics when called.
async fn get_or_create(
&self,
_i: &SessionId,
_m: &MemoryKey,
) -> Result<smos_domain::SessionState, crate::errors::RepoError> {
unreachable!("not used by extraction")
}
// Stub: always empty.
async fn collect_expired(
&self,
_t: Duration,
) -> Result<Vec<(SessionId, smos_domain::SessionState)>, crate::errors::RepoError> {
Ok(Vec::new())
}
// Stub: always empty.
async fn snapshot_all(
&self,
) -> Result<Vec<(SessionId, smos_domain::SessionState)>, crate::errors::RepoError> {
Ok(Vec::new())
}
// Stub: does nothing.
async fn remove_pending_owned(
&self,
_i: &SessionId,
_o: &[FactId],
) -> Result<(), crate::errors::RepoError> {
Ok(())
}
// Stub: does nothing.
async fn clear_session(&self, _i: &SessionId) -> Result<(), crate::errors::RepoError> {
Ok(())
}
// Stub: always empty.
async fn dedup_and_mark(
&self,
_i: &SessionId,
_m: &MemoryKey,
_c: &[FactId],
) -> Result<Vec<FactId>, crate::errors::RepoError> {
Ok(Vec::new())
}
// Stub: does nothing.
async fn save(
&self,
_i: &SessionId,
_s: &smos_domain::SessionState,
) -> Result<(), crate::errors::RepoError> {
Ok(())
}
}
/// Memory key shared by all tests, built from the raw string `"proj"`.
fn mk() -> MemoryKey {
MemoryKey::from_raw("proj").unwrap()
}
/// Builds a session id of the form `sess_` followed by `tag` as 12
/// zero-padded hex digits.
fn sid(tag: u8) -> SessionId {
    let raw = format!("sess_{:012x}", u64::from(tag));
    SessionId::from_raw(&raw).unwrap()
}
/// Default confidence configuration used by the tests.
fn cfg() -> ConfidenceConfig {
ConfidenceConfig::default()
}
/// Default extraction configuration used by the tests.
fn extraction_cfg() -> ExtractionConfig {
ExtractionConfig::default()
}
/// Clock fixed at Unix second 1_700_000_000.
fn clock() -> FixedClock {
FixedClock(Timestamp::from_unix_secs(1_700_000_000).unwrap())
}
/// Assembles the use case from test doubles, using `ConstantEmbedder`, the
/// shared `NO_OP_DELAY`, and extraction enabled.
#[allow(clippy::too_many_arguments)]
fn build<'a>(
facts: &'a InMemoryFacts,
sessions: &'a RecordingSessions,
extractor: &'a ScriptedExtractor,
embedder: &'a ConstantEmbedder,
clock: &'a FixedClock,
cfg: &'a ConfidenceConfig,
extraction_cfg: &'a ExtractionConfig,
) -> ExtractFactsFromResponse<
'a,
InMemoryFacts,
RecordingSessions,
ConstantEmbedder,
ScriptedExtractor,
FixedClock,
NoOpDelay,
> {
ExtractFactsFromResponse {
facts,
sessions,
embedder,
extractor,
clock,
delay: &NO_OP_DELAY,
confidence_cfg: cfg,
extraction_cfg,
// Tests that need the disabled path flip this flag on the returned value.
enable_response_extraction: true,
}
}
/// Shared `NoOpDelay` instance that the builder functions borrow for the
/// `delay` field.
static NO_OP_DELAY: NoOpDelay = NoOpDelay;
/// Owns the values that `build` borrows, so each test can keep them alive
/// with a single binding.
struct Fix {
embedder: ConstantEmbedder,
clock: FixedClock,
cfg: ConfidenceConfig,
extraction_cfg: ExtractionConfig,
}
impl Fix {
/// Creates the fixture: a three-element constant embedding, the fixed test
/// clock, and the default configurations.
fn new() -> Self {
Self {
embedder: ConstantEmbedder(vec![0.1, 0.2, 0.3]),
clock: clock(),
cfg: cfg(),
extraction_cfg: extraction_cfg(),
}
}
}
/// A single call is rendered as `- name(arguments)` below the header.
#[test]
fn format_tool_calls_renders_name_and_arguments() {
    let arguments = smos_domain::chat::ToolArguments::from_json(r#"{"path":"auth.rs"}"#);
    let call = ToolCall {
        name: "read_file".into(),
        arguments,
    };

    let rendered = format_tool_calls(&[call]);

    assert_eq!(
        rendered,
        "\n\nTool calls:\n- read_file({\"path\":\"auth.rs\"})"
    );
}
/// No tool calls means no header either.
#[test]
fn format_tool_calls_empty_returns_empty_string() {
    let rendered = format_tool_calls(&[]);
    assert_eq!(rendered, "");
}
/// With extraction disabled, `execute` returns zero and never calls the
/// extractor.
#[tokio::test]
async fn execute_disabled_returns_zero_without_calling_extractor() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();
    let extractor = ScriptedExtractor::new(vec![Err(ProviderError::Unavailable(
        "must not be called".into(),
    ))]);
    let fix = Fix::new();
    let mut uc = build(
        &facts,
        &sessions,
        &extractor,
        &fix.embedder,
        &fix.clock,
        &fix.cfg,
        &fix.extraction_cfg,
    );
    uc.enable_response_extraction = false;

    let n = uc
        .execute("TTL=10 prevents refresh loop", &[], &mk(), &sid(1))
        .await
        .unwrap();

    assert_eq!(n, 0);
    // The scripted `Unavailable` result would also end in `Ok(0)`, so the
    // return value alone cannot show that the extractor was skipped.
    assert_eq!(
        extractor.call_count(),
        0,
        "disabled extraction must not reach the extractor"
    );
}
/// Input shorter than `MIN_INPUT_CHARS` returns zero and never calls the
/// extractor.
#[tokio::test]
async fn execute_short_input_returns_zero_without_calling_extractor() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();
    let extractor = ScriptedExtractor::new(vec![Err(ProviderError::Unavailable(
        "must not be called".into(),
    ))]);
    let fix = Fix::new();
    let uc = build(
        &facts,
        &sessions,
        &extractor,
        &fix.embedder,
        &fix.clock,
        &fix.cfg,
        &fix.extraction_cfg,
    );

    let n = uc.execute("ok", &[], &mk(), &sid(1)).await.unwrap();

    assert_eq!(n, 0);
    // The scripted `Unavailable` result would also end in `Ok(0)`, so the
    // return value alone cannot show that the extractor was skipped.
    assert_eq!(
        extractor.call_count(),
        0,
        "too-short input must not reach the extractor"
    );
}
/// A freshly extracted fact is stored as pending and its id is registered on
/// the session.
#[tokio::test]
async fn execute_saves_new_pending_fact_and_registers_it() {
    let extracted = "TTL=10 prevents the token refresh loop";
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();
    let extractor = ScriptedExtractor::new(vec![Ok(vec![extracted.to_string()])]);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );

    let created = use_case
        .execute("we changed TTL to 10 to stop the loop", &[], &mk(), &sid(1))
        .await
        .unwrap();
    assert_eq!(created, 1);

    let expected_id = FactId::from_content(extracted);
    let saved = facts
        .store
        .lock()
        .unwrap()
        .get(expected_id.as_str())
        .cloned()
        .expect("fact saved");
    assert_eq!(saved.status(), FactStatus::Pending);

    let pending_len = sessions.pending.lock().unwrap().len();
    assert_eq!(pending_len, 1, "fact registered on session pending list");
}
/// `ProviderError::Unavailable` ends extraction at once: no retry, no error,
/// nothing stored.
#[tokio::test]
async fn execute_unavailable_extractor_skips_gracefully() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();
    let outage = ProviderError::Unavailable("connection refused".into());
    let extractor = ScriptedExtractor::new(vec![Err(outage)]);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );

    let created = use_case
        .execute("some real content long enough", &[], &mk(), &sid(1))
        .await
        .unwrap();
    assert_eq!(created, 0);

    let calls = extractor.call_count();
    assert_eq!(
        calls, 1,
        "Unavailable must skip retries — extractor called exactly once"
    );

    let nothing_stored = facts.store.lock().unwrap().is_empty();
    assert!(nothing_stored, "no fact persisted on graceful skip");
}
/// Two `RequestFailed` errors followed by a success still yield the fact.
#[tokio::test]
async fn execute_retries_on_request_failed_then_succeeds() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();
    let script = vec![
        Err(ProviderError::RequestFailed("500".into())),
        Err(ProviderError::RequestFailed("500".into())),
        Ok(vec!["auth.rs uses JWT for tokens".to_string()]),
    ];
    let extractor = ScriptedExtractor::new(script);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );

    let created = use_case
        .execute("the auth module uses JWT", &[], &mk(), &sid(1))
        .await
        .unwrap();

    assert_eq!(created, 1);
}
#[tokio::test]
async fn execute_gives_up_after_all_attempts_fail() {
let facts = InMemoryFacts::default();
let sessions = RecordingSessions::default();
let extractor = ScriptedExtractor::new(vec![
Err(ProviderError::RequestFailed("500".into())),
Err(ProviderError::RequestFailed("500".into())),
Err(ProviderError::RequestFailed("500".into())),
]);
let fix = Fix::new();
let uc = build(
&facts,
&sessions,
&extractor,
&fix.embedder,
&fix.clock,
&fix.cfg,
&fix.extraction_cfg,
);
let result = uc
.execute("content long enough to pass gate", &[], &mk(), &sid(1))
.await;
assert!(result.is_err(), "final failure propagates as Err");
assert!(facts.store.lock().unwrap().is_empty());
}
/// Content carrying smos markers still runs through extraction and yields the
/// scripted fact.
// NOTE(review): `ScriptedExtractor` ignores its input, so this test does not
// observe what text reached the extractor; it only checks the end result.
#[tokio::test]
async fn execute_strips_smos_noise_before_extraction() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();
    let extractor = ScriptedExtractor::new(vec![Ok(vec!["a clean fact".to_string()])]);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );
    let noisy = "real content about the deployment\n<!-- smos:sess_abcdef012345 -->\n<smos-memory session=\"s\">x</smos-memory>";

    let created = use_case.execute(noisy, &[], &mk(), &sid(1)).await.unwrap();

    assert_eq!(created, 1);
}
/// Extracting content identical to a stored fact from a second session
/// confirms that fact instead of creating a new one.
#[tokio::test]
async fn execute_cross_session_confirms_existing_fact() {
    let shared = "shared fact content here";
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();

    // Seed a fact that session 1 already owns.
    let seeded = Fact::new_pending(
        shared,
        mk(),
        sid(1),
        Embedding::new(vec![1.0]).unwrap(),
        Timestamp::from_unix_secs(1_700_000_000).unwrap(),
        ConfidenceConfig::default().base,
    )
    .unwrap();
    let seeded_id = seeded.id().clone();
    facts
        .store
        .lock()
        .unwrap()
        .insert(seeded_id.as_str().to_string(), seeded);

    let extractor = ScriptedExtractor::new(vec![Ok(vec![shared.to_string()])]);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );

    // Session 2 reports the same content.
    let created = use_case
        .execute(shared, &[], &mk(), &sid(2))
        .await
        .unwrap();
    assert_eq!(created, 0, "confirmation does not count as a new fact");

    let after = facts
        .store
        .lock()
        .unwrap()
        .get(seeded_id.as_str())
        .cloned()
        .expect("fact still present");
    assert_eq!(
        after.source_sessions().distinct_count(),
        2,
        "provenance grew to two sessions"
    );

    let pending_is_empty = sessions.pending.lock().unwrap().is_empty();
    assert!(
        pending_is_empty,
        "confirmation must not register on the pending list"
    );
}
/// Builds a dedup search hit for `fact` whose distance is `1.0 - similarity`.
fn hit_for(fact: &Fact, similarity: f32, mk: MemoryKey) -> SearchHit {
    SearchHit {
        id: fact.id().clone(),
        document: fact.content().to_string(),
        memory_key: mk,
        metadata: SearchHitMetadata {
            status: "pending".into(),
            confidence: 0.5,
            valid_until: None,
            heat_base: 1.0,
            last_access_at: 1_700_000_000.0,
            distance: Some(1.0 - similarity),
        },
    }
}
/// A rephrased fact whose nearest hit has similarity 0.98 confirms the stored
/// fact rather than creating a second one.
#[tokio::test]
async fn persist_facts_layer2_semantic_match_confirms_existing_fact() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();

    // Seed the stored fact.
    let seeded = Fact::new_pending(
        "the token cache uses TTL=60 to avoid stale entries",
        mk(),
        sid(1),
        Embedding::new(vec![1.0]).unwrap(),
        Timestamp::from_unix_secs(1_700_000_000).unwrap(),
        ConfidenceConfig::default().base,
    )
    .unwrap();
    let seeded_id = seeded.id().clone();
    facts
        .store
        .lock()
        .unwrap()
        .insert(seeded_id.as_str().to_string(), seeded);

    // Script the dedup search to return the seeded fact as a close match.
    let seeded_copy = facts
        .store
        .lock()
        .unwrap()
        .get(seeded_id.as_str())
        .cloned()
        .expect("seeded fact");
    facts.script_dedup_hits(vec![hit_for(&seeded_copy, 0.98, mk())]);

    let rephrased = "token cache TTL is 60 to prevent stale entries";
    let extractor = ScriptedExtractor::new(vec![Ok(vec![rephrased.to_string()])]);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );

    let created = use_case
        .execute(rephrased, &[], &mk(), &sid(2))
        .await
        .unwrap();
    assert_eq!(
        created, 0,
        "semantic duplicate must confirm, not create a new fact"
    );

    let after = facts
        .store
        .lock()
        .unwrap()
        .get(seeded_id.as_str())
        .cloned()
        .expect("seeded fact still present");
    assert_eq!(
        after.source_sessions().distinct_count(),
        2,
        "semantic match grows provenance to two sessions"
    );

    let rephrased_id = FactId::from_content(rephrased);
    let rephrased_absent = facts
        .store
        .lock()
        .unwrap()
        .get(rephrased_id.as_str())
        .is_none();
    assert!(
        rephrased_absent,
        "no new fact id created for the rephrased variant"
    );

    let pending_is_empty = sessions.pending.lock().unwrap().is_empty();
    assert!(
        pending_is_empty,
        "semantic confirmation must not register on the pending list"
    );
}
/// A nearest hit with similarity 0.80 under the default threshold does not
/// count as a duplicate, so a new fact is created.
#[tokio::test]
async fn persist_facts_layer2_below_threshold_creates_new_fact() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();

    // Seed an unrelated stored fact.
    let seeded = Fact::new_pending(
        "auth module uses Argon2id for password hashing",
        mk(),
        sid(1),
        Embedding::new(vec![1.0]).unwrap(),
        Timestamp::from_unix_secs(1_700_000_000).unwrap(),
        ConfidenceConfig::default().base,
    )
    .unwrap();
    let seeded_id = seeded.id().clone();
    facts
        .store
        .lock()
        .unwrap()
        .insert(seeded_id.as_str().to_string(), seeded);

    // Script the dedup search to return it with a low similarity.
    let seeded_copy = facts
        .store
        .lock()
        .unwrap()
        .get(seeded_id.as_str())
        .cloned()
        .expect("seeded fact");
    facts.script_dedup_hits(vec![hit_for(&seeded_copy, 0.80, mk())]);

    let new_content = "TLS handshake failure in the upstream pool";
    let extractor = ScriptedExtractor::new(vec![Ok(vec![new_content.to_string()])]);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );

    let created = use_case
        .execute(new_content, &[], &mk(), &sid(2))
        .await
        .unwrap();
    assert_eq!(
        created, 1,
        "below-threshold similarity must create a new fact"
    );

    let new_id = FactId::from_content(new_content);
    let is_stored = facts.store.lock().unwrap().contains_key(new_id.as_str());
    assert!(is_stored, "new fact persisted under its own FactId");

    let pending_len = sessions.pending.lock().unwrap().len();
    assert_eq!(pending_len, 1, "new fact registered on the pending list");
}
/// A dedup hit without a distance is not treated as a duplicate; the fact is
/// created as new.
#[tokio::test]
async fn persist_facts_layer2_missing_distance_falls_through_to_new_fact() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();

    // Seed the stored fact.
    let seeded = Fact::new_pending(
        "config reload triggers a graceful drain",
        mk(),
        sid(1),
        Embedding::new(vec![1.0]).unwrap(),
        Timestamp::from_unix_secs(1_700_000_000).unwrap(),
        ConfidenceConfig::default().base,
    )
    .unwrap();
    let seeded_id = seeded.id().clone();
    facts
        .store
        .lock()
        .unwrap()
        .insert(seeded_id.as_str().to_string(), seeded);

    // Script a hit for it, then remove the distance.
    let seeded_copy = facts
        .store
        .lock()
        .unwrap()
        .get(seeded_id.as_str())
        .cloned()
        .expect("seeded fact");
    let mut hit = hit_for(&seeded_copy, 1.0, mk());
    hit.metadata.distance = None;
    facts.script_dedup_hits(vec![hit]);

    let new_content = "config reload drains gracefully on SIGHUP";
    let extractor = ScriptedExtractor::new(vec![Ok(vec![new_content.to_string()])]);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );

    let created = use_case
        .execute(new_content, &[], &mk(), &sid(2))
        .await
        .unwrap();

    assert_eq!(
        created, 1,
        "missing distance must not collapse — fall through to new fact"
    );
}
#[tokio::test]
async fn persist_facts_layer2_threshold_lowered_collapses_0_85_pair() {
let facts = InMemoryFacts::default();
let sessions = RecordingSessions::default();
let stored = Fact::new_pending(
"indexer batches at most 1024 documents per commit",
mk(),
sid(1),
Embedding::new(vec![1.0]).unwrap(),
Timestamp::from_unix_secs(1_700_000_000).unwrap(),
ConfidenceConfig::default().base,
)
.unwrap();
let stored_id = stored.id().clone();
facts
.store
.lock()
.unwrap()
.insert(stored_id.as_str().to_string(), stored);
facts.script_dedup_hits(vec![hit_for(
&facts
.store
.lock()
.unwrap()
.get(stored_id.as_str())
.cloned()
.expect("seeded fact"),
0.85,
mk(),
)]);
let rephrased = "the indexer caps batches at 1024 documents";
let extractor = ScriptedExtractor::new(vec![Ok(vec![rephrased.to_string()])]);
let mut fix = Fix::new();
fix.extraction_cfg = ExtractionConfig {
dedup_cosine_threshold: 0.80,
};
let uc = build(
&facts,
&sessions,
&extractor,
&fix.embedder,
&fix.clock,
&fix.cfg,
&fix.extraction_cfg,
);
let n = uc.execute(rephrased, &[], &mk(), &sid(2)).await.unwrap();
assert_eq!(
n, 0,
"lowered threshold collapses the 0.85 pair via semantic match"
);
let confirmed = facts
.store
.lock()
.unwrap()
.get(stored_id.as_str())
.cloned()
.expect("seeded fact still present");
assert_eq!(
confirmed.source_sessions().distinct_count(),
2,
"semantic collapse grows provenance"
);
}
/// Same assembly as `build`, but with a `RecordingEmbedder` as the embedder
/// type.
#[allow(clippy::too_many_arguments)]
fn build_with_recording_embedder<'a>(
facts: &'a InMemoryFacts,
sessions: &'a RecordingSessions,
extractor: &'a ScriptedExtractor,
embedder: &'a RecordingEmbedder,
clock: &'a FixedClock,
cfg: &'a ConfidenceConfig,
extraction_cfg: &'a ExtractionConfig,
) -> ExtractFactsFromResponse<
'a,
InMemoryFacts,
RecordingSessions,
RecordingEmbedder,
ScriptedExtractor,
FixedClock,
NoOpDelay,
> {
ExtractFactsFromResponse {
facts,
sessions,
embedder,
extractor,
clock,
delay: &NO_OP_DELAY,
confidence_cfg: cfg,
extraction_cfg,
enable_response_extraction: true,
}
}
/// Two distinct extracted facts are embedded once each and both persisted
/// under their own ids.
#[tokio::test]
async fn recording_embedder_yields_distinct_vectors_for_distinct_facts() {
    let alpha = "alpha configuration directive";
    let beta = "beta configuration directive";

    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();
    let extractor =
        ScriptedExtractor::new(vec![Ok(vec![alpha.to_string(), beta.to_string()])]);
    let (embedder, embed_log) = RecordingEmbedder::new();
    let clock = clock();
    let cfg = cfg();
    let extraction_cfg = extraction_cfg();
    let use_case = build_with_recording_embedder(
        &facts,
        &sessions,
        &extractor,
        &embedder,
        &clock,
        &cfg,
        &extraction_cfg,
    );

    let created = use_case
        .execute("content covering both directives", &[], &mk(), &sid(1))
        .await
        .unwrap();
    assert_eq!(created, 2, "two distinct facts persisted");

    let embed_calls = embed_log.lock().unwrap().len();
    assert_eq!(embed_calls, 2, "embedder called once per extracted fact");

    let id_alpha = FactId::from_content(alpha);
    let id_beta = FactId::from_content(beta);
    assert!(facts.store.lock().unwrap().contains_key(id_alpha.as_str()));
    assert!(facts.store.lock().unwrap().contains_key(id_beta.as_str()));
}
/// An empty string in the extracted batch makes `execute` return an error.
#[tokio::test]
async fn execute_propagates_err_when_batch_contains_empty_raw_fact() {
    let facts = InMemoryFacts::default();
    let sessions = RecordingSessions::default();
    let batch = vec![
        String::new(),
        "real fact that should still persist".to_string(),
    ];
    let extractor = ScriptedExtractor::new(vec![Ok(batch)]);
    let fixture = Fix::new();
    let use_case = build(
        &facts,
        &sessions,
        &extractor,
        &fixture.embedder,
        &fixture.clock,
        &fixture.cfg,
        &fixture.extraction_cfg,
    );

    let outcome = use_case
        .execute(
            "content long enough to clear MIN_INPUT_CHARS",
            &[],
            &mk(),
            &sid(1),
        )
        .await;

    assert!(
        outcome.is_err(),
        "empty raw fact must surface as Err (the only safe non-silent path)"
    );
}
}