Skip to main content

pulsehive_runtime/
hivemind.rs

1//! HiveMind orchestrator and builder.
2//!
3//! [`HiveMind`] is the central entry point of PulseHive. It owns the substrate,
4//! LLM providers, approval handler, and event bus. Products construct it via
5//! the builder pattern and deploy agents through it.
6//!
7//! # Example
8//! ```rust,ignore
9//! let hive = HiveMind::builder()
10//!     .substrate_path("/tmp/my_project.db")
11//!     .llm_provider("openai", my_openai_provider)
12//!     .build()?;
13//!
14//! let events = hive.deploy(agents, tasks).await?;
15//! ```
16
17use std::collections::HashMap;
18use std::path::Path;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use futures::stream;
24use futures::{Stream, StreamExt};
25use pulsedb::{
26    CollectiveId, Config, ExperienceId, NewExperience, PulseDB, PulseDBSubstrate, SubstrateProvider,
27};
28use tokio::sync::broadcast;
29
30use pulsehive_core::agent::AgentDefinition;
31use pulsehive_core::approval::{ApprovalHandler, AutoApprove};
32use pulsehive_core::embedding::EmbeddingProvider;
33use pulsehive_core::error::{PulseHiveError, Result};
34use pulsehive_core::event::{EventBus, HiveEvent};
35use pulsehive_core::export::EventExporter;
36use pulsehive_core::llm::LlmProvider;
37
38use crate::intelligence::insight::InsightSynthesizer;
39use crate::intelligence::relationship::RelationshipDetector;
40use crate::workflow::{self, WorkflowContext};
41
42/// A task to be executed by deployed agents.
43#[derive(Debug, Clone)]
44pub struct Task {
45    /// Human-readable description of what to accomplish.
46    pub description: String,
47    /// Collective (namespace) this task operates within.
48    pub collective_id: CollectiveId,
49}
50
51impl Task {
52    /// Creates a task with a new collective ID.
53    pub fn new(description: impl Into<String>) -> Self {
54        Self {
55            description: description.into(),
56            collective_id: CollectiveId::new(),
57        }
58    }
59
60    /// Creates a task within an existing collective.
61    pub fn with_collective(description: impl Into<String>, collective_id: CollectiveId) -> Self {
62        Self {
63            description: description.into(),
64            collective_id,
65        }
66    }
67}
68
69/// The central orchestrator of PulseHive.
70///
71/// Owns the substrate, LLM providers, approval handler, and event bus.
72/// Constructed exclusively via [`HiveMind::builder()`].
73pub struct HiveMind {
74    pub(crate) substrate: Arc<dyn SubstrateProvider>,
75    pub(crate) llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
76    pub(crate) approval_handler: Arc<dyn ApprovalHandler>,
77    pub(crate) event_bus: EventBus,
78    pub(crate) relationship_detector: Option<RelationshipDetector>,
79    pub(crate) insight_synthesizer: Option<InsightSynthesizer>,
80    /// Optional embedding provider for domain-specific models.
81    /// When set, embeddings are computed via this provider before PulseDB storage.
82    pub(crate) embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
83    /// Shutdown signal for background tasks (Watch system).
84    shutdown: Arc<AtomicBool>,
85    /// Handle to the Watch background task for graceful cancellation.
86    watch_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
87}
88
89impl std::fmt::Debug for HiveMind {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct("HiveMind")
92            .field(
93                "llm_providers",
94                &self.llm_providers.keys().collect::<Vec<_>>(),
95            )
96            .finish_non_exhaustive()
97    }
98}
99
100impl HiveMind {
101    /// Creates a new builder for constructing a HiveMind.
102    pub fn builder() -> HiveMindBuilder {
103        HiveMindBuilder::new()
104    }
105
106    /// Access the substrate provider for direct operations.
107    pub fn substrate(&self) -> &dyn SubstrateProvider {
108        self.substrate.as_ref()
109    }
110
111    /// Deploy agents to execute tasks. Returns a stream of events.
112    ///
113    /// Each agent is spawned as a Tokio task and dispatched via
114    /// the workflow module's `dispatch_agent()` which handles all agent kinds
115    /// (LLM, Sequential, Parallel, Loop).
116    ///
117    /// Automatically subscribes to the PulseDB Watch system for the collective,
118    /// forwarding substrate change events as [`HiveEvent::WatchNotification`].
119    /// If Watch subscription fails, agents still execute normally (graceful degradation).
120    pub async fn deploy(
121        &self,
122        agents: Vec<AgentDefinition>,
123        tasks: Vec<Task>,
124    ) -> Result<Pin<Box<dyn Stream<Item = HiveEvent> + Send>>> {
125        if agents.is_empty() {
126            return Ok(Box::pin(stream::empty()));
127        }
128
129        // Get the first task (or create a default one)
130        let mut task = tasks.into_iter().next().unwrap_or_else(|| Task::new(""));
131
132        // Ensure the collective exists in the substrate
133        let collective_name = format!("collective-{}", task.collective_id);
134        let collective_id = self
135            .substrate
136            .get_or_create_collective(&collective_name)
137            .await?;
138        task.collective_id = collective_id;
139
140        // Subscribe to Watch system for real-time substrate change notifications.
141        // Runs as a background task — failure to subscribe doesn't block deployment.
142        // Respects the shutdown flag for graceful termination.
143        let watch_substrate = Arc::clone(&self.substrate);
144        let watch_emitter = self.event_bus.clone();
145        let watch_shutdown = Arc::clone(&self.shutdown);
146        let watch_handle = tokio::spawn(async move {
147            match watch_substrate.watch(collective_id).await {
148                Ok(mut watch_stream) => {
149                    while !watch_shutdown.load(Ordering::Relaxed) {
150                        match watch_stream.next().await {
151                            Some(event) => {
152                                watch_emitter.emit(HiveEvent::WatchNotification {
153                                    timestamp_ms: pulsehive_core::event::now_ms(),
154                                    experience_id: event.experience_id,
155                                    collective_id: event.collective_id,
156                                    event_type: format!("{:?}", event.event_type),
157                                });
158                            }
159                            None => break,
160                        }
161                    }
162                }
163                Err(e) => {
164                    tracing::warn!(error = %e, "Failed to subscribe to Watch system");
165                }
166            }
167        });
168        *self.watch_handle.lock().unwrap() = Some(watch_handle);
169
170        let rx = self.event_bus.subscribe();
171
172        for agent in agents {
173            self.spawn_agent(agent, task.clone());
174        }
175
176        // Convert broadcast::Receiver into a Stream
177        Ok(Box::pin(BroadcastStream::new(rx)))
178    }
179
180    /// Record an experience in the substrate.
181    ///
182    /// Stores the experience via PulseDB, emits an `ExperienceRecorded` event,
183    /// runs the RelationshipDetector to infer relations, and triggers the
184    /// InsightSynthesizer if a cluster exceeds the density threshold.
185    pub async fn record_experience(&self, experience: NewExperience) -> Result<ExperienceId> {
186        let agent_id = experience.source_agent.0.clone();
187        let collective_id = experience.collective_id;
188
189        // Compute embedding via provider if available and not already set
190        let mut experience = experience;
191        if let Some(provider) = &self.embedding_provider {
192            if experience.embedding.is_none() {
193                match provider.embed(&experience.content).await {
194                    Ok(embedding) => {
195                        experience.embedding = Some(embedding);
196                    }
197                    Err(e) => {
198                        tracing::warn!(error = %e, "Failed to compute embedding in record_experience, storing without");
199                    }
200                }
201            }
202        }
203
204        // Capture metadata before move
205        let content_preview: String = experience.content.chars().take(200).collect();
206        let experience_type_str = format!("{:?}", experience.experience_type);
207        let importance = experience.importance;
208
209        let id = self.substrate.store_experience(experience).await?;
210        self.event_bus.emit(HiveEvent::ExperienceRecorded {
211            timestamp_ms: pulsehive_core::event::now_ms(),
212            experience_id: id,
213            agent_id: agent_id.clone(),
214            content_preview,
215            experience_type: experience_type_str,
216            importance,
217        });
218
219        // Run relationship inference if detector is configured
220        if let Some(detector) = &self.relationship_detector {
221            if let Ok(Some(stored)) = self.substrate.get_experience(id).await {
222                let relations = detector
223                    .infer_relations(&stored, self.substrate.as_ref())
224                    .await;
225
226                for rel in relations {
227                    match self.substrate.store_relation(rel).await {
228                        Ok(relation_id) => {
229                            self.event_bus.emit(HiveEvent::RelationshipInferred {
230                                timestamp_ms: pulsehive_core::event::now_ms(),
231                                relation_id,
232                                agent_id: agent_id.clone(),
233                            });
234                        }
235                        Err(e) => {
236                            tracing::warn!(error = %e, "Failed to store inferred relation");
237                        }
238                    }
239                }
240            }
241        }
242
243        // Run insight synthesis if synthesizer is configured
244        if let Some(synthesizer) = &self.insight_synthesizer {
245            if !synthesizer.is_debounced(collective_id) {
246                let cluster = synthesizer.find_cluster(id, self.substrate.as_ref()).await;
247
248                if synthesizer.should_synthesize(cluster.len()) {
249                    // Use the first available LLM provider for synthesis
250                    if let Some((provider_name, provider)) = self.llm_providers.iter().next() {
251                        let llm_config =
252                            pulsehive_core::llm::LlmConfig::new(provider_name, "default");
253                        if let Some(insight) = synthesizer
254                            .synthesize_cluster(
255                                &cluster,
256                                collective_id,
257                                provider.as_ref(),
258                                &llm_config,
259                            )
260                            .await
261                        {
262                            let source_count = insight.source_experience_ids.len();
263                            match self.substrate.store_insight(insight).await {
264                                Ok(insight_id) => {
265                                    synthesizer.mark_synthesized(collective_id);
266                                    self.event_bus.emit(HiveEvent::InsightGenerated {
267                                        timestamp_ms: pulsehive_core::event::now_ms(),
268                                        insight_id,
269                                        source_count,
270                                        agent_id: agent_id.clone(),
271                                    });
272                                }
273                                Err(e) => {
274                                    tracing::warn!(error = %e, "Failed to store synthesized insight");
275                                }
276                            }
277                        }
278                    }
279                }
280            }
281        }
282
283        Ok(id)
284    }
285
286    /// Signal shutdown to all background tasks (Watch system).
287    ///
288    /// Sets the shutdown flag, causing the Watch background task to stop
289    /// after processing its current event. This is non-blocking.
290    pub fn shutdown(&self) {
291        self.shutdown.store(true, Ordering::Relaxed);
292        // Abort the Watch background task so it drops its EventBus sender clone,
293        // allowing the broadcast channel to close and BroadcastStream to terminate.
294        if let Some(handle) = self.watch_handle.lock().unwrap().take() {
295            handle.abort();
296        }
297        tracing::info!("HiveMind shutdown signaled");
298    }
299
300    /// Returns true if shutdown has been signaled.
301    pub fn is_shutdown(&self) -> bool {
302        self.shutdown.load(Ordering::Relaxed)
303    }
304
305    /// Redeploy agents on the existing substrate and event bus.
306    ///
307    /// Use this to restart failed agents. Products typically call this when
308    /// they observe `AgentCompleted { outcome: Error { .. } }` on the event stream.
309    ///
310    /// The collective is created/resolved from the task, same as in [`HiveMind::deploy()`].
311    pub async fn redeploy(&self, agents: Vec<AgentDefinition>, task: Task) -> Result<()> {
312        if agents.is_empty() {
313            return Ok(());
314        }
315
316        // Ensure the collective exists
317        let mut task = task;
318        let collective_name = format!("collective-{}", task.collective_id);
319        let collective_id = self
320            .substrate
321            .get_or_create_collective(&collective_name)
322            .await?;
323        task.collective_id = collective_id;
324
325        for agent in agents {
326            self.spawn_agent(agent, task.clone());
327        }
328
329        Ok(())
330    }
331
332    /// Spawn a single agent as a Tokio task.
333    ///
334    /// Builds a [`WorkflowContext`] from HiveMind's fields and delegates
335    /// to [`workflow::dispatch_agent()`] which handles all agent kinds.
336    fn spawn_agent(&self, agent: AgentDefinition, task: Task) {
337        let ctx = WorkflowContext {
338            task,
339            llm_providers: self.llm_providers.clone(),
340            substrate: Arc::clone(&self.substrate),
341            approval_handler: Arc::clone(&self.approval_handler),
342            event_emitter: self.event_bus.clone(),
343            embedding_provider: self.embedding_provider.clone(),
344        };
345
346        tokio::spawn(async move {
347            workflow::dispatch_agent(agent, &ctx).await;
348        });
349    }
350}
351
352impl Drop for HiveMind {
353    fn drop(&mut self) {
354        self.shutdown.store(true, Ordering::Relaxed);
355        if let Some(handle) = self.watch_handle.get_mut().unwrap().take() {
356            handle.abort();
357        }
358    }
359}
360
361/// Adapter that converts a `broadcast::Receiver<HiveEvent>` into a `Stream`.
362struct BroadcastStream {
363    rx: broadcast::Receiver<HiveEvent>,
364}
365
366impl BroadcastStream {
367    fn new(rx: broadcast::Receiver<HiveEvent>) -> Self {
368        Self { rx }
369    }
370}
371
372impl Stream for BroadcastStream {
373    type Item = HiveEvent;
374
375    fn poll_next(
376        mut self: Pin<&mut Self>,
377        cx: &mut std::task::Context<'_>,
378    ) -> std::task::Poll<Option<Self::Item>> {
379        match self.rx.try_recv() {
380            Ok(event) => std::task::Poll::Ready(Some(event)),
381            Err(broadcast::error::TryRecvError::Empty) => {
382                // No events yet — register waker and return Pending
383                cx.waker().wake_by_ref();
384                std::task::Poll::Pending
385            }
386            Err(broadcast::error::TryRecvError::Lagged(n)) => {
387                tracing::warn!(lagged = n, "Event stream lagged, some events dropped");
388                cx.waker().wake_by_ref();
389                std::task::Poll::Pending
390            }
391            Err(broadcast::error::TryRecvError::Closed) => std::task::Poll::Ready(None),
392        }
393    }
394}
395
396/// Builder for constructing a [`HiveMind`] with validated configuration.
397pub struct HiveMindBuilder {
398    substrate: Option<Box<dyn SubstrateProvider>>,
399    substrate_path: Option<String>,
400    llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
401    approval_handler: Option<Box<dyn ApprovalHandler>>,
402    relationship_detector: Option<Option<RelationshipDetector>>,
403    insight_synthesizer: Option<Option<InsightSynthesizer>>,
404    embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
405    event_exporter: Option<Arc<dyn EventExporter>>,
406}
407
408impl HiveMindBuilder {
409    fn new() -> Self {
410        Self {
411            substrate: None,
412            substrate_path: None,
413            llm_providers: HashMap::new(),
414            approval_handler: None,
415            relationship_detector: None,
416            insight_synthesizer: None,
417            embedding_provider: None,
418            event_exporter: None,
419        }
420    }
421
422    /// Set substrate via file path.
423    pub fn substrate_path(mut self, path: impl AsRef<Path>) -> Self {
424        self.substrate_path = Some(path.as_ref().to_string_lossy().into_owned());
425        self
426    }
427
428    /// Set a custom substrate provider (e.g., for testing with mocks).
429    pub fn substrate(mut self, provider: Box<dyn SubstrateProvider>) -> Self {
430        self.substrate = Some(provider);
431        self
432    }
433
434    /// Register a named LLM provider.
435    pub fn llm_provider(
436        mut self,
437        name: impl Into<String>,
438        provider: impl LlmProvider + 'static,
439    ) -> Self {
440        self.llm_providers.insert(name.into(), Arc::new(provider));
441        self
442    }
443
444    /// Set a custom approval handler. Defaults to [`AutoApprove`] if not set.
445    pub fn approval_handler(mut self, handler: impl ApprovalHandler + 'static) -> Self {
446        self.approval_handler = Some(Box::new(handler));
447        self
448    }
449
450    /// Set a custom relationship detector. Default: enabled with default thresholds.
451    pub fn relationship_detector(mut self, detector: RelationshipDetector) -> Self {
452        self.relationship_detector = Some(Some(detector));
453        self
454    }
455
456    /// Disable automatic relationship detection.
457    pub fn no_relationship_detector(mut self) -> Self {
458        self.relationship_detector = Some(None);
459        self
460    }
461
462    /// Set a custom insight synthesizer. Default: enabled with default thresholds.
463    pub fn insight_synthesizer(mut self, synthesizer: InsightSynthesizer) -> Self {
464        self.insight_synthesizer = Some(Some(synthesizer));
465        self
466    }
467
468    /// Disable automatic insight synthesis.
469    pub fn no_insight_synthesizer(mut self) -> Self {
470        self.insight_synthesizer = Some(None);
471        self
472    }
473
474    /// Set a custom embedding provider for domain-specific models.
475    ///
476    /// When set, PulseHive computes embeddings via this provider before storing
477    /// experiences in PulseDB (External mode). When not set, PulseDB uses its
478    /// built-in all-MiniLM-L6-v2 model (384d).
479    pub fn embedding_provider(mut self, provider: impl EmbeddingProvider + 'static) -> Self {
480        self.embedding_provider = Some(Arc::new(provider));
481        self
482    }
483
484    /// Set an event exporter for streaming events to external observability systems.
485    ///
486    /// When set, every `HiveEvent` emission is also forwarded to the exporter
487    /// via a fire-and-forget `tokio::spawn` call — zero latency on the emit path.
488    ///
489    /// Use this to connect PulseHive to PulseVision or custom dashboards.
490    pub fn event_exporter(mut self, exporter: impl EventExporter + 'static) -> Self {
491        self.event_exporter = Some(Arc::new(exporter));
492        self
493    }
494
495    /// Build the HiveMind. Validates that a substrate is configured.
496    pub fn build(self) -> Result<HiveMind> {
497        let substrate: Arc<dyn SubstrateProvider> = if let Some(s) = self.substrate {
498            Arc::from(s)
499        } else if let Some(path) = self.substrate_path {
500            let config = if self.embedding_provider.is_some() {
501                // External mode: PulseHive computes embeddings via the provider
502                Config::new()
503            } else {
504                // Builtin mode: PulseDB computes embeddings internally
505                Config::with_builtin_embeddings()
506            };
507            let db = PulseDB::open(&path, config)?;
508            Arc::new(PulseDBSubstrate::from_db(db))
509        } else {
510            return Err(PulseHiveError::config(
511                "Substrate not configured. Call substrate_path() or substrate() on the builder.",
512            ));
513        };
514
515        let approval: Arc<dyn ApprovalHandler> = match self.approval_handler {
516            Some(h) => Arc::from(h),
517            None => Arc::new(AutoApprove),
518        };
519
520        // Default: relationship detector enabled with default thresholds
521        let relationship_detector = match self.relationship_detector {
522            Some(explicit) => explicit,
523            None => Some(RelationshipDetector::with_defaults()),
524        };
525
526        // Default: insight synthesizer enabled with default thresholds
527        let insight_synthesizer = match self.insight_synthesizer {
528            Some(explicit) => explicit,
529            None => Some(InsightSynthesizer::with_defaults()),
530        };
531
532        let event_bus = match self.event_exporter {
533            Some(exporter) => EventBus::with_exporter(256, exporter),
534            None => EventBus::default(),
535        };
536
537        Ok(HiveMind {
538            substrate,
539            llm_providers: self.llm_providers,
540            approval_handler: approval,
541            event_bus,
542            relationship_detector,
543            insight_synthesizer,
544            embedding_provider: self.embedding_provider,
545            shutdown: Arc::new(AtomicBool::new(false)),
546            watch_handle: std::sync::Mutex::new(None),
547        })
548    }
549}
550
551#[cfg(test)]
552mod tests {
553    use super::*;
554    use futures::StreamExt;
555
556    #[test]
557    fn test_build_fails_without_substrate() {
558        let result = HiveMind::builder().build();
559        assert!(result.is_err());
560        assert!(result
561            .unwrap_err()
562            .to_string()
563            .contains("Substrate not configured"));
564    }
565
566    #[test]
567    fn test_build_with_substrate_path() {
568        let dir = tempfile::tempdir().unwrap();
569        let path = dir.path().join("test.db");
570        assert!(HiveMind::builder().substrate_path(&path).build().is_ok());
571    }
572
573    #[tokio::test]
574    async fn test_deploy_empty_agents_returns_empty_stream() {
575        let dir = tempfile::tempdir().unwrap();
576        let path = dir.path().join("test.db");
577        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
578
579        let mut stream = hive.deploy(vec![], vec![]).await.unwrap();
580        assert!(stream.next().await.is_none());
581    }
582
583    #[test]
584    fn test_task_new() {
585        let task = Task::new("Analyze the codebase");
586        assert_eq!(task.description, "Analyze the codebase");
587    }
588
589    #[test]
590    fn test_task_with_collective() {
591        let cid = CollectiveId::new();
592        let task = Task::with_collective("Search for bugs", cid);
593        assert_eq!(task.collective_id, cid);
594    }
595
596    /// Helper: create a HiveMind with Builtin embeddings and a collective for testing.
597    async fn build_hive_with_collective() -> (HiveMind, CollectiveId) {
598        let dir = tempfile::tempdir().unwrap();
599        let path = dir.path().join("test.db");
600        // Leak tempdir so it lives long enough
601        let dir = Box::leak(Box::new(dir));
602        let _ = dir;
603        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
604        // Create collective via SubstrateProvider trait (no raw PulseDB needed!)
605        let cid = hive
606            .substrate
607            .get_or_create_collective("test")
608            .await
609            .unwrap();
610        (hive, cid)
611    }
612
613    #[tokio::test]
614    async fn test_record_experience_stores_and_emits_event() {
615        let (hive, cid) = build_hive_with_collective().await;
616        let mut rx = hive.event_bus.subscribe();
617
618        let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
619        let exp = pulsedb::NewExperience {
620            collective_id: cid,
621            content: "Learned that Rust's ownership model prevents data races.".into(),
622            experience_type: pulsedb::ExperienceType::Generic {
623                category: Some("rust".into()),
624            },
625            embedding: Some(dummy_embedding),
626            importance: 0.8,
627            confidence: 0.9,
628            domain: vec!["rust".into(), "concurrency".into()],
629            source_agent: pulsedb::AgentId("test-agent".into()),
630            source_task: None,
631            related_files: vec![],
632        };
633
634        let id = hive.record_experience(exp).await.unwrap();
635
636        // Verify event emitted
637        let event = rx.try_recv().unwrap();
638        match event {
639            HiveEvent::ExperienceRecorded {
640                experience_id,
641                agent_id,
642                ..
643            } => {
644                assert_eq!(experience_id, id);
645                assert_eq!(agent_id, "test-agent");
646            }
647            other => panic!("Expected ExperienceRecorded, got: {other:?}"),
648        }
649    }
650
651    #[tokio::test]
652    async fn test_record_experience_retrievable() {
653        let (hive, cid) = build_hive_with_collective().await;
654
655        // Provide a pre-computed embedding to avoid requiring PulseDB's builtin
656        // ONNX model, which may not be cached on CI runners.
657        let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
658        let exp = pulsedb::NewExperience {
659            collective_id: cid,
660            content: "Test experience for retrieval.".into(),
661            experience_type: pulsedb::ExperienceType::Generic { category: None },
662            embedding: Some(dummy_embedding),
663            importance: 0.5,
664            confidence: 0.5,
665            domain: vec![],
666            source_agent: pulsedb::AgentId("agent-1".into()),
667            source_task: None,
668            related_files: vec![],
669        };
670
671        let id = hive.record_experience(exp).await.unwrap();
672
673        // Verify retrievable
674        let retrieved = hive.substrate.get_experience(id).await.unwrap();
675        assert!(retrieved.is_some());
676        let retrieved = retrieved.unwrap();
677        assert_eq!(retrieved.content, "Test experience for retrieval.");
678    }
679
680    // ── Shutdown & Restart tests ─────────────────────────────────────
681
682    #[test]
683    fn test_shutdown_sets_flag() {
684        let dir = tempfile::tempdir().unwrap();
685        let path = dir.path().join("test.db");
686        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
687
688        assert!(!hive.is_shutdown());
689        hive.shutdown();
690        assert!(hive.is_shutdown());
691    }
692
693    #[test]
694    fn test_drop_sets_shutdown_flag() {
695        let dir = tempfile::tempdir().unwrap();
696        let path = dir.path().join("test.db");
697        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
698        let shutdown = Arc::clone(&hive.shutdown);
699
700        assert!(!shutdown.load(Ordering::Relaxed));
701        drop(hive);
702        assert!(shutdown.load(Ordering::Relaxed));
703    }
704
705    #[tokio::test]
706    async fn test_redeploy_empty_is_noop() {
707        let (hive, _cid) = build_hive_with_collective().await;
708        let task = Task::new("test");
709        assert!(hive.redeploy(vec![], task).await.is_ok());
710    }
711}