1use std::collections::HashMap;
18use std::path::Path;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use futures::stream;
24use futures::{Stream, StreamExt};
25use pulsedb::{
26 CollectiveId, Config, ExperienceId, NewExperience, PulseDB, PulseDBSubstrate, SubstrateProvider,
27};
28use tokio::sync::broadcast;
29
30use pulsehive_core::agent::AgentDefinition;
31use pulsehive_core::approval::{ApprovalHandler, AutoApprove};
32use pulsehive_core::embedding::EmbeddingProvider;
33use pulsehive_core::error::{PulseHiveError, Result};
34use pulsehive_core::event::{EventBus, HiveEvent};
35use pulsehive_core::llm::LlmProvider;
36
37use crate::intelligence::insight::InsightSynthesizer;
38use crate::intelligence::relationship::RelationshipDetector;
39use crate::workflow::{self, WorkflowContext};
40
41#[derive(Debug, Clone)]
43pub struct Task {
44 pub description: String,
46 pub collective_id: CollectiveId,
48}
49
50impl Task {
51 pub fn new(description: impl Into<String>) -> Self {
53 Self {
54 description: description.into(),
55 collective_id: CollectiveId::new(),
56 }
57 }
58
59 pub fn with_collective(description: impl Into<String>, collective_id: CollectiveId) -> Self {
61 Self {
62 description: description.into(),
63 collective_id,
64 }
65 }
66}
67
68pub struct HiveMind {
73 pub(crate) substrate: Arc<dyn SubstrateProvider>,
74 pub(crate) llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
75 pub(crate) approval_handler: Arc<dyn ApprovalHandler>,
76 pub(crate) event_bus: EventBus,
77 pub(crate) relationship_detector: Option<RelationshipDetector>,
78 pub(crate) insight_synthesizer: Option<InsightSynthesizer>,
79 pub(crate) embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
82 shutdown: Arc<AtomicBool>,
84 watch_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
86}
87
88impl std::fmt::Debug for HiveMind {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 f.debug_struct("HiveMind")
91 .field(
92 "llm_providers",
93 &self.llm_providers.keys().collect::<Vec<_>>(),
94 )
95 .finish_non_exhaustive()
96 }
97}
98
99impl HiveMind {
100 pub fn builder() -> HiveMindBuilder {
102 HiveMindBuilder::new()
103 }
104
105 pub fn substrate(&self) -> &dyn SubstrateProvider {
107 self.substrate.as_ref()
108 }
109
110 pub async fn deploy(
120 &self,
121 agents: Vec<AgentDefinition>,
122 tasks: Vec<Task>,
123 ) -> Result<Pin<Box<dyn Stream<Item = HiveEvent> + Send>>> {
124 if agents.is_empty() {
125 return Ok(Box::pin(stream::empty()));
126 }
127
128 let mut task = tasks.into_iter().next().unwrap_or_else(|| Task::new(""));
130
131 let collective_name = format!("collective-{}", task.collective_id);
133 let collective_id = self
134 .substrate
135 .get_or_create_collective(&collective_name)
136 .await?;
137 task.collective_id = collective_id;
138
139 let watch_substrate = Arc::clone(&self.substrate);
143 let watch_emitter = self.event_bus.clone();
144 let watch_shutdown = Arc::clone(&self.shutdown);
145 let watch_handle = tokio::spawn(async move {
146 match watch_substrate.watch(collective_id).await {
147 Ok(mut watch_stream) => {
148 while !watch_shutdown.load(Ordering::Relaxed) {
149 match watch_stream.next().await {
150 Some(event) => {
151 watch_emitter.emit(HiveEvent::WatchNotification {
152 timestamp_ms: pulsehive_core::event::now_ms(),
153 experience_id: event.experience_id,
154 collective_id: event.collective_id,
155 event_type: format!("{:?}", event.event_type),
156 });
157 }
158 None => break,
159 }
160 }
161 }
162 Err(e) => {
163 tracing::warn!(error = %e, "Failed to subscribe to Watch system");
164 }
165 }
166 });
167 *self.watch_handle.lock().unwrap() = Some(watch_handle);
168
169 let rx = self.event_bus.subscribe();
170
171 for agent in agents {
172 self.spawn_agent(agent, task.clone());
173 }
174
175 Ok(Box::pin(BroadcastStream::new(rx)))
177 }
178
179 pub async fn record_experience(&self, experience: NewExperience) -> Result<ExperienceId> {
185 let agent_id = experience.source_agent.0.clone();
186 let collective_id = experience.collective_id;
187
188 let mut experience = experience;
190 if let Some(provider) = &self.embedding_provider {
191 if experience.embedding.is_none() {
192 match provider.embed(&experience.content).await {
193 Ok(embedding) => {
194 experience.embedding = Some(embedding);
195 }
196 Err(e) => {
197 tracing::warn!(error = %e, "Failed to compute embedding in record_experience, storing without");
198 }
199 }
200 }
201 }
202
203 let content_preview: String = experience.content.chars().take(200).collect();
205 let experience_type_str = format!("{:?}", experience.experience_type);
206 let importance = experience.importance;
207
208 let id = self.substrate.store_experience(experience).await?;
209 self.event_bus.emit(HiveEvent::ExperienceRecorded {
210 timestamp_ms: pulsehive_core::event::now_ms(),
211 experience_id: id,
212 agent_id: agent_id.clone(),
213 content_preview,
214 experience_type: experience_type_str,
215 importance,
216 });
217
218 if let Some(detector) = &self.relationship_detector {
220 if let Ok(Some(stored)) = self.substrate.get_experience(id).await {
221 let relations = detector
222 .infer_relations(&stored, self.substrate.as_ref())
223 .await;
224
225 for rel in relations {
226 match self.substrate.store_relation(rel).await {
227 Ok(relation_id) => {
228 self.event_bus.emit(HiveEvent::RelationshipInferred {
229 timestamp_ms: pulsehive_core::event::now_ms(),
230 relation_id,
231 agent_id: agent_id.clone(),
232 });
233 }
234 Err(e) => {
235 tracing::warn!(error = %e, "Failed to store inferred relation");
236 }
237 }
238 }
239 }
240 }
241
242 if let Some(synthesizer) = &self.insight_synthesizer {
244 if !synthesizer.is_debounced(collective_id) {
245 let cluster = synthesizer.find_cluster(id, self.substrate.as_ref()).await;
246
247 if synthesizer.should_synthesize(cluster.len()) {
248 if let Some((provider_name, provider)) = self.llm_providers.iter().next() {
250 let llm_config =
251 pulsehive_core::llm::LlmConfig::new(provider_name, "default");
252 if let Some(insight) = synthesizer
253 .synthesize_cluster(
254 &cluster,
255 collective_id,
256 provider.as_ref(),
257 &llm_config,
258 )
259 .await
260 {
261 let source_count = insight.source_experience_ids.len();
262 match self.substrate.store_insight(insight).await {
263 Ok(insight_id) => {
264 synthesizer.mark_synthesized(collective_id);
265 self.event_bus.emit(HiveEvent::InsightGenerated {
266 timestamp_ms: pulsehive_core::event::now_ms(),
267 insight_id,
268 source_count,
269 agent_id: agent_id.clone(),
270 });
271 }
272 Err(e) => {
273 tracing::warn!(error = %e, "Failed to store synthesized insight");
274 }
275 }
276 }
277 }
278 }
279 }
280 }
281
282 Ok(id)
283 }
284
285 pub fn shutdown(&self) {
290 self.shutdown.store(true, Ordering::Relaxed);
291 if let Some(handle) = self.watch_handle.lock().unwrap().take() {
294 handle.abort();
295 }
296 tracing::info!("HiveMind shutdown signaled");
297 }
298
299 pub fn is_shutdown(&self) -> bool {
301 self.shutdown.load(Ordering::Relaxed)
302 }
303
304 pub async fn redeploy(&self, agents: Vec<AgentDefinition>, task: Task) -> Result<()> {
311 if agents.is_empty() {
312 return Ok(());
313 }
314
315 let mut task = task;
317 let collective_name = format!("collective-{}", task.collective_id);
318 let collective_id = self
319 .substrate
320 .get_or_create_collective(&collective_name)
321 .await?;
322 task.collective_id = collective_id;
323
324 for agent in agents {
325 self.spawn_agent(agent, task.clone());
326 }
327
328 Ok(())
329 }
330
331 fn spawn_agent(&self, agent: AgentDefinition, task: Task) {
336 let ctx = WorkflowContext {
337 task,
338 llm_providers: self.llm_providers.clone(),
339 substrate: Arc::clone(&self.substrate),
340 approval_handler: Arc::clone(&self.approval_handler),
341 event_emitter: self.event_bus.clone(),
342 embedding_provider: self.embedding_provider.clone(),
343 };
344
345 tokio::spawn(async move {
346 workflow::dispatch_agent(agent, &ctx).await;
347 });
348 }
349}
350
351impl Drop for HiveMind {
352 fn drop(&mut self) {
353 self.shutdown.store(true, Ordering::Relaxed);
354 if let Some(handle) = self.watch_handle.get_mut().unwrap().take() {
355 handle.abort();
356 }
357 }
358}
359
360struct BroadcastStream {
362 rx: broadcast::Receiver<HiveEvent>,
363}
364
365impl BroadcastStream {
366 fn new(rx: broadcast::Receiver<HiveEvent>) -> Self {
367 Self { rx }
368 }
369}
370
371impl Stream for BroadcastStream {
372 type Item = HiveEvent;
373
374 fn poll_next(
375 mut self: Pin<&mut Self>,
376 cx: &mut std::task::Context<'_>,
377 ) -> std::task::Poll<Option<Self::Item>> {
378 match self.rx.try_recv() {
379 Ok(event) => std::task::Poll::Ready(Some(event)),
380 Err(broadcast::error::TryRecvError::Empty) => {
381 cx.waker().wake_by_ref();
383 std::task::Poll::Pending
384 }
385 Err(broadcast::error::TryRecvError::Lagged(n)) => {
386 tracing::warn!(lagged = n, "Event stream lagged, some events dropped");
387 cx.waker().wake_by_ref();
388 std::task::Poll::Pending
389 }
390 Err(broadcast::error::TryRecvError::Closed) => std::task::Poll::Ready(None),
391 }
392 }
393}
394
395pub struct HiveMindBuilder {
397 substrate: Option<Box<dyn SubstrateProvider>>,
398 substrate_path: Option<String>,
399 llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
400 approval_handler: Option<Box<dyn ApprovalHandler>>,
401 relationship_detector: Option<Option<RelationshipDetector>>,
402 insight_synthesizer: Option<Option<InsightSynthesizer>>,
403 embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
404}
405
406impl HiveMindBuilder {
407 fn new() -> Self {
408 Self {
409 substrate: None,
410 substrate_path: None,
411 llm_providers: HashMap::new(),
412 approval_handler: None,
413 relationship_detector: None,
414 insight_synthesizer: None,
415 embedding_provider: None,
416 }
417 }
418
419 pub fn substrate_path(mut self, path: impl AsRef<Path>) -> Self {
421 self.substrate_path = Some(path.as_ref().to_string_lossy().into_owned());
422 self
423 }
424
425 pub fn substrate(mut self, provider: Box<dyn SubstrateProvider>) -> Self {
427 self.substrate = Some(provider);
428 self
429 }
430
431 pub fn llm_provider(
433 mut self,
434 name: impl Into<String>,
435 provider: impl LlmProvider + 'static,
436 ) -> Self {
437 self.llm_providers.insert(name.into(), Arc::new(provider));
438 self
439 }
440
441 pub fn approval_handler(mut self, handler: impl ApprovalHandler + 'static) -> Self {
443 self.approval_handler = Some(Box::new(handler));
444 self
445 }
446
447 pub fn relationship_detector(mut self, detector: RelationshipDetector) -> Self {
449 self.relationship_detector = Some(Some(detector));
450 self
451 }
452
453 pub fn no_relationship_detector(mut self) -> Self {
455 self.relationship_detector = Some(None);
456 self
457 }
458
459 pub fn insight_synthesizer(mut self, synthesizer: InsightSynthesizer) -> Self {
461 self.insight_synthesizer = Some(Some(synthesizer));
462 self
463 }
464
465 pub fn no_insight_synthesizer(mut self) -> Self {
467 self.insight_synthesizer = Some(None);
468 self
469 }
470
471 pub fn embedding_provider(mut self, provider: impl EmbeddingProvider + 'static) -> Self {
477 self.embedding_provider = Some(Arc::new(provider));
478 self
479 }
480
481 pub fn build(self) -> Result<HiveMind> {
483 let substrate: Arc<dyn SubstrateProvider> = if let Some(s) = self.substrate {
484 Arc::from(s)
485 } else if let Some(path) = self.substrate_path {
486 let config = if self.embedding_provider.is_some() {
487 Config::new()
489 } else {
490 Config::with_builtin_embeddings()
492 };
493 let db = PulseDB::open(&path, config)?;
494 Arc::new(PulseDBSubstrate::from_db(db))
495 } else {
496 return Err(PulseHiveError::config(
497 "Substrate not configured. Call substrate_path() or substrate() on the builder.",
498 ));
499 };
500
501 let approval: Arc<dyn ApprovalHandler> = match self.approval_handler {
502 Some(h) => Arc::from(h),
503 None => Arc::new(AutoApprove),
504 };
505
506 let relationship_detector = match self.relationship_detector {
508 Some(explicit) => explicit,
509 None => Some(RelationshipDetector::with_defaults()),
510 };
511
512 let insight_synthesizer = match self.insight_synthesizer {
514 Some(explicit) => explicit,
515 None => Some(InsightSynthesizer::with_defaults()),
516 };
517
518 Ok(HiveMind {
519 substrate,
520 llm_providers: self.llm_providers,
521 approval_handler: approval,
522 event_bus: EventBus::default(),
523 relationship_detector,
524 insight_synthesizer,
525 embedding_provider: self.embedding_provider,
526 shutdown: Arc::new(AtomicBool::new(false)),
527 watch_handle: std::sync::Mutex::new(None),
528 })
529 }
530}
531
532#[cfg(test)]
533mod tests {
534 use super::*;
535 use futures::StreamExt;
536
537 #[test]
538 fn test_build_fails_without_substrate() {
539 let result = HiveMind::builder().build();
540 assert!(result.is_err());
541 assert!(result
542 .unwrap_err()
543 .to_string()
544 .contains("Substrate not configured"));
545 }
546
547 #[test]
548 fn test_build_with_substrate_path() {
549 let dir = tempfile::tempdir().unwrap();
550 let path = dir.path().join("test.db");
551 assert!(HiveMind::builder().substrate_path(&path).build().is_ok());
552 }
553
554 #[tokio::test]
555 async fn test_deploy_empty_agents_returns_empty_stream() {
556 let dir = tempfile::tempdir().unwrap();
557 let path = dir.path().join("test.db");
558 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
559
560 let mut stream = hive.deploy(vec![], vec![]).await.unwrap();
561 assert!(stream.next().await.is_none());
562 }
563
564 #[test]
565 fn test_task_new() {
566 let task = Task::new("Analyze the codebase");
567 assert_eq!(task.description, "Analyze the codebase");
568 }
569
570 #[test]
571 fn test_task_with_collective() {
572 let cid = CollectiveId::new();
573 let task = Task::with_collective("Search for bugs", cid);
574 assert_eq!(task.collective_id, cid);
575 }
576
577 async fn build_hive_with_collective() -> (HiveMind, CollectiveId) {
579 let dir = tempfile::tempdir().unwrap();
580 let path = dir.path().join("test.db");
581 let dir = Box::leak(Box::new(dir));
583 let _ = dir;
584 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
585 let cid = hive
587 .substrate
588 .get_or_create_collective("test")
589 .await
590 .unwrap();
591 (hive, cid)
592 }
593
594 #[tokio::test]
595 async fn test_record_experience_stores_and_emits_event() {
596 let (hive, cid) = build_hive_with_collective().await;
597 let mut rx = hive.event_bus.subscribe();
598
599 let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
600 let exp = pulsedb::NewExperience {
601 collective_id: cid,
602 content: "Learned that Rust's ownership model prevents data races.".into(),
603 experience_type: pulsedb::ExperienceType::Generic {
604 category: Some("rust".into()),
605 },
606 embedding: Some(dummy_embedding),
607 importance: 0.8,
608 confidence: 0.9,
609 domain: vec!["rust".into(), "concurrency".into()],
610 source_agent: pulsedb::AgentId("test-agent".into()),
611 source_task: None,
612 related_files: vec![],
613 };
614
615 let id = hive.record_experience(exp).await.unwrap();
616
617 let event = rx.try_recv().unwrap();
619 match event {
620 HiveEvent::ExperienceRecorded {
621 experience_id,
622 agent_id,
623 ..
624 } => {
625 assert_eq!(experience_id, id);
626 assert_eq!(agent_id, "test-agent");
627 }
628 other => panic!("Expected ExperienceRecorded, got: {other:?}"),
629 }
630 }
631
632 #[tokio::test]
633 async fn test_record_experience_retrievable() {
634 let (hive, cid) = build_hive_with_collective().await;
635
636 let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
639 let exp = pulsedb::NewExperience {
640 collective_id: cid,
641 content: "Test experience for retrieval.".into(),
642 experience_type: pulsedb::ExperienceType::Generic { category: None },
643 embedding: Some(dummy_embedding),
644 importance: 0.5,
645 confidence: 0.5,
646 domain: vec![],
647 source_agent: pulsedb::AgentId("agent-1".into()),
648 source_task: None,
649 related_files: vec![],
650 };
651
652 let id = hive.record_experience(exp).await.unwrap();
653
654 let retrieved = hive.substrate.get_experience(id).await.unwrap();
656 assert!(retrieved.is_some());
657 let retrieved = retrieved.unwrap();
658 assert_eq!(retrieved.content, "Test experience for retrieval.");
659 }
660
661 #[test]
664 fn test_shutdown_sets_flag() {
665 let dir = tempfile::tempdir().unwrap();
666 let path = dir.path().join("test.db");
667 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
668
669 assert!(!hive.is_shutdown());
670 hive.shutdown();
671 assert!(hive.is_shutdown());
672 }
673
674 #[test]
675 fn test_drop_sets_shutdown_flag() {
676 let dir = tempfile::tempdir().unwrap();
677 let path = dir.path().join("test.db");
678 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
679 let shutdown = Arc::clone(&hive.shutdown);
680
681 assert!(!shutdown.load(Ordering::Relaxed));
682 drop(hive);
683 assert!(shutdown.load(Ordering::Relaxed));
684 }
685
686 #[tokio::test]
687 async fn test_redeploy_empty_is_noop() {
688 let (hive, _cid) = build_hive_with_collective().await;
689 let task = Task::new("test");
690 assert!(hive.redeploy(vec![], task).await.is_ok());
691 }
692}