guts_node/
p2p.rs

1//! P2P networking integration for guts-node.
2//!
3//! This module provides the P2P layer integration using commonware-p2p's
4//! authenticated network for production BFT consensus.
5//!
6//! # Channels
7//!
8//! The consensus engine requires several P2P channels:
9//! - **Pending**: For pending consensus votes (channel 0)
10//! - **Recovered**: For recovered messages after reconnection (channel 1)
11//! - **Resolver**: For fetching missing certificates (channel 2)
12//! - **Broadcast**: For block broadcast messages (channel 3)
13//! - **Marshal**: For block sync messages (channel 4)
14
15use commonware_codec::DecodeExt;
16use commonware_consensus::marshal;
17use commonware_cryptography::{
18    ed25519::{PrivateKey, PublicKey},
19    PrivateKeyExt, Signer,
20};
21use commonware_p2p::{
22    authenticated::discovery as authenticated, utils::requester, Manager, Receiver, Sender,
23};
24use commonware_runtime::{Clock, Metrics, Network as RNetwork, Spawner, Storage};
25use commonware_utils::{set::Ordered, union_unique};
26use futures::channel::mpsc;
27use governor::clock::{Clock as GClock, ReasonablyRealtime};
28use governor::Quota;
29use guts_consensus::simplex::{SimplexBlock, NAMESPACE};
30use rand::{CryptoRng, Rng};
31use std::{
32    net::{IpAddr, Ipv4Addr, SocketAddr},
33    num::NonZeroU32,
34    time::Duration,
35};
36use tracing::info;
37
38/// Legacy P2P manager stub for backward compatibility.
39///
40/// This will be replaced by the authenticated network for real consensus.
41#[derive(Clone)]
42pub struct P2PManager {
43    /// Our public key.
44    pub public_key: PublicKey,
45}
46
47impl P2PManager {
48    /// Create a new P2P manager stub.
49    pub fn new(private_key: &PrivateKey) -> Self {
50        Self {
51            public_key: private_key.public_key(),
52        }
53    }
54
55    /// Stub: Notify peers about a repository update.
56    ///
57    /// In the real consensus system, replication happens via the consensus layer.
58    pub fn notify_update(
59        &self,
60        _repo_key: &str,
61        _new_objects: Vec<guts_storage::ObjectId>,
62        _refs: Vec<(String, guts_storage::ObjectId)>,
63    ) {
64        // Replication will be handled by consensus in the real implementation
65        tracing::debug!("P2P notify_update called (stub)");
66    }
67
68    /// Stub: Register a repository for replication.
69    ///
70    /// In the real consensus system, repositories are replicated via consensus.
71    pub fn register_repo(&self, _key: String, _repo: std::sync::Arc<guts_storage::Repository>) {
72        // Registration will be handled differently in the real implementation
73        tracing::debug!("P2P register_repo called (stub)");
74    }
75}
76
/// Channel ID for pending consensus votes.
pub const PENDING_CHANNEL: u64 = 0;
/// Channel ID for recovered consensus messages.
pub const RECOVERED_CHANNEL: u64 = 1;
/// Channel ID for fetching missing certificates.
pub const RESOLVER_CHANNEL: u64 = 2;
/// Channel ID for block broadcast messages.
pub const BROADCAST_CHANNEL: u64 = 3;
/// Channel ID for block sync (marshal) messages.
pub const MARSHAL_CHANNEL: u64 = 4;

