// rudp/endpoint.rs

#![allow(dead_code)] // TODO(review): temporary while the API surface settles — remove once all items are exercised by callers
2
3use helper::*;
4use std::{
5	collections::{
6		HashMap,
7		HashSet,
8	},
9	io, fmt, iter, cmp,
10	time::Instant,
11	io::ErrorKind,
12};
13use byteorder::{ReadBytesExt, WriteBytesExt};
14use mod_ord::ModOrd;
15
16
17/// Stateful wrapper around a Udp-like object (facilitating sending of datagrams).
18/// Allows communication with another Endpoint object.
19/// The Endpoint does not have its own thread of control. Communication
20/// is maintained by regular `maintain` calls that perform heartbeats, resend lost
21/// packets etc.
#[derive(Debug)]
pub struct Endpoint<U: UdpLike> {
	//both in and out
	config: EndpointConfig,
	socket: U,
	// One shared scratch buffer: received datagrams, staged writes, and
	// unacked delivery payloads all live here until the buffer is vacated.
	buf: Vec<u8>,
	// Index of the first unused byte in `buf`.
	buf_free_start: usize,
	max_yielded: ModOrd, // for acking: largest id yielded to the user so far
	// Last time a standalone ack/heartbeat was sent (see `maybe_ack`).
	time_last_acked: Instant,

	//outgoing
	// Id the next new set will start from.
	next_id: ModOrd,
	// Messages of future sets carry this: peer must catch up to here first.
	wait_until: ModOrd,
	 // only stores delivery messages
	outbox: HashMap<ModOrd, (Instant, *mut [u8])>, // payloads *borrowed* from `buf`
	outbox2: HashMap<ModOrd, (Instant, Vec<u8>)>, // payloads owned (moved here on vacate)
	// Largest ack received from the peer; entries with id <= this are dropped.
	peer_acked: ModOrd,
	// Bytes staged via io::Write but not yet wrapped in a header and sent.
	out_buf_written: usize,

