1use std::collections::HashMap;
8use std::sync::mpsc::Sender;
9use std::io;
10use std::time::Duration;
11
12use super::{BuildIdHasher, SocketId, EndpointId, Message, EndpointTmpl, EndpointSpec, EndpointDesc, Scheduled };
13use super::endpoint::{Pipe, Acceptor};
14use super::config::{Config, ConfigOption};
15use super::context::{Context, Schedulable, Event};
16use io_error::*;
17
18pub enum Request {
19 Connect(String),
20 Bind(String),
21 Send(Message, bool),
22 Recv(bool),
23 SetOption(ConfigOption),
24 Close
25}
26
27pub enum Reply {
28 Err(io::Error),
29 Connect(EndpointId),
30 Bind(EndpointId),
31 Send,
32 Recv(Message),
33 SetOption
34}
35
36pub struct Socket {
37 id: SocketId,
38 reply_sender: Sender<Reply>,
39 protocol: Box<Protocol>,
40 pipes: HashMap<EndpointId, Pipe, BuildIdHasher>,
41 acceptors: HashMap<EndpointId, Acceptor, BuildIdHasher>,
42 config: Config
43}
44
45pub trait Protocol {
52 fn id(&self) -> u16;
53 fn peer_id(&self) -> u16;
54
55 fn add_pipe(&mut self, ctx: &mut Context, eid: EndpointId, pipe: Pipe);
56 fn remove_pipe(&mut self, ctx: &mut Context, eid: EndpointId) -> Option<Pipe>;
57
58 fn send(&mut self, ctx: &mut Context, msg: Message, timeout: Option<Scheduled>);
59 fn on_send_ack(&mut self, ctx: &mut Context, eid: EndpointId);
60 fn on_send_timeout(&mut self, ctx: &mut Context);
61 fn on_send_ready(&mut self, ctx: &mut Context, eid: EndpointId);
62 fn on_send_not_ready(&mut self, ctx: &mut Context, eid: EndpointId);
63
64 fn recv(&mut self, ctx: &mut Context, timeout: Option<Scheduled>);
65 fn on_recv_ack(&mut self, ctx: &mut Context, eid: EndpointId, msg: Message);
66 fn on_recv_timeout(&mut self, ctx: &mut Context);
67 fn on_recv_ready(&mut self, ctx: &mut Context, eid: EndpointId);
68 fn on_recv_not_ready(&mut self, ctx: &mut Context, eid: EndpointId);
69
70 fn is_send_ready(&self) -> bool;
71 fn is_recv_ready(&self) -> bool;
72
73 fn set_option(&mut self, _: ConfigOption) -> io::Result<()> {
74 Err(invalid_input_io_error("option not supported"))
75 }
76 fn on_timer_tick(&mut self, _: &mut Context, _: Schedulable) {
77 }
78 fn on_device_plugged(&mut self, _: &mut Context) {}
79 fn close(&mut self, ctx: &mut Context);
80}
81
82pub type ProtocolCtor = Box<Fn(Sender<Reply>) -> Box<Protocol> + Send>;
83
84impl Socket {
91 pub fn new(id: SocketId, reply_tx: Sender<Reply>, proto: Box<Protocol>) -> Socket {
92 Socket {
93 id: id,
94 reply_sender: reply_tx,
95 protocol: proto,
96 pipes: HashMap::default(),
97 acceptors: HashMap::default(),
98 config: Config::default()
99 }
100 }
101
102 fn get_protocol_ids(&self) -> (u16, u16) {
103 let proto_id = self.protocol.id();
104 let peer_proto_id = self.protocol.peer_id();
105
106 (proto_id, peer_proto_id)
107 }
108
109 fn send_reply(&self, reply: Reply) {
110 let _ = self.reply_sender.send(reply);
111 }
112
113 pub fn poll(&self, ctx: &mut Context) {
114 ctx.raise(Event::CanRecv(self.protocol.is_recv_ready()));
115 ctx.raise(Event::CanSend(self.protocol.is_send_ready()));
116 }
117
118fn create_endpoint_desc(&self) -> EndpointDesc {
125 EndpointDesc {
126 send_priority: self.config.send_priority,
127 recv_priority: self.config.recv_priority,
128 tcp_no_delay: self.config.tcp_no_delay,
129 recv_max_size: self.config.recv_max_size
130 }
131 }
132
133 fn create_endpoint_spec(&self, url: String) -> EndpointSpec {
134 EndpointSpec {
135 url: url,
136 desc: self.create_endpoint_desc()
137 }
138 }
139
140 fn create_endpoint_tmpl(&self, url: String) -> EndpointTmpl {
141 EndpointTmpl {
142 pids: self.get_protocol_ids(),
143 spec: self.create_endpoint_spec(url)
144 }
145 }
146
147pub fn connect(&mut self, ctx: &mut Context, url: String) {
154 let tmpl = self.create_endpoint_tmpl(url);
155
156 match ctx.connect(self.id, &tmpl) {
157 Ok(id) => self.on_connect_success(ctx, id, tmpl.spec),
158 Err(e) => self.on_connect_error(e)
159 };
160 }
161
162 fn on_connect_success(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
163 let pipe = self.connect_pipe(eid, spec);
164
165 self.insert_pipe(ctx, eid, pipe);
166 self.send_reply(Reply::Connect(eid));
167 }
168
169 fn on_connect_error(&mut self, err: io::Error) {
170 self.send_reply(Reply::Err(err));
171 }
172
173 fn schedule_reconnect(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
174 let task = Schedulable::Reconnect(eid, spec);
175 let delay = self.config.retry_ivl;
176 let _ = ctx.schedule(task, delay);
177 }
180
181 pub fn reconnect(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
182 let pids = self.get_protocol_ids();
183 let tmpl = EndpointTmpl {
184 pids: pids,
185 spec: spec
186 };
187
188 match ctx.reconnect(self.id, eid, &tmpl) {
189 Ok(_) => self.on_reconnect_success(ctx, eid, tmpl.spec),
190 Err(_) => self.on_reconnect_error(ctx, eid, tmpl.spec)
191 }
192 }
193
194 fn on_reconnect_success(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
195 self.insert_pipe(ctx, eid, Pipe::from_spec(eid, spec));
196 }
197
198 fn on_reconnect_error(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
199 self.schedule_reconnect(ctx, eid, spec);
200 }
201
202pub fn bind(&mut self, ctx: &mut Context, url: String) {
209 let tmpl = self.create_endpoint_tmpl(url);
210
211 match ctx.bind(self.id, &tmpl) {
212 Ok(id) => self.on_bind_success(ctx, id, tmpl.spec),
213 Err(e) => self.on_bind_error(e)
214 };
215 }
216
217 fn on_bind_success(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
218 let acceptor = self.connect_acceptor(eid, spec);
219
220 acceptor.open(ctx);
221
222 self.acceptors.insert(eid, acceptor);
223 self.send_reply(Reply::Bind(eid));
224 }
225
226 fn on_bind_error(&mut self, err: io::Error) {
227 self.send_reply(Reply::Err(err));
228 }
229
230 fn schedule_rebind(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
231 let task = Schedulable::Rebind(eid, spec);
232 let delay = self.config.retry_ivl;
233 let _ = ctx.schedule(task, delay);
234 }
237
238 pub fn rebind(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
239 let pids = self.get_protocol_ids();
240 let tmpl = EndpointTmpl {
241 pids: pids,
242 spec: spec
243 };
244
245 match ctx.rebind(self.id, eid, &tmpl) {
246 Ok(_) => self.on_rebind_success(ctx, eid, tmpl.spec),
247 Err(_) => self.on_rebind_error(ctx, eid, tmpl.spec)
248 };
249 }
250
251 fn on_rebind_success(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
252 let acceptor = Acceptor::from_spec(eid, spec);
253
254 self.insert_acceptor(ctx, eid, acceptor)
255 }
256
257 fn on_rebind_error(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
258 self.schedule_rebind(ctx, eid, spec);
259 }
260
261pub fn on_pipe_opened(&mut self, ctx: &mut Context, eid: EndpointId) {
268 if let Some(pipe) = self.pipes.remove(&eid) {
269 self.protocol.add_pipe(ctx, eid, pipe);
270 }
271 }
272
273 pub fn on_pipe_accepted(&mut self, ctx: &mut Context, aid: EndpointId, eid: EndpointId) {
274 let pipe = self.accept_pipe(aid, eid);
275
276 self.insert_pipe(ctx, eid, pipe);
277 }
278
279 pub fn close_pipe(&mut self, ctx: &mut Context, eid: EndpointId) {
280 let _ = self.remove_pipe(ctx, eid);
281 }
282
283 pub fn on_pipe_error(&mut self, ctx: &mut Context, eid: EndpointId, _: io::Error) {
284 if let Some(spec) = self.remove_pipe(ctx, eid) {
285 self.schedule_reconnect(ctx, eid, spec);
286 }
287 }
288
289 fn insert_pipe(&mut self, ctx: &mut Context, eid: EndpointId, pipe: Pipe) {
290 pipe.open(ctx);
291
292 self.pipes.insert(eid, pipe);
293 }
294
295 fn remove_pipe(&mut self, ctx: &mut Context, eid: EndpointId) -> Option<EndpointSpec> {
296 if let Some(pipe) = self.pipes.remove(&eid) {
297 return pipe.close(ctx)
298 }
299 if let Some(pipe) = self.protocol.remove_pipe(ctx, eid) {
300 return pipe.close(ctx)
301 }
302 None
303 }
304
305 fn connect_pipe(&self, eid: EndpointId, spec: EndpointSpec) -> Pipe {
306 Pipe::from_spec(eid, spec)
307 }
308
309 fn accept_pipe(&self, aid: EndpointId, eid: EndpointId) -> Pipe {
310 let (send_prio, recv_prio) = if let Some(acceptor) = self.acceptors.get(&aid) {
311 (acceptor.get_send_priority(), acceptor.get_recv_priority())
312 } else {
313 (self.config.send_priority, self.config.recv_priority)
314 };
315 let desc = EndpointDesc {
316 send_priority: send_prio,
317 recv_priority: recv_prio,
318 tcp_no_delay: self.config.tcp_no_delay,
319 recv_max_size: self.config.recv_max_size
320 };
321
322 Pipe::new_accepted(eid, desc)
323 }
324
325pub fn on_acceptor_error(&mut self, ctx: &mut Context, eid: EndpointId, _: io::Error) {
332 if let Some(spec) = self.remove_acceptor(ctx, eid) {
333 self.schedule_rebind(ctx, eid, spec);
334 }
335 }
336
337 pub fn close_acceptor(&mut self, ctx: &mut Context, eid: EndpointId) {
338 let _ = self.remove_acceptor(ctx, eid);
339 }
340
341 fn insert_acceptor(&mut self, ctx: &mut Context, eid: EndpointId, acceptor: Acceptor) {
342 acceptor.open(ctx);
343
344 self.acceptors.insert(eid, acceptor);
345 }
346
347 fn remove_acceptor(&mut self, ctx: &mut Context, eid: EndpointId) -> Option<EndpointSpec> {
348 self.acceptors.remove(&eid).map_or(None, |acceptor| acceptor.close(ctx))
349 }
350
351 fn connect_acceptor(&self, eid: EndpointId, spec: EndpointSpec) -> Acceptor {
352 Acceptor::from_spec(eid, spec)
353 }
354
355pub fn send(&mut self, ctx: &mut Context, msg: Message) {
362 #[cfg(debug_assertions)] debug!("[{:?}] send", ctx);
363 if let Some(delay) = self.get_send_timeout() {
364 let task = Schedulable::SendTimeout;
365
366 match ctx.schedule(task, delay) {
367 Ok(timeout) => self.protocol.send(ctx, msg, Some(timeout)),
368 Err(e) => self.send_reply(Reply::Err(e))
369 }
370 } else {
371 self.protocol.send(ctx, msg, None);
372 }
373 }
374
375 pub fn try_send(&mut self, ctx: &mut Context, msg: Message) {
376 #[cfg(debug_assertions)] debug!("[{:?}] try_send", ctx);
377 if self.protocol.is_send_ready() {
378 self.protocol.send(ctx, msg, None);
379 } else {
380 let err = would_block_io_error("socket is not send ready");
381
382 self.send_reply(Reply::Err(err));
383 }
384 }
385
386 pub fn on_send_ack(&mut self, ctx: &mut Context, eid: EndpointId) {
387 #[cfg(debug_assertions)] debug!("[{:?}] send ack from ep {:?}", ctx, eid);
388 self.protocol.on_send_ack(ctx, eid);
389 }
390
391 pub fn on_send_timeout(&mut self, ctx: &mut Context) {
392 #[cfg(debug_assertions)] debug!("[{:?}] send timeout", ctx);
393 self.protocol.on_send_timeout(ctx);
394 }
395
396 fn get_send_timeout(&self) -> Option<Duration> {
397 self.config.send_timeout
398 }
399
400 pub fn on_send_ready(&mut self, ctx: &mut Context, eid: EndpointId, ready: bool) {
401 #[cfg(debug_assertions)] debug!("[{:?}] ep {:?} send ready: {} ", ctx, eid, ready);
402 if ready {
403 self.protocol.on_send_ready(ctx, eid)
404 } else {
405 self.protocol.on_send_not_ready(ctx, eid)
406 }
407 }
408
409pub fn recv(&mut self, ctx: &mut Context) {
416 #[cfg(debug_assertions)] debug!("[{:?}] recv", ctx);
417 if let Some(delay) = self.get_recv_timeout() {
418 let task = Schedulable::RecvTimeout;
419
420 match ctx.schedule(task, delay) {
421 Ok(timeout) => self.protocol.recv(ctx, Some(timeout)),
422 Err(e) => self.send_reply(Reply::Err(e))
423 }
424 } else {
425 self.protocol.recv(ctx, None);
426 }
427 }
428
429 pub fn try_recv(&mut self, ctx: &mut Context) {
430 #[cfg(debug_assertions)] debug!("[{:?}] try_recv", ctx);
431 if self.protocol.is_recv_ready() {
432 self.protocol.recv(ctx, None);
433 } else {
434 let err = would_block_io_error("socket is not recv ready");
435
436 self.send_reply(Reply::Err(err));
437 }
438 }
439
440 pub fn on_recv_ack(&mut self, ctx: &mut Context, eid: EndpointId, msg: Message) {
441 #[cfg(debug_assertions)] debug!("[{:?}] recv ack from ep {:?}", ctx, eid);
442 self.protocol.on_recv_ack(ctx, eid, msg);
443 }
444
445 pub fn on_recv_timeout(&mut self, ctx: &mut Context) {
446 #[cfg(debug_assertions)] debug!("[{:?}] recv timeout", ctx);
447 self.protocol.on_recv_timeout(ctx);
448 }
449
450 fn get_recv_timeout(&self) -> Option<Duration> {
451 self.config.recv_timeout
452 }
453
454 pub fn on_recv_ready(&mut self, ctx: &mut Context, eid: EndpointId, ready: bool) {
455 #[cfg(debug_assertions)] debug!("[{:?}] ep {:?} recv ready: {}", ctx, eid, ready);
456 if ready {
457 self.protocol.on_recv_ready(ctx, eid)
458 } else {
459 self.protocol.on_recv_not_ready(ctx, eid)
460 }
461 }
462
463pub fn set_option(&mut self, _: &mut Context, opt: ConfigOption) {
470 let res = if opt.is_generic() {
471 self.config.set(opt)
472 } else {
473 self.protocol.set_option(opt)
474 };
475 let reply = match res {
476 Ok(()) => Reply::SetOption,
477 Err(e) => Reply::Err(e)
478 };
479
480 self.send_reply(reply);
481 }
482
483 pub fn on_timer_tick(&mut self, ctx: &mut Context, task: Schedulable) {
484 self.protocol.on_timer_tick(ctx, task)
485 }
486
487 pub fn on_device_plugged(&mut self, ctx: &mut Context) {
488 self.protocol.on_device_plugged(ctx)
489 }
490
491 pub fn close(&mut self, ctx: &mut Context) {
492 for (_, pipe) in self.pipes.drain() {
493 pipe.close(ctx);
494 }
495 for (_, acceptor) in self.acceptors.drain() {
496 acceptor.close(ctx);
497 }
498
499 self.protocol.close(ctx);
500
501 ctx.raise(Event::Closed);
502 }
503}
504
#[cfg(test)]
mod tests {
    use std::fmt;
    use std::rc::Rc;
    use std::sync::mpsc;
    use std::io;
    use std::time::Duration;

