timely_communication/allocator/zero_copy/
tcp.rs

1//!
2
3use std::io::{self, Write};
4use crossbeam_channel::{Sender, Receiver};
5
6use crate::networking::MessageHeader;
7
8use super::bytes_slab::BytesSlab;
9use super::bytes_exchange::MergeQueue;
10use super::stream::Stream;
11
12use logging_core::Logger;
13
14use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent};
15
/// Aborts the current (communication) thread with a uniformly formatted panic.
///
/// The resulting message has the shape
/// `timely communication error: <context>: <cause>`, where `context` names the
/// operation that failed (e.g. "reading data") and `cause` is the I/O error's
/// `Display` output.
fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
    // NOTE: some downstream crates sniff out "timely communication error:" from
    // the panic message. Avoid removing or rewording this message if possible.
    // A structured payload via `panic_any` would be nicer, but its default panic
    // message is unhelpful (it only reports `Box<dyn Any>`).
    panic!("timely communication error: {ctx}: {err}", ctx = context, err = cause)
}
23
24/// Repeatedly reads from a TcpStream and carves out messages.
25///
26/// The intended communication pattern is a sequence of (header, message)^* for valid
27/// messages, followed by a header for a zero length message indicating the end of stream.
28///
29/// If the stream ends without being shut down, or if reading from the stream fails, the
30/// receive thread panics with a message that starts with "timely communication error:"
31/// in an attempt to take down the computation and cause the failures to cascade.
32pub fn recv_loop<S>(
33    mut reader: S,
34    targets: Vec<Receiver<MergeQueue>>,
35    worker_offset: usize,
36    process: usize,
37    remote: usize,
38    mut logger: Option<Logger<CommunicationEvent, CommunicationSetup>>)
39where
40    S: Stream,
41{
42    // Log the receive thread's start.
43    logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));
44
45    let mut targets: Vec<MergeQueue> = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect();
46
47    let mut buffer = BytesSlab::new(20);
48
49    // Where we stash Bytes before handing them off.
50    let mut stageds = Vec::with_capacity(targets.len());
51    for _ in 0 .. targets.len() {
52        stageds.push(Vec::new());
53    }
54
55    // Each loop iteration adds to `self.Bytes` and consumes all complete messages.
56    // At the start of each iteration, `self.buffer[..self.length]` represents valid
57    // data, and the remaining capacity is available for reading from the reader.
58    //
59    // Once the buffer fills, we need to copy uncomplete messages to a new shared
60    // allocation and place the existing Bytes into `self.in_progress`, so that it
61    // can be recovered once all readers have read what they need to.
62    let mut active = true;
63    while active {
64
65        buffer.ensure_capacity(1);
66
67        assert!(!buffer.empty().is_empty());
68
69        // Attempt to read some more bytes into self.buffer.
70        let read = match reader.read(&mut buffer.empty()) {
71            Err(x) => tcp_panic("reading data", x),
72            Ok(n) if n == 0 => {
73                tcp_panic(
74                    "reading data",
75                    std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
76                );
77            }
78            Ok(n) => n,
79        };
80
81        buffer.make_valid(read);
82
83        // Consume complete messages from the front of self.buffer.
84        while let Some(header) = MessageHeader::try_read(buffer.valid()) {
85
86            // TODO: Consolidate message sequences sent to the same worker?
87            let peeled_bytes = header.required_bytes();
88            let bytes = buffer.extract(peeled_bytes);
89
90            // Record message receipt.
91            logger.as_mut().map(|logger| {
92                logger.log(MessageEvent { is_send: false, header, });
93            });
94
95            if header.length > 0 {
96                stageds[header.target - worker_offset].push(bytes);
97            }
98            else {
99                // Shutting down; confirm absence of subsequent data.
100                active = false;
101                if !buffer.valid().is_empty() {
102                    panic!("Clean shutdown followed by data.");
103                }
104                buffer.ensure_capacity(1);
105                if reader.read(&mut buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
106                    panic!("Clean shutdown followed by data.");
107                }
108            }
109        }
110
111        // Pass bytes along to targets.
112        for (index, staged) in stageds.iter_mut().enumerate() {
113            // FIXME: try to merge `staged` before handing it to BytesPush::extend
114            use crate::allocator::zero_copy::bytes_exchange::BytesPush;
115            targets[index].extend(staged.drain(..));
116        }
117    }
118
119    // Log the receive thread's end.
120    logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, }));
121}
122
123/// Repeatedly sends messages into a TcpStream.
124///
125/// The intended communication pattern is a sequence of (header, message)^* for valid
126/// messages, followed by a header for a zero length message indicating the end of stream.
127///
128/// If writing to the stream fails, the send thread panics with a message that starts with
129/// "timely communication error:" in an attempt to take down the computation and cause the
130/// failures to cascade.
131pub fn send_loop<S: Stream>(
132    // TODO: Maybe we don't need BufWriter with consolidation in writes.
133    writer: S,
134    sources: Vec<Sender<MergeQueue>>,
135    process: usize,
136    remote: usize,
137    mut logger: Option<Logger<CommunicationEvent, CommunicationSetup>>)
138{
139
140    // Log the send thread's start.
141    logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));
142
143    let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
144        let buzzer = crate::buzzer::Buzzer::new();
145        let queue = MergeQueue::new(buzzer);
146        x.send(queue.clone()).expect("failed to send MergeQueue");
147        queue
148    }).collect();
149
150    let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);
151    let mut stash = Vec::new();
152
153    while !sources.is_empty() {
154
155        // TODO: Round-robin better, to release resources fairly when overloaded.
156        for source in sources.iter_mut() {
157            use crate::allocator::zero_copy::bytes_exchange::BytesPull;
158            source.drain_into(&mut stash);
159        }
160
161        if stash.is_empty() {
162            // No evidence of records to read, but sources not yet empty (at start of loop).
163            // We are going to flush our writer (to move buffered data), double check on the
164            // sources for emptiness and wait on a signal only if we are sure that there will
165            // still be a signal incoming.
166            //
167            // We could get awoken by more data, a channel closing, or spuriously perhaps.
168            writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
169            sources.retain(|source| !source.is_complete());
170            if !sources.is_empty() {
171                std::thread::park();
172            }
173        }
174        else {
175            // TODO: Could do scatter/gather write here.
176            for mut bytes in stash.drain(..) {
177
178                // Record message sends.
179                logger.as_mut().map(|logger| {
180                    let mut offset = 0;
181                    while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) {
182                        logger.log(MessageEvent { is_send: true, header, });
183                        offset += header.required_bytes();
184                    }
185                });
186
187                writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e));
188            }
189        }
190    }
191
192    // Write final zero-length header.
193    // Would be better with meaningful metadata, but as this stream merges many
194    // workers it isn't clear that there is anything specific to write here.
195    let header = MessageHeader {
196        channel:    0,
197        source:     0,
198        target:     0,
199        length:     0,
200        seqno:      0,
201    };
202    header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e));
203    writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
204    writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e));
205    logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));
206
207    // Log the send thread's end.
208    logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, }));
209}