v_queue 0.3.0

simple file based queue
Documentation
# V-Queue Architecture

Detailed architecture documentation for V-Queue system.

## System Architecture

### Layer Overview

V-Queue consists of three main layers:

1. **HTTP Server Layer** (v-queue-server)
2. **Core Library Layer** (v-queue)
3. **Storage Layer** (file system)

## Core Library Architecture

### Queue Structure

Each queue consists of:

- **Queue Metadata File** (`{queue_name}_info_queue`)
  - Current partition ID
  - CRC32 checksum
  - Format: `{name};{partition_id};{crc}\n`

- **Partitions** (directories: `{queue_name}-{partition_id}`)
  - Contains actual message data
  - One partition active for writing at a time
  - Old partitions kept for consumers

- **Partition Files**:
  - `{queue_name}_queue` - Message data file
  - `{queue_name}_info_push` - Write position metadata
    - Format: `{queue_name};{right_edge};{count_pushed};{crc}\n`
    - `right_edge`: Current write position (byte offset)
    - `count_pushed`: Number of messages written to partition
    - `crc`: CRC32 checksum

### Message Format

Each message stored with the following header:

```
┌─────────────┬──────────────┬──────────────┬─────────────┬──────────┬─────────┐
│  Start Pos  │ Msg Length   │ Magic Marker │ Count Push  │ Msg Type │  CRC32  │
│  (8 bytes)  │  (4 bytes)   │  (4 bytes)   │  (4 bytes)  │ (1 byte) │(4 bytes)│
└─────────────┴──────────────┴──────────────┴─────────────┴──────────┴─────────┘
                                HEADER (25 bytes)
┌──────────────────────────────────────────────────────────────────────────────┐
│                          Message Body (variable length)                       │
└──────────────────────────────────────────────────────────────────────────────┘
```

**Header Fields**:
- `start_pos` (u64): Absolute position in partition
- `msg_length` (u32): Length of message body
- `magic_marker` (u32): Fixed value for validation (0xEEEFFEEE)
- `count_pushed` (u32): Sequential message counter
- `msg_type` (u8): Message type (b'S' = String, b'O' = Object)
- `crc` (u32): CRC32 checksum of header + body

### Consumer Tracking

Consumers maintain their own offset tracking:

- **Consumer Metadata File** (`{queue_name}_info_pop_{consumer_name}`)
  - Queue name
  - Consumer name
  - Byte offset in partition
  - Number of messages read
  - Current partition ID

- **Format**: `{queue_name};{consumer_name};{offset};{count_popped};{partition_id}\n`

Note: Consumer metadata does not include CRC32 checksum (unlike queue and push metadata files).

### File Locking

Write operations use exclusive file locks:

```rust
// Lock file: {queue_name}_queue.lock
file.lock_exclusive()?;  // Prevents concurrent writers
```

Only one writer per queue allowed at a time.

## HTTP Server Architecture

### Technology Stack

- **Web Framework**: Axum (async HTTP framework)
- **Runtime**: Tokio (async runtime)
- **Serialization**: Serde + JSON
- **Logging**: Tracing + tracing-subscriber
- **Authentication**: Custom middleware with Basic Auth

### Request Flow

```
Client Request
┌─────────────────┐
│  CORS Layer     │  Allow cross-origin requests
└────────┬────────┘
┌─────────────────┐
│  Trace Layer    │  Logging and metrics
└────────┬────────┘
┌─────────────────┐
│  Auth Middleware│  Validate credentials (if enabled)
└────────┬────────┘
┌─────────────────┐
│  Route Handler  │  Process request
└────────┬────────┘
┌─────────────────┐
│ Queue Manager   │  Access core library
└────────┬────────┘
┌─────────────────┐
│  Core Library   │  Read/write to disk
└────────┬────────┘
   Response to Client
```

### State Management

```rust
pub struct AppState {
    queue_manager: Arc<Mutex<QueueManager>>,
}
```

- **Arc**: Shared ownership across async tasks
- **Mutex**: Thread-safe access to queue manager
- Cloned for each request handler

### Queue Manager

Manages all queues and consumers:

```rust
pub struct QueueManager {
    base_path: String,
    consumers: HashMap<String, Consumer>,
    auth_config: AuthConfig,
}
```

**Responsibilities**:
- Create/open queues on demand
- Manage consumer instances
- Track consumer offsets
- Coordinate access to core library

## Data Flow

### Producer Flow (Direct Library Access)

**Note**: The HTTP server currently does not provide producer/push endpoints. Messages must be produced using the v_queue library directly from Rust code or via JNI bindings.

