Skip to main content

calimero_node_primitives/sync/
transport.rs

//! Transport abstraction for sync protocols.
//!
//! This module provides the [`SyncTransport`] trait that abstracts the underlying
//! network transport, enabling:
//!
//! - Production code to use libp2p streams
//! - Simulation tests to use in-memory channels
//! - The same protocol code to run in both environments
//!
//! # Design Rationale
//!
//! The sync protocol code needs to send and receive [`StreamMessage`] payloads.
//! By abstracting this behind a trait, we can:
//!
//! 1. Test the actual production protocol logic in simulation
//! 2. Verify invariants (I4, I5, I6) with real message flow
//! 3. Inject network faults (latency, loss, reorder) in tests
//!
//! # Example
//!
//! ```ignore
//! async fn hash_comparison_sync<T: SyncTransport>(
//!     transport: &mut T,
//!     context_id: ContextId,
//!     // ...
//! ) -> Result<Stats> {
//!     transport.send(&request_msg).await?;
//!     let response = transport.recv().await?;
//!     // ...
//! }
//! ```
32
33use std::time::Duration;
34
35use async_trait::async_trait;
36use calimero_crypto::{Nonce, SharedKey};
37use eyre::Result;
38
39use super::wire::StreamMessage;
40
41/// Transport abstraction for sync protocol message exchange.
42///
43/// Implementations handle serialization, optional encryption, and the
44/// underlying transport mechanism (network streams or in-memory channels).
45///
46/// # Encryption
47///
48/// Transport implementations may support optional encryption. Use
49/// [`set_encryption`](SyncTransport::set_encryption) to configure the
50/// shared key and nonce for encrypted communication.
51#[async_trait]
52pub trait SyncTransport: Send {
53    /// Send a message to the peer.
54    ///
55    /// The implementation handles serialization and optional encryption.
56    ///
57    /// # Errors
58    ///
59    /// Returns error if serialization, encryption, or send fails.
60    async fn send(&mut self, message: &StreamMessage<'_>) -> Result<()>;
61
62    /// Receive a message from the peer.
63    ///
64    /// The implementation handles deserialization and optional decryption.
65    ///
66    /// # Errors
67    ///
68    /// Returns error if receive, decryption, or deserialization fails.
69    /// Returns `Ok(None)` if the stream is closed.
70    async fn recv(&mut self) -> Result<Option<StreamMessage<'static>>>;
71
72    /// Receive a message with a timeout.
73    ///
74    /// # Errors
75    ///
76    /// Returns error if timeout expires or receive fails.
77    async fn recv_timeout(&mut self, timeout: Duration) -> Result<Option<StreamMessage<'static>>>;
78
79    /// Set encryption parameters for subsequent send/recv operations.
80    ///
81    /// Pass `None` to disable encryption.
82    fn set_encryption(&mut self, encryption: Option<(SharedKey, Nonce)>);
83
84    /// Get the current encryption parameters.
85    fn encryption(&self) -> Option<(SharedKey, Nonce)>;
86
87    /// Close the transport.
88    ///
89    /// After closing, further send/recv calls will fail.
90    async fn close(&mut self) -> Result<()>;
91}
92
93// =============================================================================
94// Encryption Helper
95// =============================================================================
96
97/// Common encryption state that implementations can embed.
98#[derive(Debug, Clone, Default)]
99pub struct EncryptionState {
100    /// Current encryption key and nonce.
101    pub key_nonce: Option<(SharedKey, Nonce)>,
102}
103
104impl EncryptionState {
105    /// Create new encryption state (no encryption).
106    #[must_use]
107    pub fn new() -> Self {
108        Self { key_nonce: None }
109    }
110
111    /// Set encryption parameters.
112    pub fn set(&mut self, encryption: Option<(SharedKey, Nonce)>) {
113        self.key_nonce = encryption;
114    }
115
116    /// Get current encryption parameters.
117    #[must_use]
118    pub fn get(&self) -> Option<(SharedKey, Nonce)> {
119        self.key_nonce.clone()
120    }
121
122    /// Encrypt data if encryption is configured.
123    ///
124    /// # Errors
125    ///
126    /// Returns error if encryption fails.
127    pub fn encrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
128        match &self.key_nonce {
129            Some((key, nonce)) => key
130                .encrypt(data, *nonce)
131                .ok_or_else(|| eyre::eyre!("encryption failed")),
132            None => Ok(data),
133        }
134    }
135
136    /// Decrypt data if encryption is configured.
137    ///
138    /// # Errors
139    ///
140    /// Returns error if decryption fails.
141    pub fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
142        match &self.key_nonce {
143            Some((key, nonce)) => key
144                .decrypt(data, *nonce)
145                .ok_or_else(|| eyre::eyre!("decryption failed")),
146            None => Ok(data),
147        }
148    }
149}
150
151// =============================================================================
152// Tests
153// =============================================================================
154
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed state carries no key/nonce pair.
    #[test]
    fn test_encryption_state_default() {
        assert!(EncryptionState::new().get().is_none());
    }

    /// Without a key, `encrypt` and `decrypt` both hand back their input.
    #[test]
    fn test_encryption_state_passthrough() {
        let state = EncryptionState::new();
        let plaintext = b"hello world".to_vec();

        // No encryption = passthrough.
        let ciphertext = state.encrypt(plaintext.clone()).unwrap();
        assert_eq!(ciphertext, plaintext);

        let round_trip = state.decrypt(ciphertext).unwrap();
        assert_eq!(round_trip, plaintext);
    }
}