1use std::net::{SocketAddr, UdpSocket};
2use std::sync::mpsc::{Sender, Receiver, channel};
3use udp2p_protocol::protocol::{AckMessage, Message, MessageKey, Header};
4use udp2p_node::peer_id::PeerId;
5use udp2p_node::peer_key::Key;
6use udp2p_node::peer_info::PeerInfo;
7use udp2p_discovery::kad::Kademlia;
8use udp2p_discovery::routing::RoutingTable;
9use udp2p_transport::transport::Transport;
10use udp2p_transport::handler::MessageHandler;
11use std::collections::{HashMap, HashSet};
12use std::thread;
13use std::env::args;
14use udp2p_gossip::gossip::{GossipConfig, GossipService};
15use udp2p_gossip::protocol::GossipMessage;
16use rand::{thread_rng, Rng};
17use std::time::{Duration, Instant};
18use udp2p_utils::utils::ByteRep;
19
20
21fn main() {
22 let port: usize = thread_rng().gen_range(9292..19292);
25 let addr: SocketAddr = format!("127.0.0.1:{}", port)
26 .parse()
27 .expect("Unable to parse address");
28 let sock: UdpSocket = UdpSocket::bind(addr).expect("Unable to bind to address");
29
30 let (to_transport_tx, to_transport_rx): (
32 Sender<(SocketAddr, Message)>,
33 Receiver<(SocketAddr, Message)>,
34 ) = channel();
35 let (to_gossip_tx, to_gossip_rx) = channel();
36 let (to_kad_tx, to_kad_rx) = channel();
37 let (incoming_ack_tx, incoming_ack_rx): (Sender<AckMessage>, Receiver<AckMessage>) = channel();
38 let (to_app_tx, _to_app_rx) = channel::<GossipMessage>();
39
40 let key: Key = Key::rand();
42 let id: PeerId = PeerId::from_key(&key);
43 let info: PeerInfo = PeerInfo::new(id, key, addr.clone());
44
45 let routing_table = RoutingTable::new(info.clone());
47 let ping_pong = Instant::now();
48 let interval = Duration::from_secs(20);
49 let kad = Kademlia::new(routing_table, to_transport_tx.clone(), to_kad_rx, HashSet::new(), interval, ping_pong.clone());
50 let mut transport = Transport::new(addr.clone(), incoming_ack_rx, to_transport_rx);
51 let mut message_handler = MessageHandler::new(
52 to_transport_tx.clone(),
53 incoming_ack_tx.clone(),
54 HashMap::new(),
55 to_kad_tx.clone(),
56 to_gossip_tx.clone(),
57 );
58 let protocol_id = String::from("vrrb-0.1.0-test-net");
59 let gossip_config = GossipConfig::new(
60 protocol_id,
61 8,
62 3,
63 8,
64 3,
65 12,
66 3,
67 0.4,
68 Duration::from_millis(250),
69 80,
70 );
71 let heartbeat = Instant::now();
72 let ping_pong = Instant::now();
73 let mut gossip = GossipService::new(
74 addr.clone(),
75 to_gossip_rx,
76 to_transport_tx.clone(),
77 to_app_tx.clone(),
78 kad,
79 gossip_config,
80 heartbeat,
81 ping_pong,
82 );
83
84 println!("My Address: {:?}", addr);
86 println!("My ID: {:?}", info.id);
87 let thread_sock = sock.try_clone().expect("Unable to clone socket");
89 thread::spawn(move || {
90 let inner_sock = thread_sock.try_clone().expect("Unable to clone socket");
91 thread::spawn(move || loop {
92 transport.incoming_ack();
93 transport.outgoing_msg(&inner_sock);
94 transport.check_time_elapsed(&inner_sock);
95 });
96
97 loop {
98 let local = addr.clone();
99 let mut buf = [0u8; 65536];
100 message_handler.recv_msg(&thread_sock, &mut buf, local);
101 }
102 });
103
104 if let Some(to_dial) = args().nth(1) {
105 let bootstrap: SocketAddr = to_dial.parse().expect("Unable to parse address");
106 gossip.kad.bootstrap(&bootstrap);
107 if let Some(bytes) = info.as_bytes() {
108 gossip.kad.add_peer(bytes)
109 }
110 } else {
111 if let Some(bytes) = info.as_bytes() {
112 gossip.kad.add_peer(bytes)
113 }
114 }
115
116 let thread_to_gossip = to_gossip_tx.clone();
117 thread::spawn(move || {
118 loop {
119 let mut line = String::new();
120 let input = std::io::stdin().read_line(&mut line);
121 if let Ok(_) = input {
122 let msg_id = MessageKey::rand();
123 let msg = GossipMessage {
124 id: msg_id.inner(),
125 data: line.trim().as_bytes().to_vec(),
126 sender: addr.clone()
127 };
128
129 let message = Message {
130 head: Header::Gossip,
131 msg: msg.as_bytes().unwrap()
132 };
133
134 if let Err(_) = thread_to_gossip.clone().send((addr.clone(), message)) {
135 println!("Error sending message to gossip")
136 }
137 }
138 }
139 });
140
141 gossip.start()
142}