timely_communication/allocator/zero_copy/
initialize.rs1use std::sync::Arc;
4use crate::allocator::process::ProcessBuilder;
6use crate::networking::create_sockets;
7use super::tcp::{send_loop, recv_loop};
8use super::allocator::{TcpBuilder, new_vector};
9use super::stream::Stream;
10
11pub struct CommsGuard {
16 send_guards: Vec<::std::thread::JoinHandle<()>>,
17 recv_guards: Vec<::std::thread::JoinHandle<()>>,
18}
19
20impl Drop for CommsGuard {
21 fn drop(&mut self) {
22 for handle in self.send_guards.drain(..) {
23 handle.join().expect("Send thread panic");
24 }
25 for handle in self.recv_guards.drain(..) {
27 handle.join().expect("Recv thread panic");
28 }
29 }
31}
32
33use crate::logging::{CommunicationSetup, CommunicationEvent};
34use logging_core::Logger;
35
36pub fn initialize_networking(
38 addresses: Vec<String>,
39 my_index: usize,
40 threads: usize,
41 noisy: bool,
42 log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
43-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
44{
45 let sockets = create_sockets(addresses, my_index, noisy)?;
46 initialize_networking_from_sockets(sockets, my_index, threads, log_sender)
47}
48
49pub fn initialize_networking_from_sockets<S: Stream + 'static>(
57 mut sockets: Vec<Option<S>>,
58 my_index: usize,
59 threads: usize,
60 log_sender: Box<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent, CommunicationSetup>>+Send+Sync>)
61-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
62{
63 for socket in sockets.iter_mut() {
65 if let Some(socket) = socket {
66 socket.set_nonblocking(false).expect("failed to set socket to blocking");
67 }
68 }
69
70 let log_sender = Arc::new(log_sender);
71 let processes = sockets.len();
72
73 let process_allocators = crate::allocator::process::Process::new_vector(threads);
74 let (builders, promises, futures) = new_vector(process_allocators, my_index, processes);
75
76 let mut promises_iter = promises.into_iter();
77 let mut futures_iter = futures.into_iter();
78
79 let mut send_guards = Vec::with_capacity(sockets.len());
80 let mut recv_guards = Vec::with_capacity(sockets.len());
81
82 for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
84 let remote_recv = promises_iter.next().unwrap();
85
86 {
87 let log_sender = log_sender.clone();
88 let stream = stream.try_clone()?;
89 let join_guard =
90 ::std::thread::Builder::new()
91 .name(format!("timely:send-{}", index))
92 .spawn(move || {
93
94 let logger = log_sender(CommunicationSetup {
95 process: my_index,
96 sender: true,
97 remote: Some(index),
98 });
99
100 send_loop(stream, remote_recv, my_index, index, logger);
101 })?;
102
103 send_guards.push(join_guard);
104 }
105
106 let remote_send = futures_iter.next().unwrap();
107
108 {
109 let log_sender = log_sender.clone();
111 let stream = stream.try_clone()?;
112 let join_guard =
113 ::std::thread::Builder::new()
114 .name(format!("timely:recv-{}", index))
115 .spawn(move || {
116 let logger = log_sender(CommunicationSetup {
117 process: my_index,
118 sender: false,
119 remote: Some(index),
120 });
121 recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
122 })?;
123
124 recv_guards.push(join_guard);
125 }
126 }
127
128 Ok((builders, CommsGuard { send_guards, recv_guards }))
129}