Module parallel

High-performance parallel processing system for subtitle operations.

This module provides a sophisticated task scheduling and execution framework designed specifically for subtitle processing workloads. It offers intelligent load balancing, resource management, and fault tolerance for CPU-intensive operations like format conversion, AI analysis, and audio synchronization.

§Core Features

§Intelligent Task Scheduling

  • Priority-Based Queuing: Tasks are prioritized based on complexity and user preferences (see the sketch after this list)
  • Resource-Aware Scheduling: Considers CPU, memory, and I/O constraints
  • Adaptive Load Balancing: Dynamically adjusts worker allocation based on system load
  • Dependency Management: Handles task dependencies and execution ordering
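
The scheduler types re-exported from this module encapsulate this logic. As a rough illustration of priority-based ordering only (the `QueuedTask` type below is hypothetical, not part of the public API), a max-heap keyed on priority can be sketched like this:

use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Hypothetical queue entry: ordered by priority, highest first.
#[derive(Eq, PartialEq)]
struct QueuedTask {
    priority: u8,
    id: u64,
}

impl Ord for QueuedTask {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap is a max-heap, so comparing on priority pops the
        // highest-priority task first; ties fall back to insertion order
        // (lower id wins) to keep dequeueing deterministic.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.id.cmp(&self.id))
    }
}

impl PartialOrd for QueuedTask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut queue = BinaryHeap::new();
    queue.push(QueuedTask { priority: 1, id: 0 });
    queue.push(QueuedTask { priority: 3, id: 1 });
    queue.push(QueuedTask { priority: 2, id: 2 });

    // Pops in priority order: 3, 2, 1.
    while let Some(task) = queue.pop() {
        println!("running task {} (priority {})", task.id, task.priority);
    }
}

Dependency management layers on top of such a queue: a task only becomes eligible for the heap once every task it depends on has completed.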

§Worker Pool Management

  • Dynamic Scaling: Automatically adjusts worker count based on system resources (see the sketch after this list)
  • Specialized Workers: Different worker types for different operation categories
  • Health Monitoring: Tracks worker performance and handles failures gracefully
  • Resource Isolation: Prevents resource contention between concurrent tasks
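
How the pool sizes itself is internal to the scheduler. As a standalone sketch of resource-based sizing (standard library only; the min/max bounds are illustrative), a worker count could be derived like this:

fn main() {
    // Clamp the detected CPU parallelism to a configured worker range.
    let min_workers = 2;
    let max_workers = 8;
    let detected = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(min_workers);
    let workers = detected.clamp(min_workers, max_workers);
    println!("starting {workers} workers (detected {detected} CPUs)");
}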

§Performance Optimization

  • Batch Processing: Groups similar tasks for efficient execution (see the sketch after this list)
  • Memory Pool: Reuses memory allocations to reduce allocation overhead
  • Cache-Aware Scheduling: Optimizes for CPU cache locality
  • NUMA Awareness: Considers system topology for optimal performance
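
As a minimal sketch of what grouping similar tasks means (the task representation here is a plain tuple, purely for illustration), batching can be as simple as bucketing by operation kind before dispatch:

use std::collections::HashMap;

// Group (kind, payload) pairs so tasks of the same kind can be dispatched
// together and share setup costs.
fn group_by_kind(tasks: Vec<(String, String)>) -> HashMap<String, Vec<String>> {
    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
    for (kind, payload) in tasks {
        groups.entry(kind).or_default().push(payload);
    }
    groups
}

fn main() {
    let tasks = vec![
        ("convert".to_string(), "a.srt".to_string()),
        ("detect_encoding".to_string(), "b.srt".to_string()),
        ("convert".to_string(), "c.srt".to_string()),
    ];
    for (kind, batch) in group_by_kind(tasks) {
        println!("{kind}: {} task(s) in one batch", batch.len());
    }
}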

§Architecture Overview

The parallel processing system follows a producer-consumer pattern with multiple specialized components (a minimal channel-based sketch follows the diagram):

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Task Queue    │────│   Scheduler      │────│  Load Balancer  │
│   - Priority    │    │   - Dispatch     │    │   - Resource    │
│   - Dependencies│    │   - Monitoring   │    │   - Scaling     │
│   - Batching    │    │   - Recovery     │    │   - Affinity    │
└─────────────────┘    └──────────────────┘    └─────────────────┘
        │                        │                        │
        └────────────────────────┼────────────────────────┘
                                 │
                   ┌─────────────────────────┐
                   │     Worker Pool         │
                   │   ┌───────────────────┐ │
                   │   │ Format Converter  │ │
                   │   │ AI Analyzer       │ │
                   │   │ Audio Processor   │ │
                   │   │ File Manager      │ │
                   │   └───────────────────┘ │
                   └─────────────────────────┘
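
The scheduler, load balancer, and worker pool above are the real components. As a minimal, generic sketch of the underlying producer-consumer pattern (assuming the Tokio async runtime; none of the names below are part of this module's API), a bounded channel feeding a small set of workers looks like this:

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[tokio::main]
async fn main() {
    // Bounded queue between the producer (task submitter) and the workers.
    let (tx, rx) = mpsc::channel::<u64>(64);
    let rx = Arc::new(Mutex::new(rx));

    // Spawn a small pool of workers that pull task ids off the shared queue.
    let mut handles = Vec::new();
    for worker_id in 0..4 {
        let rx = Arc::clone(&rx);
        handles.push(tokio::spawn(async move {
            loop {
                let task = rx.lock().await.recv().await;
                match task {
                    Some(id) => println!("worker {worker_id} processing task {id}"),
                    None => break, // channel closed: shut down
                }
            }
        }));
    }

    // Producer side: enqueue work, then close the channel.
    for id in 0..16u64 {
        tx.send(id).await.expect("queue closed");
    }
    drop(tx);

    for handle in handles {
        handle.await.unwrap();
    }
}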

§Usage Examples

§Basic Task Execution

use subx_cli::core::parallel::{TaskScheduler, Task, ProcessingOperation};
use std::path::PathBuf;

// Note: these calls use `.await` and `?`, so they are expected to run
// inside an async function that returns a `Result`.
// Create a task scheduler with the default configuration
let scheduler = TaskScheduler::new().await?;

// Define a format conversion task
let task = Task::new(
    ProcessingOperation::FormatConversion {
        input_path: PathBuf::from("input.srt"),
        output_path: PathBuf::from("output.ass"),
        target_format: "ass".to_string(),
    },
    1, // Normal priority
);

// Submit task and wait for completion
let result = scheduler.submit_and_wait(task).await?;
println!("Task completed: {:?}", result);

§Batch Processing

use subx_cli::core::parallel::{FileProcessingTask, ProcessingOperation, TaskScheduler};
use std::path::PathBuf;

let scheduler = TaskScheduler::new().await?;
let mut tasks = Vec::new();

// The subtitle paths would normally come from a directory scan; a short
// fixed list stands in for them here.
let subtitle_files = vec![PathBuf::from("a.srt"), PathBuf::from("b.srt")];

// Create multiple tasks for batch processing
for file_path in subtitle_files {
    let task = FileProcessingTask::new(
        file_path,
        ProcessingOperation::EncodingDetection,
        2, // Higher priority for encoding detection
    );
    tasks.push(task);
}

// Submit all tasks for parallel execution
let results = scheduler.submit_batch(tasks).await?;

for result in results {
    match result {
        Ok(output) => println!("Success: {:?}", output),
        Err(error) => eprintln!("Failed: {}", error),
    }
}

§Custom Worker Configuration

use subx_cli::core::parallel::{TaskScheduler, WorkerConfig};

let config = WorkerConfig {
    max_workers: 8,
    min_workers: 2,
    cpu_intensive_workers: 4,
    io_intensive_workers: 2,
    memory_limit_mb: 1024,
    task_timeout_secs: 300,
};

let scheduler = TaskScheduler::with_config(config).await?;

§Task Types

§Format Conversion Tasks

  • Convert between different subtitle formats (SRT, ASS, VTT, SUB)
  • Preserve styling information where possible
  • Handle encoding conversion automatically

§AI Analysis Tasks

  • Semantic content analysis for matching
  • Language detection and verification
  • Quality assessment and scoring
  • Content similarity comparison

§Audio Processing Tasks

  • Speech detection and timing extraction
  • Audio-subtitle synchronization
  • Voice activity detection
  • Audio quality analysis

§File Management Tasks

  • Batch file operations (copy, move, rename)
  • Encoding detection and conversion
  • Metadata extraction and validation
  • Directory structure analysis

§Performance Characteristics

§CPU Optimization

  • Work Stealing: Idle workers steal tasks from busy workers (see the sketch after this list)
  • Cache Affinity: Tasks are scheduled to maintain CPU cache locality
  • Thread Pinning: Critical workers can be pinned to specific CPU cores
  • SIMD Utilization: Vectorized operations where applicable
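
Work stealing is commonly built on per-worker deques. The sketch below uses the crossbeam_deque crate to show the mechanic; whether this module uses crossbeam internally is not implied here.

use crossbeam_deque::{Steal, Worker};

fn main() {
    // Each worker owns a deque; other workers hold Stealers onto it.
    let busy: Worker<u32> = Worker::new_fifo();
    let stealer = busy.stealer();

    // The busy worker has accumulated a backlog of tasks.
    for task in 0..8 {
        busy.push(task);
    }

    // An idle worker drains work from the busy worker's queue.
    loop {
        match stealer.steal() {
            Steal::Success(task) => println!("idle worker stole task {task}"),
            Steal::Empty => break,    // nothing left to steal
            Steal::Retry => continue, // contended; try again
        }
    }
}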

§Memory Management

  • Pool Allocation: Reuses memory buffers to reduce allocation overhead (see the sketch after this list)
  • Memory Pressure Handling: Automatically reduces concurrency under memory pressure
  • Allocation Optimization: Minimizes allocations in hot paths
  • Memory-Mapped I/O: Uses memory mapping for large file operations
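
A memory pool in this sense simply recycles buffers between tasks. The `BufferPool` type below is illustrative, not part of this module's API:

use std::sync::Mutex;

// Hands out reusable byte buffers instead of allocating a fresh one per task.
struct BufferPool {
    buffers: Mutex<Vec<Vec<u8>>>,
}

impl BufferPool {
    fn new() -> Self {
        Self { buffers: Mutex::new(Vec::new()) }
    }

    // Take a buffer from the pool, or allocate one if the pool is empty.
    fn acquire(&self, capacity: usize) -> Vec<u8> {
        let mut pool = self.buffers.lock().unwrap();
        pool.pop().unwrap_or_else(|| Vec::with_capacity(capacity))
    }

    // Clear and return a buffer so a later task can reuse its allocation.
    fn release(&self, mut buffer: Vec<u8>) {
        buffer.clear();
        self.buffers.lock().unwrap().push(buffer);
    }
}

fn main() {
    let pool = BufferPool::new();
    let mut buf = pool.acquire(4096);
    buf.extend_from_slice(b"subtitle data");
    pool.release(buf); // the allocation survives for the next task
}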

§I/O Optimization

  • Asynchronous I/O: Non-blocking file operations where possible (see the sketch after this list)
  • Read-Ahead: Predictive file reading based on task patterns
  • Write Coalescing: Batches small writes for better performance
  • Network Optimization: Optimized handling of AI service requests
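
As a small sketch of asynchronous I/O combined with write coalescing (assuming the Tokio runtime; the file names are placeholders), a non-blocking read plus a buffered writer looks like this:

use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Read the whole subtitle file without blocking the executor.
    let contents = tokio::fs::read("input.srt").await?;

    // BufWriter coalesces many small writes into fewer system calls.
    let file = File::create("output.srt").await?;
    let mut writer = BufWriter::new(file);
    for chunk in contents.chunks(1024) {
        writer.write_all(chunk).await?;
    }
    writer.flush().await?; // push any buffered bytes to the file
    Ok(())
}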

§Error Handling and Recovery

§Task-Level Recovery

  • Automatic Retry: Failed tasks are retried with exponential backoff (see the sketch after this list)
  • Fallback Strategies: Alternative approaches for failed operations
  • Partial Results: Returns partial results when possible
  • Progress Preservation: Saves intermediate results for long-running tasks
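
Exponential backoff itself is straightforward; the helper below is a generic sketch (not this module's retry API) showing the doubling delay between attempts:

use std::future::Future;
use std::time::Duration;

// Retry a fallible async operation with exponential backoff
// (100 ms, 200 ms, 400 ms, ...) up to `max_attempts` attempts.
async fn retry_with_backoff<T, E, F, Fut>(mut op: F, max_attempts: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    let mut attempt = 1;
    loop {
        match op().await {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                tokio::time::sleep(delay).await;
                delay *= 2; // double the wait before the next attempt
                attempt += 1;
            }
        }
    }
}

A production scheduler would additionally cap the delay and distinguish retryable errors from permanent ones.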

§System-Level Recovery

  • Worker Restart: Automatically restarts failed workers (see the sketch after this list)
  • Resource Cleanup: Ensures proper cleanup after failures
  • State Persistence: Maintains scheduler state across restarts
  • Graceful Degradation: Continues operation with reduced capacity
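
Worker supervision can be sketched as a loop that respawns a worker future whenever it panics or is cancelled (again assuming Tokio; the `supervise` helper is hypothetical):

use std::future::Future;

// Keep respawning a worker until it exits cleanly.
async fn supervise<F, Fut>(make_worker: F)
where
    F: Fn() -> Fut,
    Fut: Future<Output = ()> + Send + 'static,
{
    loop {
        match tokio::spawn(make_worker()).await {
            Ok(()) => break, // worker finished without error
            Err(join_err) => {
                // The worker panicked or was cancelled; log and restart it.
                eprintln!("worker failed: {join_err}; restarting");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    supervise(|| async {
        println!("worker running");
    })
    .await;
}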

§Monitoring and Observability

The parallel system provides comprehensive monitoring (a minimal counter sketch follows this list):

  • Task Metrics: Execution time, success rate, resource usage
  • Worker Health: CPU usage, memory consumption, error rates
  • System Performance: Overall throughput, queue depth, response times
  • Resource Utilization: CPU, memory, disk, and network usage
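
A minimal way to collect such counters without locking is atomics; the `TaskMetrics` type below is purely illustrative:

use std::sync::atomic::{AtomicU64, Ordering};

// Lightweight task counters that workers can update concurrently.
#[derive(Default)]
struct TaskMetrics {
    completed: AtomicU64,
    failed: AtomicU64,
    total_millis: AtomicU64,
}

impl TaskMetrics {
    fn record(&self, succeeded: bool, elapsed_millis: u64) {
        if succeeded {
            self.completed.fetch_add(1, Ordering::Relaxed);
        } else {
            self.failed.fetch_add(1, Ordering::Relaxed);
        }
        self.total_millis.fetch_add(elapsed_millis, Ordering::Relaxed);
    }

    fn success_rate(&self) -> f64 {
        let ok = self.completed.load(Ordering::Relaxed) as f64;
        let failed = self.failed.load(Ordering::Relaxed) as f64;
        if ok + failed == 0.0 { 0.0 } else { ok / (ok + failed) }
    }
}

fn main() {
    let metrics = TaskMetrics::default();
    metrics.record(true, 120);
    metrics.record(false, 80);
    println!("success rate: {:.0}%", metrics.success_rate() * 100.0);
}

This is also a small example of the lock-free sharing described under Thread Safety below.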

§Thread Safety

All components are designed for concurrent access:

  • Lock-free data structures where possible
  • Minimal contention in critical paths
  • Safe sharing of resources between threads
  • Proper synchronization for shared state

Re-exports§

pub use scheduler::TaskScheduler;
pub use task::FileProcessingTask;
pub use task::ProcessingOperation;
pub use task::Task;
pub use task::TaskResult;
pub use task::TaskStatus;

Modules§

config
Parallel processing configuration module
load_balancer
Load balancer for dynamic adjustment of worker distributions
pool
Worker pool and related utilities (placeholder module)
scheduler
Task scheduler for parallel processing
task
Task definition and utilities for parallel processing
worker
Worker pool and worker definitions for parallel processing