1#![allow(dead_code)] use helper::*;
4use std::{
5 collections::{
6 HashMap,
7 HashSet,
8 },
9 io, fmt, iter, cmp,
10 time::Instant,
11 io::ErrorKind,
12};
13use byteorder::{ReadBytesExt, WriteBytesExt};
14use mod_ord::ModOrd;
15
16
17#[derive(Debug)]
23pub struct Endpoint<U: UdpLike> {
24 config: EndpointConfig,
26 socket: U,
27 buf: Vec<u8>,
28 buf_free_start: usize,
29 max_yielded: ModOrd, time_last_acked: Instant,
31
32 next_id: ModOrd,
34 wait_until: ModOrd,
35 outbox: HashMap<ModOrd, (Instant, *mut [u8])>,
37 outbox2: HashMap<ModOrd, (Instant, Vec<u8>)>,
38 peer_acked: ModOrd,
39 out_buf_written: usize,
40
41 buf_min_space: usize,
43 n: ModOrd,
44 largest_set_id_yielded: ModOrd,
45 seen_before: HashSet<ModOrd>, inbox: HashMap<ModOrd, Message>,
47 inbox2: HashMap<ModOrd, OwnedMessage>,
48 inbox2_to_remove: Option<ModOrd>,
49}
50
51impl<U> Endpoint<U> where U: UdpLike {
52pub fn maintain(&mut self) -> io::Result<()> {
57 let a = self.peer_acked;
58
59 self.outbox.retain(|&id, _| id > a);
61 self.outbox2.retain(|&id, _| id > a);
62
63 let now = Instant::now();
64 for (id, (ref mut instant, ref_bytes)) in self.outbox.iter_mut() {
65 if (self.config.resend_predicate)(id.abs_difference(self.n), instant.elapsed()) {
66 self.socket.send(unsafe{&**ref_bytes})?;
68 *instant = now;
69 }
70 }
71 for (id, (ref mut instant, ref vec)) in self.outbox2.iter_mut() {
72 if (self.config.resend_predicate)(id.abs_difference(self.n), instant.elapsed()) {
73 self.socket.send(&vec[..])?;
75 *instant = now;
76 }
77 }
78 self.maybe_ack()?;
79 Ok(())
80 }
81
82
83 pub fn new_with_config(socket: U, config: EndpointConfig) -> Endpoint<U> {
86 let time_last_acked = Instant::now();
87 let buf_min_space = config.max_msg_size + Header::BYTES;
88 let buflen = config.max_msg_size + config.buffer_grow_space + Header::BYTES;
89 Endpoint {
90 config, socket, buf_min_space, time_last_acked,
91 buf_free_start: 0,
93 out_buf_written: 0,
94 buf: iter::repeat(0)
95 .take(buflen)
96 .collect(),
97 next_id: ModOrd::ZERO,
99 wait_until: ModOrd::ZERO,
100 n: ModOrd::ZERO,
101 largest_set_id_yielded: ModOrd::ZERO,
102 max_yielded: ModOrd::BEFORE_ZERO,
103 peer_acked: ModOrd::BEFORE_ZERO,
104 inbox: HashMap::new(),
106 inbox2: HashMap::new(),
107 seen_before: HashSet::new(),
108 inbox2_to_remove: None,
109 outbox: HashMap::new(),
110 outbox2: HashMap::new(),
111 }
112 }
113
114
115 pub fn new(socket: U) -> Endpoint<U> {
118 Self::new_with_config(socket, EndpointConfig::default())
119 }
120
121
122 pub fn recv(&mut self) -> io::Result<Option<&mut [u8]>> {
133
134 if let Some(id) = self.ready_from_inbox() {
136 let msg = self.inbox.remove(&id).unwrap();
137 if self.inbox.is_empty() && self.outbox.is_empty() {
138 self.vacate_buffer();
139 }
140 self.pre_yield(msg.h.set_id, msg.h.id, msg.h.del);
141 self.maybe_ack()?;
142 return Ok(Some(unsafe{&mut *msg.payload}));
143 }
144
145 if let Some(id) = self.inbox2_to_remove {
147 self.inbox2.remove(&id);
148 self.inbox2_to_remove = None;
149 }
150
151 if let Some(id) = self.ready_from_inbox2() {
153 let (set_id, id, del) = {
154 let msg = self.inbox2.get(&id).unwrap();
155 (msg.h.set_id, msg.h.id, msg.h.del)
156 };
157 self.pre_yield(set_id, id, del);
158 self.inbox2_to_remove = Some(id); self.maybe_ack()?;
160 return Ok(Some(&mut self.inbox2.get_mut(&id).unwrap().payload));
161 }
162
163 loop {
165 if self.buf_cant_take_another() {
166 self.vacate_buffer();
167 }
168
169 match self.socket.recv(&mut self.buf[self.buf_free_start..]) {
170 Ok(0) => {
171 let _ = self.maybe_ack();
172 return Ok(None)
173 },
174 Err(e) => {
175 let _ = self.maybe_ack();
176 return if e.kind() == ErrorKind::WouldBlock {
177 return Ok(None)
178 } else {
179 Err(ErrorKind::WouldBlock.into())
180 };
181 },
182 Ok(ModOrd::BYTES) => {
183 let ack = ModOrd::read_from(& self.buf[self.buf_free_start..(self.buf_free_start+ModOrd::BYTES)]).unwrap();
184 self.digest_incoming_ack(ack);
185 },
186 Ok(bytes) if bytes >= Header::BYTES => {
187 let h_starts_at = self.buf_free_start + bytes - Header::BYTES;
188 let h = Header::read_from(& self.buf[h_starts_at..])?;
189 self.digest_incoming_ack(h.ack);
190 if self.invalid_header(&h) || self.known_duplicate(&h) {
191 continue;
192 }
193 let msg = Message {
194 h,
195 payload: (&mut self.buf[self.buf_free_start..h_starts_at]) as *mut [u8],
196 };
197
198 if msg.h.id.special() {
200 self.maybe_ack()?;
206 return Ok(Some(unsafe{&mut *msg.payload}))
207 } else if msg.h.set_id < self.largest_set_id_yielded {
208 continue;
214 } else if msg.h.wait_until > self.n {
215 if !self.inbox.contains_key(&msg.h.id) {
221 self.inbox.insert(msg.h.id, msg);
222 }
223 self.buf_free_start = h_starts_at;
225 } else if self.seen_before.contains(&msg.h.id) {
226 } else {
232 self.pre_yield(msg.h.set_id, msg.h.id, msg.h.del);
238 self.maybe_ack()?;
239 return Ok(Some(unsafe{&mut *msg.payload}))
240 }
241 },
242 Ok(_) => (), }
244 }
245 }
246
247
248 pub fn as_set<F,R>(&mut self, work: F) -> R
251 where
252 F: Sized + FnOnce(SetSender<U>) -> R,
253 R: Sized,
254 {
255 work(self.new_set())
256 }
257
258
259 pub fn new_set(&mut self) -> SetSender<U> {
264 if self.out_buf_written > 0 {
265 match self.config.new_set_unsent_action {
266 NewSetUnsent::Panic => panic!(
267 "Endpoint created new set \
268 with non-empty write buffer! \
269 (Configuration requested a panic)."
270 ),
271 NewSetUnsent::Clear => self.out_buf_written = 0,
272 NewSetUnsent::IntoSet => (), }
274 }
275 self.inner_new_set()
276 }
277
278
279
280#[inline]
283 fn invalid_header(&mut self, h: &Header) -> bool {
284 if self.largest_set_id_yielded.abs_difference(h.set_id)
285 > self.config.window_size {
286 true
288 } else if h.id < h.set_id {
289 true
291 } else if h.wait_until > h.id {
292 true
294 } else {
295 false
296 }
297 }
298
299 #[inline(always)]
300 fn inner_new_set(&mut self) -> SetSender<U> {
301 let set_id = self.next_id;
302 SetSender::new(self, set_id)
303 }
304
305 fn pre_yield(&mut self, set_id: ModOrd, id: ModOrd, del: bool) {
306 if set_id > self.largest_set_id_yielded {
307 self.largest_set_id_yielded = set_id;
308 self.seen_before.clear();
309 }
310 self.seen_before.insert(id);
311 if self.n < set_id {
312 self.n = set_id;
313 }
314 if del {
315 self.n = self.n.new_plus(1);
316 }
317 if self.max_yielded < id {
318 self.max_yielded = id;
319 }
320 }
321
322 fn maybe_ack(&mut self) -> io::Result<()> {
323 let now = Instant::now();
324 if self.time_last_acked.elapsed() > self.config.min_heartbeat_period {
325 let b = self.buf_free_start;
326 self.largest_set_id_yielded.write_to(&mut self.buf[b..])?;
327 self.socket.send(&self.buf[b..(b+ModOrd::BYTES)])?;
328 self.time_last_acked = now;
329 }
330 Ok(())
331 }
332
333 fn ready_from_inbox(&self) -> Option<ModOrd> {
334 for (&id, msg) in self.inbox.iter() {
335 if msg.h.wait_until <= self.n {
336 return Some(id);
337 }
338 }
339 None
340 }
341
342 fn ready_from_inbox2(&self) -> Option<ModOrd> {
343 for (&id, msg) in self.inbox2.iter() {
344 if msg.h.wait_until <= self.n {
345 return Some(id);
346 }
347 }
348 None
349 }
350
351 fn vacate_buffer(&mut self) {
357 for (id, msg) in self.inbox.drain() {
358 let payload = unsafe{&*msg.payload}.to_vec();
359 let h = msg.h;
360 let owned_msg = OwnedMessage {
361 h, payload,
362 };
363 self.inbox2.insert(id, owned_msg);
364 }
365 assert!(self.inbox.is_empty());
366 for (id, (instant, bytes)) in self.outbox.drain() {
367 let vec = unsafe{&*bytes}.to_vec();
368 self.outbox2.insert(id, (instant, vec));
369 }
370 assert!(self.inbox.is_empty());
371
372 self.buf_free_start = 0;
374 }
375
376 fn known_duplicate(&mut self, header: &Header) -> bool {
377 let id = header.id;
378 self.seen_before.contains(&id)
379 || self.inbox.contains_key(&id)
380 || self.inbox2.contains_key(&id)
381 }
382
383 fn buf_cant_take_another(&self) -> bool {
384 self.buf.len() - self.buf_free_start < self.buf_min_space
385 }
386
387 fn digest_incoming_ack(&mut self, ack: ModOrd) {
388 if self.peer_acked < ack {
389 self.peer_acked = ack;
390 }
391 }
392}
393
394
395
396impl<U> Sender for Endpoint<U> where U: UdpLike {
397 fn send_written(&mut self, guarantee: Guarantee) -> io::Result<usize> {
398 self.inner_new_set().send_written(guarantee)
399 }
400
401 fn clear_written(&mut self) {
402 self.out_buf_written = 0;
403 }
404}
405
406impl<U> io::Write for Endpoint<U> where U: UdpLike {
407 fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
408 let b = (&mut self.buf[(self.buf_free_start + self.out_buf_written)..]).write(bytes)?;
409 self.out_buf_written += b;
410 Ok(b)
411 }
412
413 fn flush(&mut self) -> io::Result<()> {
414 Ok(())
415 }
416}
417
418#[derive(Debug, Clone)]
421struct Header {
422 id: ModOrd,
423 set_id: ModOrd,
424 ack: ModOrd,
425 wait_until: ModOrd,
426 del: bool,
427}
428impl Header {
429 const BYTES: usize = 4*4 + 1;
430
431 fn write_to<W: io::Write>(&self, mut w: W) -> io::Result<()> {
432 self.ack.write_to(&mut w)?;
433 self.id.write_to(&mut w)?;
434 self.set_id.write_to(&mut w)?;
435 self.wait_until.write_to(&mut w)?;
436 w.write_u8(if self.del {0x01} else {0x00})?;
437 Ok(())
438 }
439
440 fn read_from<R: io::Read>(mut r: R) -> io::Result<Self> {
441 Ok(Header {
442 ack: ModOrd::read_from(&mut r)?,
443 id: ModOrd::read_from(&mut r)?,
444 set_id: ModOrd::read_from(&mut r)?,
445 wait_until: ModOrd::read_from(&mut r)?,
446 del: r.read_u8()? == 0x01,
447 })
448 }
449}
450impl cmp::PartialEq for Header {
451 fn eq(&self, other: &Self) -> bool {
452 self.id == other.id
453 }
454}
455
456
457#[derive(Debug)]
466pub struct SetSender<'a, U: UdpLike + 'a>{
467 endpoint: &'a mut Endpoint<U>,
468 set_id: ModOrd,
469 count: u32,
470 ord_count: u32,
471}
472
473impl<'a, U> SetSender<'a, U> where U: UdpLike + 'a {
474 fn new(endpoint: &mut Endpoint<U>, set_id: ModOrd) -> SetSender<U> {
475 SetSender {
476 endpoint,
477 set_id,
478 count: 0,
479 ord_count: 0,
480 }
481 }
482}
483
484impl<'a, U> Sender for SetSender<'a, U> where U: UdpLike + 'a {
485 fn send_written(&mut self, guarantee: Guarantee) -> io::Result<usize> {
486 if self.endpoint.buf_cant_take_another() {
487 self.endpoint.vacate_buffer();
488 }
489 let id = if guarantee == Guarantee::None {
490 ModOrd::SPECIAL
491 } else {
492 self.set_id.new_plus(self.count)
493 };
494 let header = Header {
495 ack: self.endpoint.max_yielded,
496 set_id: self.set_id,
497 id,
498 wait_until: self.endpoint.wait_until,
499 del: guarantee == Guarantee::Delivery,
500 };
501
502 let payload_end = self.endpoint.buf_free_start+self.endpoint.out_buf_written;
503 header.write_to(&mut self.endpoint.buf[payload_end..])?;
504 let bytes_sent = self.endpoint.out_buf_written + Header::BYTES;
505 self.endpoint.out_buf_written = 0;
506 let new_end = self.endpoint.buf_free_start + bytes_sent;
507 let msg_slice = &mut self.endpoint.buf[self.endpoint.buf_free_start..new_end];
508 self.endpoint.socket.send(msg_slice)?;
509
510 if guarantee == Guarantee::Delivery {
511 self.endpoint.outbox.insert(id, (Instant::now(), msg_slice as *mut [u8]));
513 self.endpoint.buf_free_start = new_end;
514 }
515
516 if guarantee != Guarantee::None {
517 self.count += 1;
518 if guarantee != Guarantee::Delivery {
519 self.ord_count += 1;
520 }
521 }
522 Ok(bytes_sent)
523 }
524
525 fn clear_written(&mut self) {
526 self.endpoint.clear_written()
527 }
528}
529
530impl<'a, U> Drop for SetSender<'a, U> where U: UdpLike {
531 fn drop(&mut self) {
532 if self.count == 0 {
533 return;
535 }
536 self.endpoint.next_id = self.endpoint.next_id.new_plus(self.count);
538 if self.ord_count < self.count {
539 self.endpoint.wait_until = self.endpoint.next_id.new_minus(self.ord_count);
542 }
543 }
544}
545
546impl<'a, U> io::Write for SetSender<'a, U> where U: UdpLike {
547 fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
548 self.endpoint.write(bytes)
549 }
550
551 fn flush(&mut self) -> io::Result<()> {
552 Ok(())
553 }
554}
555
556#[derive(Clone)]
559struct Message {
560 h: Header,
561 payload: *mut [u8],
562}
563impl fmt::Debug for Message {
564 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
565 write!(f, "Inbox1Msg {:?} payload ~= {:?}",
566 self.h,
567 String::from_utf8_lossy(unsafe{&*self.payload}),
568 )
569 }
570}
571impl cmp::PartialEq for Message {
572 fn eq(&self, other: &Self) -> bool {
573 self.h == other.h
574 }
575}
576
577
578#[derive(Debug, Clone)]
579struct OwnedMessage {
580 h: Header,
581 payload: Vec<u8>,
582}
583impl cmp::PartialEq for OwnedMessage {
584 fn eq(&self, other: &Self) -> bool {
585 self.h == other.h
586 }
587}