use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::{Arc, Mutex};
use converge_pack::{
AgentEffect, Context, ContextFact, ContextKey, FactId, ProposedFact, Provenance,
ProvenanceSource, Suggestor, TextPayload,
};
use converge_provider::{
ChatMessage, ChatRequest, ChatRole, DynChatBackend, LlmError, ResponseFormat,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::provenance::ORGANISM_PLANNING_PROVENANCE;
use crate::suggestor::SharedBudget;
/// Classified failure from a due-diligence search or LLM call.
///
/// Serialized with serde into the `error` field of the constraint facts built by
/// `error_to_constraint`, so variant and field names are part of that JSON payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DdError {
/// Provider refused for lack of credits; `classify_llm_error` also maps auth denials here.
/// The only variant `is_fatal` reports as fatal.
CreditsExhausted { provider: String, detail: String },
/// Provider throttled the request; `retry_after_ms` is the suggested back-off when reported.
RateLimited {
provider: String,
retry_after_ms: Option<u64>,
},
/// Timeout / network failure reaching the provider.
ProviderUnavailable { provider: String, detail: String },
/// Provider answered but the request or response was rejected / unusable.
BadResponse { provider: String, detail: String },
/// Prompt exceeded the model's context window; `tokens` is the request size when known.
PromptTooLarge {
provider: String,
tokens: Option<usize>,
},
/// The response text could not be parsed into the expected JSON shape.
ParseFailed { provider: String, detail: String },
}
/// Single-line, human-readable rendering used in logs and in the `message` field of
/// constraint facts. Every variant is prefixed with `[provider]`.
impl fmt::Display for DdError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::RateLimited { provider, .. } => write!(f, "[{provider}] rate limited"),
            Self::PromptTooLarge { provider, tokens } => {
                // The token count is optional; show a placeholder when it was not reported.
                let size = match tokens {
                    Some(t) => format!("{t} tokens"),
                    None => String::from("unknown"),
                };
                write!(f, "[{provider}] prompt too large ({})", size)
            }
            Self::CreditsExhausted { provider, detail } => {
                write!(f, "[{provider}] credits exhausted: {detail}")
            }
            Self::ProviderUnavailable { provider, detail } => {
                write!(f, "[{provider}] unavailable: {detail}")
            }
            Self::BadResponse { provider, detail } => {
                write!(f, "[{provider}] bad response: {detail}")
            }
            Self::ParseFailed { provider, detail } => {
                write!(f, "[{provider}] parse failed: {detail}")
            }
        }
    }
}
impl DdError {
    /// `true` for failures of the provider itself (credits, throttling, unreachable), as
    /// opposed to problems with this particular request or response. `FailoverDdSearch`
    /// only moves on to the next backend for these.
    pub fn is_infra_failure(&self) -> bool {
        match self {
            Self::CreditsExhausted { .. }
            | Self::RateLimited { .. }
            | Self::ProviderUnavailable { .. } => true,
            Self::BadResponse { .. } | Self::PromptTooLarge { .. } | Self::ParseFailed { .. } => {
                false
            }
        }
    }

    /// `true` only for `CreditsExhausted`; research loops stop issuing searches on it.
    pub fn is_fatal(&self) -> bool {
        matches!(self, Self::CreditsExhausted { .. })
    }

    /// Deterministic constraint ID `dd:constraint:{suggestor}:{kind}`, so repeated failures of
    /// the same kind from the same suggestor map to the same ID.
    fn constraint_id(&self, suggestor: &str) -> String {
        let slug = match self {
            Self::CreditsExhausted { .. } => "credits-exhausted",
            Self::RateLimited { .. } => "rate-limited",
            Self::ProviderUnavailable { .. } => "provider-unavailable",
            Self::BadResponse { .. } => "bad-response",
            Self::PromptTooLarge { .. } => "prompt-too-large",
            Self::ParseFailed { .. } => "parse-failed",
        };
        ["dd:constraint", suggestor, slug].join(":")
    }
}
/// Wraps `error` as a `Constraints` proposal (confidence 1.0) on behalf of `suggestor`.
///
/// The payload is a JSON object with the serialized error, its infra/fatal classification
/// and its display message. The ID comes from `DdError::constraint_id`, so it is the same for
/// every failure of the same kind from the same suggestor.
fn error_to_constraint(error: &DdError, suggestor: &str) -> ProposedFact {
    let fact_id = error.constraint_id(suggestor);
    let payload = serde_json::json!({
        "type": "error",
        "error": serde_json::to_value(error).unwrap_or_default(),
        "is_infra_failure": error.is_infra_failure(),
        "is_fatal": error.is_fatal(),
        "message": error.to_string(),
    });
    let fact = proposed_text_fact(ContextKey::Constraints, fact_id, payload.to_string());
    fact.with_confidence(1.0)
}
/// Builds a text-payload proposal under the organism planning provenance.
fn proposed_text_fact(
    key: ContextKey,
    id: impl Into<converge_pack::ProposalId>,
    content: impl Into<String>,
) -> ProposedFact {
    let payload = TextPayload::new(content);
    ORGANISM_PLANNING_PROVENANCE.proposed_fact(key, id, payload)
}
/// Text payload of `fact`, or `""` when the fact carries no text.
fn fact_text(fact: &ContextFact) -> &str {
    match fact.text() {
        Some(text) => text,
        None => "",
    }
}
/// Pluggable web-search backend used by the breadth/depth research suggestors.
#[async_trait::async_trait]
pub trait DdSearch: Send + Sync {
/// Runs `query` and returns its hits, or a classified [`DdError`]. `FailoverDdSearch` uses
/// `DdError::is_infra_failure` on the error to decide whether to try its next backend.
async fn search(&self, query: &str) -> Result<Vec<SearchHit>, DdError>;
}
/// One search result. The research suggestors copy these fields verbatim into the JSON of a
/// `Signals` fact (alongside the query that produced it).
#[derive(Debug, Clone)]
pub struct SearchHit {
pub title: String,
pub url: String,
/// Snippet or page text returned by the backend.
pub content: String,
/// Name of the search backend that produced this hit.
pub provider: String,
}
/// Sends `prompt` to `backend` as a single user message with a JSON response format and
/// returns the raw response text.
///
/// No system prompt, tools, token limit, temperature, stop sequences or model override are set.
///
/// # Errors
/// Backend failures are mapped to [`DdError`] by `classify_llm_error`.
pub async fn dd_complete(backend: &dyn DynChatBackend, prompt: &str) -> Result<String, DdError> {
    let user_turn = ChatMessage {
        role: ChatRole::User,
        content: prompt.to_owned(),
        tool_calls: vec![],
        tool_call_id: None,
    };
    let request = ChatRequest {
        messages: vec![user_turn],
        system: None,
        tools: vec![],
        response_format: ResponseFormat::Json,
        max_tokens: None,
        temperature: None,
        stop_sequences: vec![],
        model: None,
    };
    match backend.chat(request).await {
        Ok(response) => Ok(response.content),
        Err(err) => Err(classify_llm_error(&err)),
    }
}
/// Maps a chat-backend [`LlmError`] onto the due-diligence error taxonomy.
///
/// The provider is always reported as `"chat-backend"` (presumably because the
/// `DynChatBackend` trait object does not expose which concrete provider served the call).
/// Auth denials become `CreditsExhausted`, which `DdError::is_fatal` treats as fatal.
fn classify_llm_error(err: &LlmError) -> DdError {
    let provider = "chat-backend".to_string();
    let message = err.to_string();
    match err {
        LlmError::RateLimited { retry_after, .. } => DdError::RateLimited {
            provider,
            // Durations beyond u64 milliseconds are reported as "unknown" rather than truncated.
            retry_after_ms: u64::try_from(retry_after.as_millis()).ok(),
        },
        LlmError::AuthDenied { .. } => DdError::CreditsExhausted {
            provider,
            detail: message,
        },
        LlmError::Timeout { .. } | LlmError::NetworkError { .. } => DdError::ProviderUnavailable {
            provider,
            detail: message,
        },
        LlmError::ContextLengthExceeded { request_tokens, .. } => DdError::PromptTooLarge {
            provider,
            // Checked conversion instead of `as usize`: a count that does not fit `usize`
            // (e.g. a 64-bit value on a 32-bit target) is reported as unknown, not truncated.
            tokens: usize::try_from(*request_tokens).ok(),
        },
        LlmError::InvalidRequest { .. }
        | LlmError::ModelNotFound { .. }
        | LlmError::ContentFiltered { .. }
        | LlmError::ResponseFormatMismatch { .. }
        | LlmError::ProviderError { .. } => DdError::BadResponse {
            provider,
            detail: message,
        },
    }
}
/// [`DdSearch`] that tries a list of backends in order.
///
/// The first success wins. An infrastructure failure (`DdError::is_infra_failure`) moves on
/// to the next backend. Any other error is returned immediately. Each failure is logged to
/// stderr.
pub struct FailoverDdSearch {
    backends: Vec<Arc<dyn DdSearch>>,
}

impl FailoverDdSearch {
    /// Builds a failover chain; `backends` are tried in the given order.
    pub fn new(backends: Vec<Arc<dyn DdSearch>>) -> Self {
        Self { backends }
    }
}

#[async_trait::async_trait]
impl DdSearch for FailoverDdSearch {
    async fn search(&self, query: &str) -> Result<Vec<SearchHit>, DdError> {
        let mut last_infra_error: Option<DdError> = None;
        for backend in self.backends.iter() {
            let error = match backend.search(query).await {
                Ok(hits) => return Ok(hits),
                Err(error) => error,
            };
            let retryable = error.is_infra_failure();
            let verdict = if retryable {
                "trying next"
            } else {
                "not retryable"
            };
            eprintln!("[failover] {} — {}", error, verdict);
            if !retryable {
                return Err(error);
            }
            last_infra_error = Some(error);
        }
        // Either every backend failed with an infra error (report the last one) or the list
        // was empty.
        match last_infra_error {
            Some(error) => Err(error),
            None => Err(DdError::ProviderUnavailable {
                provider: "failover".into(),
                detail: "no backends configured".into(),
            }),
        }
    }
}
/// Breadth research step.
///
/// Selects `Strategies` facts whose text contains `tag` (default `"breadth"`) and that this
/// suggestor has not handled yet. For each one it runs a single search for
/// `"{subject} {strategy}"` and proposes every hit that passes `is_relevant` as a `Signals`
/// fact. Each search consumes one unit of the shared `"searches"` budget.
pub struct BreadthResearchSuggestor {
    subject: String,
    budget: Arc<SharedBudget>,
    search: Arc<dyn DdSearch>,
    tag: String,
    /// IDs of strategy facts already handled, so each strategy is searched at most once.
    processed: Mutex<HashSet<FactId>>,
}

impl BreadthResearchSuggestor {
    pub fn new(
        subject: impl Into<String>,
        budget: Arc<SharedBudget>,
        search: Arc<dyn DdSearch>,
    ) -> Self {
        Self {
            subject: subject.into(),
            budget,
            search,
            tag: "breadth".into(),
            processed: Mutex::new(HashSet::new()),
        }
    }

    /// Overrides the substring used to select strategy facts (default `"breadth"`).
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
        self.tag = tag.into();
        self
    }

    /// Texts of tagged strategy facts that have not been processed yet.
    fn unprocessed_strategies(&self, ctx: &dyn Context) -> Vec<String> {
        self.unprocessed_strategy_facts(ctx)
            .into_iter()
            .map(|(_, text)| text)
            .collect()
    }

    /// Tagged, not-yet-processed strategy facts as `(id, text)` pairs.
    ///
    /// Keeping the ID lets `execute` mark exactly the fact it searched. Re-finding it by text
    /// would always hit the first of several identical strategies and leave the rest pending
    /// forever.
    fn unprocessed_strategy_facts(&self, ctx: &dyn Context) -> Vec<(FactId, String)> {
        let processed = self.processed.lock().unwrap();
        ctx.get(ContextKey::Strategies)
            .iter()
            .filter(|f| fact_text(f).contains(&self.tag))
            .filter(|f| !processed.contains(f.id()))
            .map(|f| (f.id().clone(), fact_text(f).to_string()))
            .collect()
    }
}