	//incoming
	// Minimum free bytes needed in `buf` to receive one maximal datagram.
	buf_min_space: usize,
	// Receive-progress counter, compared against incoming `wait_until` fields.
	n: ModOrd,
	// Largest set id of any message yielded so far; older sets are discarded.
	largest_set_id_yielded: ModOrd,
	seen_before: HashSet<ModOrd>, // contains messages only in THIS set
	inbox: HashMap<ModOrd, Message>, // payloads *borrowed* from `buf`
	inbox2: HashMap<ModOrd, OwnedMessage>, // payloads owned (moved here on vacate)
	// Deferred removal: keeps a slice yielded from inbox2 valid until next recv().
	inbox2_to_remove: Option<ModOrd>,
}
50
51impl<U> Endpoint<U> where U: UdpLike {
52///// PUBLIC
53
	/// Discard acknowledged outputs, resend lost outputs and send a heartbeat if
	/// necessary.
	pub fn maintain(&mut self) -> io::Result<()> {
		let a = self.peer_acked;

		// clear out acknowledged messages in outboxes (ids <= peer_acked are done)
		self.outbox.retain(|&id, _| id > a);
		self.outbox2.retain(|&id, _| id > a);

		let now = Instant::now();
		// Resend pass over payloads still borrowed from the shared buffer.
		for (id, (ref mut instant, ref_bytes)) in self.outbox.iter_mut() {
			// User-supplied predicate decides from (id distance, time since last
			// send) whether the datagram looks lost.
			// NOTE(review): distance is measured against `self.n` (the incoming
			// counter) — confirm this is intended rather than an outgoing counter.
			if (self.config.resend_predicate)(id.abs_difference(self.n), instant.elapsed()) {
				//resend
				// SAFETY: `ref_bytes` points into `self.buf`; outbox entries are
				// copied to `outbox2` before the buffer is vacated (vacate_buffer),
				// so the pointee is still live here.
				self.socket.send(unsafe{&**ref_bytes})?;
				*instant = now;
			}
		}
		// Same resend pass over payloads that own their storage.
		for (id, (ref mut instant, ref vec)) in self.outbox2.iter_mut() {
			if (self.config.resend_predicate)(id.abs_difference(self.n), instant.elapsed()) {
				//resend
				self.socket.send(&vec[..])?;
				*instant = now;
			}
		}
		self.maybe_ack()?;
		Ok(())
	}
81
82	
83	/// Create a new Endpoint around the given Udp-like object, with the given
84	/// configuration.
85	pub fn new_with_config(socket: U, config: EndpointConfig) -> Endpoint<U> {
86		let time_last_acked = Instant::now();
87		let buf_min_space = config.max_msg_size + Header::BYTES;
88		let buflen = config.max_msg_size + config.buffer_grow_space + Header::BYTES;
89		Endpoint {
90			config, socket, buf_min_space, time_last_acked,
91			////////////////////////////////////////////////////////////////////
92			buf_free_start: 0,
93			out_buf_written: 0,
94			buf: iter::repeat(0)
95				.take(buflen)
96				.collect(),
97			////////////////////////////////////////////////////////////////////
98			next_id: ModOrd::ZERO,
99			wait_until: ModOrd::ZERO,
100			n: ModOrd::ZERO,
101			largest_set_id_yielded: ModOrd::ZERO,
102			max_yielded: ModOrd::BEFORE_ZERO,
103			peer_acked: ModOrd::BEFORE_ZERO,
104			////////////////////////////////////////////////////////////////////
105			inbox: HashMap::new(),
106			inbox2: HashMap::new(),
107			seen_before: HashSet::new(),
108			inbox2_to_remove: None,
109			outbox: HashMap::new(),
110			outbox2: HashMap::new(),
111		}
112	}
113
114	
115	/// Create a new Endpoint around the given Udp-like object, with the default
116	/// configuration.
117	pub fn new(socket: U) -> Endpoint<U> {
118		Self::new_with_config(socket, EndpointConfig::default())
119	}
120
121	
122	/// Attempt to yield a message from the peer Endpoint that is ready for receipt.
123	/// May block only if the wrapped Udp-like object may block.
124	/// recv() calls may not call the inner receive, depending on the contents
125	/// of the inbox.
126	///
127	/// Fatal errors return `Err(_)`
128	/// Reads that fail because they would block return `Ok(None)`
129	/// Successful reads return `Ok(Some(x))`, where x is an in-place slice into the internal
130	/// buffer; thus, you need to drop the slice before interacting with the 
131	/// Endpoint again. 
132	pub fn recv(&mut self) -> io::Result<Option<&mut [u8]>> {
133
134		// first try in-line inbox
135		if let Some(id) = self.ready_from_inbox() {
136			let msg = self.inbox.remove(&id).unwrap();
137			if self.inbox.is_empty() && self.outbox.is_empty() {
138				self.vacate_buffer();
139			}
140			self.pre_yield(msg.h.set_id, msg.h.id, msg.h.del);
141			self.maybe_ack()?;
142			return Ok(Some(unsafe{&mut *msg.payload}));
143		}
144
145		// remove from inbox2 as possible
146		if let Some(id) = self.inbox2_to_remove {
147			self.inbox2.remove(&id);
148			self.inbox2_to_remove = None;
149		} 
150
151		// next try inbox2 (growing owned storage)
152		if let Some(id) = self.ready_from_inbox2() {
153			let (set_id, id, del) = {
154				let msg = self.inbox2.get(&id).unwrap();
155				(msg.h.set_id, msg.h.id, msg.h.del)
156			};
157			self.pre_yield(set_id, id, del);
158			self.inbox2_to_remove = Some(id); // remove message on next call
159			self.maybe_ack()?;
160			return Ok(Some(&mut self.inbox2.get_mut(&id).unwrap().payload));
161		}
162
163		// nothing ready from the inbox. receive messages until we can yield
164		loop {
165			if self.buf_cant_take_another() {
166				self.vacate_buffer();
167			}
168
169			match self.socket.recv(&mut self.buf[self.buf_free_start..]) {
170				Ok(0) => {
171					let _ = self.maybe_ack();
172					return Ok(None)
173				},
174				Err(e) => {
175					let _ = self.maybe_ack();
176					return if e.kind() == ErrorKind::WouldBlock {
177						return Ok(None)
178					} else {
179						Err(ErrorKind::WouldBlock.into())
180					};
181				},
182				Ok(ModOrd::BYTES) => {
183					let ack = ModOrd::read_from(& self.buf[self.buf_free_start..(self.buf_free_start+ModOrd::BYTES)]).unwrap();
184					self.digest_incoming_ack(ack);
185				},
186				Ok(bytes) if bytes >= Header::BYTES => {
187					let h_starts_at = self.buf_free_start + bytes - Header::BYTES;
188					let h = Header::read_from(& self.buf[h_starts_at..])?;
189					self.digest_incoming_ack(h.ack);
190					if self.invalid_header(&h) || self.known_duplicate(&h) {
191						continue;
192					}
193					let msg = Message {
194						h,
195						payload: (&mut self.buf[self.buf_free_start..h_starts_at]) as *mut [u8],
196					};
197
198					// BIG IF ELSE BRANCH.
199					if msg.h.id.special() {
200						/* read a 'None' guarantee message.
201						shift - YES
202						store - NO
203						yield = YES
204						*/
205						self.maybe_ack()?;
206						return Ok(Some(unsafe{&mut *msg.payload}))
207					} else if msg.h.set_id < self.largest_set_id_yielded {
208						/* previous-set message. Its OLD data. Must discard to be safe.
209						shift - NO
210						store - NO
211						yield = NO
212						*/
213						continue;
214					} else if msg.h.wait_until > self.n {
215						/* future message.
216						shift - YES
217						store - YES
218						yield = NO
219						*/
220						if !self.inbox.contains_key(&msg.h.id) {
221							self.inbox.insert(msg.h.id, msg);
222						}
223						// shift the buffer right. don't want to obliterate the data
224						self.buf_free_start = h_starts_at; 
225					} else if self.seen_before.contains(&msg.h.id) {
226						/* current-set message already yielded
227						shift - NO
228						store - NO
229						yield = NO
230						*/
231					} else {
232						/* ORDER or DELIVERY message, but we can yield it right away
233						shift - NO
234						store - NO
235						yield = YES
236						*/
237						self.pre_yield(msg.h.set_id, msg.h.id, msg.h.del);
238						self.maybe_ack()?;
239						return Ok(Some(unsafe{&mut *msg.payload}))
240					}					
241				},
242				Ok(_) => (), // invalid size datagram. improper header or bogus.
243			}
244		}	
245	}
246
247	
248	/// Convenience function that passes a new `SetSender` into the given closure.
249	/// See `new_set` for more information.
250	pub fn as_set<F,R>(&mut self, work: F) -> R
251	where
252		F: Sized + FnOnce(SetSender<U>) -> R,
253		R: Sized,
254	{
255		work(self.new_set())
256	}
257
258	
259	/// The `Endpoint` itself implements `Sender`, allowing it to send messages.
260	/// `new_set` returns a `SetSender` object, which implements the same trait.
261	/// All messages sent by this setsender object have the added semantics of
262	/// relaxed ordering _between_ them. 
263	pub fn new_set(&mut self) -> SetSender<U> {
264		if self.out_buf_written > 0 {
265			match self.config.new_set_unsent_action {
266				NewSetUnsent::Panic => panic!(
267					"Endpoint created new set \
268					with non-empty write buffer! \
269					(Configuration requested a panic)."
270				),
271				NewSetUnsent::Clear => self.out_buf_written = 0,
272				NewSetUnsent::IntoSet => (), // keep the bytes
273			}
274		}
275		self.inner_new_set()
276	}
277
278
279
280////////////// PRIVATE
281
282	#[inline]
283	fn invalid_header(&mut self, h: &Header) -> bool {
284		if self.largest_set_id_yielded.abs_difference(h.set_id)
285		> self.config.window_size {
286			//outside of window
287			true
288		} else if h.id < h.set_id {
289			// set id cannot be AFTER the id
290			true
291		} else if h.wait_until > h.id {
292			// cannot wait for a message after self
293			true
294		} else {
295			false
296		}
297	}
298
299	#[inline(always)]
300	fn inner_new_set(&mut self) -> SetSender<U> {
301		let set_id = self.next_id;
302		SetSender::new(self, set_id)
303	}
304
	// Book-keeping performed just before a message is handed to the caller.
	// Statement order matters: `n` must absorb `set_id` before the delivery bump.
	fn pre_yield(&mut self, set_id: ModOrd, id: ModOrd, del: bool) {
		// entering a newer set invalidates the per-set duplicate record
		if set_id > self.largest_set_id_yielded {
			self.largest_set_id_yielded = set_id;
			self.seen_before.clear();
		}
		self.seen_before.insert(id);
		// advance the receive counter to at least this set's id...
		if self.n < set_id {
			self.n = set_id;
		}
		// ...plus one more for each delivery-guaranteed message yielded
		if del {
			self.n = self.n.new_plus(1);
		}
		// track the largest id ever yielded; piggybacked on outgoing acks
		if self.max_yielded < id {
			self.max_yielded = id;
		}
	}
321
	// Send a standalone ack datagram, but only if the heartbeat period has
	// elapsed. The ack is staged in the free region of `buf` without advancing
	// the cursor, so later traffic simply overwrites it.
	fn maybe_ack(&mut self) -> io::Result<()> {
		let now = Instant::now();
		if self.time_last_acked.elapsed() > self.config.min_heartbeat_period {
			let b = self.buf_free_start;
			// NOTE(review): the heartbeat acks `largest_set_id_yielded`, while
			// piggybacked acks in SetSender::send_written use `max_yielded` —
			// confirm this asymmetry is intentional.
			self.largest_set_id_yielded.write_to(&mut self.buf[b..])?;
			self.socket.send(&self.buf[b..(b+ModOrd::BYTES)])?;
			self.time_last_acked = now;
		}
		Ok(())
	}
332
333	fn ready_from_inbox(&self) -> Option<ModOrd> {
334		for (&id, msg) in self.inbox.iter() {
335			if msg.h.wait_until <= self.n {
336				return Some(id);
337			}
338		}
339		None
340	}
341
342	fn ready_from_inbox2(&self) -> Option<ModOrd> {
343		for (&id, msg) in self.inbox2.iter() {
344			if msg.h.wait_until <= self.n {
345				return Some(id);
346			}
347		}
348		None
349	}
350
351	/*
352	Empty the big buffer. Need to make sure that any inbox/outbox data
353	that is still inside is relocated to the secondary storage.
354	This requires copying over.
355	*/
356	fn vacate_buffer(&mut self) {
357		for (id, msg) in self.inbox.drain() {
358			let payload = unsafe{&*msg.payload}.to_vec();
359			let h = msg.h;
360			let owned_msg = OwnedMessage {
361				h, payload,
362			};
363			self.inbox2.insert(id, owned_msg);
364		}
365		assert!(self.inbox.is_empty());
366		for (id, (instant, bytes)) in self.outbox.drain() {
367			let vec = unsafe{&*bytes}.to_vec();
368			self.outbox2.insert(id, (instant, vec));
369		}
370		assert!(self.inbox.is_empty());
371
372		// reset buffer position to start.
373		self.buf_free_start = 0;
374	}
375
376	fn known_duplicate(&mut self, header: &Header) -> bool {
377		let id = header.id;
378	  	self.seen_before.contains(&id)
379		|| self.inbox.contains_key(&id)
380		|| self.inbox2.contains_key(&id) 
381	}
382
383	fn buf_cant_take_another(&self) -> bool {
384		self.buf.len() - self.buf_free_start < self.buf_min_space
385	}
386
387	fn digest_incoming_ack(&mut self, ack: ModOrd) {
388		if self.peer_acked < ack {
389			self.peer_acked = ack;
390		}
391	}
392}
393
394
395
impl<U> Sender for Endpoint<U> where U: UdpLike {
	// Sending directly on the Endpoint behaves like a one-shot set: a
	// throwaway SetSender is created just for this send.
	fn send_written(&mut self, guarantee: Guarantee) -> io::Result<usize> {
		self.inner_new_set().send_written(guarantee)
	}

