1use std::collections::HashMap;
18use std::path::Path;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use futures::stream;
24use futures::{Stream, StreamExt};
25use pulsedb::{
26 CollectiveId, Config, ExperienceId, NewExperience, PulseDB, PulseDBSubstrate, SubstrateProvider,
27};
28use tokio::sync::broadcast;
29
30use pulsehive_core::agent::AgentDefinition;
31use pulsehive_core::approval::{ApprovalHandler, AutoApprove};
32use pulsehive_core::embedding::EmbeddingProvider;
33use pulsehive_core::error::{PulseHiveError, Result};
34use pulsehive_core::event::{EventBus, HiveEvent};
35use pulsehive_core::llm::LlmProvider;
36
37use crate::intelligence::insight::InsightSynthesizer;
38use crate::intelligence::relationship::RelationshipDetector;
39use crate::workflow::{self, WorkflowContext};
40
41#[derive(Debug, Clone)]
43pub struct Task {
44 pub description: String,
46 pub collective_id: CollectiveId,
48}
49
50impl Task {
51 pub fn new(description: impl Into<String>) -> Self {
53 Self {
54 description: description.into(),
55 collective_id: CollectiveId::new(),
56 }
57 }
58
59 pub fn with_collective(description: impl Into<String>, collective_id: CollectiveId) -> Self {
61 Self {
62 description: description.into(),
63 collective_id,
64 }
65 }
66}
67
68pub struct HiveMind {
73 pub(crate) substrate: Arc<dyn SubstrateProvider>,
74 pub(crate) llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
75 pub(crate) approval_handler: Arc<dyn ApprovalHandler>,
76 pub(crate) event_bus: EventBus,
77 pub(crate) relationship_detector: Option<RelationshipDetector>,
78 pub(crate) insight_synthesizer: Option<InsightSynthesizer>,
79 pub(crate) embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
82 shutdown: Arc<AtomicBool>,
84 watch_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
86}
87
88impl std::fmt::Debug for HiveMind {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 f.debug_struct("HiveMind")
91 .field(
92 "llm_providers",
93 &self.llm_providers.keys().collect::<Vec<_>>(),
94 )
95 .finish_non_exhaustive()
96 }
97}
98
99impl HiveMind {
100 pub fn builder() -> HiveMindBuilder {
102 HiveMindBuilder::new()
103 }
104
105 pub fn substrate(&self) -> &dyn SubstrateProvider {
107 self.substrate.as_ref()
108 }
109
110 pub async fn deploy(
120 &self,
121 agents: Vec<AgentDefinition>,
122 tasks: Vec<Task>,
123 ) -> Result<Pin<Box<dyn Stream<Item = HiveEvent> + Send>>> {
124 if agents.is_empty() {
125 return Ok(Box::pin(stream::empty()));
126 }
127
128 let mut task = tasks.into_iter().next().unwrap_or_else(|| Task::new(""));
130
131 let collective_name = format!("collective-{}", task.collective_id);
133 let collective_id = self
134 .substrate
135 .get_or_create_collective(&collective_name)
136 .await?;
137 task.collective_id = collective_id;
138
139 let watch_substrate = Arc::clone(&self.substrate);
143 let watch_emitter = self.event_bus.clone();
144 let watch_shutdown = Arc::clone(&self.shutdown);
145 let watch_handle = tokio::spawn(async move {
146 match watch_substrate.watch(collective_id).await {
147 Ok(mut watch_stream) => {
148 while !watch_shutdown.load(Ordering::Relaxed) {
149 match watch_stream.next().await {
150 Some(event) => {
151 watch_emitter.emit(HiveEvent::WatchNotification {
152 experience_id: event.experience_id,
153 collective_id: event.collective_id,
154 event_type: format!("{:?}", event.event_type),
155 });
156 }
157 None => break,
158 }
159 }
160 }
161 Err(e) => {
162 tracing::warn!(error = %e, "Failed to subscribe to Watch system");
163 }
164 }
165 });
166 *self.watch_handle.lock().unwrap() = Some(watch_handle);
167
168 let rx = self.event_bus.subscribe();
169
170 for agent in agents {
171 self.spawn_agent(agent, task.clone());
172 }
173
174 Ok(Box::pin(BroadcastStream::new(rx)))
176 }
177
178 pub async fn record_experience(&self, experience: NewExperience) -> Result<ExperienceId> {
184 let agent_id = experience.source_agent.0.clone();
185 let collective_id = experience.collective_id;
186
187 let mut experience = experience;
189 if let Some(provider) = &self.embedding_provider {
190 if experience.embedding.is_none() {
191 match provider.embed(&experience.content).await {
192 Ok(embedding) => {
193 experience.embedding = Some(embedding);
194 }
195 Err(e) => {
196 tracing::warn!(error = %e, "Failed to compute embedding in record_experience, storing without");
197 }
198 }
199 }
200 }
201
202 let id = self.substrate.store_experience(experience).await?;
203 self.event_bus.emit(HiveEvent::ExperienceRecorded {
204 experience_id: id,
205 agent_id,
206 });
207
208 if let Some(detector) = &self.relationship_detector {
210 if let Ok(Some(stored)) = self.substrate.get_experience(id).await {
211 let relations = detector
212 .infer_relations(&stored, self.substrate.as_ref())
213 .await;
214
215 for rel in relations {
216 match self.substrate.store_relation(rel).await {
217 Ok(relation_id) => {
218 self.event_bus
219 .emit(HiveEvent::RelationshipInferred { relation_id });
220 }
221 Err(e) => {
222 tracing::warn!(error = %e, "Failed to store inferred relation");
223 }
224 }
225 }
226 }
227 }
228
229 if let Some(synthesizer) = &self.insight_synthesizer {
231 if !synthesizer.is_debounced(collective_id) {
232 let cluster = synthesizer.find_cluster(id, self.substrate.as_ref()).await;
233
234 if synthesizer.should_synthesize(cluster.len()) {
235 if let Some((provider_name, provider)) = self.llm_providers.iter().next() {
237 let llm_config =
238 pulsehive_core::llm::LlmConfig::new(provider_name, "default");
239 if let Some(insight) = synthesizer
240 .synthesize_cluster(
241 &cluster,
242 collective_id,
243 provider.as_ref(),
244 &llm_config,
245 )
246 .await
247 {
248 let source_count = insight.source_experience_ids.len();
249 match self.substrate.store_insight(insight).await {
250 Ok(insight_id) => {
251 synthesizer.mark_synthesized(collective_id);
252 self.event_bus.emit(HiveEvent::InsightGenerated {
253 insight_id,
254 source_count,
255 });
256 }
257 Err(e) => {
258 tracing::warn!(error = %e, "Failed to store synthesized insight");
259 }
260 }
261 }
262 }
263 }
264 }
265 }
266
267 Ok(id)
268 }
269
270 pub fn shutdown(&self) {
275 self.shutdown.store(true, Ordering::Relaxed);
276 if let Some(handle) = self.watch_handle.lock().unwrap().take() {
279 handle.abort();
280 }
281 tracing::info!("HiveMind shutdown signaled");
282 }
283
284 pub fn is_shutdown(&self) -> bool {
286 self.shutdown.load(Ordering::Relaxed)
287 }
288
289 pub async fn redeploy(&self, agents: Vec<AgentDefinition>, task: Task) -> Result<()> {
296 if agents.is_empty() {
297 return Ok(());
298 }
299
300 let mut task = task;
302 let collective_name = format!("collective-{}", task.collective_id);
303 let collective_id = self
304 .substrate
305 .get_or_create_collective(&collective_name)
306 .await?;
307 task.collective_id = collective_id;
308
309 for agent in agents {
310 self.spawn_agent(agent, task.clone());
311 }
312
313 Ok(())
314 }
315
316 fn spawn_agent(&self, agent: AgentDefinition, task: Task) {
321 let ctx = WorkflowContext {
322 task,
323 llm_providers: self.llm_providers.clone(),
324 substrate: Arc::clone(&self.substrate),
325 approval_handler: Arc::clone(&self.approval_handler),
326 event_emitter: self.event_bus.clone(),
327 embedding_provider: self.embedding_provider.clone(),
328 };
329
330 tokio::spawn(async move {
331 workflow::dispatch_agent(agent, &ctx).await;
332 });
333 }
334}
335
336impl Drop for HiveMind {
337 fn drop(&mut self) {
338 self.shutdown.store(true, Ordering::Relaxed);
339 if let Some(handle) = self.watch_handle.get_mut().unwrap().take() {
340 handle.abort();
341 }
342 }
343}
344
345struct BroadcastStream {
347 rx: broadcast::Receiver<HiveEvent>,
348}
349
350impl BroadcastStream {
351 fn new(rx: broadcast::Receiver<HiveEvent>) -> Self {
352 Self { rx }
353 }
354}
355
356impl Stream for BroadcastStream {
357 type Item = HiveEvent;
358
359 fn poll_next(
360 mut self: Pin<&mut Self>,
361 cx: &mut std::task::Context<'_>,
362 ) -> std::task::Poll<Option<Self::Item>> {
363 match self.rx.try_recv() {
364 Ok(event) => std::task::Poll::Ready(Some(event)),
365 Err(broadcast::error::TryRecvError::Empty) => {
366 cx.waker().wake_by_ref();
368 std::task::Poll::Pending
369 }
370 Err(broadcast::error::TryRecvError::Lagged(n)) => {
371 tracing::warn!(lagged = n, "Event stream lagged, some events dropped");
372 cx.waker().wake_by_ref();
373 std::task::Poll::Pending
374 }
375 Err(broadcast::error::TryRecvError::Closed) => std::task::Poll::Ready(None),
376 }
377 }
378}
379
380pub struct HiveMindBuilder {
382 substrate: Option<Box<dyn SubstrateProvider>>,
383 substrate_path: Option<String>,
384 llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
385 approval_handler: Option<Box<dyn ApprovalHandler>>,
386 relationship_detector: Option<Option<RelationshipDetector>>,
387 insight_synthesizer: Option<Option<InsightSynthesizer>>,
388 embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
389}
390
391impl HiveMindBuilder {
392 fn new() -> Self {
393 Self {
394 substrate: None,
395 substrate_path: None,
396 llm_providers: HashMap::new(),
397 approval_handler: None,
398 relationship_detector: None,
399 insight_synthesizer: None,
400 embedding_provider: None,
401 }
402 }
403
404 pub fn substrate_path(mut self, path: impl AsRef<Path>) -> Self {
406 self.substrate_path = Some(path.as_ref().to_string_lossy().into_owned());
407 self
408 }
409
410 pub fn substrate(mut self, provider: Box<dyn SubstrateProvider>) -> Self {
412 self.substrate = Some(provider);
413 self
414 }
415
416 pub fn llm_provider(
418 mut self,
419 name: impl Into<String>,
420 provider: impl LlmProvider + 'static,
421 ) -> Self {
422 self.llm_providers.insert(name.into(), Arc::new(provider));
423 self
424 }
425
426 pub fn approval_handler(mut self, handler: impl ApprovalHandler + 'static) -> Self {
428 self.approval_handler = Some(Box::new(handler));
429 self
430 }
431
432 pub fn relationship_detector(mut self, detector: RelationshipDetector) -> Self {
434 self.relationship_detector = Some(Some(detector));
435 self
436 }
437
438 pub fn no_relationship_detector(mut self) -> Self {
440 self.relationship_detector = Some(None);
441 self
442 }
443
444 pub fn insight_synthesizer(mut self, synthesizer: InsightSynthesizer) -> Self {
446 self.insight_synthesizer = Some(Some(synthesizer));
447 self
448 }
449
450 pub fn no_insight_synthesizer(mut self) -> Self {
452 self.insight_synthesizer = Some(None);
453 self
454 }
455
456 pub fn embedding_provider(mut self, provider: impl EmbeddingProvider + 'static) -> Self {
462 self.embedding_provider = Some(Arc::new(provider));
463 self
464 }
465
466 pub fn build(self) -> Result<HiveMind> {
468 let substrate: Arc<dyn SubstrateProvider> = if let Some(s) = self.substrate {
469 Arc::from(s)
470 } else if let Some(path) = self.substrate_path {
471 let config = if self.embedding_provider.is_some() {
472 Config::new()
474 } else {
475 Config::with_builtin_embeddings()
477 };
478 let db = PulseDB::open(&path, config)?;
479 Arc::new(PulseDBSubstrate::from_db(db))
480 } else {
481 return Err(PulseHiveError::config(
482 "Substrate not configured. Call substrate_path() or substrate() on the builder.",
483 ));
484 };
485
486 let approval: Arc<dyn ApprovalHandler> = match self.approval_handler {
487 Some(h) => Arc::from(h),
488 None => Arc::new(AutoApprove),
489 };
490
491 let relationship_detector = match self.relationship_detector {
493 Some(explicit) => explicit,
494 None => Some(RelationshipDetector::with_defaults()),
495 };
496
497 let insight_synthesizer = match self.insight_synthesizer {
499 Some(explicit) => explicit,
500 None => Some(InsightSynthesizer::with_defaults()),
501 };
502
503 Ok(HiveMind {
504 substrate,
505 llm_providers: self.llm_providers,
506 approval_handler: approval,
507 event_bus: EventBus::default(),
508 relationship_detector,
509 insight_synthesizer,
510 embedding_provider: self.embedding_provider,
511 shutdown: Arc::new(AtomicBool::new(false)),
512 watch_handle: std::sync::Mutex::new(None),
513 })
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520 use futures::StreamExt;
521
522 #[test]
523 fn test_build_fails_without_substrate() {
524 let result = HiveMind::builder().build();
525 assert!(result.is_err());
526 assert!(result
527 .unwrap_err()
528 .to_string()
529 .contains("Substrate not configured"));
530 }
531
532 #[test]
533 fn test_build_with_substrate_path() {
534 let dir = tempfile::tempdir().unwrap();
535 let path = dir.path().join("test.db");
536 assert!(HiveMind::builder().substrate_path(&path).build().is_ok());
537 }
538
539 #[tokio::test]
540 async fn test_deploy_empty_agents_returns_empty_stream() {
541 let dir = tempfile::tempdir().unwrap();
542 let path = dir.path().join("test.db");
543 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
544
545 let mut stream = hive.deploy(vec![], vec![]).await.unwrap();
546 assert!(stream.next().await.is_none());
547 }
548
549 #[test]
550 fn test_task_new() {
551 let task = Task::new("Analyze the codebase");
552 assert_eq!(task.description, "Analyze the codebase");
553 }
554
555 #[test]
556 fn test_task_with_collective() {
557 let cid = CollectiveId::new();
558 let task = Task::with_collective("Search for bugs", cid);
559 assert_eq!(task.collective_id, cid);
560 }
561
562 async fn build_hive_with_collective() -> (HiveMind, CollectiveId) {
564 let dir = tempfile::tempdir().unwrap();
565 let path = dir.path().join("test.db");
566 let dir = Box::leak(Box::new(dir));
568 let _ = dir;
569 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
570 let cid = hive
572 .substrate
573 .get_or_create_collective("test")
574 .await
575 .unwrap();
576 (hive, cid)
577 }
578
579 #[tokio::test]
580 async fn test_record_experience_stores_and_emits_event() {
581 let (hive, cid) = build_hive_with_collective().await;
582 let mut rx = hive.event_bus.subscribe();
583
584 let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
585 let exp = pulsedb::NewExperience {
586 collective_id: cid,
587 content: "Learned that Rust's ownership model prevents data races.".into(),
588 experience_type: pulsedb::ExperienceType::Generic {
589 category: Some("rust".into()),
590 },
591 embedding: Some(dummy_embedding),
592 importance: 0.8,
593 confidence: 0.9,
594 domain: vec!["rust".into(), "concurrency".into()],
595 source_agent: pulsedb::AgentId("test-agent".into()),
596 source_task: None,
597 related_files: vec![],
598 };
599
600 let id = hive.record_experience(exp).await.unwrap();
601
602 let event = rx.try_recv().unwrap();
604 match event {
605 HiveEvent::ExperienceRecorded {
606 experience_id,
607 agent_id,
608 } => {
609 assert_eq!(experience_id, id);
610 assert_eq!(agent_id, "test-agent");
611 }
612 other => panic!("Expected ExperienceRecorded, got: {other:?}"),
613 }
614 }
615
616 #[tokio::test]
617 async fn test_record_experience_retrievable() {
618 let (hive, cid) = build_hive_with_collective().await;
619
620 let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
623 let exp = pulsedb::NewExperience {
624 collective_id: cid,
625 content: "Test experience for retrieval.".into(),
626 experience_type: pulsedb::ExperienceType::Generic { category: None },
627 embedding: Some(dummy_embedding),
628 importance: 0.5,
629 confidence: 0.5,
630 domain: vec![],
631 source_agent: pulsedb::AgentId("agent-1".into()),
632 source_task: None,
633 related_files: vec![],
634 };
635
636 let id = hive.record_experience(exp).await.unwrap();
637
638 let retrieved = hive.substrate.get_experience(id).await.unwrap();
640 assert!(retrieved.is_some());
641 let retrieved = retrieved.unwrap();
642 assert_eq!(retrieved.content, "Test experience for retrieval.");
643 }
644
645 #[test]
648 fn test_shutdown_sets_flag() {
649 let dir = tempfile::tempdir().unwrap();
650 let path = dir.path().join("test.db");
651 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
652
653 assert!(!hive.is_shutdown());
654 hive.shutdown();
655 assert!(hive.is_shutdown());
656 }
657
658 #[test]
659 fn test_drop_sets_shutdown_flag() {
660 let dir = tempfile::tempdir().unwrap();
661 let path = dir.path().join("test.db");
662 let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
663 let shutdown = Arc::clone(&hive.shutdown);
664
665 assert!(!shutdown.load(Ordering::Relaxed));
666 drop(hive);
667 assert!(shutdown.load(Ordering::Relaxed));
668 }
669
670 #[tokio::test]
671 async fn test_redeploy_empty_is_noop() {
672 let (hive, _cid) = build_hive_with_collective().await;
673 let task = Task::new("test");
674 assert!(hive.redeploy(vec![], task).await.is_ok());
675 }
676}