#[async_trait::async_trait]
impl Suggestor for BreadthResearchSuggestor {
    fn name(&self) -> &'static str {
        "dd-breadth-research"
    }

    fn dependencies(&self) -> &[ContextKey] {
        &[ContextKey::Strategies]
    }

    fn provenance(&self) -> Provenance {
        ORGANISM_PLANNING_PROVENANCE.provenance()
    }

    fn accepts(&self, ctx: &dyn Context) -> bool {
        self.budget.remaining("searches") > 0 && !self.unprocessed_strategies(ctx).is_empty()
    }

    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
        let mut effect = AgentEffect::builder();
        for (strategy_id, strategy) in self.unprocessed_strategy_facts(ctx) {
            if !self.budget.try_use("searches") {
                break;
            }
            let query = format!("{} {strategy}", self.subject);
            match self.search.search(&query).await {
                Ok(hits) => {
                    for hit in &hits {
                        if !is_relevant(&hit.title, &hit.content, &hit.url, &self.subject) {
                            continue;
                        }
                        let id = format!("signal-breadth-{}", Uuid::new_v4());
                        let content = serde_json::json!({
                            "title": hit.title,
                            "url": hit.url,
                            "content": hit.content,
                            "provider": hit.provider,
                            "query": query,
                        })
                        .to_string();
                        effect.push(
                            proposed_text_fact(ContextKey::Signals, id, content)
                                .with_confidence(1.0),
                        );
                    }
                }
                Err(e) => {
                    effect.push(error_to_constraint(&e, "dd-breadth-research"));
                    if e.is_fatal() {
                        // Stop without marking this strategy, so it stays pending.
                        break;
                    }
                }
            }
            self.processed.lock().unwrap().insert(strategy_id);
        }
        effect.build()
    }
}
/// Depth research step.
///
/// Selects `Strategies` facts whose text contains `tag` (default `"depth"`) and that this
/// suggestor has not handled yet. For each one it runs a single search for
/// `"{subject} {strategy}"` and proposes every hit that passes `is_relevant` as a `Signals`
/// fact. Each search consumes one unit of the shared `"searches"` budget.
pub struct DepthResearchSuggestor {
    subject: String,
    budget: Arc<SharedBudget>,
    search: Arc<dyn DdSearch>,
    tag: String,
    /// IDs of strategy facts already handled, so each strategy is searched at most once.
    processed: Mutex<HashSet<FactId>>,
}

impl DepthResearchSuggestor {
    pub fn new(
        subject: impl Into<String>,
        budget: Arc<SharedBudget>,
        search: Arc<dyn DdSearch>,
    ) -> Self {
        Self {
            subject: subject.into(),
            budget,
            search,
            tag: "depth".into(),
            processed: Mutex::new(HashSet::new()),
        }
    }

    /// Overrides the substring used to select strategy facts (default `"depth"`).
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
        self.tag = tag.into();
        self
    }

    /// Texts of tagged strategy facts that have not been processed yet.
    fn unprocessed_strategies(&self, ctx: &dyn Context) -> Vec<String> {
        self.unprocessed_strategy_facts(ctx)
            .into_iter()
            .map(|(_, text)| text)
            .collect()
    }

    /// Tagged, not-yet-processed strategy facts as `(id, text)` pairs.
    ///
    /// Keeping the ID lets `execute` mark exactly the fact it searched. Re-finding it by text
    /// would always hit the first of several identical strategies and leave the rest pending
    /// forever.
    fn unprocessed_strategy_facts(&self, ctx: &dyn Context) -> Vec<(FactId, String)> {
        let processed = self.processed.lock().unwrap();
        ctx.get(ContextKey::Strategies)
            .iter()
            .filter(|f| fact_text(f).contains(&self.tag))
            .filter(|f| !processed.contains(f.id()))
            .map(|f| (f.id().clone(), fact_text(f).to_string()))
            .collect()
    }
}

#[async_trait::async_trait]
impl Suggestor for DepthResearchSuggestor {
    fn name(&self) -> &'static str {
        "dd-depth-research"
    }

    fn dependencies(&self) -> &[ContextKey] {
        &[ContextKey::Strategies]
    }

    fn provenance(&self) -> Provenance {
        ORGANISM_PLANNING_PROVENANCE.provenance()
    }

    fn accepts(&self, ctx: &dyn Context) -> bool {
        self.budget.remaining("searches") > 0 && !self.unprocessed_strategies(ctx).is_empty()
    }

    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
        let mut effect = AgentEffect::builder();
        for (strategy_id, strategy) in self.unprocessed_strategy_facts(ctx) {
            if !self.budget.try_use("searches") {
                break;
            }
            let query = format!("{} {strategy}", self.subject);
            match self.search.search(&query).await {
                Ok(hits) => {
                    for hit in &hits {
                        if !is_relevant(&hit.title, &hit.content, &hit.url, &self.subject) {
                            continue;
                        }
                        let id = format!("signal-depth-{}", Uuid::new_v4());
                        let content = serde_json::json!({
                            "title": hit.title,
                            "url": hit.url,
                            "content": hit.content,
                            "provider": hit.provider,
                            "query": query,
                        })
                        .to_string();
                        effect.push(
                            proposed_text_fact(ContextKey::Signals, id, content)
                                .with_confidence(1.0),
                        );
                    }
                }
                Err(e) => {
                    effect.push(error_to_constraint(&e, "dd-depth-research"));
                    if e.is_fatal() {
                        // Stop without marking this strategy, so it stays pending.
                        break;
                    }
                }
            }
            self.processed.lock().unwrap().insert(strategy_id);
        }
        effect.build()
    }
}
pub struct FactExtractorSuggestor {
subject: String,
budget: Arc<SharedBudget>,
llm: Arc<dyn DynChatBackend>,
processed_signal_count: Mutex<usize>,
}
impl FactExtractorSuggestor {
pub fn new(
subject: impl Into<String>,
budget: Arc<SharedBudget>,
llm: Arc<dyn DynChatBackend>,
) -> Self {
Self {
subject: subject.into(),
budget,
llm,
processed_signal_count: Mutex::new(0),
}
}
}
#[async_trait::async_trait]
impl Suggestor for FactExtractorSuggestor {
fn name(&self) -> &'static str {
"dd-fact-extractor"
}
fn dependencies(&self) -> &[ContextKey] {
&[ContextKey::Signals]
}
fn provenance(&self) -> Provenance {
ORGANISM_PLANNING_PROVENANCE.provenance()
}
fn accepts(&self, ctx: &dyn Context) -> bool {
let current = ctx.count(ContextKey::Signals);
let processed = *self.processed_signal_count.lock().unwrap();
self.budget.remaining("llm") > 0 && current > processed
}
async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
let all_signals = ctx.get(ContextKey::Signals);
let (start, end) = next_batch_bounds(
all_signals.len(),
*self.processed_signal_count.lock().unwrap(),
15,
);
let signals: Vec<_> = all_signals
.iter()
.skip(start)
.take(end - start)
.cloned()
.collect();
if signals.is_empty() || !self.budget.try_use("llm") {
return AgentEffect::empty();
}
*self.processed_signal_count.lock().unwrap() = end;
let prompt = prompts::fact_extraction(&self.subject, &signals);
let mut seen_fact_keys: HashSet<String> = ctx
.get(ContextKey::Hypotheses)
.iter()
.filter_map(|fact| existing_fact_signature(fact_text(fact)))
.collect();
let mut effect = AgentEffect::builder();
match dd_complete(self.llm.as_ref(), &prompt).await {
Ok(raw) => match parse_json_array_response(&raw, "facts") {
Ok(facts) => {
for (i, fact) in facts.iter().enumerate() {
let Some(normalized_fact) = normalize_dd_fact(fact) else {
continue;
};
let signature = dd_fact_signature(&normalized_fact);
if !seen_fact_keys.insert(signature) {
continue;
}
let id = format!("hypothesis-{}-{i}", Uuid::new_v4());
effect.push(
proposed_text_fact(
ContextKey::Hypotheses,
id,
normalized_fact.to_string(),
)
.with_confidence(normalized_fact["confidence"].as_f64().unwrap_or(0.5)),
);
}
}
Err(detail) => {
let parse_err = DdError::ParseFailed {
provider: "llm".into(),
detail: format!(
"{detail} (first 200 chars: {})",
&raw[..raw.len().min(200)]
),
};
effect.push(error_to_constraint(&parse_err, "dd-fact-extractor"));
}
},
Err(e) => {
effect.push(error_to_constraint(&e, "dd-fact-extractor"));
}
}
effect.build()
}
}
/// Asks the LLM for follow-up search strategies once enough hypotheses exist.
///
/// It runs when all of these hold:
/// * at least `min_hypotheses` hypotheses exist (default 5);
/// * the count grew since the previous run;
/// * fewer than `max_generations` passes have happened (default 3);
/// * the `"llm"` budget is not exhausted.
///
/// Each returned strategy with a non-empty query becomes a `Strategies` fact of the form
/// `"[{mode}] {query} -- {reason}"`. The breadth/depth research suggestors pick these up by
/// tag substring.
pub struct GapDetectorSuggestor {
    subject: String,
    budget: Arc<SharedBudget>,
    llm: Arc<dyn DynChatBackend>,
    /// Hypothesis count seen by the last run; the detector only reruns after growth.
    last_hypothesis_count: Mutex<usize>,
    /// Number of passes executed so far (1-based once the first pass starts).
    generation_count: Mutex<usize>,
    max_generations: usize,
    min_hypotheses: usize,
}

impl GapDetectorSuggestor {
    pub fn new(
        subject: impl Into<String>,
        budget: Arc<SharedBudget>,
        llm: Arc<dyn DynChatBackend>,
    ) -> Self {
        Self {
            subject: subject.into(),
            budget,
            llm,
            last_hypothesis_count: Mutex::new(0),
            generation_count: Mutex::new(0),
            max_generations: 3,
            min_hypotheses: 5,
        }
    }

    /// Caps how many gap-detection passes may run (default 3).
    pub fn with_max_generations(mut self, max: usize) -> Self {
        self.max_generations = max;
        self
    }

    /// Minimum hypothesis count before the first pass (default 5).
    pub fn with_min_hypotheses(mut self, min: usize) -> Self {
        self.min_hypotheses = min;
        self
    }
}

