§Thread Pool Module - ThreadManager
This module provides ThreadManager, a standalone utility for managing threads
with shared data, independent of the ThreadShare structures.
§🚀 Overview
ThreadManager is a lightweight thread management utility that provides:
- Simplified Thread Spawning: Spawn threads with descriptive names
- Shared Data Management: Manage multiple types of shared data
- Thread Tracking: Monitor active thread count and status
- Automatic Thread Joining: Wait for all threads to complete
- Type-Safe Operations: Compile-time guarantees for thread safety
§Key Features
§🧵 Thread Management
- Named Threads: Each thread gets a descriptive name for debugging
- Automatic Tracking: Monitor active thread count and completion status
- Error Handling: Comprehensive error handling for thread failures
- Resource Cleanup: Automatic cleanup of completed threads
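As a rough illustration of what a named thread means at the standard-library level, the sketch below uses only `std::thread::Builder`. The helper `spawn_named` is hypothetical and is not part of ThreadManager; it only shows that a name given at spawn time is visible from inside the thread, which is what makes names useful in debuggers and panic messages.

```rust
use std::thread;

/// Spawns a named thread and returns the name the thread observed for itself.
/// `spawn_named` is a hypothetical helper for illustration only.
fn spawn_named(name: &str) -> std::io::Result<Option<String>> {
    let handle = thread::Builder::new()
        .name(name.to_string())
        // Inside the thread, read back the name assigned by the builder.
        .spawn(|| thread::current().name().map(str::to_owned))?;
    // join() fails only if the thread panicked; this closure does not panic.
    Ok(handle.join().expect("named thread panicked"))
}
```

Calling `spawn_named("sorter")` from `main` should yield `Some("sorter")` wrapped in `Ok`.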
§📦 Shared Data Support
- Type-Safe Access: Compile-time type checking for shared data
- Multiple Data Types: Support for different types of shared data
- Automatic Cloning: Safe data sharing between threads
- Thread Isolation: Each thread gets its own clone of the shared data handle
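One way to picture type-safe access to several kinds of shared data is a map keyed by `TypeId`. The sketch below is an assumption about how such a store could look, written with the standard library only; `SharedStore`, `insert`, and `get` are hypothetical names and not ThreadManager's API.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical type-keyed store holding at most one shared value per type.
#[derive(Default)]
struct SharedStore {
    entries: Mutex<HashMap<TypeId, Box<dyn Any + Send + Sync>>>,
}

impl SharedStore {
    /// Stores `value` under the key of `T`, replacing any earlier value of that type.
    fn insert<T: Any + Send + Sync>(&self, value: Arc<T>) {
        self.entries
            .lock()
            .unwrap()
            .insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Returns a clone of the handle stored for `T`, or None if nothing was stored.
    fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        let entries = self.entries.lock().unwrap();
        entries
            .get(&TypeId::of::<T>())
            // The downcast succeeds only when the stored value is an Arc<T>.
            .and_then(|boxed| boxed.downcast_ref::<Arc<T>>())
            .cloned()
    }
}
```

Because the lookup key is derived from the requested type, asking for a type that was never stored returns `None` rather than a value of the wrong type.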
§Architecture
ThreadManager uses internal structures to track:
- threads: Arc<Mutex<HashMap<String, JoinHandle<()>>>> - Active thread tracking
- shared_data: Arc<Mutex<HashMap<TypeId, Box<dyn Any + Send + Sync>>>> - Shared data storage
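To make the thread-tracking structure concrete, here is a minimal sketch of a manager built around the same `threads` field, using only the standard library. `MiniManager` and `run_workers` are hypothetical; the real ThreadManager may differ in signatures, error types, and locking (for example by using `parking_lot`), and shared data is represented here by a plain `Arc<Mutex<T>>`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical, simplified manager; not the crate's implementation.
#[derive(Clone, Default)]
struct MiniManager {
    threads: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
}

impl MiniManager {
    /// Spawns a named thread and hands it the given shared handle.
    /// Reusing a name replaces (and thereby detaches) the earlier handle.
    fn spawn<T, F>(&self, name: &str, data: Arc<Mutex<T>>, f: F) -> Result<(), String>
    where
        T: Send + 'static,
        F: FnOnce(Arc<Mutex<T>>) + Send + 'static,
    {
        let handle = thread::Builder::new()
            .name(name.to_string())
            .spawn(move || f(data))
            .map_err(|e| format!("failed to spawn '{name}': {e}"))?;
        self.threads.lock().unwrap().insert(name.to_string(), handle);
        Ok(())
    }

    /// Number of tracked handles (finished threads count until they are joined).
    fn active_threads(&self) -> usize {
        self.threads.lock().unwrap().len()
    }

    /// Joins every tracked thread and reports the first panic, if any.
    fn join_all(&self) -> Result<(), String> {
        // Move the handles out first so the lock is not held while joining.
        let handles: Vec<_> = self.threads.lock().unwrap().drain().collect();
        let mut first_error = None;
        for (name, handle) in handles {
            if handle.join().is_err() && first_error.is_none() {
                first_error = Some(format!("thread '{name}' panicked"));
            }
        }
        first_error.map_or(Ok(()), Err)
    }
}

/// Runs four workers that each increment a shared counter; returns the total.
fn run_workers() -> Result<u32, String> {
    let manager = MiniManager::default();
    let counter = Arc::new(Mutex::new(0u32));
    for i in 0..4 {
        manager.spawn(&format!("worker-{i}"), Arc::clone(&counter), |c| {
            *c.lock().unwrap() += 1;
        })?;
    }
    manager.join_all()?;
    let total = *counter.lock().unwrap();
    Ok(total)
}
```

Draining the map before joining is a deliberate choice in this sketch: it keeps the lock short-lived, so a thread that calls back into the manager while finishing cannot deadlock against `join_all`.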
§Example Usage
§Basic Thread Management
use thread_share::{ThreadManager, share};
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let data = share!(vec![1, 2, 3]);
    // Spawn individual threads
    manager.spawn("sorter", data.clone(), |data| {
        data.update(|v| v.sort());
    })?;
    manager.spawn("validator", data.clone(), |data| {
        assert!(data.get().is_sorted());
    })?;
    // Wait for completion
    manager.join_all()?;
    Ok(())
}
§Advanced Usage
use thread_share::{ThreadManager, share};
use std::time::Duration;
#[derive(Clone)]
struct WorkItem {
    id: u32,
    data: String,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let work_queue = share!(vec![
        WorkItem { id: 1, data: "Task 1".to_string() },
        WorkItem { id: 2, data: "Task 2".to_string() },
    ]);
    // Spawn worker threads
    for i in 0..3 {
        let queue_clone = work_queue.clone();
        let worker_id = i;
        manager.spawn(&format!("worker-{}", i), queue_clone, move |queue| {
            loop {
                // Take the next item inside update() so that removal is one
                // locked step. A separate get()/set() pair would let two
                // workers process the same item or overwrite each other.
                let mut next = None;
                queue.update(|items| next = items.pop());
                let item = match next {
                    Some(item) => item,
                    None => break,
                };
                // Process outside the lock so other workers are not blocked.
                println!("Worker {} processing: {}", worker_id, item.data);
                std::thread::sleep(Duration::from_millis(100));
            }
        })?;
    }
    // Wait for all workers to complete
    manager.join_all()?;
    println!("All work completed!");
    Ok(())
}
§Thread Lifecycle
- Creation: ThreadManager::new() or ThreadManager::default()
- Spawning: manager.spawn(name, data, function) creates named threads
- Execution: Threads run with access to shared data
- Monitoring: Track active threads with active_threads()
- Completion: Wait for all threads with join_all()
§Performance Characteristics
- Thread Spawning: Minimal overhead over standard thread::spawn
- Thread Tracking: Constant-time operations for thread management
- Memory Usage: Small overhead for tracking structures
- Scalability: Efficient for up to hundreds of threads
- Lock Contention: Minimal due to efficient parking_lot primitives
§Best Practices
- Use descriptive thread names for easier debugging
- Keep thread functions focused on single responsibilities
- Always call join_all() to ensure proper cleanup
- Monitor thread count with active_threads() for debugging
- Handle errors gracefully from spawn() and join_all()
- Clone shared data for each thread to avoid ownership issues
§Error Handling
use thread_share::{ThreadManager, share};
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let data = share!(0);
    // Handle spawn errors
    if let Err(e) = manager.spawn("worker", data.clone(), |data| { /* logic */ }) {
        eprintln!("Failed to spawn worker: {}", e);
        return Ok(());
    }
    // Handle join errors
    if let Err(e) = manager.join_all() {
        eprintln!("Thread execution failed: {}", e);
    }
    Ok(())
}
§Thread Safety
ThreadManager automatically implements Send and Sync traits,
making it safe to use across thread boundaries. The internal synchronization
primitives ensure that all operations are thread-safe.
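The `Send` and `Sync` claim can be checked at compile time for the two internal structures listed under Architecture. The sketch below assumes those structures use the standard library `Mutex` (the crate may use `parking_lot` instead); the aliases and `check_internal_types` are hypothetical names introduced for illustration.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

// Aliases mirroring the two internal structures listed under Architecture.
type ThreadTable = Arc<Mutex<HashMap<String, JoinHandle<()>>>>;
type SharedData = Arc<Mutex<HashMap<TypeId, Box<dyn Any + Send + Sync>>>>;

/// Compiles only when `T` is both Send and Sync; does nothing at run time.
fn assert_send_sync<T: Send + Sync>() {}

/// Hypothetical check: fails to compile if either alias is not Send + Sync.
fn check_internal_types() {
    assert_send_sync::<ThreadTable>();
    assert_send_sync::<SharedData>();
}
```

A struct whose fields are all `Send + Sync` receives both auto traits without any manual `impl`, which is the usual meaning of "automatically implements" here.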
§Memory Management
- Arc: Provides reference counting for shared ownership
- Mutex: Ensures exclusive access to internal structures
- HashMap: Efficient storage for thread handles and shared data
- Automatic Cleanup: Completed threads are automatically removed
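How completed threads could be removed from a handle map is sketched below with the standard library's `JoinHandle::is_finished`. Whether ThreadManager performs cleanup this way is not stated in this documentation; `reap_finished` is a hypothetical helper.

```rust
use std::collections::HashMap;
use std::thread::JoinHandle;

/// Joins and removes every finished thread; returns how many were removed.
/// `reap_finished` is a hypothetical helper for illustration only.
fn reap_finished(threads: &mut HashMap<String, JoinHandle<()>>) -> usize {
    // Collect names first, because handles must be moved out to be joined.
    let finished: Vec<String> = threads
        .iter()
        .filter(|(_, handle)| handle.is_finished())
        .map(|(name, _)| name.clone())
        .collect();
    for name in &finished {
        if let Some(handle) = threads.remove(name) {
            // The thread has already finished, so join() returns promptly.
            // A panic inside the thread is ignored in this sketch.
            let _ = handle.join();
        }
    }
    finished.len()
}
```

Threads that are still running stay in the map untouched, so the helper can be called periodically without blocking.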
§Comparison with EnhancedThreadShare
| Aspect | ThreadManager | EnhancedThreadShare |
|---|---|---|
| Purpose | Standalone utility | Integrated with ThreadShare |
| Data Management | Manual cloning required | Automatic data management |
| Thread Tracking | Manual thread management | Built-in thread tracking |
| Use Case | Complex thread scenarios | Simple thread management |
| Flexibility | High | Medium |
| Ease of Use | Medium | High |
§Integration with ThreadShare
ThreadManager works seamlessly with ThreadShare<T>.
§Advanced Patterns
§Thread Pools
use thread_share::{ThreadManager, share};
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let counter = share!(0u32);
    // Spawn worker pool
    for i in 0..4 {
        let counter_clone = counter.clone();
        let worker_id = i;
        manager.spawn(&format!("worker-{}", i), counter_clone, move |data| {
            data.update(|x| *x += 1);
            println!("Worker {} incremented counter", worker_id);
        })?;
    }
    // Wait for all workers to complete
    manager.join_all()?;
    println!("Final counter value: {}", counter.get());
    Ok(())
}
§Producer-Consumer
use thread_share::{ThreadManager, share};
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let queue = share!(Vec::<String>::new());
    // Producer thread
    manager.spawn("producer", queue.clone(), |queue| {
        for i in 0..5 {
            queue.update(|q| q.push(format!("Item {}", i)));
            std::thread::sleep(Duration::from_millis(10));
        }
    })?;
    // Consumer thread
    manager.spawn("consumer", queue.clone(), |queue| {
        let mut consumed_count = 0;
        while consumed_count < 5 {
            // Pop inside update() so the item that is printed is the item
            // that was removed, even if the producer pushes in between.
            let mut next = None;
            queue.update(|q| next = q.pop());
            match next {
                Some(item) => {
                    println!("Consumed: {}", item);
                    consumed_count += 1;
                }
                // Queue is empty: wait briefly for the producer.
                None => std::thread::sleep(Duration::from_millis(10)),
            }
        }
    })?;
    // Wait for completion
    manager.join_all()?;
    Ok(())
}
Structs§
- ThreadManager: Simplified thread management for ThreadShare