// timely_communication/allocator/zero_copy/tcp.rs
use std::io::{self, Write};
4use crossbeam_channel::{Sender, Receiver};
5
6use crate::networking::MessageHeader;
7
8use super::bytes_slab::BytesSlab;
9use super::bytes_exchange::MergeQueue;
10use super::stream::Stream;
11
12use logging_core::Logger;
13
14use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent};
15
/// Panics with a uniformly formatted communication error message.
///
/// `context` names the operation that failed (for example `"reading data"`)
/// and `cause` is the I/O error that operation produced. This function never
/// returns.
///
/// # Panics
///
/// Always. The panic message has the form
/// `timely communication error: <context>: <cause>`.
#[cold]
fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
    // Positional arguments (rather than captured identifiers) keep the format
    // string interpreted the same way on every Rust edition.
    panic!("timely communication error: {}: {}", context, cause)
}
23
24pub fn recv_loop<S>(
33 mut reader: S,
34 targets: Vec<Receiver<MergeQueue>>,
35 worker_offset: usize,
36 process: usize,
37 remote: usize,
38 mut logger: Option<Logger<CommunicationEvent, CommunicationSetup>>)
39where
40 S: Stream,
41{
42 logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));
44
45 let mut targets: Vec<MergeQueue> = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect();
46
47 let mut buffer = BytesSlab::new(20);
48
49 let mut stageds = Vec::with_capacity(targets.len());
51 for _ in 0 .. targets.len() {
52 stageds.push(Vec::new());
53 }
54
55 let mut active = true;
63 while active {
64
65 buffer.ensure_capacity(1);
66
67 assert!(!buffer.empty().is_empty());
68
69 let read = match reader.read(&mut buffer.empty()) {
71 Err(x) => tcp_panic("reading data", x),
72 Ok(n) if n == 0 => {
73 tcp_panic(
74 "reading data",
75 std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
76 );
77 }
78 Ok(n) => n,
79 };
80
81 buffer.make_valid(read);
82
83 while let Some(header) = MessageHeader::try_read(buffer.valid()) {
85
86 let peeled_bytes = header.required_bytes();
88 let bytes = buffer.extract(peeled_bytes);
89
90 logger.as_mut().map(|logger| {
92 logger.log(MessageEvent { is_send: false, header, });
93 });
94
95 if header.length > 0 {
96 stageds[header.target - worker_offset].push(bytes);
97 }
98 else {
99 active = false;
101 if !buffer.valid().is_empty() {
102 panic!("Clean shutdown followed by data.");
103 }
104 buffer.ensure_capacity(1);
105 if reader.read(&mut buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
106 panic!("Clean shutdown followed by data.");
107 }
108 }
109 }
110
111 for (index, staged) in stageds.iter_mut().enumerate() {
113 use crate::allocator::zero_copy::bytes_exchange::BytesPush;
115 targets[index].extend(staged.drain(..));
116 }
117 }
118
119 logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, }));
121}
122
123pub fn send_loop<S: Stream>(
132 writer: S,
134 sources: Vec<Sender<MergeQueue>>,
135 process: usize,
136 remote: usize,
137 mut logger: Option<Logger<CommunicationEvent, CommunicationSetup>>)
138{
139
140 logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));
142
143 let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
144 let buzzer = crate::buzzer::Buzzer::new();
145 let queue = MergeQueue::new(buzzer);
146 x.send(queue.clone()).expect("failed to send MergeQueue");
147 queue
148 }).collect();
149
150 let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);
151 let mut stash = Vec::new();
152
153 while !sources.is_empty() {
154
155 for source in sources.iter_mut() {
157 use crate::allocator::zero_copy::bytes_exchange::BytesPull;
158 source.drain_into(&mut stash);
159 }
160
161 if stash.is_empty() {
162 writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
169 sources.retain(|source| !source.is_complete());
170 if !sources.is_empty() {
171 std::thread::park();
172 }
173 }
174 else {
175 for mut bytes in stash.drain(..) {
177
178 logger.as_mut().map(|logger| {
180 let mut offset = 0;
181 while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) {
182 logger.log(MessageEvent { is_send: true, header, });
183 offset += header.required_bytes();
184 }
185 });
186
187 writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e));
188 }
189 }
190 }
191
192 let header = MessageHeader {
196 channel: 0,
197 source: 0,
198 target: 0,
199 length: 0,
200 seqno: 0,
201 };
202 header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e));
203 writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
204 writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e));
205 logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));
206
207 logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, }));
209}