Skip to main content

pulsehive_runtime/
hivemind.rs

1//! HiveMind orchestrator and builder.
2//!
3//! [`HiveMind`] is the central entry point of PulseHive. It owns the substrate,
4//! LLM providers, approval handler, and event bus. Products construct it via
5//! the builder pattern and deploy agents through it.
6//!
7//! # Example
8//! ```rust,ignore
9//! let hive = HiveMind::builder()
10//!     .substrate_path("/tmp/my_project.db")
11//!     .llm_provider("openai", my_openai_provider)
12//!     .build()?;
13//!
14//! let events = hive.deploy(agents, tasks).await?;
15//! ```
16
17use std::collections::HashMap;
18use std::path::Path;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use futures::stream;
24use futures::{Stream, StreamExt};
25use pulsedb::{
26    CollectiveId, Config, ExperienceId, NewExperience, PulseDB, PulseDBSubstrate, SubstrateProvider,
27};
28use tokio::sync::broadcast;
29
30use pulsehive_core::agent::AgentDefinition;
31use pulsehive_core::approval::{ApprovalHandler, AutoApprove};
32use pulsehive_core::embedding::EmbeddingProvider;
33use pulsehive_core::error::{PulseHiveError, Result};
34use pulsehive_core::event::{EventBus, HiveEvent};
35use pulsehive_core::llm::LlmProvider;
36
37use crate::intelligence::insight::InsightSynthesizer;
38use crate::intelligence::relationship::RelationshipDetector;
39use crate::workflow::{self, WorkflowContext};
40
41/// A task to be executed by deployed agents.
42#[derive(Debug, Clone)]
43pub struct Task {
44    /// Human-readable description of what to accomplish.
45    pub description: String,
46    /// Collective (namespace) this task operates within.
47    pub collective_id: CollectiveId,
48}
49
50impl Task {
51    /// Creates a task with a new collective ID.
52    pub fn new(description: impl Into<String>) -> Self {
53        Self {
54            description: description.into(),
55            collective_id: CollectiveId::new(),
56        }
57    }
58
59    /// Creates a task within an existing collective.
60    pub fn with_collective(description: impl Into<String>, collective_id: CollectiveId) -> Self {
61        Self {
62            description: description.into(),
63            collective_id,
64        }
65    }
66}
67
68/// The central orchestrator of PulseHive.
69///
70/// Owns the substrate, LLM providers, approval handler, and event bus.
71/// Constructed exclusively via [`HiveMind::builder()`].
72pub struct HiveMind {
73    pub(crate) substrate: Arc<dyn SubstrateProvider>,
74    pub(crate) llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
75    pub(crate) approval_handler: Arc<dyn ApprovalHandler>,
76    pub(crate) event_bus: EventBus,
77    pub(crate) relationship_detector: Option<RelationshipDetector>,
78    pub(crate) insight_synthesizer: Option<InsightSynthesizer>,
79    /// Optional embedding provider for domain-specific models.
80    /// When set, embeddings are computed via this provider before PulseDB storage.
81    pub(crate) embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
82    /// Shutdown signal for background tasks (Watch system).
83    shutdown: Arc<AtomicBool>,
84    /// Handle to the Watch background task for graceful cancellation.
85    watch_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
86}
87
88impl std::fmt::Debug for HiveMind {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("HiveMind")
91            .field(
92                "llm_providers",
93                &self.llm_providers.keys().collect::<Vec<_>>(),
94            )
95            .finish_non_exhaustive()
96    }
97}
98
99impl HiveMind {
100    /// Creates a new builder for constructing a HiveMind.
101    pub fn builder() -> HiveMindBuilder {
102        HiveMindBuilder::new()
103    }
104
105    /// Access the substrate provider for direct operations.
106    pub fn substrate(&self) -> &dyn SubstrateProvider {
107        self.substrate.as_ref()
108    }
109
110    /// Deploy agents to execute tasks. Returns a stream of events.
111    ///
112    /// Each agent is spawned as a Tokio task and dispatched via
113    /// the workflow module's `dispatch_agent()` which handles all agent kinds
114    /// (LLM, Sequential, Parallel, Loop).
115    ///
116    /// Automatically subscribes to the PulseDB Watch system for the collective,
117    /// forwarding substrate change events as [`HiveEvent::WatchNotification`].
118    /// If Watch subscription fails, agents still execute normally (graceful degradation).
119    pub async fn deploy(
120        &self,
121        agents: Vec<AgentDefinition>,
122        tasks: Vec<Task>,
123    ) -> Result<Pin<Box<dyn Stream<Item = HiveEvent> + Send>>> {
124        if agents.is_empty() {
125            return Ok(Box::pin(stream::empty()));
126        }
127
128        // Get the first task (or create a default one)
129        let mut task = tasks.into_iter().next().unwrap_or_else(|| Task::new(""));
130
131        // Ensure the collective exists in the substrate
132        let collective_name = format!("collective-{}", task.collective_id);
133        let collective_id = self
134            .substrate
135            .get_or_create_collective(&collective_name)
136            .await?;
137        task.collective_id = collective_id;
138
139        // Subscribe to Watch system for real-time substrate change notifications.
140        // Runs as a background task — failure to subscribe doesn't block deployment.
141        // Respects the shutdown flag for graceful termination.
142        let watch_substrate = Arc::clone(&self.substrate);
143        let watch_emitter = self.event_bus.clone();
144        let watch_shutdown = Arc::clone(&self.shutdown);
145        let watch_handle = tokio::spawn(async move {
146            match watch_substrate.watch(collective_id).await {
147                Ok(mut watch_stream) => {
148                    while !watch_shutdown.load(Ordering::Relaxed) {
149                        match watch_stream.next().await {
150                            Some(event) => {
151                                watch_emitter.emit(HiveEvent::WatchNotification {
152                                    experience_id: event.experience_id,
153                                    collective_id: event.collective_id,
154                                    event_type: format!("{:?}", event.event_type),
155                                });
156                            }
157                            None => break,
158                        }
159                    }
160                }
161                Err(e) => {
162                    tracing::warn!(error = %e, "Failed to subscribe to Watch system");
163                }
164            }
165        });
166        *self.watch_handle.lock().unwrap() = Some(watch_handle);
167
168        let rx = self.event_bus.subscribe();
169
170        for agent in agents {
171            self.spawn_agent(agent, task.clone());
172        }
173
174        // Convert broadcast::Receiver into a Stream
175        Ok(Box::pin(BroadcastStream::new(rx)))
176    }
177
178    /// Record an experience in the substrate.
179    ///
180    /// Stores the experience via PulseDB, emits an `ExperienceRecorded` event,
181    /// runs the RelationshipDetector to infer relations, and triggers the
182    /// InsightSynthesizer if a cluster exceeds the density threshold.
183    pub async fn record_experience(&self, experience: NewExperience) -> Result<ExperienceId> {
184        let agent_id = experience.source_agent.0.clone();
185        let collective_id = experience.collective_id;
186
187        // Compute embedding via provider if available and not already set
188        let mut experience = experience;
189        if let Some(provider) = &self.embedding_provider {
190            if experience.embedding.is_none() {
191                match provider.embed(&experience.content).await {
192                    Ok(embedding) => {
193                        experience.embedding = Some(embedding);
194                    }
195                    Err(e) => {
196                        tracing::warn!(error = %e, "Failed to compute embedding in record_experience, storing without");
197                    }
198                }
199            }
200        }
201
202        let id = self.substrate.store_experience(experience).await?;
203        self.event_bus.emit(HiveEvent::ExperienceRecorded {
204            experience_id: id,
205            agent_id,
206        });
207
208        // Run relationship inference if detector is configured
209        if let Some(detector) = &self.relationship_detector {
210            if let Ok(Some(stored)) = self.substrate.get_experience(id).await {
211                let relations = detector
212                    .infer_relations(&stored, self.substrate.as_ref())
213                    .await;
214
215                for rel in relations {
216                    match self.substrate.store_relation(rel).await {
217                        Ok(relation_id) => {
218                            self.event_bus
219                                .emit(HiveEvent::RelationshipInferred { relation_id });
220                        }
221                        Err(e) => {
222                            tracing::warn!(error = %e, "Failed to store inferred relation");
223                        }
224                    }
225                }
226            }
227        }
228
229        // Run insight synthesis if synthesizer is configured
230        if let Some(synthesizer) = &self.insight_synthesizer {
231            if !synthesizer.is_debounced(collective_id) {
232                let cluster = synthesizer.find_cluster(id, self.substrate.as_ref()).await;
233
234                if synthesizer.should_synthesize(cluster.len()) {
235                    // Use the first available LLM provider for synthesis
236                    if let Some((provider_name, provider)) = self.llm_providers.iter().next() {
237                        let llm_config =
238                            pulsehive_core::llm::LlmConfig::new(provider_name, "default");
239                        if let Some(insight) = synthesizer
240                            .synthesize_cluster(
241                                &cluster,
242                                collective_id,
243                                provider.as_ref(),
244                                &llm_config,
245                            )
246                            .await
247                        {
248                            let source_count = insight.source_experience_ids.len();
249                            match self.substrate.store_insight(insight).await {
250                                Ok(insight_id) => {
251                                    synthesizer.mark_synthesized(collective_id);
252                                    self.event_bus.emit(HiveEvent::InsightGenerated {
253                                        insight_id,
254                                        source_count,
255                                    });
256                                }
257                                Err(e) => {
258                                    tracing::warn!(error = %e, "Failed to store synthesized insight");
259                                }
260                            }
261                        }
262                    }
263                }
264            }
265        }
266
267        Ok(id)
268    }
269
270    /// Signal shutdown to all background tasks (Watch system).
271    ///
272    /// Sets the shutdown flag, causing the Watch background task to stop
273    /// after processing its current event. This is non-blocking.
274    pub fn shutdown(&self) {
275        self.shutdown.store(true, Ordering::Relaxed);
276        // Abort the Watch background task so it drops its EventBus sender clone,
277        // allowing the broadcast channel to close and BroadcastStream to terminate.
278        if let Some(handle) = self.watch_handle.lock().unwrap().take() {
279            handle.abort();
280        }
281        tracing::info!("HiveMind shutdown signaled");
282    }
283
284    /// Returns true if shutdown has been signaled.
285    pub fn is_shutdown(&self) -> bool {
286        self.shutdown.load(Ordering::Relaxed)
287    }
288
289    /// Redeploy agents on the existing substrate and event bus.
290    ///
291    /// Use this to restart failed agents. Products typically call this when
292    /// they observe `AgentCompleted { outcome: Error { .. } }` on the event stream.
293    ///
294    /// The collective is created/resolved from the task, same as in [`HiveMind::deploy()`].
295    pub async fn redeploy(&self, agents: Vec<AgentDefinition>, task: Task) -> Result<()> {
296        if agents.is_empty() {
297            return Ok(());
298        }
299
300        // Ensure the collective exists
301        let mut task = task;
302        let collective_name = format!("collective-{}", task.collective_id);
303        let collective_id = self
304            .substrate
305            .get_or_create_collective(&collective_name)
306            .await?;
307        task.collective_id = collective_id;
308
309        for agent in agents {
310            self.spawn_agent(agent, task.clone());
311        }
312
313        Ok(())
314    }
315
316    /// Spawn a single agent as a Tokio task.
317    ///
318    /// Builds a [`WorkflowContext`] from HiveMind's fields and delegates
319    /// to [`workflow::dispatch_agent()`] which handles all agent kinds.
320    fn spawn_agent(&self, agent: AgentDefinition, task: Task) {
321        let ctx = WorkflowContext {
322            task,
323            llm_providers: self.llm_providers.clone(),
324            substrate: Arc::clone(&self.substrate),
325            approval_handler: Arc::clone(&self.approval_handler),
326            event_emitter: self.event_bus.clone(),
327            embedding_provider: self.embedding_provider.clone(),
328        };
329
330        tokio::spawn(async move {
331            workflow::dispatch_agent(agent, &ctx).await;
332        });
333    }
334}
335
336impl Drop for HiveMind {
337    fn drop(&mut self) {
338        self.shutdown.store(true, Ordering::Relaxed);
339        if let Some(handle) = self.watch_handle.get_mut().unwrap().take() {
340            handle.abort();
341        }
342    }
343}
344
345/// Adapter that converts a `broadcast::Receiver<HiveEvent>` into a `Stream`.
346struct BroadcastStream {
347    rx: broadcast::Receiver<HiveEvent>,
348}
349
350impl BroadcastStream {
351    fn new(rx: broadcast::Receiver<HiveEvent>) -> Self {
352        Self { rx }
353    }
354}
355
356impl Stream for BroadcastStream {
357    type Item = HiveEvent;
358
359    fn poll_next(
360        mut self: Pin<&mut Self>,
361        cx: &mut std::task::Context<'_>,
362    ) -> std::task::Poll<Option<Self::Item>> {
363        match self.rx.try_recv() {
364            Ok(event) => std::task::Poll::Ready(Some(event)),
365            Err(broadcast::error::TryRecvError::Empty) => {
366                // No events yet — register waker and return Pending
367                cx.waker().wake_by_ref();
368                std::task::Poll::Pending
369            }
370            Err(broadcast::error::TryRecvError::Lagged(n)) => {
371                tracing::warn!(lagged = n, "Event stream lagged, some events dropped");
372                cx.waker().wake_by_ref();
373                std::task::Poll::Pending
374            }
375            Err(broadcast::error::TryRecvError::Closed) => std::task::Poll::Ready(None),
376        }
377    }
378}
379
380/// Builder for constructing a [`HiveMind`] with validated configuration.
381pub struct HiveMindBuilder {
382    substrate: Option<Box<dyn SubstrateProvider>>,
383    substrate_path: Option<String>,
384    llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
385    approval_handler: Option<Box<dyn ApprovalHandler>>,
386    relationship_detector: Option<Option<RelationshipDetector>>,
387    insight_synthesizer: Option<Option<InsightSynthesizer>>,
388    embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
389}
390
391impl HiveMindBuilder {
392    fn new() -> Self {
393        Self {
394            substrate: None,
395            substrate_path: None,
396            llm_providers: HashMap::new(),
397            approval_handler: None,
398            relationship_detector: None,
399            insight_synthesizer: None,
400            embedding_provider: None,
401        }
402    }
403
404    /// Set substrate via file path.
405    pub fn substrate_path(mut self, path: impl AsRef<Path>) -> Self {
406        self.substrate_path = Some(path.as_ref().to_string_lossy().into_owned());
407        self
408    }
409
410    /// Set a custom substrate provider (e.g., for testing with mocks).
411    pub fn substrate(mut self, provider: Box<dyn SubstrateProvider>) -> Self {
412        self.substrate = Some(provider);
413        self
414    }
415
416    /// Register a named LLM provider.
417    pub fn llm_provider(
418        mut self,
419        name: impl Into<String>,
420        provider: impl LlmProvider + 'static,
421    ) -> Self {
422        self.llm_providers.insert(name.into(), Arc::new(provider));
423        self
424    }
425
426    /// Set a custom approval handler. Defaults to [`AutoApprove`] if not set.
427    pub fn approval_handler(mut self, handler: impl ApprovalHandler + 'static) -> Self {
428        self.approval_handler = Some(Box::new(handler));
429        self
430    }
431
432    /// Set a custom relationship detector. Default: enabled with default thresholds.
433    pub fn relationship_detector(mut self, detector: RelationshipDetector) -> Self {
434        self.relationship_detector = Some(Some(detector));
435        self
436    }
437
438    /// Disable automatic relationship detection.
439    pub fn no_relationship_detector(mut self) -> Self {
440        self.relationship_detector = Some(None);
441        self
442    }
443
444    /// Set a custom insight synthesizer. Default: enabled with default thresholds.
445    pub fn insight_synthesizer(mut self, synthesizer: InsightSynthesizer) -> Self {
446        self.insight_synthesizer = Some(Some(synthesizer));
447        self
448    }
449
450    /// Disable automatic insight synthesis.
451    pub fn no_insight_synthesizer(mut self) -> Self {
452        self.insight_synthesizer = Some(None);
453        self
454    }
455
456    /// Set a custom embedding provider for domain-specific models.
457    ///
458    /// When set, PulseHive computes embeddings via this provider before storing
459    /// experiences in PulseDB (External mode). When not set, PulseDB uses its
460    /// built-in all-MiniLM-L6-v2 model (384d).
461    pub fn embedding_provider(mut self, provider: impl EmbeddingProvider + 'static) -> Self {
462        self.embedding_provider = Some(Arc::new(provider));
463        self
464    }
465
466    /// Build the HiveMind. Validates that a substrate is configured.
467    pub fn build(self) -> Result<HiveMind> {
468        let substrate: Arc<dyn SubstrateProvider> = if let Some(s) = self.substrate {
469            Arc::from(s)
470        } else if let Some(path) = self.substrate_path {
471            let config = if self.embedding_provider.is_some() {
472                // External mode: PulseHive computes embeddings via the provider
473                Config::new()
474            } else {
475                // Builtin mode: PulseDB computes embeddings internally
476                Config::with_builtin_embeddings()
477            };
478            let db = PulseDB::open(&path, config)?;
479            Arc::new(PulseDBSubstrate::from_db(db))
480        } else {
481            return Err(PulseHiveError::config(
482                "Substrate not configured. Call substrate_path() or substrate() on the builder.",
483            ));
484        };
485
486        let approval: Arc<dyn ApprovalHandler> = match self.approval_handler {
487            Some(h) => Arc::from(h),
488            None => Arc::new(AutoApprove),
489        };
490
491        // Default: relationship detector enabled with default thresholds
492        let relationship_detector = match self.relationship_detector {
493            Some(explicit) => explicit,
494            None => Some(RelationshipDetector::with_defaults()),
495        };
496
497        // Default: insight synthesizer enabled with default thresholds
498        let insight_synthesizer = match self.insight_synthesizer {
499            Some(explicit) => explicit,
500            None => Some(InsightSynthesizer::with_defaults()),
501        };
502
503        Ok(HiveMind {
504            substrate,
505            llm_providers: self.llm_providers,
506            approval_handler: approval,
507            event_bus: EventBus::default(),
508            relationship_detector,
509            insight_synthesizer,
510            embedding_provider: self.embedding_provider,
511            shutdown: Arc::new(AtomicBool::new(false)),
512            watch_handle: std::sync::Mutex::new(None),
513        })
514    }
515}
516
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    #[test]
    fn test_build_fails_without_substrate() {
        let result = HiveMind::builder().build();
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Substrate not configured"));
    }

    #[test]
    fn test_build_with_substrate_path() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        assert!(HiveMind::builder().substrate_path(&path).build().is_ok());
    }

    #[tokio::test]
    async fn test_deploy_empty_agents_returns_empty_stream() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();

        let mut stream = hive.deploy(vec![], vec![]).await.unwrap();
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_broadcast_stream_ends_when_channel_closed() {
        let (tx, rx) = broadcast::channel::<HiveEvent>(4);
        drop(tx);
        let mut stream = BroadcastStream::new(rx);
        assert!(stream.next().await.is_none());
    }

    #[test]
    fn test_task_new() {
        let task = Task::new("Analyze the codebase");
        assert_eq!(task.description, "Analyze the codebase");
    }

    #[test]
    fn test_task_with_collective() {
        let cid = CollectiveId::new();
        let task = Task::with_collective("Search for bugs", cid);
        assert_eq!(task.collective_id, cid);
    }

    /// Helper: create a HiveMind with Builtin embeddings and a collective for testing.
    ///
    /// Returns the `TempDir` guard holding the database. Callers must bind it to a
    /// named variable (e.g. `_dir`, not `_`) so it lives for the whole test; the
    /// directory is deleted when the guard is dropped, instead of being leaked.
    async fn build_hive_with_collective() -> (tempfile::TempDir, HiveMind, CollectiveId) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
        // Create collective via SubstrateProvider trait (no raw PulseDB needed!)
        let cid = hive
            .substrate
            .get_or_create_collective("test")
            .await
            .unwrap();
        (dir, hive, cid)
    }

    #[tokio::test]
    async fn test_record_experience_stores_and_emits_event() {
        let (_dir, hive, cid) = build_hive_with_collective().await;
        let mut rx = hive.event_bus.subscribe();

        let exp = pulsedb::NewExperience {
            collective_id: cid,
            content: "Learned that Rust's ownership model prevents data races.".into(),
            experience_type: pulsedb::ExperienceType::Generic {
                category: Some("rust".into()),
            },
            embedding: None, // Builtin embeddings auto-compute
            importance: 0.8,
            confidence: 0.9,
            domain: vec!["rust".into(), "concurrency".into()],
            source_agent: pulsedb::AgentId("test-agent".into()),
            source_task: None,
            related_files: vec![],
        };

        let id = hive.record_experience(exp).await.unwrap();

        // Verify event emitted
        let event = rx.try_recv().unwrap();
        match event {
            HiveEvent::ExperienceRecorded {
                experience_id,
                agent_id,
            } => {
                assert_eq!(experience_id, id);
                assert_eq!(agent_id, "test-agent");
            }
            other => panic!("Expected ExperienceRecorded, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_record_experience_retrievable() {
        let (_dir, hive, cid) = build_hive_with_collective().await;

        let exp = pulsedb::NewExperience {
            collective_id: cid,
            content: "Test experience for retrieval.".into(),
            experience_type: pulsedb::ExperienceType::Generic { category: None },
            embedding: None, // Builtin embeddings auto-compute
            importance: 0.5,
            confidence: 0.5,
            domain: vec![],
            source_agent: pulsedb::AgentId("agent-1".into()),
            source_task: None,
            related_files: vec![],
        };

        let id = hive.record_experience(exp).await.unwrap();

        // Verify retrievable
        let retrieved = hive.substrate.get_experience(id).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.content, "Test experience for retrieval.");
    }

    // ── Shutdown & Restart tests ─────────────────────────────────────

    #[test]
    fn test_shutdown_sets_flag() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();

        assert!(!hive.is_shutdown());
        hive.shutdown();
        assert!(hive.is_shutdown());
    }

    #[test]
    fn test_drop_sets_shutdown_flag() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.db");
        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
        let shutdown = Arc::clone(&hive.shutdown);

        assert!(!shutdown.load(Ordering::Relaxed));
        drop(hive);
        assert!(shutdown.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_redeploy_empty_is_noop() {
        let (_dir, hive, _cid) = build_hive_with_collective().await;
        let task = Task::new("test");
        assert!(hive.redeploy(vec![], task).await.is_ok());
    }
}
668        let (hive, _cid) = build_hive_with_collective().await;
669        let task = Task::new("test");
670        assert!(hive.redeploy(vec![], task).await.is_ok());
671    }
672}