dynamo-llm 1.0.2

Dynamo LLM Library
# Active Message Handling System

This module provides an async future-based active message handling system with proper error handling, response notifications, and channel-based communication.

## Key Features

- **Async Future-Based**: Handlers are `Arc<dyn Future>` that can capture resources and run asynchronously
- **Concurrency Control**: Configurable concurrency limits with semaphore-based throttling
- **Response Notifications**: Optional response notifications with `:ok` or `:err(<message>)` format
- **Channel-Based Communication**: All communication happens through channels for clean separation
- **Error Handling**: Comprehensive error handling with logging and monitoring
- **Resource Capture**: Handlers can capture and share resources safely

## Architecture

```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Communication  │───▶│ ActiveMessage    │───▶│   Handler       │
│     Layer       │    │    Manager       │    │   Futures       │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         ▲                        │                       │
         │                        ▼                       ▼
         │              ┌──────────────────┐    ┌─────────────────┐
         └──────────────│   Response       │◀───│   Async Task    │
                        │  Notifications   │    │    Pool         │
                        └──────────────────┘    └─────────────────┘
```

## Usage

### 1. Initialize the System

```rust
use dynamo_llm::block_manager::distributed::worker::*;

// Create a worker and initialize active message manager
let mut worker = KvBlockManagerWorker::new(config)?;
worker.init_active_message_manager(4)?; // 4 concurrent handlers

// Create handlers
let handlers = create_example_handlers();
worker.register_handlers(handlers)?;

// Get communication channels
let message_sender = worker.get_message_sender()?;
let response_receiver = worker.get_response_receiver()?;
```

### 2. Create Custom Handlers

```rust
#[derive(Clone)]
struct MyHandler {
    name: String,
    shared_resource: Arc<Mutex<SomeResource>>,
}

impl MyHandler {
    async fn handle_message(&self, data: Vec<u8>) -> Result<()> {
        // Process the message asynchronously
        let processed_data = self.process_data(data).await?;

        // Update shared resources
        let mut resource = self.shared_resource.lock().await;
        resource.update(processed_data)?;

        Ok(())
    }
}

// Register the handler
let handler = MyHandler::new("my_handler".to_string(), shared_resource);
let mut handlers = HashMap::new();
handlers.insert("my_message_type".to_string(), create_handler!(handler));
```

### 3. Send Messages

```rust
// Message with response notification
let message = IncomingActiveMessage {
    message_type: "my_message_type".to_string(),
    message_data: b"Hello, World!".to_vec(),
    response_notification: Some("request_123".to_string()),
};

message_sender.send(message)?;
```

### 4. Handle Responses

```rust
// Spawn a task to handle responses
tokio::spawn(async move {
    while let Some(response) = response_receiver.recv().await {
        match response.is_success {
            true => {
                info!("✅ Success: {}", response.notification);
                // response.notification = "request_123:ok"
            }
            false => {
                warn!("❌ Error: {}", response.notification);
                // response.notification = "request_123:err(Error message)"
            }
        }
    }
});
```

## Message Flow

1. **Incoming Message**: Communication layer receives bytes and optional response notification prefix
2. **Channel Send**: Message is sent through the channel to the active message manager
3. **Handler Lookup**: Manager finds the appropriate handler for the message type
4. **Future Creation**: Handler factory creates an async future with captured resources
5. **Async Execution**: Future is spawned in a task with concurrency control
6. **Response Generation**: On completion, response notification is generated (if requested)
7. **Response Send**: Response is sent back through the response channel

## Response Notification Format

- **Success**: `{prefix}:ok`
- **Error**: `{prefix}:err({error_message})`

Example:
- Request with notification prefix: `"user_request_456"`
- Success response: `"user_request_456:ok"`
- Error response: `"user_request_456:err(Invalid data format)"`

## Error Handling

The system provides multiple levels of error handling:

1. **Handler Errors**: Caught and converted to error response notifications
2. **Unknown Message Types**: Generate error responses for unregistered message types
3. **Channel Errors**: Logged and handled gracefully
4. **Concurrency Limits**: Managed with semaphores to prevent resource exhaustion

## Testing

Run the comprehensive test suite:

```bash
cargo test test_active_message_flow
cargo test test_resource_capturing_handler
cargo test test_communication_integration
cargo test test_concurrency_performance
```

## Performance Characteristics

- **Concurrency**: Configurable concurrent handler limit
- **Memory**: Efficient channel-based communication with minimal copying
- **Latency**: Low-latency message dispatch with async processing
- **Throughput**: High throughput with proper backpressure handling

## Best Practices

1. **Handler Design**: Keep handlers lightweight and async-friendly
2. **Resource Management**: Use `Arc<Mutex<T>>` for shared resources
3. **Error Handling**: Always handle errors gracefully in handlers
4. **Concurrency**: Set appropriate concurrency limits based on workload
5. **Monitoring**: Use the response notifications for monitoring and debugging