1use commonware_codec::DecodeExt;
16use commonware_consensus::marshal;
17use commonware_cryptography::{
18 ed25519::{PrivateKey, PublicKey},
19 PrivateKeyExt, Signer,
20};
21use commonware_p2p::{
22 authenticated::discovery as authenticated, utils::requester, Manager, Receiver, Sender,
23};
24use commonware_runtime::{Clock, Metrics, Network as RNetwork, Spawner, Storage};
25use commonware_utils::{set::Ordered, union_unique};
26use futures::channel::mpsc;
27use governor::clock::{Clock as GClock, ReasonablyRealtime};
28use governor::Quota;
29use guts_consensus::simplex::{SimplexBlock, NAMESPACE};
30use rand::{CryptoRng, Rng};
31use std::{
32 net::{IpAddr, Ipv4Addr, SocketAddr},
33 num::NonZeroU32,
34 time::Duration,
35};
36use tracing::info;
37
38#[derive(Clone)]
42pub struct P2PManager {
43 pub public_key: PublicKey,
45}
46
47impl P2PManager {
48 pub fn new(private_key: &PrivateKey) -> Self {
50 Self {
51 public_key: private_key.public_key(),
52 }
53 }
54
55 pub fn notify_update(
59 &self,
60 _repo_key: &str,
61 _new_objects: Vec<guts_storage::ObjectId>,
62 _refs: Vec<(String, guts_storage::ObjectId)>,
63 ) {
64 tracing::debug!("P2P notify_update called (stub)");
66 }
67
68 pub fn register_repo(&self, _key: String, _repo: std::sync::Arc<guts_storage::Repository>) {
72 tracing::debug!("P2P register_repo called (stub)");
74 }
75}
76
77pub const PENDING_CHANNEL: u64 = 0;
79pub const RECOVERED_CHANNEL: u64 = 1;
80pub const RESOLVER_CHANNEL: u64 = 2;
81pub const BROADCAST_CHANNEL: u64 = 3;
82pub const MARSHAL_CHANNEL: u64 = 4;
83
84pub const MAX_MESSAGE_SIZE: usize = 1024 * 1024;
86
87#[derive(Clone)]
89pub struct AuthenticatedP2pConfig {
90 pub private_key: PrivateKey,
92 pub listen_addr: SocketAddr,
94 pub external_addr: SocketAddr,
96 pub bootstrappers: Vec<(PublicKey, SocketAddr)>,
98 pub mailbox_size: usize,
100 pub message_backlog: usize,
102 pub local: bool,
104}
105
106impl AuthenticatedP2pConfig {
107 pub fn new(private_key: PrivateKey, listen_port: u16) -> Self {
109 let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), listen_port);
110 let external_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), listen_port);
111
112 Self {
113 private_key,
114 listen_addr,
115 external_addr,
116 bootstrappers: Vec::new(),
117 mailbox_size: 1024,
118 message_backlog: 1024,
119 local: true, }
121 }
122
123 pub fn with_external_addr(mut self, addr: SocketAddr) -> Self {
125 self.external_addr = addr;
126 self
127 }
128
129 pub fn with_bootstrapper(mut self, key: PublicKey, addr: SocketAddr) -> Self {
131 self.bootstrappers.push((key, addr));
132 self
133 }
134
135 pub fn with_local(mut self, local: bool) -> Self {
137 self.local = local;
138 self
139 }
140}
141
142pub struct ConsensusChannels<S: Sender, R: Receiver, Res> {
144 pub pending: (S, R),
146 pub recovered: (S, R),
148 pub resolver: (S, R),
150 pub broadcast: (S, R),
152 pub marshal: (
154 mpsc::Receiver<marshal::ingress::handler::Message<SimplexBlock>>,
155 Res,
156 ),
157}
158
159pub struct AuthenticatedNetwork<E>
161where
162 E: Clock
163 + GClock
164 + ReasonablyRealtime
165 + Rng
166 + CryptoRng
167 + Spawner
168 + Storage
169 + Metrics
170 + RNetwork
171 + Clone,
172{
173 #[allow(dead_code)]
175 context: E,
176 pub public_key: PublicKey,
178 pub oracle: authenticated::Oracle<PublicKey>,
180}
181
182impl<E> AuthenticatedNetwork<E>
183where
184 E: Clock
185 + GClock
186 + ReasonablyRealtime
187 + Rng
188 + CryptoRng
189 + Spawner
190 + Storage
191 + Metrics
192 + RNetwork
193 + Clone,
194{
195 pub async fn new(
199 context: E,
200 config: AuthenticatedP2pConfig,
201 participants: Vec<PublicKey>,
202 ) -> (
203 Self,
204 ConsensusChannels<
205 authenticated::Sender<PublicKey>,
206 authenticated::Receiver<PublicKey>,
207 impl commonware_resolver::Resolver<Key = marshal::ingress::handler::Request<SimplexBlock>>,
208 >,
209 commonware_runtime::Handle<()>,
210 ) {
211 let public_key = config.private_key.public_key();
212
213 let p2p_namespace = union_unique(NAMESPACE, b"_P2P");
215
216 let mut p2p_cfg = if config.local {
218 authenticated::Config::local(
219 config.private_key.clone(),
220 &p2p_namespace,
221 config.listen_addr,
222 config.external_addr,
223 config.bootstrappers.clone(),
224 MAX_MESSAGE_SIZE,
225 )
226 } else {
227 authenticated::Config::recommended(
228 config.private_key.clone(),
229 &p2p_namespace,
230 config.listen_addr,
231 config.external_addr,
232 config.bootstrappers.clone(),
233 MAX_MESSAGE_SIZE,
234 )
235 };
236 p2p_cfg.mailbox_size = config.mailbox_size;
237
238 let (mut network, mut oracle) = authenticated::Network::new(context.clone(), p2p_cfg);
240
241 let participants_ordered: Ordered<PublicKey> = participants.into_iter().collect();
243 oracle.update(0, participants_ordered).await;
244
245 let pending_quota = Quota::per_second(NonZeroU32::new(128).unwrap());
247 let pending = network.register(PENDING_CHANNEL, pending_quota, config.message_backlog);
248
249 let recovered_quota = Quota::per_second(NonZeroU32::new(128).unwrap());
250 let recovered =
251 network.register(RECOVERED_CHANNEL, recovered_quota, config.message_backlog);
252
253 let resolver_quota = Quota::per_second(NonZeroU32::new(128).unwrap());
254 let resolver = network.register(RESOLVER_CHANNEL, resolver_quota, config.message_backlog);
255
256 let broadcast_quota = Quota::per_second(NonZeroU32::new(8).unwrap());
257 let broadcast =
258 network.register(BROADCAST_CHANNEL, broadcast_quota, config.message_backlog);
259
260 let marshal_quota = Quota::per_second(NonZeroU32::new(8).unwrap());
261 let marshal_channel =
262 network.register(MARSHAL_CHANNEL, marshal_quota, config.message_backlog);
263
264 let network_handle = network.start();
266
267 let marshal_resolver_cfg = marshal::resolver::p2p::Config {
269 public_key: public_key.clone(),
270 manager: oracle.clone(),
271 mailbox_size: config.mailbox_size,
272 requester_config: requester::Config {
273 me: Some(public_key.clone()),
274 rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
275 initial: Duration::from_secs(1),
276 timeout: Duration::from_secs(2),
277 },
278 fetch_retry_timeout: Duration::from_millis(100),
279 priority_requests: false,
280 priority_responses: false,
281 };
282 let marshal_resolver =
283 marshal::resolver::p2p::init(&context, marshal_resolver_cfg, marshal_channel);
284
285 info!(
286 ?public_key,
287 listen_addr = %config.listen_addr,
288 external_addr = %config.external_addr,
289 bootstrappers = config.bootstrappers.len(),
290 "P2P network started"
291 );
292
293 let channels = ConsensusChannels {
294 pending,
295 recovered,
296 resolver,
297 broadcast,
298 marshal: marshal_resolver,
299 };
300
301 (
302 Self {
303 context,
304 public_key,
305 oracle,
306 },
307 channels,
308 network_handle,
309 )
310 }
311
312 pub async fn update_participants(&mut self, epoch: u64, participants: Vec<PublicKey>) {
314 let participants_ordered: Ordered<PublicKey> = participants.into_iter().collect();
315 self.oracle.update(epoch, participants_ordered).await;
316 }
317}
318
319pub type TokioContext = commonware_runtime::tokio::Context;
322
323pub fn parse_public_key(hex_str: &str) -> Result<PublicKey, String> {
325 let hex_str = hex_str.strip_prefix("0x").unwrap_or(hex_str);
326 let bytes = hex::decode(hex_str).map_err(|e| format!("Invalid hex: {}", e))?;
327 PublicKey::decode(bytes.as_ref()).map_err(|e| format!("Invalid public key: {:?}", e))
328}
329
330pub fn parse_private_key(hex_str: &str) -> Result<PrivateKey, String> {
332 let hex_str = hex_str.strip_prefix("0x").unwrap_or(hex_str);
333 let bytes = hex::decode(hex_str).map_err(|e| format!("Invalid hex: {}", e))?;
334
335 if bytes.len() >= 8 {
336 let seed = u64::from_le_bytes(bytes[..8].try_into().unwrap());
338 Ok(PrivateKey::from_seed(seed))
339 } else {
340 Err("Private key hex must be at least 8 bytes".to_string())
341 }
342}
343
344pub fn parse_bootstrapper(s: &str) -> Result<(PublicKey, SocketAddr), String> {
346 let parts: Vec<&str> = s.split('@').collect();
347 if parts.len() != 2 {
348 return Err(format!(
349 "Invalid bootstrapper format '{}', expected 'pubkey@host:port'",
350 s
351 ));
352 }
353
354 let public_key = parse_public_key(parts[0])?;
355 let addr: SocketAddr = parts[1]
356 .parse()
357 .map_err(|e| format!("Invalid address '{}': {}", parts[1], e))?;
358
359 Ok((public_key, addr))
360}
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365
366 #[test]
367 fn test_parse_public_key() {
368 let private_key = PrivateKey::from_seed(42);
370 let public_key = private_key.public_key();
371 let hex_str = hex::encode(public_key.as_ref());
372
373 let parsed = parse_public_key(&hex_str).unwrap();
374 assert_eq!(parsed, public_key);
375
376 let hex_with_prefix = format!("0x{}", hex_str);
378 let parsed = parse_public_key(&hex_with_prefix).unwrap();
379 assert_eq!(parsed, public_key);
380 }
381
382 #[test]
383 fn test_parse_private_key() {
384 let hex_str = "0123456789abcdef0123456789abcdef";
385 let key = parse_private_key(hex_str).unwrap();
386 assert!(!key.public_key().as_ref().is_empty());
387 }
388
389 #[test]
390 fn test_parse_bootstrapper() {
391 let private_key = PrivateKey::from_seed(42);
392 let public_key = private_key.public_key();
393 let hex_str = hex::encode(public_key.as_ref());
394
395 let bootstrapper_str = format!("{}@127.0.0.1:9000", hex_str);
396 let (parsed_key, addr) = parse_bootstrapper(&bootstrapper_str).unwrap();
397
398 assert_eq!(parsed_key, public_key);
399 assert_eq!(addr, "127.0.0.1:9000".parse().unwrap());
400 }
401}