timely_communication/
networking.rs

1//! Networking code for sending and receiving fixed size `Vec<u8>` between machines.
2
3use std::io;
4use std::io::{Read, Result};
5use std::net::{TcpListener, TcpStream};
6use std::sync::Arc;
7use std::thread;
8use std::thread::sleep;
9use std::time::Duration;
10
11use abomonation::{encode, decode};
12
13// This constant is sent along immediately after establishing a TCP stream, so
14// that it is easy to sniff out Timely traffic when it is multiplexed with
15// other traffic on the same port.
16const HANDSHAKE_MAGIC: u64 = 0xc2f1fb770118add9;
17
18/// Framing data for each `Vec<u8>` transmission, indicating a typed channel, the source and
19/// destination workers, and the length in bytes.
20#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
21pub struct MessageHeader {
22    /// index of channel.
23    pub channel:    usize,
24    /// index of worker sending message.
25    pub source:     usize,
26    /// index of worker receiving message.
27    pub target:     usize,
28    /// number of bytes in message.
29    pub length:     usize,
30    /// sequence number.
31    pub seqno:      usize,
32}
33
34impl MessageHeader {
35    /// Returns a header when there is enough supporting data
36    #[inline]
37    pub fn try_read(bytes: &mut [u8]) -> Option<MessageHeader> {
38        unsafe { decode::<MessageHeader>(bytes) }
39            .and_then(|(header, remaining)| {
40                if remaining.len() >= header.length {
41                    Some(header.clone())
42                }
43                else {
44                    None
45                }
46            })
47    }
48
49    /// Writes the header as binary data.
50    #[inline]
51    pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> ::std::io::Result<()> {
52        unsafe { encode(self, writer) }
53    }
54
55    /// The number of bytes required for the header and data.
56    #[inline]
57    pub fn required_bytes(&self) -> usize {
58        ::std::mem::size_of::<MessageHeader>() + self.length
59    }
60}
61
62/// Creates socket connections from a list of host addresses.
63///
64/// The item at index i in the resulting vec, is a Some(TcpSocket) to process i, except
65/// for item `my_index` which is None (no socket to self).
66pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
67
68    let hosts1 = Arc::new(addresses);
69    let hosts2 = hosts1.clone();
70
71    let start_task = thread::spawn(move || start_connections(hosts1, my_index, noisy));
72    let await_task = thread::spawn(move || await_connections(hosts2, my_index, noisy));
73
74    let mut results = start_task.join().unwrap()?;
75    results.push(None);
76    let to_extend = await_task.join().unwrap()?;
77    results.extend(to_extend.into_iter());
78
79    if noisy { println!("worker {}:\tinitialization complete", my_index) }
80
81    Ok(results)
82}
83
84
85/// Result contains connections [0, my_index - 1].
86pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
87    let results = addresses.iter().take(my_index).enumerate().map(|(index, address)| {
88        loop {
89            match TcpStream::connect(address) {
90                Ok(mut stream) => {
91                    stream.set_nodelay(true).expect("set_nodelay call failed");
92                    unsafe { encode(&HANDSHAKE_MAGIC, &mut stream) }.expect("failed to encode/send handshake magic");
93                    unsafe { encode(&(my_index as u64), &mut stream) }.expect("failed to encode/send worker index");
94                    if noisy { println!("worker {}:\tconnection to worker {}", my_index, index); }
95                    break Some(stream);
96                },
97                Err(error) => {
98                    println!("worker {}:\terror connecting to worker {}: {}; retrying", my_index, index, error);
99                    sleep(Duration::from_secs(1));
100                },
101            }
102        }
103    }).collect();
104
105    Ok(results)
106}
107
108/// Result contains connections [my_index + 1, addresses.len() - 1].
109pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
110    let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
111    let listener = TcpListener::bind(&addresses[my_index][..])?;
112
113    for _ in (my_index + 1) .. addresses.len() {
114        let mut stream = listener.accept()?.0;
115        stream.set_nodelay(true).expect("set_nodelay call failed");
116        let mut buffer = [0u8;16];
117        stream.read_exact(&mut buffer)?;
118        let (magic, mut buffer) = unsafe { decode::<u64>(&mut buffer) }.expect("failed to decode magic");
119        if magic != &HANDSHAKE_MAGIC {
120            return Err(io::Error::new(io::ErrorKind::InvalidData,
121                "received incorrect timely handshake"));
122        }
123        let identifier = unsafe { decode::<u64>(&mut buffer) }.expect("failed to decode worker index").0.clone() as usize;
124        results[identifier - my_index - 1] = Some(stream);
125        if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
126    }
127
128    Ok(results)
129}