pub/
pub.rs

1use std::net::{SocketAddr, UdpSocket};
2use std::sync::mpsc::{Sender, Receiver, channel};
3use udp2p_protocol::protocol::{AckMessage, Message, MessageKey, Header};
4use udp2p_node::peer_id::PeerId;
5use udp2p_node::peer_key::Key;
6use udp2p_node::peer_info::PeerInfo;
7use udp2p_discovery::kad::Kademlia;
8use udp2p_discovery::routing::RoutingTable;
9use udp2p_transport::transport::Transport;
10use udp2p_transport::handler::MessageHandler;
11use std::collections::{HashMap, HashSet};
12use std::thread;
13use std::env::args;
14use udp2p_gossip::gossip::{GossipConfig, GossipService};
15use udp2p_gossip::protocol::GossipMessage;
16use rand::{thread_rng, Rng};
17use std::time::{Duration, Instant};
18use udp2p_utils::utils::ByteRep;
19
20
21fn main() {
22    // Bind a UDP Socket to a Socket Address with a random port between
23    // 9292 and 19292 on the localhost address.
24    let port: usize = thread_rng().gen_range(9292..19292);
25    let addr: SocketAddr = format!("127.0.0.1:{}", port)
26        .parse()
27        .expect("Unable to parse address");
28    let sock: UdpSocket = UdpSocket::bind(addr).expect("Unable to bind to address");
29
30    // Initiate channels for communication between different threads
31    let (to_transport_tx, to_transport_rx): (
32        Sender<(SocketAddr, Message)>,
33        Receiver<(SocketAddr, Message)>,
34    ) = channel();
35    let (to_gossip_tx, to_gossip_rx) = channel();
36    let (to_kad_tx, to_kad_rx) = channel();
37    let (incoming_ack_tx, incoming_ack_rx): (Sender<AckMessage>, Receiver<AckMessage>) = channel();
38    let (to_app_tx, _to_app_rx) = channel::<GossipMessage>();
39
40    // Initialize local peer information
41    let key: Key = Key::rand();
42    let id: PeerId = PeerId::from_key(&key);
43    let info: PeerInfo = PeerInfo::new(id, key, addr.clone());
44
45    // initialize a kademlia, transport and message handler instance
46    let routing_table = RoutingTable::new(info.clone());
47    let ping_pong = Instant::now();
48    let interval = Duration::from_secs(20);
49    let kad = Kademlia::new(routing_table, to_transport_tx.clone(), to_kad_rx, HashSet::new(), interval, ping_pong.clone());
50    let mut transport = Transport::new(addr.clone(), incoming_ack_rx, to_transport_rx);
51    let mut message_handler = MessageHandler::new(
52        to_transport_tx.clone(),
53        incoming_ack_tx.clone(),
54        HashMap::new(),
55        to_kad_tx.clone(),
56        to_gossip_tx.clone(),
57    );
58    let protocol_id = String::from("vrrb-0.1.0-test-net");
59    let gossip_config = GossipConfig::new(
60        protocol_id,
61        8,
62        3,
63        8,
64        3,
65        12,
66        3,
67        0.4,
68        Duration::from_millis(250),
69        80,
70    );
71    let heartbeat = Instant::now();
72    let ping_pong = Instant::now();
73    let mut gossip = GossipService::new(
74        addr.clone(),
75        to_gossip_rx,
76        to_transport_tx.clone(),
77        to_app_tx.clone(),
78        kad,
79        gossip_config,
80        heartbeat,
81        ping_pong,
82    );
83
84    // Inform the local node of their address (since the port is randomized)
85    println!("My Address: {:?}", addr);
86    println!("My ID: {:?}", info.id);
87    // Clone the socket for the transport and message handling thread(s)
88    let thread_sock = sock.try_clone().expect("Unable to clone socket");
89    thread::spawn(move || {
90        let inner_sock = thread_sock.try_clone().expect("Unable to clone socket");
91        thread::spawn(move || loop {
92            transport.incoming_ack();
93            transport.outgoing_msg(&inner_sock);
94            transport.check_time_elapsed(&inner_sock);
95        });
96
97        loop {
98            let local = addr.clone();
99            let mut buf = [0u8; 65536];
100            message_handler.recv_msg(&thread_sock, &mut buf, local);
101        }
102    });
103
104    if let Some(to_dial) = args().nth(1) {
105        let bootstrap: SocketAddr = to_dial.parse().expect("Unable to parse address");
106        gossip.kad.bootstrap(&bootstrap);
107        if let Some(bytes) = info.as_bytes() {
108            gossip.kad.add_peer(bytes)
109        }
110    } else {
111        if let Some(bytes) = info.as_bytes() {
112            gossip.kad.add_peer(bytes)
113        }
114    }
115
116    let thread_to_gossip = to_gossip_tx.clone();
117    thread::spawn(move || {
118        loop {
119            let mut line = String::new();
120            let input = std::io::stdin().read_line(&mut line);
121            if let Ok(_) = input {
122                let msg_id = MessageKey::rand();
123                let msg = GossipMessage {
124                    id: msg_id.inner(),
125                    data: line.trim().as_bytes().to_vec(),
126                    sender: addr.clone()
127                };
128
129                let message = Message {
130                    head: Header::Gossip,
131                    msg: msg.as_bytes().unwrap()
132                };
133
134                if let Err(_) = thread_to_gossip.clone().send((addr.clone(), message)) {
135                    println!("Error sending message to gossip")
136                }
137            }        
138        }
139    });
140
141    gossip.start()
142}