1use libp2p::{gossipsub, mdns, noise, swarm::NetworkBehaviour, tcp, yamux, Swarm};
2use openrank_common::{
3 topics::{Domain, Topic},
4 txs::Address,
5};
6use std::{
7 error::Error,
8 hash::{DefaultHasher, Hash, Hasher},
9 time::Duration,
10};
11use tokio::{
12 io::{self, AsyncBufReadExt},
13 select,
14};
15use tracing_subscriber::EnvFilter;
16
17#[derive(NetworkBehaviour)]
19struct MyBehaviour {
20 gossipsub: gossipsub::Behaviour,
21 mdns: mdns::tokio::Behaviour,
22}
23
24async fn build_node() -> Result<Swarm<MyBehaviour>, Box<dyn Error>> {
25 let swarm = libp2p::SwarmBuilder::with_new_identity()
26 .with_tokio()
27 .with_tcp(
28 tcp::Config::default(),
29 noise::Config::new,
30 yamux::Config::default,
31 )?
32 .with_quic()
33 .with_behaviour(|key| {
34 let message_id_fn = |message: &gossipsub::Message| {
36 let mut s = DefaultHasher::new();
37 message.data.hash(&mut s);
38 gossipsub::MessageId::from(s.finish().to_string())
39 };
40
41 let gossipsub_config = gossipsub::ConfigBuilder::default()
43 .heartbeat_interval(Duration::from_secs(10)) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(message_id_fn) .build()
47 .map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; let gossipsub = gossipsub::Behaviour::new(
51 gossipsub::MessageAuthenticity::Signed(key.clone()),
52 gossipsub_config,
53 )?;
54
55 let mdns =
56 mdns::tokio::Behaviour::new(mdns::Config::default(), key.public().to_peer_id())?;
57 Ok(MyBehaviour { gossipsub, mdns })
58 })?
59 .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
60 .build();
61
62 Ok(swarm)
63}
64
65pub async fn run() -> Result<(), Box<dyn Error>> {
66 let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init();
67
68 let mut swarm = build_node().await?;
69
70 let domains = vec![Domain::new(
71 Address::default(),
72 "1".to_string(),
73 Address::default(),
74 "1".to_string(),
75 0,
76 )];
77 let sub_topics_assignment: Vec<Topic> = domains
78 .clone()
79 .into_iter()
80 .map(|x| x.to_hash())
81 .map(|domain_hash| Topic::DomainAssignent(domain_hash.clone()))
82 .collect();
83 let sub_topics_scores: Vec<Topic> = domains
84 .clone()
85 .into_iter()
86 .map(|x| x.to_hash())
87 .map(|domain_hash| Topic::DomainScores(domain_hash.clone()))
88 .collect();
89 let sub_topics_commitment: Vec<Topic> = domains
90 .clone()
91 .into_iter()
92 .map(|x| x.to_hash())
93 .map(|domain_hash| Topic::DomainCommitment(domain_hash.clone()))
94 .collect();
95
96 let mut stdin = io::BufReader::new(io::stdin()).lines();
98
99 swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
101 swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
102
103 println!("Enter messages via STDIN and they will be sent to connected peers using Gossipsub");
104
105 loop {
107 select! {
108 Ok(Some(line)) = stdin.next_line() => {
109 match line.as_str() {
110 "assignment" => {
111 for topic in &sub_topics_assignment {
112 let topic_wrapper = gossipsub::IdentTopic::new(topic.to_hash().to_hex());
113 if let Err(e) = swarm
114 .behaviour_mut().gossipsub
115 .publish(topic_wrapper, line.as_bytes()) {
116 println!("Publish error: {e:?}");
117 }
118 }
119 },
120 "scores" => {
121 for topic in &sub_topics_scores {
122 let topic_wrapper = gossipsub::IdentTopic::new(topic.to_hash().to_hex());
123 if let Err(e) = swarm
124 .behaviour_mut().gossipsub
125 .publish(topic_wrapper, line.as_bytes()) {
126 println!("Publish error: {e:?}");
127 }
128 }
129 },
130 "commitment" => {
131 for topic in &sub_topics_commitment {
132 let topic_wrapper = gossipsub::IdentTopic::new(topic.to_hash().to_hex());
133 if let Err(e) = swarm
134 .behaviour_mut().gossipsub
135 .publish(topic_wrapper, line.as_bytes()) {
136 println!("Publish error: {e:?}");
137 }
138 }
139 },
140 &_ => {}
141 }
142 }
143 }
144 }
145}