timely_communication/
networking.rs1use std::io;
4use std::io::{Read, Result};
5use std::net::{TcpListener, TcpStream};
6use std::sync::Arc;
7use std::thread;
8use std::thread::sleep;
9use std::time::Duration;
10
11use abomonation::{encode, decode};
12
13const HANDSHAKE_MAGIC: u64 = 0xc2f1fb770118add9;
17
18#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
21pub struct MessageHeader {
22 pub channel: usize,
24 pub source: usize,
26 pub target: usize,
28 pub length: usize,
30 pub seqno: usize,
32}
33
34impl MessageHeader {
35 #[inline]
37 pub fn try_read(bytes: &mut [u8]) -> Option<MessageHeader> {
38 unsafe { decode::<MessageHeader>(bytes) }
39 .and_then(|(header, remaining)| {
40 if remaining.len() >= header.length {
41 Some(header.clone())
42 }
43 else {
44 None
45 }
46 })
47 }
48
49 #[inline]
51 pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> ::std::io::Result<()> {
52 unsafe { encode(self, writer) }
53 }
54
55 #[inline]
57 pub fn required_bytes(&self) -> usize {
58 ::std::mem::size_of::<MessageHeader>() + self.length
59 }
60}
61
62pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
67
68 let hosts1 = Arc::new(addresses);
69 let hosts2 = hosts1.clone();
70
71 let start_task = thread::spawn(move || start_connections(hosts1, my_index, noisy));
72 let await_task = thread::spawn(move || await_connections(hosts2, my_index, noisy));
73
74 let mut results = start_task.join().unwrap()?;
75 results.push(None);
76 let to_extend = await_task.join().unwrap()?;
77 results.extend(to_extend.into_iter());
78
79 if noisy { println!("worker {}:\tinitialization complete", my_index) }
80
81 Ok(results)
82}
83
84
85pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
87 let results = addresses.iter().take(my_index).enumerate().map(|(index, address)| {
88 loop {
89 match TcpStream::connect(address) {
90 Ok(mut stream) => {
91 stream.set_nodelay(true).expect("set_nodelay call failed");
92 unsafe { encode(&HANDSHAKE_MAGIC, &mut stream) }.expect("failed to encode/send handshake magic");
93 unsafe { encode(&(my_index as u64), &mut stream) }.expect("failed to encode/send worker index");
94 if noisy { println!("worker {}:\tconnection to worker {}", my_index, index); }
95 break Some(stream);
96 },
97 Err(error) => {
98 println!("worker {}:\terror connecting to worker {}: {}; retrying", my_index, index, error);
99 sleep(Duration::from_secs(1));
100 },
101 }
102 }
103 }).collect();
104
105 Ok(results)
106}
107
108pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
110 let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
111 let listener = TcpListener::bind(&addresses[my_index][..])?;
112
113 for _ in (my_index + 1) .. addresses.len() {
114 let mut stream = listener.accept()?.0;
115 stream.set_nodelay(true).expect("set_nodelay call failed");
116 let mut buffer = [0u8;16];
117 stream.read_exact(&mut buffer)?;
118 let (magic, mut buffer) = unsafe { decode::<u64>(&mut buffer) }.expect("failed to decode magic");
119 if magic != &HANDSHAKE_MAGIC {
120 return Err(io::Error::new(io::ErrorKind::InvalidData,
121 "received incorrect timely handshake"));
122 }
123 let identifier = unsafe { decode::<u64>(&mut buffer) }.expect("failed to decode worker index").0.clone() as usize;
124 results[identifier - my_index - 1] = Some(stream);
125 if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
126 }
127
128 Ok(results)
129}