rsactor 0.14.1

A Simple and Efficient In-Process Actor Model Implementation for Rust.
# Blocking Task

This example demonstrates how actors can interact with CPU-intensive or blocking operations without blocking the async runtime. It shows the proper way to integrate blocking tasks with the actor system using `tokio::task::spawn_blocking` and the blocking communication APIs.

## Overview

The blocking task pattern is essential when you need to:
- Run **CPU-intensive computations** that would block the async executor
- Perform **synchronous I/O operations** with legacy APIs
- **Bridge synchronous and asynchronous code**
- **Prevent blocking the main actor runtime**

## Key Concepts

### Why Use Blocking Tasks?

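Tokio's async runtime is designed for I/O-bound work, where tasks yield at every `.await`. A CPU-intensive or truly blocking operation does not yield, so it can:
- Stall the worker thread it runs on (and the whole executor on a single-threaded runtime)
- Prevent other actors scheduled on that thread from processing messages
- Degrade overall system performance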

### Solution: `spawn_blocking`

Tokio provides `spawn_blocking` to run blocking operations on a dedicated thread pool, keeping the main async runtime responsive.

## Architecture

```
Actor ←→ [tokio channels] ←→ Blocking Task
  ↑                              ↓
  └──[blocking API]──────────────┘
     (ask_blocking/tell_blocking)
```

## Implementation

### Message Types

```rust
use std::time::Duration;

// Messages for actor communication
struct GetState;
struct SetFactor(f64);
struct ProcessedData {
    value: f64,
    timestamp: std::time::Instant,
}

// Commands for the blocking task
enum TaskCommand {
    ChangeInterval(Duration),
    Stop,
}
```

### Actor with Blocking Task

```rust
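// Imports assumed by this snippet (`log` macros, rsactor root exports, Tokio)
use log::{debug, info};
use rsactor::{Actor, ActorRef};
use tokio::{sync::mpsc, task};

struct SyncDataProcessorActor {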
    factor: f64,
    latest_value: Option<f64>,
    task_sender: Option<mpsc::UnboundedSender<TaskCommand>>,
    task_handle: Option<task::JoinHandle<()>>,
}

impl Actor for SyncDataProcessorActor {
    type Args = f64; // Initial factor
    type Error = anyhow::Error;

    async fn on_start(factor: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        info!("SyncDataProcessorActor started with factor: {}", factor);

        // Create communication channels
        let (task_sender, task_receiver) = mpsc::unbounded_channel();

        // Spawn the blocking task
        let actor_ref_clone = actor_ref.clone();
        let task_handle = task::spawn_blocking(move || {
            sync_background_task(factor, task_receiver, actor_ref_clone)
        });

        Ok(Self {
            factor,
            latest_value: None,
            task_sender: Some(task_sender),
            task_handle: Some(task_handle),
        })
    }

    async fn on_stop(&mut self, _: &ActorRef<Self>) -> Result<(), Self::Error> {
        info!("SyncDataProcessorActor stopping - sending stop command to background task");

        // Signal the background task to stop
        if let Some(sender) = &self.task_sender {
            if sender.send(TaskCommand::Stop).is_err() {
                debug!("Background task already stopped or receiver dropped");
            }
        }

        // Wait for the background task to complete
        if let Some(handle) = self.task_handle.take() {
            match handle.await {
                Ok(()) => info!("Background task completed successfully"),
                Err(e) => info!("Background task panicked or was cancelled: {}", e),
            }
        }

        Ok(())
    }
}
```

### Blocking Task Implementation

```rust
fn sync_background_task(
    factor: f64, // never reassigned in this function, so `mut` is not needed
    mut task_receiver: mpsc::UnboundedReceiver<TaskCommand>,
    actor_ref: ActorRef<SyncDataProcessorActor>,
) {
    info!("Sync background task started");
    let mut interval = Duration::from_millis(500);
    let mut counter = 0.0;

    loop {
        // Check for commands from the actor (non-blocking)
        match task_receiver.try_recv() {
            Ok(TaskCommand::ChangeInterval(new_interval)) => {
                info!("Background task: changing interval to {:?}", new_interval);
                interval = new_interval;
            }
            Ok(TaskCommand::Stop) => {
                info!("Background task: received stop command");
                break;
            }
            Err(mpsc::error::TryRecvError::Empty) => {
                // No command available, continue with normal processing
            }
            Err(mpsc::error::TryRecvError::Disconnected) => {
                info!("Background task: actor disconnected, stopping");
                break;
            }
        }

        // Simulate CPU-intensive work
        counter += 1.0;
        let processed_value = expensive_calculation(counter, factor);

        // Send result back to actor using blocking API
        let message = ProcessedData {
            value: processed_value,
            timestamp: std::time::Instant::now(),
        };

        if let Err(e) = actor_ref.tell_blocking(message) {
            info!("Background task: failed to send data to actor: {}", e);
            break;
        }

        // Sleep (blocking operation)
        thread::sleep(interval);
    }

    info!("Sync background task finished");
}

fn expensive_calculation(input: f64, factor: f64) -> f64 {
    // Simulate CPU-intensive calculation
    let mut result = input * factor;
    for _ in 0..1000000 {
        result = result.sin().cos().tan();
    }
    result
}
```
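
One consequence of the loop above is that a `Stop` command is only noticed after `thread::sleep(interval)` returns, so shutdown can lag by up to one interval. A possible variation, sketched here with `std::sync::mpsc` so that it runs without dependencies, waits on the command channel with a timeout instead of sleeping. `Command` and `run_worker` are hypothetical names, this is not how the rsactor example itself is written, and the sketch does not cover how to do the same with a Tokio channel.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Commands understood by the worker (mirrors `TaskCommand` above).
pub enum Command {
    ChangeInterval(Duration),
    Stop,
}

/// Runs until `Stop` arrives or the sender is dropped.
/// Returns how many work iterations were completed.
pub fn run_worker(rx: Receiver<Command>, mut interval: Duration) -> u32 {
    let mut iterations = 0;
    loop {
        // Stand-in for the real per-tick work.
        iterations += 1;

        // Wait up to `interval` for a command instead of sleeping blindly,
        // so a `Stop` sent during the wait is seen right away.
        match rx.recv_timeout(interval) {
            // Any command ends the wait early; after an interval change
            // the next tick therefore starts immediately.
            Ok(Command::ChangeInterval(new_interval)) => interval = new_interval,
            Ok(Command::Stop) | Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => {} // no command: next tick
        }
    }
    iterations
}
```

The trade-off is that every command wakes the worker early, which may or may not be acceptable depending on how strictly the tick interval has to be respected.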

### Message Handlers

```rust
impl Message<ProcessedData> for SyncDataProcessorActor {
    type Reply = ();

    async fn handle(&mut self, msg: ProcessedData, _: &ActorRef<Self>) -> Self::Reply {
        debug!("Actor received processed data: value={:.6}, timestamp={:?}",
               msg.value, msg.timestamp);
        self.latest_value = Some(msg.value);
    }
}

impl Message<SetFactor> for SyncDataProcessorActor {
    type Reply = f64; // Return the new factor

    async fn handle(&mut self, msg: SetFactor, _: &ActorRef<Self>) -> Self::Reply {
        info!("Actor: changing factor from {} to {}", self.factor, msg.0);
        self.factor = msg.0;

        // Could send a command to the background task if needed. Note that
        // `UpdateFactor` is not defined in `TaskCommand` above; it would have
        // to be added as a new variant first.
        // if let Some(sender) = &self.task_sender {
        //     sender.send(TaskCommand::UpdateFactor(self.factor)).ok();
        // }

        self.factor
    }
}

impl Message<GetState> for SyncDataProcessorActor {
    type Reply = (f64, Option<f64>); // (factor, latest_value)

    async fn handle(&mut self, _: GetState, _: &ActorRef<Self>) -> Self::Reply {
        (self.factor, self.latest_value)
    }
}
```
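
The commented-out lines in the `SetFactor` handler hint at forwarding the new factor to the background task. A minimal sketch of the receiving side follows, assuming a hypothetical `UpdateFactor(f64)` variant is added to `TaskCommand` and the task keeps its settings in a hypothetical `TaskState` struct. Neither exists in the example as shown.

```rust
use std::ops::ControlFlow;
use std::time::Duration;

/// `TaskCommand` extended with a hypothetical `UpdateFactor` variant.
pub enum TaskCommand {
    ChangeInterval(Duration),
    UpdateFactor(f64),
    Stop,
}

/// Hypothetical container for the task's mutable settings.
pub struct TaskState {
    pub factor: f64,
    pub interval: Duration,
}

/// Applies one command to the task state.
/// `Break` tells the caller's loop to exit; `Continue` keeps it running.
pub fn apply_command(state: &mut TaskState, cmd: TaskCommand) -> ControlFlow<()> {
    match cmd {
        TaskCommand::ChangeInterval(interval) => state.interval = interval,
        TaskCommand::UpdateFactor(factor) => state.factor = factor,
        TaskCommand::Stop => return ControlFlow::Break(()),
    }
    ControlFlow::Continue(())
}
```

In the task loop, each `Ok(cmd)` returned by `try_recv` would be passed to `apply_command`, and the `SetFactor` handler would send `TaskCommand::UpdateFactor(self.factor)` through `task_sender`.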

## Key Patterns

### 1. **Proper Blocking API Usage**
- Use `tell_blocking` and `ask_blocking` only within `spawn_blocking` tasks
- These APIs are designed for Tokio's blocking thread pool
- Do **not** call them from `std::thread::spawn` threads or general synchronous code

### 2. **Communication Channels**
- Use `tokio::sync::mpsc` for actor → blocking task communication
- Use actor messages for blocking task → actor communication
- Separate concerns: commands vs. data

### 3. **Lifecycle Management**
- Spawn blocking tasks in `on_start`
- Clean up in `on_stop` by sending stop commands
- Await task completion to ensure proper cleanup
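
The start / signal / join sequence can be sketched independently of the actor framework. The version below uses a plain `std::thread` and `std::sync::mpsc` only to stay dependency-free; the example itself uses `spawn_blocking` and awaits the `JoinHandle`. `Worker`, `start`, and `stop` are hypothetical names.

```rust
use std::sync::mpsc::{self, Sender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Owns a background thread plus the channel used to stop it
/// (stand-in for the actor's `task_sender` / `task_handle` fields).
pub struct Worker {
    stop_tx: Option<Sender<()>>,
    handle: Option<JoinHandle<u64>>,
}

impl Worker {
    /// Counterpart of `on_start`: create the channel, launch the loop.
    pub fn start() -> Self {
        let (stop_tx, stop_rx) = mpsc::channel::<()>();
        let handle = thread::spawn(move || {
            let mut ticks = 0u64;
            // Keep working until a stop signal arrives or the sender is dropped.
            while let Err(TryRecvError::Empty) = stop_rx.try_recv() {
                ticks += 1;
                thread::sleep(Duration::from_millis(1));
            }
            ticks
        });
        Self { stop_tx: Some(stop_tx), handle: Some(handle) }
    }

    /// Counterpart of `on_stop`: signal first, then wait for the thread.
    /// Returns the tick count, or `None` if already stopped or the thread panicked.
    pub fn stop(&mut self) -> Option<u64> {
        if let Some(tx) = self.stop_tx.take() {
            // A send error only means the thread has already exited.
            let _ = tx.send(());
        }
        self.handle.take().and_then(|h| h.join().ok())
    }
}
```

Taking both fields out of their `Option`s makes a second `stop` call a harmless no-op, which is the same reason the actor stores `task_handle` as an `Option` and calls `take()` on it.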

### 4. **Error Handling**
- Handle channel disconnections gracefully
- Propagate errors appropriately between sync and async contexts
- Use `try_recv` for non-blocking command checking
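
The task loop in this example reads at most one command per iteration. As a variation, all pending commands can be drained in one pass while still telling an empty queue apart from a dropped sender. The sketch uses `std::sync::mpsc`, whose `TryRecvError` has the same `Empty` and `Disconnected` variants as Tokio's; `drain_pending` is a hypothetical helper, not part of rsactor or Tokio.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Collects every command that is already queued, without blocking.
/// The boolean is `false` once the sending side has been dropped,
/// which is the signal for the task to shut down.
pub fn drain_pending<T>(rx: &Receiver<T>) -> (Vec<T>, bool) {
    let mut pending = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(cmd) => pending.push(cmd),
            // Queue is empty but the sender is alive: carry on working.
            Err(TryRecvError::Empty) => return (pending, true),
            // Sender is gone: deliver what was queued, then stop.
            Err(TryRecvError::Disconnected) => return (pending, false),
        }
    }
}
```

Commands that were queued before the sender was dropped are still returned, so nothing sent before shutdown is lost.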

## Benefits

### **Runtime Protection**
- Blocking operations don't block the async executor
- Other actors continue processing messages
- Maintains system responsiveness

### **Resource Management**
- Tokio manages the blocking thread pool
- Threads are created on demand, up to the runtime's `max_blocking_threads` limit (512 by default)
- Proper cleanup when actors stop

### **Flexibility**
- Integrate legacy synchronous code
- Handle CPU-intensive algorithms
- Bridge different execution models

## Usage Example

```rust
use anyhow::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    // Spawn the actor with initial factor
    let (actor_ref, join_handle) = rsactor::spawn::<SyncDataProcessorActor>(2.0);

    // Let it run for a bit
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Get current state
    let (factor, latest) = actor_ref.ask(GetState).await?;
    println!("Current state: factor={}, latest_value={:?}", factor, latest);

    // Change the factor
    let new_factor = actor_ref.ask(SetFactor(3.5)).await?;
    println!("Changed factor to: {}", new_factor);

    // Let it run with new factor
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Stop the actor
    actor_ref.stop().await?;
    let result = join_handle.await?;
    println!("Actor stopped: {:?}", result);

    Ok(())
}
```

## Running the Example

```bash
cargo run --example actor_blocking_task
```

The output shows:
- Background task processing data continuously
- Actor receiving processed values
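- Factor changes reported by the actor (the background task keeps its initial factor unless a command such as the commented-out `UpdateFactor` is wired up)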
- Proper cleanup on shutdown

## When to Use This Pattern

- **Mathematical Computations**: Heavy algorithms, simulations
- **Legacy Integration**: Wrapping synchronous libraries
- **File Processing**: Large file operations, compression
- **Database Operations**: Synchronous database drivers
- **System Calls**: Direct OS interactions

This pattern enables seamless integration of blocking operations while maintaining the benefits of the actor model and async runtime efficiency.