thread_share/
worker_manager.rs

1//! # Worker Manager Module
2//!
3//! This module provides the `WorkerManager` struct for controlling spawned threads.
4//! It allows you to manage individual workers: pause, resume, remove, and monitor them.
5//!
6//! ## Overview
7//!
8//! `WorkerManager` is designed to work with the `spawn_workers!` macro and provides
9//! fine-grained control over thread management. It's particularly useful for:
10//!
11//! - **Dynamic worker management**: Add/remove workers at runtime
//! - **Worker state control**: Mark individual workers as paused/resumed (an advisory flag; the underlying thread is not suspended)
13//! - **Monitoring**: Track worker status and count
14//! - **Synchronization**: Wait for all workers to complete
15//!
16//! ## Key Features
17//!
18//! - 🔄 **Dynamic Worker Management**: Add/remove workers programmatically
19//! - ⏸️ **State Control**: Pause/resume individual workers
20//! - 📊 **Real-time Monitoring**: Track worker status and count
21//! - 🔒 **Thread Safety**: All operations are thread-safe
22//! - 🎮 **Fine-grained Control**: Manage each worker individually
23//! - 📈 **Scalability**: Handle hundreds of workers efficiently
24//!
25//! ## Basic Usage
26//!
27//! ```rust
28//! use thread_share::{enhanced_share, spawn_workers};
29//!
30//! // Create shared data
31//! let data = enhanced_share!(0u32);
32//!
33//! // Spawn workers and get manager
34//! let manager = spawn_workers!(data, {
35//!     counter: |data| {
36//!         for i in 1..=10 {
37//!             data.update(|x| *x += i);
38//!             std::thread::sleep(std::time::Duration::from_millis(100));
39//!         }
40//!     },
41//!     monitor: |data| {
42//!         for _ in 0..5 {
43//!             std::thread::sleep(std::time::Duration::from_millis(200));
44//!             println!("Value: {}", data.get());
45//!         }
46//!     }
47//! });
48//!
49//! // Control workers
50//! println!("Active workers: {}", manager.active_workers());
51//! println!("Worker names: {:?}", manager.get_worker_names());
52//!
53//! // Wait for completion
54//! manager.join_all().expect("Workers failed");
55//! ```
56//!
57//! ## Advanced Usage with Programmatic Worker Addition
58//!
59//! ```rust
60//! use thread_share::{enhanced_share, spawn_workers, worker_manager::WorkerManager};
61//! use std::thread;
62//!
63//! let data = enhanced_share!(0u32);
64//!
65//! // Start with initial workers
66//! let manager = spawn_workers!(data, {
67//!     initial_worker: |data| {
68//!         data.update(|x| *x += 1);
69//!         std::thread::sleep(std::time::Duration::from_millis(100));
70//!     }
71//! });
72//!
73//! // Add more workers programmatically
74//! let data_clone = data.clone();
75//! let handle = thread::spawn(move || {
76//!     for _ in 0..3 {
77//!         data_clone.update(|x| *x *= 2);
78//!         std::thread::sleep(std::time::Duration::from_millis(150));
79//!     }
80//! });
81//!
82//! manager.add_worker("dynamic_worker", handle).expect("Failed to add worker");
83//!
84//! // Now we have 2 workers
85//! assert_eq!(manager.active_workers(), 2);
86//! ```
87//!
88//! ## Creating Empty Manager and Adding Workers
89//!
90//! ```rust
91//! use thread_share::worker_manager::WorkerManager;
92//! use std::thread;
93//! use std::time::Duration;
94//!
95//! // Create empty manager
96//! let manager = WorkerManager::new();
97//!
98//! // Add workers one by one
99//! let handle1 = thread::spawn(|| {
100//!     for i in 1..=5 {
101//!         println!("Worker 1: {}", i);
102//!         thread::sleep(Duration::from_millis(100));
103//!     }
104//! });
105//!
106//! let handle2 = thread::spawn(|| {
107//!     for i in 1..=3 {
108//!         println!("Worker 2: {}", i);
109//!         thread::sleep(Duration::from_millis(150));
110//!     }
111//! });
112//!
113//! manager.add_worker("worker1", handle1).expect("Failed to add worker1");
114//! manager.add_worker("worker2", handle2).expect("Failed to add worker2");
115//!
116//! assert_eq!(manager.active_workers(), 2);
117//! println!("Worker names: {:?}", manager.get_worker_names());
118//!
119//! // Wait for completion
120//! manager.join_all().expect("Workers failed");
121//! ```
122//!
123//! ## Worker Lifecycle Management
124//!
125//! ```rust
126//! use thread_share::{enhanced_share, spawn_workers};
127//!
128//! let data = enhanced_share!(0u32);
129//! let manager = spawn_workers!(data, {
130//!     worker1: |data| { /* work */ },
131//!     worker2: |data| { /* work */ }
132//! });
133//!
134//! // Pause a worker
135//! manager.pause_worker("worker1").expect("Failed to pause");
136//!
137//! // Check if paused
138//! assert!(manager.is_worker_paused("worker1"));
139//!
140//! // Resume a worker
141//! manager.resume_worker("worker1").expect("Failed to resume");
142//!
143//! // Remove from tracking
144//! manager.remove_worker("worker2").expect("Failed to remove");
145//!
146//! // Remove all workers
147//! manager.remove_all_workers().expect("Failed to remove all");
148//! ```
149//!
150//! ## Error Handling
151//!
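//! Fallible operations (`add_worker`, `pause_worker`, `resume_worker`, `remove_worker`,
//! `remove_all_workers`, `join_all`) return `Result<(), String>`; query methods such as
//! `active_workers`, `get_worker_names` and `is_worker_paused` return plain values: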
153//!
154//! ```rust
155//! use thread_share::{enhanced_share, spawn_workers};
156//!
157//! let data = enhanced_share!(0u32);
158//! let manager = spawn_workers!(data, {
159//!     worker: |data| { /* work */ }
160//! });
161//!
162//! // Handle errors gracefully
163//! match manager.pause_worker("nonexistent") {
164//!     Ok(()) => println!("Worker paused successfully"),
165//!     Err(e) => println!("Failed to pause worker: {}", e),
166//! }
167//!
168//! // Remove worker and handle result
169//! match manager.remove_worker("worker") {
170//!     Ok(()) => println!("Worker removed successfully"),
171//!     Err(e) => println!("Failed to remove worker: {}", e),
172//! }
173//! ```
174//!
175//! ## Thread Safety
176//!
177//! `WorkerManager` is designed to be thread-safe and can be shared between threads:
178//!
179//! ```rust
180//! use thread_share::{enhanced_share, spawn_workers};
181//! use std::thread;
182//! use std::sync::Arc;
183//!
184//! let data = enhanced_share!(0u32);
185//! let manager = spawn_workers!(data, {
186//!     worker: |data| { /* work */ }
187//! });
188//!
189//! // Clone manager for another thread
190//! let manager_clone = manager.clone();
191//! let thread_handle = thread::spawn(move || {
192//!     // Use manager in another thread
193//!     let names = manager_clone.get_worker_names();
194//!     println!("Worker names from thread: {:?}", names);
195//! });
196//!
197//! thread_handle.join().expect("Thread failed");
198//! ```
199//!
200//! ## Performance Considerations
201//!
202//! - **Thread Spawning**: Minimal overhead over standard `thread::spawn`
203//! - **Worker Management**: Constant-time operations for most management functions
204//! - **Memory Usage**: Small overhead for worker tracking structures
205//! - **Scalability**: Efficient for up to hundreds of workers
206//!
207//! ## When to Use WorkerManager
208//!
209//! - **Complex Applications**: When you need fine-grained control over workers
210//! - **Dynamic Workloads**: When worker count changes at runtime
211//! - **Monitoring Requirements**: When you need real-time worker status
212//! - **Production Systems**: When you need robust worker management
213//! - **Debugging**: When you need to pause/resume workers for debugging
214
215use std::collections::HashMap;
216use std::sync::{Arc, Mutex};
217use std::thread;
218
/// Worker Manager for controlling spawned threads
///
/// This struct keeps a registry of named thread handles and offers methods to:
/// - Mark specific workers as paused/resumed (an advisory flag only; the thread
///   itself is not suspended by the manager)
/// - Remove workers from tracking
/// - Monitor worker names and count
/// - Add new workers programmatically
///
/// ## Creation
///
/// `WorkerManager` is typically created by the `spawn_workers!` macro:
///
/// ```rust
/// use thread_share::{enhanced_share, spawn_workers};
///
/// let data = enhanced_share!(0u32);
/// let manager = spawn_workers!(data, {
///     worker1: |data| { /* work */ },
///     worker2: |data| { /* work */ }
/// });
/// ```
///
/// You can also create it directly from an existing handle map:
///
/// ```rust
/// use thread_share::worker_manager::WorkerManager;
/// use std::sync::{Arc, Mutex};
/// use std::collections::HashMap;
///
/// let threads = Arc::new(Mutex::new(HashMap::new()));
/// let manager = WorkerManager::new_with_threads(threads);
/// ```
///
/// Or create an empty manager and add workers later:
///
/// ```rust
/// use thread_share::worker_manager::WorkerManager;
/// use std::thread;
///
/// // Create empty manager
/// let manager = WorkerManager::new();
///
/// // Add workers as needed
/// let handle = thread::spawn(|| { /* work */ });
/// manager.add_worker("worker", handle).expect("Failed to add worker");
/// ```
///
/// ## Thread Safety
///
/// Both internal maps live behind `Arc<Mutex<..>>`, so a `WorkerManager` can be
/// cloned and handed to other threads; every clone observes the same state.
///
/// ## Example: Complete Worker Lifecycle
///
/// ```rust
/// use thread_share::{enhanced_share, spawn_workers};
/// use std::thread;
/// use std::time::Duration;
///
/// let data = enhanced_share!(0u32);
///
/// // Start initial workers
/// let manager = spawn_workers!(data, {
///     counter: |data| {
///         for i in 1..=5 {
///             data.update(|x| *x += i);
///             thread::sleep(Duration::from_millis(100));
///         }
///     }
/// });
///
/// // Add worker programmatically
/// let data_clone = data.clone();
/// let handle = thread::spawn(move || {
///     for _ in 0..3 {
///         data_clone.update(|x| *x *= 2);
///         thread::sleep(Duration::from_millis(150));
///     }
/// });
///
/// manager.add_worker("multiplier", handle).expect("Failed to add worker");
///
/// // Monitor workers
/// println!("Active workers: {}", manager.active_workers());
/// println!("Worker names: {:?}", manager.get_worker_names());
///
/// // Flag and un-flag a worker as paused
/// manager.pause_worker("counter").expect("Failed to pause");
/// thread::sleep(Duration::from_millis(200));
/// manager.resume_worker("counter").expect("Failed to resume");
///
/// // Wait for completion
/// manager.join_all().expect("Workers failed");
///
/// println!("Final value: {}", data.get());
/// ```
#[derive(Debug, Default)]
pub struct WorkerManager {
    /// Join handles of the tracked workers, keyed by worker name.
    threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>,
    /// Names currently flagged as paused; the presence of a key is what marks
    /// a worker as paused.
    paused_workers: Arc<Mutex<HashMap<String, bool>>>,
}
319
320impl WorkerManager {
321    /// Creates a new empty WorkerManager
322    ///
323    /// This method creates a WorkerManager with an empty thread tracking structure.
324    /// Useful when you want to create a manager first and add workers later.
325    ///
326    /// ## Example
327    ///
328    /// ```rust
329    /// use thread_share::worker_manager::WorkerManager;
330    /// use std::thread;
331    ///
332    /// // Create empty manager
333    /// let manager = WorkerManager::new();
334    ///
335    /// // Add workers programmatically
336    /// let handle = thread::spawn(|| {
337    ///     println!("Worker doing work...");
338    /// });
339    ///
340    /// manager.add_worker("worker1", handle).expect("Failed to add worker");
341    /// assert_eq!(manager.active_workers(), 1);
342    /// ```
343    pub fn new() -> Self {
344        Self {
345            threads: Arc::new(Mutex::new(HashMap::new())),
346            paused_workers: Arc::new(Mutex::new(HashMap::new())),
347        }
348    }
349
350    /// Creates a new WorkerManager with existing thread handles
351    ///
352    /// ## Arguments
353    ///
354    /// * `threads` - Arc<Mutex<HashMap<String, JoinHandle<()>>>> containing thread handles
355    ///
356    /// ## Example
357    ///
358    /// ```rust
359    /// use thread_share::worker_manager::WorkerManager;
360    /// use std::sync::{Arc, Mutex};
361    /// use std::collections::HashMap;
362    ///
363    /// let threads = Arc::new(Mutex::new(HashMap::new()));
364    /// let manager = WorkerManager::new_with_threads(threads);
365    /// ```
366    pub fn new_with_threads(threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>) -> Self {
367        Self {
368            threads,
369            paused_workers: Arc::new(Mutex::new(HashMap::new())),
370        }
371    }
372
373    /// Adds a new worker to the manager
374    ///
375    /// This method allows you to add workers programmatically after the manager is created.
376    /// The worker will be tracked and can be managed like any other worker.
377    ///
378    /// ## Arguments
379    ///
380    /// * `name` - A descriptive name for the worker
381    /// * `handle` - The JoinHandle of the spawned thread
382    ///
383    /// ## Returns
384    ///
385    /// `Ok(())` on success, `Err(String)` if a worker with the same name already exists.
386    ///
387    /// ## Example
388    ///
389    /// ```rust
390    /// use thread_share::worker_manager::WorkerManager;
391    /// use std::sync::{Arc, Mutex};
392    /// use std::collections::HashMap;
393    /// use std::thread;
394    ///
395    /// let threads = Arc::new(Mutex::new(HashMap::new()));
396    /// let manager = WorkerManager::new_with_threads(threads.clone());
397    ///
398    /// // Spawn a thread manually
399    /// let handle = thread::spawn(|| {
400    ///     println!("Worker doing work...");
401    /// });
402    ///
403    /// // Add it to the manager
404    /// manager.add_worker("manual_worker", handle).expect("Failed to add worker");
405    /// ```
406    pub fn add_worker(&self, name: &str, handle: thread::JoinHandle<()>) -> Result<(), String> {
407        let mut threads = self.threads.lock().unwrap();
408        
409        if threads.contains_key(name) {
410            return Err(format!("Worker '{}' already exists", name));
411        }
412        
413        threads.insert(name.to_string(), handle);
414        println!("Worker '{}' added to manager", name);
415        Ok(())
416    }
417
418    /// Pauses a specific worker by name
419    ///
420    /// Note: This is a placeholder for future implementation.
421    /// Currently, Rust doesn't support pausing threads directly.
422    ///
423    /// ## Arguments
424    ///
425    /// * `name` - The name of the worker to pause
426    ///
427    /// ## Returns
428    ///
429    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
430    ///
431    /// ## Example
432    ///
433    /// ```rust
434    /// use thread_share::{enhanced_share, spawn_workers};
435    ///
436    /// let data = enhanced_share!(0u32);
437    /// let manager = spawn_workers!(data, {
438    ///     worker: |data| { /* work */ }
439    /// });
440    ///
441    /// // Pause the worker
442    /// manager.pause_worker("worker").expect("Failed to pause");
443    /// ```
444    pub fn pause_worker(&self, name: &str) -> Result<(), String> {
445        let mut paused = self.paused_workers.lock().unwrap();
446        paused.insert(name.to_string(), true);
447        println!("Worker '{}' marked for pause (implementation pending)", name);
448        Ok(())
449    }
450
451    /// Resumes a specific worker by name
452    ///
453    /// ## Arguments
454    ///
455    /// * `name` - The name of the worker to resume
456    ///
457    /// ## Returns
458    ///
459    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
460    ///
461    /// ## Example
462    ///
463    /// ```rust
464    /// use thread_share::{enhanced_share, spawn_workers};
465    ///
466    /// let data = enhanced_share!(0u32);
467    /// let manager = spawn_workers!(data, {
468    ///     worker: |data| { /* work */ }
469    /// });
470    ///
471    /// // Pause then resume
472    /// manager.pause_worker("worker").expect("Failed to pause");
473    /// manager.resume_worker("worker").expect("Failed to resume");
474    /// ```
475    pub fn resume_worker(&self, name: &str) -> Result<(), String> {
476        let mut paused = self.paused_workers.lock().unwrap();
477        paused.remove(name);
478        println!("Worker '{}' resumed", name);
479        Ok(())
480    }
481
482    /// Removes a worker from tracking without stopping it
483    ///
484    /// This method removes the worker from the manager's tracking but doesn't
485    /// actually stop the thread. The thread will continue running until it
486    /// completes naturally.
487    ///
488    /// ## Arguments
489    ///
490    /// * `name` - The name of the worker to remove
491    ///
492    /// ## Returns
493    ///
494    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
495    ///
496    /// ## Example
497    ///
498    /// ```rust
499    /// use thread_share::{enhanced_share, spawn_workers};
500    ///
501    /// let data = enhanced_share!(0u32);
502    /// let manager = spawn_workers!(data, {
503    ///     worker: |data| { /* work */ }
504    /// });
505    ///
506    /// // Remove from tracking
507    /// manager.remove_worker("worker").expect("Failed to remove");
508    /// ```
509    pub fn remove_worker(&self, name: &str) -> Result<(), String> {
510        let mut threads = self.threads.lock().unwrap();
511        if threads.remove(name).is_some() {
512            println!("Worker '{}' removed from tracking", name);
513            Ok(())
514        } else {
515            Err(format!("Worker '{}' not found", name))
516        }
517    }
518
519    /// Removes all workers from tracking without stopping them
520    ///
521    /// This method removes all workers from the manager's tracking but doesn't
522    /// actually stop the threads. The threads will continue running until they
523    /// complete naturally.
524    ///
525    /// ## Returns
526    ///
527    /// `Ok(())` on success
528    ///
529    /// ## Example
530    ///
531    /// ```rust
532    /// use thread_share::{enhanced_share, spawn_workers};
533    ///
534    /// let data = enhanced_share!(0u32);
535    /// let manager = spawn_workers!(data, {
536    ///     worker1: |data| { /* work */ },
537    ///     worker2: |data| { /* work */ }
538    /// });
539    ///
540    /// // Remove all workers from tracking
541    /// manager.remove_all_workers().expect("Failed to remove all workers");
542    /// ```
543    pub fn remove_all_workers(&self) -> Result<(), String> {
544        let mut threads = self.threads.lock().unwrap();
545        let count = threads.len();
546        threads.clear();
547        println!("Removed {} workers from tracking", count);
548        Ok(())
549    }
550
551    /// Gets the list of all worker names
552    ///
553    /// ## Returns
554    ///
555    /// A `Vec<String>` containing all worker names
556    ///
557    /// ## Example
558    ///
559    /// ```rust
560    /// use thread_share::{enhanced_share, spawn_workers};
561    ///
562    /// let data = enhanced_share!(0u32);
563    /// let manager = spawn_workers!(data, {
564    ///     counter: |data| { /* work */ },
565    ///     monitor: |data| { /* work */ }
566    /// });
567    ///
568    /// let names = manager.get_worker_names();
569    /// assert_eq!(names.len(), 2);
570    /// assert!(names.contains(&"counter".to_string()));
571    /// assert!(names.contains(&"monitor".to_string()));
572    /// ```
573    pub fn get_worker_names(&self) -> Vec<String> {
574        let threads = self.threads.lock().unwrap();
575        threads.keys().cloned().collect()
576    }
577
578    /// Gets the number of active workers
579    ///
580    /// ## Returns
581    ///
582    /// The number of workers currently being tracked
583    ///
584    /// ## Example
585    ///
586    /// ```rust
587    /// use thread_share::{enhanced_share, spawn_workers};
588    ///
589    /// let data = enhanced_share!(0u32);
590    /// let manager = spawn_workers!(data, {
591    ///     worker1: |data| { /* work */ },
592    ///     worker2: |data| { /* work */ }
593    /// });
594    ///
595    /// assert_eq!(manager.active_workers(), 2);
596    /// ```
597    pub fn active_workers(&self) -> usize {
598        let threads = self.threads.lock().unwrap();
599        threads.len()
600    }
601
602    /// Checks if a specific worker is paused
603    ///
604    /// ## Arguments
605    ///
606    /// * `name` - The name of the worker to check
607    ///
608    /// ## Returns
609    ///
610    /// `true` if the worker is paused, `false` otherwise
611    ///
612    /// ## Example
613    ///
614    /// ```rust
615    /// use thread_share::{enhanced_share, spawn_workers};
616    ///
617    /// let data = enhanced_share!(0u32);
618    /// let manager = spawn_workers!(data, {
619    ///     worker: |data| { /* work */ }
620    /// });
621    ///
622    /// // Initially not paused
623    /// assert!(!manager.is_worker_paused("worker"));
624    ///
625    /// // Pause the worker
626    /// manager.pause_worker("worker").expect("Failed to pause");
627    /// assert!(manager.is_worker_paused("worker"));
628    ///
629    /// // Resume the worker
630    /// manager.resume_worker("worker").expect("Failed to resume");
631    /// assert!(!manager.is_worker_paused("worker"));
632    /// ```
633    pub fn is_worker_paused(&self, name: &str) -> bool {
634        let paused = self.paused_workers.lock().unwrap();
635        paused.contains_key(name)
636    }
637
638    /// Waits for all workers to complete
639    ///
640    /// This method blocks until all tracked workers have completed.
641    /// It removes all workers from tracking after they complete.
642    ///
643    /// ## Returns
644    ///
645    /// `Ok(())` if all workers completed successfully, `Err(String)` if any worker failed
646    ///
647    /// ## Example
648    ///
649    /// ```rust
650    /// use thread_share::{enhanced_share, spawn_workers};
651    /// use std::thread;
652    /// use std::time::Duration;
653    ///
654    /// let data = enhanced_share!(0u32);
655    /// let manager = spawn_workers!(data, {
656    ///     worker: |data| {
657    ///         thread::sleep(Duration::from_millis(100));
658    ///         data.update(|x| *x += 1);
659    ///     }
660    /// });
661    ///
662    /// // Wait for completion
663    /// manager.join_all().expect("Workers failed");
664    ///
665    /// // All workers are now completed and removed
666    /// assert_eq!(manager.active_workers(), 0);
667    /// ```
668    pub fn join_all(&self) -> Result<(), String> {
669        let mut threads = self.threads.lock().unwrap();
670        let thread_handles: Vec<_> = threads.drain().collect();
671        drop(threads);
672
673        for (name, handle) in thread_handles {
674            let result = handle.join();
675            if let Err(e) = result {
676                return Err(format!("Worker '{}' failed: {:?}", name, e));
677            }
678        }
679        Ok(())
680    }
681}
682
683impl Clone for WorkerManager {
684    /// Creates a clone of the WorkerManager
685    ///
686    /// The cloned manager shares the same underlying thread tracking data,
687    /// so operations on one clone will affect the other.
688    ///
689    /// ## Example
690    ///
691    /// ```rust
692    /// use thread_share::{enhanced_share, spawn_workers};
693    ///
694    /// let data = enhanced_share!(0u32);
695    /// let manager = spawn_workers!(data, {
696    ///     worker: |data| { /* work */ }
697    /// });
698    ///
699    /// // Clone the manager
700    /// let manager_clone = manager.clone();
701    ///
702    /// // Both managers track the same workers
703    /// assert_eq!(manager.active_workers(), manager_clone.active_workers());
704    /// ```
705    fn clone(&self) -> Self {
706        Self {
707            threads: self.threads.clone(),
708            paused_workers: self.paused_workers.clone(),
709        }
710    }
711}