use std::pin::Pin;
use std::sync::Mutex;
use std::time::Duration;
use async_trait::async_trait;
use futures::StreamExt;
use futures_core::Stream;
use pulsehive_core::agent::{AgentDefinition, AgentKind, AgentOutcome, LlmAgentConfig};
use pulsehive_core::error::{PulseHiveError, Result};
use pulsehive_core::event::HiveEvent;
use pulsehive_core::lens::Lens;
use pulsehive_core::llm::*;
use pulsehive_runtime::hivemind::{HiveMind, Task};
use pulsehive_runtime::intelligence::context::ContextOptimizer;
/// Scripted LLM provider for tests: hands out pre-canned responses in FIFO order.
struct MockLlm {
    // Remaining scripted responses; Mutex gives interior mutability behind the
    // `&self` receiver required by the `LlmProvider` trait.
    responses: Mutex<Vec<LlmResponse>>,
}
impl MockLlm {
fn new(responses: Vec<LlmResponse>) -> Self {
Self {
responses: Mutex::new(responses),
}
}
fn text(content: &str) -> LlmResponse {
LlmResponse {
content: Some(content.into()),
tool_calls: vec![],
usage: TokenUsage::default(),
}
}
}
#[async_trait]
impl LlmProvider for MockLlm {
    /// Pops and returns the next scripted response; errors once the script
    /// runs out, so a test that over-calls the provider fails loudly.
    async fn chat(
        &self,
        _messages: Vec<Message>,
        _tools: Vec<ToolDefinition>,
        _config: &LlmConfig,
    ) -> Result<LlmResponse> {
        let mut queue = self.responses.lock().unwrap();
        if queue.is_empty() {
            return Err(PulseHiveError::llm("No more scripted responses"));
        }
        Ok(queue.remove(0))
    }