#[async_trait::async_trait]
impl Suggestor for GapDetectorSuggestor {
    fn name(&self) -> &'static str {
        "dd-gap-detector"
    }

    fn dependencies(&self) -> &[ContextKey] {
        &[ContextKey::Hypotheses]
    }

    fn provenance(&self) -> Provenance {
        ORGANISM_PLANNING_PROVENANCE.provenance()
    }

    fn accepts(&self, ctx: &dyn Context) -> bool {
        let current = ctx.count(ContextKey::Hypotheses);
        let last = *self.last_hypothesis_count.lock().unwrap();
        let gens = *self.generation_count.lock().unwrap();
        current >= self.min_hypotheses
            && current > last
            && gens < self.max_generations
            && self.budget.remaining("llm") > 0
    }

    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
        if !self.budget.try_use("llm") {
            return AgentEffect::empty();
        }
        let hypotheses = ctx.get(ContextKey::Hypotheses);
        *self.last_hypothesis_count.lock().unwrap() = hypotheses.len();
        let generation = {
            let mut g = self.generation_count.lock().unwrap();
            *g += 1;
            *g
        };
        let prompt =
            prompts::gap_detection(&self.subject, hypotheses, generation, self.max_generations);
        let mut effect = AgentEffect::builder();
        // Existing strategy texts, so identical strategies are not proposed twice.
        let mut seen_strategy_contents: HashSet<String> = ctx
            .get(ContextKey::Strategies)
            .iter()
            .map(|fact| fact_text(fact).to_string())
            .collect();
        match dd_complete(self.llm.as_ref(), &prompt).await {
            Ok(raw) => match parse_json_array_response(&raw, "strategies") {
                Ok(strategies) => {
                    for (i, s) in strategies.iter().enumerate() {
                        let mode = s["mode"].as_str().unwrap_or("breadth");
                        let query = s["query"].as_str().unwrap_or("");
                        let reason = s["reason"].as_str().unwrap_or("");
                        let content = format!("[{mode}] {query} -- {reason}");
                        if query.trim().is_empty()
                            || !seen_strategy_contents.insert(content.clone())
                        {
                            continue;
                        }
                        let id = format!("strategy-gap-{i}-{}", Uuid::new_v4());
                        effect.push(proposed_text_fact(ContextKey::Strategies, id, content));
                    }
                }
                Err(detail) => {
                    // Cut at a char boundary: slicing at byte 200 panics when that byte
                    // falls inside a multi-byte UTF-8 character.
                    let preview = raw
                        .char_indices()
                        .nth(200)
                        .map_or(raw.as_str(), |(end, _)| &raw[..end]);
                    let parse_err = DdError::ParseFailed {
                        provider: "llm".into(),
                        detail: format!("{detail} (first 200 chars: {preview})"),
                    };
                    effect.push(error_to_constraint(&parse_err, "dd-gap-detector"));
                }
            },
            Err(e) => {
                effect.push(error_to_constraint(&e, "dd-gap-detector"));
            }
        }
        effect.build()
    }
}
/// Heuristic contradiction detector over `Hypotheses` facts; uses no LLM or budget.
///
/// Hypotheses are parsed as JSON and grouped by `category`. For each category with at least
/// two non-empty claims, where at least one claim mentions "contradiction", "disagree" or
/// "conflict" (case-insensitive), it proposes an `Evaluations` fact flagged
/// `needs_human_review`.
pub struct ContradictionFinderSuggestor {
    /// Hypothesis count seen by the last run; the finder only reruns after growth.
    last_hypothesis_count: Mutex<usize>,
}

impl ContradictionFinderSuggestor {
    pub fn new() -> Self {
        Self {
            last_hypothesis_count: Mutex::new(0),
        }
    }
}

