scaproust/core/
socket.rs

// Copyright (c) 2015-2017 Contributors as noted in the AUTHORS file.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// This file may not be copied, modified, or distributed except according to those terms.
6
7use std::collections::HashMap;
8use std::sync::mpsc::Sender;
9use std::io;
10use std::time::Duration;
11
12use super::{BuildIdHasher, SocketId, EndpointId, Message, EndpointTmpl, EndpointSpec, EndpointDesc, Scheduled };
13use super::endpoint::{Pipe, Acceptor};
14use super::config::{Config, ConfigOption};
15use super::context::{Context, Schedulable, Event};
16use io_error::*;
17
18pub enum Request {
19    Connect(String),
20    Bind(String),
21    Send(Message, bool),
22    Recv(bool),
23    SetOption(ConfigOption),
24    Close
25}
26
27pub enum Reply {
28    Err(io::Error),
29    Connect(EndpointId),
30    Bind(EndpointId),
31    Send,
32    Recv(Message),
33    SetOption
34}
35
36pub struct Socket {
37    id: SocketId,
38    reply_sender: Sender<Reply>,
39    protocol: Box<Protocol>,
40    pipes: HashMap<EndpointId, Pipe, BuildIdHasher>,
41    acceptors: HashMap<EndpointId, Acceptor, BuildIdHasher>,
42    config: Config
43}
44
/*****************************************************************************/
/*                                                                           */
/* Protocol                                                                  */
/*                                                                           */
/*****************************************************************************/
50
51pub trait Protocol {
52    fn id(&self) -> u16;
53    fn peer_id(&self) -> u16;
54
55    fn add_pipe(&mut self, ctx: &mut Context, eid: EndpointId, pipe: Pipe);
56    fn remove_pipe(&mut self, ctx: &mut Context, eid: EndpointId) -> Option<Pipe>;
57
58    fn send(&mut self, ctx: &mut Context, msg: Message, timeout: Option<Scheduled>);
59    fn on_send_ack(&mut self, ctx: &mut Context, eid: EndpointId);
60    fn on_send_timeout(&mut self, ctx: &mut Context);
61    fn on_send_ready(&mut self, ctx: &mut Context, eid: EndpointId);
62    fn on_send_not_ready(&mut self, ctx: &mut Context, eid: EndpointId);
63    
64    fn recv(&mut self, ctx: &mut Context, timeout: Option<Scheduled>);
65    fn on_recv_ack(&mut self, ctx: &mut Context, eid: EndpointId, msg: Message);
66    fn on_recv_timeout(&mut self, ctx: &mut Context);
67    fn on_recv_ready(&mut self, ctx: &mut Context, eid: EndpointId);
68    fn on_recv_not_ready(&mut self, ctx: &mut Context, eid: EndpointId);
69
70    fn is_send_ready(&self) -> bool;
71    fn is_recv_ready(&self) -> bool;
72
73    fn set_option(&mut self, _: ConfigOption) -> io::Result<()> {
74        Err(invalid_input_io_error("option not supported"))
75    }
76    fn on_timer_tick(&mut self, _: &mut Context, _: Schedulable) {
77    }
78    fn on_device_plugged(&mut self, _: &mut Context) {}
79    fn close(&mut self, ctx: &mut Context);
80}
81
82pub type ProtocolCtor = Box<Fn(Sender<Reply>) -> Box<Protocol> + Send>;
83
/*****************************************************************************/
/*                                                                           */
/* Socket                                                                    */
/*                                                                           */
/*****************************************************************************/
89
90impl Socket {
91    pub fn new(id: SocketId, reply_tx: Sender<Reply>, proto: Box<Protocol>) -> Socket {
92        Socket {
93            id: id,
94            reply_sender: reply_tx,
95            protocol: proto,
96            pipes: HashMap::default(),
97            acceptors: HashMap::default(),
98            config: Config::default()
99        }
100    }
101
102    fn get_protocol_ids(&self) -> (u16, u16) {
103        let proto_id = self.protocol.id();
104        let peer_proto_id = self.protocol.peer_id();
105
106        (proto_id, peer_proto_id)
107    }
108
109    fn send_reply(&self, reply: Reply) {
110        let _ = self.reply_sender.send(reply);
111    }
112
113    pub fn poll(&self, ctx: &mut Context) {
114        ctx.raise(Event::CanRecv(self.protocol.is_recv_ready()));
115        ctx.raise(Event::CanSend(self.protocol.is_send_ready()));
116    }
117
118/*****************************************************************************/
119/*                                                                           */
120/* endpoint creation                                                         */
121/*                                                                           */
122/*****************************************************************************/
123
124    fn create_endpoint_desc(&self) -> EndpointDesc {
125        EndpointDesc {
126            send_priority: self.config.send_priority,
127            recv_priority: self.config.recv_priority,
128            tcp_no_delay: self.config.tcp_no_delay,
129            recv_max_size: self.config.recv_max_size
130        }
131    }
132
133    fn create_endpoint_spec(&self, url: String) -> EndpointSpec {
134        EndpointSpec {
135            url: url,
136            desc: self.create_endpoint_desc()
137        }
138    }
139
140    fn create_endpoint_tmpl(&self, url: String) -> EndpointTmpl {
141        EndpointTmpl {
142            pids: self.get_protocol_ids(),
143            spec: self.create_endpoint_spec(url)
144        }
145    }
146
147/*****************************************************************************/
148/*                                                                           */
149/* connect                                                                   */
150/*                                                                           */
151/*****************************************************************************/
152
153    pub fn connect(&mut self, ctx: &mut Context, url: String) {
154        let tmpl = self.create_endpoint_tmpl(url);
155
156        match ctx.connect(self.id, &tmpl) {
157            Ok(id) => self.on_connect_success(ctx, id, tmpl.spec),
158            Err(e) => self.on_connect_error(e)
159        };
160    }
161
162    fn on_connect_success(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
163        let pipe = self.connect_pipe(eid, spec);
164
165        self.insert_pipe(ctx, eid, pipe);
166        self.send_reply(Reply::Connect(eid));
167    }
168
169    fn on_connect_error(&mut self, err: io::Error) {
170        self.send_reply(Reply::Err(err));
171    }
172
173    fn schedule_reconnect(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
174        let task = Schedulable::Reconnect(eid, spec);
175        let delay = self.config.retry_ivl;
176        let _ = ctx.schedule(task, delay); 
177        // TODO maybe we should keep track of the scheduled reconnection
178        // In case the facade wants to close the ep somewhere between the error and the timeout
179    }
180
181    pub fn reconnect(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
182        let pids = self.get_protocol_ids();
183        let tmpl = EndpointTmpl {
184            pids: pids,
185            spec: spec
186        };
187
188        match ctx.reconnect(self.id, eid, &tmpl) {
189            Ok(_)  => self.on_reconnect_success(ctx, eid, tmpl.spec),
190            Err(_) => self.on_reconnect_error(ctx, eid, tmpl.spec)
191        }
192    }
193
194    fn on_reconnect_success(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
195        self.insert_pipe(ctx, eid, Pipe::from_spec(eid, spec));
196    }
197
198    fn on_reconnect_error(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
199        self.schedule_reconnect(ctx, eid, spec);
200    }
201
202/*****************************************************************************/
203/*                                                                           */
204/* bind                                                                      */
205/*                                                                           */
206/*****************************************************************************/
207
208    pub fn bind(&mut self, ctx: &mut Context, url: String) {
209        let tmpl = self.create_endpoint_tmpl(url);
210
211        match ctx.bind(self.id, &tmpl) {
212            Ok(id) => self.on_bind_success(ctx, id, tmpl.spec),
213            Err(e) => self.on_bind_error(e)
214        };
215    }
216
217    fn on_bind_success(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
218        let acceptor = self.connect_acceptor(eid, spec);
219
220        acceptor.open(ctx);
221
222        self.acceptors.insert(eid, acceptor);
223        self.send_reply(Reply::Bind(eid));
224    }
225
226    fn on_bind_error(&mut self, err: io::Error) {
227        self.send_reply(Reply::Err(err));
228    }
229
230    fn schedule_rebind(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
231        let task = Schedulable::Rebind(eid, spec);
232        let delay = self.config.retry_ivl;
233        let _ = ctx.schedule(task, delay); 
234        // TODO maybe we should keep track of the scheduled reconnection
235        // In case the facade wants to close the ep somewhere between the error and the timeout
236    }
237
238    pub fn rebind(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
239        let pids = self.get_protocol_ids();
240        let tmpl = EndpointTmpl {
241            pids: pids,
242            spec: spec
243        };
244
245        match ctx.rebind(self.id, eid, &tmpl) {
246            Ok(_)  => self.on_rebind_success(ctx, eid, tmpl.spec),
247            Err(_) => self.on_rebind_error(ctx, eid, tmpl.spec)
248        };
249    }
250
251    fn on_rebind_success(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
252        let acceptor = Acceptor::from_spec(eid, spec);
253
254        self.insert_acceptor(ctx, eid, acceptor)
255    }
256
257    fn on_rebind_error(&mut self, ctx: &mut Context, eid: EndpointId, spec: EndpointSpec) {
258        self.schedule_rebind(ctx, eid, spec);
259    }
260
261/*****************************************************************************/
262/*                                                                           */
263/* pipe                                                                      */
264/*                                                                           */
265/*****************************************************************************/
266
267    pub fn on_pipe_opened(&mut self, ctx: &mut Context, eid: EndpointId) {
268        if let Some(pipe) = self.pipes.remove(&eid) {
269            self.protocol.add_pipe(ctx, eid, pipe);
270        }
271    }
272
273    pub fn on_pipe_accepted(&mut self, ctx: &mut Context, aid: EndpointId, eid: EndpointId) {
274        let pipe = self.accept_pipe(aid, eid);
275
276        self.insert_pipe(ctx, eid, pipe);
277    }
278
279    pub fn close_pipe(&mut self, ctx: &mut Context, eid: EndpointId) {
280        let _ = self.remove_pipe(ctx, eid);
281    }
282
283    pub fn on_pipe_error(&mut self, ctx: &mut Context, eid: EndpointId, _: io::Error) {
284        if let Some(spec) = self.remove_pipe(ctx, eid) {
285            self.schedule_reconnect(ctx, eid, spec);
286        }
287    }
288
289    fn insert_pipe(&mut self, ctx: &mut Context, eid: EndpointId, pipe: Pipe) {
290        pipe.open(ctx);
291
292        self.pipes.insert(eid, pipe);
293    }
294
295    fn remove_pipe(&mut self, ctx: &mut Context, eid: EndpointId) -> Option<EndpointSpec> {
296        if let Some(pipe) = self.pipes.remove(&eid) {
297            return pipe.close(ctx)
298        }
299        if let Some(pipe) = self.protocol.remove_pipe(ctx, eid) {
300            return pipe.close(ctx)
301        }
302        None
303    }
304
305    fn connect_pipe(&self, eid: EndpointId, spec: EndpointSpec) -> Pipe {
306        Pipe::from_spec(eid, spec)
307    }
308
309    fn accept_pipe(&self, aid: EndpointId, eid: EndpointId) -> Pipe {
310        let (send_prio, recv_prio) = if let Some(acceptor) = self.acceptors.get(&aid) {
311            (acceptor.get_send_priority(), acceptor.get_recv_priority())
312        } else {
313            (self.config.send_priority, self.config.recv_priority)
314        };
315        let desc = EndpointDesc {
316            send_priority: send_prio,
317            recv_priority: recv_prio,
318            tcp_no_delay: self.config.tcp_no_delay,
319            recv_max_size: self.config.recv_max_size
320        };
321
322        Pipe::new_accepted(eid, desc)
323    }
324
325/*****************************************************************************/
326/*                                                                           */
327/* acceptor                                                                  */
328/*                                                                           */
329/*****************************************************************************/
330
331    pub fn on_acceptor_error(&mut self, ctx: &mut Context, eid: EndpointId, _: io::Error) {
332        if let Some(spec) = self.remove_acceptor(ctx, eid) {
333            self.schedule_rebind(ctx, eid, spec);
334        }
335    }
336
337    pub fn close_acceptor(&mut self, ctx: &mut Context, eid: EndpointId) {
338        let _ = self.remove_acceptor(ctx, eid);
339    }
340
341    fn insert_acceptor(&mut self, ctx: &mut Context, eid: EndpointId, acceptor: Acceptor) {
342        acceptor.open(ctx);
343
344        self.acceptors.insert(eid, acceptor);
345    }
346
347    fn remove_acceptor(&mut self, ctx: &mut Context, eid: EndpointId) -> Option<EndpointSpec> {
348        self.acceptors.remove(&eid).map_or(None, |acceptor| acceptor.close(ctx))
349    }
350
351    fn connect_acceptor(&self, eid: EndpointId, spec: EndpointSpec) -> Acceptor {
352        Acceptor::from_spec(eid, spec)
353    }
354
355/*****************************************************************************/
356/*                                                                           */
357/* send                                                                      */
358/*                                                                           */
359/*****************************************************************************/
360
361    pub fn send(&mut self, ctx: &mut Context, msg: Message) {
362        #[cfg(debug_assertions)] debug!("[{:?}] send", ctx);
363        if let Some(delay) = self.get_send_timeout() {
364            let task = Schedulable::SendTimeout;
365
366            match ctx.schedule(task, delay) {
367                Ok(timeout) => self.protocol.send(ctx, msg, Some(timeout)),
368                Err(e) => self.send_reply(Reply::Err(e))
369            }
370        } else {
371            self.protocol.send(ctx, msg, None);
372        }
373    }
374
375    pub fn try_send(&mut self, ctx: &mut Context, msg: Message) {
376        #[cfg(debug_assertions)] debug!("[{:?}] try_send", ctx);
377        if self.protocol.is_send_ready() {
378            self.protocol.send(ctx, msg, None);
379        } else {
380            let err = would_block_io_error("socket is not send ready");
381
382            self.send_reply(Reply::Err(err));
383        }
384    }
385
386    pub fn on_send_ack(&mut self, ctx: &mut Context, eid: EndpointId) {
387        #[cfg(debug_assertions)] debug!("[{:?}] send ack from ep {:?}", ctx, eid);
388        self.protocol.on_send_ack(ctx, eid);
389    }
390
391    pub fn on_send_timeout(&mut self, ctx: &mut Context) {
392        #[cfg(debug_assertions)] debug!("[{:?}] send timeout", ctx);
393        self.protocol.on_send_timeout(ctx);
394    }
395
396    fn get_send_timeout(&self) -> Option<Duration> {
397        self.config.send_timeout
398    }
399
400    pub fn on_send_ready(&mut self, ctx: &mut Context, eid: EndpointId, ready: bool) {
401        #[cfg(debug_assertions)] debug!("[{:?}] ep {:?} send ready: {} ", ctx, eid, ready);
402        if ready {
403            self.protocol.on_send_ready(ctx, eid)
404        } else {
405            self.protocol.on_send_not_ready(ctx, eid)
406        }
407    }
408
409/*****************************************************************************/
410/*                                                                           */
411/* recv                                                                      */
412/*                                                                           */
413/*****************************************************************************/
414
415    pub fn recv(&mut self, ctx: &mut Context) {
416        #[cfg(debug_assertions)] debug!("[{:?}] recv", ctx);
417        if let Some(delay) = self.get_recv_timeout() {
418            let task = Schedulable::RecvTimeout;
419
420            match ctx.schedule(task, delay) {
421                Ok(timeout) => self.protocol.recv(ctx, Some(timeout)),
422                Err(e) => self.send_reply(Reply::Err(e))
423            }
424        } else {
425            self.protocol.recv(ctx, None);
426        }
427    }
428
429    pub fn try_recv(&mut self, ctx: &mut Context) {
430        #[cfg(debug_assertions)] debug!("[{:?}] try_recv", ctx);
431        if self.protocol.is_recv_ready() {
432            self.protocol.recv(ctx, None);
433        } else {
434            let err = would_block_io_error("socket is not recv ready");
435            
436            self.send_reply(Reply::Err(err));
437        }
438    }
439
440    pub fn on_recv_ack(&mut self, ctx: &mut Context, eid: EndpointId, msg: Message) {
441        #[cfg(debug_assertions)] debug!("[{:?}] recv ack from ep {:?}", ctx, eid);
442        self.protocol.on_recv_ack(ctx, eid, msg);
443    }
444
445    pub fn on_recv_timeout(&mut self, ctx: &mut Context) {
446        #[cfg(debug_assertions)] debug!("[{:?}] recv timeout", ctx);
447        self.protocol.on_recv_timeout(ctx);
448    }
449
450    fn get_recv_timeout(&self) -> Option<Duration> {
451        self.config.recv_timeout
452    }
453
454    pub fn on_recv_ready(&mut self, ctx: &mut Context, eid: EndpointId, ready: bool) {
455        #[cfg(debug_assertions)] debug!("[{:?}] ep {:?} recv ready: {}", ctx, eid, ready);
456        if ready {
457            self.protocol.on_recv_ready(ctx, eid)
458        } else {
459            self.protocol.on_recv_not_ready(ctx, eid)
460        }
461    }
462
463/*****************************************************************************/
464/*                                                                           */
465/* options                                                                   */
466/*                                                                           */
467/*****************************************************************************/
468
469    pub fn set_option(&mut self, _: &mut Context, opt: ConfigOption) {
470        let res = if opt.is_generic() {
471            self.config.set(opt)
472        } else {
473            self.protocol.set_option(opt)
474        };
475        let reply = match res {
476            Ok(()) => Reply::SetOption,
477            Err(e) => Reply::Err(e)
478        };
479
480        self.send_reply(reply);
481    }
482
483    pub fn on_timer_tick(&mut self, ctx: &mut Context, task: Schedulable) {
484        self.protocol.on_timer_tick(ctx, task)
485    }
486
487    pub fn on_device_plugged(&mut self, ctx: &mut Context) {
488        self.protocol.on_device_plugged(ctx)
489    }
490
491    pub fn close(&mut self, ctx: &mut Context) {
492        for (_, pipe) in self.pipes.drain() {
493            pipe.close(ctx);
494        }
495        for (_, acceptor) in self.acceptors.drain() {
496            acceptor.close(ctx);
497        }
498
499        self.protocol.close(ctx);
500
501        ctx.raise(Event::Closed);
502    }
503}
504
/*****************************************************************************/
/*                                                                           */
/* tests                                                                     */
/*                                                                           */
/*****************************************************************************/
510
#[cfg(test)]
mod tests {
    use std::fmt;
    use std::rc::Rc;
    use std::sync::mpsc;
    use std::io;
    use std::time::Duration;

