# ClickType
ClickType is a ClickHouse client for Rust designed for bulk data ingestion and type-safe query construction. It focuses on explicit memory control and high performance, using ClickHouse's RowBinary format.
## Key Features
* **Data Modeling**: Schema definition via the `#[derive(ClickTable)]` macro.
* **Batch Ingestion**: Async buffering system with active memory management and backpressure control.
* **Query Builder**: Fluent API for SQL generation, validating column names and types at compile time.
* **Observability**: Native integration with the `tracing` ecosystem for monitoring latencies and errors.
* **Complex Types**: Support for `Nullable`, `LowCardinality`, `Array`, and `Map`.
---
## Installation
Add the dependency to your `Cargo.toml` file:
```toml
[dependencies]
clicktype = "0.1"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
```
---
## Table Definition
Map your ClickHouse tables to Rust structs. The macro handles serialization and DDL generation.
```rust
use clicktype::prelude::*;

#[derive(ClickTable)]
#[click_table(name = "user_events", primary_key = "user_id")]
pub struct UserEvent {
    #[click_column(primary_key)]
    pub user_id: u64,
    pub timestamp: DateTime64<3>,
    pub event_type: LowCardinality<String>,
    pub properties: Map<String, String>,
    pub session_id: Nullable<String>,
    #[click_column(materialized = "now()")]
    pub server_time: DateTime64<3>,
}
```
---
## Data Ingestion (Batcher)
The `Batcher` manages the grouping of rows in memory before sending them to ClickHouse. It allows configuration of row limits, buffer sizes, and timeouts.
### Memory and Error Management
* **Memory Release**: The internal buffer is automatically reduced after a flush if it exceeds the configured threshold (`buffer_shrink_threshold`).
* **Supervision**: The `spawn()` method returns the worker's `JoinHandle` to detect unexpected stops or panics.
* **Backpressure Strategies**: Supports `insert` (blocks until capacity is available) and `try_insert` (fails immediately if the buffer is full).
### Usage Example
```rust
use clicktype::batch::{Batcher, BatchConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BatchConfig {
        max_rows: 50_000,
        max_wait: Duration::from_secs(5),
        buffer_shrink_threshold: 2 * 1024 * 1024, // 2 MB
        ..BatchConfig::default()
    };
    let client = clicktype::Client::new("http://localhost:8123");
    let batcher = Batcher::<UserEvent>::new(client, config);

    // `handle` is used for insertion; awaiting `worker_task` supervises the worker.
    let (handle, worker_task) = batcher.spawn();

    let event = UserEvent {
        user_id: 1,
        timestamp: DateTime64::now(),
        event_type: "click".into(),
        properties: [("key".into(), "val".into())].into(),
        session_id: Nullable::null(),
        server_time: DateTime64::default(),
    };

    // Insertion with backpressure
    handle.insert(event).await?;

    handle.close().await?;
    worker_task.await?;
    Ok(())
}
}
```
---
## Querying
The `QueryBuilder` generates structured SQL using the column constants provided by the macro.
```rust
use clicktype::query::QueryBuilder;

async fn get_stats(client: &clicktype::Client) -> Result<(), Box<dyn std::error::Error>> {
    let sql = QueryBuilder::<UserEvent>::new()
        .select(UserEvent::USER_ID)
        .select_raw("count() as total")
        .filter(UserEvent::TIMESTAMP, ">=", "today()")
        .group_by(UserEvent::USER_ID)
        .limit(10)
        .to_sql();

    // ResultRow is a user-defined struct matching the selected columns.
    let _results = client.query::<ResultRow>(&sql).await?;
    Ok(())
}
```
---
## Observability
ClickType uses `tracing` to emit events. Logs include structured data regarding:
* Number of rows and bytes sent per batch.
* Latency of HTTP requests to ClickHouse.
* Retries performed during network failures.
To enable log output:
```rust
tracing_subscriber::fmt::init();
```
---
## Implementation Details
1. **RowBinary Protocol**: RowBinary is used for all insertions as it is the most efficient format in terms of CPU and bandwidth.
2. **Incremental Serialization**: Data is serialized to the buffer at the time of insertion (`insert`), distributing CPU cost and preventing latency spikes during the flush operation.
3. **Buffer Protection**: If a batch persistently fails after all configured retries, the buffer is cleared to prevent a total deadlock of the ingestion process.
4. **Schema Validation**: Automatic schema validation on first insert prevents silent data corruption from schema mismatches.
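As a sketch of what point 1 means on the wire: RowBinary writes fixed-width integers little-endian, and strings as a ULEB128 varint length followed by the raw bytes. The helpers below are illustrative only, not clicktype's actual encoder:

```rust
// Illustrative RowBinary encoding for two column types.
// `write_u64` and `write_string` are hypothetical helpers, not clicktype's API.

/// Fixed-width integers are written little-endian.
fn write_u64(buf: &mut Vec<u8>, v: u64) {
    buf.extend_from_slice(&v.to_le_bytes());
}

/// Strings are a ULEB128 varint length followed by the raw UTF-8 bytes.
fn write_string(buf: &mut Vec<u8>, s: &str) {
    let mut len = s.len() as u64;
    loop {
        let byte = (len & 0x7f) as u8;
        len >>= 7;
        if len == 0 {
            buf.push(byte);
            break;
        }
        buf.push(byte | 0x80); // continuation bit: more length bytes follow
    }
    buf.extend_from_slice(s.as_bytes());
}

fn main() {
    let mut row = Vec::new();
    write_u64(&mut row, 1);          // user_id
    write_string(&mut row, "click"); // event_type
    // 8 bytes for the u64, 1 length byte + 5 bytes for "click"
    assert_eq!(row.len(), 14);
}
```

Because there are no column names or per-row delimiters in this format, the byte stream is only meaningful if both sides agree on column order and types, which is why the validation described below matters.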
---
## Production Considerations
### Automatic Schema Protection
**ClickType validates your schema automatically to prevent data corruption.**
RowBinary is ClickHouse's fastest format, but it's position-based (no column names in the wire format). ClickType protects you with comprehensive validation:
**What ClickType Validates Automatically:**
✅ **Column order** - Position mismatch detected immediately
✅ **Column types** - Type mismatches caught before any data is sent
✅ **Column count** - Missing or extra insertable columns detected
✅ **Column names** - Ensures struct fields match table columns
**Validation happens on first insert:**
```rust
let batcher = Batcher::<UserEvent>::new(client, config);
let (handle, worker) = batcher.spawn();

// Schema validation runs here - fails fast if anything is wrong:
handle.insert(event).await?;
// ✓ Order validated
// ✓ Types validated
// ✓ Count validated
```
**Example validation error (column order mismatch):**
```
Schema validation failed for table 'events':
Column order mismatch at position 0: struct has 'id', table has 'name'
Column order mismatch at position 1: struct has 'name', table has 'value'
```
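The positional check behind these messages can be sketched in a few lines: walk the struct's `(name, type)` pairs against the `DESCRIBE TABLE` output in order. `validate` here is a hypothetical standalone helper, not clicktype's internal implementation:

```rust
// Sketch of positional schema comparison. `validate` is a hypothetical helper;
// the real library derives the struct side from the ClickTable macro.

fn validate(
    table: &str,
    struct_cols: &[(&str, &str)], // (name, type) pairs from the struct
    db_cols: &[(&str, &str)],     // (name, type) pairs from DESCRIBE TABLE
) -> Result<(), String> {
    if struct_cols.len() != db_cols.len() {
        return Err(format!(
            "Schema validation failed for table '{}': struct has {} columns, table has {}",
            table, struct_cols.len(), db_cols.len()
        ));
    }
    let mut errors = Vec::new();
    for (i, (&(s_name, s_ty), &(d_name, d_ty))) in
        struct_cols.iter().zip(db_cols.iter()).enumerate()
    {
        if s_name != d_name {
            errors.push(format!(
                "Column order mismatch at position {}: struct has '{}', table has '{}'",
                i, s_name, d_name
            ));
        } else if s_ty != d_ty {
            errors.push(format!(
                "Type mismatch for column '{}': struct has {}, table has {}",
                s_name, s_ty, d_ty
            ));
        }
    }
    if errors.is_empty() {
        Ok(())
    } else {
        Err(format!(
            "Schema validation failed for table '{}':\n  {}",
            table,
            errors.join("\n  ")
        ))
    }
}

fn main() {
    let struct_cols = [("id", "UInt64"), ("name", "String")];
    let table_cols = [("name", "String"), ("id", "UInt64")]; // wrong order
    let err = validate("events", &struct_cols, &table_cols).unwrap_err();
    println!("{err}");
}
```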
**Inherent RowBinary Limitations (Can't be validated):**
- ⚠️ MATERIALIZED/ALIAS expressions (not insertable, skipped)
- ⚠️ DEFAULT expressions (server-side, not visible in schema)
- ⚠️ Codec settings (compression, internal)
### Best Practices
#### 1. Schema Changes - Use Migrations
When you need to change your schema:
```rust
// Step 1: Create the new table version
#[derive(ClickTable)]
#[click_table(name = "events_v2")]
pub struct EventV2 {
    pub id: u64,
    pub new_field: String, // ← Added field
    pub timestamp: u64,
}

// Step 2: Deploy code that writes to BOTH tables
batcher_v1.insert(old_event).await?;
batcher_v2.insert(new_event).await?;

// Step 3: Backfill data
// INSERT INTO events_v2 SELECT id, '', timestamp FROM events

// Step 4: Switch reads to v2
// Step 5: Stop writes to v1, drop the old table
```
**Alternative:** Use ClickHouse ALTER TABLE for compatible changes:
```sql
-- Adding a column with DEFAULT (safe)
ALTER TABLE events ADD COLUMN new_field String DEFAULT ''
```
#### 2. Monitor Buffer Memory
```rust
let config = BatchConfig {
    max_buffer_size: 64 * 1024 * 1024,        // 64 MB hard limit
    buffer_shrink_threshold: 2 * 1024 * 1024, // Shrink if > 2 MB after flush
    ..Default::default()
};
```
**Why This Matters:**
- Traffic spike → the buffer grows to its 64 MB limit
- Without a shrink threshold → the buffer keeps that 64 MB capacity forever (a de facto memory leak)
- With a shrink threshold → capacity drops back to the 2 MB threshold once the spike passes
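The shrink step itself is simple. A standalone sketch of the idea (not clicktype's internal code), using `Vec::shrink_to` from the standard library:

```rust
// Sketch: after a flush drains the buffer, release excess capacity
// beyond the configured threshold. Illustrative, not clicktype internals.

fn shrink_after_flush(buf: &mut Vec<u8>, threshold: usize) {
    buf.clear(); // rows were flushed; length goes to zero
    if buf.capacity() > threshold {
        // Vec::shrink_to lowers capacity toward `threshold`,
        // returning the excess allocation to the allocator.
        buf.shrink_to(threshold);
    }
}

fn main() {
    // Simulate a traffic spike that grew the buffer to 64 MB:
    let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024 * 1024);
    shrink_after_flush(&mut buf, 2 * 1024 * 1024);
    assert!(buf.capacity() <= 2 * 1024 * 1024); // back at the threshold
}
```

The threshold trades allocator churn for steady-state memory: too low and every spike reallocates, too high and memory stays pinned after bursts.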
#### 3. Choose Backpressure Strategy
```rust
// Data integrity priority (wait for capacity)
handle.insert(event).await?; // Blocks if the channel is full

// High availability priority (drop if full)
if handle.try_insert(event).is_err() {
    metrics.record_dropped_event();
}
```
#### 4. Supervise Worker Task
```rust
let (handle, worker) = batcher.spawn();

tokio::select! {
    result = worker => {
        // Worker crashed - handle it!
        error!("Batcher worker died: {:?}", result);
        // Restart the batcher, alert ops, etc.
    }
    _ = tokio::signal::ctrl_c() => {
        handle.close().await?;
    }
}
```
### Troubleshooting
#### "Schema validation failed: type mismatch"
**Cause:** Struct field type doesn't match ClickHouse column type.
**Fix:**
```sql
-- Check actual ClickHouse schema
DESCRIBE TABLE your_table;
```
Compare with your Rust struct. Common mismatches:
- `i32` vs `i64` (Int32 vs Int64)
- `String` vs `LowCardinality<String>`
- `Option<T>` vs `T` (Nullable vs non-Nullable)
#### "Insert failed after 3 retries"
**Causes:**
1. Network issues
2. ClickHouse server overload
3. Quota/permissions
4. Invalid data (e.g., duplicate primary key)
**Debugging:**
```rust
// Enable detailed logs
tracing_subscriber::fmt()
    .with_max_level(tracing::Level::DEBUG)
    .init();

// The logs will show:
// - the exact HTTP error returned by ClickHouse
// - retry attempts
// - batch size and payload
```
#### Memory keeps growing
**Check:**
1. Is `buffer_shrink_threshold` configured?
2. Are flushes completing successfully?
3. Is the channel capacity too large?
**Fix:**
```rust
let config = BatchConfig {
    buffer_shrink_threshold: 2 * 1024 * 1024, // Add this!
    max_buffer_size: 64 * 1024 * 1024,
    channel_capacity: 10_000, // Don't set too high
    ..Default::default()
};
```
#### Worker stops processing
**Likely cause:** Worker panicked due to:
1. Schema validation failure (fixed in v0.1+)
2. Network error during flush
3. Out of memory
**Detection:**
```rust
let (handle, worker) = batcher.spawn();

// Monitor the worker from a supervising task
tokio::spawn(async move {
    if let Err(e) = worker.await {
        error!("Worker panicked: {:?}", e);
        // Alert, restart, failover, etc.
    }
});
```
---
## Testing
### Property-Based Tests
ClickType includes extensive fuzzing tests using `proptest`:
```bash
cargo test -p clicktype-core --test fuzzing_tests
```
Validates:
- Roundtrip serialization for all types
- Edge cases (NaN, infinity, empty strings, null bytes, max values)
- Large data (1MB strings, 100k element arrays)
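The roundtrip property can be illustrated without proptest using a handful of fixed `Float64` edge cases. The encode/decode helpers are illustrative stand-ins for the serializer; note that NaN must be compared by bit pattern, since `NaN == NaN` is false:

```rust
// Hand-rolled sketch of the roundtrip property for Float64 values.
// The real suite generates cases with proptest; these are fixed edge cases.

fn encode_f64(v: f64) -> [u8; 8] {
    v.to_le_bytes() // RowBinary stores Float64 little-endian
}

fn decode_f64(bytes: [u8; 8]) -> f64 {
    f64::from_le_bytes(bytes)
}

fn main() {
    for v in [0.0, -0.0, f64::NAN, f64::INFINITY, f64::NEG_INFINITY, f64::MIN, f64::MAX] {
        let back = decode_f64(encode_f64(v));
        // Compare bit patterns: plain `==` would reject NaN and conflate 0.0 with -0.0.
        assert_eq!(back.to_bits(), v.to_bits());
    }
}
```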
### Load Tests
Production-scale stress tests (run manually):
```bash
# 1M row insertion test
cargo test --release --test load_tests load_test_1m_rows -- --ignored --nocapture
# Burst scenario (memory management)
cargo test --release --test load_tests load_test_burst_scenario -- --ignored --nocapture
# Concurrent inserts
cargo test --release --test load_tests load_test_concurrent_inserts -- --ignored --nocapture
# Backpressure testing
cargo test --release --test load_tests load_test_try_insert_backpressure -- --ignored --nocapture
```
---