/// Maximum P2P message size in bytes: 1 MiB (1,048,576).
pub const MAX_MESSAGE_SIZE: usize = 1 << 20;
86
87/// Configuration for the authenticated P2P network.
88#[derive(Clone)]
89pub struct AuthenticatedP2pConfig {
90    /// Our private key for signing.
91    pub private_key: PrivateKey,
92    /// P2P listen address.
93    pub listen_addr: SocketAddr,
94    /// Our external address (for NAT traversal).
95    pub external_addr: SocketAddr,
96    /// Bootstrap nodes (public key, address).
97    pub bootstrappers: Vec<(PublicKey, SocketAddr)>,
98    /// Mailbox size for message channels.
99    pub mailbox_size: usize,
100    /// Message backlog size.
101    pub message_backlog: usize,
102    /// Whether running in local mode (relaxed timing).
103    pub local: bool,
104}
105
106impl AuthenticatedP2pConfig {
107    /// Creates a new configuration with sensible defaults.
108    pub fn new(private_key: PrivateKey, listen_port: u16) -> Self {
109        let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), listen_port);
110        let external_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), listen_port);
111
112        Self {
113            private_key,
114            listen_addr,
115            external_addr,
116            bootstrappers: Vec::new(),
117            mailbox_size: 1024,
118            message_backlog: 1024,
119            local: true, // Default to local mode for development
120        }
121    }
122
123    /// Set the external address.
124    pub fn with_external_addr(mut self, addr: SocketAddr) -> Self {
125        self.external_addr = addr;
126        self
127    }
128
129    /// Add a bootstrapper.
130    pub fn with_bootstrapper(mut self, key: PublicKey, addr: SocketAddr) -> Self {
131        self.bootstrappers.push((key, addr));
132        self
133    }
134
135    /// Set local mode.
136    pub fn with_local(mut self, local: bool) -> Self {
137        self.local = local;
138        self
139    }
140}
141
142/// Consensus P2P channels for the simplex engine.
143pub struct ConsensusChannels<S: Sender, R: Receiver, Res> {
144    /// Pending channel (sender, receiver).
145    pub pending: (S, R),
146    /// Recovered channel (sender, receiver).
147    pub recovered: (S, R),
148    /// Resolver channel (sender, receiver).
149    pub resolver: (S, R),
150    /// Broadcast channel (sender, receiver).
151    pub broadcast: (S, R),
152    /// Marshal resolver (receiver, resolver).
153    pub marshal: (
154        mpsc::Receiver<marshal::ingress::handler::Message<SimplexBlock>>,
155        Res,
156    ),
157}
158
/// Authenticated P2P network for BFT consensus.
///
/// Built by [`AuthenticatedNetwork::new`], which also returns the consensus
/// channels and the handle of the started network. This struct keeps the
/// pieces needed afterwards: the node identity and the peer-management oracle.
pub struct AuthenticatedNetwork<E>
where
    E: Clock
        + GClock
        + ReasonablyRealtime
        + Rng
        + CryptoRng
        + Spawner
        + Storage
        + Metrics
        + RNetwork
        + Clone,
{
    /// The network context.
    // Stored at construction but not read anywhere in this file, hence the
    // lint allowance.
    #[allow(dead_code)]
    context: E,
    /// Our public key, derived from the configured private key.
    pub public_key: PublicKey,
    /// The network oracle for peer management; used to update the set of
    /// authorized participants.
    pub oracle: authenticated::Oracle<PublicKey>,
}
181
182impl<E> AuthenticatedNetwork<E>
183where
184    E: Clock
185        + GClock
186        + ReasonablyRealtime
187        + Rng
188        + CryptoRng
189        + Spawner
190        + Storage
191        + Metrics
192        + RNetwork
193        + Clone,
194{
195    /// Creates and starts a new authenticated P2P network.
196    ///
197    /// Returns the network handle and the consensus channels.
198    pub async fn new(
199        context: E,
200        config: AuthenticatedP2pConfig,
201        participants: Vec<PublicKey>,
202    ) -> (
203        Self,
204        ConsensusChannels<
205            authenticated::Sender<PublicKey>,
206            authenticated::Receiver<PublicKey>,
207            impl commonware_resolver::Resolver<Key = marshal::ingress::handler::Request<SimplexBlock>>,
208        >,
209        commonware_runtime::Handle<()>,
210    ) {
211        let public_key = config.private_key.public_key();
212
213        // Create P2P namespace
214        let p2p_namespace = union_unique(NAMESPACE, b"_P2P");
215
216        // Configure the network
217        let mut p2p_cfg = if config.local {
218            authenticated::Config::local(
219                config.private_key.clone(),
220                &p2p_namespace,
221                config.listen_addr,
222                config.external_addr,
223                config.bootstrappers.clone(),
224                MAX_MESSAGE_SIZE,
225            )
226        } else {
227            authenticated::Config::recommended(
228                config.private_key.clone(),
229                &p2p_namespace,
230                config.listen_addr,
231                config.external_addr,
232                config.bootstrappers.clone(),
233                MAX_MESSAGE_SIZE,
234            )
235        };
236        p2p_cfg.mailbox_size = config.mailbox_size;
237
238        // Create the network
239        let (mut network, mut oracle) = authenticated::Network::new(context.clone(), p2p_cfg);
240
241        // Provide authorized peers
242        let participants_ordered: Ordered<PublicKey> = participants.into_iter().collect();
243        oracle.update(0, participants_ordered).await;
244
245        // Register consensus channels
246        let pending_quota = Quota::per_second(NonZeroU32::new(128).unwrap());
247        let pending = network.register(PENDING_CHANNEL, pending_quota, config.message_backlog);
248
249        let recovered_quota = Quota::per_second(NonZeroU32::new(128).unwrap());
250        let recovered =
251            network.register(RECOVERED_CHANNEL, recovered_quota, config.message_backlog);
252
253        let resolver_quota = Quota::per_second(NonZeroU32::new(128).unwrap());
254        let resolver = network.register(RESOLVER_CHANNEL, resolver_quota, config.message_backlog);
255
256        let broadcast_quota = Quota::per_second(NonZeroU32::new(8).unwrap());
257        let broadcast =
258            network.register(BROADCAST_CHANNEL, broadcast_quota, config.message_backlog);
259
260        let marshal_quota = Quota::per_second(NonZeroU32::new(8).unwrap());
261        let marshal_channel =
262            network.register(MARSHAL_CHANNEL, marshal_quota, config.message_backlog);
263
264        // Start the network
265        let network_handle = network.start();
266
267        // Create marshal resolver
268        let marshal_resolver_cfg = marshal::resolver::p2p::Config {
269            public_key: public_key.clone(),
270            manager: oracle.clone(),
271            mailbox_size: config.mailbox_size,
272            requester_config: requester::Config {
273                me: Some(public_key.clone()),
274                rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
275                initial: Duration::from_secs(1),
276                timeout: Duration::from_secs(2),
277            },
278            fetch_retry_timeout: Duration::from_millis(100),
279            priority_requests: false,
280            priority_responses: false,
281        };
282        let marshal_resolver =
283            marshal::resolver::p2p::init(&context, marshal_resolver_cfg, marshal_channel);
284
285        info!(
286            ?public_key,
287            listen_addr = %config.listen_addr,
288            external_addr = %config.external_addr,
289            bootstrappers = config.bootstrappers.len(),
290            "P2P network started"
291        );
292
293        let channels = ConsensusChannels {
294            pending,
295            recovered,
296            resolver,
297            broadcast,
298            marshal: marshal_resolver,
299        };
300
301        (
302            Self {
303                context,
304                public_key,
305                oracle,
306            },
307            channels,
308            network_handle,
309        )
310    }
311
312    /// Update the set of authorized participants.
313    pub async fn update_participants(&mut self, epoch: u64, participants: Vec<PublicKey>) {
314        let participants_ordered: Ordered<PublicKey> = participants.into_iter().collect();
315        self.oracle.update(epoch, participants_ordered).await;
316    }
317}
318
/// Type alias for the commonware tokio runtime context.
pub type TokioContext = commonware_runtime::tokio::Context;
322
323/// Parse a public key from hex string.
324pub fn parse_public_key(hex_str: &str) -> Result<PublicKey, String> {
325    let hex_str = hex_str.strip_prefix("0x").unwrap_or(hex_str);
326    let bytes = hex::decode(hex_str).map_err(|e| format!("Invalid hex: {}", e))?;
327    PublicKey::decode(bytes.as_ref()).map_err(|e| format!("Invalid public key: {:?}", e))
328}
329
330/// Parse a private key from hex string (using seed derivation).
331pub fn parse_private_key(hex_str: &str) -> Result<PrivateKey, String> {
332    let hex_str = hex_str.strip_prefix("0x").unwrap_or(hex_str);
333    let bytes = hex::decode(hex_str).map_err(|e| format!("Invalid hex: {}", e))?;
334
335    if bytes.len() >= 8 {
336        // Use the first 8 bytes as a seed
337        let seed = u64::from_le_bytes(bytes[..8].try_into().unwrap());
338        Ok(PrivateKey::from_seed(seed))
339    } else {
340        Err("Private key hex must be at least 8 bytes".to_string())
341    }
342}
343
344/// Parse bootstrapper string in format "pubkey@host:port".
345pub fn parse_bootstrapper(s: &str) -> Result<(PublicKey, SocketAddr), String> {
346    let parts: Vec<&str> = s.split('@').collect();
347    if parts.len() != 2 {
348        return Err(format!(
349            "Invalid bootstrapper format '{}', expected 'pubkey@host:port'",
350            s
351        ));
352    }
353
354    let public_key = parse_public_key(parts[0])?;
355    let addr: SocketAddr = parts[1]
356        .parse()
357        .map_err(|e| format!("Invalid address '{}': {}", parts[1], e))?;
358
359    Ok((public_key, addr))
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the public key derived from `seed` together with its hex encoding.
    fn sample_key(seed: u64) -> (String, PublicKey) {
        let public_key = PrivateKey::from_seed(seed).public_key();
        let encoded = hex::encode(public_key.as_ref());
        (encoded, public_key)
    }

    #[test]
    fn test_parse_public_key() {
        let (hex_str, expected) = sample_key(42);

        // Bare hex round-trips.
        assert_eq!(parse_public_key(&hex_str).unwrap(), expected);

        // The same hex with a 0x prefix is accepted as well.
        let prefixed = format!("0x{}", hex_str);
        assert_eq!(parse_public_key(&prefixed).unwrap(), expected);
    }

    #[test]
    fn test_parse_private_key() {
        let key = parse_private_key("0123456789abcdef0123456789abcdef").unwrap();
        let public_key = key.public_key();
        assert!(!public_key.as_ref().is_empty());
    }

    #[test]
    fn test_parse_bootstrapper() {
        let (hex_str, expected_key) = sample_key(42);
        let expected_addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();

        let input = format!("{}@127.0.0.1:9000", hex_str);
        let (parsed_key, parsed_addr) = parse_bootstrapper(&input).unwrap();

        assert_eq!(parsed_key, expected_key);
        assert_eq!(parsed_addr, expected_addr);
    }
}