calimero_node_primitives/sync/protocol_trait.rs
1//! Common trait for sync protocol implementations.
2//!
3//! This module defines the [`SyncProtocolExecutor`] trait that all sync protocols
4//! implement. This enables:
5//!
6//! - Protocol implementation details contained within each protocol module
7//! - Common interface for `SyncManager` to invoke any protocol
8//! - Same code path for production and simulation (only `Store` backend differs)
9//!
10//! # Architecture
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────────────┐
14//! │ SyncProtocolExecutor trait │
15//! │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
16//! │ │ HashComparison │ │ Snapshot │ │ LevelWise │ │
17//! │ │ Protocol │ │ Protocol │ │ Protocol │ │
18//! │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
19//! │ │ │ │ │
20//! │ └────────────────────┼────────────────────┘ │
21//! │ │ │
22//! │ ┌───────────┴───────────┐ │
23//! │ │ SyncTransport │ │
24//! │ │ (Stream or SimStream) │ │
25//! │ └───────────────────────┘ │
26//! └─────────────────────────────────────────────────────────────────┘
27//! ```
28//!
29//! # Responder Dispatch Model
30//!
31//! The `SyncManager` dispatches incoming sync requests using this flow:
32//!
33//! 1. Manager receives stream and calls `recv()` to get the first message
34//! 2. Manager matches on `InitPayload` to determine which protocol to use
35//! 3. Manager extracts protocol-specific data from the first message
36//! 4. Manager calls `run_responder()` passing the extracted data via `ResponderInit`
37//!
38//! This design is necessary because the manager must peek at the first message
39//! for routing, but once consumed it cannot be "un-read". The `ResponderInit`
40//! associated type allows each protocol to declare what data it needs from
41//! the first request.
42//!
43//! # Example
44//!
45//! ```ignore
46//! use calimero_node_primitives::sync::{SyncProtocolExecutor, HashComparisonProtocol};
47//!
48//! // Production initiator
49//! let mut transport = StreamTransport::new(&mut stream);
50//! let stats = HashComparisonProtocol::run_initiator(
51//! &mut transport,
52//! &store,
53//! context_id,
54//! identity,
55//! HashComparisonConfig { remote_root_hash },
56//! ).await?;
57//!
58//! // Production responder (manager extracts first request data)
59//! let first_request = HashComparisonFirstRequest { node_id, max_depth };
60//! HashComparisonProtocol::run_responder(
61//! &mut transport,
62//! &store,
63//! context_id,
64//! identity,
65//! first_request,
66//! ).await?;
67//! ```
68
69use async_trait::async_trait;
70use calimero_primitives::context::ContextId;
71use calimero_primitives::identity::PublicKey;
72use calimero_store::Store;
73use eyre::Result;
74
75use super::SyncTransport;
76
77/// Trait for sync protocol implementations.
78///
79/// Each sync protocol (HashComparison, Snapshot, LevelWise, etc.) implements
80/// this trait. The protocol logic is generic over:
81///
82/// - `T: SyncTransport` - the transport layer (production streams or simulation channels)
83/// - `Store` - the storage backend (RocksDB or InMemoryDB)
84///
85/// This enables the same protocol code to run in both production and simulation.
86///
87/// Note: Uses `?Send` because `RuntimeEnv` (used for storage access) contains `Rc`
88/// which is not `Send`. Callers must not spawn these futures across threads.
89#[async_trait(?Send)]
90pub trait SyncProtocolExecutor {
91 /// Protocol-specific configuration for the initiator.
92 ///
93 /// For example, HashComparison needs the remote root hash.
94 type Config: Send;
95
96 /// Data extracted from the first request for responder dispatch.
97 ///
98 /// The manager parses the first `InitPayload` and constructs this type
99 /// to pass to `run_responder`. This is necessary because the manager
100 /// consumes the first message for routing, so the protocol cannot
101 /// `recv()` it again.
102 ///
103 /// For example:
104 /// - HashComparison needs `{ node_id, max_depth }` from `TreeNodeRequest`
105 /// - LevelWise needs `{ level, parent_ids }` from `LevelWiseRequest`
106 type ResponderInit: Send;
107
108 /// Protocol-specific statistics/results.
109 type Stats: Send + Default;
110
111 /// Run the initiator (pulling) side of the protocol.
112 ///
113 /// The initiator requests data from the responder and applies it locally.
114 ///
115 /// # Arguments
116 ///
117 /// * `transport` - The transport for sending/receiving messages
118 /// * `store` - The local storage (works with both RocksDB and InMemoryDB)
119 /// * `context_id` - The context being synced
120 /// * `identity` - Our identity for this context
121 /// * `config` - Protocol-specific configuration
122 ///
123 /// # Returns
124 ///
125 /// Protocol-specific statistics on success.
126 async fn run_initiator<T: SyncTransport>(
127 transport: &mut T,
128 store: &Store,
129 context_id: ContextId,
130 identity: PublicKey,
131 config: Self::Config,
132 ) -> Result<Self::Stats>;
133
134 /// Run the responder side of the protocol.
135 ///
136 /// The responder answers requests from the initiator. The first request's
137 /// data is passed via `first_request` because the manager has already
138 /// consumed the first message for routing.
139 ///
140 /// # Arguments
141 ///
142 /// * `transport` - The transport for sending/receiving messages
143 /// * `store` - The local storage
144 /// * `context_id` - The context being synced
145 /// * `identity` - Our identity for this context
146 /// * `first_request` - Data from the first `InitPayload`, extracted by the manager
147 async fn run_responder<T: SyncTransport>(
148 transport: &mut T,
149 store: &Store,
150 context_id: ContextId,
151 identity: PublicKey,
152 first_request: Self::ResponderInit,
153 ) -> Result<()>;
154}