thread_share/worker_manager.rs
1//! # Worker Manager Module
2//!
3//! This module provides the `WorkerManager` struct for controlling spawned threads.
4//! It allows you to manage individual workers: pause, resume, remove, and monitor them.
5//!
6//! ## Overview
7//!
8//! `WorkerManager` is designed to work with the `spawn_workers!` macro and provides
9//! fine-grained control over thread management. It's particularly useful for:
10//!
11//! - **Dynamic worker management**: Add/remove workers at runtime
//! - **Worker state control**: Flag individual workers as paused/resumed (the manager only records the flag; it does not suspend the underlying thread)
13//! - **Monitoring**: Track worker status and count
14//! - **Synchronization**: Wait for all workers to complete
15//!
16//! ## Key Features
17//!
18//! - 🔄 **Dynamic Worker Management**: Add/remove workers programmatically
19//! - ⏸️ **State Control**: Pause/resume individual workers
20//! - 📊 **Real-time Monitoring**: Track worker status and count
21//! - 🔒 **Thread Safety**: All operations are thread-safe
22//! - 🎮 **Fine-grained Control**: Manage each worker individually
23//! - 📈 **Scalability**: Handle hundreds of workers efficiently
24//!
25//! ## Basic Usage
26//!
27//! ```rust
28//! use thread_share::{enhanced_share, spawn_workers};
29//!
30//! // Create shared data
31//! let data = enhanced_share!(0u32);
32//!
33//! // Spawn workers and get manager
34//! let manager = spawn_workers!(data, {
35//! counter: |data| {
36//! for i in 1..=10 {
37//! data.update(|x| *x += i);
38//! std::thread::sleep(std::time::Duration::from_millis(100));
39//! }
40//! },
41//! monitor: |data| {
42//! for _ in 0..5 {
43//! std::thread::sleep(std::time::Duration::from_millis(200));
44//! println!("Value: {}", data.get());
45//! }
46//! }
47//! });
48//!
49//! // Control workers
50//! println!("Active workers: {}", manager.active_workers());
51//! println!("Worker names: {:?}", manager.get_worker_names());
52//!
53//! // Wait for completion
54//! manager.join_all().expect("Workers failed");
55//! ```
56//!
57//! ## Advanced Usage with Programmatic Worker Addition
58//!
59//! ```rust
60//! use thread_share::{enhanced_share, spawn_workers, worker_manager::WorkerManager};
61//! use std::thread;
62//!
63//! let data = enhanced_share!(0u32);
64//!
65//! // Start with initial workers
66//! let manager = spawn_workers!(data, {
67//! initial_worker: |data| {
68//! data.update(|x| *x += 1);
69//! std::thread::sleep(std::time::Duration::from_millis(100));
70//! }
71//! });
72//!
73//! // Add more workers programmatically
74//! let data_clone = data.clone();
75//! let handle = thread::spawn(move || {
76//! for _ in 0..3 {
77//! data_clone.update(|x| *x *= 2);
78//! std::thread::sleep(std::time::Duration::from_millis(150));
79//! }
80//! });
81//!
82//! manager.add_worker("dynamic_worker", handle).expect("Failed to add worker");
83//!
84//! // Now we have 2 workers
85//! assert_eq!(manager.active_workers(), 2);
86//! ```
87//!
88//! ## Creating Empty Manager and Adding Workers
89//!
90//! ```rust
91//! use thread_share::worker_manager::WorkerManager;
92//! use std::thread;
93//! use std::time::Duration;
94//!
95//! // Create empty manager
96//! let manager = WorkerManager::new();
97//!
98//! // Add workers one by one
99//! let handle1 = thread::spawn(|| {
100//! for i in 1..=5 {
101//! println!("Worker 1: {}", i);
102//! thread::sleep(Duration::from_millis(100));
103//! }
104//! });
105//!
106//! let handle2 = thread::spawn(|| {
107//! for i in 1..=3 {
108//! println!("Worker 2: {}", i);
109//! thread::sleep(Duration::from_millis(150));
110//! }
111//! });
112//!
113//! manager.add_worker("worker1", handle1).expect("Failed to add worker1");
114//! manager.add_worker("worker2", handle2).expect("Failed to add worker2");
115//!
116//! assert_eq!(manager.active_workers(), 2);
117//! println!("Worker names: {:?}", manager.get_worker_names());
118//!
119//! // Wait for completion
120//! manager.join_all().expect("Workers failed");
121//! ```
122//!
123//! ## Worker Lifecycle Management
124//!
125//! ```rust
126//! use thread_share::{enhanced_share, spawn_workers};
127//!
128//! let data = enhanced_share!(0u32);
129//! let manager = spawn_workers!(data, {
130//! worker1: |data| { /* work */ },
131//! worker2: |data| { /* work */ }
132//! });
133//!
134//! // Pause a worker
135//! manager.pause_worker("worker1").expect("Failed to pause");
136//!
137//! // Check if paused
138//! assert!(manager.is_worker_paused("worker1"));
139//!
140//! // Resume a worker
141//! manager.resume_worker("worker1").expect("Failed to resume");
142//!
143//! // Remove from tracking
144//! manager.remove_worker("worker2").expect("Failed to remove");
145//!
146//! // Remove all workers
147//! manager.remove_all_workers().expect("Failed to remove all");
148//! ```
149//!
150//! ## Error Handling
151//!
//! Fallible operations return `Result<(), String>` for proper error handling (query methods such as `active_workers` return plain values):
153//!
154//! ```rust
155//! use thread_share::{enhanced_share, spawn_workers};
156//!
157//! let data = enhanced_share!(0u32);
158//! let manager = spawn_workers!(data, {
159//! worker: |data| { /* work */ }
160//! });
161//!
162//! // Handle errors gracefully
163//! match manager.pause_worker("nonexistent") {
164//! Ok(()) => println!("Worker paused successfully"),
165//! Err(e) => println!("Failed to pause worker: {}", e),
166//! }
167//!
168//! // Remove worker and handle result
169//! match manager.remove_worker("worker") {
170//! Ok(()) => println!("Worker removed successfully"),
171//! Err(e) => println!("Failed to remove worker: {}", e),
172//! }
173//! ```
174//!
175//! ## Thread Safety
176//!
177//! `WorkerManager` is designed to be thread-safe and can be shared between threads:
178//!
179//! ```rust
180//! use thread_share::{enhanced_share, spawn_workers};
181//! use std::thread;
182//! use std::sync::Arc;
183//!
184//! let data = enhanced_share!(0u32);
185//! let manager = spawn_workers!(data, {
186//! worker: |data| { /* work */ }
187//! });
188//!
189//! // Clone manager for another thread
190//! let manager_clone = manager.clone();
191//! let thread_handle = thread::spawn(move || {
192//! // Use manager in another thread
193//! let names = manager_clone.get_worker_names();
194//! println!("Worker names from thread: {:?}", names);
195//! });
196//!
197//! thread_handle.join().expect("Thread failed");
198//! ```
199//!
200//! ## Performance Considerations
201//!
202//! - **Thread Spawning**: Minimal overhead over standard `thread::spawn`
203//! - **Worker Management**: Constant-time operations for most management functions
204//! - **Memory Usage**: Small overhead for worker tracking structures
205//! - **Scalability**: Efficient for up to hundreds of workers
206//!
207//! ## When to Use WorkerManager
208//!
209//! - **Complex Applications**: When you need fine-grained control over workers
210//! - **Dynamic Workloads**: When worker count changes at runtime
211//! - **Monitoring Requirements**: When you need real-time worker status
212//! - **Production Systems**: When you need robust worker management
213//! - **Debugging**: When you need to pause/resume workers for debugging
214
215use std::collections::HashMap;
216use std::sync::{Arc, Mutex};
217use std::thread;
218
219/// Worker Manager for controlling spawned threads
220///
221/// This struct provides methods to control individual workers:
222/// - Pause/resume specific workers
223/// - Remove workers from tracking
224/// - Monitor worker status
225/// - Add new workers programmatically
226///
227/// ## Creation
228///
229/// `WorkerManager` is typically created by the `spawn_workers!` macro:
230///
231/// ```rust
232/// use thread_share::{enhanced_share, spawn_workers};
233///
234/// let data = enhanced_share!(0u32);
235/// let manager = spawn_workers!(data, {
236/// worker1: |data| { /* work */ },
237/// worker2: |data| { /* work */ }
238/// });
239/// ```
240///
241/// You can also create it directly:
242///
243/// ```rust
244/// use thread_share::worker_manager::WorkerManager;
245/// use std::sync::{Arc, Mutex};
246/// use std::collections::HashMap;
247///
248/// let threads = Arc::new(Mutex::new(HashMap::new()));
249/// let manager = WorkerManager::new_with_threads(threads);
250/// ```
251///
252/// Or create an empty manager and add workers later:
253///
254/// ```rust
255/// use thread_share::worker_manager::WorkerManager;
256/// use std::thread;
257///
258/// // Create empty manager
259/// let manager = WorkerManager::new();
260///
261/// // Add workers as needed
262/// let handle = thread::spawn(|| { /* work */ });
263/// manager.add_worker("worker", handle).expect("Failed to add worker");
264/// ```
265///
266/// ## Thread Safety
267///
268/// `WorkerManager` implements `Clone` and can be safely shared between threads.
269/// All operations are thread-safe and use proper locking mechanisms.
270///
271/// ## Example: Complete Worker Lifecycle
272///
273/// ```rust
274/// use thread_share::{enhanced_share, spawn_workers};
275/// use std::thread;
276/// use std::time::Duration;
277///
278/// let data = enhanced_share!(0u32);
279///
280/// // Start initial workers
281/// let manager = spawn_workers!(data, {
282/// counter: |data| {
283/// for i in 1..=5 {
284/// data.update(|x| *x += i);
285/// thread::sleep(Duration::from_millis(100));
286/// }
287/// }
288/// });
289///
290/// // Add worker programmatically
291/// let data_clone = data.clone();
292/// let handle = thread::spawn(move || {
293/// for _ in 0..3 {
294/// data_clone.update(|x| *x *= 2);
295/// thread::sleep(Duration::from_millis(150));
296/// }
297/// });
298///
299/// manager.add_worker("multiplier", handle).expect("Failed to add worker");
300///
301/// // Monitor workers
302/// println!("Active workers: {}", manager.active_workers());
303/// println!("Worker names: {:?}", manager.get_worker_names());
304///
305/// // Control workers
306/// manager.pause_worker("counter").expect("Failed to pause");
307/// thread::sleep(Duration::from_millis(200));
308/// manager.resume_worker("counter").expect("Failed to resume");
309///
310/// // Wait for completion
311/// manager.join_all().expect("Workers failed");
312///
313/// println!("Final value: {}", data.get());
314/// ```
#[derive(Debug, Default)]
pub struct WorkerManager {
    /// Join handles of every tracked worker, keyed by worker name.
    threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>,
    /// Names of workers currently flagged as paused. Only the presence of a
    /// key matters; the stored value is always `true`.
    paused_workers: Arc<Mutex<HashMap<String, bool>>>,
}