    use super::*;
    use core::network;
    use core::context::*;
    use core::{SocketId, EndpointId, Message, EndpointTmpl, Scheduled};
    use core::endpoint::Pipe;

    /// Protocol stub: ignores every event and is never send/recv ready.
    struct TestProto;

    impl Protocol for TestProto {
        fn id(&self) -> u16 { 0 }
        fn peer_id(&self) -> u16 { 0 }

        fn add_pipe(&mut self, _: &mut Context, _: EndpointId, _: Pipe) {}
        fn remove_pipe(&mut self, _: &mut Context, _: EndpointId) -> Option<Pipe> { None }

        fn send(&mut self, _: &mut Context, _: Message, _: Option<Scheduled>) {}
        fn on_send_ack(&mut self, _: &mut Context, _: EndpointId) {}
        fn on_send_timeout(&mut self, _: &mut Context) {}
        fn on_send_ready(&mut self, _: &mut Context, _: EndpointId) {}
        fn on_send_not_ready(&mut self, _: &mut Context, _: EndpointId) {}

        fn recv(&mut self, _: &mut Context, _: Option<Scheduled>) {}
        fn on_recv_ack(&mut self, _: &mut Context, _: EndpointId, _: Message) {}
        fn on_recv_timeout(&mut self, _: &mut Context) {}
        fn on_recv_ready(&mut self, _: &mut Context, _: EndpointId) {}
        fn on_recv_not_ready(&mut self, _: &mut Context, _: EndpointId) {}

