1use std::collections::HashMap;
18use std::path::Path;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use futures::stream;
24use futures::{Stream, StreamExt};
25use pulsedb::{
26 CollectiveId, Config, ExperienceId, NewExperience, PulseDB, PulseDBSubstrate, SubstrateProvider,
27};
28use tokio::sync::broadcast;
29
30use pulsehive_core::agent::AgentDefinition;
31use pulsehive_core::approval::{ApprovalHandler, AutoApprove};
32use pulsehive_core::embedding::EmbeddingProvider;
33use pulsehive_core::error::{PulseHiveError, Result};
34use pulsehive_core::event::{EventBus, HiveEvent};
35use pulsehive_core::export::EventExporter;
36use pulsehive_core::llm::LlmProvider;
37
38use crate::intelligence::insight::InsightSynthesizer;
39use crate::intelligence::relationship::RelationshipDetector;
40use crate::workflow::{self, WorkflowContext};
41
42#[derive(Debug, Clone)]
44pub struct Task {
45 pub description: String,
47 pub collective_id: CollectiveId,
49}
50
51impl Task {
52 pub fn new(description: impl Into<String>) -> Self {
54 Self {
55 description: description.into(),
56 collective_id: CollectiveId::new(),
57 }
58 }
59
60 pub fn with_collective(description: impl Into<String>, collective_id: CollectiveId) -> Self {
62 Self {
63 description: description.into(),
64 collective_id,
65 }
66 }
67}
68
69pub struct HiveMind {
74 pub(crate) substrate: Arc<dyn SubstrateProvider>,
75 pub(crate) llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
76 pub(crate) approval_handler: Arc<dyn ApprovalHandler>,
77 pub(crate) event_bus: EventBus,
78 pub(crate) relationship_detector: Option<RelationshipDetector>,
79 pub(crate) insight_synthesizer: Option<InsightSynthesizer>,
80 pub(crate) embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
83 shutdown: Arc<AtomicBool>,
85 watch_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
87}
88
89impl std::fmt::Debug for HiveMind {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("HiveMind")
92 .field(
93 "llm_providers",
94 &self.llm_providers.keys().collect::<Vec<_>>(),
95 )
96 .finish_non_exhaustive()
97 }
98}
99
100impl HiveMind {
101 pub fn builder() -> HiveMindBuilder {
103 HiveMindBuilder::new()
104 }
105
106 pub fn substrate(&self) -> &dyn SubstrateProvider {
108 self.substrate.as_ref()
109 }
110
111 pub async fn deploy(
121 &self,
122 agents: Vec<AgentDefinition>,
123 tasks: Vec<Task>,
124 ) -> Result<Pin<Box<dyn Stream<Item = HiveEvent> + Send>>> {
125 if agents.is_empty() {
126 return Ok(Box::pin(stream::empty()));
127 }
128
129 let mut task = tasks.into_iter().next().unwrap_or_else(|| Task::new(""));
131
132 let collective_name = format!("collective-{}", task.collective_id);
134 let collective_id = self
135 .substrate
136 .get_or_create_collective(&collective_name)
137 .await?;
138 task.collective_id = collective_id;
139
140 let watch_substrate = Arc::clone(&self.substrate);
144 let watch_emitter = self.event_bus.clone();
145 let watch_shutdown = Arc::clone(&self.shutdown);
146 let watch_handle = tokio::spawn(async move {
147 match watch_substrate.watch(collective_id).await {
148 Ok(mut watch_stream) => {
149 while !watch_shutdown.load(Ordering::Relaxed) {
150 match watch_stream.next().await {
151 Some(event) => {
152 watch_emitter.emit(HiveEvent::WatchNotification {
153 timestamp_ms: pulsehive_core::event::now_ms(),
154 experience_id: event.experience_id,
155 collective_id: event.collective_id,
156 event_type: format!("{:?}", event.event_type),
157 });
158 }
159 None => break,
160 }
161 }
162 }
163 Err(e) => {
164 tracing::warn!(error = %e, "Failed to subscribe to Watch system");
165 }
166 }
167 });
168 *self.watch_handle.lock().unwrap() = Some(watch_handle);
169
170 let rx = self.event_bus.subscribe();
171
172 for agent in agents {
173 self.spawn_agent(agent, task.clone());
174 }
175
176 Ok(Box::pin(BroadcastStream::new(rx)))
178 }
179
180 pub async fn record_experience(&self, experience: NewExperience) -> Result<ExperienceId> {
186 let agent_id = experience.source_agent.0.clone();
187 let collective_id = experience.collective_id;
188
189 let mut experience = experience;
191 if let Some(provider) = &self.embedding_provider {
192 if experience.embedding.is_none() {
193 match provider.embed(&experience.content).await {
194 Ok(embedding) => {
195 experience.embedding = Some(embedding);
196 }
197 Err(e) => {
198 tracing::warn!(error = %e, "Failed to compute embedding in record_experience, storing without");
199 }
200 }
201 }
202 }
203
204 let content_preview: String = experience.content.chars().take(200).collect();
206 let experience_type_str = format!("{:?}", experience.experience_type);
207 let importance = experience.importance;
208
209 let id = self.substrate.store_experience(experience).await?;
210 self.event_bus.emit(HiveEvent::ExperienceRecorded {
211 timestamp_ms: pulsehive_core::event::now_ms(),
212 experience_id: id,
213 agent_id: agent_id.clone(),
214 content_preview,
215 experience_type: experience_type_str,
216 importance,
217 });
218
219 if let Some(detector) = &self.relationship_detector {
221 if let Ok(Some(stored)) = self.substrate.get_experience(id).await {
222 let relations = detector
223 .infer_relations(&stored, self.substrate.as_ref())
224 .await;
225
226 for rel in relations {
227 match self.substrate.store_relation(rel).await {
228 Ok(relation_id) => {
229 self.event_bus.emit(HiveEvent::RelationshipInferred {
230 timestamp_ms: pulsehive_core::event::now_ms(),
231 relation_id,
232 agent_id: agent_id.clone(),
233 });
234 }
235 Err(e) => {
236 tracing::warn!(error = %e, "Failed to store inferred relation");
237 }
238 }
239 }
240 }
241 }
242
243 if let Some(synthesizer) = &self.insight_synthesizer {
245 if !synthesizer.is_debounced(collective_id) {
246 let cluster = synthesizer.find_cluster(id, self.substrate.as_ref()).await;
247
248 if synthesizer.should_synthesize(cluster.len()) {
249 if let Some((provider_name, provider)) = self.llm_providers.iter().next() {
251 let llm_config =
252 pulsehive_core::llm::LlmConfig::new(provider_name, "default");
253 if let Some(insight) = synthesizer
254 .synthesize_cluster(
255 &cluster,
256 collective_id,
257 provider.as_ref(),
258 &llm_config,
259 )
260 .await
261 {
262 let source_count = insight.source_experience_ids.len();
263 match self.substrate.store_insight(insight).await {
264 Ok(insight_id) => {
265 synthesizer.mark_synthesized(collective_id);
266 self.event_bus.emit(HiveEvent::InsightGenerated {
267 timestamp_ms: pulsehive_core::event::now_ms(),
268 insight_id,
269 source_count,
270 agent_id: agent_id.clone(),
271 });
272 }
273 Err(e) => {
274 tracing::warn!(error = %e, "Failed to store synthesized insight");
275 }
276 }
277 }
278 }
279 }
280 }
281 }
282
283 Ok(id)
284 }
285
286 pub fn shutdown(&self) {
291 self.shutdown.store(true, Ordering::Relaxed);
292 if let Some(handle) = self.watch_handle.lock().unwrap().take() {
295 handle.abort();
296 }
297 tracing::info!("HiveMind shutdown signaled");
298 }
299
300 pub fn is_shutdown(&self) -> bool {
302 self.shutdown.load(Ordering::Relaxed)
303 }
304
305 pub async fn redeploy(&self, agents: Vec<AgentDefinition>, task: Task) -> Result<()> {
312 if agents.is_empty() {
313 return Ok(());
314 }
315
316 let mut task = task;
318 let collective_name = format!("collective-{}", task.collective_id);
319 let collective_id = self
320 .substrate
321 .get_or_create_collective(&collective_name)
322 .await?;
323 task.collective_id = collective_id;
324
325 for agent in agents {
326 self.spawn_agent(agent, task.clone());
327 }
328
329 Ok(())
330 }
331
332 fn spawn_agent(&self, agent: AgentDefinition, task: Task) {
337 let ctx = WorkflowContext {
338 task,
339 llm_providers: self.llm_providers.clone(),
340 substrate: Arc::clone(&self.substrate),
341 approval_handler: Arc::clone(&self.approval_handler),
342 event_emitter: self.event_bus.clone(),
343 embedding_provider: self.embedding_provider.clone(),
344 };
345
346 tokio::spawn(async move {
347 workflow::dispatch_agent(agent, &ctx).await;
348 });
349 }
350}
351
352impl Drop for HiveMind {
353 fn drop(&mut self) {
354 self.shutdown.store(true, Ordering::Relaxed);
355 if let Some(handle) = self.watch_handle.get_mut().unwrap().take() {
356 handle.abort();
357 }
358 }
359}
360
361struct BroadcastStream {
363 rx: broadcast::Receiver<HiveEvent>,
364}
365
366impl BroadcastStream {
367 fn new(rx: broadcast::Receiver<HiveEvent>) -> Self {
368 Self { rx }
369 }
370}
371
372impl Stream for BroadcastStream {
373 type Item = HiveEvent;
374
375 fn poll_next(
376 mut self: Pin<&mut Self>,
377 cx: &mut std::task::Context<'_>,
378 ) -> std::task::Poll<Option<Self::Item>> {
379 match self.rx.try_recv() {
380 Ok(event) => std::task::Poll::Ready(Some(event)),
381 Err(broadcast::error::TryRecvError::Empty) => {
382 cx.waker().wake_by_ref();
384 std::task::Poll::Pending
385 }
386 Err(broadcast::error::TryRecvError::Lagged(n)) => {
387 tracing::warn!(lagged = n, "Event stream lagged, some events dropped");
388 cx.waker().wake_by_ref();
389 std::task::Poll::Pending
390 }
391 Err(broadcast::error::TryRecvError::Closed) => std::task::Poll::Ready(None),
392 }
393 }
394}
395
396pub struct HiveMindBuilder {
398 substrate: Option<Box<dyn SubstrateProvider>>,
399 substrate_path: Option<String>,
400 llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
401 approval_handler: Option<Box<dyn ApprovalHandler>>,
402 relationship_detector: Option<Option<RelationshipDetector>>,
403 insight_synthesizer: Option<Option<InsightSynthesizer>>,
404 embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
405 event_exporter: Option<Arc<dyn EventExporter>>,
406}
407
408impl HiveMindBuilder {
409 fn new() -> Self {
410 Self {
411 substrate: None,
412 substrate_path: None,
413 llm_providers: HashMap::new(),
414 approval_handler: None,
415 relationship_detector: None,
416 insight_synthesizer: None,
417 embedding_provider: None,
418 event_exporter: None,
419 }
420 }
421
422 pub fn substrate_path(mut self, path: impl AsRef<Path>) -> Self {
424 self.substrate_path = Some(path.as_ref().to_string_lossy().into_owned());
425 self
426 }
427
428 pub fn substrate(mut self, provider: Box<dyn SubstrateProvider>) -> Self {
430 self.substrate = Some(provider);
431 self
432 }
433
434 pub fn llm_provider(
436 mut self,
437 name: impl Into<String>,
438 provider: impl LlmProvider + 'static,
439 ) -> Self {
440 self.llm_providers.insert(name.into(), Arc::new(provider));
441 self
442 }
443
444 pub fn approval_handler(mut self, handler: impl ApprovalHandler + 'static) -> Self {
446 self.approval_handler = Some(Box::new(handler));
447 self
448 }
449
450 pub fn relationship_detector(mut self, detector: RelationshipDetector) -> Self {
452 self.relationship_detector = Some(Some(detector));
453 self
454 }
455
456 pub fn no_relationship_detector(mut self) -> Self {
458 self.relationship_detector = Some(None);
459 self
460 }
461
462 pub fn insight_synthesizer(mut self, synthesizer: InsightSynthesizer) -> Self {
464 self.insight_synthesizer = Some(Some(synthesizer));
465 self
466 }
467
468 pub fn no_insight_synthesizer(mut self) -> Self {
470 self.insight_synthesizer = Some(None);
471 self
472 }
473
474 pub fn embedding_provider(mut self, provider: impl EmbeddingProvider + 'static) -> Self {
480 self.embedding_provider = Some(Arc::new(provider));
481 self
482 }
483
484 pub fn event_exporter(mut self, exporter: impl EventExporter + 'static) -> Self {
491 self.event_exporter = Some(Arc::new(exporter));
492 self
493 }
494
495 pub fn build(self) -> Result<HiveMind> {
497 let substrate: Arc<dyn SubstrateProvider> = if let Some(s) = self.substrate {
498 Arc::from(s)
499 } else if let Some(path) = self.substrate_path {
500 let config = if self.embedding_provider.is_some() {
501 Config::new()
503 } else {
504 Config::with_builtin_embeddings()
506 };
507 let db = PulseDB::open(&path, config)?;
508 Arc::new(PulseDBSubstrate::from_db(db))
509 } else {
510 return Err(PulseHiveError::config(
511 "Substrate not configured. Call substrate_path() or substrate() on the builder.",
512 ));
513 };
514
515 let approval: Arc<dyn ApprovalHandler> = match self.approval_handler {
516 Some(h) => Arc::from(h),
517 None => Arc::new(AutoApprove),
518 };
519
520 let relationship_detector = match self.relationship_detector {
522 Some(explicit) => explicit,
523 None => Some(RelationshipDetector::with_defaults()),
524 };
525
526 let insight_synthesizer = match self.insight_synthesizer {
528 Some(explicit) => explicit,
529 None => Some(InsightSynthesizer::with_defaults()),
530 };
531
532 let event_bus = match self.event_exporter {
533 Some(exporter) => EventBus::with_exporter(256, exporter),
534 None => EventBus::default(),
535 };
536
537 Ok(HiveMind {
538 substrate,
539 llm_providers: self.llm_providers,
540 approval_handler: approval,
541 event_bus,
542 relationship_detector,
543 insight_synthesizer,
544 embedding_provider: self.embedding_provider,
545 shutdown: Arc::new(AtomicBool::new(false)),
546 watch_handle: std::sync::Mutex::new(None),
547 })
548 }
549}
550
551#[cfg(test)]
552mod tests {
553 use super::*;
554 use futures::StreamExt;
555
556 #[test]
557 fn test_build_fails_without_substrate() {
558 let result = HiveMind::builder().build();
559 assert!(result.is_err());
560 assert!(result
561 .unwrap_err()
562 .to_string()
563 .contains("Substrate not configured"));
564 }
565
566 #[test]
567 fn test_build_with_substrate_path() {
568 let dir = tempfile::tempdir().unwrap();
569 let path = dir.path().join("test.db");
570 assert!(HiveMind::builder().substrate_path(&path).build().is_ok());
571 }
572
573 #[tokio::test]
574 async fn test_deploy_empty_agents_returns_empty_stream() {
575 let dir = tempfile::tempdir().unwrap();
576 let path = dir.path().join("test.db");
577 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
578
579 let mut stream = hive.deploy(vec![], vec![]).await.unwrap();
580 assert!(stream.next().await.is_none());
581 }
582
583 #[test]
584 fn test_task_new() {
585 let task = Task::new("Analyze the codebase");
586 assert_eq!(task.description, "Analyze the codebase");
587 }
588
589 #[test]
590 fn test_task_with_collective() {
591 let cid = CollectiveId::new();
592 let task = Task::with_collective("Search for bugs", cid);
593 assert_eq!(task.collective_id, cid);
594 }
595
596 async fn build_hive_with_collective() -> (HiveMind, CollectiveId) {
598 let dir = tempfile::tempdir().unwrap();
599 let path = dir.path().join("test.db");
600 let dir = Box::leak(Box::new(dir));
602 let _ = dir;
603 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
604 let cid = hive
606 .substrate
607 .get_or_create_collective("test")
608 .await
609 .unwrap();
610 (hive, cid)
611 }
612
613 #[tokio::test]
614 async fn test_record_experience_stores_and_emits_event() {
615 let (hive, cid) = build_hive_with_collective().await;
616 let mut rx = hive.event_bus.subscribe();
617
618 let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
619 let exp = pulsedb::NewExperience {
620 collective_id: cid,
621 content: "Learned that Rust's ownership model prevents data races.".into(),
622 experience_type: pulsedb::ExperienceType::Generic {
623 category: Some("rust".into()),
624 },
625 embedding: Some(dummy_embedding),
626 importance: 0.8,
627 confidence: 0.9,
628 domain: vec!["rust".into(), "concurrency".into()],
629 source_agent: pulsedb::AgentId("test-agent".into()),
630 source_task: None,
631 related_files: vec![],
632 };
633
634 let id = hive.record_experience(exp).await.unwrap();
635
636 let event = rx.try_recv().unwrap();
638 match event {
639 HiveEvent::ExperienceRecorded {
640 experience_id,
641 agent_id,
642 ..
643 } => {
644 assert_eq!(experience_id, id);
645 assert_eq!(agent_id, "test-agent");
646 }
647 other => panic!("Expected ExperienceRecorded, got: {other:?}"),
648 }
649 }
650
651 #[tokio::test]
652 async fn test_record_experience_retrievable() {
653 let (hive, cid) = build_hive_with_collective().await;
654
655 let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
658 let exp = pulsedb::NewExperience {
659 collective_id: cid,
660 content: "Test experience for retrieval.".into(),
661 experience_type: pulsedb::ExperienceType::Generic { category: None },
662 embedding: Some(dummy_embedding),
663 importance: 0.5,
664 confidence: 0.5,
665 domain: vec![],
666 source_agent: pulsedb::AgentId("agent-1".into()),
667 source_task: None,
668 related_files: vec![],
669 };
670
671 let id = hive.record_experience(exp).await.unwrap();
672
673 let retrieved = hive.substrate.get_experience(id).await.unwrap();
675 assert!(retrieved.is_some());
676 let retrieved = retrieved.unwrap();
677 assert_eq!(retrieved.content, "Test experience for retrieval.");
678 }
679
680 #[test]
683 fn test_shutdown_sets_flag() {
684 let dir = tempfile::tempdir().unwrap();
685 let path = dir.path().join("test.db");
686 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
687
688 assert!(!hive.is_shutdown());
689 hive.shutdown();
690 assert!(hive.is_shutdown());
691 }
692
693 #[test]
694 fn test_drop_sets_shutdown_flag() {
695 let dir = tempfile::tempdir().unwrap();
696 let path = dir.path().join("test.db");
697 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
698 let shutdown = Arc::clone(&hive.shutdown);
699
700 assert!(!shutdown.load(Ordering::Relaxed));
701 drop(hive);
702 assert!(shutdown.load(Ordering::Relaxed));
703 }
704
705 #[tokio::test]
706 async fn test_redeploy_empty_is_noop() {
707 let (hive, _cid) = build_hive_with_collective().await;
708 let task = Task::new("test");
709 assert!(hive.redeploy(vec![], task).await.is_ok());
710 }
711}