    use super::*;
    use core::network;
    use core::context::*;
    use core::{SocketId, EndpointId, Message, EndpointTmpl, Scheduled};
    use core::endpoint::Pipe;

    /// Protocol stub: every callback is a no-op and it is never send/recv ready.
    struct TestProto;

    impl Protocol for TestProto {
        fn id(&self) -> u16 { 0 }
        fn peer_id(&self) -> u16 { 0 }
        fn add_pipe(&mut self, _: &mut Context, _: EndpointId, _: Pipe) {}
        fn remove_pipe(&mut self, _: &mut Context, _: EndpointId) -> Option<Pipe> { None }
        fn send(&mut self, _: &mut Context, _: Message, _: Option<Scheduled>) {}
        fn on_send_ack(&mut self, _: &mut Context, _: EndpointId) {}
        fn on_send_timeout(&mut self, _: &mut Context) {}
        fn on_send_ready(&mut self, _: &mut Context, _: EndpointId) {}
        fn on_send_not_ready(&mut self, _: &mut Context, _: EndpointId) {}
        fn recv(&mut self, _: &mut Context, _: Option<Scheduled>) {}
        fn on_recv_ack(&mut self, _: &mut Context, _: EndpointId, _: Message) {}
        fn on_recv_timeout(&mut self, _: &mut Context) {}
        fn on_recv_ready(&mut self, _: &mut Context, _: EndpointId) {}
        fn on_recv_not_ready(&mut self, _: &mut Context, _: EndpointId) {}
        fn is_send_ready(&self) -> bool { false }
        fn is_recv_ready(&self) -> bool { false }
        fn close(&mut self, _: &mut Context) {}
    }

