Skip to main content

calimero_node_primitives/sync/
protocol_trait.rs

1//! Common trait for sync protocol implementations.
2//!
3//! This module defines the [`SyncProtocolExecutor`] trait that all sync protocols
4//! implement. This enables:
5//!
6//! - Protocol implementation details contained within each protocol module
7//! - Common interface for `SyncManager` to invoke any protocol
8//! - Same code path for production and simulation (only the transport and `Store` backend differ)
9//!
10//! # Architecture
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────────────┐
14//! │                     SyncProtocolExecutor trait                   │
15//! │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐ │
16//! │  │ HashComparison  │  │    Snapshot     │  │   LevelWise     │ │
17//! │  │    Protocol     │  │    Protocol     │  │    Protocol     │ │
18//! │  └────────┬────────┘  └────────┬────────┘  └────────┬────────┘ │
19//! │           │                    │                    │          │
20//! │           └────────────────────┼────────────────────┘          │
21//! │                                │                               │
22//! │                    ┌───────────┴───────────┐                   │
23//! │                    │   SyncTransport       │                   │
24//! │                    │ (Stream or SimStream) │                   │
25//! │                    └───────────────────────┘                   │
26//! └─────────────────────────────────────────────────────────────────┘
27//! ```
28//!
29//! # Responder Dispatch Model
30//!
31//! The `SyncManager` dispatches incoming sync requests using this flow:
32//!
33//! 1. Manager receives stream and calls `recv()` to get the first message
34//! 2. Manager matches on `InitPayload` to determine which protocol to use
35//! 3. Manager extracts protocol-specific data from the first message
36//! 4. Manager calls `run_responder()` passing the extracted data via `ResponderInit`
37//!
38//! This design is necessary because the manager must read the first message
39//! to route it, and a message cannot be "un-read" once it has been consumed.
40//! The `ResponderInit` associated type lets each protocol declare what data
41//! it needs from that first request.
42//!
43//! # Example
44//!
45//! ```ignore
46//! use calimero_node_primitives::sync::{SyncProtocolExecutor, HashComparisonProtocol};
47//!
48//! // Production initiator
49//! let mut transport = StreamTransport::new(&mut stream);
50//! let stats = HashComparisonProtocol::run_initiator(
51//!     &mut transport,
52//!     &store,
53//!     context_id,
54//!     identity,
55//!     HashComparisonConfig { remote_root_hash },
56//! ).await?;
57//!
58//! // Production responder (manager extracts first request data)
59//! let first_request = HashComparisonFirstRequest { node_id, max_depth };
60//! HashComparisonProtocol::run_responder(
61//!     &mut transport,
62//!     &store,
63//!     context_id,
64//!     identity,
65//!     first_request,
66//! ).await?;
67//! ```
68
69use async_trait::async_trait;
70use calimero_primitives::context::ContextId;
71use calimero_primitives::identity::PublicKey;
72use calimero_store::Store;
73use eyre::Result;
74
75use super::SyncTransport;
76
77/// Trait for sync protocol implementations.
78///
79/// Each sync protocol (HashComparison, Snapshot, LevelWise, etc.) implements
80/// this trait. The protocol logic is parameterized by:
81///
82/// - `T: SyncTransport` - the transport layer (production streams or simulation channels)
83/// - `&Store` - the storage handle, a concrete type (backed by RocksDB or InMemoryDB)
84///
85/// This enables the same protocol code to run in both production and simulation.
86///
87/// Note: Uses `?Send` (futures are not required to be `Send`) because `RuntimeEnv`, used for
88/// storage access, contains `Rc`. Callers must not spawn these futures across threads.
89#[async_trait(?Send)]
90pub trait SyncProtocolExecutor {
91    /// Protocol-specific configuration for the initiator, passed by value to `run_initiator`.
92    ///
93    /// For example, HashComparison needs the remote root hash.
94    type Config: Send;
95
96    /// Data extracted from the first request for responder dispatch.
97    ///
98    /// The manager parses the first `InitPayload` and constructs this type
99    /// to pass to `run_responder`. This is necessary because the manager
100    /// consumes the first message for routing, so the protocol cannot
101    /// `recv()` it again.
102    ///
103    /// For example:
104    /// - HashComparison needs `{ node_id, max_depth }` from `TreeNodeRequest`
105    /// - LevelWise needs `{ level, parent_ids }` from `LevelWiseRequest`
106    type ResponderInit: Send;
107
108    /// Protocol-specific statistics/results returned by `run_initiator` on success.
109    type Stats: Send + Default;
110
111    /// Run the initiator (pulling) side of the protocol.
112    ///
113    /// The initiator requests data from the responder and applies it locally.
114    ///
115    /// # Arguments
116    ///
117    /// * `transport` - The transport for sending/receiving messages
118    /// * `store` - The local storage (works with both RocksDB and InMemoryDB)
119    /// * `context_id` - The context being synced
120    /// * `identity` - Our identity for this context
121    /// * `config` - Protocol-specific configuration (see [`Self::Config`])
122    ///
123    /// # Returns
124    ///
125    /// `Ok(Self::Stats)` with protocol-specific statistics on success, or an `eyre` error.
126    async fn run_initiator<T: SyncTransport>(
127        transport: &mut T,
128        store: &Store,
129        context_id: ContextId,
130        identity: PublicKey,
131        config: Self::Config,
132    ) -> Result<Self::Stats>;
133
134    /// Run the responder side of the protocol, returning `Ok(())` on success.
135    ///
136    /// The responder answers requests from the initiator. The first request's
137    /// data is passed via `first_request` because the manager has already
138    /// consumed the first message for routing.
139    ///
140    /// # Arguments
141    ///
142    /// * `transport` - The transport for sending/receiving messages
143    /// * `store` - The local storage
144    /// * `context_id` - The context being synced
145    /// * `identity` - Our identity for this context
146    /// * `first_request` - Data from the first `InitPayload`, extracted by the manager
147    async fn run_responder<T: SyncTransport>(
148        transport: &mut T,
149        store: &Store,
150        context_id: ContextId,
151        identity: PublicKey,
152        first_request: Self::ResponderInit,
153    ) -> Result<()>;
154}