use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::StreamExt;
use pulsehive_core::agent::{AgentDefinition, AgentKind, LlmAgentConfig};
use pulsehive_core::lens::Lens;
use pulsehive_core::llm::*;
use pulsehive_runtime::hivemind::{HiveMind, Task};
/// A scripted stand-in for a real LLM backend: replies are keyed off the
/// system prompt and stamped with a monotonically increasing call number.
struct MockLlm {
    // Number of chat calls served so far; updated through &self via atomics.
    call_count: AtomicUsize,
}

impl MockLlm {
    /// Creates a mock whose call counter starts at zero.
    fn new() -> Self {
        let call_count = AtomicUsize::new(0);
        Self { call_count }
    }
}
#[async_trait::async_trait]
impl LlmProvider for MockLlm {
    /// Returns a canned reply chosen by keyword matching against the system
    /// prompt, tagged with this call's zero-based sequence number.
    async fn chat(
        &self,
        messages: Vec<Message>,
        _tools: Vec<ToolDefinition>,
        _config: &LlmConfig,
    ) -> pulsehive_core::error::Result<LlmResponse> {
        // fetch_add returns the previous value, so `n` is zero-based.
        let n = self.call_count.fetch_add(1, Ordering::Relaxed);
        // Pull out the first system message; fall back to an empty prompt.
        let mut system = "";
        for message in &messages {
            if let Message::System { content } = message {
                system = content.as_str();
                break;
            }
        }
        // Keyword checks are ordered: the first guard that matches wins.
        let response = match system {
            s if s.contains("research") => format!("[Research findings #{n}] PulseHive uses a shared consciousness model where agents perceive each other's experiences through a persistent substrate."),
            s if s.contains("summariz") => format!("[Summary #{n}] Key finding: shared consciousness enables implicit agent coordination without message passing."),
            s if s.contains("frontend") => format!("[Frontend Review #{n}] Component architecture follows best practices."),
            s if s.contains("backend") => format!("[Backend Review #{n}] API endpoints are well-structured with proper error handling."),
            _ => format!("[Response #{n}] Analysis complete."),
        };
        Ok(LlmResponse {
            content: Some(response),
            tool_calls: vec![],
            usage: TokenUsage::default(),
        })
    }

    /// Streaming is deliberately unsupported by this mock; always errors.
    async fn chat_stream(
        &self,
        _m: Vec<Message>,
        _t: Vec<ToolDefinition>,
        _c: &LlmConfig,
    ) -> pulsehive_core::error::Result<
        Pin<Box<dyn futures_core::Stream<Item = pulsehive_core::error::Result<LlmChunk>> + Send>>,
    > {
        Err(pulsehive_core::error::PulseHiveError::llm(
            "Streaming not supported in mock",
        ))
    }
}
/// Builds an LLM-backed agent definition wired to the "mock" provider.
///
/// `domains` become the agent's lens (what it perceives in the substrate);
/// no tools, experience extractor, or refresh cadence are configured.
fn llm_agent(name: &str, prompt: &str, domains: &[&str]) -> AgentDefinition {
    let config = LlmAgentConfig {
        system_prompt: prompt.to_owned(),
        tools: Vec::new(),
        lens: Lens::new(domains.iter().copied()),
        llm_config: LlmConfig::new("mock", "demo"),
        experience_extractor: None,
        refresh_every_n_tool_calls: None,
    };
    AgentDefinition {
        name: name.to_owned(),
        kind: AgentKind::Llm(Box::new(config)),
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let dir = tempfile::tempdir()?;
let hive = HiveMind::builder()
.substrate_path(dir.path().join("multi_agent.db"))
.llm_provider("mock", MockLlm::new())
.build()?;
println!("=== Sequential Pipeline: Research → Summarize ===\n");
let pipeline = AgentDefinition {
name: "research-pipeline".into(),
kind: AgentKind::Sequential(vec![
llm_agent(
"researcher",
"You research topics thoroughly. Provide detailed findings.",
&["research"],
),
llm_agent(
"summarizer",
"You summarize research findings into bullet points.",
&["research", "summary"],
),
]),
};
let mut stream = hive
.deploy(
vec![pipeline],
vec![Task::new("Research PulseHive architecture")],
)
.await?;
let mut completed = 0usize;
let mut started = 0usize;
while let Some(event) = stream.next().await {
let data = format!("{event:?}");
if data.contains("AgentStarted") {
started += 1;
println!(" {data}");
} else if data.contains("AgentCompleted") {
completed += 1;
println!(" {data}");
if completed >= started {
break; }
}
}
println!("\n=== Parallel Team: Frontend + Backend Review ===\n");
let team = AgentDefinition {
name: "review-team".into(),
kind: AgentKind::Parallel(vec![
llm_agent(
"frontend-reviewer",
"You review frontend code for best practices.",
&["frontend", "ui"],
),
llm_agent(
"backend-reviewer",
"You review backend code for performance and security.",
&["backend", "security"],
),
]),
};
let mut stream = hive
.deploy(vec![team], vec![Task::new("Review the web application")])
.await?;
let mut completed = 0usize;
let mut started = 0usize;
while let Some(event) = stream.next().await {
let data = format!("{event:?}");
if data.contains("AgentStarted") {
started += 1;
println!(" {data}");
} else if data.contains("AgentCompleted") {
completed += 1;
println!(" {data}");
if completed >= started {
break; }
}
}
println!("\n=== Nested: Parallel Analysis → Summary ===\n");
let nested = AgentDefinition {
name: "full-review".into(),
kind: AgentKind::Sequential(vec![
AgentDefinition {
name: "parallel-analysis".into(),
kind: AgentKind::Parallel(vec![
llm_agent("analyst-a", "You research performance.", &["performance"]),
llm_agent("analyst-b", "You research security.", &["security"]),
]),
},
llm_agent(
"final-summary",
"You summarize all findings into an executive report.",
&["performance", "security", "summary"],
),
]),
};
let mut stream = hive
.deploy(vec![nested], vec![Task::new("Full system review")])
.await?;
let mut completed = 0usize;
let mut started = 0usize;
while let Some(event) = stream.next().await {
let data = format!("{event:?}");
if data.contains("AgentStarted") {
started += 1;
println!(" {data}");
} else if data.contains("AgentCompleted") {
completed += 1;
println!(" {data}");
if completed >= started {
break; }
}
}
hive.shutdown();
println!("\nDone! All workflows completed.");
std::process::exit(0);
}