thread_share/
worker_manager.rs

1//! # Worker Manager Module
2//!
3//! This module provides the `WorkerManager` struct for controlling spawned threads.
4//! It allows you to manage individual workers: pause, resume, remove, and monitor them.
5//!
6//! ## Overview
7//!
8//! `WorkerManager` is designed to work with the `spawn_workers!` macro and provides
9//! fine-grained control over thread management. It's particularly useful for:
10//!
11//! - **Dynamic worker management**: Add/remove workers at runtime
12//! - **Worker state control**: Pause/resume individual workers
13//! - **Monitoring**: Track worker status and count
14//! - **Synchronization**: Wait for all workers to complete
15//!
16//! ## Key Features
17//!
18//! - 🔄 **Dynamic Worker Management**: Add/remove workers programmatically
19//! - ⏸️ **State Control**: Pause/resume individual workers
20//! - 📊 **Real-time Monitoring**: Track worker status and count
21//! - 🔒 **Thread Safety**: All operations are thread-safe
22//! - 🎮 **Fine-grained Control**: Manage each worker individually
23//! - 📈 **Scalability**: Handle hundreds of workers efficiently
24//!
25//! ## Basic Usage
26//!
27//! ```rust
28//! use thread_share::{enhanced_share, spawn_workers};
29//!
30//! // Create shared data
31//! let data = enhanced_share!(0u32);
32//!
33//! // Spawn workers and get manager
34//! let manager = spawn_workers!(data, {
35//!     counter: |data| {
36//!         for i in 1..=10 {
37//!             data.update(|x| *x += i);
38//!             std::thread::sleep(std::time::Duration::from_millis(100));
39//!         }
40//!     },
41//!     monitor: |data| {
42//!         for _ in 0..5 {
43//!             std::thread::sleep(std::time::Duration::from_millis(200));
44//!             println!("Value: {}", data.get());
45//!         }
46//!     }
47//! });
48//!
49//! // Control workers
50//! println!("Active workers: {}", manager.active_workers());
51//! println!("Worker names: {:?}", manager.get_worker_names());
52//!
53//! // Wait for completion
54//! manager.join_all().expect("Workers failed");
55//! ```
56//!
57//! ## Advanced Usage with Programmatic Worker Addition
58//!
59//! ```rust
60//! use thread_share::{enhanced_share, spawn_workers, worker_manager::WorkerManager};
61//! use std::thread;
62//!
63//! let data = enhanced_share!(0u32);
64//!
65//! // Start with initial workers
66//! let manager = spawn_workers!(data, {
67//!     initial_worker: |data| {
68//!         data.update(|x| *x += 1);
69//!         std::thread::sleep(std::time::Duration::from_millis(100));
70//!     }
71//! });
72//!
73//! // Add more workers programmatically
74//! let data_clone = data.clone();
75//! let handle = thread::spawn(move || {
76//!     for _ in 0..3 {
77//!         data_clone.update(|x| *x *= 2);
78//!         std::thread::sleep(std::time::Duration::from_millis(150));
79//!     }
80//! });
81//!
82//! manager.add_worker("dynamic_worker", handle).expect("Failed to add worker");
83//!
84//! // Now we have 2 workers
85//! assert_eq!(manager.active_workers(), 2);
86//! ```
87//!
88//! ## Worker Lifecycle Management
89//!
90//! ```rust
91//! use thread_share::{enhanced_share, spawn_workers};
92//!
93//! let data = enhanced_share!(0u32);
94//! let manager = spawn_workers!(data, {
95//!     worker1: |data| { /* work */ },
96//!     worker2: |data| { /* work */ }
97//! });
98//!
99//! // Pause a worker
100//! manager.pause_worker("worker1").expect("Failed to pause");
101//!
102//! // Check if paused
103//! assert!(manager.is_worker_paused("worker1"));
104//!
105//! // Resume a worker
106//! manager.resume_worker("worker1").expect("Failed to resume");
107//!
108//! // Remove from tracking
109//! manager.remove_worker("worker2").expect("Failed to remove");
110//!
111//! // Remove all workers
112//! manager.remove_all_workers().expect("Failed to remove all");
113//! ```
114//!
115//! ## Error Handling
116//!
117//! All methods return `Result<T, String>` for proper error handling:
118//!
119//! ```rust
120//! use thread_share::{enhanced_share, spawn_workers};
121//!
122//! let data = enhanced_share!(0u32);
123//! let manager = spawn_workers!(data, {
124//!     worker: |data| { /* work */ }
125//! });
126//!
127//! // Handle errors gracefully
128//! match manager.pause_worker("nonexistent") {
129//!     Ok(()) => println!("Worker paused successfully"),
130//!     Err(e) => println!("Failed to pause worker: {}", e),
131//! }
132//!
133//! // Remove worker and handle result
134//! match manager.remove_worker("worker") {
135//!     Ok(()) => println!("Worker removed successfully"),
136//!     Err(e) => println!("Failed to remove worker: {}", e),
137//! }
138//! ```
139//!
140//! ## Thread Safety
141//!
142//! `WorkerManager` is designed to be thread-safe and can be shared between threads:
143//!
144//! ```rust
145//! use thread_share::{enhanced_share, spawn_workers};
146//! use std::thread;
147//! use std::sync::Arc;
148//!
149//! let data = enhanced_share!(0u32);
150//! let manager = spawn_workers!(data, {
151//!     worker: |data| { /* work */ }
152//! });
153//!
154//! // Clone manager for another thread
155//! let manager_clone = manager.clone();
156//! let thread_handle = thread::spawn(move || {
157//!     // Use manager in another thread
158//!     let names = manager_clone.get_worker_names();
159//!     println!("Worker names from thread: {:?}", names);
160//! });
161//!
162//! thread_handle.join().expect("Thread failed");
163//! ```
164//!
165//! ## Performance Considerations
166//!
167//! - **Thread Spawning**: Minimal overhead over standard `thread::spawn`
168//! - **Worker Management**: Constant-time operations for most management functions
169//! - **Memory Usage**: Small overhead for worker tracking structures
170//! - **Scalability**: Efficient for up to hundreds of workers
171//!
172//! ## When to Use WorkerManager
173//!
174//! - **Complex Applications**: When you need fine-grained control over workers
175//! - **Dynamic Workloads**: When worker count changes at runtime
176//! - **Monitoring Requirements**: When you need real-time worker status
177//! - **Production Systems**: When you need robust worker management
178//! - **Debugging**: When you need to pause/resume workers for debugging
179
180use std::collections::HashMap;
181use std::sync::{Arc, Mutex};
182use std::thread;
183
184/// Worker Manager for controlling spawned threads
185///
186/// This struct provides methods to control individual workers:
187/// - Pause/resume specific workers
188/// - Remove workers from tracking
189/// - Monitor worker status
190/// - Add new workers programmatically
191///
192/// ## Creation
193///
194/// `WorkerManager` is typically created by the `spawn_workers!` macro:
195///
196/// ```rust
197/// use thread_share::{enhanced_share, spawn_workers};
198///
199/// let data = enhanced_share!(0u32);
200/// let manager = spawn_workers!(data, {
201///     worker1: |data| { /* work */ },
202///     worker2: |data| { /* work */ }
203/// });
204/// ```
205///
206/// You can also create it directly:
207///
208/// ```rust
209/// use thread_share::worker_manager::WorkerManager;
210/// use std::sync::{Arc, Mutex};
211/// use std::collections::HashMap;
212///
213/// let threads = Arc::new(Mutex::new(HashMap::new()));
214/// let manager = WorkerManager::new(threads);
215/// ```
216///
217/// ## Thread Safety
218///
219/// `WorkerManager` implements `Clone` and can be safely shared between threads.
220/// All operations are thread-safe and use proper locking mechanisms.
221///
222/// ## Example: Complete Worker Lifecycle
223///
224/// ```rust
225/// use thread_share::{enhanced_share, spawn_workers};
226/// use std::thread;
227/// use std::time::Duration;
228///
229/// let data = enhanced_share!(0u32);
230///
231/// // Start initial workers
232/// let manager = spawn_workers!(data, {
233///     counter: |data| {
234///         for i in 1..=5 {
235///             data.update(|x| *x += i);
236///             thread::sleep(Duration::from_millis(100));
237///         }
238///     }
239/// });
240///
241/// // Add worker programmatically
242/// let data_clone = data.clone();
243/// let handle = thread::spawn(move || {
244///     for _ in 0..3 {
245///         data_clone.update(|x| *x *= 2);
246///         thread::sleep(Duration::from_millis(150));
247///     }
248/// });
249///
250/// manager.add_worker("multiplier", handle).expect("Failed to add worker");
251///
252/// // Monitor workers
253/// println!("Active workers: {}", manager.active_workers());
254/// println!("Worker names: {:?}", manager.get_worker_names());
255///
256/// // Control workers
257/// manager.pause_worker("counter").expect("Failed to pause");
258/// thread::sleep(Duration::from_millis(200));
259/// manager.resume_worker("counter").expect("Failed to resume");
260///
261/// // Wait for completion
262/// manager.join_all().expect("Workers failed");
263///
264/// println!("Final value: {}", data.get());
265/// ```
266pub struct WorkerManager {
267    threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>,
268    paused_workers: Arc<Mutex<HashMap<String, bool>>>,
269}
270
271impl WorkerManager {
272    /// Creates a new WorkerManager
273    ///
274    /// ## Arguments
275    ///
276    /// * `threads` - Arc<Mutex<HashMap<String, JoinHandle<()>>>> containing thread handles
277    ///
278    /// ## Example
279    ///
280    /// ```rust
281    /// use thread_share::worker_manager::WorkerManager;
282    /// use std::sync::{Arc, Mutex};
283    /// use std::collections::HashMap;
284    ///
285    /// let threads = Arc::new(Mutex::new(HashMap::new()));
286    /// let manager = WorkerManager::new(threads);
287    /// ```
288    pub fn new(threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>) -> Self {
289        Self {
290            threads,
291            paused_workers: Arc::new(Mutex::new(HashMap::new())),
292        }
293    }
294
295    /// Adds a new worker to the manager
296    ///
297    /// This method allows you to add workers programmatically after the manager is created.
298    /// The worker will be tracked and can be managed like any other worker.
299    ///
300    /// ## Arguments
301    ///
302    /// * `name` - A descriptive name for the worker
303    /// * `handle` - The JoinHandle of the spawned thread
304    ///
305    /// ## Returns
306    ///
307    /// `Ok(())` on success, `Err(String)` if a worker with the same name already exists.
308    ///
309    /// ## Example
310    ///
311    /// ```rust
312    /// use thread_share::worker_manager::WorkerManager;
313    /// use std::sync::{Arc, Mutex};
314    /// use std::collections::HashMap;
315    /// use std::thread;
316    ///
317    /// let threads = Arc::new(Mutex::new(HashMap::new()));
318    /// let manager = WorkerManager::new(threads.clone());
319    ///
320    /// // Spawn a thread manually
321    /// let handle = thread::spawn(|| {
322    ///     println!("Worker doing work...");
323    /// });
324    ///
325    /// // Add it to the manager
326    /// manager.add_worker("manual_worker", handle).expect("Failed to add worker");
327    /// ```
328    pub fn add_worker(&self, name: &str, handle: thread::JoinHandle<()>) -> Result<(), String> {
329        let mut threads = self.threads.lock().unwrap();
330        
331        if threads.contains_key(name) {
332            return Err(format!("Worker '{}' already exists", name));
333        }
334        
335        threads.insert(name.to_string(), handle);
336        println!("Worker '{}' added to manager", name);
337        Ok(())
338    }
339
340    /// Pauses a specific worker by name
341    ///
342    /// Note: This is a placeholder for future implementation.
343    /// Currently, Rust doesn't support pausing threads directly.
344    ///
345    /// ## Arguments
346    ///
347    /// * `name` - The name of the worker to pause
348    ///
349    /// ## Returns
350    ///
351    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
352    ///
353    /// ## Example
354    ///
355    /// ```rust
356    /// use thread_share::{enhanced_share, spawn_workers};
357    ///
358    /// let data = enhanced_share!(0u32);
359    /// let manager = spawn_workers!(data, {
360    ///     worker: |data| { /* work */ }
361    /// });
362    ///
363    /// // Pause the worker
364    /// manager.pause_worker("worker").expect("Failed to pause");
365    /// ```
366    pub fn pause_worker(&self, name: &str) -> Result<(), String> {
367        let mut paused = self.paused_workers.lock().unwrap();
368        paused.insert(name.to_string(), true);
369        println!("Worker '{}' marked for pause (implementation pending)", name);
370        Ok(())
371    }
372
373    /// Resumes a specific worker by name
374    ///
375    /// ## Arguments
376    ///
377    /// * `name` - The name of the worker to resume
378    ///
379    /// ## Returns
380    ///
381    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
382    ///
383    /// ## Example
384    ///
385    /// ```rust
386    /// use thread_share::{enhanced_share, spawn_workers};
387    ///
388    /// let data = enhanced_share!(0u32);
389    /// let manager = spawn_workers!(data, {
390    ///     worker: |data| { /* work */ }
391    /// });
392    ///
393    /// // Pause then resume
394    /// manager.pause_worker("worker").expect("Failed to pause");
395    /// manager.resume_worker("worker").expect("Failed to resume");
396    /// ```
397    pub fn resume_worker(&self, name: &str) -> Result<(), String> {
398        let mut paused = self.paused_workers.lock().unwrap();
399        paused.remove(name);
400        println!("Worker '{}' resumed", name);
401        Ok(())
402    }
403
404    /// Removes a worker from tracking without stopping it
405    ///
406    /// This method removes the worker from the manager's tracking but doesn't
407    /// actually stop the thread. The thread will continue running until it
408    /// completes naturally.
409    ///
410    /// ## Arguments
411    ///
412    /// * `name` - The name of the worker to remove
413    ///
414    /// ## Returns
415    ///
416    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
417    ///
418    /// ## Example
419    ///
420    /// ```rust
421    /// use thread_share::{enhanced_share, spawn_workers};
422    ///
423    /// let data = enhanced_share!(0u32);
424    /// let manager = spawn_workers!(data, {
425    ///     worker: |data| { /* work */ }
426    /// });
427    ///
428    /// // Remove from tracking
429    /// manager.remove_worker("worker").expect("Failed to remove");
430    /// ```
431    pub fn remove_worker(&self, name: &str) -> Result<(), String> {
432        let mut threads = self.threads.lock().unwrap();
433        if threads.remove(name).is_some() {
434            println!("Worker '{}' removed from tracking", name);
435            Ok(())
436        } else {
437            Err(format!("Worker '{}' not found", name))
438        }
439    }
440
441    /// Removes all workers from tracking without stopping them
442    ///
443    /// This method removes all workers from the manager's tracking but doesn't
444    /// actually stop the threads. The threads will continue running until they
445    /// complete naturally.
446    ///
447    /// ## Returns
448    ///
449    /// `Ok(())` on success
450    ///
451    /// ## Example
452    ///
453    /// ```rust
454    /// use thread_share::{enhanced_share, spawn_workers};
455    ///
456    /// let data = enhanced_share!(0u32);
457    /// let manager = spawn_workers!(data, {
458    ///     worker1: |data| { /* work */ },
459    ///     worker2: |data| { /* work */ }
460    /// });
461    ///
462    /// // Remove all workers from tracking
463    /// manager.remove_all_workers().expect("Failed to remove all workers");
464    /// ```
465    pub fn remove_all_workers(&self) -> Result<(), String> {
466        let mut threads = self.threads.lock().unwrap();
467        let count = threads.len();
468        threads.clear();
469        println!("Removed {} workers from tracking", count);
470        Ok(())
471    }
472
473    /// Gets the list of all worker names
474    ///
475    /// ## Returns
476    ///
477    /// A `Vec<String>` containing all worker names
478    ///
479    /// ## Example
480    ///
481    /// ```rust
482    /// use thread_share::{enhanced_share, spawn_workers};
483    ///
484    /// let data = enhanced_share!(0u32);
485    /// let manager = spawn_workers!(data, {
486    ///     counter: |data| { /* work */ },
487    ///     monitor: |data| { /* work */ }
488    /// });
489    ///
490    /// let names = manager.get_worker_names();
491    /// assert_eq!(names.len(), 2);
492    /// assert!(names.contains(&"counter".to_string()));
493    /// assert!(names.contains(&"monitor".to_string()));
494    /// ```
495    pub fn get_worker_names(&self) -> Vec<String> {
496        let threads = self.threads.lock().unwrap();
497        threads.keys().cloned().collect()
498    }
499
500    /// Gets the number of active workers
501    ///
502    /// ## Returns
503    ///
504    /// The number of workers currently being tracked
505    ///
506    /// ## Example
507    ///
508    /// ```rust
509    /// use thread_share::{enhanced_share, spawn_workers};
510    ///
511    /// let data = enhanced_share!(0u32);
512    /// let manager = spawn_workers!(data, {
513    ///     worker1: |data| { /* work */ },
514    ///     worker2: |data| { /* work */ }
515    /// });
516    ///
517    /// assert_eq!(manager.active_workers(), 2);
518    /// ```
519    pub fn active_workers(&self) -> usize {
520        let threads = self.threads.lock().unwrap();
521        threads.len()
522    }
523
524    /// Checks if a specific worker is paused
525    ///
526    /// ## Arguments
527    ///
528    /// * `name` - The name of the worker to check
529    ///
530    /// ## Returns
531    ///
532    /// `true` if the worker is paused, `false` otherwise
533    ///
534    /// ## Example
535    ///
536    /// ```rust
537    /// use thread_share::{enhanced_share, spawn_workers};
538    ///
539    /// let data = enhanced_share!(0u32);
540    /// let manager = spawn_workers!(data, {
541    ///     worker: |data| { /* work */ }
542    /// });
543    ///
544    /// // Initially not paused
545    /// assert!(!manager.is_worker_paused("worker"));
546    ///
547    /// // Pause the worker
548    /// manager.pause_worker("worker").expect("Failed to pause");
549    /// assert!(manager.is_worker_paused("worker"));
550    ///
551    /// // Resume the worker
552    /// manager.resume_worker("worker").expect("Failed to resume");
553    /// assert!(!manager.is_worker_paused("worker"));
554    /// ```
555    pub fn is_worker_paused(&self, name: &str) -> bool {
556        let paused = self.paused_workers.lock().unwrap();
557        paused.contains_key(name)
558    }
559
560    /// Waits for all workers to complete
561    ///
562    /// This method blocks until all tracked workers have completed.
563    /// It removes all workers from tracking after they complete.
564    ///
565    /// ## Returns
566    ///
567    /// `Ok(())` if all workers completed successfully, `Err(String)` if any worker failed
568    ///
569    /// ## Example
570    ///
571    /// ```rust
572    /// use thread_share::{enhanced_share, spawn_workers};
573    /// use std::thread;
574    /// use std::time::Duration;
575    ///
576    /// let data = enhanced_share!(0u32);
577    /// let manager = spawn_workers!(data, {
578    ///     worker: |data| {
579    ///         thread::sleep(Duration::from_millis(100));
580    ///         data.update(|x| *x += 1);
581    ///     }
582    /// });
583    ///
584    /// // Wait for completion
585    /// manager.join_all().expect("Workers failed");
586    ///
587    /// // All workers are now completed and removed
588    /// assert_eq!(manager.active_workers(), 0);
589    /// ```
590    pub fn join_all(&self) -> Result<(), String> {
591        let mut threads = self.threads.lock().unwrap();
592        let thread_handles: Vec<_> = threads.drain().collect();
593        drop(threads);
594
595        for (name, handle) in thread_handles {
596            let result = handle.join();
597            if let Err(e) = result {
598                return Err(format!("Worker '{}' failed: {:?}", name, e));
599            }
600        }
601        Ok(())
602    }
603}
604
605impl Clone for WorkerManager {
606    /// Creates a clone of the WorkerManager
607    ///
608    /// The cloned manager shares the same underlying thread tracking data,
609    /// so operations on one clone will affect the other.
610    ///
611    /// ## Example
612    ///
613    /// ```rust
614    /// use thread_share::{enhanced_share, spawn_workers};
615    ///
616    /// let data = enhanced_share!(0u32);
617    /// let manager = spawn_workers!(data, {
618    ///     worker: |data| { /* work */ }
619    /// });
620    ///
621    /// // Clone the manager
622    /// let manager_clone = manager.clone();
623    ///
624    /// // Both managers track the same workers
625    /// assert_eq!(manager.active_workers(), manager_clone.active_workers());
626    /// ```
627    fn clone(&self) -> Self {
628        Self {
629            threads: self.threads.clone(),
630            paused_workers: self.paused_workers.clone(),
631        }
632    }
633}