cruster-macros 0.0.11

Procedural macros for the cruster distributed entity framework
Documentation

Cruster

A Rust framework for building distributed, stateful entity systems with durable workflows.

Status: Under active development. API may change. Feedback welcome.

Features

  • Distributed Entities - Stateful actors with automatic sharding, persistence, and lifecycle management
  • Durable Workflows - Long-running operations that survive crashes with automatic replay
  • Activity Journaling - State mutations with transactional guarantees and idempotency
  • Entity Traits - Composable behaviors across multiple entity types
  • Singletons - Cluster-wide unique instances with automatic failover
  • Timers & Scheduling - Durable sleep and cron-based scheduling
  • gRPC Transport - Inter-node communication with streaming support

Installation

Prerequisites

Protocol Buffers compiler is required:

# Debian/Ubuntu
sudo apt-get install protobuf-compiler

# macOS
brew install protobuf

Cargo

[dependencies]
cruster = "0.0.2"

Quick Start

Defining an Entity

use cruster::prelude::*;
use serde::{Deserialize, Serialize};

// 1. Define persistent state
#[derive(Clone, Default, Serialize, Deserialize)]
struct CounterState {
    value: i64,
}

// 2. Define the entity
#[entity]
#[derive(Clone)]
struct Counter;

// 3. Implement handlers
#[entity_impl]
#[state(CounterState)]
impl Counter {
    // Required: initialize state for new entities
    fn init(&self, _ctx: &EntityContext) -> Result<CounterState, ClusterError> {
        Ok(CounterState::default())
    }

    // Activity: mutates state, journaled for replay safety
    #[activity]
    async fn do_increment(&mut self, amount: i64) -> Result<i64, ClusterError> {
        self.state.value += amount;
        Ok(self.state.value)
    }

    // Workflow: public entry point, orchestrates activities
    #[workflow]
    pub async fn increment(&self, amount: i64) -> Result<i64, ClusterError> {
        self.do_increment(amount).await
    }

    // RPC: read-only, no journaling
    #[rpc]
    pub async fn get(&self) -> Result<i64, ClusterError> {
        Ok(self.state.value)
    }
}

Using the Entity

use cruster::testing::TestCluster;
use cruster::types::EntityId;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create test cluster
    let cluster = TestCluster::new().await;
    
    // Register entity and get typed client (CounterClient is generated by the macro)
    let counter = Counter.register(cluster.sharding().clone()).await?;
    
    // Call methods using the typed client - entity is created on first access
    let entity_id = EntityId::new("counter-1");
    let value = counter.increment(&entity_id, &5i64).await?;
    assert_eq!(value, 5);
    
    let value = counter.get(&entity_id).await?;
    assert_eq!(value, 5);
    
    Ok(())
}

Core Concepts

Method Types

Attribute Purpose Self Journaled Default Visibility
#[rpc] Read-only queries &self No #[public]
#[workflow] Durable operations &self Yes #[public]
#[activity] State mutations &mut self Yes #[private]

The Workflow + Activity Pattern

Best Practice: Separate concerns between workflows (orchestration) and activities (mutation).

#[entity_impl]
#[state(OrderState)]
impl Order {
    // Activities: private, handle state mutations
    #[activity]
    async fn set_status(&mut self, status: OrderStatus) -> Result<(), ClusterError> {
        self.state.status = status;
        Ok(())
    }

    #[activity]
    async fn record_payment(&mut self, payment_id: String) -> Result<(), ClusterError> {
        self.state.payment_id = Some(payment_id);
        Ok(())
    }

    // Workflow: public, orchestrates activities
    #[workflow]
    pub async fn process_payment(&self, payment: PaymentRequest) -> Result<(), ClusterError> {
        // Validation (deterministic, no side effects)
        if payment.amount <= 0 {
            return Err(ClusterError::MalformedMessage {
                reason: "Invalid amount".into(),
                source: None,
            });
        }

        // Each activity is journaled - replays skip completed steps
        self.set_status(OrderStatus::Processing).await?;
        self.record_payment(payment.id).await?;
        self.set_status(OrderStatus::Paid).await?;
        
        Ok(())
    }
}

