cruster-macros 0.0.1

Procedural macros for the cruster distributed entity framework
# Cruster

A Rust framework for building distributed, stateful entity systems with durable workflows.

Cruster provides a distributed actor/entity model with consistent-hashing-based shard assignment, persistent messaging, singleton management, durable workflows, and cron scheduling.

> **Project Status:** This project is under active development and **not yet ready for production use**. The API may change without notice. It is currently being developed alongside an internal project. Feedback and contributions are welcome, but please be aware of the experimental nature of this library.

## Features

- **Entity System** - Define stateful actors with automatic persistence and lifecycle management
- **Durable Workflows** - Long-running operations that survive crashes and restarts
- **Activity Journaling** - State mutations with transactional guarantees and replay safety
- **Entity Traits** - Composable behaviors that can be mixed into multiple entities
- **Singletons** - Cluster-wide unique instances with automatic failover
- **Scheduled Tasks** - Cron-based scheduling with distributed coordination
- **gRPC Transport** - Built-in inter-node communication with streaming support
- **Storage** - PostgreSQL for persistence, etcd for cluster formation & health monitoring

## Installation

### Prerequisites

Cruster requires the Protocol Buffers compiler (`protoc`) to be installed for building gRPC support:

```bash
# Debian/Ubuntu
sudo apt-get install protobuf-compiler

# macOS
brew install protobuf

# Windows (with Chocolatey)
choco install protoc
```

### Cargo

Add to your `Cargo.toml`:

```toml
[dependencies]
cruster = "0.1"
cruster-macros = "0.0.1"
```

## Quick Start

### Defining an Entity

```rust
use cruster::prelude::*;
use serde::{Deserialize, Serialize};

// Define the entity's state
#[derive(Clone, Serialize, Deserialize, Default)]
struct CounterState {
    count: i32,
}

// Define the entity
#[entity]
#[derive(Clone)]
struct Counter;

#[entity_impl]
#[state(CounterState)]
impl Counter {
    // Initialize state when entity is first created
    fn init(&self, _ctx: &EntityContext) -> Result<CounterState, ClusterError> {
        Ok(CounterState::default())
    }

    // Activity: mutates state, journaled for replay
    #[activity]
    async fn increment(&mut self, amount: i32) -> Result<i32, ClusterError> {
        self.state.count += amount;
        Ok(self.state.count)
    }

    // RPC: read-only, publicly callable
    #[rpc]
    async fn get_count(&self) -> Result<i32, ClusterError> {
        Ok(self.state.count)
    }
}
```

### Using the Entity

```rust
use cruster::testing::TestCluster;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create an in-memory test cluster
    let cluster = TestCluster::new().await;
    
    // Register the entity and get a typed client
    let client = cluster.register(Counter).await?;
    
    // Call methods on the entity
    let count = client.call("counter-1", "increment", 5).await?;
    println!("Count: {}", count); // Count: 5
    
    let count = client.call("counter-1", "get_count", ()).await?;
    println!("Count: {}", count); // Count: 5
    
    Ok(())
}
```

## Core Concepts

### Method Types

| Annotation | Purpose | Self Type | Journaled | Callable From |
|------------|---------|-----------|-----------|---------------|
| `#[rpc]` | Read-only operations | `&self` | No | External clients |
| `#[activity]` | State mutations | `&mut self` | Yes | Workflows, other activities |
| `#[workflow]` | Orchestrate activities | `&self` or `&mut self` | Yes | External clients |
| `#[method]` | Private helpers | Any | No | Internal only |

### Visibility Modifiers

- `#[public]` - Callable from the generated client (default for `#[rpc]` and `#[workflow]`)
- `#[protected]` - Callable within the entity and by composed traits
- `#[private]` - Callable only within the defining scope (default for `#[activity]`)
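The defaults in the method-type table and the visibility list can be read as a small decision table. The sketch below is a hypothetical model written for illustration (`MethodKind`, `Visibility`, and `effective_visibility` are not part of cruster). It assumes that an explicit modifier always overrides the default, and that `#[method]` defaults to private, which "Internal only" implies but the README does not state.

```rust
// Hypothetical model of the method-type and visibility rules; not cruster API.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MethodKind {
    Rpc,
    Activity,
    Workflow,
    Method,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Visibility {
    Public,
    Protected,
    Private,
}

impl MethodKind {
    /// Activities and workflows are journaled; rpc and plain methods are not.
    fn journaled(self) -> bool {
        matches!(self, MethodKind::Activity | MethodKind::Workflow)
    }

    /// Default visibility when no modifier is written.
    /// Treating `#[method]` as private is an assumption ("Internal only").
    fn default_visibility(self) -> Visibility {
        match self {
            MethodKind::Rpc | MethodKind::Workflow => Visibility::Public,
            MethodKind::Activity | MethodKind::Method => Visibility::Private,
        }
    }
}

/// An explicit `#[public]` / `#[protected]` / `#[private]` wins over the default.
fn effective_visibility(kind: MethodKind, explicit: Option<Visibility>) -> Visibility {
    explicit.unwrap_or_else(|| kind.default_visibility())
}

fn main() {
    let kinds = [
        MethodKind::Rpc,
        MethodKind::Activity,
        MethodKind::Workflow,
        MethodKind::Method,
    ];
    for kind in kinds {
        println!(
            "{:?}: journaled={}, default={:?}",
            kind,
            kind.journaled(),
            effective_visibility(kind, None)
        );
    }
    // An activity marked #[protected] becomes reachable from composed traits.
    println!(
        "{:?}",
        effective_visibility(MethodKind::Activity, Some(Visibility::Protected))
    );
}
```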

### State Persistence

All entity state is automatically persisted. When an entity is evicted from memory after `max_idle_time` of inactivity (60s by default) and later reactivated, its state is restored from storage.

```rust
#[entity_impl]
#[state(MyState)]  // State is always persisted
impl MyEntity {
    fn init(&self, _ctx: &EntityContext) -> Result<MyState, ClusterError> {
        Ok(MyState::default())
    }
}
```
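A minimal sketch of the evict-and-restore cycle, using only the standard library. It assumes state is written to storage after every mutation and that `init` runs only when no stored state exists. `Host`, `encode`, and `decode` are hypothetical; a `HashMap` stands in for PostgreSQL, and the hand-written byte encoding stands in for serde.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Default, PartialEq)]
struct CounterState {
    count: i32,
}

impl CounterState {
    // Stand-in for serde serialization.
    fn encode(&self) -> Vec<u8> {
        self.count.to_le_bytes().to_vec()
    }
    fn decode(bytes: &[u8]) -> Self {
        let mut buf = [0u8; 4];
        buf.copy_from_slice(&bytes[..4]);
        CounterState { count: i32::from_le_bytes(buf) }
    }
}

#[derive(Default)]
struct Host {
    storage: HashMap<String, Vec<u8>>,     // stands in for the durable store
    active: HashMap<String, CounterState>, // entities currently in memory
}

impl Host {
    /// Bring an entity into memory: restore persisted state, or run `init`.
    fn activate(&mut self, id: &str) -> &mut CounterState {
        if !self.active.contains_key(id) {
            let state = match self.storage.get(id) {
                Some(bytes) => CounterState::decode(bytes),
                None => CounterState::default(), // equivalent of `init`
            };
            self.active.insert(id.to_string(), state);
        }
        self.active.get_mut(id).unwrap()
    }

    /// Mutate state, then persist it before returning.
    fn increment(&mut self, id: &str, amount: i32) -> i32 {
        let state = self.activate(id);
        state.count += amount;
        let (count, bytes) = (state.count, state.encode());
        self.storage.insert(id.to_string(), bytes);
        count
    }

    /// Drop the in-memory copy, as an idle-timeout eviction would.
    fn evict(&mut self, id: &str) {
        self.active.remove(id);
    }
}

fn main() {
    let mut host = Host::default();
    host.increment("counter-1", 5);
    host.evict("counter-1");
    // Reactivation restores count = 5 from storage before adding 2.
    let count = host.increment("counter-1", 2);
    println!("count after restore: {count}");
}
```

Persisting on every change keeps eviction cheap, since the in-memory copy can simply be dropped, at the cost of one write per mutation.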

### Durable Workflows

Workflows orchestrate multiple activities and survive crashes. On restart, workflows replay from their journal, skipping already-completed activities.

```rust
#[entity_impl]
#[state(OrderState)]
impl OrderProcessor {
    #[workflow]
    async fn process_order(&mut self, order_id: String) -> Result<(), ClusterError> {
        // Each activity is journaled - if we crash and restart,
        // completed activities return their cached results
        self.validate_order(&order_id).await?;
        self.charge_payment(&order_id).await?;
        self.ship_order(&order_id).await?;
        self.send_confirmation(&order_id).await?;
        Ok(())
    }

    #[activity]
    async fn validate_order(&mut self, order_id: &str) -> Result<(), ClusterError> {
        // Validation logic...
        Ok(())
    }

    #[activity]
    async fn charge_payment(&mut self, order_id: &str) -> Result<(), ClusterError> {
        // Payment logic - only executed once even on replay
        Ok(())
    }
    
    // ... more activities
}
```
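The replay behavior can be sketched with a journal keyed by step position. This is an illustration under assumptions, not cruster's implementation: `Journal`, `WorkflowRun`, and `process_order` are hypothetical, results are plain strings, and steps are identified by their order, which only works if the workflow body is deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical journal: step index -> recorded result of a completed activity.
struct Journal {
    entries: HashMap<usize, String>,
}

impl Journal {
    fn new() -> Self {
        Journal { entries: HashMap::new() }
    }
}

/// One execution (or replay) of a workflow against a shared journal.
struct WorkflowRun<'a> {
    journal: &'a mut Journal,
    step: usize,
}

impl<'a> WorkflowRun<'a> {
    fn new(journal: &'a mut Journal) -> Self {
        WorkflowRun { journal, step: 0 }
    }

    /// Run `body` unless this step already has a journaled result.
    fn activity<F: FnOnce() -> String>(&mut self, body: F) -> String {
        let step = self.step;
        self.step += 1;
        if let Some(cached) = self.journal.entries.get(&step) {
            return cached.clone(); // replay: skip the side effect
        }
        let result = body();
        self.journal.entries.insert(step, result.clone());
        result
    }
}

/// The order workflow reduced to four steps. `crash_after = Some(n)`
/// simulates the node dying after `n` steps have completed.
fn process_order(journal: &mut Journal, charges: &mut u32, crash_after: Option<usize>) {
    let mut run = WorkflowRun::new(journal);
    let steps = ["validated", "charged", "shipped", "confirmed"];
    for (i, label) in steps.iter().enumerate() {
        if crash_after == Some(i) {
            return; // simulated crash: later steps never start
        }
        run.activity(|| {
            if *label == "charged" {
                *charges += 1; // the side effect that must not repeat
            }
            label.to_string()
        });
    }
}

fn main() {
    let mut journal = Journal::new();
    let mut charges = 0;
    process_order(&mut journal, &mut charges, Some(2)); // crash after validate + charge
    process_order(&mut journal, &mut charges, None); // restart: replay from the top
    println!(
        "journaled steps: {}, payment charged {} time(s)",
        journal.entries.len(),
        charges
    );
}
```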

### Idempotency Keys

Activities and workflows are deduplicated by their idempotency key. The default key is `hash(method_name, serialized_params)`.

Custom keys can be specified:

```rust
#[workflow(key(|order_id, _body| order_id))]
async fn send_email(&mut self, order_id: OrderId, body: String) -> Result<(), ClusterError> {
    // Same order_id always returns cached result
    Ok(())
}
```
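A sketch of the default key `hash(method_name, serialized_params)` next to a custom key that ignores `body`. `DefaultHasher` and the ad hoc string serialization are stand-ins: cruster's actual hash and encoding are not documented here, and whether the method name is still mixed into a custom key is an assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical `hash(method_name, serialized_params)`.
fn idempotency_key(method_name: &str, serialized_params: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    method_name.hash(&mut hasher);
    serialized_params.hash(&mut hasher);
    hasher.finish()
}

/// Default key for `send_email(order_id, body)`: every parameter counts.
fn default_key(order_id: &str, body: &str) -> u64 {
    // Stand-in serialization: a JSON-like array of the arguments.
    idempotency_key("send_email", &format!("[{order_id:?},{body:?}]"))
}

/// Equivalent of `key(|order_id, _body| order_id)`: only `order_id` counts.
fn custom_key(order_id: &str, _body: &str) -> u64 {
    idempotency_key("send_email", &format!("{order_id:?}"))
}

fn main() {
    // Different bodies give different default keys, so both calls would run...
    println!(
        "default keys equal: {}",
        default_key("order-1", "hello") == default_key("order-1", "hi again")
    );
    // ...but the custom key treats them as one call, so the second is deduplicated.
    println!(
        "custom keys equal: {}",
        custom_key("order-1", "hello") == custom_key("order-1", "hi again")
    );
}
```

`DefaultHasher`'s algorithm is not guaranteed to stay the same across Rust releases, so a key that is stored durably would need a hash with a fixed specification.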

### Entity Traits

Traits provide reusable behaviors that can be composed into entities:

```rust
// Define a trait
#[entity_trait]
pub trait Auditable {
    #[rpc]
    async fn get_audit_log(&self, limit: usize) -> Result<Vec<AuditEntry>, ClusterError>;
    
    #[activity]
    async fn log_audit(&mut self, action: String, actor: String) -> Result<(), ClusterError>;
}

// Compose into an entity
#[entity_impl(traits(Auditable))]
#[state(MyState)]
impl MyEntity {
    // Entity methods...
}
```
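The composition idea can be approximated with an ordinary Rust trait that has default methods and a pair of required accessors. This is an analogy only: it is not what `#[entity_trait]` generates, and `Order`, `Account`, and `audit_log`/`audit_log_mut` are hypothetical.

```rust
#[derive(Debug, Clone, PartialEq)]
struct AuditEntry {
    action: String,
    actor: String,
}

/// Behavior written once; each implementor only says where its log lives.
trait Auditable {
    fn audit_log(&self) -> &[AuditEntry];
    fn audit_log_mut(&mut self) -> &mut Vec<AuditEntry>;

    fn log_audit(&mut self, action: &str, actor: &str) {
        self.audit_log_mut().push(AuditEntry {
            action: action.to_string(),
            actor: actor.to_string(),
        });
    }

    /// Up to `limit` entries, newest first.
    fn get_audit_log(&self, limit: usize) -> Vec<AuditEntry> {
        self.audit_log().iter().rev().take(limit).cloned().collect()
    }
}

#[derive(Default)]
struct Order {
    status: String,
    audit: Vec<AuditEntry>,
}

#[derive(Default)]
struct Account {
    balance: i64,
    audit: Vec<AuditEntry>,
}

// Two unrelated entity types gain the same behavior by exposing their log.
impl Auditable for Order {
    fn audit_log(&self) -> &[AuditEntry] { &self.audit }
    fn audit_log_mut(&mut self) -> &mut Vec<AuditEntry> { &mut self.audit }
}

impl Auditable for Account {
    fn audit_log(&self) -> &[AuditEntry] { &self.audit }
    fn audit_log_mut(&mut self) -> &mut Vec<AuditEntry> { &mut self.audit }
}

fn main() {
    let mut order = Order::default();
    order.status = "shipped".to_string();
    order.log_audit("create", "alice");
    order.log_audit("ship", "bob");

    let mut account = Account::default();
    account.balance = 100;
    account.log_audit("deposit", "carol");

    println!("order {}: {:?}", order.status, order.get_audit_log(1));
    println!("account {}: {:?}", account.balance, account.get_audit_log(10));
}
```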

### Singletons

Singletons are cluster-wide unique instances. Only one instance runs at a time, with automatic failover if the hosting node fails.

```rust
use std::time::Duration;
use cruster::singleton::{register_singleton, singleton};

// `sharding` is the running sharding instance (see Cluster Configuration below).

// Register a singleton using the builder API
singleton("leaderboard", || async {
    // Singleton logic runs on exactly one node
    loop {
        // Process leaderboard updates...
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
})
.register(&*sharding)
.await?;

// Or use the direct function
register_singleton(&*sharding, "metrics-collector", || async {
    loop {
        collect_metrics().await; // application-defined
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
}).await?;
```
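Failover of this kind is commonly built on a time-limited lease: the holder must renew before it expires, and any other node may take over once it lapses. The sketch below models that with a logical clock. The README says etcd handles leader election, but `Lease`, `try_acquire`, and the 10-tick TTL are hypothetical and do not reflect etcd's or cruster's API.

```rust
/// Hypothetical singleton lease; time is a logical tick counter.
#[derive(Debug)]
struct Lease {
    holder: Option<String>,
    expires_at: u64, // tick at which the lease lapses unless renewed
}

impl Lease {
    const TTL: u64 = 10;

    /// Acquire or renew at time `now`. Succeeds if the lease is free,
    /// already held by `node`, or expired because its holder stopped renewing.
    fn try_acquire(&mut self, node: &str, now: u64) -> bool {
        let available = match &self.holder {
            None => true,
            Some(h) => h == node || now >= self.expires_at,
        };
        if available {
            self.holder = Some(node.to_string());
            self.expires_at = now + Self::TTL;
        }
        available
    }
}

fn main() {
    let mut lease = Lease { holder: None, expires_at: 0 };
    // node-a stops renewing after t = 8, as if it crashed.
    let attempts = [(0, "node-a"), (5, "node-b"), (8, "node-a"), (15, "node-b"), (18, "node-b")];
    for (now, node) in attempts {
        let won = lease.try_acquire(node, now);
        println!("t={now:>2} {node}: {}", if won { "runs singleton" } else { "standby" });
    }
}
```

The TTL trades failover speed against false takeovers: a shorter lease moves the singleton sooner after a crash, but a node that pauses longer than the TTL loses the lease while still alive.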

### Deferreds (Async Coordination)

Deferreds allow workflows to wait for signals from other parts of the system:

```rust
#[entity_impl]
#[state(ApprovalState)]
impl ApprovalWorkflow {
    #[workflow]
    async fn wait_for_approval(&mut self, request_id: String) -> Result<bool, ClusterError> {
        let signal_name = format!("approval/{}", request_id);
        
        // This suspends the workflow until resolved
        let approved: bool = self.await_deferred(&signal_name).await?;
        
        Ok(approved)
    }

    // Called by another workflow or RPC to resolve the deferred
    #[activity]
    async fn approve_request(&mut self, request_id: String) -> Result<(), ClusterError> {
        let signal_name = format!("approval/{}", request_id);
        self.resolve_deferred(&signal_name, &true).await?;
        Ok(())
    }
}
```
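A minimal in-memory analogue of `await_deferred` / `resolve_deferred`, assuming a resolved value stays stored so that a late waiter returns immediately. It blocks a thread on a `Condvar`, whereas cruster suspends the workflow and keeps deferred values in PostgreSQL. `Deferreds` is a hypothetical type.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical registry of named deferred values.
#[derive(Default)]
struct Deferreds {
    values: Mutex<HashMap<String, bool>>,
    resolved: Condvar,
}

impl Deferreds {
    /// Block until `name` has been resolved, then return its value.
    fn await_deferred(&self, name: &str) -> bool {
        let mut values = self.values.lock().unwrap();
        loop {
            if let Some(v) = values.get(name) {
                return *v;
            }
            // Releases the lock while waiting; re-checks after every wake-up.
            values = self.resolved.wait(values).unwrap();
        }
    }

    /// Store the value and wake all waiters; later awaits return immediately.
    fn resolve_deferred(&self, name: &str, value: bool) {
        self.values.lock().unwrap().insert(name.to_string(), value);
        self.resolved.notify_all();
    }
}

fn main() {
    let deferreds = Arc::new(Deferreds::default());
    let signal = "approval/req-42";

    // The "workflow" waits on the signal in another thread.
    let waiter = {
        let d = Arc::clone(&deferreds);
        thread::spawn(move || d.await_deferred(signal))
    };

    // Some other part of the system approves the request.
    deferreds.resolve_deferred(signal, true);

    println!("approved = {}", waiter.join().unwrap());
}
```

Because the value is stored rather than only signalled, it does not matter whether the resolve happens before or after the wait begins.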

### Timers and Sleep

Durable sleep that survives restarts:

```rust
#[workflow]
async fn delayed_notification(&mut self) -> Result<(), ClusterError> {
    // Sleep is persisted - if we restart, we resume from where we left off
    self.sleep(Duration::from_secs(3600)).await?;
    self.send_notification().await?;
    Ok(())
}
```
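One way a sleep can survive a restart is to journal the absolute wake-up time the first time the step runs, then recompute the remaining wait from that deadline on replay. The sketch assumes that design; `TimerJournal` and the timer id format are hypothetical, and times are plain seconds passed in explicitly to keep it deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical store of durable timer deadlines: timer id -> wake-up time.
struct TimerJournal {
    deadlines: HashMap<String, u64>,
}

impl TimerJournal {
    fn new() -> Self {
        TimerJournal { deadlines: HashMap::new() }
    }

    /// Seconds still to wait for timer `id` at time `now`.
    /// The deadline is recorded only on first execution and reused on replay.
    fn remaining(&mut self, id: &str, duration_secs: u64, now: u64) -> u64 {
        let deadline = *self
            .deadlines
            .entry(id.to_string())
            .or_insert(now + duration_secs);
        deadline.saturating_sub(now)
    }
}

fn main() {
    let mut timers = TimerJournal::new();
    // First execution at t = 1000: the full hour remains.
    println!("{}", timers.remaining("wf-1/sleep-0", 3600, 1000)); // prints 3600
    // Node restarts; replay at t = 2800 resumes with the rest of the hour.
    println!("{}", timers.remaining("wf-1/sleep-0", 3600, 2800)); // prints 1800
    // Replay after the deadline: no wait, continue to the next step.
    println!("{}", timers.remaining("wf-1/sleep-0", 3600, 5000)); // prints 0
}
```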

## Cluster Configuration

### Single Node (Development)

```rust
use cruster::single_runner::SingleRunner;

let runner = SingleRunner::new(postgres_pool).await?;
runner.register(MyEntity).await?;
runner.start().await?;
```

### Multi-Node (Production)

```rust
use std::sync::Arc;

use cruster::config::ShardingConfig;
use cruster::sharding_impl::ShardingImpl;

let config = ShardingConfig {
    runner_address: "10.0.0.1:9000".parse()?,
    shard_count: 128,
    ..Default::default()
};

// `postgres_pool`, `etcd_runner_storage`, and `grpc_runners` are created elsewhere
let sharding = ShardingImpl::new(
    config,
    postgres_pool,
    etcd_runner_storage,
    grpc_runners,
).await?;

sharding.register_entity(Arc::new(MyEntity)).await?;
sharding.start().await?;
```

### Configuration Options

| Option | Default | Description |
|--------|---------|-------------|
| `shard_count` | 128 | Number of shards per group |
| `max_idle_time` | 60s | Entity eviction timeout |
| `mailbox_capacity` | 1000 | Per-entity message queue size |
| `storage_poll_interval` | 100ms | Message storage polling frequency |
| `storage_message_max_retries` | 3 | Max delivery attempts before dead-letter |
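A sketch of how `storage_message_max_retries` could bound redelivery, assuming the first delivery counts as an attempt (the table describes the option as "max delivery attempts"). `deliver` and `Outcome` are hypothetical. Because a message may reach the handler more than once, handlers need to tolerate duplicates under at-least-once delivery.

```rust
/// Final state of one stored message.
#[derive(Debug, PartialEq)]
enum Outcome {
    Delivered { attempts: u32 },
    DeadLettered { attempts: u32 },
}

/// Hand the message to `handler` until it succeeds or `max_attempts` is used up.
fn deliver<F>(max_attempts: u32, mut handler: F) -> Outcome
where
    F: FnMut(u32) -> Result<(), String>,
{
    for attempt in 1..=max_attempts {
        match handler(attempt) {
            Ok(()) => return Outcome::Delivered { attempts: attempt },
            Err(e) => eprintln!("attempt {attempt} failed: {e}"),
        }
    }
    Outcome::DeadLettered { attempts: max_attempts }
}

fn main() {
    let max_attempts = 3; // mirrors the `storage_message_max_retries` default
    // A handler that fails twice (e.g. a dependency is briefly down), then succeeds.
    let flaky = deliver(max_attempts, |n| {
        if n < 3 { Err("dependency unavailable".to_string()) } else { Ok(()) }
    });
    // A handler that never succeeds ends up dead-lettered.
    let broken = deliver(max_attempts, |_| Err("malformed payload".to_string()));
    println!("{flaky:?}\n{broken:?}");
}
```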

## Storage

### PostgreSQL

Used for:
- Entity state persistence
- Message storage (at-least-once delivery)
- Workflow journals
- Timer and deferred value storage

Required tables are created via migrations in `migrations/`.

### etcd

Used for:
- Runner registration and discovery
- Shard lock acquisition
- Health monitoring
- Leader election

## Testing

### TestCluster

An in-memory cluster for unit tests:

```rust
use cruster::testing::TestCluster;

#[tokio::test]
async fn test_counter() {
    // Basic cluster. Alternatives:
    //   TestCluster::with_message_storage().await   (at-least-once delivery)
    //   TestCluster::with_workflow_support().await  (full workflow support)
    let cluster = TestCluster::new().await;

    let client = cluster.register(Counter).await.unwrap();

    let result = client.call("test-1", "increment", 5).await.unwrap();
    assert_eq!(result, 5);
}
```

## Examples

### cluster-tests

E2E test suite demonstrating all features. Requires PostgreSQL and etcd:

```bash
# Start infrastructure (Postgres and etcd)
docker run -d --name postgres -p 5432:5432 \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=cluster \
  postgres:16

docker run -d --name etcd -p 2379:2379 \
  quay.io/coreos/etcd:v3.5.9 \
  /usr/local/bin/etcd \
  --advertise-client-urls http://0.0.0.0:2379 \
  --listen-client-urls http://0.0.0.0:2379

# Run the test runner
cd examples/cluster-tests
POSTGRES_URL=postgres://postgres:postgres@localhost/cluster \
ETCD_ENDPOINTS=localhost:2379 \
RUNNER_ADDRESS=localhost:9000 \
cargo run

# In another terminal, run the bash tests:
./tests/e2e.sh
```

### chess-cluster

Distributed chess server demonstrating:
- Player sessions (in-memory state)
- Game state persistence (workflows)
- Matchmaking service (stateless entity)
- Leaderboard singleton
- Move timeouts (scheduled messages)
- Auditable trait composition

**Note:** The HTTP API layer (M3) is not yet implemented. Currently only the entity layer is complete and can be tested via `TestCluster`:

```bash
cd examples/chess-cluster
cargo test
```

## Architecture

```
┌─────────────────────────────────────────────────────────────────┐
│                         Client                                  │
└───────────────────────────┬─────────────────────────────────────┘
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                         Sharding Layer                          │
│  - Consistent hashing for shard assignment                      │
│  - Request routing to correct runner                            │
│  - Storage polling for persisted messages                       │
└───────────────────────────┬─────────────────────────────────────┘
            ┌───────────────┼───────────────┐
            ▼               ▼               ▼
┌───────────────┐  ┌───────────────┐  ┌───────────────┐
│   Runner 1    │  │   Runner 2    │  │   Runner 3    │
│  Shards 0-42  │  │  Shards 43-85 │  │  Shards 86-127│
└───────┬───────┘  └───────┬───────┘  └───────┬───────┘
        │                  │                  │
        └──────────────────┴──────────────────┘
            ┌──────────────┴──────────────┐
            ▼                             ▼
    ┌───────────────┐             ┌───────────────┐
    │   PostgreSQL  │             │     etcd      │
    │  - State      │             │  - Runners    │
    │  - Messages   │             │  - Shard locks│
    │  - Journals   │             │  - Health     │
    └───────────────┘             └───────────────┘
```
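A sketch of the two routing steps the Sharding Layer box describes: an entity id is hashed to one of `shard_count` shards, and each shard is mapped to a runner on a consistent-hash ring. The contiguous shard ranges in the diagram are treated as schematic here. The hash function, the 64 virtual nodes per runner, and the ring layout are assumptions, not cruster's algorithm; `shard_for` and `Ring` are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};

fn hash_of<T: Hash + ?Sized>(value: &T) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

/// Step 1: entity id -> shard, by hash modulo the fixed shard count.
fn shard_for(entity_id: &str, shard_count: u32) -> u32 {
    (hash_of(entity_id) % shard_count as u64) as u32
}

/// Step 2: shard -> runner, via a consistent-hash ring.
struct Ring {
    points: BTreeMap<u64, String>, // ring position -> runner address
}

impl Ring {
    const VNODES: u32 = 64; // virtual nodes per runner even out the load

    fn new(runners: &[&str]) -> Self {
        let mut points = BTreeMap::new();
        for runner in runners {
            for v in 0..Self::VNODES {
                points.insert(hash_of(&format!("{runner}#{v}")), runner.to_string());
            }
        }
        Ring { points }
    }

    /// Owner of a shard: the first ring point at or after the shard's hash,
    /// wrapping around to the start of the ring.
    fn runner_for(&self, shard: u32) -> &str {
        let key = hash_of(&shard);
        self.points
            .range(key..)
            .next()
            .or_else(|| self.points.iter().next())
            .map(|(_, r)| r.as_str())
            .expect("ring has at least one runner")
    }

    fn assignments(&self, shard_count: u32) -> HashMap<u32, String> {
        (0..shard_count).map(|s| (s, self.runner_for(s).to_string())).collect()
    }
}

fn main() {
    let runners = ["10.0.0.1:9000", "10.0.0.2:9000", "10.0.0.3:9000"];
    let ring = Ring::new(&runners);
    let shard = shard_for("counter-1", 128);
    println!("counter-1 -> shard {shard} -> {}", ring.runner_for(shard));

    // How many of the 128 shards each runner owns.
    let mut per_runner: BTreeMap<String, u32> = BTreeMap::new();
    for runner in ring.assignments(128).into_values() {
        *per_runner.entry(runner).or_default() += 1;
    }
    println!("{per_runner:?}");
}
```

The property the ring provides is that removing a runner only moves the shards that runner owned. A real deployment would also need a hash that is stable across builds, which `DefaultHasher` does not promise, so that every node computes the same routing.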

## API Reference

See [specs/architecture.md](specs/architecture.md) for detailed API documentation.

## License

MIT