clicktype 0.2.0

# ClickType

ClickType is a ClickHouse client for Rust designed for bulk data ingestion and type-safe query construction. It emphasizes explicit memory control and high throughput by using ClickHouse's RowBinary format.

## Key Features

*   **Data Modeling**: Schema definition via the `#[derive(ClickTable)]` macro.
*   **Batch Ingestion**: Async buffering system with active memory management and backpressure control.
*   **Query Builder**: Fluent API for SQL generation, validating column names and types at compile time.
*   **Observability**: Native integration with the `tracing` ecosystem for monitoring latencies and errors.
*   **Complex Types**: Support for `Nullable`, `LowCardinality`, `Array`, and `Map`.

---

## Installation

Add the dependency to your `Cargo.toml` file:

```toml
[dependencies]
clicktype = "0.2"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
```

---

## Table Definition

Map your ClickHouse tables to Rust structs. The macro handles serialization and DDL generation.

```rust
use clicktype::prelude::*;

#[derive(ClickTable)]
#[click_table(name = "user_events", primary_key = "user_id")]
pub struct UserEvent {
    #[click_column(primary_key)]
    pub user_id: u64,

    pub timestamp: DateTime64<3>,

    pub event_type: LowCardinality<String>,

    pub properties: Map<String, String>,

    pub session_id: Nullable<String>,
    
    #[click_column(materialized = "now()")]
    pub server_time: DateTime64<3>,
}
```

---

## Data Ingestion (Batcher)

The `Batcher` manages the grouping of rows in memory before sending them to ClickHouse. It allows configuration of row limits, buffer sizes, and timeouts.

### Memory and Error Management
*   **Memory Release**: The internal buffer is automatically reduced after a flush if it exceeds the configured threshold (`buffer_shrink_threshold`).
*   **Supervision**: The `spawn()` method returns the worker's `JoinHandle` to detect unexpected stops or panics.
*   **Backpressure Strategies**: Supports `insert` (blocks until capacity is available) and `try_insert` (fails immediately if the buffer is full).

### Usage Example

```rust
use clicktype::batch::{Batcher, BatchConfig};
use clicktype::prelude::*; // `UserEvent`, `DateTime64`, and `Nullable` come from the section above
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BatchConfig {
        max_rows: 50_000,
        max_wait: Duration::from_secs(5),
        buffer_shrink_threshold: 2 * 1024 * 1024, // 2 MB
        ..BatchConfig::default()
    };

    let client = clicktype::Client::new("http://localhost:8123");
    let batcher = Batcher::<UserEvent>::new(client, config);
    
    // The handle is used for data insertion; `worker_task` is the worker's `JoinHandle`, used for supervision.
    let (handle, worker_task) = batcher.spawn();

    let event = UserEvent {
        user_id: 1,
        timestamp: DateTime64::now(),
        event_type: "click".into(),
        properties: [("key".into(), "val".into())].into(),
        session_id: Nullable::null(),
        server_time: DateTime64::default(),
    };

    // Insertion with backpressure
    handle.insert(event).await?;

    handle.close().await?;
    worker_task.await?;

    Ok(())
}
```

---

## Querying

The `QueryBuilder` generates structured SQL using the column constants provided by the macro.

```rust
use clicktype::query::QueryBuilder;

async fn get_stats(client: &clicktype::Client) -> Result<(), Box<dyn std::error::Error>> {
    let sql = QueryBuilder::<UserEvent>::new()
        .select(UserEvent::USER_ID)
        .select_raw("count() as total")
        .filter(UserEvent::TIMESTAMP, ">=", "today()")
        .group_by(UserEvent::USER_ID)
        .limit(10)
        .to_sql();
    
    // `ResultRow` is a user-defined struct that deserializes the result rows.
    let _results = client.query::<ResultRow>(&sql).await?;
    Ok(())
}
```

---

## Observability

ClickType uses `tracing` to emit events. Logs include structured data regarding:
*   Number of rows and bytes sent per batch.
*   Latency of HTTP requests to ClickHouse.
*   Retries performed during network failures.

To enable log output:
```rust
tracing_subscriber::fmt::init();
```

---

## Implementation Details

1.  **RowBinary Protocol**: RowBinary is used for all insertions because it is compact and cheap to encode, keeping both CPU and bandwidth costs low.
2.  **Incremental Serialization**: Data is serialized to the buffer at the time of insertion (`insert`), distributing CPU cost and preventing latency spikes during the flush operation.
3.  **Buffer Protection**: If a batch persistently fails after all configured retries, the buffer is cleared to prevent a total deadlock of the ingestion process.
4.  **Schema Validation**: Automatic schema validation on first insert prevents silent data corruption from schema mismatches.

---

## Production Considerations

### Automatic Schema Protection

**ClickType validates your schema automatically to prevent data corruption.**

RowBinary is ClickHouse's fastest format, but it's position-based (no column names in the wire format). ClickType protects you with comprehensive validation:

**What ClickType Validates Automatically:**
✅ **Column order** - Position mismatch detected immediately
✅ **Column types** - Type mismatches caught before any data is sent
✅ **Column count** - Missing or extra insertable columns detected
✅ **Column names** - Ensures struct fields match table columns

**Validation happens on first insert:**
```rust
let batcher = Batcher::<UserEvent>::new(client, config);
let (handle, worker) = batcher.spawn();

// Schema validation runs here - fails fast if anything is wrong:
handle.insert(event).await?;
// ✓ Order validated
// ✓ Types validated
// ✓ Count validated
```