        fn is_send_ready(&self) -> bool { false }
        fn is_recv_ready(&self) -> bool { false }

        fn close(&mut self, _: &mut Context) {}
    }

    /// Context whose fallible operations (connect, bind, schedule, ...) all fail.
    struct FailingNetwork;

    impl network::Network for FailingNetwork {
        fn connect(&mut self, _: SocketId, _: &EndpointTmpl) -> io::Result<EndpointId> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn reconnect(&mut self, _: SocketId, _: EndpointId, _: &EndpointTmpl) -> io::Result<()> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn bind(&mut self, _: SocketId, _: &EndpointTmpl) -> io::Result<EndpointId> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn rebind(&mut self, _: SocketId, _: EndpointId, _: &EndpointTmpl) -> io::Result<()> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn open(&mut self, _: EndpointId, _: bool) {}
        fn close(&mut self, _: EndpointId, _: bool) {}
        fn send(&mut self, _: EndpointId, _: Rc<Message>) {}
        fn recv(&mut self, _: EndpointId) {}
    }

    impl Scheduler for FailingNetwork {
        fn schedule(&mut self, _: Schedulable, _: Duration) -> io::Result<Scheduled> {
            Err(other_io_error("FailingNetwork can only fail"))
        }
        fn cancel(&mut self, _: Scheduled) {}
    }