#[async_trait::async_trait]
impl Suggestor for ContradictionFinderSuggestor {
    fn name(&self) -> &'static str {
        "dd-contradiction-finder"
    }

    fn dependencies(&self) -> &[ContextKey] {
        &[ContextKey::Hypotheses]
    }

    fn provenance(&self) -> Provenance {
        ORGANISM_PLANNING_PROVENANCE.provenance()
    }

    fn accepts(&self, ctx: &dyn Context) -> bool {
        let current = ctx.count(ContextKey::Hypotheses);
        let last = *self.last_hypothesis_count.lock().unwrap();
        current > last && current >= 3
    }

    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
        const CONTRADICTION_MARKERS: [&str; 3] = ["contradiction", "disagree", "conflict"];

        let hypotheses = ctx.get(ContextKey::Hypotheses);
        *self.last_hypothesis_count.lock().unwrap() = hypotheses.len();

        // BTreeMap: evaluations come out in a stable, category-sorted order instead of
        // per-process random hash order.
        let mut by_category: std::collections::BTreeMap<String, Vec<(FactId, String)>> =
            std::collections::BTreeMap::new();
        for fact in hypotheses {
            if let Ok(v) = serde_json::from_str::<serde_json::Value>(fact_text(fact)) {
                let category = v["category"].as_str().unwrap_or("unknown").to_string();
                let claim = v["claim"].as_str().unwrap_or("").to_string();
                if !claim.is_empty() {
                    by_category
                        .entry(category)
                        .or_default()
                        .push((fact.id().clone(), claim));
                }
            }
        }

        let existing_evaluations: HashSet<FactId> = ctx
            .get(ContextKey::Evaluations)
            .iter()
            .map(|f| f.id().clone())
            .collect();

        let mut effect = AgentEffect::builder();
        for (category, claims) in &by_category {
            if claims.len() < 2 {
                continue;
            }
            let has_contradiction = claims.iter().any(|(_, claim)| {
                let lowered = claim.to_lowercase();
                CONTRADICTION_MARKERS
                    .iter()
                    .any(|marker| lowered.contains(*marker))
            });
            if !has_contradiction {
                continue;
            }
            // Deterministic per-category ID so the existing-evaluation check can actually
            // match. A random UUID never collides, so every re-run would propose another
            // duplicate evaluation. Assumes promoted evaluations keep the proposal ID, as
            // this check presumes — TODO confirm.
            let id = format!("contradiction-{category}");
            if existing_evaluations.contains(id.as_str()) {
                continue;
            }
            let claim_ids: Vec<&str> = claims.iter().map(|(id, _)| id.as_str()).collect();
            let content = serde_json::json!({
                "category": category,
                "type": "contradiction",
                "claim_count": claims.len(),
                "claim_ids": claim_ids,
                "description": format!("Contradictory claims detected in {category} — sources disagree"),
                "needs_human_review": true,
            })
            .to_string();
            effect.push(
                proposed_text_fact(ContextKey::Evaluations, id, content).with_confidence(0.9),
            );
        }
        effect.build()
    }
}
pub struct SynthesisSuggestor {
subject: String,
budget: Arc<SharedBudget>,
llm: Arc<dyn DynChatBackend>,
last_hypothesis_count: Mutex<usize>,
stable_cycles: Mutex<usize>,
required_stable_cycles: usize,
}
impl SynthesisSuggestor {
pub fn new(
subject: impl Into<String>,
budget: Arc<SharedBudget>,
llm: Arc<dyn DynChatBackend>,
) -> Self {
Self {
subject: subject.into(),
budget,
llm,
last_hypothesis_count: Mutex::new(0),
stable_cycles: Mutex::new(0),
required_stable_cycles: 2,
}
}
pub fn with_required_stable_cycles(mut self, n: usize) -> Self {
self.required_stable_cycles = n;
self
}
}
#[async_trait::async_trait]
impl Suggestor for SynthesisSuggestor {
fn name(&self) -> &'static str {
"dd-synthesis"
}
fn dependencies(&self) -> &[ContextKey] {
&[]
}
fn provenance(&self) -> Provenance {
ORGANISM_PLANNING_PROVENANCE.provenance()
}
fn accepts(&self, ctx: &dyn Context) -> bool {
let current = ctx.count(ContextKey::Hypotheses);
let mut last = self.last_hypothesis_count.lock().unwrap();
let mut stable = self.stable_cycles.lock().unwrap();
if current == *last && current > 0 {
*stable += 1;
} else {
*stable = 0;
*last = current;
}
*stable >= self.required_stable_cycles
&& !ctx.has(ContextKey::Proposals)
&& self.budget.remaining("llm") > 0
}
async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
if !self.budget.try_use("llm") {
return AgentEffect::empty();
}
let hypotheses = ctx.get(ContextKey::Hypotheses);
let consolidated = consolidate_dd_hypotheses(hypotheses);
let prompt = prompts::synthesis(&self.subject, &consolidated);
match dd_complete(self.llm.as_ref(), &prompt).await {
Ok(raw) => {
let id = format!("synthesis-{}", Uuid::new_v4());
AgentEffect::builder()
.proposal(
proposed_text_fact(ContextKey::Proposals, id, raw).with_confidence(0.8),
)
.build()
}
Err(e) => AgentEffect::builder()
.proposal(error_to_constraint(&e, "dd-synthesis"))
.build(),
}
}
}
/// One consolidated due-diligence fact, as produced by `consolidate_dd_hypotheses` and
/// consumed by `prompts::synthesis` and `extract_hooks_from_facts`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DdFactSummary {
/// DD category label taken from the normalized fact (e.g. "market", "financials").
pub category: String,
/// Trimmed claim text; on exact-duplicate merges the more confident (or, on ties, longer) wording wins.
pub claim: String,
/// Confidence clamped to `[0.0, 1.0]`, 0.5 when absent; merges keep the maximum.
pub confidence: f64,
/// Number of extracted facts that shared this fact's signature.
pub support_count: usize,
/// Sum over merged facts of their cited `source_indices` counts (each fact counts at least 1).
pub evidence_count: usize,
}
/// A `DdFactSummary` plus the token features used to rank and de-duplicate it during
/// consolidation (built by `build_consolidation_candidate`).
#[derive(Debug, Clone)]
struct ConsolidationCandidate {
summary: DdFactSummary,
/// Claim tokens that appear in at most about half of all summaries (falls back to all claim tokens).
distinctive_tokens: HashSet<String>,
/// Claim tokens containing no ASCII digit (falls back to all claim tokens).
topic_tokens: HashSet<String>,
/// Numeric tokens extracted from the claim by `numeric_claim_tokens`.
numeric_tokens: Vec<String>,
/// Result of `claim_is_approximate` for the claim.
approximate: bool,
/// Ranking score from `fact_priority_score`; exact semantics live in that helper.
priority_score: f64,
}
/// Prompt builders for the LLM-backed due-diligence suggestors.
///
/// Each function returns the complete prompt text; the raw-string layout below is sent to the
/// model verbatim, so edit it with care.
pub mod prompts {
use super::{DdFactSummary, covered_dd_categories, fact_text, missing_expected_dd_categories};
use converge_pack::ContextFact;
/// Builds the fact-extraction prompt for `subject` from a batch of `Signals` facts.
///
/// Each signal is read as JSON with `provider`, `title`, `url` and `content` fields. Signals
/// that fail to parse are skipped, but `[Source i]` labels use the position in `signals`
/// (`enumerate` runs before `filter_map`), so labels can have gaps. The model is asked to
/// answer `{"facts": [...]}` and cite those labels in `source_indices`.
pub fn fact_extraction(subject: &str, signals: &[ContextFact]) -> String {
let sources_text: String = signals
.iter()
.enumerate()
.filter_map(|(i, f)| {
let v: serde_json::Value = serde_json::from_str(fact_text(f)).ok()?;
Some(format!(
"[Source {i}] ({}) {}\n URL: {}\n {}",
v["provider"].as_str().unwrap_or("?"),
v["title"].as_str().unwrap_or(""),
v["url"].as_str().unwrap_or(""),
v["content"].as_str().unwrap_or("")
))
})
.collect::<Vec<_>>()
.join("\n\n");
// `{{` / `}}` below are escaped braces in `format!`, rendered as literal `{` / `}`.
format!(
r#"You are an analyst extracting facts about {subject} from research sources.
{sources_text}
Extract key facts as JSON array. ONLY valid JSON, no fences:
{{
"facts": [
{{
"claim": "specific factual claim",
"category": "product|customers|technology|competition|market|financials|team|risk|governance",
"source_indices": [0, 3],
"confidence": 0.9
}}
]
}}
Rules:
- Return an object with a top-level "facts" array
- Return at most 20 facts, prioritized by investment relevance
- Return DISTINCT facts only. Do not restate the same claim with cosmetic wording changes.
- Aim to cover these DD categories when evidence exists: product, customers, technology, competition, market, financials
- Use category "technology" for platform, architecture, integrations, attack-surface management mechanics, threat-intelligence infrastructure, APIs, or technical moat
- Do NOT label a clearly technical platform claim as "product" just because it mentions a product name
- Every fact MUST cite source_indices
- 0.9+ for primary sources, 0.7 for secondary, 0.5 for inferred
- Flag contradictions between sources as separate facts with category "risk"
- If no reliable facts can be extracted, return {{"facts":[]}}"#
)
}
/// Builds the gap-detection prompt.
///
/// Lists every hypothesis text (one per line) and the covered / missing DD categories from
/// `covered_dd_categories` / `missing_expected_dd_categories`. It then asks for
/// `{"strategies": [...]}` entries with `query`, `mode` and `reason`. `generation` and
/// `max_generations` tell the model which research pass this is.
pub fn gap_detection(
subject: &str,
hypotheses: &[ContextFact],
generation: usize,
max_generations: usize,
) -> String {
let facts_text: String = hypotheses
.iter()
.map(fact_text)
.collect::<Vec<_>>()
.join("\n");
let covered_categories = covered_dd_categories(hypotheses);
let missing_categories = missing_expected_dd_categories(hypotheses);
let covered_text = if covered_categories.is_empty() {
"none yet".to_string()
} else {
covered_categories.join(", ")
};
let missing_text = if missing_categories.is_empty() {
"none".to_string()
} else {
missing_categories.join(", ")
};
format!(
r#"You are a PE analyst reviewing extracted facts about {subject}.
Current facts:
{facts_text}
Covered categories:
{covered_text}
Missing expected categories:
{missing_text}
What critical gaps remain? Focus on:
- Missing financials (ARR, growth, margins)
- Unknown ownership/investors
- Unclear competitive positioning
- Missing tech stack details
- Unknown customer concentration
Return JSON object:
{{
"strategies": [
{{"query": "search terms", "mode": "breadth|depth", "reason": "why this matters"}}
]
}}
This is research pass {generation} of {max_generations}. Only propose searches for gaps that are CRITICAL for investment decision-making.
Pass 1: broad gaps (max 4). Pass 2+: only truly unresolved items (max 2).
ONLY valid JSON, no markdown fences. If no critical gaps remain, return {{"strategies":[]}}."#
)
}
/// Builds the final synthesis prompt from consolidated facts.
///
/// Each fact is rendered as `- [category | confidence | support | evidence] claim`, or as a
/// placeholder line when `hypotheses` is empty. The model is asked for a fixed JSON report
/// (summary, market, competition, technology, risks, opportunities, recommendation).
pub fn synthesis(subject: &str, hypotheses: &[DdFactSummary]) -> String {
let facts_text = if hypotheses.is_empty() {
"No consolidated facts were available.".to_string()
} else {
hypotheses
.iter()
.map(|fact| {
format!(
"- [{} | confidence {:.2} | support {} | evidence {}] {}",
fact.category,
fact.confidence,
fact.support_count,
fact.evidence_count,
fact.claim
)
})
.collect::<Vec<_>>()
.join("\n")
};
format!(
r#"You are a senior PE analyst producing a final due diligence synthesis for {subject}.
Consolidated facts:
{facts_text}
Produce a final analysis as JSON:
{{
"summary": "2-3 paragraph executive summary",
"market_analysis": "market analysis",
"competitive_landscape": "competitive analysis",
"technology_assessment": "tech assessment",
"risk_factors": ["risk 1", "risk 2"],
"growth_opportunities": ["opp 1", "opp 2"],
"recommendation": "investment recommendation"
}}
ONLY valid JSON, no markdown fences. All values plain strings."#
)
}
}
/// Removes surrounding whitespace and an optional Markdown code fence (`` ```json `` or
/// `` ``` `` opener, `` ``` `` closer) from an LLM reply, returning the trimmed inner text.
fn strip_fences(raw: &str) -> &str {
    let trimmed = raw.trim();
    let without_open = match trimmed.strip_prefix("```json") {
        Some(rest) => rest,
        None => trimmed.strip_prefix("```").unwrap_or(trimmed),
    };
    let body = if let Some(inner) = without_open.strip_suffix("```") {
        inner
    } else {
        without_open
    };
    body.trim()
}
pub fn consolidate_dd_hypotheses(hypotheses: &[ContextFact]) -> Vec<DdFactSummary> {
consolidate_dd_fact_values(
hypotheses
.iter()
.filter_map(|fact| serde_json::from_str::<serde_json::Value>(fact_text(fact)).ok())
.collect::<Vec<_>>(),
)
}
/// Investment "hooks" extracted from consolidated facts by `extract_hooks_from_facts`.
///
/// When produced by that function, each list is sorted and de-duplicated (it is collected
/// from a `BTreeSet`).
#[derive(Debug, Clone, Default)]
pub struct DdHooks {
/// Entity names found after trigger phrases in `financials` facts.
pub investors: Vec<String>,
/// Labels from `HookPatterns::business_areas` whose pattern occurs in any claim.
pub business_areas: Vec<String>,
/// Labels from `HookPatterns::regions` whose pattern occurs in any claim.
pub regions: Vec<String>,
/// Entity names found after trigger phrases in `competition` / `competitors` facts.
pub competitors: Vec<String>,
}
/// Substring tables driving `extract_hooks_from_facts`.
#[derive(Debug, Clone)]
pub struct HookPatterns {
/// `(pattern, label)` pairs: a claim whose lowercased text contains `pattern` is tagged with `label`.
pub business_areas: Vec<(String, String)>,
/// `(pattern, label)` pairs for regions, matched the same way as `business_areas`.
pub regions: Vec<(String, String)>,
/// Lowercase phrases after which `extract_named_entities` reads an entity name, up to the
/// next `,` `.` `;` `(` or `)`.
pub entity_triggers: Vec<String>,
}
/// Built-in pattern tables. Matching is plain substring search, so the leading spaces/commas
/// on short patterns (e.g. " grc", " us ") presumably serve as a crude word-boundary guard.
impl Default for HookPatterns {
fn default() -> Self {
Self {
business_areas: vec![
("saas".into(), "SaaS".into()),
(" grc".into(), "Governance, Risk & Compliance (GRC)".into()),
(",grc".into(), "Governance, Risk & Compliance (GRC)".into()),
(
"governance, risk".into(),
"Governance, Risk & Compliance (GRC)".into(),
),
(" esg".into(), "ESG Reporting".into()),
("sustainability".into(), "ESG Reporting".into()),
("compliance".into(), "Compliance Management".into()),
("strategic planning".into(), "Strategic Planning".into()),
("quality management".into(), "Quality Management".into()),
("risk management".into(), "Risk Management".into()),
("edc".into(), "Electronic Data Capture (EDC)".into()),
("clinical trial".into(), "Clinical Trial Management".into()),
("eclinical".into(), "eClinical Solutions".into()),
("cybersecurity".into(), "Cybersecurity".into()),
("vulnerability".into(), "Vulnerability Management".into()),
("threat intelligence".into(), "Threat Intelligence".into()),
("penetration testing".into(), "Penetration Testing".into()),
("attack surface".into(), "Attack Surface Management".into()),
("workforce management".into(), "Workforce Management".into()),
(
"scheduling".into(),
"Scheduling & Resource Management".into(),
),
("timetabling".into(), "Timetabling".into()),
(
"higher education".into(),
"Higher Education Software".into(),
),
("edtech".into(), "EdTech".into()),
(
"business intelligence".into(),
"Business Intelligence".into(),
),
("analytics".into(), "Analytics".into()),
("fintech".into(), "FinTech".into()),
("payment".into(), "Payment Solutions".into()),
("information security".into(), "Information Security".into()),
("regulatory".into(), "Regulatory Technology".into()),
("crm".into(), "CRM".into()),
("marketing automation".into(), "Marketing Automation".into()),
("sales automation".into(), "Sales Automation".into()),
],
// NOTE(review): "germany" rolls up to "Europe" while "france", "japan" and the UK keep
// their own labels — confirm this asymmetry is intended.
regions: vec![
("nordic".into(), "Nordics".into()),
("scandinav".into(), "Nordics".into()),
("sweden".into(), "Nordics".into()),
("norway".into(), "Nordics".into()),
("denmark".into(), "Nordics".into()),
("finland".into(), "Nordics".into()),
("europe".into(), "Europe".into()),
("north america".into(), "North America".into()),
("united states".into(), "North America".into()),
(" us ".into(), "North America".into()),
("apac".into(), "APAC".into()),
("asia".into(), "APAC".into()),
("japan".into(), "Japan".into()),
("united kingdom".into(), "United Kingdom".into()),
(" uk ".into(), "United Kingdom".into()),
("germany".into(), "Europe".into()),
("france".into(), "France".into()),
("global".into(), "Global".into()),
],
// Some triggers are prefixes of others (e.g. "acquired " / "acquired by "), and
// each trigger is looked up on its own by `extract_named_entities`.
entity_triggers: vec![
"acquired by ".into(),
"acquired ".into(),
"investment from ".into(),
"invested by ".into(),
"backed by ".into(),
"funded by ".into(),
"partnership with ".into(),
"partner ".into(),
"competes with ".into(),
"competitor ".into(),
"competitors like ".into(),
"competitors such as ".into(),
"alternatives include ".into(),
"compared to ".into(),
"compared against ".into(),
"compared against competitors like ".into(),
],
}
}
}
/// Derives coarse investment hooks from consolidated facts using the tables in `patterns`.
///
/// * Business areas and regions: every table entry whose pattern occurs in the lowercased
///   claim contributes its label, whatever the fact's category.
/// * Competitors: entities after `entity_triggers` in `competition` / `competitors` facts.
/// * Investors: entities after `entity_triggers` in `financials` facts.
///
/// Entities equal (case-insensitively) to `subject` are dropped. Every output list is sorted
/// and de-duplicated.
pub fn extract_hooks_from_facts(
    subject: &str,
    facts: &[DdFactSummary],
    patterns: &HookPatterns,
) -> DdHooks {
    use std::collections::BTreeSet;

    let subject_lower = subject.to_lowercase();
    let mut investors: BTreeSet<String> = BTreeSet::new();
    let mut business_areas: BTreeSet<String> = BTreeSet::new();
    let mut regions: BTreeSet<String> = BTreeSet::new();
    let mut competitors: BTreeSet<String> = BTreeSet::new();

    for fact in facts {
        let claim_lower = fact.claim.to_lowercase();

        business_areas.extend(
            patterns
                .business_areas
                .iter()
                .filter(|(needle, _)| claim_lower.contains(needle.as_str()))
                .map(|(_, label)| label.clone()),
        );

        let entity_bucket = match fact.category.as_str() {
            "competition" | "competitors" => Some(&mut competitors),
            "financials" => Some(&mut investors),
            _ => None,
        };
        if let Some(bucket) = entity_bucket {
            bucket.extend(extract_named_entities(
                &fact.claim,
                &subject_lower,
                &patterns.entity_triggers,
            ));
        }

        // Several region patterns share a label; the set already collapses the repeats.
        regions.extend(
            patterns
                .regions
                .iter()
                .filter(|(needle, _)| claim_lower.contains(needle.as_str()))
                .map(|(_, label)| label.clone()),
        );
    }

    DdHooks {
        investors: investors.into_iter().collect(),
        business_areas: business_areas.into_iter().collect(),
        regions: regions.into_iter().collect(),
        competitors: competitors.into_iter().collect(),
    }
}
/// Extracts candidate entity names that follow any of `triggers` in `claim`.
///
/// For each trigger, the first occurrence in the ASCII-lowercased claim is taken. The entity
/// is the original-case text after it, up to the next `,` `.` `;` `(` or `)`, trimmed. A
/// trigger is skipped when a longer trigger matches at the same position (e.g. `"acquired "`
/// inside `"acquired by "`), so the shorter one cannot capture junk such as `"by Globex"`.
/// Empty entities, entities of 60+ bytes, and entities equal to `exclude_lower`
/// (case-insensitively) are dropped. Output follows `triggers` order.
fn extract_named_entities(claim: &str, exclude_lower: &str, triggers: &[String]) -> Vec<String> {
    // ASCII-only lowercasing keeps byte offsets identical to `claim`, so a match position
    // found here is always a valid char boundary in `claim`. Full Unicode lowercasing can
    // change byte lengths (e.g. 'Ⱥ' -> 'ⱥ') and would misalign or panic the slice below.
    let claim_lower = claim.to_ascii_lowercase();
    let mut entities = Vec::new();
    for trigger in triggers {
        let Some(pos) = claim_lower.find(trigger.as_str()) else {
            continue;
        };
        let rest_lower = &claim_lower[pos..];
        let shadowed = triggers
            .iter()
            .any(|other| other.len() > trigger.len() && rest_lower.starts_with(other.as_str()));
        if shadowed {
            continue;
        }
        let after = &claim[pos + trigger.len()..];
        let entity = after
            .split([',', '.', ';', '(', ')'])
            .next()
            .unwrap_or("")
            .trim();
        if !entity.is_empty() && entity.len() < 60 && entity.to_lowercase() != exclude_lower {
            entities.push(entity.to_string());
        }
    }
    entities
}
/// Half-open `(start, end)` range of the next batch of at most `max_batch` items, starting
/// after the first `processed_items` of `total_items`. Both bounds are clamped to
/// `total_items`, so a stale or oversized cursor yields an empty range instead of panicking.
fn next_batch_bounds(
    total_items: usize,
    processed_items: usize,
    max_batch: usize,
) -> (usize, usize) {
    let start = processed_items.min(total_items);
    // saturating_add: a very large `max_batch` (e.g. usize::MAX for "everything") must not overflow.
    let end = start.saturating_add(max_batch).min(total_items);
    (start, end)
}
/// Consolidates raw hypothesis JSON values into a ranked, de-duplicated fact list.
///
/// 1. Each value is normalized (`normalize_dd_fact`) and summarized
///    (`summary_from_normalized_fact`); values failing either step are dropped.
/// 2. Summaries sharing a `dd_fact_signature` are merged with `merge_exact_summary`.
/// 3. The survivors become candidates scored against corpus-wide token frequencies and sorted
///    by `compare_candidates`. They are then kept greedily, unless `should_skip_candidate`
///    matches an already-kept candidate or the category already holds `category_fact_cap`
///    facts.
fn consolidate_dd_fact_values<I>(values: I) -> Vec<DdFactSummary>
where
    I: IntoIterator<Item = serde_json::Value>,
{
    use std::collections::btree_map::{BTreeMap, Entry};

    // BTreeMap rather than HashMap: its iteration order is deterministic, so candidates that
    // `compare_candidates` ranks as equal reach the (stable) sort and the greedy filter in the
    // same order on every run, instead of in per-process random hash order.
    let mut by_signature: BTreeMap<String, DdFactSummary> = BTreeMap::new();
    for value in values {
        let Some(normalized) = normalize_dd_fact(&value) else {
            continue;
        };
        let Some(summary) = summary_from_normalized_fact(&normalized) else {
            continue;
        };
        match by_signature.entry(dd_fact_signature(&normalized)) {
            Entry::Occupied(mut slot) => merge_exact_summary(slot.get_mut(), summary),
            Entry::Vacant(slot) => {
                slot.insert(summary);
            }
        }
    }

    let summaries: Vec<DdFactSummary> = by_signature.into_values().collect();
    if summaries.is_empty() {
        return Vec::new();
    }

    let token_frequencies = token_document_frequency(&summaries);
    let total_summaries = summaries.len();
    let mut candidates: Vec<ConsolidationCandidate> = summaries
        .into_iter()
        .map(|summary| build_consolidation_candidate(summary, &token_frequencies, total_summaries))
        .collect();
    candidates.sort_by(compare_candidates);

    let mut kept: Vec<ConsolidationCandidate> = Vec::new();
    let mut counts_by_category: HashMap<String, usize> = HashMap::new();
    for candidate in candidates {
        if kept
            .iter()
            .any(|existing| should_skip_candidate(&candidate, existing))
        {
            continue;
        }
        let category = candidate.summary.category.as_str();
        let already_kept = counts_by_category.get(category).copied().unwrap_or(0);
        if already_kept >= category_fact_cap(category) {
            continue;
        }
        *counts_by_category.entry(category.to_string()).or_insert(0) += 1;
        kept.push(candidate);
    }

    kept.sort_by(compare_candidates);
    kept.into_iter()
        .map(|candidate| candidate.summary)
        .collect()
}
/// Builds a single-observation [`DdFactSummary`] from a fact that has already been through
/// [`normalize_dd_fact`].
///
/// Returns `None` when `category` or `claim` is missing or not a string, or when the claim is
/// blank after trimming. Confidence defaults to 0.5 and is clamped to `[0, 1]`. The evidence
/// count is the number of `source_indices` entries and is never less than one.
fn summary_from_normalized_fact(fact: &serde_json::Value) -> Option<DdFactSummary> {
    let category = fact
        .get("category")
        .and_then(serde_json::Value::as_str)?
        .to_owned();
    let claim = fact
        .get("claim")
        .and_then(serde_json::Value::as_str)?
        .trim();
    if claim.is_empty() {
        return None;
    }
    let confidence = fact
        .get("confidence")
        .and_then(serde_json::Value::as_f64)
        .map_or(0.5, |value| value.clamp(0.0, 1.0));
    let evidence_count = match fact
        .get("source_indices")
        .and_then(serde_json::Value::as_array)
    {
        Some(indices) if !indices.is_empty() => indices.len(),
        _ => 1,
    };
    Some(DdFactSummary {
        category,
        claim: claim.to_owned(),
        confidence,
        support_count: 1,
        evidence_count,
    })
}
#[allow(clippy::float_cmp)]
fn merge_exact_summary(existing: &mut DdFactSummary, candidate: DdFactSummary) {
existing.support_count += candidate.support_count;
existing.evidence_count += candidate.evidence_count;
if candidate.confidence > existing.confidence
|| (candidate.confidence == existing.confidence
&& candidate.claim.len() > existing.claim.len())
{
existing.claim = candidate.claim;
}
existing.confidence = existing.confidence.max(candidate.confidence);
}
/// Derives the token sets and ranking score used to deduplicate and order one summary.
///
/// * `topic_tokens` are the informative claim tokens that contain no ASCII digit.
/// * `distinctive_tokens` are the informative tokens whose document frequency `df` satisfies
///   `2 * df <= total_summaries + 1`, i.e. tokens shared by at most about half the summaries.
///
/// If a filter leaves its set empty, that set falls back to all informative claim tokens.
fn build_consolidation_candidate(
    summary: DdFactSummary,
    token_frequencies: &HashMap<String, usize>,
    total_summaries: usize,
) -> ConsolidationCandidate {
    let claim_tokens = informative_claim_tokens(&summary.claim);

    let mut topic_tokens: HashSet<String> = HashSet::new();
    let mut distinctive_tokens: HashSet<String> = HashSet::new();
    for token in &claim_tokens {
        if !token.bytes().any(|byte| byte.is_ascii_digit()) {
            topic_tokens.insert(token.clone());
        }
        let frequency = token_frequencies.get(token).copied().unwrap_or_default();
        if frequency * 2 <= total_summaries + 1 {
            distinctive_tokens.insert(token.clone());
        }
    }
    if topic_tokens.is_empty() {
        topic_tokens = claim_tokens.iter().cloned().collect();
    }
    if distinctive_tokens.is_empty() {
        distinctive_tokens = claim_tokens.iter().cloned().collect();
    }

    let approximate = claim_is_approximate(&summary.claim);
    let numeric_tokens = numeric_claim_tokens(&summary.claim);
    let priority_score = fact_priority_score(&summary, approximate, numeric_tokens.len());

    ConsolidationCandidate {
        summary,
        distinctive_tokens,
        topic_tokens,
        numeric_tokens,
        approximate,
        priority_score,
    }
}
/// Document frequency of every informative token: for each token, the number of summaries
/// whose claim contains it at least once. Repeats within one claim count once.
fn token_document_frequency(summaries: &[DdFactSummary]) -> HashMap<String, usize> {
    let mut frequencies: HashMap<String, usize> = HashMap::new();
    for summary in summaries {
        let distinct: HashSet<String> = informative_claim_tokens(&summary.claim)
            .into_iter()
            .collect();
        for token in distinct {
            *frequencies.entry(token).or_insert(0) += 1;
        }
    }
    frequencies
}
/// Canonical words of `claim` (see [`canonicalize_claim`]) that are longer than two bytes and
/// are not listed in [`dd_stopwords`]. Original order is kept and duplicates are retained.
fn informative_claim_tokens(claim: &str) -> Vec<String> {
    let stopwords = dd_stopwords();
    let canonical = canonicalize_claim(claim);
    let mut tokens = Vec::new();
    for word in canonical.split_whitespace() {
        let is_stopword = stopwords.iter().any(|stop| *stop == word);
        if word.len() > 2 && !is_stopword {
            tokens.push(word.to_owned());
        }
    }
    tokens
}
/// Canonical words of `claim` that contain at least one ASCII digit (years, amounts, counts),
/// in their original order.
fn numeric_claim_tokens(claim: &str) -> Vec<String> {
    let canonical = canonicalize_claim(claim);
    let mut numeric = Vec::new();
    for word in canonical.split_whitespace() {
        if word.bytes().any(|byte| byte.is_ascii_digit()) {
            numeric.push(word.to_string());
        }
    }
    numeric
}
/// Returns `true` when `claim` contains hedging language ("estimated", "about", "over",
/// "more than", ...), signalling that its figures are approximate rather than exact.
///
/// Matching is case-insensitive and operates on whole words. The claim is split on every
/// non-ASCII-alphanumeric character, and then:
/// * single-word markers must equal a word;
/// * the `estimat...` stem must start a word;
/// * two-word markers must appear as consecutive words.
///
/// Plain substring matching misfires inside unrelated words and wrongly demotes exact facts.
/// Examples: "under" in "founder", "over" in "takeover"/"turnover", "currently" in
/// "concurrently", "about" in "roundabout".
fn claim_is_approximate(claim: &str) -> bool {
    let lowered = claim.to_ascii_lowercase();
    let words: Vec<&str> = lowered
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|word| !word.is_empty())
        .collect();

    let has_hedge_word = words.iter().any(|word| {
        // "estimate", "estimated", "estimates", "estimation", ...
        word.starts_with("estimat")
            || matches!(
                *word,
                "approximately"
                    | "approx"
                    | "about"
                    | "over"
                    | "under"
                    | "around"
                    | "between"
                    | "currently"
                    | "historically"
            )
    });

    has_hedge_word
        || words.windows(2).any(|pair| {
            matches!(
                (pair[0], pair[1]),
                ("close", "to") | ("range", "of") | ("more", "than") | ("less", "than")
            )
        })
}
/// Ranking score used by [`compare_candidates`]; a higher score means the fact is kept first.
///
/// The score is the sum of:
/// * confidence scaled to 0–100;
/// * +6 per merged observation (`support_count`);
/// * +2 per evidence item (`evidence_count`);
/// * +5 when the claim is not flagged as approximate;
/// * +2 when the claim contains at least one numeric token.
// usize counts are converted to f64 purely for scoring.
#[allow(clippy::cast_precision_loss)]
fn fact_priority_score(
    summary: &DdFactSummary,
    approximate: bool,
    numeric_token_count: usize,
) -> f64 {
    const CONFIDENCE_SCALE: f64 = 100.0;
    const PER_SUPPORT: f64 = 6.0;
    const PER_EVIDENCE: f64 = 2.0;
    const EXACT_BONUS: f64 = 5.0;
    const FIGURE_BONUS: f64 = 2.0;

    // Terms are accumulated in a fixed order so the float result is reproducible.
    let mut score = summary.confidence * CONFIDENCE_SCALE;
    score += summary.support_count as f64 * PER_SUPPORT;
    score += summary.evidence_count as f64 * PER_EVIDENCE;
    score += if approximate { 0.0 } else { EXACT_BONUS };
    score += if numeric_token_count == 0 { 0.0 } else { FIGURE_BONUS };
    score
}
/// Total order used to rank consolidation candidates.
///
/// Candidates are ordered by:
/// 1. [`category_sort_order`];
/// 2. descending priority score;
/// 3. descending claim length;
/// 4. category name and claim text, as the final tie-breaks.
///
/// The final tie-breaks make the order total. `consolidate_dd_fact_values` builds its candidates
/// from `HashMap::into_values`, whose iteration order is randomised per process. Without these
/// tie-breaks, equally ranked candidates would be kept, skipped or capped differently from run
/// to run.
fn compare_candidates(left: &ConsolidationCandidate, right: &ConsolidationCandidate) -> Ordering {
    category_sort_order(left.summary.category.as_str())
        .cmp(&category_sort_order(right.summary.category.as_str()))
        .then_with(|| right.priority_score.total_cmp(&left.priority_score))
        .then_with(|| right.summary.claim.len().cmp(&left.summary.claim.len()))
        .then_with(|| left.summary.category.cmp(&right.summary.category))
        .then_with(|| left.summary.claim.cmp(&right.summary.claim))
}
/// Position of a normalized DD category in report order (`"product"` first, `"risk"` last).
/// Unrecognised categories sort after every known one, at position 9.
fn category_sort_order(category: &str) -> usize {
    const ORDER: [&str; 9] = [
        "product",
        "customers",
        "technology",
        "competition",
        "market",
        "financials",
        "team",
        "governance",
        "risk",
    ];
    ORDER
        .iter()
        .position(|known| *known == category)
        .unwrap_or(ORDER.len())
}
/// Maximum number of consolidated facts kept per category: 5 for technology, 4 for
/// financials, 3 for customers and competition, and 2 for everything else.
fn category_fact_cap(category: &str) -> usize {
    if category == "technology" {
        5
    } else if category == "financials" {
        4
    } else if matches!(category, "customers" | "competition") {
        3
    } else {
        2
    }
}
/// Decides whether `candidate` is redundant with the already kept `existing` fact.
///
/// Facts in different categories never collide. Within a category, the candidate is skipped
/// when any of these holds:
/// * its distinctive tokens overlap `existing`'s with Jaccard similarity >= 0.86;
/// * it has numeric tokens identical to `existing`'s and distinctive similarity >= 0.55;
/// * it is approximate, its topic tokens overlap with similarity >= 0.5, and either `existing`
///   is exact or both carry the same numeric tokens.
fn should_skip_candidate(
    candidate: &ConsolidationCandidate,
    existing: &ConsolidationCandidate,
) -> bool {
    if candidate.summary.category != existing.summary.category {
        return false;
    }

    let overlap = token_similarity(&candidate.distinctive_tokens, &existing.distinctive_tokens);
    let same_figures = !candidate.numeric_tokens.is_empty()
        && candidate.numeric_tokens == existing.numeric_tokens;
    if overlap >= 0.86 || (same_figures && overlap >= 0.55) {
        return true;
    }

    // Only hedged ("about", "over", ...) claims are dropped on mere topic overlap.
    if !candidate.approximate {
        return false;
    }
    let topic_overlap = token_similarity(&candidate.topic_tokens, &existing.topic_tokens);
    topic_overlap >= 0.5
        && (!existing.approximate || candidate.numeric_tokens == existing.numeric_tokens)
}
/// Jaccard similarity `|L ∩ R| / |L ∪ R|` of two token sets, or `0.0` if either set is empty.
// Set sizes are converted to f64 only to form the ratio.
#[allow(clippy::cast_precision_loss)]
fn token_similarity(left: &HashSet<String>, right: &HashSet<String>) -> f64 {
    if left.is_empty() || right.is_empty() {
        return 0.0;
    }
    let shared = left.iter().filter(|token| right.contains(*token)).count();
    // Both sets are non-empty, so the union size is at least one.
    let combined = left.len() + right.len() - shared;
    shared as f64 / combined as f64
}
/// Low-signal words that [`informative_claim_tokens`] drops: common English function words
/// plus generic vendor vocabulary ("offers", "provides", "solutions", ...).
fn dd_stopwords() -> &'static [&'static str] {
    const STOPWORDS: &[&str] = &[
        "and", "for", "the", "with", "into", "that", "from", "their", "this", "those", "these",
        "across", "through", "using", "used", "helps", "help", "offer", "offers", "provides",
        "provide", "company", "companies", "solution", "solutions",
    ];
    STOPWORDS
}
/// Signature of a fact already stored as JSON text.
///
/// Returns `None` if `content` is not valid JSON or the parsed value fails
/// [`normalize_dd_fact`].
fn existing_fact_signature(content: &str) -> Option<String> {
    serde_json::from_str::<serde_json::Value>(content)
        .ok()
        .and_then(|value| normalize_dd_fact(&value))
        .map(|normalized| dd_fact_signature(&normalized))
}
/// Validates a raw fact object and rebuilds it in canonical form.
///
/// Returns `None` when `claim` is missing, is not a string, or is blank after trimming.
/// Otherwise the result has exactly four keys:
/// * `claim`: the trimmed claim text;
/// * `category`: the output of [`normalize_dd_category`];
/// * `source_indices`: only the integer entries of the input array (empty if absent);
/// * `confidence`: the input value clamped to `[0, 1]`, defaulting to 0.5.
///
/// NOTE(review): negative integers pass the `source_indices` filter; confirm whether callers
/// rely on indices being non-negative.
fn normalize_dd_fact(fact: &serde_json::Value) -> Option<serde_json::Value> {
    let claim = match fact.get("claim").and_then(serde_json::Value::as_str) {
        Some(text) if !text.trim().is_empty() => text.trim(),
        _ => return None,
    };
    let raw_category = fact.get("category").and_then(serde_json::Value::as_str);
    let category = normalize_dd_category(raw_category, claim);
    let confidence = fact
        .get("confidence")
        .and_then(serde_json::Value::as_f64)
        .map_or(0.5, |value| value.clamp(0.0, 1.0));
    let source_indices: Vec<serde_json::Value> = match fact
        .get("source_indices")
        .and_then(serde_json::Value::as_array)
    {
        Some(entries) => entries
            .iter()
            .filter(|entry| entry.is_i64() || entry.is_u64())
            .cloned()
            .collect(),
        None => Vec::new(),
    };
    Some(serde_json::json!({
        "claim": claim,
        "category": category,
        "source_indices": source_indices,
        "confidence": confidence,
    }))
}
/// Maps a model-supplied category label onto one of the canonical DD categories.
///
/// The label is trimmed and ASCII-lowercased before lookup.
/// * `"product"` is re-routed to `"technology"` when [`claim_looks_technical`] says so.
/// * Known synonyms (e.g. `"tech"`, `"funding"`, `"leadership"`) map via the alias table.
/// * A missing or unknown label falls back to [`infer_dd_category_from_claim`].
fn normalize_dd_category(raw_category: Option<&str>, claim: &str) -> &'static str {
    const ALIASES: &[(&str, &str)] = &[
        ("customer", "customers"),
        ("customers", "customers"),
        ("technology", "technology"),
        ("tech", "technology"),
        ("platform", "technology"),
        ("architecture", "technology"),
        ("engineering", "technology"),
        ("integrations", "technology"),
        ("integration", "technology"),
        ("stack", "technology"),
        ("competition", "competition"),
        ("competitor", "competition"),
        ("competitors", "competition"),
        ("market", "market"),
        ("positioning", "market"),
        ("financial", "financials"),
        ("financials", "financials"),
        ("finance", "financials"),
        ("funding", "financials"),
        ("ownership", "financials"),
        ("investors", "financials"),
        ("team", "team"),
        ("leadership", "team"),
        ("management", "team"),
        ("risk", "risk"),
        ("governance", "governance"),
    ];

    let label = raw_category
        .map(str::trim)
        .unwrap_or("")
        .to_ascii_lowercase();
    if label == "product" {
        return if claim_looks_technical(claim) {
            "technology"
        } else {
            "product"
        };
    }
    ALIASES
        .iter()
        .find(|(alias, _)| *alias == label)
        .map_or_else(
            || infer_dd_category_from_claim(claim),
            |(_, canonical)| *canonical,
        )
}
/// Guesses a DD category from the wording of `claim` when no recognised label was supplied.
///
/// Signals are checked in priority order: technical wording ([`claim_looks_technical`]), then
/// customers, financials, competition, market and team keywords. Anything unmatched falls
/// back to `"product"`.
///
/// Short tokens that commonly occur inside unrelated words (`serves`, `arr`, `idc`) are matched
/// as whole words only. Substring matching used to send "array", "carrier" or "narrow" to
/// financials and "reserves"/"observes" to customers.
fn infer_dd_category_from_claim(claim: &str) -> &'static str {
    let claim = claim.to_ascii_lowercase();
    if claim_looks_technical(&claim) {
        "technology"
    } else if claim.contains("customer")
        || claim.contains("clients")
        || claim_has_whole_word(&claim, "serves")
        || claim.contains("countries")
    {
        "customers"
    } else if claim.contains("funding")
        || claim.contains("raised")
        || claim.contains("investor")
        || claim.contains("acquired")
        || claim.contains("revenue")
        || claim_has_whole_word(&claim, "arr")
    {
        "financials"
    } else if claim.contains("competitor") || claim.contains("competes") {
        "competition"
    } else if claim.contains("market")
        || claim.contains("major player")
        || claim_has_whole_word(&claim, "idc")
    {
        "market"
    } else if claim.contains("chief ")
        || claim.contains("officer")
        || claim.contains("executive")
        || claim.contains("leadership")
    {
        "team"
    } else {
        "product"
    }
}

