Module thread_pool

Source
Expand description

§Thread Pool Module - ThreadManager

This module provides ThreadManager, a standalone utility for managing threads with shared data, independent of the ThreadShare structures.

§🚀 Overview

ThreadManager is a lightweight thread management utility that provides:

  • Simplified Thread Spawning: Spawn threads with descriptive names
  • Shared Data Management: Manage multiple types of shared data
  • Thread Tracking: Monitor active thread count and status
  • Automatic Thread Joining: Wait for all threads to complete
  • Type-Safe Operations: Compile-time guarantees for thread safety

§Key Features

§🧵 Thread Management

  • Named Threads: Each thread gets a descriptive name for debugging
  • Automatic Tracking: Monitor active thread count and completion status
  • Error Handling: Comprehensive error handling for thread failures
  • Resource Cleanup: Automatic cleanup of completed threads

§📦 Shared Data Support

  • Type-Safe Access: Compile-time type checking for shared data
  • Multiple Data Types: Support for different types of shared data
  • Automatic Cloning: Safe data sharing between threads
  • Thread Isolation: Each thread gets its own clone of shared data

§Architecture

ThreadManager uses internal structures to track:

  • threads: Arc<Mutex<HashMap<String, JoinHandle<()>>>> - Active thread tracking
  • shared_data: Arc<Mutex<HashMap<TypeId, Box<dyn Any + Send + Sync>>>> - Shared data storage

§Example Usage

§Basic Thread Management

use thread_share::{ThreadManager, share};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let data = share!(vec![1, 2, 3]);

    // Spawn individual threads
    manager.spawn("sorter", data.clone(), |data| {
        data.update(|v| v.sort());
    })?;

    manager.spawn("validator", data.clone(), |data| {
        assert!(data.get().is_sorted());
    })?;

    // Wait for completion
    manager.join_all()?;
    Ok(())
}

§Advanced Usage

use thread_share::{ThreadManager, share};
use std::time::Duration;

#[derive(Clone)]
struct WorkItem {
    id: u32,
    data: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let work_queue = share!(vec![
        WorkItem { id: 1, data: "Task 1".to_string() },
        WorkItem { id: 2, data: "Task 2".to_string() },
    ]);

    // Spawn worker threads
    for i in 0..3 {
        let queue_clone = work_queue.clone();
        let worker_id = i;
        manager.spawn(&format!("worker-{}", i), queue_clone, move |queue| {
            loop {
                let mut items = queue.get();
                if items.is_empty() {
                    break;
                }
                 
                if let Some(item) = items.pop() {
                    println!("Worker {} processing: {}", worker_id, item.data);
                    std::thread::sleep(Duration::from_millis(100));
                }
                 
                queue.set(items);
            }
        })?;
    }

    // Wait for all workers to complete
    manager.join_all()?;
    println!("All work completed!");
    Ok(())
}

§Thread Lifecycle

  1. Creation: ThreadManager::new() or ThreadManager::default()
  2. Spawning: manager.spawn(name, data, function) creates named threads
  3. Execution: Threads run with access to shared data
  4. Monitoring: Track active threads with active_threads()
  5. Completion: Wait for all threads with join_all()

§Performance Characteristics

  • Thread Spawning: Minimal overhead over standard thread::spawn
  • Thread Tracking: Constant-time operations for thread management
  • Memory Usage: Small overhead for tracking structures
  • Scalability: Efficient for up to hundreds of threads
  • Lock Contention: Minimal due to efficient parking_lot primitives

§Best Practices

  1. Use descriptive thread names for easier debugging
  2. Keep thread functions focused on single responsibilities
  3. Always call join_all() to ensure proper cleanup
  4. Monitor thread count with active_threads() for debugging
  5. Handle errors gracefully from spawn() and join_all()
  6. Clone shared data for each thread to avoid ownership issues

§Error Handling

use thread_share::{ThreadManager, share};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let data = share!(0);

    // Handle spawn errors
    if let Err(e) = manager.spawn("worker", data.clone(), |data| { /* logic */ }) {
        eprintln!("Failed to spawn worker: {}", e);
        return Ok(());
    }

    // Handle join errors
    if let Err(e) = manager.join_all() {
        eprintln!("Thread execution failed: {}", e);
    }
    Ok(())
}

§Thread Safety

ThreadManager automatically implements Send and Sync traits, making it safe to use across thread boundaries. The internal synchronization primitives ensure that all operations are thread-safe.

§Memory Management

  • Arc: Provides reference counting for shared ownership
  • Mutex: Ensures exclusive access to internal structures
  • HashMap: Efficient storage for thread handles and shared data
  • Automatic Cleanup: Completed threads are automatically removed

§Comparison with EnhancedThreadShare

AspectThreadManagerEnhancedThreadShare
PurposeStandalone utilityIntegrated with ThreadShare
Data ManagementManual cloning requiredAutomatic data management
Thread TrackingManual thread managementBuilt-in thread tracking
Use CaseComplex thread scenariosSimple thread management
FlexibilityHighMedium
Ease of UseMediumHigh

§Integration with ThreadShare

ThreadManager works seamlessly with ThreadShare<T>:

§Advanced Patterns

§Thread Pools

use thread_share::{ThreadManager, share};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let counter = share!(0u32);

    // Spawn worker pool
    for i in 0..4 {
        let counter_clone = counter.clone();
        let worker_id = i;
        manager.spawn(&format!("worker-{}", i), counter_clone, move |data| {
            data.update(|x| *x = *x + 1);
            println!("Worker {} incremented counter", worker_id);
        })?;
    }

    // Wait for all workers to complete
    manager.join_all()?;
    println!("Final counter value: {}", counter.get());
    Ok(())
}

§Producer-Consumer

use thread_share::{ThreadManager, share};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = ThreadManager::new();
    let queue = share!(Vec::<String>::new());

    // Producer thread
    manager.spawn("producer", queue.clone(), |queue| {
        for i in 0..5 {
            queue.update(|q| q.push(format!("Item {}", i)));
            std::thread::sleep(Duration::from_millis(10));
        }
    })?;

    // Consumer thread
    manager.spawn("consumer", queue.clone(), |queue| {
        let mut consumed_count = 0;
        while consumed_count < 5 {
            let items = queue.get();
            if items.is_empty() {
                std::thread::sleep(Duration::from_millis(10));
                continue;
            }
             
            if let Some(item) = items.last() {
                println!("Consumed: {}", item);
                queue.update(|q| { q.pop(); });
                consumed_count = consumed_count + 1;
            }
        }
    })?;

    // Wait for completion
    manager.join_all()?;
    Ok(())
}

Structs§

ThreadManager
Simplified thread management for ThreadShare