sse_client/
network.rs

1use std::io::prelude::*;
2use std::io::Error;
3use std::net::{Shutdown, TcpStream};
4use url::Url;
5use std::thread;
6use std::sync::Arc;
7use std::sync::Mutex;
8use std::io::BufReader;
9use std::time::Duration;
10#[cfg(feature = "native-tls")]
11use crate::tls::MaybeTlsStream;
12
13#[cfg(feature = "native-tls")]
14type InnerStream = MaybeTlsStream;
15#[cfg(not(feature = "native-tls"))]
16type InnerStream = TcpStream;
17
18type Callback = Arc<Mutex<Option<Box<dyn Fn(String) + Send>>>>;
19type CallbackNoArgs = Arc<Mutex<Option<Box<dyn Fn() + Send>>>>;
20type StreamWrapper = Arc<Mutex<Option<TcpStream>>>;
21type StateWrapper = Arc<Mutex<State>>;
22type LastIdWrapper = Arc<Mutex<Option<String>>>;
23
24const INITIAL_RECONNECTION_TIME_IN_MS: u64 = 500;
25
26/// Client state
27#[derive(Debug, PartialEq, Clone)]
28pub enum State {
29    /// State when trying to connect or reconnect to stream.
30    Connecting,
31    /// Stream is open.
32    Open,
33    /// State when connection is closed and client won't try to reconnect.
34    Closed
35}
36
37#[derive(Debug, PartialEq)]
38enum StreamAction {
39    Reconnect(String),
40    Close(String),
41    Move(Url),
42    MovePermanently(Url)
43}
44
45pub struct EventStream {
46    url: Arc<Url>,
47    stream: StreamWrapper,
48    state: StateWrapper,
49    on_open_listener: CallbackNoArgs,
50    on_message_listener: Callback,
51    on_error_listener: Callback,
52    last_event_id: LastIdWrapper
53}
54
55impl EventStream {
56    pub fn new(url: Url) -> Result<EventStream, Error> {
57        let event_stream = EventStream {
58            url: Arc::new(url),
59            stream: Arc::new(Mutex::new(None)),
60            state: Arc::new(Mutex::new(State::Connecting)),
61            on_open_listener: Arc::new(Mutex::new(None)),
62            on_message_listener: Arc::new(Mutex::new(None)),
63            on_error_listener: Arc::new(Mutex::new(None)),
64            last_event_id: Arc::new(Mutex::new(None))
65        };
66
67        event_stream.listen();
68
69        Ok(event_stream)
70    }
71
72    fn listen(&self) {
73        listen_stream(
74            Arc::clone(&self.url),
75            Arc::clone(&self.url),
76            Arc::clone(&self.stream),
77            Arc::clone(&self.state),
78            Arc::clone(&self.on_open_listener),
79            Arc::clone(&self.on_message_listener),
80            Arc::clone(&self.on_error_listener),
81            Arc::new(Mutex::new(0)),
82            Arc::clone(&self.last_event_id)
83        );
84    }
85
86    pub fn close(&self) {
87        let mut state = self.state.lock().unwrap();
88        *state = State::Closed;
89        if let Some(ref st) = *self.stream.lock().unwrap() {
90            st.shutdown(Shutdown::Both).unwrap();
91        }
92    }
93
94    pub fn on_open<F>(&mut self, listener: F) where F: Fn() + Send + 'static {
95        let mut on_open_listener = self.on_open_listener.lock().unwrap();
96        *on_open_listener = Some(Box::new(listener));
97    }
98
99    pub fn on_message<F>(&mut self, listener: F) where F: Fn(String) + Send + 'static {
100        let mut on_message_listener = self.on_message_listener.lock().unwrap();
101        *on_message_listener = Some(Box::new(listener));
102    }
103
104    pub fn on_error<F>(&mut self, listener: F) where F: Fn(String) + Send + 'static {
105        let mut on_error_listener = self.on_error_listener.lock().unwrap();
106        *on_error_listener = Some(Box::new(listener));
107    }
108
109    pub fn state(&self) -> State {
110        let state = &self.state.lock().unwrap();
111        (*state).clone()
112    }
113
114    pub fn set_last_id(&self, id: String) {
115        let mut last_id = self.last_event_id.lock().unwrap();
116        *last_id = Some(id);
117    }
118}
119
120fn listen_stream(
121    url: Arc<Url>,
122    connection_url: Arc<Url>,
123    stream: StreamWrapper,
124    state: StateWrapper,
125    on_open: CallbackNoArgs,
126    on_message: Callback,
127    on_error: Callback,
128    failed_attempts: Arc<Mutex<u32>>,
129    last_event_id: LastIdWrapper
130) {
131    thread::spawn(move || {
132        let action = match connect_event_stream(&connection_url, &stream, &last_event_id) {
133            Ok(stream) => read_stream(stream, &state, &on_open, &on_message, &failed_attempts),
134            Err(error) => Err(StreamAction::Reconnect(error.to_string()))
135        };
136
137        if let Err(stream_action) = action  {
138            match stream_action {
139                StreamAction::Reconnect(ref error) => {
140                    let mut state_lock = state.lock().unwrap();
141                    *state_lock = State::Connecting;
142                    handle_error(error.to_string(),  &on_error);
143                    reconnect_stream(url, stream, Arc::clone(&state), on_open, on_message, on_error, failed_attempts, last_event_id);
144                },
145                StreamAction::Close(ref error) => {
146                    let mut state_lock = state.lock().unwrap();
147                    *state_lock = State::Closed;
148                    handle_error(error.to_string(), &on_error);
149                },
150                StreamAction::Move(redirect_url) => {
151                    let mut state_lock = state.lock().unwrap();
152                    *state_lock = State::Connecting;
153
154                    listen_stream(url, Arc::new(redirect_url), stream, Arc::clone(&state), on_open, on_message, on_error, failed_attempts, last_event_id);
155                },
156                StreamAction::MovePermanently(redirect_url) => {
157                    let mut state_lock = state.lock().unwrap();
158                    *state_lock = State::Connecting;
159
160                    listen_stream(Arc::new(redirect_url.clone()), Arc::new(redirect_url), stream, Arc::clone(&state), on_open, on_message, on_error, failed_attempts, last_event_id);
161                }
162            };
163        }
164    });
165}
166
167fn connect_event_stream(url: &Url, stream: &StreamWrapper, last_event_id: &LastIdWrapper) -> Result<InnerStream, Error> {
168    let connection_stream = event_stream_handshake(url, last_event_id)?;
169    let mut stream = stream.lock().unwrap();
170
171    #[cfg(feature = "native-tls")]
172    { *stream = Some(connection_stream.clone_plain_handle().unwrap()); }
173
174    #[cfg(not(feature = "native-tls"))]
175    { *stream = Some(connection_stream.try_clone().unwrap()); }
176
177    Ok(connection_stream)
178}
179
180fn event_stream_handshake(url: &Url, last_event_id: &LastIdWrapper) -> Result<InnerStream, Error> {
181    let host = get_host(&url);
182    let host = host.as_str();
183
184    #[cfg(feature = "native-tls")]
185    let mut stream = {
186        let plain = TcpStream::connect(host)?;
187        if url.scheme() == "https" {
188            MaybeTlsStream::try_wrap_tls(plain, url.host_str().unwrap_or("localhost"))?
189        } else {
190            MaybeTlsStream::wrap_plain(plain)
191        }
192    };
193
194    #[cfg(not(feature = "native-tls"))]
195    let mut stream = TcpStream::connect(host)?;
196
197    stream.set_read_timeout(Some(Duration::from_millis(60000)))?;
198
199    let extra_headers = match *(last_event_id.lock().unwrap()) {
200        Some(ref last_id) => format!("Last-Event-ID: {}\r\n", last_id),
201        None => String::from("")
202    };
203
204    let request = format!(
205        "GET {} HTTP/1.1\r\nAccept: text/event-stream\r\nHost: {}\r\n{}\r\n",
206        get_path_with_query_params(url),
207        host,
208        extra_headers
209    );
210
211    stream.write(request.as_bytes())?;
212    stream.flush()?;
213
214    Ok(stream)
215}
216
217fn get_host(url: &Url) -> String {
218    let mut host = match url.host_str() {
219        Some(h) => String::from(h),
220        None => String::from("localhost")
221    };
222
223    if let Some(port) = url.port_or_known_default() {
224        host = format!("{}:{}", host, port);
225    }
226
227    host
228}
229
230fn get_path_with_query_params(url: &Url) -> String {
231    match url.query() {
232        Some(query) => format!("{}?{}", url.path(), query),
233        None => url.path().to_owned()
234    }
235}
236
237fn read_stream(
238    connection_stream: InnerStream,
239    state: &StateWrapper,
240    on_open: &CallbackNoArgs,
241    on_message: &Callback,
242    failed_attempts: &Arc<Mutex<u32>>
243) -> Result<(), StreamAction> {
244    let mut reader = BufReader::new(connection_stream);
245
246    let mut request_header = String::new();
247    reader.read_line(&mut request_header).unwrap();
248
249    let status_code = validate_status_code(request_header)?;
250
251    for line in reader.lines() {
252        let mut state = state.lock().unwrap();
253
254        let line = line.map_err(|error| {
255            StreamAction::Reconnect(error.to_string())
256        })?;
257
258        match *state {
259            State::Connecting => handle_headers(line.clone(), &mut state, &on_open, status_code, failed_attempts)?,
260            _ => handle_messages(line.clone(), &on_message)
261        }
262    }
263
264    match *(state.lock().unwrap()) {
265        State::Closed => Ok(()),
266        _ => Err(StreamAction::Reconnect(String::from("connection closed by server")))
267    }
268}
269
270fn handle_headers(
271    line: String,
272    state: &mut State,
273    on_open: &CallbackNoArgs,
274    status_code: i32,
275    failed_attempts: &Arc<Mutex<u32>>
276) -> Result<(), StreamAction> {
277    if line == "" {
278        handle_open_connection(state, on_open, failed_attempts)
279    } else if line.starts_with("Content-Type") {
280        validate_content_type(line)
281    } else if line.starts_with("Location:") {
282        handle_new_location(line, status_code)
283    } else {
284        Ok(())
285    }
286}
287
288fn handle_open_connection(state: &mut State, on_open: &CallbackNoArgs, failed_attempts: &Arc<Mutex<u32>>) -> Result<(), StreamAction> {
289    *state = State::Open;
290    let on_open = on_open.lock().unwrap();
291    if let Some(ref f) = *on_open {
292        f();
293    }
294    let mut failed_attempts = failed_attempts.lock().unwrap();
295    *failed_attempts = 0;
296    Ok(())
297}
298
299fn validate_content_type(line: String) -> Result<(), StreamAction> {
300    if line.contains("text/event-stream") {
301        Ok(())
302    } else {
303        Err(StreamAction::Close(String::from("Wrong Content-Type")))
304    }
305}
306
307fn validate_status_code(line: String) -> Result<i32, StreamAction> {
308    let status = &line[9..].trim_end();
309    let status_code: i32 = status[..3].parse().unwrap();
310
311    match status_code {
312        200 | 301 | 302 | 303 | 307 => Ok(status_code),
313        204 => Err(StreamAction::Close(status.to_string())),
314        201 ..= 203 | 205 ..= 299 => Err(StreamAction::Reconnect(status.to_string())),
315        _ => Err(StreamAction::Close(status.to_string()))
316    }
317}
318
319fn handle_new_location(line: String, status_code: i32) -> Result<(), StreamAction> {
320    let location = &line[10..];
321
322    match status_code {
323        301 => Err(StreamAction::MovePermanently(Url::parse(location).unwrap())),
324        302 | 303 | 307 => Err(StreamAction::Move(Url::parse(location).unwrap())),
325        _ => Ok(())
326    }
327}
328
329fn handle_messages(line: String, on_message: &Callback) {
330    let on_message = on_message.lock().unwrap();
331    if let Some(ref f) = *on_message {
332        f(line);
333    }
334}
335
336fn handle_error(message: String, on_error: &Callback) {
337    let on_error = on_error.lock().unwrap();
338    if let Some(ref f) = *on_error {
339        f(message);
340    }
341}
342
343fn reconnect_stream(
344    url: Arc<Url>,
345    stream: StreamWrapper,
346    state: StateWrapper,
347    on_open: CallbackNoArgs,
348    on_message: Callback,
349    on_error: Callback,
350    failed_attempts: Arc<Mutex<u32>>,
351    last_event_id: LastIdWrapper
352) {
353    let mut attempts = failed_attempts.lock().unwrap();
354    let base: u64 = 2;
355    let reconnection_time = INITIAL_RECONNECTION_TIME_IN_MS + (15 * (base.pow(*attempts) - 1));
356    *attempts += 1;
357
358    thread::sleep(Duration::from_millis(reconnection_time));
359    listen_stream(url.clone(), url, stream, Arc::clone(&state), on_open, on_message, on_error, Arc::clone(&failed_attempts), last_event_id);
360}
361
362
363#[cfg(test)]
364mod tests {
365    use super::*;
366    use std::sync::mpsc;
367    use std::time::Duration;
368    use http_test_server::{ TestServer, Resource };
369    use http_test_server::http::Status;
370
371    fn setup() -> (TestServer, Resource, Url) {
372        let server = TestServer::new().unwrap();
373        let resource = server.create_resource("/sub");
374        resource.header("Content-Type", "text/event-stream").stream();
375        let address = format!("http://localhost:{}/sub", server.port());
376        let url = Url::parse(address.as_str()).unwrap();
377        (server, resource, url)
378    }
379
380
381    #[test]
382    fn should_create_stream_object() {
383        let (_server, _stream_endpoint, address) = setup();
384        let event_stream = EventStream::new(address).unwrap();
385        event_stream.close();
386    }
387
388    #[test]
389    fn should_trigger_on_open_listener() {
390        let (tx, rx) = mpsc::channel();
391
392        let (_server, stream_endpoint, address) = setup();
393        stream_endpoint.send("Date: Thu, 24 May 2018 12:26:38 GMT\n");
394
395        let mut event_stream = EventStream::new(address).unwrap();
396
397        event_stream.on_open(move || {
398            tx.send("open").unwrap();
399        });
400
401        let _ = rx.recv().unwrap();
402
403        event_stream.close();
404    }
405
406    #[test]
407    fn should_have_status_connecting_while_opening_connection() {
408        let (_server, _stream_endpoint, address) = setup();
409        let event_stream = EventStream::new(address).unwrap();
410
411        let state = event_stream.state();
412        assert_eq!(state, State::Connecting);
413
414        event_stream.close();
415    }
416
417    #[test]
418    fn should_have_status_open_after_connection_stabilished() {
419        let (_server, _stream_endpoint, address) = setup();
420        let event_stream = EventStream::new(address).unwrap();
421
422        thread::sleep(Duration::from_millis(100));
423        let state = event_stream.state();
424        assert_eq!(state, State::Open);
425
426        event_stream.close();
427    }
428
429    #[test]
430    fn should_have_status_closed_after_closing_connection() {
431        let (_server, _stream_endpoint, address) = setup();
432        let event_stream = EventStream::new(address).unwrap();
433
434        event_stream.close();
435
436        let state = event_stream.state();
437        assert_eq!(state, State::Closed);
438    }
439
440    #[test]
441    fn should_trigger_listeners_when_message_received() {
442        let (tx, rx) = mpsc::channel();
443        let (_server, stream_endpoint, address) = setup();
444        let mut event_stream = EventStream::new(address).unwrap();
445
446        event_stream.on_message(move |message| {
447            tx.send(message).unwrap();
448        });
449
450        while event_stream.state() != State::Open {
451            thread::sleep(Duration::from_millis(200));
452        }
453
454        stream_endpoint.send("data: some message\n\n");
455
456        let message = rx.recv().unwrap();
457
458        assert_eq!(message, "data: some message");
459
460        event_stream.close();
461    }
462
463    #[test]
464    fn should_trigger_on_error_when_connection_closed_by_server() {
465        let (tx, rx) = mpsc::channel();
466        let (_server, stream_endpoint, address) = setup();
467        let mut event_stream = EventStream::new(address).unwrap();
468
469        event_stream.on_error(move |message| {
470            tx.send(message).unwrap();
471        });
472
473        while event_stream.state() != State::Open {
474            thread::sleep(Duration::from_millis(100));
475        };
476
477        stream_endpoint.close_open_connections();
478
479        let message = rx.recv().unwrap();
480
481        assert!(message.contains("connection closed by server"));
482    }
483
484    #[test]
485    fn should_trigger_error_when_status_code_is_not_success() {
486        let (tx, rx) = mpsc::channel();
487        let (_server, stream_endpoint, address) = setup();
488        stream_endpoint.status(Status::InternalServerError);
489        let mut event_stream = EventStream::new(address).unwrap();
490
491        event_stream.on_error(move |message| {
492            tx.send(message).unwrap();
493        });
494
495        let message = rx.recv().unwrap();
496
497        assert_eq!(message, "500 Internal Server Error");
498
499        event_stream.close();
500    }
501
502    #[test]
503    fn should_reconnect_when_connection_closed_by_server() {
504        let (tx, rx) = mpsc::channel();
505        let (_server, stream_endpoint, address) = setup();
506        let mut event_stream = EventStream::new(address).unwrap();
507
508        event_stream.on_message(move |message| {
509            tx.send(message).unwrap();
510        });
511
512        stream_endpoint.close_open_connections();
513
514        while event_stream.state() != State::Open {
515            thread::sleep(Duration::from_millis(100));
516        }
517
518        stream_endpoint.send("data: some message\n\n");
519
520        assert_eq!(rx.recv().unwrap(), "data: some message");
521
522        event_stream.close();
523    }
524
525    #[test]
526    fn should_trigger_error_when_first_connection_fails() {
527        let url = Url::parse("http://localhost:7777/sub").unwrap();
528        let mut event_stream = EventStream::new(url).unwrap();
529
530        let (tx, rx) = mpsc::channel();
531
532        event_stream.on_error(move |message| {
533            tx.send(message).unwrap();
534        });
535
536        let message = rx.recv().unwrap();
537        assert!(message.contains("Connection refused"));
538
539        event_stream.close();
540    }
541
542    #[test]
543    fn should_reset_connection_on_status_205() {
544        let (server, stream_endpoint, address) = setup();
545        stream_endpoint.status(Status::ResetContent);
546
547        let event_stream = EventStream::new(address).unwrap();
548
549        server.requests().recv().unwrap();
550        server.requests().recv().unwrap();
551
552        assert!(stream_endpoint.request_count() >= 2);
553
554        event_stream.close();
555    }
556
557    #[test]
558    fn should_close_connection_when_content_type_is_not_event_stream() {
559        let (error_tx, error_rx) = mpsc::channel();
560        let (_server, stream_endpoint, address) = setup();
561        stream_endpoint.header("Content-Type", "application/json");
562        let mut event_stream = EventStream::new(address).unwrap();
563
564        event_stream.on_error(move |message| {
565            error_tx.send(message).unwrap();
566        });
567
568
569        let message = error_rx.recv().unwrap();
570        assert_eq!(message, "Wrong Content-Type");
571
572        let state = event_stream.state();
573        assert_eq!(state, State::Closed);
574    }
575
576    #[test]
577    fn should_reconnect_when_status_in_range_2xx_but_not_200() {
578        let (server, stream_endpoint, address) = setup();
579        stream_endpoint.status(Status::Accepted);
580        let event_stream = EventStream::new(address).unwrap();
581
582        server.requests().recv().unwrap();
583        server.requests().recv().unwrap();
584
585        assert!(stream_endpoint.request_count() >= 2);
586
587        event_stream.close();
588    }
589
590    #[test]
591    fn should_close_connection_when_returned_any_status_not_handled_in_previous_scenarios() {
592        let (error_tx, error_rx) = mpsc::channel();
593        let (_server, stream_endpoint, address) = setup();
594        stream_endpoint.status(Status::InternalServerError);
595        let mut event_stream = EventStream::new(address).unwrap();
596
597        event_stream.on_error(move |message| {
598            error_tx.send(message).unwrap();
599        });
600
601        let message = error_rx.recv().unwrap();
602        assert_eq!(message, "500 Internal Server Error");
603
604        let state = event_stream.state();
605        assert_eq!(state, State::Closed);
606    }
607
608    #[test]
609    fn should_connect_to_provided_host_when_status_302() {
610        let (_server, stream_endpoint, address) = setup();
611        let (_server2, stream_endpoint2, address2) = setup();
612        stream_endpoint
613            .status(Status::Found)
614            .header("Location", address2.as_str());
615
616        let event_stream = EventStream::new(address).unwrap();
617
618        while stream_endpoint2.open_connections_count() != 1 {
619            thread::sleep(Duration::from_millis(200));
620        }
621
622        event_stream.close();
623    }
624
625    #[test]
626    fn should_reconnect_to_original_host_when_connection_to_redirected_host_is_lost() {
627        let (tx, rx) = mpsc::channel();
628        let (_server, stream_endpoint, address) = setup();
629        let (_server2, stream_endpoint2, address2) = setup();
630        stream_endpoint
631            .status(Status::Found)
632            .header("Location", address2.as_str());
633
634        let mut event_stream = EventStream::new(address).unwrap();
635
636        event_stream.on_message(move |message| {
637            tx.send(message).unwrap();
638        });
639
640        stream_endpoint.status(Status::OK);
641        stream_endpoint2.close_open_connections();
642
643        while event_stream.state() != State::Open {
644            thread::sleep(Duration::from_millis(100));
645        }
646
647        stream_endpoint.send("data: from server 1\n\n");
648        let message = rx.recv().unwrap();
649        assert_eq!(message, "data: from server 1");
650
651        event_stream.close();
652    }
653
654    #[test]
655    fn should_reconnect_to_new_host_when_connection_lost_after_moved_permanently() {
656        let (_server, stream_endpoint, address) = setup();
657        let (_server2, stream_endpoint2, address2) = setup();
658        stream_endpoint
659            .status(Status::MovedPermanently)
660            .header("Location", address2.as_str());
661
662        let mut event_stream = EventStream::new(address).unwrap();
663
664        let (tx, rx) = mpsc::channel();
665
666        event_stream.on_message(move |message| {
667            tx.send(message).unwrap();
668        });
669
670        stream_endpoint2
671            .delay(Duration::from_millis(200))
672            .close_open_connections();
673
674        while event_stream.state() != State::Connecting {
675            thread::sleep(Duration::from_millis(100));
676        }
677        while event_stream.state() != State::Open {
678            thread::sleep(Duration::from_millis(100));
679        }
680
681        stream_endpoint2.send("data: from server 2\n");
682        assert_eq!(rx.recv().unwrap(), "data: from server 2");
683
684        event_stream.close();
685    }
686
687    #[test]
688    fn should_connect_to_new_host_when_status_303() {
689        let (tx, rx) = mpsc::channel();
690        let (_server, stream_endpoint, address) = setup();
691        let (_server2, stream_endpoint2, address2) = setup();
692
693        stream_endpoint
694            .status(Status::SeeOther)
695            .header("Location", address2.as_str());
696
697        let mut event_stream = EventStream::new(address).unwrap();
698
699        event_stream.on_message(move |message| {
700            tx.send(message).unwrap();
701        });
702
703        while stream_endpoint2.open_connections_count() == 0 {
704            thread::sleep(Duration::from_millis(100));
705        }
706
707        stream_endpoint2.send("data: from server 2\n\n");
708        let message = rx.recv().unwrap();
709        assert_eq!(message, "data: from server 2");
710
711        event_stream.close();
712    }
713
714    #[test]
715    fn should_connect_to_new_host_when_status_307() {
716        let (_server, stream_endpoint, address) = setup();
717        let (_server2, stream_endpoint2, address2) = setup();
718
719        stream_endpoint
720            .delay(Duration::from_millis(200))
721            .status(Status::TemporaryRedirect)
722            .header("Location", address2.as_str());
723
724        let event_stream = EventStream::new(address).unwrap();
725
726        while stream_endpoint2.open_connections_count() == 0 {
727            thread::sleep(Duration::from_millis(100));
728        }
729
730        event_stream.close();
731    }
732
733    #[test]
734    fn should_stop_reconnection_when_status_204() {
735        let (server, stream_endpoint, address) = setup();
736        let event_stream = EventStream::new(address).unwrap();
737
738        // first connection
739        stream_endpoint.status(Status::OK);
740        server.requests().recv().unwrap();
741
742        // change status to 204 and kill connections
743        stream_endpoint.status(Status::NoContent);
744        stream_endpoint.close_open_connections();
745
746        server.requests().recv().unwrap();
747        assert_eq!(event_stream.state(), State::Closed);
748
749        event_stream.close();
750    }
751
752
753    #[test]
754    fn should_try_to_reconnect_with_an_exponential_backoff() {
755        let (_server, stream_endpoint, address) = setup();
756        stream_endpoint.status(Status::Accepted);
757        let event_stream = EventStream::new(address).unwrap();
758
759        for _ in 0 .. 20 {
760            thread::sleep(Duration::from_millis(100))
761        }
762
763        let retries_in_first_two_seconds = stream_endpoint.request_count();
764
765        for _ in 0 .. 20 {
766            thread::sleep(Duration::from_millis(100))
767        }
768
769        let retries_in_the_next_two_seconds = stream_endpoint.request_count() - retries_in_first_two_seconds;
770
771        assert!(retries_in_first_two_seconds > retries_in_the_next_two_seconds);
772
773        event_stream.close();
774    }
775
776    #[test]
777    fn should_reset_exponential_backoff_after_success_connection() {
778        let (_server, stream_endpoint, address) = setup();
779        stream_endpoint.status(Status::Accepted);
780        let event_stream = EventStream::new(address).unwrap();
781
782        for _ in 0 .. 20 {
783            thread::sleep(Duration::from_millis(100))
784        }
785
786        let retries_in_first_two_seconds = stream_endpoint.request_count();
787
788        stream_endpoint.status(Status::OK);
789
790        while event_stream.state() != State::Open {
791            thread::sleep(Duration::from_millis(100));
792        }
793
794        stream_endpoint.status(Status::Accepted);
795        stream_endpoint.close_open_connections();
796
797        for _ in 0 .. 20 {
798            thread::sleep(Duration::from_millis(100))
799        }
800
801        let retries_in_the_next_two_seconds = stream_endpoint.request_count() - retries_in_first_two_seconds;
802
803        assert_eq!(retries_in_first_two_seconds, retries_in_the_next_two_seconds);
804
805        event_stream.close();
806    }
807
808    #[test]
809    fn should_use_port_80_as_default_when_no_port_provided() {
810        let url = Url::parse("http://localhost").unwrap();
811        let host = get_host(&url);
812
813        assert_eq!(host, String::from("localhost:80"));
814    }
815
816    #[test]
817    fn should_use_port_443_as_default_when_https_and_no_port_provided() {
818        let url = Url::parse("https://localhost").unwrap();
819        let host = get_host(&url);
820
821        assert_eq!(host, String::from("localhost:443"));
822    }
823
824    #[test]
825    fn should_keep_query_parameters_in_connection_url() {
826        let resource_uri = "/sub?q=1&query=3&token=abcd";
827        let server = TestServer::new().unwrap();
828        let stream_endpoint = server.create_resource(resource_uri);
829        let address = format!("http://localhost:{}{}", server.port(), resource_uri);
830        let stream_url = Url::parse(address.as_str()).unwrap();
831
832        stream_endpoint
833            .status(Status::OK)
834            .header("Content-Type", "text/event-stream")
835            .stream();
836
837        let event_stream = EventStream::new(stream_url).unwrap();
838
839        thread::sleep(Duration::from_millis(100));
840
841        let state = event_stream.state();
842        assert_eq!(state, State::Open);
843
844
845        event_stream.close();
846    }
847}
848