timely_communication 0.29.0

Communication layer for timely dataflow
Documentation
//! Networking code for sending and receiving fixed size `Vec<u8>` between machines.

use std::io;
use std::io::Result;
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use std::time::Duration;

use byteorder::{ReadBytesExt, WriteBytesExt};
use columnar::Columnar;
use serde::{Deserialize, Serialize};

/// The byte order for writing message headers and stream initialization.
///
/// Big-endian. Both the `MessageHeader` encoding and the worker-index handshake performed
/// when connections are established are written and read with this byte order, so the
/// reading and writing sides always agree.
type ByteOrder = byteorder::BigEndian;

/// Framing data for each `Vec<u8>` transmission, indicating a typed channel, the source and
/// destination workers, and the length in bytes.
///
/// On the wire each field is encoded as a `u64`, in the order the fields are declared here
/// (see `try_read` and `write_to`).
// *Warning*: Adding, removing, or altering fields requires adjusting the implementation below
// (the `FIELDS` count and the field order used by `try_read` and `write_to`)!
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
pub struct MessageHeader {
    /// Index of the channel the message belongs to.
    pub channel:    usize,
    /// Index of the worker sending the message.
    pub source:     usize,
    /// Lower bound of the indices of the workers receiving the message.
    pub target_lower:     usize,
    /// Upper bound of the indices of the workers receiving the message.
    ///
    /// This is often `self.target_lower + 1` for point to point messages,
    /// but can be larger for broadcast messages.
    pub target_upper:     usize,
    /// Number of payload bytes in the message (the header itself is not counted;
    /// see `required_bytes`).
    pub length:     usize,
    /// Sequence number.
    pub seqno:      usize,
}

impl MessageHeader {

    /// The number of `usize` fields in [MessageHeader].
    const FIELDS: usize = 6;

    /// Attempts to decode a header from the front of `bytes`.
    ///
    /// Returns `Some(header)` only when `bytes` contains the complete encoded header *and*
    /// at least `header.length` further bytes of payload. Returns `None` when there is not
    /// yet enough data, including when the advertised length is so large that the total
    /// size cannot be represented in a `usize`.
    #[inline]
    pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> {
        let mut cursor = io::Cursor::new(bytes);
        let mut buffer = [0u64; Self::FIELDS];
        // Fails (and we return `None`) when fewer than `header_bytes()` bytes are available.
        cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
        let header = MessageHeader {
            // Order must match writing order.
            channel: buffer[0] as usize,
            source: buffer[1] as usize,
            target_lower: buffer[2] as usize,
            target_upper: buffer[3] as usize,
            length: buffer[4] as usize,
            seqno: buffer[5] as usize,
        };

        // `required_bytes` saturates, so a corrupt or hostile `length` cannot wrap around
        // and make an incomplete frame look complete.
        if bytes.len() >= header.required_bytes() {
            Some(header)
        } else {
            None
        }
    }

    /// Writes the header as binary data.
    ///
    /// The header is first assembled in a stack buffer and then handed to `writer` with a
    /// single `write_all` call. Each field is written as a `u64` in [MessageHeader]
    /// declaration order.
    ///
    /// # Errors
    ///
    /// Returns any error reported by `writer`.
    #[inline]
    pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> Result<()> {
        let mut buffer = [0u8; std::mem::size_of::<u64>() * Self::FIELDS];
        let mut cursor = io::Cursor::new(&mut buffer[..]);
        // Order must match reading order.
        cursor.write_u64::<ByteOrder>(self.channel as u64)?;
        cursor.write_u64::<ByteOrder>(self.source as u64)?;
        cursor.write_u64::<ByteOrder>(self.target_lower as u64)?;
        cursor.write_u64::<ByteOrder>(self.target_upper as u64)?;
        cursor.write_u64::<ByteOrder>(self.length as u64)?;
        cursor.write_u64::<ByteOrder>(self.seqno as u64)?;

        writer.write_all(&buffer[..])
    }

    /// The number of bytes required for the header and data.
    ///
    /// The sum saturates at `usize::MAX` instead of overflowing, so an implausibly large
    /// `length` (for example from a corrupt header) yields a value that no buffer can
    /// satisfy rather than a small, wrapped-around one.
    #[inline]
    pub fn required_bytes(&self) -> usize {
        self.header_bytes().saturating_add(self.length)
    }

    /// The number of bytes required for the header.
    ///
    /// This is a constant: `FIELDS` fields of eight bytes each.
    #[inline(always)]
    pub fn header_bytes(&self) -> usize {
        std::mem::size_of::<u64>() * Self::FIELDS
    }
}