    /// Streaming is not exercised by these tests; always errors.
    async fn chat_stream(
        &self,
        _messages: Vec<Message>,
        _tools: Vec<ToolDefinition>,
        _config: &LlmConfig,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<LlmChunk>> + Send>>> {
        Err(PulseHiveError::llm("Not used in tests"))
    }
}
/// Constructs a HiveMind backed by a temp-dir SQLite database and the given
/// mock provider registered under the "mock" key.
///
/// The TempDir is deliberately leaked so the backing directory is not deleted
/// while the returned HiveMind is still using it.
fn build_hive(provider: MockLlm) -> HiveMind {
    let dir = Box::leak(Box::new(tempfile::tempdir().unwrap()));
    let db_path = dir.path().join("test.db");
    HiveMind::builder()
        .substrate_path(&db_path)
        .llm_provider("mock", provider)
        .build()
        .unwrap()
}
/// Builds a minimal LLM agent definition wired to the "mock" provider, with
/// no tools, a default lens, and no experience extraction.
fn llm_agent(name: &str) -> AgentDefinition {
    let config = LlmAgentConfig {
        system_prompt: "You are a test agent.".into(),
        tools: vec![],
        lens: Lens::default(),
        llm_config: LlmConfig::new("mock", "test-model"),
        experience_extractor: None,
        refresh_every_n_tool_calls: None,
    };
    AgentDefinition {
        name: name.to_string(),
        kind: AgentKind::Llm(Box::new(config)),
    }
}
/// Drains `stream` into a Vec, stopping when the stream ends or when
/// `timeout` (measured from the call) elapses, whichever comes first.
async fn collect_events(
    mut stream: Pin<Box<dyn Stream<Item = HiveEvent> + Send>>,
    timeout: Duration,
) -> Vec<HiveEvent> {
    let deadline = tokio::time::Instant::now() + timeout;
    let mut events = Vec::new();
    // `timeout_at` yields Err once the deadline passes; the inner Option is
    // the stream's own end-of-stream signal. Either terminates the loop.
    while let Ok(Some(event)) = tokio::time::timeout_at(deadline, stream.next()).await {
        events.push(event);
    }
    events
}
#[tokio::test]
async fn test_phase2_parallel_3_agents() {
    // One scripted reply per parallel agent; the mock serves them in order
    // regardless of which agent asks first.
    let provider = MockLlm::new(vec![
        MockLlm::text("Agent 1 done"),
        MockLlm::text("Agent 2 done"),
        MockLlm::text("Agent 3 done"),
    ]);
    let hive = build_hive(provider);

    let agents = vec![
        llm_agent("agent-1"),
        llm_agent("agent-2"),
        llm_agent("agent-3"),
    ];
    let workflow = AgentDefinition {
        name: "phase2-parallel".into(),
        kind: AgentKind::Parallel(agents),
    };

    let stream = hive
        .deploy(vec![workflow], vec![Task::new("Phase 2 parallel test")])
        .await
        .unwrap();
    let events = collect_events(stream, Duration::from_secs(10)).await;

    // Count only successful completions (AgentCompleted with a Complete outcome).
    let completed = events
        .iter()
        .filter(|event| {
            matches!(
                event,
                HiveEvent::AgentCompleted {
                    outcome: AgentOutcome::Complete { .. },
                    ..
                }
            )
        })
        .count();
    assert!(
        completed >= 3,
        "All 3 parallel agents should complete. Got {completed}. Events: {events:?}"
    );
}
#[tokio::test]
async fn test_phase2_relationship_inference() {
    // Build a HiveMind without any LLM provider; only the substrate is needed.
    // The TempDir is leaked so the database directory survives for the test.
    let hive = {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        Box::leak(Box::new(dir));
        HiveMind::builder().substrate_path(&path).build().unwrap()
    };
    let cid = hive
        .substrate()
        .get_or_create_collective("phase2-rel")
        .await
        .unwrap();
    // First experience: a high-severity Difficulty in the "networking" domain.
    let _id1 = hive
        .record_experience(pulsedb::NewExperience {
            collective_id: cid,
            content: "Network timeouts are a major reliability issue in distributed systems."
                .into(),
            experience_type: pulsedb::ExperienceType::Difficulty {
                description: "Network timeouts".into(),
                severity: pulsedb::Severity::High,
            },
            embedding: None,
            importance: 0.8,
            confidence: 0.9,
            domain: vec!["networking".into()],
            source_agent: pulsedb::AgentId("agent-1".into()),
            source_task: None,
            related_files: vec![],
        })
        .await
        .unwrap();
    // Second experience: a Solution in the same domain with overlapping content,
    // which the substrate may relate to the first — TODO confirm the inference
    // mechanism; no embeddings are supplied here.
    let id2 = hive
        .record_experience(pulsedb::NewExperience {
            collective_id: cid,
            content: "Network timeout handling using exponential backoff with jitter resolves reliability issues.".into(),
            experience_type: pulsedb::ExperienceType::Solution {
                problem_ref: None,
                approach: "exponential backoff".into(),
                worked: true,
            },
            embedding: None,
            importance: 0.9,
            confidence: 0.95,
            domain: vec!["networking".into()],
            source_agent: pulsedb::AgentId("agent-1".into()),
            source_task: None,
            related_files: vec![],
        })
        .await
        .unwrap();
    // Only log how many relations were inferred; the count is not asserted.
    let related = hive.substrate().get_related(id2).await.unwrap();
    println!(
        "Phase 2 relation test: {} relations found for experience {}",
        related.len(),
        id2
    );
    // The hard assertion: the second experience was actually persisted.
    assert!(
        hive.substrate()
            .get_experience(id2)
            .await
            .unwrap()
            .is_some(),
        "Experience should be stored"
    );
}
#[test]
fn test_phase2_context_optimizer_72h_decay() {
    let optimizer = ContextOptimizer::with_defaults();
    // Fixed "now" (epoch milliseconds, presumably — TODO confirm Timestamp unit)
    // so the test is deterministic.
    let now = pulsedb::Timestamp(1_700_000_000_000);
    // 72 hours expressed in milliseconds.
    let age_ms = (72.0 * 3600.0 * 1000.0) as i64;
    let exp = pulsedb::Experience {
        id: pulsedb::ExperienceId::new(),
        collective_id: pulsedb::CollectiveId::new(),
        content: "Test experience".into(),
        embedding: vec![],
        experience_type: pulsedb::ExperienceType::Generic { category: None },
        importance: 1.0,
        confidence: 0.8,
        applications: 0,
        domain: vec![],
        related_files: vec![],
        source_agent: pulsedb::AgentId("test".into()),
        source_task: None,
        // Timestamped exactly 72h before `now`.
        timestamp: pulsedb::Timestamp(now.0 - age_ms),
        archived: false,
    };
    let decayed = optimizer.compute_decayed_importance(&exp, now);
    // With importance 1.0, a 72-hour-old experience should decay to ~0.5 —
    // this implies the default half-life is 72h (NOTE(review): confirm against
    // ContextOptimizer::with_defaults).
    assert!(
        (decayed - 0.5).abs() < 0.01,
        "72h decay should be ~0.5 (within 0.01). Got: {decayed}"
    );
}