timely_communication/allocator/zero_copy/
initialize.rs

1//! Network initialization.
2
3use std::sync::Arc;
4// use crate::allocator::Process;
5use crate::allocator::process::ProcessBuilder;
6use crate::networking::create_sockets;
7use super::tcp::{send_loop, recv_loop};
8use super::allocator::{TcpBuilder, new_vector};
9use super::stream::Stream;
10
/// Holder of the join handles for the networking send and receive threads.
///
/// When the guard is dropped it joins each thread it holds, so that every one of
/// them completes cleanly and sends all necessary data before the guard goes away.
pub struct CommsGuard {
    /// Join handles of the send threads.
    send_guards: Vec<::std::thread::JoinHandle<()>>,
    /// Join handles of the receive threads.
    recv_guards: Vec<::std::thread::JoinHandle<()>>,
}
19
20impl Drop for CommsGuard {
21    fn drop(&mut self) {
22        for handle in self.send_guards.drain(..) {
23            handle.join().expect("Send thread panic");
24        }
25        // println!("SEND THREADS JOINED");
26        for handle in self.recv_guards.drain(..) {
27            handle.join().expect("Recv thread panic");
28        }
29        // println!("RECV THREADS JOINED");
30    }
31}
32
33use crate::logging::{CommunicationSetup, CommunicationEvent};
34use logging_core::Logger;
35
36/// Initializes network connections
37pub fn initialize_networking(
38    addresses: Vec<String>,
39    my_index: usize,
40    threads: usize,
41    noisy: bool,
42    log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
43-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
44{
45    let sockets = create_sockets(addresses, my_index, noisy)?;
46    initialize_networking_from_sockets(sockets, my_index, threads, log_sender)
47}
48
49/// Initialize send and recv threads from sockets.
50///
51/// This method is available for users who have already connected sockets and simply wish to construct
52/// a vector of process-local allocators connected to instantiated send and recv threads.
53///
54/// It is important that the `sockets` argument contain sockets for each remote process, in order, and
55/// with position `my_index` set to `None`.
56pub fn initialize_networking_from_sockets<S: Stream + 'static>(
57    mut sockets: Vec<Option<S>>,
58    my_index: usize,
59    threads: usize,
60    log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
61-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
62{
63    // Sockets are expected to be blocking,
64    for socket in sockets.iter_mut() {
65        if let Some(socket) = socket {
66            socket.set_nonblocking(false).expect("failed to set socket to blocking");
67        }
68    }
69
70    let log_sender = Arc::new(log_sender);
71    let processes = sockets.len();
72
73    let process_allocators = crate::allocator::process::Process::new_vector(threads);
74    let (builders, promises, futures) = new_vector(process_allocators, my_index, processes);
75
76    let mut promises_iter = promises.into_iter();
77    let mut futures_iter = futures.into_iter();
78
79    let mut send_guards = Vec::with_capacity(sockets.len());
80    let mut recv_guards = Vec::with_capacity(sockets.len());
81
82    // for each process, if a stream exists (i.e. not local) ...
83    for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
84        let remote_recv = promises_iter.next().unwrap();
85
86        {
87            let log_sender = log_sender.clone();
88            let stream = stream.try_clone()?;
89            let join_guard =
90            ::std::thread::Builder::new()
91                .name(format!("timely:send-{}", index))
92                .spawn(move || {
93
94                    let logger = log_sender(CommunicationSetup {
95                        process: my_index,
96                        sender: true,
97                        remote: Some(index),
98                    });
99
100                    send_loop(stream, remote_recv, my_index, index, logger);
101                })?;
102
103            send_guards.push(join_guard);
104        }
105
106        let remote_send = futures_iter.next().unwrap();
107
108        {
109            // let remote_sends = remote_sends.clone();
110            let log_sender = log_sender.clone();
111            let stream = stream.try_clone()?;
112            let join_guard =
113            ::std::thread::Builder::new()
114                .name(format!("timely:recv-{}", index))
115                .spawn(move || {
116                    let logger = log_sender(CommunicationSetup {
117                        process: my_index,
118                        sender: false,
119                        remote: Some(index),
120                    });
121                    recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
122                })?;
123
124            recv_guards.push(join_guard);
125        }
126    }
127
128    Ok((builders, CommsGuard { send_guards, recv_guards }))
129}