# GreptimeDB Rust Ingester

A high-performance Rust client for ingesting data into GreptimeDB, supporting both low-latency individual inserts and high-throughput bulk streaming operations.
## Features

- Two Ingestion Approaches: Choose between low-latency inserts and high-throughput bulk streaming
- Parallel Processing: Async request submission with configurable parallelism
- Type Safety: Comprehensive support for all GreptimeDB data types
- Performance Optimized: Memory-efficient operations with zero-copy access patterns
- Production Ready: Robust error handling, timeouts, and connection management
## Architecture Overview

The ingester provides two main APIs tailored for different use cases:

### 1. Low-Latency Insert API

Best for: real-time applications, IoT sensors, interactive systems

```rust
use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::client::Client;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::{database::Database, Result, ColumnDataType};

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::with_urls(&["localhost:4001"]);
    let database = Database::new_with_dbname("public", client);

    let schema = vec![
        tag("device_id", ColumnDataType::String),
        timestamp("ts", ColumnDataType::TimestampMillisecond),
        field("temperature", ColumnDataType::Float64),
    ];

    let rows = vec![Row {
        values: vec![
            string_value("device_001".to_string()),
            timestamp_millisecond_value(1234567890000),
            f64_value(23.5),
        ],
    }];

    let insert_request = RowInsertRequests {
        inserts: vec![RowInsertRequest {
            table_name: "sensor_data".to_string(),
            rows: Some(Rows { schema, rows }),
        }],
    };

    let affected_rows = database.insert(insert_request).await?;
    println!("Inserted {} rows", affected_rows);
    Ok(())
}
```
### 2. High-Throughput Bulk API

Best for: ETL operations, data migration, batch processing, log ingestion

```rust
use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::client::Client;
use greptimedb_ingester::database::Database;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::{
    BulkInserter, BulkWriteOptions, ColumnDataType, CompressionType, Row, TableSchema, Value,
};
use std::time::Duration;

#[tokio::main]
async fn main() -> greptimedb_ingester::Result<()> {
    let client = Client::with_urls(&["localhost:4001"]);
    let current_timestamp = || 1234567890000i64;

    struct SensorData {
        timestamp: i64,
        device_id: String,
        temperature: f64,
    }

    let sensor_data: Vec<SensorData> = vec![
        SensorData { timestamp: 1234567890000, device_id: "device_001".to_string(), temperature: 23.5 },
        SensorData { timestamp: 1234567890001, device_id: "device_002".to_string(), temperature: 24.0 },
    ];

    // The bulk API does not create tables, so insert one row through the
    // insert API first to ensure the table exists with the expected schema.
    let database = Database::new_with_dbname("public", client.clone());
    let init_schema = vec![
        timestamp("ts", ColumnDataType::TimestampMillisecond),
        field("device_id", ColumnDataType::String),
        field("temperature", ColumnDataType::Float64),
    ];
    let init_request = RowInsertRequests {
        inserts: vec![RowInsertRequest {
            table_name: "sensor_readings".to_string(),
            rows: Some(Rows {
                schema: init_schema,
                rows: vec![greptimedb_ingester::api::v1::Row {
                    values: vec![
                        timestamp_millisecond_value(current_timestamp()),
                        string_value("init_device".to_string()),
                        f64_value(0.0),
                    ],
                }],
            }),
        }],
    };
    database.insert(init_request).await?;

    let bulk_inserter = BulkInserter::new(client, "public");

    // The table template must exactly match the existing table schema.
    let table_template = TableSchema::builder()
        .name("sensor_readings")
        .build()
        .unwrap()
        .add_timestamp("ts", ColumnDataType::TimestampMillisecond)
        .add_field("device_id", ColumnDataType::String)
        .add_field("temperature", ColumnDataType::Float64);

    let mut bulk_writer = bulk_inserter
        .create_bulk_stream_writer(
            &table_template,
            Some(
                BulkWriteOptions::default()
                    .with_parallelism(8)
                    .with_compression(CompressionType::Zstd)
                    .with_timeout(Duration::from_secs(60)),
            ),
        )
        .await?;

    // Fast path: positional values, matching the template's column order.
    let mut rows1 = bulk_writer.alloc_rows_buffer(10000, 1024)?;
    for data in &sensor_data {
        let row = Row::new().add_values(vec![
            Value::TimestampMillisecond(data.timestamp),
            Value::String(data.device_id.clone()),
            Value::Float64(data.temperature),
        ]);
        rows1.add_row(row)?;
    }
    let _request_id1 = bulk_writer.write_rows_async(rows1).await?;

    // Safe path: named fields validated against the schema.
    let mut rows2 = bulk_writer.alloc_rows_buffer(10000, 1024)?;
    for data in &sensor_data {
        let row = bulk_writer
            .new_row()
            .set("ts", Value::TimestampMillisecond(data.timestamp))?
            .set("device_id", Value::String(data.device_id.clone()))?
            .set("temperature", Value::Float64(data.temperature))?
            .build()?;
        rows2.add_row(row)?;
    }
    let _request_id2 = bulk_writer.write_rows_async(rows2).await?;

    let _responses = bulk_writer.wait_for_all_pending().await?;
    bulk_writer.finish().await?;
    Ok(())
}
```
Important:

- Manual table creation required: the bulk API does not create tables automatically. You must create the table beforehand using either:
  - the insert API (which supports automatic table creation), or
  - SQL DDL statements (`CREATE TABLE`)
- Schema matching: the table template in the bulk API must exactly match the existing table schema.
## When to Choose Which API

| Scenario | API Choice | Why |
|----------|------------|-----|
| IoT sensor data | Low-Latency Insert | Real-time requirements, small batches |
| Interactive dashboards | Low-Latency Insert | User expects immediate feedback |
| ETL pipelines | Bulk Streaming | Process millions of records efficiently |
| Log ingestion | Bulk Streaming | High volume, can batch data |
| Data migration | Bulk Streaming | Transfer large datasets quickly |
## Examples

The repository includes comprehensive examples demonstrating both approaches:

### Low-Latency Examples

Run with: `cargo run --example insert_example`

- Real-time sensor ingestion: simulates IoT devices sending data with latency measurements
- Data type demonstration: shows support for all GreptimeDB column types
- Interactive patterns: best practices for real-time applications

### High-Throughput Examples

Run with: `cargo run --example bulk_stream_writer_example`

- Performance comparison: sequential vs. parallel processing benchmarks
- Async submission patterns: demonstrates `write_rows_async()` for maximum throughput
- Best practices: optimal configuration for high-volume scenarios
## API Design & Optimization

### Schema-Bound Writer

Each `BulkStreamWriter` is bound to a specific table schema, providing both safety and performance benefits:

- Schema validation: automatic validation ensures data consistency
- Zero-cost optimization: schema-bound buffers share an `Arc<Schema>` for fast validation
- Type safety: prevents common mistakes such as field-order errors
- Dynamic growth: Arrow builders automatically expand capacity as needed
### Buffer Allocation

```rust
use greptimedb_ingester::{BulkStreamWriter, Rows, Column};

async fn example(bulk_writer: &BulkStreamWriter, column_schemas: &[Column]) -> greptimedb_ingester::Result<()> {
    // Preferred: allocate through the writer so the buffer shares the
    // writer's schema.
    let _rows = bulk_writer.alloc_rows_buffer(10000, 1024)?;

    // Alternative: construct a buffer directly from column schemas.
    let _rows = Rows::new(column_schemas, 10000, 1024)?;
    Ok(())
}
```
### Row Building APIs

Fast API (recommended for production):

```rust
use greptimedb_ingester::{Row, Value};

fn create_row() -> Row {
    let ts = 1234567890i64;
    let device_id = "device001".to_string();
    let temperature = 25.0f64;

    // Positional values: fast, but must match the schema's column order.
    Row::new().add_values(vec![
        Value::TimestampMillisecond(ts),
        Value::String(device_id),
        Value::Float64(temperature),
    ])
}
```

Safe API (recommended for development):

```rust
use greptimedb_ingester::{BulkStreamWriter, Value};

async fn example(bulk_writer: &BulkStreamWriter) -> greptimedb_ingester::Result<()> {
    let ts = 1234567890i64;
    let device_id = "device001".to_string();
    let temperature = 25.0f64;

    // Named fields: each `set` is validated against the writer's schema.
    let _row = bulk_writer.new_row()
        .set("timestamp", Value::TimestampMillisecond(ts))?
        .set("device_id", Value::String(device_id))?
        .set("temperature", Value::Float64(temperature))?
        .build()?;
    Ok(())
}
```
## Performance Characteristics

### Low-Latency Insert API

- Latency: sub-millisecond per operation
- Throughput: 1k to 10k ops/sec
- Memory: low, constant
- Use case: real-time applications

### Bulk Streaming API

- Latency: 1 to 1000 milliseconds per batch
- Throughput: > 10k rows/sec
- Memory: higher during batching
- Use case: high-volume processing
## Advanced Usage

### Parallel Bulk Operations

The bulk API supports true parallelism through async request submission:

```rust
use greptimedb_ingester::{BulkStreamWriter, Rows};

async fn example(bulk_writer: &mut BulkStreamWriter, batches: Vec<Rows>) -> greptimedb_ingester::Result<()> {
    // Submit all batches without waiting for individual responses.
    let mut request_ids = Vec::new();
    for batch in batches {
        let id = bulk_writer.write_rows_async(batch).await?;
        request_ids.push(id);
    }

    // Wait for every pending request at once...
    let _responses = bulk_writer.wait_for_all_pending().await?;

    // ...or inspect the outcome of specific requests individually.
    for request_id in request_ids {
        let response = bulk_writer.wait_for_response(request_id).await?;
        println!("Request {} completed with {} rows",
            request_id, response.affected_rows());
    }
    Ok(())
}
```
### Data Type Support

Full support for GreptimeDB data types:

```rust
use greptimedb_ingester::{Row, Value};

fn create_data_row() -> Row {
    Row::new()
        .add_value(Value::TimestampMillisecond(1234567890123))
        .add_value(Value::String("device_001".to_string()))
        .add_value(Value::Float64(23.5))
        .add_value(Value::Int64(1))
        .add_value(Value::Boolean(true))
        .add_value(Value::Binary(vec![0xDE, 0xAD, 0xBE, 0xEF]))
        .add_value(Value::Json(r#"{"key": "value"}"#.to_string()))
}
```
### Type-Safe Data Access

Efficient data access patterns:

```rust
use greptimedb_ingester::Row;

fn process_row_data(row: &Row) {
    fn process_binary(_data: &[u8]) {}

    // Accessors take the column index in the row's schema order.
    if let Some(device_name) = row.get_string(1) {
        println!("Device: {}", device_name);
    }
    if let Some(binary_data) = row.get_binary(5) {
        process_binary(&binary_data);
    }
}
```
## Best Practices

### For Low-Latency Applications

- Use small batch sizes (200-1000 rows)
- Monitor and optimize network round-trip times

### For High-Throughput Applications

- Create tables manually first: the bulk API requires existing tables
- Use a parallelism of 8-16 for network-bound workloads
- Batch 2,000-100,000 rows per request for optimal performance
- Enable compression to reduce network overhead
- Monitor memory usage when submitting many async requests
- Implement backpressure control for very high-volume scenarios
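The batch-sizing and backpressure advice above can be sketched with a small std-only helper. The function name and the loop structure here are illustrative, not part of the ingester API; in practice each range would become one `write_rows_async` submission, with a wait once the in-flight cap is reached.

```rust
/// Split `total` rows into (start, end) batch ranges of at most `batch_size`
/// rows each. `batch_size` would typically be in the 2,000-100,000 range
/// recommended for the bulk API.
fn batch_ranges(total: usize, batch_size: usize) -> Vec<(usize, usize)> {
    assert!(batch_size > 0, "batch_size must be positive");
    (0..total)
        .step_by(batch_size)
        .map(|start| (start, usize::min(start + batch_size, total)))
        .collect()
}

fn main() {
    // 250,000 rows at 100,000 rows per request -> three bulk submissions.
    let ranges = batch_ranges(250_000, 100_000);
    println!("{} batches: {:?}", ranges.len(), ranges);

    // Backpressure sketch: submit at most `max_in_flight` batches before
    // waiting for them (stand-in for write_rows_async + wait_for_all_pending).
    let max_in_flight = 8;
    for chunk in ranges.chunks(max_in_flight) {
        // submit chunk.len() async requests here, then wait for them all
        let _ = chunk;
    }
}
```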
### General Recommendations

- Use appropriate data types to minimize serialization overhead
- Pre-allocate vectors with known capacity
- Reuse connections when possible
- Handle errors gracefully with retry logic
- Monitor performance metrics in production
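The retry-logic recommendation can be sketched as a generic, std-only helper (the function and its names are illustrative, not part of the ingester API). In practice you would wrap a call such as `database.insert(...)` and retry only transient failures like request timeouts:

```rust
use std::thread::sleep;
use std::time::Duration;

/// Retry `op` up to `max_attempts` times with exponential backoff.
/// A synchronous sketch; an async variant would use tokio's sleep instead.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) => {
                attempt += 1;
                if attempt >= max_attempts {
                    return Err(e);
                }
                // Exponential backoff: base_delay * 2^(attempt - 1).
                sleep(base_delay * 2u32.pow(attempt - 1));
            }
        }
    }
}

fn main() {
    // Example: an operation that fails twice, then succeeds.
    let mut calls = 0;
    let result: Result<u32, &str> = retry_with_backoff(5, Duration::from_millis(1), || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok(42) }
    });
    assert_eq!(result, Ok(42));
    println!("succeeded after {} calls", calls);
}
```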
## Configuration

Set up your GreptimeDB connection:

```rust
use greptimedb_ingester::{ChannelConfig, ChannelManager};
use greptimedb_ingester::client::Client;
use std::time::Duration;

fn setup_client() -> Client {
    let channel_config = ChannelConfig::default()
        .timeout(Duration::from_secs(30))
        .connect_timeout(Duration::from_secs(5));
    let channel_manager = ChannelManager::with_config(channel_config);
    Client::with_manager_and_urls(channel_manager, &["localhost:4001"])
}
```

Leave transport compression unset unless you explicitly need it. If the server expects compressed gRPC traffic, configure it like this:

```rust
use greptimedb_ingester::{ChannelConfig, GrpcCompression};

let channel_config = ChannelConfig::default()
    .with_send_compression(GrpcCompression::Zstd)
    .with_accept_compression(GrpcCompression::Zstd);
```

Use `GrpcCompression::Gzip` or `GrpcCompression::Zstd` to match the server configuration.
## Error Handling

The library provides comprehensive error types:

```rust
use greptimedb_ingester::Error;
use greptimedb_ingester::api::v1::RowInsertRequests;
use greptimedb_ingester::database::Database;

async fn handle_insert(database: &Database, request: RowInsertRequests) {
    match database.insert(request).await {
        Ok(affected_rows) => println!("Inserted {} rows", affected_rows),
        Err(Error::RequestTimeout { .. }) => {
            // Transient failure: retry or surface a timeout to the caller.
        },
        Err(Error::SerializeMetadata { .. }) => {
            // Handle metadata serialization failures here.
        },
        Err(e) => {
            eprintln!("Unexpected error: {:?}", e);
        }
    }
}
```
## API Reference

### Core Types

- `Client`: connection management
- `Database`: low-level insert operations
- `BulkInserter`: high-level bulk operations
- `BulkStreamWriter`: streaming bulk writer
- `TableSchema`: table schema definition
- `Row`: data row representation
- `Value`: type-safe value wrapper

### Key Methods

Low-Latency API:

- `database.insert(request)`: insert with immediate response

Bulk API:

- `bulk_writer.write_rows(rows)`: submit and wait for completion
- `bulk_writer.write_rows_async(rows)`: submit without waiting
- `bulk_writer.wait_for_response(id)`: wait for a specific request
- `bulk_writer.wait_for_all_pending()`: wait for all pending requests
- `bulk_writer.finish()`: clean shutdown
- `bulk_writer.finish_with_responses()`: shutdown with response collection
## License

This library uses the Apache 2.0 license to strike a balance between open contributions and allowing you to use the software however you want.

## Links