/// Establishes one TCP connection to every other process named in `addresses`.
///
/// Two helper threads run concurrently: one dials the lower-indexed processes while the
/// other accepts connections from the higher-indexed ones. Their results are stitched
/// together so that entry `i` of the returned vec is `Some(TcpStream)` connected to process
/// `i`, except for entry `my_index`, which is `None` (no socket to self).
///
/// When `noisy` is set, progress messages are printed to standard output.
///
/// # Errors
///
/// Returns the error reported by either helper.
///
/// # Panics
///
/// Panics if either helper thread panicked.
pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {

    let shared_hosts = Arc::new(addresses);

    // Outbound connections to workers with a smaller index.
    let dialer = {
        let hosts = Arc::clone(&shared_hosts);
        thread::spawn(move || start_connections(hosts, my_index, noisy))
    };
    // Inbound connections from workers with a larger index.
    let acceptor = thread::spawn(move || await_connections(shared_hosts, my_index, noisy));

    // Lower-indexed peers first, then the placeholder for ourselves, then the rest.
    let mut sockets = dialer.join().unwrap()?;
    sockets.push(None);
    sockets.extend(acceptor.join().unwrap()?);

    if noisy {
        println!("worker {}:\tinitialization complete", my_index)
    }

    Ok(sockets)
}


/// Dials every worker with an index below `my_index`, retrying once per second until each
/// connection succeeds, and announces `my_index` on each new stream.
///
/// Result contains connections `[0, my_index - 1]`.
///
/// # Panics
///
/// Panics if `TCP_NODELAY` cannot be set on a stream or if sending the worker index fails.
pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
    let mut streams = Vec::with_capacity(my_index.min(addresses.len()));

    for (peer, host) in addresses.iter().take(my_index).enumerate() {
        // Keep dialing this peer until it accepts; it may simply not be listening yet.
        let connected = loop {
            let mut stream = match TcpStream::connect(host) {
                Ok(stream) => stream,
                Err(error) => {
                    println!("worker {}:\terror connecting to worker {}: {}; retrying", my_index, peer, error);
                    sleep(Duration::from_secs(1));
                    continue;
                }
            };

            stream.set_nodelay(true).expect("set_nodelay call failed");
            // Tell the peer who we are so it can file this stream under the right index.
            stream.write_u64::<ByteOrder>(my_index as u64).expect("failed to encode/send worker index");
            if noisy {
                println!("worker {}:\tconnection to worker {}", my_index, peer);
            }
            break stream;
        };
        streams.push(Some(connected));
    }

    Ok(streams)
}

/// Accepts one connection from every worker with an index above `my_index`.
///
/// Result contains connections `[my_index + 1, addresses.len() - 1]`: entry `i` of the
/// returned vec holds the stream from worker `my_index + 1 + i`.
///
/// `addresses[my_index]` may hold several whitespace-separated addresses; a listener is bound
/// to each of them and all are polled until every expected worker has connected. Each
/// connecting worker identifies itself by sending its index as a `u64`.
///
/// # Errors
///
/// Returns an error if
/// * `my_index` is not a valid index into `addresses`,
/// * binding, configuring, or accepting on a listener fails,
/// * a peer's worker index cannot be read, or
/// * a peer announces an index that is not in `my_index + 1 .. addresses.len()`.
pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
    // Attaches a short description to an I/O error while preserving its kind.
    let annotate = |message: &str, error: io::Error| {
        io::Error::new(error.kind(), format!("{}: {}", message, error))
    };

    // Validate the index up front; otherwise the subtraction below would underflow and the
    // address lookup would panic.
    let my_addresses = match addresses.get(my_index) {
        Some(entry) => entry,
        None => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("worker index {} is out of range for {} addresses", my_index, addresses.len()),
            ));
        }
    };
    let mut results: Vec<Option<TcpStream>> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();

    // We may have multiple addresses to bind to, and will listen on each of them until all received.
    let listeners = my_addresses.split_whitespace().map(TcpListener::bind).collect::<Result<Vec<_>>>()?;
    for listener in listeners.iter() {
        listener.set_nonblocking(true).map_err(|e| annotate("Couldn't set nonblocking", e))?;
    }

    // Until we have all intended connections, poll each listener, sleeping briefly if none have accepted a new stream.
    while results.iter().any(Option::is_none) {
        let mut received = false;
        for listener in listeners.iter() {
            match listener.accept() {
                Ok((mut stream, _)) => {
                    // On some platforms an accepted socket inherits the listener's
                    // non-blocking mode; the handshake read below needs a blocking stream.
                    stream.set_nonblocking(false).map_err(|e| annotate("Couldn't set blocking", e))?;
                    stream.set_nodelay(true).map_err(|e| annotate("set_nodelay call failed", e))?;
                    let identifier = stream
                        .read_u64::<ByteOrder>()
                        .map_err(|e| annotate("failed to decode worker index", e))? as usize;

                    // The identifier comes from the network: make sure it names a worker we
                    // are actually waiting for before using it as an index.
                    let slot = identifier
                        .checked_sub(my_index + 1)
                        .filter(|&slot| slot < results.len());
                    match slot {
                        Some(slot) => results[slot] = Some(stream),
                        None => {
                            return Err(io::Error::new(
                                io::ErrorKind::InvalidData,
                                format!(
                                    "worker {}: received connection from unexpected worker index {}",
                                    my_index, identifier
                                ),
                            ));
                        }
                    }
                    if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
                    received = true;
                }
                // `WouldBlock` just means no peer is waiting on this listener right now.
                Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { return Err(e); } }
            }
        }
        if !received {
            println!("awaiting connections (at {:?}/{:?})", results.iter().filter(|x| x.is_some()).count(), results.len());
            sleep(Duration::from_secs(1));
        }
    }

    Ok(results)
}