Visibility Modifiers

#[entity_impl]
impl MyEntity {
    // Public: callable by external clients (default for #[rpc], #[workflow])
    #[rpc]
    #[public]
    pub async fn query(&self) -> Result<Data, ClusterError> { ... }

    // Protected: visible in the current trait and all entities that implement it
    #[activity]
    #[protected]
    pub async fn internal_update(&mut self, data: Data) -> Result<(), ClusterError> { ... }

    // Private: only callable within this entity (default for #[activity])
    #[activity]
    #[private]
    async fn do_mutation(&mut self) -> Result<(), ClusterError> { ... }
}

Durable Workflows

Workflows survive crashes. On restart, the journal replays and completed activities return cached results:

#[workflow]
pub async fn checkout(&self, cart: Cart) -> Result<OrderId, ClusterError> {
    // If we crash after validate_inventory but before charge_payment,
    // on restart validate_inventory returns its cached result instantly
    self.validate_inventory(&cart).await?;
    
    let charge_id = self.charge_payment(&cart.total).await?;
    
    let order_id = self.create_order(&cart, &charge_id).await?;
    
    // Fire-and-forget notification (won't block workflow completion)
    self.send_confirmation_email(&order_id).await?;
    
    Ok(order_id)
}

Durable Timers

use std::time::Duration;

#[workflow]
pub async fn delayed_reminder(&self, user_id: String) -> Result<(), ClusterError> {
    // Durable sleep - survives restarts, resumes where it left off
    self.sleep("reminder-delay", Duration::from_secs(3600)).await?;
    
    self.send_reminder(&user_id).await?;
    Ok(())
}

Entity Traits (Composition)

Share behavior across entity types:

// Define a trait
#[entity_trait]
#[derive(Clone)]
pub struct Auditable;

#[entity_trait_impl]
#[state(AuditState)]
impl Auditable {
    fn init(&self) -> Result<AuditState, ClusterError> {
        Ok(AuditState::default())
    }

    // Use #[protected] so entities using this trait can call it
    #[activity]
    #[protected]
    pub async fn log_action(&mut self, action: String) -> Result<(), ClusterError> {
        self.state.log.push(AuditEntry {
            action,
            timestamp: Utc::now(),
        });
        Ok(())
    }

    #[rpc]
    pub async fn get_audit_log(&self) -> Result<Vec<AuditEntry>, ClusterError> {
        Ok(self.state.log.clone())
    }
}

// Use trait in entity
#[entity_impl(traits(Auditable))]
#[state(UserState)]
impl User {
    #[workflow]
    pub async fn update_email(&self, email: String) -> Result<(), ClusterError> {
        self.do_update_email(email.clone()).await?;
        
        // Call trait method
        self.log_action(format!("email_updated:{}", email)).await?;
        
        Ok(())
    }
    
    #[activity]
    async fn do_update_email(&mut self, email: String) -> Result<(), ClusterError> {
        self.state.email = email;
        Ok(())
    }
}

Entity Configuration

#[entity(
    name = "user",                    // Custom entity type name
    shard_group = "premium",          // Shard group for isolation
    max_idle_time_secs = 300,         // Eviction timeout (default: 60)
    mailbox_capacity = 50,            // Message queue size (default: 100)
    concurrency = 4,                  // Parallel request handling (default: 1)
)]
#[derive(Clone)]
struct User;

Self-Scheduling

Entities can schedule messages to themselves:

#[workflow]
pub async fn start_timeout(&self, timeout_secs: u64) -> Result<(), ClusterError> {
    if let Some(client) = self.self_client() {
        let deliver_at = Utc::now() + chrono::Duration::seconds(timeout_secs as i64);
        let entity_id = self.entity_id().clone();
        
        client.notify_at(&entity_id, "handle_timeout", &(), deliver_at).await?;
    }
    Ok(())
}