	// Forget whatever was staged via io::Write but not yet sent.
	fn clear_written(&mut self) {
		self.out_buf_written = 0;
	}
}
405
406impl<U> io::Write for Endpoint<U> where U: UdpLike {
407    fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
408    	let b = (&mut self.buf[(self.buf_free_start + self.out_buf_written)..]).write(bytes)?;
409    	self.out_buf_written += b;
410    	Ok(b)
411    }
412
413    fn flush(&mut self) -> io::Result<()> {
414    	Ok(())
415    }
416}
417
418/////////////////////////////////
419
// Trailer appended to every payload datagram (see SetSender::send_written);
// read back from the tail of the datagram in Endpoint::recv.
#[derive(Debug, Clone)]
struct Header {
	// Unique modular id of this message; `ModOrd::SPECIAL` for no-guarantee sends.
	id: ModOrd,
	// Id of the set this message belongs to (the set's starting id).
	set_id: ModOrd,
	// Piggybacked acknowledgement: the sender's `max_yielded`.
	ack: ModOrd,
	// Receiver must not yield this message until its counter `n` reaches this.
	wait_until: ModOrd,
	// True for Guarantee::Delivery messages (stored and resent until acked).
	del: bool,
}
impl Header {
	// Wire size: four ModOrd fields plus the one-byte `del` flag.
	// NOTE(review): assumes a ModOrd serializes to 4 bytes — keep in sync
	// with ModOrd::BYTES.
	const BYTES: usize = 4*4 + 1;