    /// Builds a socket with id 1 around `TestProto`, along with the receiving
    /// end of its reply channel.
    fn new_socket() -> (Socket, mpsc::Receiver<Reply>) {
        let id = SocketId::from(1);
        let (tx, rx) = mpsc::channel();
        let proto = Box::new(TestProto) as Box<Protocol>;

        (Socket::new(id, tx, proto), rx)
    }

    /// Context whose every fallible operation fails.
    struct FailingNetwork;

    impl network::Network for FailingNetwork {
        fn connect(&mut self, _: SocketId, _: &EndpointTmpl) -> io::Result<EndpointId> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn reconnect(&mut self, _: SocketId, _: EndpointId, _: &EndpointTmpl) -> io::Result<()> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn bind(&mut self, _: SocketId, _: &EndpointTmpl) -> io::Result<EndpointId> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn rebind(&mut self, _: SocketId, _: EndpointId, _: &EndpointTmpl) -> io::Result<()> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn open(&mut self, _: EndpointId, _: bool) {}
        fn close(&mut self, _: EndpointId, _: bool) {}
        fn send(&mut self, _: EndpointId, _: Rc<Message>) {}
        fn recv(&mut self, _: EndpointId) {}
    }

    impl Scheduler for FailingNetwork {
        fn schedule(&mut self, _: Schedulable, _: Duration) -> io::Result<Scheduled> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn cancel(&mut self, _: Scheduled) {}
    }