impl WorkerManager {
    /// Creates a new empty WorkerManager
    ///
    /// The manager starts with no tracked workers and no pause flags.
    /// Useful when you want to create a manager first and add workers later.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::worker_manager::WorkerManager;
    /// use std::thread;
    ///
    /// // Create empty manager
    /// let manager = WorkerManager::new();
    ///
    /// // Add workers programmatically
    /// let handle = thread::spawn(|| {
    ///     println!("Worker doing work...");
    /// });
    ///
    /// manager.add_worker("worker1", handle).expect("Failed to add worker");
    /// assert_eq!(manager.active_workers(), 1);
    /// ```
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a new WorkerManager with existing thread handles
    ///
    /// The supplied map is shared, not copied: changes made through the
    /// manager are visible to every other holder of the same `Arc`.
    ///
    /// ## Arguments
    ///
    /// * `threads` - Arc<Mutex<HashMap<String, JoinHandle<()>>>> containing thread handles
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::worker_manager::WorkerManager;
    /// use std::sync::{Arc, Mutex};
    /// use std::collections::HashMap;
    ///
    /// let threads = Arc::new(Mutex::new(HashMap::new()));
    /// let manager = WorkerManager::new_with_threads(threads);
    /// ```
    pub fn new_with_threads(
        threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>,
    ) -> Self {
        Self {
            threads,
            paused_workers: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Locks the handle map.
    ///
    /// A poisoned lock is recovered instead of panicking: every critical
    /// section in this type performs a single map operation, so the map is
    /// never left half-updated by a panicking holder.
    ///
    /// Lock order: when both maps are needed, `threads` is always locked
    /// before `paused_workers`.
    fn lock_threads(
        &self,
    ) -> std::sync::MutexGuard<'_, HashMap<String, thread::JoinHandle<()>>> {
        self.threads
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Locks the pause-flag map, recovering from poisoning (see `lock_threads`).
    fn lock_paused(&self) -> std::sync::MutexGuard<'_, HashMap<String, bool>> {
        self.paused_workers
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Adds a new worker to the manager
    ///
    /// This method allows you to add workers programmatically after the manager is created.
    /// The worker will be tracked and can be managed like any other worker.
    ///
    /// ## Arguments
    ///
    /// * `name` - A descriptive name for the worker
    /// * `handle` - The JoinHandle of the spawned thread
    ///
    /// ## Returns
    ///
    /// `Ok(())` on success, `Err(String)` if a worker with the same name already exists.
    /// On error the passed `handle` is dropped, which detaches that thread.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::worker_manager::WorkerManager;
    /// use std::sync::{Arc, Mutex};
    /// use std::collections::HashMap;
    /// use std::thread;
    ///
    /// let threads = Arc::new(Mutex::new(HashMap::new()));
    /// let manager = WorkerManager::new_with_threads(threads.clone());
    ///
    /// // Spawn a thread manually
    /// let handle = thread::spawn(|| {
    ///     println!("Worker doing work...");
    /// });
    ///
    /// // Add it to the manager
    /// manager.add_worker("manual_worker", handle).expect("Failed to add worker");
    /// ```
    pub fn add_worker(&self, name: &str, handle: thread::JoinHandle<()>) -> Result<(), String> {
        use std::collections::hash_map::Entry;

        // Single lookup via the entry API; the guard is a temporary of the
        // `match`, so the lock is released before anything is printed.
        match self.lock_threads().entry(name.to_string()) {
            Entry::Occupied(_) => return Err(format!("Worker '{}' already exists", name)),
            Entry::Vacant(slot) => {
                slot.insert(handle);
            }
        }

        println!("Worker '{}' added to manager", name);
        Ok(())
    }

    /// Pauses a specific worker by name
    ///
    /// Note: This is a placeholder for future implementation.
    /// Currently, Rust doesn't support pausing threads directly, so this only
    /// records a flag that can be queried with `is_worker_paused`; the thread
    /// itself keeps running.
    ///
    /// ## Arguments
    ///
    /// * `name` - The name of the worker to pause
    ///
    /// ## Returns
    ///
    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     worker: |data| { /* work */ }
    /// });
    ///
    /// // Pause the worker
    /// manager.pause_worker("worker").expect("Failed to pause");
    /// ```
    pub fn pause_worker(&self, name: &str) -> Result<(), String> {
        // Keep the handle map locked while flagging so the worker cannot be
        // removed between the existence check and the insert.
        let threads = self.lock_threads();
        if !threads.contains_key(name) {
            return Err(format!("Worker '{}' not found", name));
        }
        self.lock_paused().insert(name.to_string(), true);
        drop(threads);

        println!("Worker '{}' marked for pause (implementation pending)", name);
        Ok(())
    }

    /// Resumes a specific worker by name
    ///
    /// Clears the pause flag set by `pause_worker`. Resuming a tracked worker
    /// that is not paused is a no-op and still succeeds.
    ///
    /// ## Arguments
    ///
    /// * `name` - The name of the worker to resume
    ///
    /// ## Returns
    ///
    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     worker: |data| { /* work */ }
    /// });
    ///
    /// // Pause then resume
    /// manager.pause_worker("worker").expect("Failed to pause");
    /// manager.resume_worker("worker").expect("Failed to resume");
    /// ```
    pub fn resume_worker(&self, name: &str) -> Result<(), String> {
        let threads = self.lock_threads();
        if !threads.contains_key(name) {
            return Err(format!("Worker '{}' not found", name));
        }
        self.lock_paused().remove(name);
        drop(threads);

        println!("Worker '{}' resumed", name);
        Ok(())
    }

    /// Removes a worker from tracking without stopping it
    ///
    /// This method removes the worker from the manager's tracking but doesn't
    /// actually stop the thread. The thread will continue running until it
    /// completes naturally. Any pause flag recorded for the worker is cleared
    /// as well, so a later worker reusing the name does not start out paused.
    ///
    /// ## Arguments
    ///
    /// * `name` - The name of the worker to remove
    ///
    /// ## Returns
    ///
    /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     worker: |data| { /* work */ }
    /// });
    ///
    /// // Remove from tracking
    /// manager.remove_worker("worker").expect("Failed to remove");
    /// ```
    pub fn remove_worker(&self, name: &str) -> Result<(), String> {
        let mut threads = self.lock_threads();
        if threads.remove(name).is_none() {
            return Err(format!("Worker '{}' not found", name));
        }
        self.lock_paused().remove(name);
        drop(threads);

        println!("Worker '{}' removed from tracking", name);
        Ok(())
    }

    /// Removes all workers from tracking without stopping them
    ///
    /// This method removes all workers from the manager's tracking but doesn't
    /// actually stop the threads. The threads will continue running until they
    /// complete naturally. All pause flags are cleared too.
    ///
    /// ## Returns
    ///
    /// `Ok(())` on success
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     worker1: |data| { /* work */ },
    ///     worker2: |data| { /* work */ }
    /// });
    ///
    /// // Remove all workers from tracking
    /// manager.remove_all_workers().expect("Failed to remove all workers");
    /// ```
    pub fn remove_all_workers(&self) -> Result<(), String> {
        let mut threads = self.lock_threads();
        let count = threads.len();
        threads.clear();
        self.lock_paused().clear();
        drop(threads);

        println!("Removed {} workers from tracking", count);
        Ok(())
    }

    /// Gets the list of all worker names
    ///
    /// ## Returns
    ///
    /// A `Vec<String>` containing all worker names, in no particular order
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     counter: |data| { /* work */ },
    ///     monitor: |data| { /* work */ }
    /// });
    ///
    /// let names = manager.get_worker_names();
    /// assert_eq!(names.len(), 2);
    /// assert!(names.contains(&"counter".to_string()));
    /// assert!(names.contains(&"monitor".to_string()));
    /// ```
    pub fn get_worker_names(&self) -> Vec<String> {
        self.lock_threads().keys().cloned().collect()
    }

    /// Gets the number of active workers
    ///
    /// A worker counts as long as its handle is tracked, whether or not its
    /// thread has already finished.
    ///
    /// ## Returns
    ///
    /// The number of workers currently being tracked
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     worker1: |data| { /* work */ },
    ///     worker2: |data| { /* work */ }
    /// });
    ///
    /// assert_eq!(manager.active_workers(), 2);
    /// ```
    pub fn active_workers(&self) -> usize {
        self.lock_threads().len()
    }

    /// Checks if a specific worker is paused
    ///
    /// ## Arguments
    ///
    /// * `name` - The name of the worker to check
    ///
    /// ## Returns
    ///
    /// `true` if the worker is paused, `false` otherwise (including when no
    /// worker with that name is known)
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     worker: |data| { /* work */ }
    /// });
    ///
    /// // Initially not paused
    /// assert!(!manager.is_worker_paused("worker"));
    ///
    /// // Pause the worker
    /// manager.pause_worker("worker").expect("Failed to pause");
    /// assert!(manager.is_worker_paused("worker"));
    ///
    /// // Resume the worker
    /// manager.resume_worker("worker").expect("Failed to resume");
    /// assert!(!manager.is_worker_paused("worker"));
    /// ```
    pub fn is_worker_paused(&self, name: &str) -> bool {
        self.lock_paused().contains_key(name)
    }

    /// Waits for all workers to complete
    ///
    /// This method blocks until all tracked workers have completed.
    /// Every worker is removed from tracking (and its pause flag cleared)
    /// before joining, and every worker is joined even if an earlier one
    /// panicked, so no thread is left detached.
    ///
    /// ## Returns
    ///
    /// `Ok(())` if all workers completed successfully, `Err(String)` if any worker failed.
    /// When several workers failed, their messages are joined with `"; "`.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     worker: |data| {
    ///         thread::sleep(Duration::from_millis(100));
    ///         data.update(|x| *x += 1);
    ///     }
    /// });
    ///
    /// // Wait for completion
    /// manager.join_all().expect("Workers failed");
    ///
    /// // All workers are now completed and removed
    /// assert_eq!(manager.active_workers(), 0);
    /// ```
    pub fn join_all(&self) -> Result<(), String> {
        // Take ownership of every handle under the lock, then release it
        // before joining so other threads can keep using the manager.
        let mut threads = self.lock_threads();
        let handles: Vec<(String, thread::JoinHandle<()>)> = threads.drain().collect();
        self.lock_paused().clear();
        drop(threads);

        // Join everything first; only afterwards decide on the result.
        let failures: Vec<String> = handles
            .into_iter()
            .filter_map(|(name, handle)| {
                handle
                    .join()
                    .err()
                    .map(|e| format!("Worker '{}' failed: {:?}", name, e))
            })
            .collect();

        if failures.is_empty() {
            Ok(())
        } else {
            Err(failures.join("; "))
        }
    }
}

impl Clone for WorkerManager {
    /// Creates a clone of the WorkerManager
    ///
    /// The cloned manager shares the same underlying thread tracking data
    /// and pause flags, so operations on one clone will affect the other.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::{enhanced_share, spawn_workers};
    ///
    /// let data = enhanced_share!(0u32);
    /// let manager = spawn_workers!(data, {
    ///     worker: |data| { /* work */ }
    /// });
    ///
    /// // Clone the manager
    /// let manager_clone = manager.clone();
    ///
    /// // Both managers track the same workers
    /// assert_eq!(manager.active_workers(), manager_clone.active_workers());
    /// ```
    fn clone(&self) -> Self {
        Self {
            // Only the reference counts are bumped; the maps are not copied.
            threads: Arc::clone(&self.threads),
            paused_workers: Arc::clone(&self.paused_workers),
        }
    }
}