# Cruster
A Rust framework for building distributed, stateful entity systems with durable workflows.
> **Status:** Under active development. API may change. Feedback welcome.
## Features
- **Distributed Entities** - Stateful actors with automatic sharding, persistence, and lifecycle management
- **Durable Workflows** - Long-running operations that survive crashes with automatic replay
- **Activity Journaling** - State mutations with transactional guarantees and idempotency
- **Entity Traits** - Composable behaviors across multiple entity types
- **Singletons** - Cluster-wide unique instances with automatic failover
- **Timers & Scheduling** - Durable sleep and cron-based scheduling
- **gRPC Transport** - Inter-node communication with streaming support
## Installation
### Prerequisites
Protocol Buffers compiler is required:
```bash
# Debian/Ubuntu
sudo apt-get install protobuf-compiler
# macOS
brew install protobuf
```
### Cargo
```toml
[dependencies]
cruster = "0.0.2"
```
## Quick Start
### Defining an Entity
```rust
use cruster::prelude::*;
use serde::{Deserialize, Serialize};
// 1. Define persistent state
#[derive(Clone, Default, Serialize, Deserialize)]
struct CounterState {
value: i64,
}
// 2. Define the entity
#[entity]
#[derive(Clone)]
struct Counter;
// 3. Implement handlers
#[entity_impl]
#[state(CounterState)]
impl Counter {
// Required: initialize state for new entities
fn init(&self, _ctx: &EntityContext) -> Result<CounterState, ClusterError> {
Ok(CounterState::default())
}
// Activity: mutates state, journaled for replay safety
#[activity]
async fn do_increment(&mut self, amount: i64) -> Result<i64, ClusterError> {
self.state.value += amount;
Ok(self.state.value)
}
// Workflow: public entry point, orchestrates activities
#[workflow]
pub async fn increment(&self, amount: i64) -> Result<i64, ClusterError> {
self.do_increment(amount).await
}
// RPC: read-only, no journaling
#[rpc]
pub async fn get(&self) -> Result<i64, ClusterError> {
Ok(self.state.value)
}
}
```
### Using the Entity
```rust
use cruster::testing::TestCluster;
use cruster::types::EntityId;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create test cluster
let cluster = TestCluster::new().await;
// Register entity and get typed client (CounterClient is generated by the macro)
let counter = Counter.register(cluster.sharding().clone()).await?;
// Call methods using the typed client - entity is created on first access
let entity_id = EntityId::new("counter-1");
let value = counter.increment(&entity_id, &5i64).await?;
assert_eq!(value, 5);
let value = counter.get(&entity_id).await?;
assert_eq!(value, 5);
Ok(())
}
```
## Core Concepts
### Method Types
| `#[rpc]` | Read-only queries | `&self` | No | `#[public]` |
| `#[workflow]` | Durable operations | `&self` | Yes | `#[public]` |
| `#[activity]` | State mutations | `&mut self` | Yes | `#[private]` |
### The Workflow + Activity Pattern
**Best Practice:** Separate concerns between workflows (orchestration) and activities (mutation).
```rust
#[entity_impl]
#[state(OrderState)]
impl Order {
// Activities: private, handle state mutations
#[activity]
async fn set_status(&mut self, status: OrderStatus) -> Result<(), ClusterError> {
self.state.status = status;
Ok(())
}
#[activity]
async fn record_payment(&mut self, payment_id: String) -> Result<(), ClusterError> {
self.state.payment_id = Some(payment_id);
Ok(())
}
// Workflow: public, orchestrates activities
#[workflow]
pub async fn process_payment(&self, payment: PaymentRequest) -> Result<(), ClusterError> {
// Validation (deterministic, no side effects)
if payment.amount <= 0 {
return Err(ClusterError::MalformedMessage {
reason: "Invalid amount".into(),
source: None,
});
}
// Each activity is journaled - replays skip completed steps
self.set_status(OrderStatus::Processing).await?;
self.record_payment(payment.id).await?;
self.set_status(OrderStatus::Paid).await?;
Ok(())
}
}
```
### Visibility Modifiers
```rust
#[entity_impl]
impl MyEntity {
// Public: callable by external clients (default for #[rpc], #[workflow])
#[rpc]
#[public]
pub async fn query(&self) -> Result<Data, ClusterError> { ... }
// Protected: visible in the current trait and all entities that implement it
#[activity]
#[protected]
pub async fn internal_update(&mut self, data: Data) -> Result<(), ClusterError> { ... }
// Private: only callable within this entity (default for #[activity])
#[activity]
#[private]
async fn do_mutation(&mut self) -> Result<(), ClusterError> { ... }
}
```
### Durable Workflows
Workflows survive crashes. On restart, the journal replays and completed activities return cached results:
```rust
#[workflow]
pub async fn checkout(&self, cart: Cart) -> Result<OrderId, ClusterError> {
// If we crash after validate_inventory but before charge_payment,
// on restart validate_inventory returns its cached result instantly
self.validate_inventory(&cart).await?;
let charge_id = self.charge_payment(&cart.total).await?;
let order_id = self.create_order(&cart, &charge_id).await?;
// Fire-and-forget notification (won't block workflow completion)
self.send_confirmation_email(&order_id).await?;
Ok(order_id)
}
```
### Durable Timers
```rust
use std::time::Duration;
#[workflow]
pub async fn delayed_reminder(&self, user_id: String) -> Result<(), ClusterError> {
// Durable sleep - survives restarts, resumes where it left off
self.sleep("reminder-delay", Duration::from_secs(3600)).await?;
self.send_reminder(&user_id).await?;
Ok(())
}
```
### Entity Traits (Composition)
Share behavior across entity types:
```rust
// Define a trait
#[entity_trait]
#[derive(Clone)]
pub struct Auditable;
#[entity_trait_impl]
#[state(AuditState)]
impl Auditable {
fn init(&self) -> Result<AuditState, ClusterError> {
Ok(AuditState::default())
}
// Use #[protected] so entities using this trait can call it
#[activity]
#[protected]
pub async fn log_action(&mut self, action: String) -> Result<(), ClusterError> {
self.state.log.push(AuditEntry {
action,
timestamp: Utc::now(),
});
Ok(())
}
#[rpc]
pub async fn get_audit_log(&self) -> Result<Vec<AuditEntry>, ClusterError> {
Ok(self.state.log.clone())
}
}
// Use trait in entity
#[entity_impl(traits(Auditable))]
#[state(UserState)]
impl User {
#[workflow]
pub async fn update_email(&self, email: String) -> Result<(), ClusterError> {
self.do_update_email(email.clone()).await?;
// Call trait method
self.log_action(format!("email_updated:{}", email)).await?;
Ok(())
}
#[activity]
async fn do_update_email(&mut self, email: String) -> Result<(), ClusterError> {
self.state.email = email;
Ok(())
}
}
```
### Entity Configuration
```rust
#[entity(
name = "user", // Custom entity type name
shard_group = "premium", // Shard group for isolation
max_idle_time_secs = 300, // Eviction timeout (default: 60)
mailbox_capacity = 50, // Message queue size (default: 100)
concurrency = 4, // Parallel request handling (default: 1)
)]
#[derive(Clone)]
struct User;
```
### Self-Scheduling
Entities can schedule messages to themselves:
```rust
#[workflow]
pub async fn start_timeout(&self, timeout_secs: u64) -> Result<(), ClusterError> {
if let Some(client) = self.self_client() {
let deliver_at = Utc::now() + chrono::Duration::seconds(timeout_secs as i64);
let entity_id = self.entity_id().clone();
client.notify_at(&entity_id, "handle_timeout", &(), deliver_at).await?;
}
Ok(())
}
#[workflow]
pub async fn handle_timeout(&self) -> Result<(), ClusterError> {
self.on_timeout().await
}
```
### Singletons
Cluster-wide unique instances:
```rust
use cruster::singleton::register_singleton;
register_singleton(&*sharding, "leader-election", || async {
loop {
// This runs on exactly one node
perform_leader_duties().await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}).await?;
```
## Testing
### TestCluster
```rust
use cruster::testing::TestCluster;
use cruster::types::EntityId;
#[tokio::test]
async fn test_counter_increment() {
// Basic cluster (in-memory)
let cluster = TestCluster::new().await;
// Or with workflow support (durable context)
let cluster = TestCluster::with_workflow_support().await;
// Register entity and get typed client
let counter = Counter.register(cluster.sharding().clone()).await.unwrap();
let entity_id = EntityId::new("test-1");
let result = counter.increment(&entity_id, &10i64).await.unwrap();
assert_eq!(result, 10);
let result = counter.get(&entity_id).await.unwrap();
assert_eq!(result, 10);
}
```
### Client Methods
```rust
// Request-response
let result: Response = client.send(&entity_id, "method", &request).await?;
// Fire-and-forget
client.notify(&entity_id, "method", &request).await?;
// Persisted (at-least-once delivery)
let result: Response = client.send_persisted(&entity_id, "method", &request, Uninterruptible::Server).await?;
// Scheduled delivery
client.notify_at(&entity_id, "method", &request, deliver_at).await?;
```
## Production Deployment
### Multi-Node Cluster
```rust
use cruster::config::ShardingConfig;
use cruster::sharding_impl::ShardingImpl;
let config = ShardingConfig {
runner_address: "10.0.0.1:9000".parse()?,
shards_per_group: 2048,
entity_max_idle_time: Duration::from_secs(300),
..Default::default()
};
let sharding = ShardingImpl::new(
config,
postgres_pool,
etcd_runner_storage,
grpc_runners,
).await?;
sharding.register_entity(Arc::new(MyEntity)).await?;
sharding.start().await?;
```
### Storage Requirements
**PostgreSQL** - State persistence, message storage, workflow journals
**etcd** - Runner discovery, shard locks, health monitoring
### Configuration Reference
| `shards_per_group` | 2048 | Shards per shard group |
| `entity_max_idle_time` | 60s | Idle timeout before eviction |
| `entity_mailbox_capacity` | 100 | Per-entity message queue size |
| `storage_poll_interval` | 500ms | Message storage polling frequency |
| `storage_message_max_retries` | 10 | Max delivery attempts |
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Clients │
└─────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Sharding Layer │
│ Consistent hashing + request routing │
└─────────────────────────┬───────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Runner 1 │ │ Runner 2 │ │ Runner 3 │
│ Shards │ │ Shards │ │ Shards │
│ 0-682 │ │ 683-1365 │ │1366-2047 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└───────────────┴───────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌───────────┐ ┌───────────┐
│ PostgreSQL│ │ etcd │
│ - State │ │ - Runners │
│ - Journals│ │ - Locks │
│ - Messages│ │ - Health │
└───────────┘ └───────────┘
```
## Examples
See [`examples/`](examples/) for complete examples:
- **cluster-tests** - E2E test suite covering all features
- **chess-cluster** - Distributed chess server with matchmaking
## License
MIT