calimero-node

P2P node runtime with DAG-based CRDT synchronization, event handling, and blob sharing.

Overview

The node layer orchestrates:

  • Real-time sync: Gossipsub broadcasts for instant delta propagation
  • Periodic sync: P2P streams for catch-up and recovery
  • Event handlers: Automatic execution of application callbacks
  • Blob sharing: Content-addressed file distribution

Architecture

┌──────────────────────────────────────────────────┐
│ Application Layer (WASM)                         │
│  - User code                                     │
│  - CRDT operations                               │
│  - Event emission                                │
└────────────────┬─────────────────────────────────┘
                 │
┌────────────────┴─────────────────────────────────┐
│ Node Layer (This Crate)                          │
│  ┌──────────────┐  ┌──────────────┐              │
│  │ NodeManager  │  │ SyncManager  │              │
│  │ - Orchestrate│  │ - P2P sync   │              │
│  └──────┬───────┘  └──────┬───────┘              │
│         │                 │                      │
│  ┌──────┴───────┐  ┌──────┴───────┐              │
│  │ DeltaStore   │  │ Handlers     │              │
│  │ - DAG logic  │  │ - Broadcasts │              │
│  │ - Buffering  │  │ - Events     │              │
│  └──────────────┘  └──────────────┘              │
└────────────────┬─────────────────────────────────┘
                 │
┌────────────────┴─────────────────────────────────┐
│ Network Layer (libp2p)                           │
│  - Gossipsub (broadcast)                         │
│  - Streams (P2P)                                 │
│  - Discovery                                     │
└──────────────────────────────────────────────────┘

How Synchronization Works

Dual-Path Delta Propagation

flowchart TB
    subgraph "Path 1: Gossipsub Broadcast (Primary)"
        G1[Transaction executed<br/>on Node A]
        G2[Create CausalDelta<br/>parents: dag_heads]
        G3[Broadcast via Gossipsub]
        G4[All peers receive<br/>within ~100ms]
        G5{Parents ready?}
        G6A[Apply immediately]
        G6B[Buffer as pending]
        G7[Execute event handlers]
        
        G1 --> G2 --> G3 --> G4 --> G5
        G5 -->|Yes| G6A --> G7
        G5 -->|No| G6B
    end
    
    subgraph "Path 2: Periodic Sync (Fallback)"
        P1[SyncManager timer<br/>every 10s]
        P2[Select random peer]
        P3[Open P2P stream]
        P4[Exchange DAG heads]
        P5{Heads differ?}
        P6[Request missing deltas]
        P7[Apply missing deltas]
        P8[Catch up complete]
        
        P1 --> P2 --> P3 --> P4 --> P5
        P5 -->|Yes| P6 --> P7 --> P8
        P5 -->|No| P8
    end
    
    G6B -.->|Eventually| P6
    
    style G1 fill:#4DABF7,stroke:#333,stroke-width:3px,color:#000
    style G6A fill:#51CF66,stroke:#333,stroke-width:3px,color:#000
    style G6B fill:#FFB84D,stroke:#333,stroke-width:3px,color:#000
    style P1 fill:#FF6B6B,stroke:#333,stroke-width:3px,color:#000
    style P8 fill:#51CF66,stroke:#333,stroke-width:3px,color:#000

Why both paths?

  • Gossipsub: Fast (~100ms), reliable in good network conditions
  • Periodic sync: Ensures eventual consistency even with packet loss, partitions, or downtime
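
Both paths feed the same ingestion point, which is what keeps their semantics identical. The sketch below is illustrative wiring only (the handler names and the Result alias are assumptions, not this crate's API): gossip deliveries and periodic-sync responses both go through DeltaStore::add_delta, which applies a delta when its parents are known and buffers it otherwise.

// Illustrative wiring only: both delivery paths converge on DeltaStore.
async fn on_gossip_delta(store: &DeltaStore, delta: CausalDelta<Vec<Action>>) -> Result<()> {
    // Path 1: real-time broadcast. Applied immediately if parents are known,
    // otherwise buffered as pending until sync (or a later broadcast) fills the gap.
    let _applied = store.add_delta(delta).await?;
    Ok(())
}

async fn on_sync_deltas(store: &DeltaStore, missing: Vec<CausalDelta<Vec<Action>>>) -> Result<()> {
    // Path 2: periodic catch-up. Deltas fetched from a peer run through the
    // same entry point, so ordering and buffering rules never diverge.
    for delta in missing {
        store.add_delta(delta).await?;
    }
    Ok(())
}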

Synchronization Sequence

sequenceDiagram
    participant App as WASM App
    participant NodeA as Node A (Author)
    participant Gossip as Gossipsub Network
    participant NodeB as Node B (Peer)
    participant DAG as DAG Store
    participant Handler as Event Handler
    
    rect rgb(220, 237, 255)
        Note over App,NodeA: Transaction Execution
        App->>NodeA: execute("add_item", args)
        NodeA->>NodeA: WASM execution
        NodeA->>NodeA: Collect CRDT actions
        NodeA->>NodeA: Create CausalDelta {<br/>  id: D5,<br/>  parents: [D4],<br/>  payload: actions,<br/>  events: [ItemAdded]<br/>}
    end
    
    rect rgb(255, 237, 220)
        Note over NodeA,Gossip: Broadcast (Path 1)
        NodeA->>Gossip: Broadcast StateDelta
        Gossip->>NodeB: Propagate to peers (~100ms)
    end
    
    rect rgb(220, 255, 237)
        Note over NodeB,DAG: Delta Application
        NodeB->>DAG: add_delta(D5)
        DAG->>DAG: Check parents: [D4]
        DAG->>DAG: D4 in applied ✅
        DAG->>DAG: Apply to storage
        DAG->>DAG: Update heads: [D5]
        DAG-->>NodeB: Applied successfully
    end
    
    rect rgb(237, 220, 255)
        Note over NodeB,Handler: Event Handling
        NodeB->>NodeB: Check: not author ✅
        NodeB->>Handler: execute("on_item_added", data)
        Handler->>Handler: counter.increment()
        Handler-->>NodeB: Handler complete
        NodeB->>NodeB: Emit to WebSocket clients
    end
    
    Note over App,Handler: Total latency: ~100-200ms

Synchronization Configuration

// Production defaults (crates/node/src/sync/config.rs)
pub const DEFAULT_SYNC_FREQUENCY_SECS: u64 = 10;  // Check every 10s
pub const DEFAULT_SYNC_INTERVAL_SECS: u64 = 5;    // Min 5s between syncs
pub const DEFAULT_SYNC_TIMEOUT_SECS: u64 = 30;    // 30s timeout

// Optimized for 20-50 node networks with aggressive convergence

Tuning guide:

  • frequency: How often to check all contexts for sync
    • Lower = faster recovery from packet loss
    • Higher = less network overhead
  • interval: Minimum time between syncs for same context
    • Lower = more aggressive convergence
    • Higher = prevents sync spam
  • timeout: Max time for a sync operation
    • Should be > RTT + state transfer time
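
For illustration only, here is how those three constants might be grouped into a config value with std::time::Duration fields; the real type lives in crates/node/src/sync/config.rs and may be shaped differently.

use std::time::Duration;

// Hypothetical shape of the sync configuration; field names mirror the
// tuning knobs above, not necessarily the crate's actual struct.
pub struct SyncConfig {
    pub frequency: Duration, // how often every context is checked for sync
    pub interval: Duration,  // minimum gap between syncs of the same context
    pub timeout: Duration,   // upper bound for a single sync operation
}

impl Default for SyncConfig {
    fn default() -> Self {
        Self {
            frequency: Duration::from_secs(DEFAULT_SYNC_FREQUENCY_SECS),
            interval: Duration::from_secs(DEFAULT_SYNC_INTERVAL_SECS),
            timeout: Duration::from_secs(DEFAULT_SYNC_TIMEOUT_SECS),
        }
    }
}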

Event Handler Execution

Handler Execution Flow

flowchart TD
    Start([Delta received<br/>with events]) --> Applied{Delta<br/>applied?}
    
    Applied -->|No<br/>Pending| Buffer[Buffer delta + events]
    Applied -->|Yes| Deserialize[Deserialize events<br/>once, optimized]
    
    Buffer --> Wait[⏳ Wait for parents]
    Wait -.->|Parents arrive| Deserialize
    
    Deserialize --> AuthorCheck{Is author<br/>node?}
    
    AuthorCheck -->|Yes| Skip[❌ Skip handler execution<br/>Prevents infinite loops]
    AuthorCheck -->|No| HasHandler{Event has<br/>handler?}
    
    Skip --> EmitWS[Emit to WebSocket]
    
    HasHandler -->|No| EmitWS
    HasHandler -->|Yes| ExecuteHandler[✅ Execute handler in WASM]
    
    ExecuteHandler --> HandlerResult{Handler<br/>result?}
    
    HandlerResult -->|Success| CheckNewEvents{Emitted new<br/>events?}
    HandlerResult -->|Error| LogError[⚠️ Log error<br/>Event lost]
    
    CheckNewEvents -->|Yes| NewDelta[Create new CausalDelta<br/>Broadcast recursively]
    CheckNewEvents -->|No| EmitWS
    
    LogError --> EmitWS
    NewDelta --> EmitWS
    EmitWS --> Done([Complete])
    
    style Start fill:#4DABF7,stroke:#333,stroke-width:3px,color:#000
    style Applied fill:#FFB84D,stroke:#333,stroke-width:3px,color:#000
    style AuthorCheck fill:#FFB84D,stroke:#333,stroke-width:3px,color:#000
    style Skip fill:#FF6B6B,stroke:#333,stroke-width:3px,color:#000
    style ExecuteHandler fill:#51CF66,stroke:#333,stroke-width:3px,color:#000
    style Buffer fill:#FFB84D,stroke:#333,stroke-width:3px,color:#000
    style Done fill:#51CF66,stroke:#333,stroke-width:3px,color:#000

Critical Rules:

  1. Author nodes DO NOT execute their own handlers

    • Prevents infinite loops (handler → event → handler → ...)
    • Ensures distributed execution model
    • Only receiving nodes execute handlers
  2. Events are lost if a delta is never applied

    • Pending deltas buffer events
    • If parents never arrive, handlers never execute
    • TODO: Implement parent request protocol
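
As a rough sketch of how these two rules combine at dispatch time (all names below are placeholders, not the node's real internals):

// Placeholder sketch: skip handlers when the delta is still pending or when
// this node authored it, but always forward events to WebSocket subscribers.
async fn dispatch_events(events: &[Event], applied: bool, is_author: bool) -> Result<()> {
    if !applied {
        // Rule 2: delta is pending; its events stay buffered with it.
        return Ok(());
    }
    if !is_author {
        // Rule 1: only receiving nodes run handlers, which is what prevents
        // the handler -> event -> handler loop on the author.
        for event in events {
            execute_handler_in_wasm(event).await?;
        }
    }
    emit_to_websocket(events);
    Ok(())
}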

Handler Requirements

Future optimizations may execute event handlers in parallel, so handlers MUST be:

  1. Commutative: Order doesn't matter

    • Counter::increment()
    • create() → update() chains
  2. Independent: No shared mutable state

    • ✅ Each handler uses unique CRDT key
    • ❌ Multiple handlers modifying same entity
  3. Idempotent: Safe to retry

    • ✅ CRDT operations
    • ❌ External API calls
  4. Pure: No side effects

    • ✅ Modify CRDT state only
    • ❌ HTTP requests, file I/O

See HANDLER_AUDIT.md for detailed analysis.
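
To make the checklist concrete, a handler satisfying all four properties might look like the hypothetical, SDK-simplified example below: it touches only CRDT state keyed off the event, so reordering and retries are harmless. The field names are illustrative.

// Hypothetical handler body; field names and SDK surface are illustrative.
pub fn on_item_added(&mut self, item_id: String) {
    // Commutative: counter increments converge regardless of delivery order.
    self.items_added.increment();

    // Independent: writes only to a key derived from the event itself,
    // so concurrent handlers never contend on the same entity.
    self.seen_items.insert(item_id, true);

    // Pure: no HTTP requests, file I/O, or other external side effects --
    // the only output is CRDT state, which is safe to merge and retry.
}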

DAG Structure

Linear History (Simple Case)

Root → D1 → D2 → D3
heads = [D3]

Concurrent Updates (Fork)

      D0
     / \
    D1A D1B  ← Fork! Two nodes updated simultaneously
     \ /
      D2     ← Merge delta (parents: [D1A, D1B])
heads = [D2]

Out-of-Order Delivery

Receive: D3 (parents: [D2])
  → D2 not found → buffer as pending
  → heads = [D1]

Receive: D2 (parents: [D1])
  → D1 exists → apply D2
  → Check pending → D3 now ready → apply D3 (cascade!)
  → heads = [D3]
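
The same trace, written against the DagStore API shown in the next section (delta construction and the applier are elided; return shapes are simplified for illustration):

// D3 arrives first: its parent D2 is unknown, so it is buffered as pending.
let applied = dag.add_delta(d3, &applier).await?;
assert!(!applied);                        // not applied yet
assert_eq!(dag.get_heads(), vec![d1_id]); // heads unchanged

// D2 arrives: it applies, and the buffered D3 cascades immediately after.
let applied = dag.add_delta(d2, &applier).await?;
assert!(applied);
assert_eq!(dag.get_heads(), vec![d3_id]); // heads advanced past both deltas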

API

// Create DAG
let mut dag = DagStore::new([0; 32]);  // root = genesis

// Add delta
let delta = CausalDelta {
    id: compute_id(...),
    parents: dag.get_heads(),  // Build on current tips
    payload: my_data,
    timestamp: now(),
};

let applied = dag.add_delta(delta, &applier).await?;
if !applied {
    // Delta pending - request missing parents
    let missing = dag.get_missing_parents();
    for parent_id in missing {
        request_from_network(parent_id).await?;
    }
}

// Query state
let heads = dag.get_heads();          // Current DAG tips
let delta = dag.get_delta(&id);       // Get specific delta
let stats = dag.pending_stats();      // Pending count, oldest age

// Cleanup
let evicted = dag.cleanup_stale(Duration::from_secs(300));  // 5 min timeout

Performance Characteristics

Memory (per context):

Applied deltas: ~1000 × 5KB = 5MB
Pending deltas: ~0-100 × 5KB = 0-500KB
Heads: ~1-10 × 32 bytes = 32-320 bytes
Total: ~5-6MB typical, ~10MB worst case

Latency:

Add delta (parents ready): <1ms
Add delta (cascade of 10): ~10ms
Cleanup stale: <1ms for 100 pending

Integration with calimero-node

The node layer wraps DagStore with WASM execution:

// crates/node/src/delta_store.rs
pub struct DeltaStore {
    dag: Arc<RwLock<DagStore<Vec<Action>>>>,
    applier: Arc<ContextStorageApplier>,
}

impl DeltaStore {
    pub async fn add_delta(&self, delta: CausalDelta<Vec<Action>>) -> Result<bool> {
        let mut dag = self.dag.write().await;
        let result = dag.add_delta(delta, &*self.applier).await?;
        
        // Update context DAG heads after apply
        let heads = dag.get_heads();
        drop(dag);  // Release lock before external call
        
        self.applier.context_client
            .update_dag_heads(&self.applier.context_id, heads)?;
        
        Ok(result)
    }
}

Design Principles

  • Pure DAG logic: No network, storage, or WASM dependencies
  • Generic over payload: Works with any T: Serialize + Clone
  • Dependency injection: Applier pattern enables testing
  • Memory-only: DAG state not persisted (handled by wrapper)
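
The applier pattern is what makes the DAG testable in isolation. The trait and mock below are assumptions sketched for illustration (the real interface is defined in the DAG crate and may differ): tests hand the DAG a recording applier instead of the WASM-backed one.

// Hypothetical applier trait: the DAG calls back into it to apply a payload.
#[async_trait::async_trait]
pub trait Applier<T> {
    async fn apply(&self, payload: &T) -> Result<()>;
}

// Test double: records applied payloads in order instead of touching storage.
#[derive(Default)]
pub struct RecordingApplier {
    pub applied: tokio::sync::Mutex<Vec<Vec<Action>>>,
}

#[async_trait::async_trait]
impl Applier<Vec<Action>> for RecordingApplier {
    async fn apply(&self, payload: &Vec<Action>) -> Result<()> {
        self.applied.lock().await.push(payload.clone());
        Ok(())
    }
}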

Testing

cargo test -p calimero-dag

# Key tests validate:
# - Linear sequences apply in order
# - Out-of-order deltas buffer and cascade
# - Concurrent updates create forks
# - Stale deltas get cleaned up

Common Issues

Deltas stuck in pending

Symptom: pending_stats().count keeps growing

Causes:

  1. Missing parent deltas never requested from network
  2. Network partition preventing delta delivery
  3. Parent delta lost (packet drop)

Solutions:

  • Implement parent request protocol (TODO)
  • Trigger state sync fallback
  • Check network connectivity
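
Until a parent request protocol exists, a watchdog along these lines can detect the symptom early. The field names on the stats value, the thresholds, and the two helper functions are placeholders for illustration; only pending_stats() and get_missing_parents() come from the DAG API described above.

// Placeholder watchdog: thresholds and helpers are illustrative.
let stats = dag.pending_stats();
if stats.count > 0 && stats.oldest_age > Duration::from_secs(60) {
    // Deltas have waited too long: ask peers for the missing parents,
    // and/or force a state sync for this context as a fallback.
    for parent_id in dag.get_missing_parents() {
        request_from_network(parent_id).await?;
    }
    trigger_state_sync(context_id).await?;
}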

Memory growing unbounded

Symptom: Node memory usage increasing over time

Causes:

  1. No timeout for pending deltas
  2. Too many deltas kept in applied set

Solutions:

  • cleanup_stale() runs every 60s (evicts pending deltas older than 5 minutes)
  • Implement DAG pruning (TODO)
  • Set reasonable limits (100 pending max)
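
A sketch of the periodic cleanup described above, using a tokio interval timer. The task wiring and logging are illustrative; the 60-second cadence and 5-minute eviction window mirror the values in this section, and cleanup_stale() is the documented DAG call.

use std::time::Duration;

// Illustrative cleanup task; assumes the DAG sits behind an Arc<RwLock<...>>
// as in the DeltaStore wrapper shown earlier.
let mut tick = tokio::time::interval(Duration::from_secs(60));
loop {
    tick.tick().await;
    let mut dag = dag.write().await;
    // Evict pending deltas whose parents never arrived within 5 minutes.
    let evicted = dag.cleanup_stale(Duration::from_secs(300));
    drop(dag); // release the lock before logging/metrics
    tracing::debug!(?evicted, "stale pending deltas evicted");
}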

Testing

# Run all node tests
cargo test -p calimero-node

# Run specific integration test
cargo test -p calimero-node --test dag_storage_integration

# With logs
RUST_LOG=debug cargo test -p calimero-node -- --nocapture

Integration Tests

The node crate includes integration tests in tests/ that validate end-to-end scenarios.

Test: DAG + Storage Integration

sequenceDiagram
    participant Test
    participant Node as NodeManager
    participant DAG as DeltaStore
    participant WASM as WASM Runtime
    participant Storage as Storage Layer
    
    Note over Test,Storage: Scenario: Multi-transaction with DAG ordering
    
    Test->>Node: execute("set", {k1: v1})
    Node->>WASM: Run WASM
    WASM->>Storage: map.insert(k1, v1)
    Storage->>Storage: Generate Action::Update
    Storage-->>WASM: Success
    WASM-->>Node: Outcome {delta_1}
    Node->>DAG: add_delta(delta_1)
    DAG-->>Test: Applied ✅
    
    Test->>Node: execute("set", {k2: v2})
    Node->>WASM: Run WASM
    WASM->>Storage: map.insert(k2, v2)
    Storage-->>WASM: Success
    WASM-->>Node: Outcome {delta_2, parents: [delta_1]}
    Node->>DAG: add_delta(delta_2)
    DAG->>DAG: Check: delta_1 applied ✅
    DAG-->>Test: Applied ✅
    
    Test->>Node: get("k1")
    Node->>WASM: Run WASM
    WASM->>Storage: map.get(k1)
    Storage-->>WASM: Some(v1)
    WASM-->>Test: v1 ✅
    
    Note over Test: Validates: DAG ordering + Storage consistency

What it validates:

  • WASM execution generates valid deltas
  • DAG correctly tracks parent relationships
  • Storage maintains consistency across transactions
  • Heads update properly

Test file: tests/dag_storage_integration.rs

Test: Sync Protocol (Mock Network)

graph TB
    subgraph "Test Setup"
        N1[Create Node A<br/>with context]
        N2[Create Node B<br/>with context]
        
        N1 --> S[Shared test store]
        N2 --> S
    end
    
    subgraph "Node A: Create Data"
        E1[Execute: set k1=v1]
        D1[Generate Delta D1]
        B1[Broadcast Delta D1<br/>mock network]
        
        E1 --> D1 --> B1
    end
    
    subgraph "Node B: Receive & Apply"
        R1[Receive Delta D1]
        C1{Parents ready?}
        A1[Apply to storage]
        V1[Verify: get k1 = v1]
        
        B1 --> R1 --> C1 --> A1 --> V1
    end
    
    subgraph "Sync Protocol"
        T1[Timer: periodic sync]
        Q1[Query Node A for heads]
        C2{Heads differ?}
        S1[Request missing deltas]
        S2[Apply deltas]
        S3[Converge to same state]
        
        T1 --> Q1 --> C2 --> S1 --> S2 --> S3
    end
    
    style N1 fill:#4DABF7,stroke:#333,stroke-width:3px,color:#000
    style N2 fill:#4DABF7,stroke:#333,stroke-width:3px,color:#000
    style V1 fill:#51CF66,stroke:#333,stroke-width:3px,color:#000
    style S3 fill:#51CF66,stroke:#333,stroke-width:3px,color:#000

What it validates:

  • Sync protocol correctly exchanges state
  • Nodes converge after sync
  • Missing deltas requested and applied
  • Periodic sync timer works

Test file: tests/sync_protocols.rs

Test Categories

Test File                    What It Tests                Network
dag_storage_integration.rs   DAG + WASM + Storage         No (unit)
sync_protocols.rs            State sync, delta exchange   Mocked
network_simulation.rs        Multi-node scenarios         Simulated
dag_persistence.rs           DAG state save/load          No (unit)

End-to-End Testing

End-to-end tests validate real nodes with actual network communication. They run automatically via the following GitHub Actions workflows:

  1. merobox-workflows.yml - Runs merobox workflows for all applications in the apps/ directory:

    • Validates application functionality
    • Tests multi-node scenarios
    • Verifies CRDT consistency
    • Tests event handler execution
  2. e2e-blockchain.yml - Tests blockchain integration proposals:

    • NEAR protocol proposals
    • Validates NEAR integration scenarios
  3. test-sdk-js.yml - Ensures JavaScript SDK compatibility:

    • Builds SDK JS examples
    • Runs merobox workflows from SDK JS examples
    • Validates that core changes don't break JS SDK compatibility

E2E test coverage:

  • ✅ Context creation and joining
  • ✅ Application installation
  • ✅ Multi-node state sync
  • ✅ Event handler execution
  • ✅ G-Counter CRDT consistency
  • ✅ Network resilience
  • ✅ NEAR blockchain integration
  • ✅ JavaScript SDK compatibility

Documentation

For complete documentation, see the Documentation Index.

License

See COPYRIGHT and LICENSE.md in the repository root.