/// Returns `true` when `claim` mentions technology, platform or security-monitoring concepts.
///
/// Longer phrases are matched as case-insensitive substrings, so "monitors" and "platforms"
/// still count. The short acronyms `api`, `apis` and `ctem` must appear as whole words; as
/// substrings they also matched "capital" and "rapid", misfiling funding and growth claims
/// as technology.
fn claim_looks_technical(claim: &str) -> bool {
    const PHRASES: [&str; 16] = [
        "technology",
        "architecture",
        "platform",
        "integration",
        "integrations",
        "cloud",
        "threat intelligence",
        "attack surface",
        "monitor",
        "monitoring",
        "exposure management",
        "internet-facing",
        "dark web",
        "open web",
        "deep web",
        "technical moat",
    ];
    const ACRONYMS: [&str; 3] = ["api", "apis", "ctem"];

    let claim = claim.to_ascii_lowercase();
    PHRASES.iter().any(|phrase| claim.contains(*phrase))
        || ACRONYMS
            .iter()
            .any(|acronym| claim_has_whole_word(&claim, acronym))
}

/// Returns `true` if `word` occurs in `text` as a complete run of ASCII alphanumerics, i.e.
/// bounded by the string edges or by non-alphanumeric characters.
///
/// The comparison is exact, so callers pass already-lowercased text.
fn claim_has_whole_word(text: &str, word: &str) -> bool {
    text.split(|ch: char| !ch.is_ascii_alphanumeric())
        .any(|token| token == word)
}
/// Deduplication key for a normalized fact: `"<category>:<canonical claim>"`.
///
/// A missing or non-string category defaults to `"product"`, and a missing claim is treated as
/// the empty string.
fn dd_fact_signature(fact: &serde_json::Value) -> String {
    let category = match fact.get("category").and_then(serde_json::Value::as_str) {
        Some(category) => category,
        None => "product",
    };
    let canonical = canonicalize_claim(
        fact.get("claim")
            .and_then(serde_json::Value::as_str)
            .unwrap_or(""),
    );
    let mut signature = String::with_capacity(category.len() + 1 + canonical.len());
    signature.push_str(category);
    signature.push(':');
    signature.push_str(&canonical);
    signature
}
/// Canonical form of a claim, used for signatures and tokenization.
///
/// The result is the lowercase runs of ASCII alphanumerics joined by single spaces. Every other
/// character (punctuation, whitespace, and any non-ASCII character) acts as a separator, so
/// `"Outpost24 raised $23.8M!"` becomes `"outpost24 raised 23 8m"`.
fn canonicalize_claim(claim: &str) -> String {
    let mut canonical = String::with_capacity(claim.len());
    let runs = claim
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|run| !run.is_empty());
    for run in runs {
        if !canonical.is_empty() {
            canonical.push(' ');
        }
        canonical.push_str(&run.to_ascii_lowercase());
    }
    canonical
}
/// Sorted, deduplicated normalized categories of the given hypotheses.
///
/// Hypotheses whose text does not parse as JSON, or whose value fails [`normalize_dd_fact`],
/// are skipped silently.
fn covered_dd_categories(hypotheses: &[ContextFact]) -> Vec<String> {
    let mut categories: Vec<String> = Vec::with_capacity(hypotheses.len());
    for fact in hypotheses {
        let Ok(value) = serde_json::from_str::<serde_json::Value>(fact_text(fact)) else {
            continue;
        };
        let Some(normalized) = normalize_dd_fact(&value) else {
            continue;
        };
        if let Some(category) = normalized["category"].as_str() {
            categories.push(category.to_owned());
        }
    }
    categories.sort();
    categories.dedup();
    categories
}
/// Expected DD categories ([`expected_dd_categories`], in that order) that none of the
/// hypotheses falls into.
fn missing_expected_dd_categories(hypotheses: &[ContextFact]) -> Vec<&'static str> {
    let covered = covered_dd_categories(hypotheses);
    let mut missing = Vec::new();
    for category in expected_dd_categories() {
        let is_covered = covered.iter().any(|seen| seen.as_str() == category);
        if !is_covered {
            missing.push(category);
        }
    }
    missing
}
/// Categories every DD synthesis is expected to cover, in report order.
fn expected_dd_categories() -> [&'static str; 6] {
    const EXPECTED: [&str; 6] = [
        "product",
        "customers",
        "technology",
        "competition",
        "market",
        "financials",
    ];
    EXPECTED
}
/// Parses an LLM response that should contain a JSON array.
///
/// The raw text first goes through `strip_fences` (presumably removes Markdown code fences;
/// confirm against its definition). The result is then accepted either as a top-level array
/// or as an object whose `field_name` member is an array.
///
/// If that fails, the first balanced `{...}`/`[...]` span found by [`extract_first_json_value`]
/// is retried, but only when it differs from the whole cleaned text. On failure, the first
/// attempt's error is returned, extended with the fallback's error when a fallback ran.
fn parse_json_array_response(
    raw: &str,
    field_name: &str,
) -> Result<Vec<serde_json::Value>, String> {
    let cleaned = strip_fences(raw);
    let first_error = match try_parse_json_array(cleaned, field_name) {
        Ok(values) => return Ok(values),
        Err(error) => error,
    };
    match extract_first_json_value(cleaned) {
        Some(candidate) if candidate != cleaned => try_parse_json_array(candidate, field_name)
            .map_err(|second_error| {
                format!("{first_error}; recovered JSON failed: {second_error}")
            }),
        _ => Err(first_error),
    }
}
fn try_parse_json_array(raw: &str, field_name: &str) -> Result<Vec<serde_json::Value>, String> {
match serde_json::from_str::<serde_json::Value>(raw) {
Ok(serde_json::Value::Array(values)) => Ok(values),
Ok(serde_json::Value::Object(map)) => map
.get(field_name)
.and_then(serde_json::Value::as_array)
.cloned()
.ok_or_else(|| format!("expected object field `{field_name}` containing an array")),
Ok(_) => Err(format!(
"expected top-level JSON array or object with `{field_name}`"
)),
Err(error) => Err(error.to_string()),
}
}
/// Locates the first bracket-balanced JSON object or array embedded in `raw`, e.g. in prose.
///
/// Scanning starts at the first `{` or `[`. Brackets are tracked on a stack, and characters
/// inside string literals (including backslash-escaped quotes) are ignored. The function
/// returns the slice ending at the bracket that closes the opening one.
///
/// Returns `None` when:
/// * there is no opening bracket;
/// * a closing bracket does not match the innermost open one;
/// * the input ends before the brackets balance.
///
/// The returned slice is not validated as JSON.
fn extract_first_json_value(raw: &str) -> Option<&str> {
    let start = raw.find(|ch: char| ch == '{' || ch == '[')?;
    let tail = &raw[start..];
    let mut expected_closers: Vec<char> = Vec::new();
    let mut inside_string = false;
    let mut after_backslash = false;

    for (offset, ch) in tail.char_indices() {
        if inside_string {
            if after_backslash {
                after_backslash = false;
            } else if ch == '\\' {
                after_backslash = true;
            } else if ch == '"' {
                inside_string = false;
            }
            continue;
        }
        match ch {
            '"' => inside_string = true,
            '{' => expected_closers.push('}'),
            '[' => expected_closers.push(']'),
            '}' | ']' => {
                if expected_closers.pop() != Some(ch) {
                    return None;
                }
                if expected_closers.is_empty() {
                    return Some(&tail[..offset + ch.len_utf8()]);
                }
            }
            _ => {}
        }
    }
    None
}
/// Heuristic relevance filter for a search hit about `subject`.
///
/// Returns `true`, case-insensitively, when any of these holds:
/// * the subject appears in the title;
/// * the subject appears in the content;
/// * the URL contains the subject with spaces removed (`"acmecorp"`);
/// * the URL contains the subject with spaces replaced by hyphens (`"acme-corp"`).
///
/// An empty `subject` matches every hit, because every string contains the empty string.
fn is_relevant(title: &str, content: &str, url: &str, subject: &str) -> bool {
    let needle = subject.to_lowercase();
    let in_text =
        title.to_lowercase().contains(&needle) || content.to_lowercase().contains(&needle);
    if in_text {
        return true;
    }
    let url = url.to_lowercase();
    let compact = needle.replace(' ', "");
    let hyphenated = needle.replace(' ', "-");
    url.contains(&compact) || url.contains(&hyphenated)
}
// Unit and property tests for the helpers in this file:
// - JSON response parsing and recovery;
// - fact normalization and consolidation;
// - batch bounds;
// - `DdError` classification;
// - `SynthesisSuggestor` acceptance behaviour.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use converge_pack::{Context, ContextFact, ContextKey, ProposedFact, Suggestor};
use converge_provider::{BoxFuture, ChatRequest, ChatResponse, DynChatBackend, LlmError};
use super::{
DdError, SharedBudget, SynthesisSuggestor, canonicalize_claim, consolidate_dd_fact_values,
extract_first_json_value, next_batch_bounds, normalize_dd_fact, parse_json_array_response,
};
// Chat backend stub: every request succeeds with content "{}" and no tool calls, usage,
// model or finish reason.
struct StubLlm;
impl DynChatBackend for StubLlm {
fn chat(&self, _req: ChatRequest) -> BoxFuture<'_, Result<ChatResponse, LlmError>> {
Box::pin(async {
Ok(ChatResponse {
content: "{}".to_string(),
tool_calls: Vec::new(),
usage: None,
model: None,
finish_reason: None,
metadata: std::collections::HashMap::new(),
})
})
}
}
// Minimal context stub. It reports a configurable hypothesis count and whether proposals
// exist; every fact/proposal slice is empty and every other key has count 0.
struct StubContext {
hypothesis_count: usize,
has_proposals: bool,
}
impl Context for StubContext {
fn has(&self, key: ContextKey) -> bool {
match key {
ContextKey::Hypotheses => self.hypothesis_count > 0,
ContextKey::Proposals => self.has_proposals,
_ => false,
}
}
fn get(&self, _key: ContextKey) -> &[ContextFact] {
&[]
}
fn get_proposals(&self, _key: ContextKey) -> &[ProposedFact] {
&[]
}
fn count(&self, key: ContextKey) -> usize {
match key {
ContextKey::Hypotheses => self.hypothesis_count,
_ => 0,
}
}
}
// `SynthesisSuggestor` declares no dependencies.
#[test]
fn synthesis_suggestor_is_always_schedulable() {
let budget = Arc::new(SharedBudget::new().with_limit("llm", 1));
let suggestor = SynthesisSuggestor::new("Acme", budget, Arc::new(StubLlm));
assert!(suggestor.dependencies().is_empty());
}
// With two required stable cycles and an unchanged hypothesis count, `accepts` is false on
// the first two calls and true on the third.
#[test]
fn synthesis_accepts_after_hypotheses_stabilize() {
let budget = Arc::new(SharedBudget::new().with_limit("llm", 1));
let suggestor = SynthesisSuggestor::new("Acme", budget, Arc::new(StubLlm))
.with_required_stable_cycles(2);
let first_fact_wave = StubContext {
hypothesis_count: 5,
has_proposals: false,
};
let first_stable_cycle = StubContext {
hypothesis_count: 5,
has_proposals: false,
};
let second_stable_cycle = StubContext {
hypothesis_count: 5,
has_proposals: false,
};
assert!(!suggestor.accepts(&first_fact_wave));
assert!(!suggestor.accepts(&first_stable_cycle));
assert!(suggestor.accepts(&second_stable_cycle));
}
// An object wrapping the array under the requested field name is accepted.
#[test]
fn parse_json_array_response_accepts_wrapped_object() {
let parsed = parse_json_array_response(
r#"{"facts":[{"claim":"Acme sells software","confidence":0.9}]}"#,
"facts",
)
.expect("wrapped array should parse");
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0]["claim"], "Acme sells software");
}
// A bare top-level array is accepted regardless of the field name.
#[test]
fn parse_json_array_response_accepts_legacy_array_shape() {
let parsed = parse_json_array_response(
r#"[{"query":"Acme competitors","mode":"breadth","reason":"market"}]"#,
"strategies",
)
.expect("legacy array should parse");
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0]["query"], "Acme competitors");
}
// JSON surrounded by prose and a fenced code block is still recovered.
#[test]
fn parse_json_array_response_recovers_json_from_prose() {
let parsed = parse_json_array_response(
"Here is the JSON you requested:\n```json\n{\"facts\":[{\"claim\":\"Acme grows\",\"confidence\":0.7}]}\n```\nThanks.",
"facts",
)
.expect("embedded JSON should parse");
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0]["claim"], "Acme grows");
}
#[test]
fn extract_first_json_value_handles_nested_arrays_and_objects() {
let extracted = extract_first_json_value(
"prefix {\"facts\":[{\"claim\":\"A\",\"source_indices\":[0,1]}]} suffix",
)
.expect("should find first JSON value");
assert_eq!(
extracted,
r#"{"facts":[{"claim":"A","source_indices":[0,1]}]}"#
);
}
// Successive batches of 15 over 37 signals: [0,15), [15,30), [30,37), then empty.
#[test]
fn next_batch_bounds_advances_through_unprocessed_signals() {
assert_eq!(next_batch_bounds(37, 0, 15), (0, 15));
assert_eq!(next_batch_bounds(37, 15, 15), (15, 30));
assert_eq!(next_batch_bounds(37, 30, 15), (30, 37));
assert_eq!(next_batch_bounds(37, 37, 15), (37, 37));
}
// A "product" fact whose claim reads as technical is re-categorised as "technology".
#[test]
fn normalize_dd_fact_reclassifies_technical_product_claims() {
let normalized = normalize_dd_fact(&serde_json::json!({
"claim": "Outpost24's Sweepatic Platform monitors internet-facing assets for attack surface management.",
"category": "product",
"source_indices": [0],
"confidence": 0.9,
}))
.expect("fact should normalize");
assert_eq!(normalized["category"], "technology");
}
#[test]
fn canonicalize_claim_ignores_case_and_punctuation() {
assert_eq!(
canonicalize_claim("Outpost24 raised $23.8M!"),
canonicalize_claim("outpost24 raised 23 8m")
);
}
// Identical claims in the same category merge into one summary whose support and evidence
// counts add up.
#[test]
fn consolidate_dd_fact_values_merges_exact_duplicates() {
let summaries = consolidate_dd_fact_values(vec![
serde_json::json!({
"claim": "Outpost24 offers a 100% open API for easy integration into security operations.",
"category": "technology",
"source_indices": [0],
"confidence": 0.9,
}),
serde_json::json!({
"claim": "Outpost24 offers a 100% open API for easy integration into security operations.",
"category": "technology",
"source_indices": [1],
"confidence": 0.8,
}),
]);
assert_eq!(summaries.len(), 1);
assert_eq!(summaries[0].support_count, 2);
assert_eq!(summaries[0].evidence_count, 2);
}
// Of two same-topic team facts, the exact one is kept and the hedged "over 200" one dropped.
#[test]
fn consolidate_dd_fact_values_drops_vague_same_topic_repeats() {
let summaries = consolidate_dd_fact_values(vec![
serde_json::json!({
"claim": "Outpost24 has 195 employees.",
"category": "team",
"source_indices": [0],
"confidence": 0.9,
}),
serde_json::json!({
"claim": "Outpost24 has over 200 employees.",
"category": "team",
"source_indices": [1],
"confidence": 0.7,
}),
serde_json::json!({
"claim": "Outpost24 offers a 100% open API for easy integration into security operations.",
"category": "technology",
"source_indices": [2],
"confidence": 0.9,
}),
]);
let team_facts: Vec<_> = summaries
.iter()
.filter(|summary| summary.category == "team")
.collect();
assert_eq!(team_facts.len(), 1);
assert_eq!(team_facts[0].claim, "Outpost24 has 195 employees.");
}
// Two exact but conflicting revenue figures are both preserved.
#[test]
fn consolidate_dd_fact_values_preserves_conflicting_exact_financials() {
let summaries = consolidate_dd_fact_values(vec![
serde_json::json!({
"claim": "Outpost24's 2023 revenue was $42.19M.",
"category": "financials",
"source_indices": [0],
"confidence": 0.9,
}),
serde_json::json!({
"claim": "Outpost24 generates $67.5 million in revenue.",
"category": "financials",
"source_indices": [1],
"confidence": 0.9,
}),
]);
assert_eq!(summaries.len(), 2);
}
#[test]
fn normalize_dd_fact_rejects_empty_claim() {
assert!(
normalize_dd_fact(&serde_json::json!({
"claim": "",
"category": "product",
}))
.is_none()
);
}
#[test]
fn normalize_dd_fact_rejects_whitespace_only_claim() {
assert!(
normalize_dd_fact(&serde_json::json!({
"claim": " ",
"category": "product",
}))
.is_none()
);
}
#[test]
fn normalize_dd_fact_rejects_missing_claim() {
assert!(
normalize_dd_fact(&serde_json::json!({
"category": "product",
}))
.is_none()
);
}
// Out-of-range confidences are clamped into [0, 1].
#[test]
fn normalize_dd_fact_clamps_confidence() {
let normalized = normalize_dd_fact(&serde_json::json!({
"claim": "test",
"category": "product",
"confidence": 5.0,
}))
.unwrap();
assert_eq!(normalized["confidence"], 1.0);
let normalized = normalize_dd_fact(&serde_json::json!({
"claim": "test",
"category": "product",
"confidence": -1.0,
}))
.unwrap();
assert_eq!(normalized["confidence"], 0.0);
}
#[test]
fn normalize_dd_fact_defaults_missing_confidence() {
let normalized = normalize_dd_fact(&serde_json::json!({
"claim": "test claim",
"category": "product",
}))
.unwrap();
assert_eq!(normalized["confidence"], 0.5);
}
// Only the integer entries (0, 2, 3) of `source_indices` survive.
#[test]
fn normalize_dd_fact_filters_non_integer_source_indices() {
let normalized = normalize_dd_fact(&serde_json::json!({
"claim": "test",
"category": "product",
"source_indices": [0, "bad", 2, null, 3],
}))
.unwrap();
let indices = normalized["source_indices"].as_array().unwrap();
assert_eq!(indices.len(), 3);
}
#[test]
fn parse_json_array_response_rejects_plain_text() {
assert!(parse_json_array_response("just some text", "facts").is_err());
}
#[test]
fn parse_json_array_response_rejects_object_with_wrong_field() {
assert!(parse_json_array_response(r#"{"results":[{"claim":"X"}]}"#, "facts").is_err());
}
#[test]
fn parse_json_array_response_rejects_scalar() {
assert!(parse_json_array_response("42", "facts").is_err());
assert!(parse_json_array_response("true", "facts").is_err());
assert!(parse_json_array_response(r#""string""#, "facts").is_err());
}
#[test]
fn extract_first_json_value_returns_none_for_no_json() {
assert!(extract_first_json_value("no json here").is_none());
}
#[test]
fn extract_first_json_value_returns_none_for_mismatched_braces() {
assert!(extract_first_json_value("{unclosed").is_none());
assert!(extract_first_json_value("[}").is_none());
}
// An escaped quote inside a string must not end the string (and so must not hide the
// closing brace).
#[test]
fn extract_first_json_value_handles_escaped_quotes_in_strings() {
let result = extract_first_json_value(r#"prefix {"key":"val\"ue"} suffix"#);
assert!(result.is_some());
let parsed: serde_json::Value = serde_json::from_str(result.unwrap()).unwrap();
assert_eq!(parsed["key"], r#"val"ue"#);
}
#[test]
fn consolidate_dd_fact_values_handles_empty_input() {
assert!(consolidate_dd_fact_values(vec![]).is_empty());
}
#[test]
fn consolidate_dd_fact_values_handles_all_invalid_facts() {
let summaries = consolidate_dd_fact_values(vec![
serde_json::json!({"claim": "", "category": "product"}),
serde_json::json!({"no_claim": true}),
serde_json::json!(null),
]);
assert!(summaries.is_empty());
}
#[test]
fn next_batch_bounds_zero_total() {
assert_eq!(next_batch_bounds(0, 0, 15), (0, 0));
}
// A processed count beyond the total yields an empty batch positioned at the total.
#[test]
fn next_batch_bounds_processed_exceeds_total() {
assert_eq!(next_batch_bounds(5, 100, 15), (5, 5));
}
#[test]
fn canonicalize_claim_handles_empty_string() {
assert_eq!(canonicalize_claim(""), "");
}
#[test]
fn canonicalize_claim_handles_only_punctuation() {
assert_eq!(canonicalize_claim("!!!...???"), "");
}
// Pending proposals block acceptance even when hypotheses exist.
#[test]
fn synthesis_does_not_accept_when_proposals_exist() {
let budget = Arc::new(SharedBudget::new().with_limit("llm", 1));
let suggestor = SynthesisSuggestor::new("Acme", budget, Arc::new(StubLlm));
let ctx_with_proposals = StubContext {
hypothesis_count: 10,
has_proposals: true,
};
assert!(!suggestor.accepts(&ctx_with_proposals));
}
#[test]
fn synthesis_does_not_accept_without_hypotheses() {
let budget = Arc::new(SharedBudget::new().with_limit("llm", 1));
let suggestor = SynthesisSuggestor::new("Acme", budget, Arc::new(StubLlm));
let empty_ctx = StubContext {
hypothesis_count: 0,
has_proposals: false,
};
assert!(!suggestor.accepts(&empty_ctx));
}
// Credits-exhausted, rate-limited and provider-unavailable are infrastructure failures;
// bad responses, parse failures and oversized prompts are not.
#[test]
fn dd_error_infra_vs_non_infra() {
assert!(
DdError::CreditsExhausted {
provider: "x".into(),
detail: "y".into()
}
.is_infra_failure()
);
assert!(
DdError::RateLimited {
provider: "x".into(),
retry_after_ms: None
}
.is_infra_failure()
);
assert!(
DdError::ProviderUnavailable {
provider: "x".into(),
detail: "y".into()
}
.is_infra_failure()
);
assert!(
!DdError::BadResponse {
provider: "x".into(),
detail: "y".into()
}
.is_infra_failure()
);
assert!(
!DdError::ParseFailed {
provider: "x".into(),
detail: "y".into()
}
.is_infra_failure()
);
assert!(
!DdError::PromptTooLarge {
provider: "x".into(),
tokens: None
}
.is_infra_failure()
);
}
#[test]
fn dd_error_only_credits_exhausted_is_fatal() {
assert!(
DdError::CreditsExhausted {
provider: "x".into(),
detail: "y".into()
}
.is_fatal()
);
assert!(
!DdError::RateLimited {
provider: "x".into(),
retry_after_ms: None
}
.is_fatal()
);
assert!(
!DdError::ProviderUnavailable {
provider: "x".into(),
detail: "y".into()
}
.is_fatal()
);
}
// Property tests. The lint allowance covers the `i as f64` cast used to vary confidences
// in `consolidate_never_panics`.
#[allow(clippy::cast_precision_loss)]
mod proptests {
use super::*;
use proptest::prelude::*;
proptest! {
// Canonicalizing an already-canonical claim changes nothing.
#[test]
fn canonicalize_is_idempotent(claim in ".*") {
let first = canonicalize_claim(&claim);
let second = canonicalize_claim(&first);
prop_assert_eq!(first, second);
}
#[test]
fn canonicalize_is_case_insensitive(claim in "[a-zA-Z0-9 ]{1,100}") {
prop_assert_eq!(
canonicalize_claim(&claim),
canonicalize_claim(&claim.to_uppercase())
);
}
#[test]
fn normalize_dd_fact_never_panics(
claim in ".*",
category in ".*",
confidence in proptest::num::f64::ANY,
) {
let _ = normalize_dd_fact(&serde_json::json!({
"claim": claim,
"category": category,
"confidence": confidence,
}));
}
#[test]
fn normalize_preserves_non_empty_claims(
claim in "[a-zA-Z]{1,50}",
category in prop_oneof![
Just("product"), Just("technology"), Just("financials"),
Just("customers"), Just("competition"), Just("market"),
],
) {
let normalized = normalize_dd_fact(&serde_json::json!({
"claim": claim,
"category": category,
"confidence": 0.8,
}));
prop_assert!(normalized.is_some());
let n = normalized.unwrap();
prop_assert!(!n["claim"].as_str().unwrap().is_empty());
}
// Consolidation never yields more summaries than input facts.
#[test]
fn consolidate_never_panics(
n in 0_usize..20,
) {
let categories = ["product", "technology", "financials"];
let facts: Vec<serde_json::Value> = (0..n).map(|i| {
serde_json::json!({
"claim": format!("Fact number {i} about the company"),
"category": categories[i % 3],
"source_indices": [i],
"confidence": 0.5 + (i as f64 * 0.02),
})
}).collect();
let result = consolidate_dd_fact_values(facts);
prop_assert!(result.len() <= n);
}
// Bounds stay within `total`, are ordered, and never exceed `max_batch` in width.
#[test]
fn next_batch_bounds_always_valid(
total in 0_usize..1000,
processed in 0_usize..1000,
max_batch in 1_usize..100,
) {
let (start, end) = next_batch_bounds(total, processed, max_batch);
prop_assert!(start <= total);
prop_assert!(end <= total);
prop_assert!(start <= end);
prop_assert!(end - start <= max_batch);
}
#[test]
fn extract_first_json_value_never_panics(input in ".*") {
let _ = extract_first_json_value(&input);
}
#[test]
fn parse_json_array_response_never_panics(
input in ".*",
field in "[a-z]{1,10}",
) {
let _ = parse_json_array_response(&input, &field);
}
}
}
}