	// Serializes fields in a fixed order; must mirror `read_from` exactly.
	fn write_to<W: io::Write>(&self, mut w: W) -> io::Result<()> {
		self.ack.write_to(&mut w)?;
		self.id.write_to(&mut w)?;
		self.set_id.write_to(&mut w)?;
		self.wait_until.write_to(&mut w)?;
		w.write_u8(if self.del {0x01} else {0x00})?;
		Ok(())
	}

	// Deserializes in the same field order as `write_to`.
	fn read_from<R: io::Read>(mut r: R) -> io::Result<Self> {
		Ok(Header {
			ack: ModOrd::read_from(&mut r)?,
			id: ModOrd::read_from(&mut r)?,
			set_id: ModOrd::read_from(&mut r)?,
			wait_until: ModOrd::read_from(&mut r)?,
			del: r.read_u8()? == 0x01,
		})
	}
}
impl cmp::PartialEq for Header {
    // Headers compare by `id` alone; the remaining fields are metadata
    // describing that id.
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
455
456
457////////////////////////////////////////////////////////////////////////////////
458
459
460/// An Endpoint can send payloads of data. However, all messages sent by a single
461/// `SetSender` object of the endpoint are semantically grouped together into an 
462/// unordered set. A new set cannot be defined until the current one is dropped.
463/// 
/// Note that the concept of _sending_ a set is only finalized when the
/// `SetSender` is dropped: its `Drop` impl is what advances the endpoint's
/// id counters for subsequent sets.
#[derive(Debug)]
pub struct SetSender<'a, U: UdpLike + 'a>{
	// The endpoint doing the actual sending; exclusively borrowed for the
	// lifetime of the set.
	endpoint: &'a mut Endpoint<U>,
	// The set's starting id (the endpoint's `next_id` at creation time).
	set_id: ModOrd,
	// Messages sent so far with an Order or Delivery guarantee.
	count: u32,
	// How many of `count` were Order (i.e. non-Delivery) messages.
	ord_count: u32,
}
472
473impl<'a, U> SetSender<'a, U> where U: UdpLike + 'a {
474	fn new(endpoint: &mut Endpoint<U>, set_id: ModOrd) -> SetSender<U> {
475		SetSender {
476			endpoint,
477			set_id,
478			count: 0,
479			ord_count: 0,
480		}
481	}
482}
483
impl<'a, U> Sender for SetSender<'a, U> where U: UdpLike + 'a {
	/// Wrap the bytes previously staged via io::Write in a trailing header
	/// and send them as a single datagram with the requested guarantee.
	/// Returns the number of bytes sent (payload + header).
	fn send_written(&mut self, guarantee: Guarantee) -> io::Result<usize> {
		// make room first: payload + trailing header must fit in the buffer
		if self.endpoint.buf_cant_take_another() {
			self.endpoint.vacate_buffer();
		}
		// no-guarantee messages share the SPECIAL id; guaranteed messages get
		// consecutive ids within the set
		let id = if guarantee == Guarantee::None {
			ModOrd::SPECIAL
		} else {
			self.set_id.new_plus(self.count)
		};
		let header = Header {
			ack: self.endpoint.max_yielded,
			set_id: self.set_id,
			id,
			wait_until: self.endpoint.wait_until,
			del: guarantee == Guarantee::Delivery,
		};

		// the header is written AFTER the payload; the receiver reads it from
		// the tail of the datagram
		let payload_end = self.endpoint.buf_free_start+self.endpoint.out_buf_written;
		header.write_to(&mut self.endpoint.buf[payload_end..])?;
		let bytes_sent = self.endpoint.out_buf_written + Header::BYTES;
		self.endpoint.out_buf_written = 0;
		let new_end = self.endpoint.buf_free_start + bytes_sent;
		let msg_slice = &mut self.endpoint.buf[self.endpoint.buf_free_start..new_end];
		self.endpoint.socket.send(msg_slice)?;

		if guarantee == Guarantee::Delivery {
			// save into outbox and bump the buffer up; advancing the cursor
			// reserves the region so the stored raw pointer stays meaningful
			// until vacate_buffer copies it out
			self.endpoint.outbox.insert(id, (Instant::now(), msg_slice as *mut [u8]));
			self.endpoint.buf_free_start = new_end;
		}

		// id/ordering book-keeping (no-guarantee sends consume no id)
		if guarantee != Guarantee::None {
			self.count += 1;
			if guarantee != Guarantee::Delivery {
				self.ord_count += 1;
			}
		}
		Ok(bytes_sent)
	}

	// Forward to the endpoint: discard staged-but-unsent bytes.
	fn clear_written(&mut self) {
		self.endpoint.clear_written()
	}
}
529
// Dropping the SetSender is what commits the set: the endpoint's id
// counters are only advanced here.
impl<'a, U> Drop for SetSender<'a, U> where U: UdpLike {
    fn drop(&mut self) {
		if self.count == 0 {
			// set was empty. nothing to do here
			return;
		}
		// increment the next_id by the number of IDs that the set contained
		self.endpoint.next_id = self.endpoint.next_id.new_plus(self.count);
		if self.ord_count < self.count {
			// there was at least ONE delivery message. future sets must wait for
			// all of them (instead of waiting for whatever the previous set was waiting for)
			self.endpoint.wait_until = self.endpoint.next_id.new_minus(self.ord_count);
		}
    }
}
545
impl<'a, U> io::Write for SetSender<'a, U> where U: UdpLike {
    // Pure delegation: staging happens in the endpoint's shared buffer.
    fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
    	self.endpoint.write(bytes)
    }

    // Nothing buffered at this layer; sending happens via `send_written`.
    fn flush(&mut self) -> io::Result<()> {
    	Ok(())
    }
}
555
556////////////////////////////////////////////////////////////////////////////////
557
// An incoming message whose payload still lives inside `Endpoint::buf`.
#[derive(Clone)]
struct Message {
	h: Header,
	// Raw, unowned slice into `Endpoint::buf`. Valid only until the buffer
	// region is vacated/overwritten; see `Endpoint::vacate_buffer`, which
	// copies these bytes into an `OwnedMessage` before resetting the cursor.
	payload: *mut [u8],
}
impl fmt::Debug for Message {
	// Best-effort debug rendering; the payload is shown lossily as UTF-8.
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		write!(f, "Inbox1Msg {:?} payload ~= {:?}",
			self.h,
			// SAFETY(review): assumes `payload` still points into the
			// endpoint's live buffer when Debug-formatting — confirm callers
			// never format a Message after vacate_buffer.
			String::from_utf8_lossy(unsafe{&*self.payload}),
		)
	}
}
impl cmp::PartialEq for Message {
    // Delegates to Header's equality, which compares by id alone.
    fn eq(&self, other: &Self) -> bool {
        self.h == other.h
    }
}
576
577
// An incoming message whose payload has been copied out of the shared buffer
// (the owned counterpart of `Message`; produced by `Endpoint::vacate_buffer`).
#[derive(Debug, Clone)]
struct OwnedMessage {
	h: Header,
	payload: Vec<u8>,
}
impl cmp::PartialEq for OwnedMessage {
    // Delegates to Header's equality, which compares by id alone.
    fn eq(&self, other: &Self) -> bool {
        self.h == other.h
    }
}