calimero_node_primitives/sync/transport.rs
1//! Transport abstraction for sync protocols.
2//!
3//! This module provides the [`SyncTransport`] trait that abstracts the underlying
4//! network transport, enabling:
5//!
6//! - Production code to use libp2p streams
7//! - Simulation tests to use in-memory channels
8//! - Same protocol code to run in both environments
9//!
10//! # Design Rationale
11//!
12//! The sync protocol code needs to send and receive [`StreamMessage`] payloads.
13//! By abstracting this behind a trait, we can:
14//!
15//! 1. Test the actual production protocol logic in simulation
16//! 2. Verify invariants (I4, I5, I6) with real message flow
17//! 3. Inject network faults (latency, loss, reorder) in tests
18//!
19//! # Example
20//!
//! ```ignore
//! async fn hash_comparison_sync<T: SyncTransport>(
//!     transport: &mut T,
//!     context_id: ContextId,
//!     // ...
//! ) -> Result<Stats> {
//!     transport.send(&request_msg).await?;
//!     let response = transport.recv().await?;
//!     // ...
//! }
//! ```
32
33use std::time::Duration;
34
35use async_trait::async_trait;
36use calimero_crypto::{Nonce, SharedKey};
37use eyre::Result;
38
39use super::wire::StreamMessage;
40
41/// Transport abstraction for sync protocol message exchange.
42///
43/// Implementations handle serialization, optional encryption, and the
44/// underlying transport mechanism (network streams or in-memory channels).
45///
46/// # Encryption
47///
48/// Transport implementations may support optional encryption. Use
49/// [`set_encryption`](SyncTransport::set_encryption) to configure the
50/// shared key and nonce for encrypted communication.
51#[async_trait]
52pub trait SyncTransport: Send {
53 /// Send a message to the peer.
54 ///
55 /// The implementation handles serialization and optional encryption.
56 ///
57 /// # Errors
58 ///
59 /// Returns error if serialization, encryption, or send fails.
60 async fn send(&mut self, message: &StreamMessage<'_>) -> Result<()>;
61
62 /// Receive a message from the peer.
63 ///
64 /// The implementation handles deserialization and optional decryption.
65 ///
66 /// # Errors
67 ///
68 /// Returns error if receive, decryption, or deserialization fails.
69 /// Returns `Ok(None)` if the stream is closed.
70 async fn recv(&mut self) -> Result<Option<StreamMessage<'static>>>;
71
72 /// Receive a message with a timeout.
73 ///
74 /// # Errors
75 ///
76 /// Returns error if timeout expires or receive fails.
77 async fn recv_timeout(&mut self, timeout: Duration) -> Result<Option<StreamMessage<'static>>>;
78
79 /// Set encryption parameters for subsequent send/recv operations.
80 ///
81 /// Pass `None` to disable encryption.
82 fn set_encryption(&mut self, encryption: Option<(SharedKey, Nonce)>);
83
84 /// Get the current encryption parameters.
85 fn encryption(&self) -> Option<(SharedKey, Nonce)>;
86
87 /// Close the transport.
88 ///
89 /// After closing, further send/recv calls will fail.
90 async fn close(&mut self) -> Result<()>;
91}
92
93// =============================================================================
94// Encryption Helper
95// =============================================================================
96
97/// Common encryption state that implementations can embed.
98#[derive(Debug, Clone, Default)]
99pub struct EncryptionState {
100 /// Current encryption key and nonce.
101 pub key_nonce: Option<(SharedKey, Nonce)>,
102}
103
104impl EncryptionState {
105 /// Create new encryption state (no encryption).
106 #[must_use]
107 pub fn new() -> Self {
108 Self { key_nonce: None }
109 }
110
111 /// Set encryption parameters.
112 pub fn set(&mut self, encryption: Option<(SharedKey, Nonce)>) {
113 self.key_nonce = encryption;
114 }
115
116 /// Get current encryption parameters.
117 #[must_use]
118 pub fn get(&self) -> Option<(SharedKey, Nonce)> {
119 self.key_nonce.clone()
120 }
121
122 /// Encrypt data if encryption is configured.
123 ///
124 /// # Errors
125 ///
126 /// Returns error if encryption fails.
127 pub fn encrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
128 match &self.key_nonce {
129 Some((key, nonce)) => key
130 .encrypt(data, *nonce)
131 .ok_or_else(|| eyre::eyre!("encryption failed")),
132 None => Ok(data),
133 }
134 }
135
136 /// Decrypt data if encryption is configured.
137 ///
138 /// # Errors
139 ///
140 /// Returns error if decryption fails.
141 pub fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
142 match &self.key_nonce {
143 Some((key, nonce)) => key
144 .decrypt(data, *nonce)
145 .ok_or_else(|| eyre::eyre!("decryption failed")),
146 None => Ok(data),
147 }
148 }
149}
150
151// =============================================================================
152// Tests
153// =============================================================================
154
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed state carries no key/nonce.
    #[test]
    fn test_encryption_state_default() {
        assert!(EncryptionState::new().get().is_none());
    }

    /// Without a key, `encrypt` and `decrypt` both return their input unchanged.
    #[test]
    fn test_encryption_state_passthrough() {
        let plain = b"hello world".to_vec();
        let unkeyed = EncryptionState::new();

        // No encryption configured, so the output equals the input.
        let out = unkeyed.encrypt(plain.clone()).unwrap();
        assert_eq!(out, plain);

        let back = unkeyed.decrypt(out).unwrap();
        assert_eq!(back, plain);
    }
}
174}