thread_share/worker_manager.rs
1//! # Worker Manager Module
2//!
3//! This module provides the `WorkerManager` struct for controlling spawned threads.
4//! It allows you to manage individual workers: pause, resume, remove, and monitor them.
5//!
6//! ## Overview
7//!
8//! `WorkerManager` is designed to work with the `spawn_workers!` macro and provides
9//! fine-grained control over thread management. It's particularly useful for:
10//!
11//! - **Dynamic worker management**: Add/remove workers at runtime
12//! - **Worker state control**: Pause/resume individual workers
13//! - **Monitoring**: Track worker status and count
14//! - **Synchronization**: Wait for all workers to complete
15//!
16//! ## Key Features
17//!
18//! - 🔄 **Dynamic Worker Management**: Add/remove workers programmatically
19//! - ⏸️ **State Control**: Pause/resume individual workers
20//! - 📊 **Real-time Monitoring**: Track worker status and count
21//! - 🔒 **Thread Safety**: All operations are thread-safe
22//! - 🎮 **Fine-grained Control**: Manage each worker individually
23//! - 📈 **Scalability**: Handle hundreds of workers efficiently
24//!
25//! ## Basic Usage
26//!
27//! ```rust
28//! use thread_share::{enhanced_share, spawn_workers};
29//!
30//! // Create shared data
31//! let data = enhanced_share!(0u32);
32//!
33//! // Spawn workers and get manager
34//! let manager = spawn_workers!(data, {
35//! counter: |data| {
36//! for i in 1..=10 {
37//! data.update(|x| *x += i);
38//! std::thread::sleep(std::time::Duration::from_millis(100));
39//! }
40//! },
41//! monitor: |data| {
42//! for _ in 0..5 {
43//! std::thread::sleep(std::time::Duration::from_millis(200));
44//! println!("Value: {}", data.get());
45//! }
46//! }
47//! });
48//!
49//! // Control workers
50//! println!("Active workers: {}", manager.active_workers());
51//! println!("Worker names: {:?}", manager.get_worker_names());
52//!
53//! // Wait for completion
54//! manager.join_all().expect("Workers failed");
55//! ```
56//!
57//! ## Advanced Usage with Programmatic Worker Addition
58//!
59//! ```rust
60//! use thread_share::{enhanced_share, spawn_workers, worker_manager::WorkerManager};
61//! use std::thread;
62//!
63//! let data = enhanced_share!(0u32);
64//!
65//! // Start with initial workers
66//! let manager = spawn_workers!(data, {
67//! initial_worker: |data| {
68//! data.update(|x| *x += 1);
69//! std::thread::sleep(std::time::Duration::from_millis(100));
70//! }
71//! });
72//!
73//! // Add more workers programmatically
74//! let data_clone = data.clone();
75//! let handle = thread::spawn(move || {
76//! for _ in 0..3 {
77//! data_clone.update(|x| *x *= 2);
78//! std::thread::sleep(std::time::Duration::from_millis(150));
79//! }
80//! });
81//!
82//! manager.add_worker("dynamic_worker", handle).expect("Failed to add worker");
83//!
84//! // Now we have 2 workers
85//! assert_eq!(manager.active_workers(), 2);
86//! ```
87//!
88//! ## Worker Lifecycle Management
89//!
90//! ```rust
91//! use thread_share::{enhanced_share, spawn_workers};
92//!
93//! let data = enhanced_share!(0u32);
94//! let manager = spawn_workers!(data, {
95//! worker1: |data| { /* work */ },
96//! worker2: |data| { /* work */ }
97//! });
98//!
99//! // Pause a worker
100//! manager.pause_worker("worker1").expect("Failed to pause");
101//!
102//! // Check if paused
103//! assert!(manager.is_worker_paused("worker1"));
104//!
105//! // Resume a worker
106//! manager.resume_worker("worker1").expect("Failed to resume");
107//!
108//! // Remove from tracking
109//! manager.remove_worker("worker2").expect("Failed to remove");
110//!
111//! // Remove all workers
112//! manager.remove_all_workers().expect("Failed to remove all");
113//! ```
114//!
115//! ## Error Handling
116//!
117//! All methods return `Result<T, String>` for proper error handling:
118//!
119//! ```rust
120//! use thread_share::{enhanced_share, spawn_workers};
121//!
122//! let data = enhanced_share!(0u32);
123//! let manager = spawn_workers!(data, {
124//! worker: |data| { /* work */ }
125//! });
126//!
127//! // Handle errors gracefully
128//! match manager.pause_worker("nonexistent") {
129//! Ok(()) => println!("Worker paused successfully"),
130//! Err(e) => println!("Failed to pause worker: {}", e),
131//! }
132//!
133//! // Remove worker and handle result
134//! match manager.remove_worker("worker") {
135//! Ok(()) => println!("Worker removed successfully"),
136//! Err(e) => println!("Failed to remove worker: {}", e),
137//! }
138//! ```
139//!
140//! ## Thread Safety
141//!
142//! `WorkerManager` is designed to be thread-safe and can be shared between threads:
143//!
144//! ```rust
145//! use thread_share::{enhanced_share, spawn_workers};
146//! use std::thread;
147//! use std::sync::Arc;
148//!
149//! let data = enhanced_share!(0u32);
150//! let manager = spawn_workers!(data, {
151//! worker: |data| { /* work */ }
152//! });
153//!
154//! // Clone manager for another thread
155//! let manager_clone = manager.clone();
156//! let thread_handle = thread::spawn(move || {
157//! // Use manager in another thread
158//! let names = manager_clone.get_worker_names();
159//! println!("Worker names from thread: {:?}", names);
160//! });
161//!
162//! thread_handle.join().expect("Thread failed");
163//! ```
164//!
165//! ## Performance Considerations
166//!
167//! - **Thread Spawning**: Minimal overhead over standard `thread::spawn`
168//! - **Worker Management**: Constant-time operations for most management functions
169//! - **Memory Usage**: Small overhead for worker tracking structures
170//! - **Scalability**: Efficient for up to hundreds of workers
171//!
172//! ## When to Use WorkerManager
173//!
174//! - **Complex Applications**: When you need fine-grained control over workers
175//! - **Dynamic Workloads**: When worker count changes at runtime
176//! - **Monitoring Requirements**: When you need real-time worker status
177//! - **Production Systems**: When you need robust worker management
178//! - **Debugging**: When you need to pause/resume workers for debugging
179
180use std::collections::HashMap;
181use std::sync::{Arc, Mutex};
182use std::thread;
183
184/// Worker Manager for controlling spawned threads
185///
186/// This struct provides methods to control individual workers:
187/// - Pause/resume specific workers
188/// - Remove workers from tracking
189/// - Monitor worker status
190/// - Add new workers programmatically
191///
192/// ## Creation
193///
194/// `WorkerManager` is typically created by the `spawn_workers!` macro:
195///
196/// ```rust
197/// use thread_share::{enhanced_share, spawn_workers};
198///
199/// let data = enhanced_share!(0u32);
200/// let manager = spawn_workers!(data, {
201/// worker1: |data| { /* work */ },
202/// worker2: |data| { /* work */ }
203/// });
204/// ```
205///
206/// You can also create it directly:
207///
208/// ```rust
209/// use thread_share::worker_manager::WorkerManager;
210/// use std::sync::{Arc, Mutex};
211/// use std::collections::HashMap;
212///
213/// let threads = Arc::new(Mutex::new(HashMap::new()));
214/// let manager = WorkerManager::new(threads);
215/// ```
216///
217/// ## Thread Safety
218///
219/// `WorkerManager` implements `Clone` and can be safely shared between threads.
220/// All operations are thread-safe and use proper locking mechanisms.
221///
222/// ## Example: Complete Worker Lifecycle
223///
224/// ```rust
225/// use thread_share::{enhanced_share, spawn_workers};
226/// use std::thread;
227/// use std::time::Duration;
228///
229/// let data = enhanced_share!(0u32);
230///
231/// // Start initial workers
232/// let manager = spawn_workers!(data, {
233/// counter: |data| {
234/// for i in 1..=5 {
235/// data.update(|x| *x += i);
236/// thread::sleep(Duration::from_millis(100));
237/// }
238/// }
239/// });
240///
241/// // Add worker programmatically
242/// let data_clone = data.clone();
243/// let handle = thread::spawn(move || {
244/// for _ in 0..3 {
245/// data_clone.update(|x| *x *= 2);
246/// thread::sleep(Duration::from_millis(150));
247/// }
248/// });
249///
250/// manager.add_worker("multiplier", handle).expect("Failed to add worker");
251///
252/// // Monitor workers
253/// println!("Active workers: {}", manager.active_workers());
254/// println!("Worker names: {:?}", manager.get_worker_names());
255///
256/// // Control workers
257/// manager.pause_worker("counter").expect("Failed to pause");
258/// thread::sleep(Duration::from_millis(200));
259/// manager.resume_worker("counter").expect("Failed to resume");
260///
261/// // Wait for completion
262/// manager.join_all().expect("Workers failed");
263///
264/// println!("Final value: {}", data.get());
265/// ```
266pub struct WorkerManager {
267 threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>,
268 paused_workers: Arc<Mutex<HashMap<String, bool>>>,
269}
270
271impl WorkerManager {
272 /// Creates a new WorkerManager
273 ///
274 /// ## Arguments
275 ///
276 /// * `threads` - Arc<Mutex<HashMap<String, JoinHandle<()>>>> containing thread handles
277 ///
278 /// ## Example
279 ///
280 /// ```rust
281 /// use thread_share::worker_manager::WorkerManager;
282 /// use std::sync::{Arc, Mutex};
283 /// use std::collections::HashMap;
284 ///
285 /// let threads = Arc::new(Mutex::new(HashMap::new()));
286 /// let manager = WorkerManager::new(threads);
287 /// ```
288 pub fn new(threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>) -> Self {
289 Self {
290 threads,
291 paused_workers: Arc::new(Mutex::new(HashMap::new())),
292 }
293 }
294
295 /// Adds a new worker to the manager
296 ///
297 /// This method allows you to add workers programmatically after the manager is created.
298 /// The worker will be tracked and can be managed like any other worker.
299 ///
300 /// ## Arguments
301 ///
302 /// * `name` - A descriptive name for the worker
303 /// * `handle` - The JoinHandle of the spawned thread
304 ///
305 /// ## Returns
306 ///
307 /// `Ok(())` on success, `Err(String)` if a worker with the same name already exists.
308 ///
309 /// ## Example
310 ///
311 /// ```rust
312 /// use thread_share::worker_manager::WorkerManager;
313 /// use std::sync::{Arc, Mutex};
314 /// use std::collections::HashMap;
315 /// use std::thread;
316 ///
317 /// let threads = Arc::new(Mutex::new(HashMap::new()));
318 /// let manager = WorkerManager::new(threads.clone());
319 ///
320 /// // Spawn a thread manually
321 /// let handle = thread::spawn(|| {
322 /// println!("Worker doing work...");
323 /// });
324 ///
325 /// // Add it to the manager
326 /// manager.add_worker("manual_worker", handle).expect("Failed to add worker");
327 /// ```
328 pub fn add_worker(&self, name: &str, handle: thread::JoinHandle<()>) -> Result<(), String> {
329 let mut threads = self.threads.lock().unwrap();
330
331 if threads.contains_key(name) {
332 return Err(format!("Worker '{}' already exists", name));
333 }
334
335 threads.insert(name.to_string(), handle);
336 println!("Worker '{}' added to manager", name);
337 Ok(())
338 }
339
340 /// Pauses a specific worker by name
341 ///
342 /// Note: This is a placeholder for future implementation.
343 /// Currently, Rust doesn't support pausing threads directly.
344 ///
345 /// ## Arguments
346 ///
347 /// * `name` - The name of the worker to pause
348 ///
349 /// ## Returns
350 ///
351 /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
352 ///
353 /// ## Example
354 ///
355 /// ```rust
356 /// use thread_share::{enhanced_share, spawn_workers};
357 ///
358 /// let data = enhanced_share!(0u32);
359 /// let manager = spawn_workers!(data, {
360 /// worker: |data| { /* work */ }
361 /// });
362 ///
363 /// // Pause the worker
364 /// manager.pause_worker("worker").expect("Failed to pause");
365 /// ```
366 pub fn pause_worker(&self, name: &str) -> Result<(), String> {
367 let mut paused = self.paused_workers.lock().unwrap();
368 paused.insert(name.to_string(), true);
369 println!("Worker '{}' marked for pause (implementation pending)", name);
370 Ok(())
371 }
372
373 /// Resumes a specific worker by name
374 ///
375 /// ## Arguments
376 ///
377 /// * `name` - The name of the worker to resume
378 ///
379 /// ## Returns
380 ///
381 /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
382 ///
383 /// ## Example
384 ///
385 /// ```rust
386 /// use thread_share::{enhanced_share, spawn_workers};
387 ///
388 /// let data = enhanced_share!(0u32);
389 /// let manager = spawn_workers!(data, {
390 /// worker: |data| { /* work */ }
391 /// });
392 ///
393 /// // Pause then resume
394 /// manager.pause_worker("worker").expect("Failed to pause");
395 /// manager.resume_worker("worker").expect("Failed to resume");
396 /// ```
397 pub fn resume_worker(&self, name: &str) -> Result<(), String> {
398 let mut paused = self.paused_workers.lock().unwrap();
399 paused.remove(name);
400 println!("Worker '{}' resumed", name);
401 Ok(())
402 }
403
404 /// Removes a worker from tracking without stopping it
405 ///
406 /// This method removes the worker from the manager's tracking but doesn't
407 /// actually stop the thread. The thread will continue running until it
408 /// completes naturally.
409 ///
410 /// ## Arguments
411 ///
412 /// * `name` - The name of the worker to remove
413 ///
414 /// ## Returns
415 ///
416 /// `Ok(())` on success, `Err(String)` if the worker doesn't exist
417 ///
418 /// ## Example
419 ///
420 /// ```rust
421 /// use thread_share::{enhanced_share, spawn_workers};
422 ///
423 /// let data = enhanced_share!(0u32);
424 /// let manager = spawn_workers!(data, {
425 /// worker: |data| { /* work */ }
426 /// });
427 ///
428 /// // Remove from tracking
429 /// manager.remove_worker("worker").expect("Failed to remove");
430 /// ```
431 pub fn remove_worker(&self, name: &str) -> Result<(), String> {
432 let mut threads = self.threads.lock().unwrap();
433 if threads.remove(name).is_some() {
434 println!("Worker '{}' removed from tracking", name);
435 Ok(())
436 } else {
437 Err(format!("Worker '{}' not found", name))
438 }
439 }
440
441 /// Removes all workers from tracking without stopping them
442 ///
443 /// This method removes all workers from the manager's tracking but doesn't
444 /// actually stop the threads. The threads will continue running until they
445 /// complete naturally.
446 ///
447 /// ## Returns
448 ///
449 /// `Ok(())` on success
450 ///
451 /// ## Example
452 ///
453 /// ```rust
454 /// use thread_share::{enhanced_share, spawn_workers};
455 ///
456 /// let data = enhanced_share!(0u32);
457 /// let manager = spawn_workers!(data, {
458 /// worker1: |data| { /* work */ },
459 /// worker2: |data| { /* work */ }
460 /// });
461 ///
462 /// // Remove all workers from tracking
463 /// manager.remove_all_workers().expect("Failed to remove all workers");
464 /// ```
465 pub fn remove_all_workers(&self) -> Result<(), String> {
466 let mut threads = self.threads.lock().unwrap();
467 let count = threads.len();
468 threads.clear();
469 println!("Removed {} workers from tracking", count);
470 Ok(())
471 }
472
473 /// Gets the list of all worker names
474 ///
475 /// ## Returns
476 ///
477 /// A `Vec<String>` containing all worker names
478 ///
479 /// ## Example
480 ///
481 /// ```rust
482 /// use thread_share::{enhanced_share, spawn_workers};
483 ///
484 /// let data = enhanced_share!(0u32);
485 /// let manager = spawn_workers!(data, {
486 /// counter: |data| { /* work */ },
487 /// monitor: |data| { /* work */ }
488 /// });
489 ///
490 /// let names = manager.get_worker_names();
491 /// assert_eq!(names.len(), 2);
492 /// assert!(names.contains(&"counter".to_string()));
493 /// assert!(names.contains(&"monitor".to_string()));
494 /// ```
495 pub fn get_worker_names(&self) -> Vec<String> {
496 let threads = self.threads.lock().unwrap();
497 threads.keys().cloned().collect()
498 }
499
500 /// Gets the number of active workers
501 ///
502 /// ## Returns
503 ///
504 /// The number of workers currently being tracked
505 ///
506 /// ## Example
507 ///
508 /// ```rust
509 /// use thread_share::{enhanced_share, spawn_workers};
510 ///
511 /// let data = enhanced_share!(0u32);
512 /// let manager = spawn_workers!(data, {
513 /// worker1: |data| { /* work */ },
514 /// worker2: |data| { /* work */ }
515 /// });
516 ///
517 /// assert_eq!(manager.active_workers(), 2);
518 /// ```
519 pub fn active_workers(&self) -> usize {
520 let threads = self.threads.lock().unwrap();
521 threads.len()
522 }
523
524 /// Checks if a specific worker is paused
525 ///
526 /// ## Arguments
527 ///
528 /// * `name` - The name of the worker to check
529 ///
530 /// ## Returns
531 ///
532 /// `true` if the worker is paused, `false` otherwise
533 ///
534 /// ## Example
535 ///
536 /// ```rust
537 /// use thread_share::{enhanced_share, spawn_workers};
538 ///
539 /// let data = enhanced_share!(0u32);
540 /// let manager = spawn_workers!(data, {
541 /// worker: |data| { /* work */ }
542 /// });
543 ///
544 /// // Initially not paused
545 /// assert!(!manager.is_worker_paused("worker"));
546 ///
547 /// // Pause the worker
548 /// manager.pause_worker("worker").expect("Failed to pause");
549 /// assert!(manager.is_worker_paused("worker"));
550 ///
551 /// // Resume the worker
552 /// manager.resume_worker("worker").expect("Failed to resume");
553 /// assert!(!manager.is_worker_paused("worker"));
554 /// ```
555 pub fn is_worker_paused(&self, name: &str) -> bool {
556 let paused = self.paused_workers.lock().unwrap();
557 paused.contains_key(name)
558 }
559
560 /// Waits for all workers to complete
561 ///
562 /// This method blocks until all tracked workers have completed.
563 /// It removes all workers from tracking after they complete.
564 ///
565 /// ## Returns
566 ///
567 /// `Ok(())` if all workers completed successfully, `Err(String)` if any worker failed
568 ///
569 /// ## Example
570 ///
571 /// ```rust
572 /// use thread_share::{enhanced_share, spawn_workers};
573 /// use std::thread;
574 /// use std::time::Duration;
575 ///
576 /// let data = enhanced_share!(0u32);
577 /// let manager = spawn_workers!(data, {
578 /// worker: |data| {
579 /// thread::sleep(Duration::from_millis(100));
580 /// data.update(|x| *x += 1);
581 /// }
582 /// });
583 ///
584 /// // Wait for completion
585 /// manager.join_all().expect("Workers failed");
586 ///
587 /// // All workers are now completed and removed
588 /// assert_eq!(manager.active_workers(), 0);
589 /// ```
590 pub fn join_all(&self) -> Result<(), String> {
591 let mut threads = self.threads.lock().unwrap();
592 let thread_handles: Vec<_> = threads.drain().collect();
593 drop(threads);
594
595 for (name, handle) in thread_handles {
596 let result = handle.join();
597 if let Err(e) = result {
598 return Err(format!("Worker '{}' failed: {:?}", name, e));
599 }
600 }
601 Ok(())
602 }
603}
604
605impl Clone for WorkerManager {
606 /// Creates a clone of the WorkerManager
607 ///
608 /// The cloned manager shares the same underlying thread tracking data,
609 /// so operations on one clone will affect the other.
610 ///
611 /// ## Example
612 ///
613 /// ```rust
614 /// use thread_share::{enhanced_share, spawn_workers};
615 ///
616 /// let data = enhanced_share!(0u32);
617 /// let manager = spawn_workers!(data, {
618 /// worker: |data| { /* work */ }
619 /// });
620 ///
621 /// // Clone the manager
622 /// let manager_clone = manager.clone();
623 ///
624 /// // Both managers track the same workers
625 /// assert_eq!(manager.active_workers(), manager_clone.active_workers());
626 /// ```
627 fn clone(&self) -> Self {
628 Self {
629 threads: self.threads.clone(),
630 paused_workers: self.paused_workers.clone(),
631 }
632 }
633}