# Cruster
A Rust framework for building distributed, stateful systems with durable workflows.
> **Status:** Under active development. API may change.
## Features
- **Stateless Entities** -- Addressable RPC handlers with automatic sharding and routing. State is your responsibility (use your database directly).
- **Durable Workflows** -- Long-running orchestrations that survive crashes. Activities are journaled and replay automatically.
- **Activity Groups** -- Reusable bundles of transactional activities, composable across workflows.
- **RPC Groups** -- Reusable bundles of RPC methods, composable across entities.
- **Singletons** -- Cluster-wide unique tasks with automatic failover.
- **Cron Scheduling** -- Distributed cron jobs running as singletons.
- **gRPC Transport** -- Inter-node communication with connection pooling and streaming.
## Installation
### Prerequisites
The Protocol Buffers compiler (`protoc`) is required:
```bash
# Debian/Ubuntu
sudo apt-get install protobuf-compiler
# macOS
brew install protobuf
```
### Cargo
```toml
[dependencies]
cruster = "version"
```
## Quick Start
### Defining an Entity
Entities are stateless RPC handlers. You manage state yourself via your database.
```rust
use cruster::prelude::*;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
#[derive(Serialize, Deserialize)]
struct IncrementRequest {
    entity_id: String,
    amount: i64,
}

#[derive(Serialize, Deserialize)]
struct GetCounterRequest {
    entity_id: String,
}

#[entity]
#[derive(Clone)]
struct Counter {
    pool: PgPool,
}

#[entity_impl]
impl Counter {
    // Persisted RPC: at-least-once delivery for writes
    #[rpc(persisted)]
    pub async fn increment(&self, request: IncrementRequest) -> Result<i64, ClusterError> {
        let (value,): (i64,) = sqlx::query_as(
            "INSERT INTO counters (id, value) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET value = counters.value + $2
             RETURNING value"
        )
        .bind(&request.entity_id)
        .bind(request.amount)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| ClusterError::PersistenceError {
            reason: e.to_string(),
            source: Some(Box::new(e)),
        })?;
        Ok(value)
    }

    // Non-persisted RPC: best-effort delivery for reads
    #[rpc]
    pub async fn get(&self, request: GetCounterRequest) -> Result<i64, ClusterError> {
        let result: Option<(i64,)> = sqlx::query_as(
            "SELECT value FROM counters WHERE id = $1"
        )
        .bind(&request.entity_id)
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| ClusterError::PersistenceError {
            reason: e.to_string(),
            source: Some(Box::new(e)),
        })?;
        Ok(result.map(|r| r.0).unwrap_or(0))
    }
}
```
### Using the Entity
```rust
use cruster::types::EntityId;
// Register entity and get a typed client (CounterClient is generated by the macro)
let counter = Counter { pool: pool.clone() }
    .register(sharding.clone())
    .await?;

// Call methods -- entity is created on first access, routed by shard
let entity_id = EntityId::new("counter-1");
let value = counter.increment(&entity_id, &IncrementRequest {
    entity_id: "counter-1".into(),
    amount: 5,
}).await?;
assert_eq!(value, 5);

let value = counter.get(&entity_id, &GetCounterRequest {
    entity_id: "counter-1".into(),
}).await?;
assert_eq!(value, 5);
```
## Core Concepts
### Entity Method Types
| Attribute | Use case | Delivery |
|---|---|---|
| `#[rpc]` | Best-effort queries/reads | At-most-once |
| `#[rpc(persisted)]` | Durable writes/mutations | At-least-once |
Both use `&self` -- entities are stateless. State lives in your database.
### Entity Configuration
```rust
#[entity(
    name = "user",            // Custom entity type name (default: struct name)
    shard_group = "premium",  // Shard group for isolation
    max_idle_time_secs = 300, // Eviction timeout
    mailbox_capacity = 50,    // Message queue size
    concurrency = 4,          // Parallel request handling (default: 1)
)]
#[derive(Clone)]
struct User {
    pool: PgPool,
}
```
### Visibility Modifiers
```rust
#[entity_impl]
impl MyEntity {
    // Public: on generated client, callable by external callers (default for #[rpc])
    #[rpc]
    pub async fn query(&self, req: QueryRequest) -> Result<Data, ClusterError> { ... }

    // Protected: dispatchable entity-to-entity, but NOT on the generated client
    #[rpc]
    #[protected]
    pub async fn internal_sync(&self, req: SyncRequest) -> Result<(), ClusterError> { ... }

    // Private: not dispatchable, not on client -- internal helper only
    #[rpc]
    #[private]
    async fn validate(&self, req: ValidateRequest) -> Result<bool, ClusterError> { ... }
}
```
## Durable Workflows
Workflows are standalone durable orchestration constructs backed by hidden entities. They have an `execute` entry point and `#[activity]` side effects. Activities are journaled -- on crash recovery, completed activities return their cached results.
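The replay mechanic can be sketched with a toy in-memory journal (illustrative only -- Cruster persists its journal durably rather than in a `HashMap`):

```rust
use std::collections::HashMap;

/// Toy activity journal: results keyed by step name.
struct Journal {
    cache: HashMap<String, i64>,
    real_runs: u32, // counts side effects actually executed
}

impl Journal {
    fn new() -> Self {
        Journal { cache: HashMap::new(), real_runs: 0 }
    }

    /// Run an activity, or return its cached result when replaying.
    fn activity(&mut self, step: &str, f: impl FnOnce() -> i64) -> i64 {
        if let Some(v) = self.cache.get(step) {
            return *v; // replay: skip the side effect entirely
        }
        self.real_runs += 1;
        let v = f();
        self.cache.insert(step.to_string(), v);
        v
    }
}

fn main() {
    let mut journal = Journal::new();
    // First execution: both steps run for real.
    journal.activity("reserve", || 1);
    journal.activity("charge", || 2);
    // Crash, then replay against the same journal: cached results, no re-execution.
    journal.activity("reserve", || 1);
    journal.activity("charge", || 2);
    assert_eq!(journal.real_runs, 2);
}
```

Because completed steps short-circuit to their recorded results, execution deterministically fast-forwards to the first incomplete activity after a crash.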
### Defining a Workflow
```rust
use cruster::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct ProcessOrderRequest {
    order_id: String,
    item_count: u32,
    amount: i64,
}

#[derive(Serialize, Deserialize)]
struct OrderResult {
    order_id: String,
    status: String,
}

#[workflow]
#[derive(Clone)]
struct OrderWorkflow;

#[workflow_impl(
    key = |req: &ProcessOrderRequest| req.order_id.clone(),
    hash = false,
)]
impl OrderWorkflow {
    async fn execute(&self, request: ProcessOrderRequest) -> Result<OrderResult, ClusterError> {
        // Each activity is journaled. On replay, completed steps return cached results.
        self.reserve_inventory(request.order_id.clone(), request.item_count).await?;
        self.charge_payment(request.order_id.clone(), request.amount).await?;
        self.confirm_order(request.order_id.clone()).await?;
        Ok(OrderResult {
            order_id: request.order_id,
            status: "completed".into(),
        })
    }

    #[activity]
    async fn reserve_inventory(&self, order_id: String, count: u32) -> Result<(), ClusterError> {
        // self.tx is an ActivityTx -- a SQL transaction committed atomically with the journal
        sqlx::query("INSERT INTO reservations (order_id, count) VALUES ($1, $2)")
            .bind(&order_id)
            .bind(count as i64)
            .execute(&self.tx)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: e.to_string(),
                source: Some(Box::new(e)),
            })?;
        Ok(())
    }

    #[activity]
    async fn charge_payment(&self, order_id: String, amount: i64) -> Result<(), ClusterError> {
        sqlx::query("INSERT INTO payments (order_id, amount) VALUES ($1, $2)")
            .bind(&order_id)
            .bind(amount)
            .execute(&self.tx)
            .await?;
        Ok(())
    }

    #[activity]
    async fn confirm_order(&self, order_id: String) -> Result<(), ClusterError> {
        sqlx::query("UPDATE orders SET status = 'confirmed' WHERE id = $1")
            .bind(&order_id)
            .execute(&self.tx)
            .await?;
        Ok(())
    }
}
```
### Using a Workflow
```rust
// Register and get typed client (OrderWorkflowClient is generated)
let client = OrderWorkflow.register(sharding.clone()).await?;
// Execute synchronously -- blocks until workflow completes
let result = client.execute(&ProcessOrderRequest {
    order_id: "order-42".into(),
    item_count: 3,
    amount: 9900,
}).await?;
// Start asynchronously -- returns execution ID immediately
let exec_id = client.start(&request).await?;
// Poll for result later
let result: Option<OrderResult> = client.poll(&exec_id).await?;
// Idempotency keys
let result = client.with_key("my-key").execute(&request).await?; // hashed
let result = client.with_key_raw("raw-id").execute(&request).await?; // used as-is
```
### `self.tx` -- Transactional Activities
Inside `#[activity]` methods, `self.tx` is an `ActivityTx` that implements `sqlx::Executor`. All SQL writes through `self.tx` are committed atomically with the workflow journal entry. If the activity fails, both the journal and your SQL writes roll back together.
```rust
#[activity]
async fn transfer_funds(&self, from: String, to: String, amount: i64) -> Result<(), ClusterError> {
    sqlx::query("UPDATE accounts SET balance = balance - $2 WHERE id = $1")
        .bind(&from)
        .bind(amount)
        .execute(&self.tx) // committed with journal
        .await?;
    sqlx::query("UPDATE accounts SET balance = balance + $2 WHERE id = $1")
        .bind(&to)
        .bind(amount)
        .execute(&self.tx) // same transaction
        .await?;
    Ok(())
}
```
### Activity Retries
```rust
#[activity(retries = 3)]
async fn call_external_api(&self, url: String) -> Result<String, ClusterError> {
    // Retried up to 3 times with exponential backoff on failure
    ...
}

#[activity(retries = 5, backoff = "constant")]
async fn idempotent_write(&self, data: String) -> Result<(), ClusterError> {
    // Retried with constant backoff
    ...
}
```
### Durable Timers
```rust
#[workflow_impl(...)]
impl ReminderWorkflow {
    async fn execute(&self, request: ReminderRequest) -> Result<(), ClusterError> {
        // Durable sleep -- survives crashes, resumes where it left off
        self.sleep(Duration::from_secs(3600)).await?;
        self.send_reminder(request.user_id).await?;
        Ok(())
    }
}
```
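Conceptually, a durable sleep journals an absolute wake deadline the first time it runs, so a replay after a crash waits only for the time remaining. A toy sketch of that idea (not the real engine):

```rust
use std::time::{Duration, SystemTime};

/// Toy durable timer: the absolute deadline is journaled on first
/// execution; replays compute how long is still left to wait.
fn remaining_sleep(journaled_deadline: SystemTime, now: SystemTime) -> Duration {
    journaled_deadline
        .duration_since(now)
        .unwrap_or(Duration::ZERO) // deadline already passed: resume immediately
}

fn main() {
    let start = SystemTime::UNIX_EPOCH;
    let deadline = start + Duration::from_secs(3600); // journaled on first run
    // Crash at t+1000s: the replay sleeps only the remaining 2600s.
    assert_eq!(
        remaining_sleep(deadline, start + Duration::from_secs(1000)),
        Duration::from_secs(2600)
    );
    // Replaying after the deadline fires immediately.
    assert_eq!(
        remaining_sleep(deadline, start + Duration::from_secs(4000)),
        Duration::ZERO
    );
}
```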
## Activity Groups
Reusable bundles of transactional activities that can be composed into multiple workflows.
### Defining Activity Groups
```rust
#[activity_group]
#[derive(Clone)]
struct Inventory;

#[activity_group_impl]
impl Inventory {
    #[activity]
    async fn reserve(&self, order_id: String, count: u32) -> Result<String, ClusterError> {
        let id = format!("res-{order_id}-{count}");
        sqlx::query("INSERT INTO reservations (id, order_id, count) VALUES ($1, $2, $3)")
            .bind(&id)
            .bind(&order_id)
            .bind(count as i64)
            .execute(&self.tx)
            .await?;
        Ok(id)
    }
}

#[activity_group]
#[derive(Clone)]
struct Payments;

#[activity_group_impl]
impl Payments {
    #[activity]
    async fn charge(&self, order_id: String, amount: i64) -> Result<String, ClusterError> {
        let tx_id = format!("tx-{order_id}");
        sqlx::query("INSERT INTO payments (id, order_id, amount) VALUES ($1, $2, $3)")
            .bind(&tx_id)
            .bind(&order_id)
            .bind(amount)
            .execute(&self.tx)
            .await?;
        Ok(tx_id)
    }
}
```
### Composing into a Workflow
```rust
#[workflow]
#[derive(Clone)]
struct OrderWorkflow;

#[workflow_impl(
    key = |req: &ProcessOrderRequest| req.order_id.clone(),
    hash = false,
    activity_groups(Inventory, Payments)
)]
impl OrderWorkflow {
    async fn execute(&self, request: ProcessOrderRequest) -> Result<OrderResult, ClusterError> {
        // Call activities from composed groups
        let reservation_id = self.reserve(request.order_id.clone(), request.item_count).await?;
        let tx_id = self.charge(request.order_id.clone(), request.amount).await?;
        // Local activities work alongside group activities
        self.finalize(request.order_id.clone(), reservation_id, tx_id).await?;
        Ok(OrderResult { ... })
    }

    #[activity]
    async fn finalize(&self, order_id: String, res_id: String, tx_id: String) -> Result<(), ClusterError> {
        sqlx::query("UPDATE orders SET status = 'done', reservation = $2, payment = $3 WHERE id = $1")
            .bind(&order_id)
            .bind(&res_id)
            .bind(&tx_id)
            .execute(&self.tx)
            .await?;
        Ok(())
    }
}
```
### Registration with Activity Groups
```rust
let client = OrderWorkflow
    .register(sharding.clone(), Inventory, Payments)
    .await?;
```
## RPC Groups
Reusable bundles of RPC methods that can be composed into multiple entities.
### Defining an RPC Group
```rust
#[rpc_group]
#[derive(Clone)]
struct Auditable {
    pool: PgPool,
}

#[rpc_group_impl]
impl Auditable {
    #[rpc(persisted)]
    pub async fn log_action(&self, request: LogActionRequest) -> Result<AuditEntry, ClusterError> {
        sqlx::query_as("INSERT INTO audit_log (...) VALUES (...) RETURNING *")
            .bind(&request.action)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: e.to_string(),
                source: Some(Box::new(e)),
            })
    }

    #[rpc]
    pub async fn get_audit_log(&self, request: GetAuditLogRequest) -> Result<Vec<AuditEntry>, ClusterError> {
        ...
    }
}
```
### Composing into an Entity
```rust
#[entity]
#[derive(Clone)]
struct User {
    pool: PgPool,
}

#[entity_impl(rpc_groups(Auditable))]
impl User {
    #[rpc(persisted)]
    pub async fn update_email(&self, request: UpdateEmailRequest) -> Result<(), ClusterError> {
        ...
    }
}
```
### Registration with RPC Groups
```rust
// Pass RPC group instances during registration
// Pass RPC group instances during registration
let client = User { pool: pool.clone() }
    .register(sharding.clone(), Auditable { pool: pool.clone() })
    .await?;
// Generated client includes both entity methods and RPC group methods
client.update_email(&entity_id, &request).await?;
client.log_action(&entity_id, &log_request).await?;
client.get_audit_log(&entity_id, &query).await?;
```
## Singletons
Cluster-wide unique tasks. If the owning node dies, another takes over.
```rust
use cruster::singleton::{register_singleton, SingletonContext};
register_singleton(sharding.as_ref(), "leader", |ctx: SingletonContext| async move {
    let cancel = ctx.cancellation();
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                // Graceful shutdown -- clean up
                break;
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                perform_leader_duties().await;
            }
        }
    }
    Ok(())
}).await?;
```
If a singleton does **not** call `ctx.cancellation()`, it is force-cancelled on shutdown.
### Builder API
```rust
use cruster::singleton::singleton;
singleton("leader", |ctx| async move {
    // long-running task body, as above
    Ok(())
})
.register(sharding.as_ref())
.await?;
```
## Cron Jobs
Distributed cron jobs that run as cluster singletons:
```rust
use cruster::cron::{ClusterCron, CronSchedule};
let cron = ClusterCron::new(
    "cleanup-job",
    CronSchedule::parse("0 */5 * * * * *").unwrap(), // every 5 minutes
    |scheduled_time| Box::pin(async move {
        // Perform cleanup
        Ok(())
    }),
)
.with_calculate_next_from_previous(true)
.with_skip_if_older_than(Duration::from_secs(3600));
cron.register(sharding.as_ref()).await?;
```
## Client Methods
The generated typed clients provide ergonomic methods. The underlying `EntityClient` supports:
| Method | Semantics | Use for |
|---|---|---|
| `send(id, tag, req)` | Best-effort, await reply | Reads |
| `send_persisted(id, tag, req, uninterruptible)` | At-least-once, await reply | Writes |
| `notify(id, tag, req)` | Best-effort, fire-and-forget | Events |
| `notify_persisted(id, tag, req)` | At-least-once, fire-and-forget | Durable events |
| `send_at(id, tag, req, deliver_at)` | Scheduled delivery | Timers |
| `notify_at(id, tag, req, deliver_at)` | Scheduled fire-and-forget | Scheduled events |
| `send_stream(id, tag, req)` | Streaming response | Large result sets |
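
At-least-once delivery (`send_persisted`, `notify_persisted`) means a handler can see the same message more than once, so persisted handlers should be idempotent. A self-contained sketch of the usual dedup-by-message-id pattern (the `u64` ids and `CounterHandler` are illustrative, not Cruster API):

```rust
use std::collections::HashSet;

/// A handler that deduplicates redeliveries by message id, so an
/// at-least-once transport cannot double-apply the write.
struct CounterHandler {
    seen: HashSet<u64>,
    total: i64,
}

impl CounterHandler {
    fn new() -> Self {
        CounterHandler { seen: HashSet::new(), total: 0 }
    }

    fn handle(&mut self, msg_id: u64, amount: i64) -> i64 {
        // insert() returns false when the id was already processed
        if self.seen.insert(msg_id) {
            self.total += amount;
        }
        self.total
    }
}

fn main() {
    let mut h = CounterHandler::new();
    h.handle(1, 5);
    h.handle(1, 5); // redelivery: ignored
    h.handle(2, 3);
    assert_eq!(h.total, 8);
}
```

In practice the dedup state lives next to your data (e.g. a unique constraint in PostgreSQL) rather than in memory.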
## Production Deployment
### Cluster Setup
```rust
use cruster::config::ShardingConfig;
use cruster::metrics::ClusterMetrics;
use cruster::sharding_impl::ShardingImpl;
use cruster::storage::etcd_runner::EtcdRunnerStorage;
use cruster::storage::sql_message::SqlMessageStorage;
use cruster::storage::sql_workflow::SqlWorkflowStorage;
use cruster::storage::sql_workflow_engine::SqlWorkflowEngine;
use cruster::transport::grpc::{GrpcRunnerHealth, GrpcRunnerServer, GrpcRunners};
// 1. Storage backends
let pool = PgPoolOptions::new().max_connections(10).connect(&postgres_url).await?;
let message_storage = Arc::new(SqlMessageStorage::new(pool.clone()));
message_storage.migrate().await?;
let state_storage = Arc::new(SqlWorkflowStorage::new(pool.clone()));
let workflow_engine = Arc::new(SqlWorkflowEngine::new(pool.clone()));
// 2. Runner discovery via etcd
let etcd_client = etcd_client::Client::connect(endpoints, None).await?;
let runner_storage = Arc::new(EtcdRunnerStorage::new(etcd_client, "/my-cluster/", 30));
// 3. gRPC transport
let grpc_runners = Arc::new(GrpcRunners::new());
let runner_health = Arc::new(GrpcRunnerHealth::new(grpc_runners.clone()));
// 4. Configure and create
let config = Arc::new(ShardingConfig {
    runner_address: "10.0.0.1:9000".parse()?,
    shards_per_group: 300,
    ..Default::default()
});
let sharding = ShardingImpl::new_with_engines(
    config,
    grpc_runners,
    Some(runner_storage),
    Some(runner_health),
    Some(message_storage),
    Some(state_storage),
    Some(workflow_engine),
    Arc::new(ClusterMetrics::unregistered()),
)?;

// 5. Register entities and workflows
let counter = Counter { pool: pool.clone() }.register(sharding.clone()).await?;
let workflow = OrderWorkflow.register(sharding.clone()).await?;

// 6. Start background loops + gRPC server
sharding.start().await?;
let grpc_server = GrpcRunnerServer::new(sharding.clone());
tonic::transport::Server::builder()
    .add_service(grpc_server.into_service())
    .serve(grpc_addr)
    .await?;
```
### Infrastructure Requirements
| Component | Used for |
|---|---|
| **PostgreSQL** | Message storage, workflow journals, activity transactions |
| **etcd** | Runner discovery, shard locks, health monitoring |
### Configuration Reference
| Option | Default | Description |
|---|---|---|
| `shards_per_group` | 300 | Shards per shard group |
| `entity_max_idle_time` | 60s | Idle timeout before eviction |
| `entity_mailbox_capacity` | 100 | Per-entity message queue size |
| `entity_max_concurrent_requests` | 1 | Concurrent requests per entity (0 = unbounded) |
| `storage_poll_interval` | 500ms | Message storage polling frequency |
| `storage_message_max_retries` | 10 | Max delivery attempts before dead-letter |
| `runner_lock_ttl` | 30s | Runner lock TTL in etcd |
| `send_retry_count` | 3 | Retries on routing failures during rebalancing |
| `singleton_crash_backoff_base` | 1s | Base backoff for singleton crash recovery |
| `detachment_enabled` | false | Enable automatic detachment on storage errors |
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Clients │
└─────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Sharding Layer │
│ Rendezvous hashing + request routing │
└─────────────────────────┬───────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Runner 1 │ │ Runner 2 │ │ Runner 3 │
│ Shards │ │ Shards │ │ Shards │
│ 0-99 │ │ 100-199 │ │ 200-299 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└───────────────┴───────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌───────────┐ ┌───────────┐
│ PostgreSQL│ │ etcd │
│ - Messages│ │ - Runners │
│ - Journals│ │ - Locks │
│ - Tx data │ │ - Health │
└───────────┘ └───────────┘
```
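The sharding layer's rendezvous hashing picks, for each shard, the live runner with the highest hash score; when a runner leaves, only the shards it owned move. A minimal self-contained sketch of the idea (using `std`'s `DefaultHasher` -- not Cruster's actual hash function or runner types):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Score a (shard, runner) pair; the highest-scoring runner owns the shard.
fn score(shard: u32, runner: &str) -> u64 {
    let mut h = DefaultHasher::new();
    shard.hash(&mut h);
    runner.hash(&mut h);
    h.finish()
}

/// Pick the owning runner for a shard among the currently live runners.
fn owner<'a>(shard: u32, runners: &[&'a str]) -> Option<&'a str> {
    runners.iter().copied().max_by_key(|r| score(shard, r))
}

fn main() {
    let all = ["runner-1", "runner-2", "runner-3"];
    let own = owner(42, &all).unwrap();
    // Removing a runner that does NOT own shard 42 leaves its owner unchanged:
    let non_owner = all.iter().copied().find(|r| *r != own).unwrap();
    let reduced: Vec<&str> = all.iter().copied().filter(|r| *r != non_owner).collect();
    assert_eq!(owner(42, &reduced), Some(own));
}
```

This stability property is what keeps rebalancing cheap: a runner failure reassigns only that runner's shards instead of reshuffling the whole cluster.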
## Examples
See [`examples/`](examples/) for a complete working example:
- **cluster-tests** -- End-to-end test suite covering entities, workflows, RPC groups, activity groups, singletons, and timers.
## License
MIT OR Apache-2.0