```
1. Application calls queue.push() with message data
2. Core library writes to partition file
3. Update metadata files
4. fsync() for durability
5. Return success to application
```

### Consumer Flow (HTTP GET)

```
1. Client requests messages → HTTP endpoint
2. Auth middleware validates credentials
3. Handler extracts parameters (timeout, max_messages)
4. Queue Manager opens consumer
5. Core library reads from offset
6. Validate CRC32 checksums
7. Deserialize messages
8. Return messages as JSON array
9. Consumer offset NOT updated yet
```

### Commit Flow (HTTP POST)

```
1. Client commits → HTTP endpoint
2. Auth middleware validates credentials
3. Queue Manager updates consumer offset
4. Write consumer metadata to disk
5. Return success to client
```

## Concurrency Model

### Write Concurrency

- **One writer per queue**: File lock ensures exclusive write access
- **Multiple queues**: Different queues can be written concurrently
- **Async I/O**: Tokio runtime handles async operations

### Read Concurrency

- **Multiple readers**: Multiple consumers can read simultaneously
- **Independent offsets**: Each consumer tracks its own position
- **Lock-free reads**: No locks needed for reading
- **Partition isolation**: Readers don't interfere with writers

### Thread Safety

```rust
// Queue Manager wrapped in Arc<Mutex>
let manager = Arc::new(Mutex::new(QueueManager::new()?));

// Each handler acquires lock
async fn handler(State(state): State<AppState>) {
    let mut manager = state.queue_manager.lock().unwrap();
    // ... work with manager
}
```

## Authentication Architecture

### Middleware Implementation

```rust
async fn auth_middleware(
    State(state): State<AppState>,
    request: Request,
    next: Next,
) -> Result<Response, AuthError> {
    // Extract Authorization header
    // Decode Basic Auth credentials
    // Validate against configured users
    // Allow or reject request
}
```

### Protected Routes

Authentication applied via route layering:

```rust
Router::new()
    .route("/api/v1/queues", get(list_queues))
    .route_layer(middleware::from_fn_with_state(state, auth_middleware))
```

## Error Handling

### Core Library Errors

```rust
pub enum ErrorQueue {
    NotReady,
    FailOpen,
    FailWrite,
    FailRead,
    AlreadyOpen,
    Other,
}
```

### HTTP Error Responses

- `200 OK`: Success
- `400 Bad Request`: Invalid parameters
- `401 Unauthorized`: Authentication failed
- `404 Not Found`: Queue/consumer doesn't exist
- `500 Internal Server Error`: System error

## Configuration System

### Priority Order

1. **Command-line arguments** (highest)
2. **Configuration file** (TOML)
3. **Default values** (lowest)

### Configuration Loading

```rust
// Load from file
let config = ServerConfig::from_file("config.toml")?;

// Merge with CLI args
config.merge_with_args(bind, data_dir, log_level, no_auth);
```

## Performance Characteristics

### Write Performance

- **Sequential I/O**: Messages appended to end of file
- **Batch Writes**: Single fsync per message
- **CRC32**: Fast hardware-accelerated checksums
- **No Index**: Simple append-only writes

### Read Performance

- **Sequential Reads**: Follow message chain
- **Consumer Caching**: Maintain file handles
- **Zero-Copy**: Direct file-to-memory mapping where possible
- **Prefetching**: Read multiple messages in batch

### Memory Usage

- **Low Footprint**: Minimal in-memory state
- **File Handle Pool**: Reuse open file handles
- **Streaming**: Process messages without loading all to memory

## Limitations and Trade-offs

### Design Trade-offs

- **No Indexing**: Simpler code, slower random access
- **File-Based**: Easy backup, no clustering
- **Sequential Processing**: Guaranteed order, no parallel processing per consumer
- **Synchronous Writes**: Durability over throughput

### Scalability Limits

- **Single Node**: No distributed architecture
- **Local Disk**: Network storage not recommended
- **File Descriptors**: Limited by OS file handle limits
- **Partition Size**: Very large partitions may impact startup time

## Monitoring and Observability

### Logging

Uses `tracing` framework with configurable levels:
- DEBUG: Detailed operation logs
- INFO: Normal operations
- WARN: Potential issues
- ERROR: Critical errors

### Health Check

Endpoint: `GET /health`

Returns server status and basic metrics.

## Next Steps

- [Installation Guide]03-installation.md
- [Configuration Details]04-configuration.md
- [Performance Tuning]08-performance.md