# Concurrency & Availability

How Calimero nodes remain available during synchronization.

---

## Overview

Calimero nodes are designed for **continuous availability**. The node can:
- Respond to JSON-RPC queries while syncing
- Process requests for multiple contexts in parallel
- Handle network events without blocking API responses

**Key Design Principles**:
1. Sync runs as a background async task
2. Per-context locking (not global locks)
3. Thread-safe storage layer (RocksDB MVCC)
4. Channel-based event processing

---

## Concurrent Task Architecture

The node runs multiple independent tasks using `tokio::select!`:

```rust
// From: crates/node/src/run.rs
let mut sync = pin!(sync_manager.start());
let mut server = tokio::spawn(server);
let mut bridge = bridge_handle;

loop {
    tokio::select! {
        _ = &mut sync => {},          // Background sync
        res = &mut server => res??,   // HTTP/WebSocket server
        res = &mut bridge => { ... }  // Network event bridge
    }
}
```

```
┌─────────────────────────────────────────────────────────────────┐
│                         Main Event Loop                         │
│                        (tokio::select!)                         │
├─────────────────┬─────────────────┬─────────────────────────────┤
│  SyncManager    │  HTTP Server    │  NetworkEventBridge         │
│  (Background)   │  (User API)     │  (Network → NodeManager)    │
│                 │                 │                             │
│  • Interval     │  • REST API     │  • Forwards gossipsub       │
│    syncs        │  • WebSocket    │    messages to actors       │
│  • Delta DAG    │  • SSE events   │  • Channel-based (async)    │
│    catchup      │  • JSON-RPC     │                             │
└────────┬────────┴────────┬────────┴─────────────┬───────────────┘
         │                 │                      │
         ▼                 ▼                      ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────┐
│   NodeManager   │ │  ContextClient  │ │   DeltaStore (per ctx)  │
│   (Actix Actor) │ │  (Shared)       │ │   (RwLock-protected)    │
└─────────────────┘ └─────────────────┘ └─────────────────────────┘
```

---

## Per-Context Locking

### Design

Each context has its own mutex, allowing parallel operations across different contexts:

```rust
// From: crates/context/src/lib.rs
struct ContextMeta {
    meta: Context,
    lock: Arc<Mutex<ContextId>>,  // Per-context lock
}
```

### Implications

| Scenario | Behavior |
|----------|----------|
| Request A on Context 1, Request B on Context 2 | **Parallel** - different locks |
| Request A on Context 1, Request B on Context 1 | **Sequential** - same lock |
| Sync on Context 1, Query on Context 2 | **Parallel** - different locks |
| Sync on Context 1, Query on Context 1 | **Sequential** - waits for lock |
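
The table above can be reproduced with nothing more than plain tokio mutexes. The sketch below is illustrative only (it is not node code): two independent `Arc<Mutex<_>>` values stand in for two contexts, and the timings show that work on different contexts overlaps while work on the same context serializes.

```rust
// Minimal sketch (not node code): two contexts with independent tokio mutexes.
// Work on different contexts proceeds in parallel; work on the same context serializes.
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration, Instant};

async fn execute(label: &str, lock: Arc<Mutex<()>>) {
    let _guard = lock.lock().await;          // per-context lock
    sleep(Duration::from_millis(10)).await;  // stand-in for ~10ms of WASM work
    println!("{label} done");
}

#[tokio::main]
async fn main() {
    let ctx_a = Arc::new(Mutex::new(()));
    let ctx_b = Arc::new(Mutex::new(()));

    let start = Instant::now();
    // Different contexts: the two ~10ms tasks overlap (~10ms total).
    tokio::join!(execute("A/1", ctx_a.clone()), execute("B/1", ctx_b.clone()));
    println!("different contexts: {:?}", start.elapsed());

    let start = Instant::now();
    // Same context: the second task waits for the first (~20ms total).
    tokio::join!(execute("A/2", ctx_a.clone()), execute("A/3", ctx_a.clone()));
    println!("same context: {:?}", start.elapsed());
}
```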

### Lock Acquisition

```rust
// Optimistic try_lock first, then async wait
fn lock(&self) -> Either<OwnedMutexGuard<ContextId>, impl Future<Output = OwnedMutexGuard<ContextId>>> {
    let Ok(guard) = self.lock.clone().try_lock_owned() else {
        return Either::Right(self.lock.clone().lock_owned());
    };
    Either::Left(guard)
}
```

---

## JSON-RPC Request Flow

### Execute Request

```
JSON-RPC POST /jsonrpc
┌─────────────────────────────────────┐
│ ServiceState { ctx_client }         │
│                                     │
│ ctx_client.execute(                 │
│   context_id,                       │
│   executor_public_key,              │
│   method,                           │
│   args                              │
│ )                                   │
└───────────────┬─────────────────────┘
┌─────────────────────────────────────┐
│ ContextManager (Actix Actor)        │
│                                     │
│ 1. get_or_fetch_context()           │
│ 2. context.lock()  ◄─── Per-context │
│ 3. WASM execute()                   │
│ 4. Release lock                     │
└───────────────┬─────────────────────┘
┌─────────────────────────────────────┐
│ RocksDB (Store)                     │
│ - Read state                        │
│ - Write changes (if mutating)       │
└─────────────────────────────────────┘
```

### What Gets Locked

| Operation | Acquires Lock | Why |
|-----------|---------------|-----|
| `execute()` (mutating method) | Yes | Modifies state, creates deltas |
| `execute()` (read-only query) | Yes | WASM runs, needs consistent view |
| `__calimero_sync_next` (sync apply) | Yes | Modifies state |
| Admin GET endpoints (list contexts) | No | Direct DB reads, no WASM |
| WebSocket subscriptions | No | Event streaming only |
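
To make the lock/execute/commit ordering in the flow above concrete, here is a simplified, hypothetical helper (not the actual `ContextManager` code): the per-context lock is acquired, the call runs while the lock is held, a commit would happen only on success, and the lock is released when the guard drops.

```rust
// Simplified sketch of the execute ordering described above (hypothetical helper,
// not the actual ContextManager code).
use std::sync::Arc;
use tokio::sync::Mutex;

struct ContextEntry {
    lock: Arc<Mutex<()>>, // stands in for the per-context lock
}

async fn execute_on_context<F, T>(ctx: &ContextEntry, call: F) -> Result<T, String>
where
    F: FnOnce() -> Result<T, String>,
{
    let _guard = ctx.lock.lock().await; // step 2: acquire per-context lock
    let outcome = call()?;              // step 3: WASM execute (stubbed as a closure);
                                        //         on error, changes are discarded
    // step 4: a successful outcome would be committed here
    Ok(outcome)
    // _guard dropped here: lock released
}
```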

---

## Sync Manager Concurrency

### Non-Blocking Design

```rust
// From: crates/node/src/sync/manager.rs
loop {
    tokio::select! {
        _ = next_sync.tick() => {
            // Periodic timer - non-blocking
        }
        Some(()) = async {
            loop { advance(&mut futs, &mut state).await? }
        } => {
            // Process completed syncs
        },
        Some((ctx, peer)) = ctx_sync_rx.recv() => {
            // On-demand sync request
        }
    }
    
    // Actual sync work happens AFTER select
    // Multiple syncs run concurrently via FuturesUnordered
}
```

### Concurrent Sync Operations

```rust
let mut futs = FuturesUnordered::new();

// Multiple contexts can sync in parallel
futs.push(timeout_at(deadline, self.perform_interval_sync(context_id, peer)));

// Only wait when at max concurrency
if futs.len() >= self.sync_config.max_concurrent {
    advance(&mut futs, &mut state).await;
}
```
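
The same bounded-concurrency pattern can be shown as a small, self-contained program. This is a generic sketch of the technique, not the `SyncManager` itself; the 50ms sleep is a stand-in for one context sync.

```rust
// Generic sketch of the bounded-concurrency pattern above (not the SyncManager itself):
// push work into FuturesUnordered and drain one completion whenever the pool is full.
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let max_concurrent = 3;
    let mut futs = FuturesUnordered::new();

    for ctx in 0..10u32 {
        futs.push(async move {
            sleep(Duration::from_millis(50)).await; // stand-in for one context sync
            ctx
        });

        // Only wait when at max concurrency, mirroring the snippet above.
        if futs.len() >= max_concurrent {
            if let Some(done) = futs.next().await {
                println!("sync finished for context {done}");
            }
        }
    }

    // Drain the remaining in-flight syncs.
    while let Some(done) = futs.next().await {
        println!("sync finished for context {done}");
    }
}
```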

---

## Storage Layer Concurrency

### RocksDB Thread Safety

The storage layer uses RocksDB which provides:
- **MVCC (Multi-Version Concurrency Control)**: Readers don't block writers
- **Snapshot isolation**: Consistent iteration during sync
- **Thread-safe handles**: Multiple threads can access simultaneously

```rust
// Snapshot iteration for consistent reads during sync
let mut iter = handle.iter_snapshot::<ContextStateKey>()?;

// Regular handle for point reads
let handle = self.context_client.datastore_handle();
let value = handle.get(&key)?;
```

### DeltaStore Locking

```rust
// From: crates/node/src/delta_store.rs
pub struct DeltaStore {
    dag: Arc<RwLock<CoreDagStore<Vec<Action>>>>,  // RwLock allows concurrent reads
    applier: Arc<ContextStorageApplier>,
    head_root_hashes: Arc<RwLock<HashMap<[u8; 32], [u8; 32]>>>,  // For merge detection
}
```

- **Multiple readers**: Queries can read DAG state concurrently
- **Single writer**: Delta application is serialized
- **Short critical sections**: Lock held only during DAG operations
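
A minimal sketch of this read-mostly pattern using a plain tokio `RwLock` (not `DeltaStore` itself): readers hold shared guards concurrently, while the writer takes exclusive access only for the short apply step.

```rust
// Minimal sketch of the RwLock pattern described above (plain tokio RwLock, not DeltaStore).
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let dag = Arc::new(RwLock::new(HashMap::<u64, Vec<u8>>::new()));

    // Writer: short critical section while applying a delta.
    {
        let mut guard = dag.write().await;
        guard.insert(1, vec![0xAA]);
    } // write lock released immediately

    // Readers: both shared guards can be held at the same time.
    let (r1, r2) = (dag.clone(), dag.clone());
    let (a, b) = tokio::join!(
        async move { r1.read().await.len() },
        async move { r2.read().await.contains_key(&1) },
    );
    println!("entries: {a}, has entry 1: {b}");
}
```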

---

## Availability During Different Sync States

### State Matrix

| Node State | JSON-RPC Availability | Notes |
|------------|----------------------|-------|
| **No sync in progress** | Full | Normal operation |
| **Periodic delta sync** | Full | Brief (millisecond-scale) lock contention while deltas are applied |
| **Snapshot sync (uninitialized)** | `Uninitialized` error | Context unavailable until sync completes |
| **Snapshot sync (other contexts)** | Full | Only syncing context affected |
| **Delta catchup** | Full | Brief lock contention during apply |

### Uninitialized Context Protection

```rust
// From: crates/context/src/handlers/execute.rs
if !is_state_op && *context.meta.root_hash == [0; 32] {
    return ActorResponse::reply(Err(ExecuteError::Uninitialized));
}
```

Queries return `Uninitialized` error until first sync completes, preventing reads of empty state.

---

## Lock Contention Timeline

### Best Case (No Contention)

```
JSON-RPC Request ──[try_lock SUCCESS]──[WASM ~10ms]──[release]──
```

### With Contention (Same Context)

```
Request 1 ──────[LOCK]────────[WASM ~10ms]────────[release]─────
Request 2 ─────────[wait ~10ms]───────────────────[LOCK]──[WASM]──
Sync Apply ────────────────────────────────────────[wait]─[LOCK]──
```

### Different Contexts (Parallel)

```
Context A: ──────[LOCK]────[WASM ~10ms]────[release]──────
Context B: ──────[LOCK]────[WASM ~10ms]────[release]──────  (parallel!)
Context C: ──────[LOCK]────[WASM ~10ms]────[release]──────  (parallel!)
```

---

## Delta Buffering During Sync

### Problem

During snapshot sync, incoming deltas can't be applied (context uninitialized).

### Solution

```rust
// From: crates/node/src/lib.rs
pub(crate) struct SyncSession {
    pub state: SyncSessionState,
    pub delta_buffer: DeltaBuffer,            // Buffers incoming deltas
    pub last_drop_warning: Option<Instant>,   // Rate-limited warning tracking
}

// During snapshot sync (from handlers/state_delta.rs)
if node_state.should_buffer_delta(&context_id) {
    node_state.buffer_delta(&context_id, buffered);
    return Ok(());  // Non-blocking return
}
```

Deltas are buffered during snapshot sync and replayed after completion.
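
The buffer-then-replay idea can be sketched with a bounded queue. The types below are hypothetical stand-ins, not the node's actual `DeltaBuffer`; they only illustrate bounded buffering with drop-on-overflow and in-order replay.

```rust
// Illustrative sketch of buffer-then-replay (hypothetical types, not the node's DeltaBuffer):
// while a snapshot sync is in flight, incoming deltas are queued up to a capacity,
// then drained in arrival order once the context is initialized.
use std::collections::VecDeque;

struct BufferedDelta {
    bytes: Vec<u8>,
}

struct BoundedDeltaBuffer {
    capacity: usize,
    queue: VecDeque<BufferedDelta>,
}

impl BoundedDeltaBuffer {
    fn new(capacity: usize) -> Self {
        Self { capacity, queue: VecDeque::new() }
    }

    /// Returns false (and drops the delta) when the buffer is full.
    fn push(&mut self, delta: BufferedDelta) -> bool {
        if self.queue.len() >= self.capacity {
            return false; // caller may emit a rate-limited warning here
        }
        self.queue.push_back(delta);
        true
    }

    /// Replay buffered deltas in order once snapshot sync completes.
    fn drain(&mut self) -> impl Iterator<Item = BufferedDelta> + '_ {
        self.queue.drain(..)
    }
}
```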

---

## Temporal Storage Layer (WASM Transactions)

### Problem

When WASM executes, it may:
- Read and write multiple keys
- Fail midway through execution
- Need to see its own uncommitted writes

Without proper handling, partial writes could corrupt state.

### Solution: Shadow Transactions

The `Temporal` layer provides transactional semantics by accumulating changes in memory before committing:

```rust
// From: crates/store/src/layer/temporal.rs
pub struct Temporal<'base, 'entry, L> {
    inner: &'base mut L,        // Underlying RocksDB store
    shadow: Transaction<'entry>, // In-memory pending changes
}
```

### How It Works

```
┌─────────────────────────────────────────────────────────────────┐
│                     WASM Execution                              │
│                                                                 │
│  storage.set("key1", value1)  ──┐                              │
│  storage.set("key2", value2)  ──┼──► Shadow Transaction        │
│  storage.get("key1")  ◄─────────┘    (in-memory BTreeMap)      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
                    On success: │ storage.commit()
┌─────────────────────────────────────────────────────────────────┐
│                      RocksDB                                    │
│  All changes applied atomically via inner.apply(&shadow)        │
└─────────────────────────────────────────────────────────────────┘
```

### Read Shadowing

When reading, the temporal layer checks the shadow first:

```rust
// From: crates/store/src/layer/temporal.rs
fn get<K: AsKeyParts>(&self, key: &K) -> EyreResult<Option<Slice<'_>>> {
    match self.shadow.get(key) {
        Some(Operation::Delete) => Ok(None),     // Key deleted in this tx
        Some(Operation::Put { value }) => Ok(Some(value.into())), // Uncommitted write
        None => self.inner.get(key),             // Fall back to RocksDB
    }
}
```

This ensures WASM sees its own uncommitted writes immediately.
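
The read-your-writes behavior is easy to demonstrate in isolation. The sketch below is a minimal, self-contained version of the pattern (it is not the actual `Temporal` API): pending operations live in an in-memory overlay that is consulted before the backing store and applied to it only on commit.

```rust
// Minimal, self-contained sketch of the read-your-writes overlay pattern
// (not the actual Temporal API).
use std::collections::BTreeMap;

enum Op {
    Put(Vec<u8>),
    Delete,
}

#[derive(Default)]
struct Shadowed {
    backing: BTreeMap<Vec<u8>, Vec<u8>>, // stands in for RocksDB
    shadow: BTreeMap<Vec<u8>, Op>,       // uncommitted changes
}

impl Shadowed {
    fn put(&mut self, key: &[u8], value: &[u8]) {
        self.shadow.insert(key.to_vec(), Op::Put(value.to_vec()));
    }

    fn delete(&mut self, key: &[u8]) {
        self.shadow.insert(key.to_vec(), Op::Delete);
    }

    fn get(&self, key: &[u8]) -> Option<&[u8]> {
        match self.shadow.get(key) {
            Some(Op::Delete) => None,               // deleted in this transaction
            Some(Op::Put(v)) => Some(v.as_slice()), // read-your-writes
            None => self.backing.get(key).map(Vec::as_slice),
        }
    }

    fn commit(&mut self) {
        // Apply all pending operations to the backing store at once.
        for (key, op) in std::mem::take(&mut self.shadow) {
            match op {
                Op::Put(v) => {
                    self.backing.insert(key, v);
                }
                Op::Delete => {
                    self.backing.remove(&key);
                }
            }
        }
    }
}
```

Dropping `Shadowed` without calling `commit` discards the pending operations, which mirrors the rollback-on-error behavior described in the execution flow below.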

### Execution Flow

```rust
// From: crates/context/src/handlers/execute.rs
async fn execute_and_persist_state(...) {
    // 1. Create temporal storage (wraps RocksDB with shadow transaction)
    let storage = ContextStorage::from(datastore.clone(), context.id);
    let private_storage = ContextPrivateStorage::from(datastore, context.id);
    
    // 2. Execute WASM (all writes go to shadow transaction)
    let (outcome, storage, private_storage) = execute(
        guard, module, executor, method, input,
        storage, private_storage, ...
    ).await?;
    
    // 3. On error: return early, shadow transaction is dropped
    if outcome.returns.is_err() {
        return Ok((outcome, None));  // No commit - changes discarded
    }
    
    // 4. On success with state change: commit both storage layers
    if let Some(root_hash) = outcome.root_hash {
        let store = storage.commit()?;           // Atomic commit to RocksDB
        let _private = private_storage.commit()?;
    }
}
```

### Transaction Guarantees

| Property | Guarantee |
|----------|-----------|
| **Atomicity** | All writes commit together or none do |
| **Isolation** | Uncommitted changes invisible to other contexts |
| **Read-your-writes** | WASM sees its own pending changes |
| **Rollback on error** | Shadow dropped if WASM fails |

### Two Storage Types

```rust
// From: crates/context/src/handlers/execute/storage.rs
// Synchronized storage (replicated via deltas)
#[self_referencing]
pub struct ContextStorage {
    context_id: ContextId,
    store: Store,
    #[borrows(mut store)]
    inner: Temporal<'this, 'static, Store>,  // → ContextState column
    keys: RefCell<Vec<Arc<key::ContextState>>>,
}

// Private storage (node-local only)
#[self_referencing]
pub struct ContextPrivateStorage {
    context_id: ContextId,
    store: Store,
    #[borrows(mut store)]
    inner: Temporal<'this, 'static, Store>,  // → ContextPrivateState column
    keys: RefCell<Vec<Arc<key::ContextPrivateState>>>,
}
```

| Storage Type | Synced | Use Case |
|--------------|--------|----------|
| `ContextStorage` | Yes | Application state (CRDTs) |
| `ContextPrivateStorage` | No | Node-local caches, preferences |

### Why This Matters for Concurrency

1. **No partial writes visible**: Other contexts/queries never see incomplete state
2. **Safe rollback**: Failed WASM execution doesn't corrupt state
3. **Isolated mutations**: Each execution has its own shadow transaction
4. **Deterministic commits**: Same actions produce same state (CRDT property)

---

## Network Event Bridge

### Channel-Based Decoupling

```rust
// From: crates/node/src/network_event_processor.rs
pub struct NetworkEventBridge {
    receiver: NetworkEventReceiver,  // mpsc channel
    node_manager: Addr<NodeManager>,
    shutdown: Arc<Notify>,           // Graceful shutdown signal
}

loop {
    tokio::select! {
        event = self.receiver.recv() => {
            match event {
                Some(event) => self.node_manager.do_send(event),
                None => break,  // Channel closed
            }
        }
        _ = self.shutdown.notified() => break,
    }
}
```

- Channel capacity: 1000 events (configurable)
- Decouples network I/O from processing
- Provides backpressure when overwhelmed
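
The decoupling and backpressure properties above come straight from using a bounded channel. A plain tokio mpsc sketch (the event type is a placeholder, and the capacity of 1000 mirrors the text): the sender awaits whenever the receiver falls behind.

```rust
// Sketch of channel-based decoupling with a bounded tokio mpsc channel.
// The NetworkEvent type here is a placeholder, not the node's event type.
use tokio::sync::mpsc;

#[derive(Debug)]
struct NetworkEvent(u64);

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<NetworkEvent>(1000);

    // Producer: network side. `send` awaits when the channel is full (backpressure).
    let producer = tokio::spawn(async move {
        for i in 0..5 {
            tx.send(NetworkEvent(i)).await.expect("receiver alive");
        }
    });

    // Consumer: processing side, analogous to forwarding events to the NodeManager actor.
    while let Some(event) = rx.recv().await {
        println!("processing {event:?}");
    }

    producer.await.unwrap();
}
```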

---

## Performance Implications

### Read-Heavy Workloads

- **Impact**: Minimal
- **Why**: WASM queries are fast (~10ms), lock held briefly
- **Recommendation**: No special handling needed

### Write-Heavy + Sync

- **Impact**: Moderate contention
- **Why**: Delta applies compete for lock with user writes
- **Recommendation**: Batch writes when possible

### Multiple Contexts

- **Impact**: None (fully parallel)
- **Why**: Each context has independent lock
- **Recommendation**: Design applications with multiple contexts for parallelism

### Snapshot Sync (Bootstrap)

- **Impact**: Context unavailable for ~seconds
- **Why**: Full state transfer required
- **Recommendation**: Show loading state in UI during bootstrap

---

## Best Practices

### For Application Developers

1. **Handle `Uninitialized` errors gracefully** - Show loading state during initial sync
2. **Use multiple contexts** for independent data - Enables parallel access
3. **Prefer read-only queries** when possible - Lower contention
4. **Batch writes** - Fewer lock acquisitions

### For Node Operators

1. **Monitor sync duration** - Long syncs indicate network/peer issues
2. **Watch for lock contention** - High contention suggests write-heavy load
3. **Configure appropriate timeouts** - Balance responsiveness vs. reliability

---

## Configuration

### Sync Configuration

```rust
// From: crates/node/src/sync/config.rs
pub struct SyncConfig {
    pub timeout: Duration,            // Sync timeout (default: 30s)
    pub interval: Duration,           // Min between syncs (default: 5s)
    pub frequency: Duration,          // How often to check (default: 10s)
    pub max_concurrent: usize,        // Max parallel syncs (default: 30)
    pub snapshot_chunk_size: usize,   // Chunk size for full resync (default: 64KB)
    pub delta_sync_threshold: usize,  // Max delta gap before full resync
}
```

### Buffer Configuration

```rust
// Delta buffer for snapshot sync (from crates/node/primitives/src/delta_buffer.rs)
pub const DEFAULT_BUFFER_CAPACITY: usize = 10_000;  // 10,000 deltas per context

// Rate limit for overflow warnings (from crates/node/src/constants.rs)
pub const DELTA_BUFFER_DROP_WARNING_RATE_LIMIT_S: u64 = 5;
```

---

## See Also

- [Architecture](architecture.md) - Component structure
- [Sync Protocol](sync-protocol.md) - How sync works
- [Performance](performance.md) - Benchmarks and optimization
- [Troubleshooting](troubleshooting.md) - Common issues

---

## Navigation

- **Previous**: [Sync Configuration](sync-configuration.md)
- **Next**: [Troubleshooting](troubleshooting.md)
- **Up**: [Documentation Index](DOCUMENTATION_INDEX.md)