**Example validation error (column order mismatch):**
```
Schema validation failed for table 'events':
Column order mismatch at position 0: struct has 'id', table has 'name'
Column order mismatch at position 1: struct has 'name', table has 'value'
```

**Inherent RowBinary Limitations (Can't be validated):**
- ⚠️ MATERIALIZED/ALIAS expressions (not insertable, skipped)
- ⚠️ DEFAULT expressions (server-side, not visible in schema)
- ⚠️ Codec settings (compression, internal)

### Best Practices

#### 1. Schema Changes - Use Migrations

When you need to change your schema:

```rust
// Step 1: Create new table version
#[derive(ClickTable)]
#[click_table(name = "events_v2")]
pub struct EventV2 {
    pub id: u64,
    pub new_field: String,  // ← Added field
    pub timestamp: u64,
}

// Step 2: Deploy code that writes to BOTH tables
batcher_v1.insert(old_event).await?;
batcher_v2.insert(new_event).await?;

// Step 3: Backfill data
// INSERT INTO events_v2 SELECT id, '', timestamp FROM events

// Step 4: Switch reads to v2

// Step 5: Stop writes to v1, drop old table
```

**Alternative:** Use ClickHouse ALTER TABLE for compatible changes:
```sql
-- Adding a column with DEFAULT (safe)
ALTER TABLE events ADD COLUMN new_field String DEFAULT ''
```

#### 2. Monitor Buffer Memory

```rust
let config = BatchConfig {
    max_buffer_size: 64 * 1024 * 1024,        // 64 MB hard limit
    buffer_shrink_threshold: 2 * 1024 * 1024, // Shrink if > 2 MB after flush
    ..Default::default()
};
```

**Why This Matters:**
- Traffic spike → 64 MB buffer allocated
- Without shrink threshold → buffer keeps its 64 MB capacity indefinitely (wasted memory)
- With shrink threshold → buffer returns to 1 MB after spike

#### 3. Choose Backpressure Strategy

```rust
// Data integrity priority (wait for capacity)
handle.insert(event).await?;  // Blocks if channel full

// High availability priority (drop if full)
if handle.try_insert(event).is_err() {
    metrics.record_dropped_event();
}
```

#### 4. Supervise Worker Task

```rust
use tracing::error;

let (handle, worker) = batcher.spawn();

tokio::select! {
    result = worker => {
        // Worker crashed - handle it!
        error!("Batcher worker died: {:?}", result);
        // Restart batcher, alert ops, etc.
    }
    _ = tokio::signal::ctrl_c() => {
        handle.close().await?;
    }
}
```

### Troubleshooting

#### "Schema validation failed: type mismatch"

**Cause:** Struct field type doesn't match ClickHouse column type.

**Fix:**
```sql
-- Check actual ClickHouse schema
DESCRIBE TABLE your_table;
```

Compare with your Rust struct. Common mismatches:
- `i32` vs `i64` (Int32 vs Int64)
- `String` vs `LowCardinality<String>`
- `Option<T>` vs `T` (Nullable vs non-Nullable)

#### "Insert failed after 3 retries"

**Causes:**
1. Network issues
2. ClickHouse server overload
3. Quota/permissions
4. Invalid data (e.g., values out of range for the column type)

**Debugging:**
```rust
// Enable detailed logs
tracing_subscriber::fmt()
    .with_max_level(tracing::Level::DEBUG)
    .init();

// Logs will show:
// - Exact HTTP error from ClickHouse
// - Retry attempts
// - Batch size and payload
```

#### Memory keeps growing

**Check:**
1. Is `buffer_shrink_threshold` configured?
2. Are flushes completing successfully?
3. Is the channel capacity too large?

**Fix:**
```rust
let config = BatchConfig {
    buffer_shrink_threshold: 2 * 1024 * 1024,  // Add this!
    max_buffer_size: 64 * 1024 * 1024,
    channel_capacity: 10_000,  // Don't set too high
    ..Default::default()
};
```

#### Worker stops processing

**Likely cause:** Worker panicked due to:
1. Schema validation failure (fixed in v0.1+)
2. Network error during flush
3. Out of memory

**Detection:**
```rust
use tracing::error;

let (handle, worker) = batcher.spawn();

// Monitor worker
tokio::spawn(async move {
    if let Err(e) = worker.await {
        error!("Worker panicked: {:?}", e);
        // Alert, restart, failover, etc.
    }
});
```

---

## Testing

### Property-Based Tests

ClickType includes extensive fuzzing tests using `proptest`:
```bash
cargo test -p clicktype-core --test fuzzing_tests
```

Validates:
- Roundtrip serialization for all types
- Edge cases (NaN, infinity, empty strings, null bytes, max values)
- Large data (1MB strings, 100k element arrays)

### Load Tests

Production-scale stress tests (run manually):
```bash
# 1M row insertion test
cargo test --release --test load_tests load_test_1m_rows -- --ignored --nocapture

# Burst scenario (memory management)
cargo test --release --test load_tests load_test_burst_scenario -- --ignored --nocapture

# Concurrent inserts
cargo test --release --test load_tests load_test_concurrent_inserts -- --ignored --nocapture

# Backpressure testing
cargo test --release --test load_tests load_test_try_insert_backpressure -- --ignored --nocapture
```

---