    impl fmt::Debug for FailingNetwork {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "FailingNetwork")
        }
    }

    impl Context for FailingNetwork {
        fn raise(&mut self, _: Event) {}
    }

    #[test]
    fn when_connect_fails() {
        let mut network = FailingNetwork;
        let (mut socket, rx) = new_socket();

        socket.connect(&mut network, String::from("test://fake"));

        let reply = rx.recv().expect("Socket should have sent a reply to the connect request");

        match reply {
            Reply::Err(_) => {},
            _ => panic!("Socket should have replied an error to the connect request"),
        }
    }

    /// Context whose operations all succeed; connect and bind return the
    /// wrapped endpoint id.
    struct WorkingNetwork(EndpointId);

    impl network::Network for WorkingNetwork {
        fn connect(&mut self, _: SocketId, _: &EndpointTmpl) -> io::Result<EndpointId> {
            Ok(self.0)
        }
        fn reconnect(&mut self, _: SocketId, _: EndpointId, _: &EndpointTmpl) -> io::Result<()> {
            Ok(())
        }
        fn bind(&mut self, _: SocketId, _: &EndpointTmpl) -> io::Result<EndpointId> {
            Ok(self.0)
        }
        fn rebind(&mut self, _: SocketId, _: EndpointId, _: &EndpointTmpl) -> io::Result<()> {
            Ok(())
        }
        fn open(&mut self, _: EndpointId, _: bool) {}
        fn close(&mut self, _: EndpointId, _: bool) {}
        fn send(&mut self, _: EndpointId, _: Rc<Message>) {}
        fn recv(&mut self, _: EndpointId) {}
    }

    impl Scheduler for WorkingNetwork {
        fn schedule(&mut self, _: Schedulable, _: Duration) -> io::Result<Scheduled> {
            Ok(Scheduled::from(0))
        }
        fn cancel(&mut self, _: Scheduled) {}
    }

    impl fmt::Debug for WorkingNetwork {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "WorkingNetwork")
        }
    }

    impl Context for WorkingNetwork {
        fn raise(&mut self, _: Event) {}
    }

    #[test]
    fn when_connect_succeeds() {
        let mut network = WorkingNetwork(EndpointId::from(1));
        let (mut socket, rx) = new_socket();

        socket.connect(&mut network, String::from("test://fake"));

        let reply = rx.recv().expect("Socket should have sent a reply to the connect request");

        match reply {
            Reply::Connect(eid) => assert_eq!(EndpointId::from(1), eid),
            _ => panic!("Socket should have replied an ack to the connect request"),
        }
    }
}