    impl fmt::Debug for FailingNetwork {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "FailingNetwork")
        }
    }

    impl Context for FailingNetwork {
        fn raise(&mut self, _: Event) {}
    }

    /// A failed connection attempt must be answered with `Reply::Err`.
    #[test]
    fn when_connect_fails() {
        let (tx, rx) = mpsc::channel();
        let proto = Box::new(TestProto) as Box<Protocol>;
        let mut socket = Socket::new(SocketId::from(1), tx, proto);
        let mut network = FailingNetwork;

        socket.connect(&mut network, String::from("test://fake"));

        let reply = rx.recv().expect("Socket should have sent a reply to the connect request");

        match reply {
            Reply::Err(_) => {},
            _ => panic!("Socket should have replied an error to the connect request")
        }
    }

    /// Context whose operations all succeed; connect and bind return the wrapped id.
    struct WorkingNetwork(EndpointId);

    impl network::Network for WorkingNetwork {
        fn connect(&mut self, _: SocketId, _: &EndpointTmpl) -> io::Result<EndpointId> {
            Ok(self.0)
        }
        fn reconnect(&mut self, _: SocketId, _: EndpointId, _: &EndpointTmpl) -> io::Result<()> {
            Ok(())
        }
        fn bind(&mut self, _: SocketId, _: &EndpointTmpl) -> io::Result<EndpointId> {
            Ok(self.0)
        }
        fn rebind(&mut self, _: SocketId, _: EndpointId, _: &EndpointTmpl) -> io::Result<()> {
            Ok(())
        }
        fn open(&mut self, _: EndpointId, _: bool) {}
        fn close(&mut self, _: EndpointId, _: bool) {}
        fn send(&mut self, _: EndpointId, _: Rc<Message>) {}
        fn recv(&mut self, _: EndpointId) {}
    }

    impl Scheduler for WorkingNetwork {
        fn schedule(&mut self, _: Schedulable, _: Duration) -> io::Result<Scheduled> {
            Ok(Scheduled::from(0))
        }
        fn cancel(&mut self, _: Scheduled) {}
    }

    impl fmt::Debug for WorkingNetwork {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "WorkingNetwork")
        }
    }

    impl Context for WorkingNetwork {
        fn raise(&mut self, _: Event) {}
    }

    /// A successful connection must be answered with the endpoint id
    /// returned by the context.
    #[test]
    fn when_connect_succeeds() {
        let (tx, rx) = mpsc::channel();
        let proto = Box::new(TestProto) as Box<Protocol>;
        let mut socket = Socket::new(SocketId::from(1), tx, proto);
        let mut network = WorkingNetwork(EndpointId::from(1));

        socket.connect(&mut network, String::from("test://fake"));

        let reply = rx.recv().expect("Socket should have sent a reply to the connect request");

        match reply {
            Reply::Connect(eid) => assert_eq!(EndpointId::from(1), eid),
            _ => panic!("Socket should have replied an ack to the connect request")
        }
    }
}