Skip to main content

pulsehive_runtime/
hivemind.rs

1//! HiveMind orchestrator and builder.
2//!
3//! [`HiveMind`] is the central entry point of PulseHive. It owns the substrate,
4//! LLM providers, approval handler, and event bus. Products construct it via
5//! the builder pattern and deploy agents through it.
6//!
7//! # Example
8//! ```rust,ignore
9//! let hive = HiveMind::builder()
10//!     .substrate_path("/tmp/my_project.db")
11//!     .llm_provider("openai", my_openai_provider)
12//!     .build()?;
13//!
14//! let events = hive.deploy(agents, tasks).await?;
15//! ```
16
17use std::collections::HashMap;
18use std::path::Path;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use futures::stream;
24use futures::{Stream, StreamExt};
25use pulsedb::{
26    CollectiveId, Config, ExperienceId, NewExperience, PulseDB, PulseDBSubstrate, SubstrateProvider,
27};
28use tokio::sync::broadcast;
29
30use pulsehive_core::agent::AgentDefinition;
31use pulsehive_core::approval::{ApprovalHandler, AutoApprove};
32use pulsehive_core::embedding::EmbeddingProvider;
33use pulsehive_core::error::{PulseHiveError, Result};
34use pulsehive_core::event::{EventBus, HiveEvent};
35use pulsehive_core::llm::LlmProvider;
36
37use crate::intelligence::insight::InsightSynthesizer;
38use crate::intelligence::relationship::RelationshipDetector;
39use crate::workflow::{self, WorkflowContext};
40
41/// A task to be executed by deployed agents.
42#[derive(Debug, Clone)]
43pub struct Task {
44    /// Human-readable description of what to accomplish.
45    pub description: String,
46    /// Collective (namespace) this task operates within.
47    pub collective_id: CollectiveId,
48}
49
50impl Task {
51    /// Creates a task with a new collective ID.
52    pub fn new(description: impl Into<String>) -> Self {
53        Self {
54            description: description.into(),
55            collective_id: CollectiveId::new(),
56        }
57    }
58
59    /// Creates a task within an existing collective.
60    pub fn with_collective(description: impl Into<String>, collective_id: CollectiveId) -> Self {
61        Self {
62            description: description.into(),
63            collective_id,
64        }
65    }
66}
67
68/// The central orchestrator of PulseHive.
69///
70/// Owns the substrate, LLM providers, approval handler, and event bus.
71/// Constructed exclusively via [`HiveMind::builder()`].
72pub struct HiveMind {
73    pub(crate) substrate: Arc<dyn SubstrateProvider>,
74    pub(crate) llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
75    pub(crate) approval_handler: Arc<dyn ApprovalHandler>,
76    pub(crate) event_bus: EventBus,
77    pub(crate) relationship_detector: Option<RelationshipDetector>,
78    pub(crate) insight_synthesizer: Option<InsightSynthesizer>,
79    /// Optional embedding provider for domain-specific models.
80    /// When set, embeddings are computed via this provider before PulseDB storage.
81    pub(crate) embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
82    /// Shutdown signal for background tasks (Watch system).
83    shutdown: Arc<AtomicBool>,
84    /// Handle to the Watch background task for graceful cancellation.
85    watch_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
86}
87
88impl std::fmt::Debug for HiveMind {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("HiveMind")
91            .field(
92                "llm_providers",
93                &self.llm_providers.keys().collect::<Vec<_>>(),
94            )
95            .finish_non_exhaustive()
96    }
97}
98
99impl HiveMind {
100    /// Creates a new builder for constructing a HiveMind.
101    pub fn builder() -> HiveMindBuilder {
102        HiveMindBuilder::new()
103    }
104
105    /// Access the substrate provider for direct operations.
106    pub fn substrate(&self) -> &dyn SubstrateProvider {
107        self.substrate.as_ref()
108    }
109
110    /// Deploy agents to execute tasks. Returns a stream of events.
111    ///
112    /// Each agent is spawned as a Tokio task and dispatched via
113    /// the workflow module's `dispatch_agent()` which handles all agent kinds
114    /// (LLM, Sequential, Parallel, Loop).
115    ///
116    /// Automatically subscribes to the PulseDB Watch system for the collective,
117    /// forwarding substrate change events as [`HiveEvent::WatchNotification`].
118    /// If Watch subscription fails, agents still execute normally (graceful degradation).
119    pub async fn deploy(
120        &self,
121        agents: Vec<AgentDefinition>,
122        tasks: Vec<Task>,
123    ) -> Result<Pin<Box<dyn Stream<Item = HiveEvent> + Send>>> {
124        if agents.is_empty() {
125            return Ok(Box::pin(stream::empty()));
126        }
127
128        // Get the first task (or create a default one)
129        let mut task = tasks.into_iter().next().unwrap_or_else(|| Task::new(""));
130
131        // Ensure the collective exists in the substrate
132        let collective_name = format!("collective-{}", task.collective_id);
133        let collective_id = self
134            .substrate
135            .get_or_create_collective(&collective_name)
136            .await?;
137        task.collective_id = collective_id;
138
139        // Subscribe to Watch system for real-time substrate change notifications.
140        // Runs as a background task — failure to subscribe doesn't block deployment.
141        // Respects the shutdown flag for graceful termination.
142        let watch_substrate = Arc::clone(&self.substrate);
143        let watch_emitter = self.event_bus.clone();
144        let watch_shutdown = Arc::clone(&self.shutdown);
145        let watch_handle = tokio::spawn(async move {
146            match watch_substrate.watch(collective_id).await {
147                Ok(mut watch_stream) => {
148                    while !watch_shutdown.load(Ordering::Relaxed) {
149                        match watch_stream.next().await {
150                            Some(event) => {
151                                watch_emitter.emit(HiveEvent::WatchNotification {
152                                    timestamp_ms: pulsehive_core::event::now_ms(),
153                                    experience_id: event.experience_id,
154                                    collective_id: event.collective_id,
155                                    event_type: format!("{:?}", event.event_type),
156                                });
157                            }
158                            None => break,
159                        }
160                    }
161                }
162                Err(e) => {
163                    tracing::warn!(error = %e, "Failed to subscribe to Watch system");
164                }
165            }
166        });
167        *self.watch_handle.lock().unwrap() = Some(watch_handle);
168
169        let rx = self.event_bus.subscribe();
170
171        for agent in agents {
172            self.spawn_agent(agent, task.clone());
173        }
174
175        // Convert broadcast::Receiver into a Stream
176        Ok(Box::pin(BroadcastStream::new(rx)))
177    }
178
179    /// Record an experience in the substrate.
180    ///
181    /// Stores the experience via PulseDB, emits an `ExperienceRecorded` event,
182    /// runs the RelationshipDetector to infer relations, and triggers the
183    /// InsightSynthesizer if a cluster exceeds the density threshold.
184    pub async fn record_experience(&self, experience: NewExperience) -> Result<ExperienceId> {
185        let agent_id = experience.source_agent.0.clone();
186        let collective_id = experience.collective_id;
187
188        // Compute embedding via provider if available and not already set
189        let mut experience = experience;
190        if let Some(provider) = &self.embedding_provider {
191            if experience.embedding.is_none() {
192                match provider.embed(&experience.content).await {
193                    Ok(embedding) => {
194                        experience.embedding = Some(embedding);
195                    }
196                    Err(e) => {
197                        tracing::warn!(error = %e, "Failed to compute embedding in record_experience, storing without");
198                    }
199                }
200            }
201        }
202
203        // Capture metadata before move
204        let content_preview: String = experience.content.chars().take(200).collect();
205        let experience_type_str = format!("{:?}", experience.experience_type);
206        let importance = experience.importance;
207
208        let id = self.substrate.store_experience(experience).await?;
209        self.event_bus.emit(HiveEvent::ExperienceRecorded {
210            timestamp_ms: pulsehive_core::event::now_ms(),
211            experience_id: id,
212            agent_id: agent_id.clone(),
213            content_preview,
214            experience_type: experience_type_str,
215            importance,
216        });
217
218        // Run relationship inference if detector is configured
219        if let Some(detector) = &self.relationship_detector {
220            if let Ok(Some(stored)) = self.substrate.get_experience(id).await {
221                let relations = detector
222                    .infer_relations(&stored, self.substrate.as_ref())
223                    .await;
224
225                for rel in relations {
226                    match self.substrate.store_relation(rel).await {
227                        Ok(relation_id) => {
228                            self.event_bus.emit(HiveEvent::RelationshipInferred {
229                                timestamp_ms: pulsehive_core::event::now_ms(),
230                                relation_id,
231                                agent_id: agent_id.clone(),
232                            });
233                        }
234                        Err(e) => {
235                            tracing::warn!(error = %e, "Failed to store inferred relation");
236                        }
237                    }
238                }
239            }
240        }
241
242        // Run insight synthesis if synthesizer is configured
243        if let Some(synthesizer) = &self.insight_synthesizer {
244            if !synthesizer.is_debounced(collective_id) {
245                let cluster = synthesizer.find_cluster(id, self.substrate.as_ref()).await;
246
247                if synthesizer.should_synthesize(cluster.len()) {
248                    // Use the first available LLM provider for synthesis
249                    if let Some((provider_name, provider)) = self.llm_providers.iter().next() {
250                        let llm_config =
251                            pulsehive_core::llm::LlmConfig::new(provider_name, "default");
252                        if let Some(insight) = synthesizer
253                            .synthesize_cluster(
254                                &cluster,
255                                collective_id,
256                                provider.as_ref(),
257                                &llm_config,
258                            )
259                            .await
260                        {
261                            let source_count = insight.source_experience_ids.len();
262                            match self.substrate.store_insight(insight).await {
263                                Ok(insight_id) => {
264                                    synthesizer.mark_synthesized(collective_id);
265                                    self.event_bus.emit(HiveEvent::InsightGenerated {
266                                        timestamp_ms: pulsehive_core::event::now_ms(),
267                                        insight_id,
268                                        source_count,
269                                        agent_id: agent_id.clone(),
270                                    });
271                                }
272                                Err(e) => {
273                                    tracing::warn!(error = %e, "Failed to store synthesized insight");
274                                }
275                            }
276                        }
277                    }
278                }
279            }
280        }
281
282        Ok(id)
283    }
284
285    /// Signal shutdown to all background tasks (Watch system).
286    ///
287    /// Sets the shutdown flag, causing the Watch background task to stop
288    /// after processing its current event. This is non-blocking.
289    pub fn shutdown(&self) {
290        self.shutdown.store(true, Ordering::Relaxed);
291        // Abort the Watch background task so it drops its EventBus sender clone,
292        // allowing the broadcast channel to close and BroadcastStream to terminate.
293        if let Some(handle) = self.watch_handle.lock().unwrap().take() {
294            handle.abort();
295        }
296        tracing::info!("HiveMind shutdown signaled");
297    }
298
299    /// Returns true if shutdown has been signaled.
300    pub fn is_shutdown(&self) -> bool {
301        self.shutdown.load(Ordering::Relaxed)
302    }
303
304    /// Redeploy agents on the existing substrate and event bus.
305    ///
306    /// Use this to restart failed agents. Products typically call this when
307    /// they observe `AgentCompleted { outcome: Error { .. } }` on the event stream.
308    ///
309    /// The collective is created/resolved from the task, same as in [`HiveMind::deploy()`].
310    pub async fn redeploy(&self, agents: Vec<AgentDefinition>, task: Task) -> Result<()> {
311        if agents.is_empty() {
312            return Ok(());
313        }
314
315        // Ensure the collective exists
316        let mut task = task;
317        let collective_name = format!("collective-{}", task.collective_id);
318        let collective_id = self
319            .substrate
320            .get_or_create_collective(&collective_name)
321            .await?;
322        task.collective_id = collective_id;
323
324        for agent in agents {
325            self.spawn_agent(agent, task.clone());
326        }
327
328        Ok(())
329    }
330
331    /// Spawn a single agent as a Tokio task.
332    ///
333    /// Builds a [`WorkflowContext`] from HiveMind's fields and delegates
334    /// to [`workflow::dispatch_agent()`] which handles all agent kinds.
335    fn spawn_agent(&self, agent: AgentDefinition, task: Task) {
336        let ctx = WorkflowContext {
337            task,
338            llm_providers: self.llm_providers.clone(),
339            substrate: Arc::clone(&self.substrate),
340            approval_handler: Arc::clone(&self.approval_handler),
341            event_emitter: self.event_bus.clone(),
342            embedding_provider: self.embedding_provider.clone(),
343        };
344
345        tokio::spawn(async move {
346            workflow::dispatch_agent(agent, &ctx).await;
347        });
348    }
349}
350
351impl Drop for HiveMind {
352    fn drop(&mut self) {
353        self.shutdown.store(true, Ordering::Relaxed);
354        if let Some(handle) = self.watch_handle.get_mut().unwrap().take() {
355            handle.abort();
356        }
357    }
358}
359
360/// Adapter that converts a `broadcast::Receiver<HiveEvent>` into a `Stream`.
361struct BroadcastStream {
362    rx: broadcast::Receiver<HiveEvent>,
363}
364
365impl BroadcastStream {
366    fn new(rx: broadcast::Receiver<HiveEvent>) -> Self {
367        Self { rx }
368    }
369}
370
371impl Stream for BroadcastStream {
372    type Item = HiveEvent;
373
374    fn poll_next(
375        mut self: Pin<&mut Self>,
376        cx: &mut std::task::Context<'_>,
377    ) -> std::task::Poll<Option<Self::Item>> {
378        match self.rx.try_recv() {
379            Ok(event) => std::task::Poll::Ready(Some(event)),
380            Err(broadcast::error::TryRecvError::Empty) => {
381                // No events yet — register waker and return Pending
382                cx.waker().wake_by_ref();
383                std::task::Poll::Pending
384            }
385            Err(broadcast::error::TryRecvError::Lagged(n)) => {
386                tracing::warn!(lagged = n, "Event stream lagged, some events dropped");
387                cx.waker().wake_by_ref();
388                std::task::Poll::Pending
389            }
390            Err(broadcast::error::TryRecvError::Closed) => std::task::Poll::Ready(None),
391        }
392    }
393}
394
395/// Builder for constructing a [`HiveMind`] with validated configuration.
396pub struct HiveMindBuilder {
397    substrate: Option<Box<dyn SubstrateProvider>>,
398    substrate_path: Option<String>,
399    llm_providers: HashMap<String, Arc<dyn LlmProvider>>,
400    approval_handler: Option<Box<dyn ApprovalHandler>>,
401    relationship_detector: Option<Option<RelationshipDetector>>,
402    insight_synthesizer: Option<Option<InsightSynthesizer>>,
403    embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
404}
405
406impl HiveMindBuilder {
407    fn new() -> Self {
408        Self {
409            substrate: None,
410            substrate_path: None,
411            llm_providers: HashMap::new(),
412            approval_handler: None,
413            relationship_detector: None,
414            insight_synthesizer: None,
415            embedding_provider: None,
416        }
417    }
418
419    /// Set substrate via file path.
420    pub fn substrate_path(mut self, path: impl AsRef<Path>) -> Self {
421        self.substrate_path = Some(path.as_ref().to_string_lossy().into_owned());
422        self
423    }
424
425    /// Set a custom substrate provider (e.g., for testing with mocks).
426    pub fn substrate(mut self, provider: Box<dyn SubstrateProvider>) -> Self {
427        self.substrate = Some(provider);
428        self
429    }
430
431    /// Register a named LLM provider.
432    pub fn llm_provider(
433        mut self,
434        name: impl Into<String>,
435        provider: impl LlmProvider + 'static,
436    ) -> Self {
437        self.llm_providers.insert(name.into(), Arc::new(provider));
438        self
439    }
440
441    /// Set a custom approval handler. Defaults to [`AutoApprove`] if not set.
442    pub fn approval_handler(mut self, handler: impl ApprovalHandler + 'static) -> Self {
443        self.approval_handler = Some(Box::new(handler));
444        self
445    }
446
447    /// Set a custom relationship detector. Default: enabled with default thresholds.
448    pub fn relationship_detector(mut self, detector: RelationshipDetector) -> Self {
449        self.relationship_detector = Some(Some(detector));
450        self
451    }
452
453    /// Disable automatic relationship detection.
454    pub fn no_relationship_detector(mut self) -> Self {
455        self.relationship_detector = Some(None);
456        self
457    }
458
459    /// Set a custom insight synthesizer. Default: enabled with default thresholds.
460    pub fn insight_synthesizer(mut self, synthesizer: InsightSynthesizer) -> Self {
461        self.insight_synthesizer = Some(Some(synthesizer));
462        self
463    }
464
465    /// Disable automatic insight synthesis.
466    pub fn no_insight_synthesizer(mut self) -> Self {
467        self.insight_synthesizer = Some(None);
468        self
469    }
470
471    /// Set a custom embedding provider for domain-specific models.
472    ///
473    /// When set, PulseHive computes embeddings via this provider before storing
474    /// experiences in PulseDB (External mode). When not set, PulseDB uses its
475    /// built-in all-MiniLM-L6-v2 model (384d).
476    pub fn embedding_provider(mut self, provider: impl EmbeddingProvider + 'static) -> Self {
477        self.embedding_provider = Some(Arc::new(provider));
478        self
479    }
480
481    /// Build the HiveMind. Validates that a substrate is configured.
482    pub fn build(self) -> Result<HiveMind> {
483        let substrate: Arc<dyn SubstrateProvider> = if let Some(s) = self.substrate {
484            Arc::from(s)
485        } else if let Some(path) = self.substrate_path {
486            let config = if self.embedding_provider.is_some() {
487                // External mode: PulseHive computes embeddings via the provider
488                Config::new()
489            } else {
490                // Builtin mode: PulseDB computes embeddings internally
491                Config::with_builtin_embeddings()
492            };
493            let db = PulseDB::open(&path, config)?;
494            Arc::new(PulseDBSubstrate::from_db(db))
495        } else {
496            return Err(PulseHiveError::config(
497                "Substrate not configured. Call substrate_path() or substrate() on the builder.",
498            ));
499        };
500
501        let approval: Arc<dyn ApprovalHandler> = match self.approval_handler {
502            Some(h) => Arc::from(h),
503            None => Arc::new(AutoApprove),
504        };
505
506        // Default: relationship detector enabled with default thresholds
507        let relationship_detector = match self.relationship_detector {
508            Some(explicit) => explicit,
509            None => Some(RelationshipDetector::with_defaults()),
510        };
511
512        // Default: insight synthesizer enabled with default thresholds
513        let insight_synthesizer = match self.insight_synthesizer {
514            Some(explicit) => explicit,
515            None => Some(InsightSynthesizer::with_defaults()),
516        };
517
518        Ok(HiveMind {
519            substrate,
520            llm_providers: self.llm_providers,
521            approval_handler: approval,
522            event_bus: EventBus::default(),
523            relationship_detector,
524            insight_synthesizer,
525            embedding_provider: self.embedding_provider,
526            shutdown: Arc::new(AtomicBool::new(false)),
527            watch_handle: std::sync::Mutex::new(None),
528        })
529    }
530}
531
532#[cfg(test)]
533mod tests {
534    use super::*;
535    use futures::StreamExt;
536
537    #[test]
538    fn test_build_fails_without_substrate() {
539        let result = HiveMind::builder().build();
540        assert!(result.is_err());
541        assert!(result
542            .unwrap_err()
543            .to_string()
544            .contains("Substrate not configured"));
545    }
546
547    #[test]
548    fn test_build_with_substrate_path() {
549        let dir = tempfile::tempdir().unwrap();
550        let path = dir.path().join("test.db");
551        assert!(HiveMind::builder().substrate_path(&path).build().is_ok());
552    }
553
554    #[tokio::test]
555    async fn test_deploy_empty_agents_returns_empty_stream() {
556        let dir = tempfile::tempdir().unwrap();
557        let path = dir.path().join("test.db");
558        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
559
560        let mut stream = hive.deploy(vec![], vec![]).await.unwrap();
561        assert!(stream.next().await.is_none());
562    }
563
564    #[test]
565    fn test_task_new() {
566        let task = Task::new("Analyze the codebase");
567        assert_eq!(task.description, "Analyze the codebase");
568    }
569
570    #[test]
571    fn test_task_with_collective() {
572        let cid = CollectiveId::new();
573        let task = Task::with_collective("Search for bugs", cid);
574        assert_eq!(task.collective_id, cid);
575    }
576
577    /// Helper: create a HiveMind with Builtin embeddings and a collective for testing.
578    async fn build_hive_with_collective() -> (HiveMind, CollectiveId) {
579        let dir = tempfile::tempdir().unwrap();
580        let path = dir.path().join("test.db");
581        // Leak tempdir so it lives long enough
582        let dir = Box::leak(Box::new(dir));
583        let _ = dir;
584        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
585        // Create collective via SubstrateProvider trait (no raw PulseDB needed!)
586        let cid = hive
587            .substrate
588            .get_or_create_collective("test")
589            .await
590            .unwrap();
591        (hive, cid)
592    }
593
594    #[tokio::test]
595    async fn test_record_experience_stores_and_emits_event() {
596        let (hive, cid) = build_hive_with_collective().await;
597        let mut rx = hive.event_bus.subscribe();
598
599        let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
600        let exp = pulsedb::NewExperience {
601            collective_id: cid,
602            content: "Learned that Rust's ownership model prevents data races.".into(),
603            experience_type: pulsedb::ExperienceType::Generic {
604                category: Some("rust".into()),
605            },
606            embedding: Some(dummy_embedding),
607            importance: 0.8,
608            confidence: 0.9,
609            domain: vec!["rust".into(), "concurrency".into()],
610            source_agent: pulsedb::AgentId("test-agent".into()),
611            source_task: None,
612            related_files: vec![],
613        };
614
615        let id = hive.record_experience(exp).await.unwrap();
616
617        // Verify event emitted
618        let event = rx.try_recv().unwrap();
619        match event {
620            HiveEvent::ExperienceRecorded {
621                experience_id,
622                agent_id,
623                ..
624            } => {
625                assert_eq!(experience_id, id);
626                assert_eq!(agent_id, "test-agent");
627            }
628            other => panic!("Expected ExperienceRecorded, got: {other:?}"),
629        }
630    }
631
632    #[tokio::test]
633    async fn test_record_experience_retrievable() {
634        let (hive, cid) = build_hive_with_collective().await;
635
636        // Provide a pre-computed embedding to avoid requiring PulseDB's builtin
637        // ONNX model, which may not be cached on CI runners.
638        let dummy_embedding: Vec<f32> = (0..384).map(|i| (i as f32 * 0.01).sin()).collect();
639        let exp = pulsedb::NewExperience {
640            collective_id: cid,
641            content: "Test experience for retrieval.".into(),
642            experience_type: pulsedb::ExperienceType::Generic { category: None },
643            embedding: Some(dummy_embedding),
644            importance: 0.5,
645            confidence: 0.5,
646            domain: vec![],
647            source_agent: pulsedb::AgentId("agent-1".into()),
648            source_task: None,
649            related_files: vec![],
650        };
651
652        let id = hive.record_experience(exp).await.unwrap();
653
654        // Verify retrievable
655        let retrieved = hive.substrate.get_experience(id).await.unwrap();
656        assert!(retrieved.is_some());
657        let retrieved = retrieved.unwrap();
658        assert_eq!(retrieved.content, "Test experience for retrieval.");
659    }
660
661    // ── Shutdown & Restart tests ─────────────────────────────────────
662
663    #[test]
664    fn test_shutdown_sets_flag() {
665        let dir = tempfile::tempdir().unwrap();
666        let path = dir.path().join("test.db");
667        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
668
669        assert!(!hive.is_shutdown());
670        hive.shutdown();
671        assert!(hive.is_shutdown());
672    }
673
674    #[test]
675    fn test_drop_sets_shutdown_flag() {
676        let dir = tempfile::tempdir().unwrap();
677        let path = dir.path().join("test.db");
678        let hive = HiveMind::builder().substrate_path(&path).build().unwrap();
679        let shutdown = Arc::clone(&hive.shutdown);
680
681        assert!(!shutdown.load(Ordering::Relaxed));
682        drop(hive);
683        assert!(shutdown.load(Ordering::Relaxed));
684    }
685
686    #[tokio::test]
687    async fn test_redeploy_empty_is_noop() {
688        let (hive, _cid) = build_hive_with_collective().await;
689        let task = Task::new("test");
690        assert!(hive.redeploy(vec![], task).await.is_ok());
691    }
692}