#[workflow]
pub async fn handle_timeout(&self) -> Result<(), ClusterError> {
    self.on_timeout().await
}

Singletons

Cluster-wide unique instances:

use cruster::singleton::register_singleton;

register_singleton(&*sharding, "leader-election", || async {
    loop {
        // This runs on exactly one node
        perform_leader_duties().await;
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}).await?;

Testing

TestCluster

use cruster::testing::TestCluster;
use cruster::types::EntityId;

#[tokio::test]
async fn test_counter_increment() {
    // Basic cluster (in-memory)
    let cluster = TestCluster::new().await;
    
    // Or with workflow support (durable context)
    let cluster = TestCluster::with_workflow_support().await;
    
    // Register entity and get typed client
    let counter = Counter.register(cluster.sharding().clone()).await.unwrap();
    
    let entity_id = EntityId::new("test-1");
    let result = counter.increment(&entity_id, &10i64).await.unwrap();
    assert_eq!(result, 10);
    
    let result = counter.get(&entity_id).await.unwrap();
    assert_eq!(result, 10);
}

Client Methods

// Request-response
let result: Response = client.send(&entity_id, "method", &request).await?;

// Fire-and-forget
client.notify(&entity_id, "method", &request).await?;

// Persisted (at-least-once delivery)
let result: Response = client.send_persisted(&entity_id, "method", &request, Uninterruptible::Server).await?;

// Scheduled delivery
client.notify_at(&entity_id, "method", &request, deliver_at).await?;

Production Deployment

Multi-Node Cluster

use cruster::config::ShardingConfig;
use cruster::sharding_impl::ShardingImpl;

let config = ShardingConfig {
    runner_address: "10.0.0.1:9000".parse()?,
    shards_per_group: 2048,
    entity_max_idle_time: Duration::from_secs(300),
    ..Default::default()
};

let sharding = ShardingImpl::new(
    config,
    postgres_pool,
    etcd_runner_storage,
    grpc_runners,
).await?;

sharding.register_entity(Arc::new(MyEntity)).await?;
sharding.start().await?;

Storage Requirements

PostgreSQL - State persistence, message storage, workflow journals

etcd - Runner discovery, shard locks, health monitoring

Configuration Reference

Option Default Description
shards_per_group 2048 Shards per shard group
entity_max_idle_time 60s Idle timeout before eviction
entity_mailbox_capacity 100 Per-entity message queue size
storage_poll_interval 500ms Message storage polling frequency
storage_message_max_retries 10 Max delivery attempts

Architecture

┌─────────────────────────────────────────────────────────┐
│                       Clients                            │
└─────────────────────────┬───────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────┐
│                   Sharding Layer                         │
│         Consistent hashing + request routing             │
└─────────────────────────┬───────────────────────────────┘
                          │
          ┌───────────────┼───────────────┐
          ▼               ▼               ▼
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │ Runner 1 │    │ Runner 2 │    │ Runner 3 │
    │ Shards   │    │ Shards   │    │ Shards   │
    │  0-682   │    │ 683-1365 │    │1366-2047 │
    └────┬─────┘    └────┬─────┘    └────┬─────┘
         │               │               │
         └───────────────┴───────────────┘
                         │
         ┌───────────────┴───────────────┐
         ▼                               ▼
   ┌───────────┐                   ┌───────────┐
   │ PostgreSQL│                   │   etcd    │
   │  - State  │                   │ - Runners │
   │  - Journals│                  │ - Locks   │
   │  - Messages│                  │ - Health  │
   └───────────┘                   └───────────┘

Examples

See examples/ for complete examples:

  • cluster-tests - E2E test suite covering all features
  • chess-cluster - Distributed chess server with matchmaking

License

MIT