manager_handlers 0.7.2

A microservice manager implementation that creates HTTP-accessible handlers with configurable replicas. Handlers communicate via an internal bus, enabling collaborative request processing in a distributed architecture.
# manager_handlers

[![Crates.io](https://img.shields.io/crates/v/manager_handlers.svg)](https://crates.io/crates/manager_handlers)
[![Documentation](https://docs.rs/manager_handlers/badge.svg)](https://docs.rs/manager_handlers)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

A scalable, async-driven microservice framework for Rust that enables dynamic handler registration with HTTP endpoints and internal pub/sub messaging.

## Overview

`manager_handlers` is built on top of Actix Web and Tokio. Handlers process HTTP requests and communicate with each other through an internal message bus, so each handler can be written, registered, and scaled (via replicas) independently of the others.

## Features

- **🚀 Dynamic Handler Registration**: Register handlers at runtime with configurable replica counts for horizontal scaling
- **📬 Dual Communication**: Internal pub/sub messaging bus + Redis pub/sub for distributed systems
- **🔒 Security First**: Built-in TLS support with optional client certificate authentication
- **📁 File Operations**: Streaming file upload/download with metadata support
- **🎯 Concurrency Control**: Semaphore-based request limiting with per-handler concurrency settings
- **💾 Shared State**: Thread-safe state management with support for primitives and function pointers
- **🔄 Async by Design**: Built on Tokio for high-performance async I/O operations
- **⚡ Zero-Copy Streaming**: Efficient file handling without loading entire files into memory
- **🛑 Graceful Shutdown**: Coordinated service termination with cleanup

## Requirements

- Rust 1.85+ (2024 edition)
- Tokio runtime
- OpenSSL (for TLS support)

## Installation

Add this to your `Cargo.toml`:

```toml
[dependencies]
manager_handlers = "0.7.2"
async-trait = "0.1"
```

Or use cargo:

```bash
cargo add manager_handlers async-trait
```

## Quick Start

```rust
use manager_handlers::manager::Manager;
use manager_handlers::handler;
use async_trait::async_trait;

// Define a simple handler using the macro
handler!(HelloHandler, hello;
    async fn run(&self, _: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        Ok(format!("Hello, {}!", data))
    }
);

#[tokio::main]
async fn main() {
    let mut manager = Manager::new_default();
    
    // Register the handler with 3 replicas
    manager.add_handler::<HelloHandler>("hello", 3);
    
    // Start the server on port 8080
    manager.start().await;
}
```

## Core Concepts

### Handlers

Handlers are the core processing units that:
- Process incoming HTTP requests
- Communicate with other handlers via message bus
- Access shared state
- Handle file operations

### Message Bus

The internal `MultiBus` provides:
- Async message passing between handlers
- Request/response pattern with `publish()`
- Fire-and-forget pattern with `dispatch()`
- Backpressure and timeout handling
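
The difference between the request/response and fire-and-forget patterns can be sketched with plain standard-library channels. This is only an illustration of the two patterns, not the `MultiBus` implementation: `Envelope`, `spawn_worker`, `request`, and `notify` are hypothetical names, and the real bus is async rather than thread-based.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;
use std::time::Duration;

/// A message for the worker; `reply_to` is set only for request/response.
struct Envelope {
    payload: String,
    reply_to: Option<Sender<String>>,
}

/// Spawns a worker thread that upper-cases each payload and replies when asked.
fn spawn_worker() -> Sender<Envelope> {
    let (tx, rx) = mpsc::channel::<Envelope>();
    thread::spawn(move || {
        for msg in rx {
            let result = msg.payload.to_uppercase();
            if let Some(reply) = msg.reply_to {
                // The caller may have timed out already; ignore a closed channel.
                let _ = reply.send(result);
            }
        }
    });
    tx
}

/// Request/response: send a message and wait (up to `timeout`) for the answer.
fn request(bus: &Sender<Envelope>, payload: &str, timeout: Duration) -> Option<String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    let msg = Envelope { payload: payload.to_string(), reply_to: Some(reply_tx) };
    bus.send(msg).ok()?;
    reply_rx.recv_timeout(timeout).ok()
}

/// Fire-and-forget: queue a message and return without waiting for a result.
fn notify(bus: &Sender<Envelope>, payload: &str) -> bool {
    let msg = Envelope { payload: payload.to_string(), reply_to: None };
    bus.send(msg).is_ok()
}
```

In this sketch the only difference between the two calls is whether a reply channel travels with the message, which is also why only the request path needs a timeout.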

### Shared State

Thread-safe storage supporting:
- Primitive types (Int, Float, String, etc.)
- Synchronous and async function pointers
- Custom types via `AnyType` wrapper
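
As a rough mental model, a store like this can be pictured as a map behind a lock, with an enum for the value kinds. The sketch below uses only the standard library and is not the crate's `SharedState`: `Value` and `Store` are hypothetical stand-ins, and the real API is async.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Values the store can hold; a simplified stand-in for the crate's `StateType`.
#[derive(Clone)]
enum Value {
    Int(i64),
    Func(Arc<dyn Fn(String) -> String + Send + Sync>),
}

/// A cheaply cloneable handle to a map guarded by a read-write lock.
#[derive(Clone, Default)]
struct Store {
    inner: Arc<RwLock<HashMap<String, Value>>>,
}

impl Store {
    /// Inserts or overwrites the value stored under `key`.
    fn insert(&self, key: &str, value: Value) {
        self.inner.write().unwrap().insert(key.to_string(), value);
    }

    /// Returns a clone of the value stored under `key`, if any.
    fn get(&self, key: &str) -> Option<Value> {
        self.inner.read().unwrap().get(key).cloned()
    }

    /// Looks up a stored function and applies it to `input`.
    fn call(&self, key: &str, input: &str) -> Option<String> {
        match self.get(key)? {
            Value::Func(f) => Some(f(input.to_string())),
            _ => None,
        }
    }
}
```

Storing functions behind `Arc<dyn Fn ... + Send + Sync>` is what lets one handler register a callback that another handler, possibly on a different thread, can later invoke.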

## Usage Examples

### Creating Custom Handlers

Implement the `Base` trait for full control:

```rust
use async_trait::async_trait;
use std::sync::Arc;
use futures::future::{BoxFuture, FutureExt};
use std::time::Duration;
use tokio::time::sleep;
use manager_handlers::multibus::MultiBus;
use manager_handlers::manager::{StateType, SharedState, Base};

pub struct MyHandler {
    communication_line: Arc<MultiBus>,
    shared_state: Arc<SharedState>,
}

#[async_trait]
impl Base for MyHandler {
    async fn run(&self, src: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        // Process the incoming data
        println!("Received data: {}", data);
        
        // Example: Publishing a message and awaiting response
        let response = self.publish(
            data.clone(),
            "other_handler".to_string()
        ).await;
        
        // Example: Fire-and-forget dispatch
        self.dispatch(
            "notification data".to_string(), 
            "notification_handler".to_string()
        ).await;
        
        // Example: Store a value in shared state
        self.shared_state.insert(&"counter".to_string(), StateType::Int(42)).await;
        
        // Example: Store a synchronous function
        let shared_function: Arc<dyn Fn(String) -> String + Sync + Send> = Arc::new(|input: String| -> String {
          println!("Hello, {}!", input);
          input + " pesto"
        });
        self.shared_state.insert(&"sync_func".to_string(), StateType::FunctionSync(shared_function)).await;
        
        // Example: Store an asynchronous function
        let shared_async_function: Arc<dyn Fn(String) -> BoxFuture<'static, String> + Send + Sync> = Arc::new(|input: String| async move {
          println!("Got in the async function");
          sleep(Duration::from_secs(5)).await;
          "Done".to_string()
        }.boxed());
        self.shared_state.insert(&"async_func".to_string(), StateType::FunctionAsync(shared_async_function)).await;
        
        Ok(format!("Processed data with response: {}", response))
    }

   fn get_shared_state(&self) -> Arc<SharedState> {
      Arc::clone(&self.shared_state)
   }
   fn get_communication_line(&self) -> Arc<MultiBus> {
      Arc::clone(&self.communication_line)
   }
   fn get_name(&self) -> String {
      "myhandler".to_string()
   }
   fn new(communication_line: Arc<MultiBus>, shared_state: Arc<SharedState>) -> Self {
      MyHandler {communication_line, shared_state}
   }
}
```

### Using the Handler Macro

For simpler handlers, use the `handler!` macro:

```rust
use manager_handlers::handler;
use manager_handlers::manager::StateType;

// Simple handler
handler!(EchoHandler, echo;
    async fn run(&self, src: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        Ok(format!("Echo from {}: {}", src, data))
    }
);

// Handler with state access
handler!(CounterHandler, counter;
   async fn run(&self, _: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
       let state = self.get_shared_state();
       
       // Increment counter
       let counter = match state.get(&"counter".to_string()).await {
           Some(StateType::Int(val)) => val + 1,
           _ => 1,
       };
       
       state.insert(&"counter".to_string(), StateType::Int(counter)).await;
       Ok(format!("Counter: {}", counter))
   }
);
```

### Manager Configuration

```rust
use manager_handlers::manager::Manager;

#[tokio::main]
async fn main() {
    // Create a new Manager instance
    let mut manager = Manager::new_default();
    
    // Optional: Configure TLS
    manager.with_tls("path/to/cert.pem", "path/to/key.pem", Some("path/to/ca.pem"));
    
    // Optional: Set allowed client certificate names if using client cert auth
    manager.with_allowed_names(vec!["client1".to_string(), "client2".to_string()]);
    
    // Optional: Set API key for authentication
    manager.with_api_key("my-secret-api-key");
    
    // Optional: Configure maximum concurrent requests
    manager.with_max_requests(100);
    
    // Optional: Configure keep-alive timeout
    manager.with_keep_alive(30);
    
    // Register handlers with their replica counts
    manager.add_handler::<MyHandler>("my_handler", 5);
    manager.add_handler::<OtherHandler>("other_handler", 2);
    
    println!("Starting manager...");
    
    // Start the manager
    manager.start().await;
}
```

## Advanced Features

### Redis Integration

Enable distributed pub/sub with Redis:

```rust
// Configure Redis URL
manager.with_redis_url(Some("redis://localhost:6379".to_string()));

// In your handler, use Redis pub/sub
let response = self.publish_redis(
    "message".to_string(), 
    "remote_handler".to_string(), 
    Some(5000) // 5 second timeout
).await;

// Subscribe to Redis topics
let request = self.subscribe_topic_redis("my_topic".to_string()).await?;
```

### File Operations

Implement file handling capabilities:

1. **UploadHandler**: Implement `run_stream` from the `Base` trait on this handler to customize how files are uploaded and stored.
   ```rust
   #[async_trait]
   impl Base for UploadHandler {
       async fn run_stream(&self, src: String, mut stream: Pin<Box<dyn Stream<Item=Bytes> + Send>>, file_name: String, approx_size: usize) -> Result<String, Box<dyn Error + Send + Sync>> {
          todo!()
       }
   }
   ```

2. **DownloadHandler**: Implement `run_file` from the `Base` trait on this handler to customize how files are downloaded.
   ```rust
   #[async_trait]
   impl Base for DownloadHandler {
       async fn run_file(&self, src: String, filename: String) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64), Box<dyn Error + Send + Sync>> {
          todo!()    
       }
   }
   ```

3. **MetadataHandler**: Implement `run_metadata` from the `Base` trait on this handler to customize how file metadata is retrieved.
   ```rust
   #[async_trait]
   impl Base for MetadataHandler {
       async fn run_metadata(&self, src: String, filename: String) -> Result<String, Box<dyn Error + Send + Sync>> {
          todo!() 
       }
   }
   ```
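
The core idea behind streaming uploads and downloads, moving data in bounded chunks rather than buffering the whole file, can be illustrated with a synchronous, standard-library-only sketch. The crate's handlers work with async streams instead, so `copy_in_chunks` below is a hypothetical helper showing the technique, not part of the library.

```rust
use std::io::{self, Read, Write};

/// Copies `reader` into `writer` in chunks of at most `chunk_size` bytes and
/// returns the number of bytes copied. Peak memory use is one chunk.
fn copy_in_chunks<R: Read, W: Write>(
    mut reader: R,
    mut writer: W,
    chunk_size: usize,
) -> io::Result<u64> {
    // Guard against a zero-sized buffer, which would look like end-of-input.
    let mut buf = vec![0u8; chunk_size.max(1)];
    let mut total = 0u64;
    loop {
        let n = match reader.read(&mut buf) {
            Ok(0) => break, // end of input
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        writer.write_all(&buf[..n])?;
        total += n as u64;
    }
    writer.flush()?;
    Ok(total)
}
```

Any `Read`/`Write` pair works here, for example a `std::fs::File` as the source and a `Vec<u8>` or socket as the destination.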

## API Reference

### HTTP Endpoints

### Handler Endpoints

#### `POST /{handler_name}`

Send a request to a registered handler.

Example:
```bash
curl -X POST http://localhost:8080/my_handler \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -d '{"type":"request","src":"client","data":"hello world"}'
```

### File Operations

#### Upload a File: `POST /stream/upload/{file_name}`

Upload files to the server.

Example:
```bash
curl -X POST http://localhost:8080/stream/upload/example.txt \
  -H "Authorization: Bearer YOUR_API_KEY" \
  --data-binary "@/path/to/local/example.txt"
```

#### Download a File: `GET /stream/download/{file_id}`

Download a previously uploaded file.

Example:
```bash
curl -X GET http://localhost:8080/stream/download/abc123 \
  -H "Authorization: Bearer YOUR_API_KEY" \
  --output downloaded_file.txt
```

#### Retrieve File Metadata: `GET /stream/metadata/{file_id}`

Get metadata for a file.

Example:
```bash
curl -X GET http://localhost:8080/stream/metadata/abc123 \
  -H "Authorization: Bearer YOUR_API_KEY"
```

### System Management

#### Shutdown Server: `POST /shutdown`

Gracefully shut down the server.

Example:
```bash
curl -X POST http://localhost:8080/shutdown \
  -H "Authorization: Bearer YOUR_API_KEY"
```
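
A graceful shutdown usually means signalling workers to stop and then waiting for them to finish. The following is a minimal thread-based sketch of that pattern using only the standard library; `Workers` is a hypothetical type, and how the crate coordinates its own async tasks is not shown here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// A group of worker threads sharing one stop flag.
struct Workers {
    stop: Arc<AtomicBool>,
    handles: Vec<JoinHandle<u64>>,
}

impl Workers {
    /// Starts `count` workers that loop until the stop flag is set.
    fn start(count: usize) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let handles = (0..count)
            .map(|_| {
                let stop = Arc::clone(&stop);
                thread::spawn(move || {
                    let mut iterations = 0u64;
                    while !stop.load(Ordering::Acquire) {
                        iterations += 1; // placeholder for real work
                        thread::sleep(Duration::from_millis(1));
                    }
                    iterations
                })
            })
            .collect();
        Workers { stop, handles }
    }

    /// Signals all workers to stop, waits for each one, and returns how many
    /// exited without panicking.
    fn shutdown(self) -> usize {
        self.stop.store(true, Ordering::Release);
        self.handles
            .into_iter()
            .filter_map(|h| h.join().ok())
            .count()
    }
}
```

Joining every worker before returning is what makes the shutdown "graceful": in-flight work gets to finish instead of being cut off.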

## Security

### Authentication Methods

1. **API Key Authentication**
   - Set via `with_api_key()`
   - Pass as Bearer token in Authorization header

2. **TLS/SSL Support**
   - Configure with `with_tls()`
   - Optional client certificate verification

3. **Client Certificate Authentication**
   - Specify allowed certificate names with `with_allowed_names()`
   - Requires CA certificate configuration
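
To make the Bearer-token scheme concrete, here is a small standard-library sketch of checking an `Authorization` header against a configured key. The function names are hypothetical, and the constant-time comparison is a common precaution rather than a statement about how the crate validates keys.

```rust
/// Extracts the token from an `Authorization` header value of the form
/// `Bearer <token>`; returns `None` for other schemes or an empty token.
fn bearer_token(header_value: &str) -> Option<&str> {
    let (scheme, token) = header_value.trim().split_once(' ')?;
    let token = token.trim();
    if scheme.eq_ignore_ascii_case("Bearer") && !token.is_empty() {
        Some(token)
    } else {
        None
    }
}

/// Compares two byte strings without stopping at the first mismatch, so the
/// time taken does not reveal how many leading bytes matched.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}

/// Returns true only if the header carries a Bearer token equal to `expected_key`.
fn is_authorized(header_value: Option<&str>, expected_key: &str) -> bool {
    header_value
        .and_then(bearer_token)
        .map_or(false, |t| constant_time_eq(t.as_bytes(), expected_key.as_bytes()))
}
```

A request without the header, with a different scheme such as `Basic`, or with the wrong key is rejected by `is_authorized`.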

### Example Security Configuration

```rust
let mut manager = Manager::new_default();

// Enable API key authentication
manager.with_api_key("your-secret-api-key");

// Configure TLS with client certificates
manager.with_tls(
    "path/to/server-cert.pem",
    "path/to/server-key.pem", 
    Some("path/to/ca-cert.pem")
);

// Allow specific client certificates
manager.with_allowed_names(vec![
    "trusted-client-1".to_string(),
    "trusted-client-2".to_string()
]);
```

## Performance Tuning

### Concurrency Settings

```rust
// Limit total concurrent HTTP requests
manager.with_max_requests(1000);

// Configure handler replicas for load distribution
manager.add_handler::<MyHandler>("heavy_processor", 10);

// Set keep-alive for connection reuse
manager.with_keep_alive(60);
```
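
Semaphore-based request limiting boils down to handing out a fixed number of permits and making further callers wait or fail until one is returned. The sketch below shows that mechanism with a `Mutex` and `Condvar` from the standard library; `Semaphore` and `Permit` are illustrative types, not the crate's internals (an async runtime would typically use an async-aware semaphore instead).

```rust
use std::sync::{Arc, Condvar, Mutex};

/// A counting semaphore: at most `permits` holders at any time.
struct Semaphore {
    available: Mutex<usize>,
    freed: Condvar,
}

/// Represents one held permit; the permit is returned when this is dropped.
struct Permit {
    sem: Arc<Semaphore>,
}

impl Semaphore {
    fn new(permits: usize) -> Arc<Semaphore> {
        Arc::new(Semaphore { available: Mutex::new(permits), freed: Condvar::new() })
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(sem: &Arc<Semaphore>) -> Permit {
        let mut available = sem.available.lock().unwrap();
        while *available == 0 {
            available = sem.freed.wait(available).unwrap();
        }
        *available -= 1;
        Permit { sem: Arc::clone(sem) }
    }

    /// Takes a permit only if one is free right now.
    fn try_acquire(sem: &Arc<Semaphore>) -> Option<Permit> {
        let mut available = sem.available.lock().unwrap();
        if *available == 0 {
            return None;
        }
        *available -= 1;
        Some(Permit { sem: Arc::clone(sem) })
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        // Return the permit and wake one waiter, if any.
        *self.sem.available.lock().unwrap() += 1;
        self.sem.freed.notify_one();
    }
}
```

Tying the permit to a value with a `Drop` impl means a request releases its slot even if its handler returns early with an error.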

### Resource Limits

- Maximum payload size: 10 GiB
- Maximum JSON size: 1 GiB
- Default request timeout: 120 seconds
- WebSocket ping interval: 10 seconds

## Error Handling

Errors returned from a handler's `run` method are caught by the framework and returned to the client as an error response:

```rust
// Handler errors are automatically caught and returned
async fn run(&self, src: String, data: String) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    // Your error will be properly formatted and returned to client
    Err(Box::new(std::io::Error::new(
        std::io::ErrorKind::NotFound,
        "Resource not found"
    )))
}
```

### Common Error Responses

```json
// Handler not found
{
    "status": "error",
    "message": "Handler not found: invalid_handler"
}

// Authentication failure
{
    "status": "error", 
    "message": "Unauthorized"
}

// Internal error
{
    "status": "error",
    "message": "Internal server error: details..."
}
```
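
As an illustration of how a handler error could end up in a body shaped like the ones above, the following sketch renders any `std::error::Error` into the `status`/`message` JSON form. It assumes the error's display text goes into `message` after the `Internal server error: ` prefix; `escape_json` and `error_body` are hypothetical helpers written with the standard library only, and a real service would more likely use a JSON library.

```rust
/// Escapes a string so it can be embedded inside a JSON string literal.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Builds an error response body in the `status`/`message` shape shown above.
fn error_body(err: &dyn std::error::Error) -> String {
    format!(
        r#"{{"status":"error","message":"Internal server error: {}"}}"#,
        escape_json(&err.to_string())
    )
}
```

Escaping the message matters because error text often contains quotes or newlines that would otherwise produce invalid JSON.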

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Author

Matei Aruxandei - [stefmatei22@gmail.com](mailto:stefmatei22@gmail.com)