1use std::io::prelude::*;
2use std::io::Error;
3use std::net::{Shutdown, TcpStream};
4use url::Url;
5use std::thread;
6use std::sync::Arc;
7use std::sync::Mutex;
8use std::io::BufReader;
9use std::time::Duration;
10#[cfg(feature = "native-tls")]
11use crate::tls::MaybeTlsStream;
12
/// Stream type produced by the handshake: `MaybeTlsStream` when the
/// `native-tls` feature is enabled, a plain `TcpStream` otherwise.
#[cfg(feature = "native-tls")]
type InnerStream = MaybeTlsStream;
#[cfg(not(feature = "native-tls"))]
type InnerStream = TcpStream;

/// Shared, replaceable listener that receives a `String` argument.
type Callback = Arc<Mutex<Option<Box<dyn Fn(String) + Send>>>>;
/// Shared, replaceable listener that takes no arguments.
type CallbackNoArgs = Arc<Mutex<Option<Box<dyn Fn() + Send>>>>;
/// Shared slot for a `TcpStream` handle of the current connection; filled by
/// `connect_event_stream` and used by `EventStream::close` to shut the socket down.
type StreamWrapper = Arc<Mutex<Option<TcpStream>>>;
/// Shared connection state.
type StateWrapper = Arc<Mutex<State>>;
/// Shared optional last event id; when set it is sent as the `Last-Event-ID`
/// request header during the handshake.
type LastIdWrapper = Arc<Mutex<Option<String>>>;

/// Base delay, in milliseconds, used by `reconnect_stream` when computing the
/// time to wait before the next connection attempt.
const INITIAL_RECONNECTION_TIME_IN_MS: u64 = 500;
25
/// Connection state of an [`EventStream`].
///
/// `Eq` is derived alongside `PartialEq` because equality on these unit
/// variants is total; this lets callers use the type wherever `Eq` is required.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
    /// A connection attempt (first connection, reconnection or redirect) is in
    /// progress and the response headers have not been fully received yet.
    Connecting,
    /// The response headers were accepted and incoming lines are delivered to
    /// the message listener.
    Open,
    /// The stream was closed, either through `EventStream::close` or because
    /// the server response required the connection to be failed.
    Closed
}
36
/// Follow-up action requested when a connection attempt or an open stream ends
/// with something other than a clean close. Interpreted by `listen_stream`.
#[derive(Debug, PartialEq)]
enum StreamAction {
    /// Report the message to the error listener and try to connect again.
    Reconnect(String),
    /// Report the message to the error listener and mark the stream as closed.
    Close(String),
    /// Connect to the given URL for this attempt only; later reconnections
    /// still use the original URL.
    Move(Url),
    /// Connect to the given URL and use it for later reconnections as well.
    MovePermanently(Url)
}
44
/// Client side of an event stream connection.
///
/// All fields are reference-counted and mutex-protected because they are
/// shared with the background thread spawned by `listen_stream`.
pub struct EventStream {
    /// URL the stream was created with.
    url: Arc<Url>,
    /// Handle to the socket of the current connection, if any; `close` uses it
    /// to shut the connection down.
    stream: StreamWrapper,
    /// Current connection state, readable through `state()`.
    state: StateWrapper,
    /// Listener invoked when the connection is opened.
    on_open_listener: CallbackNoArgs,
    /// Listener invoked with each line received while the stream is open.
    on_message_listener: Callback,
    /// Listener invoked with a description of each reported error.
    on_error_listener: Callback,
    /// Last event id, sent as the `Last-Event-ID` header when set.
    last_event_id: LastIdWrapper
}
54
55impl EventStream {
56 pub fn new(url: Url) -> Result<EventStream, Error> {
57 let event_stream = EventStream {
58 url: Arc::new(url),
59 stream: Arc::new(Mutex::new(None)),
60 state: Arc::new(Mutex::new(State::Connecting)),
61 on_open_listener: Arc::new(Mutex::new(None)),
62 on_message_listener: Arc::new(Mutex::new(None)),
63 on_error_listener: Arc::new(Mutex::new(None)),
64 last_event_id: Arc::new(Mutex::new(None))
65 };
66
67 event_stream.listen();
68
69 Ok(event_stream)
70 }
71
72 fn listen(&self) {
73 listen_stream(
74 Arc::clone(&self.url),
75 Arc::clone(&self.url),
76 Arc::clone(&self.stream),
77 Arc::clone(&self.state),
78 Arc::clone(&self.on_open_listener),
79 Arc::clone(&self.on_message_listener),
80 Arc::clone(&self.on_error_listener),
81 Arc::new(Mutex::new(0)),
82 Arc::clone(&self.last_event_id)
83 );
84 }
85
86 pub fn close(&self) {
87 let mut state = self.state.lock().unwrap();
88 *state = State::Closed;
89 if let Some(ref st) = *self.stream.lock().unwrap() {
90 st.shutdown(Shutdown::Both).unwrap();
91 }
92 }
93
94 pub fn on_open<F>(&mut self, listener: F) where F: Fn() + Send + 'static {
95 let mut on_open_listener = self.on_open_listener.lock().unwrap();
96 *on_open_listener = Some(Box::new(listener));
97 }
98
99 pub fn on_message<F>(&mut self, listener: F) where F: Fn(String) + Send + 'static {
100 let mut on_message_listener = self.on_message_listener.lock().unwrap();
101 *on_message_listener = Some(Box::new(listener));
102 }
103
104 pub fn on_error<F>(&mut self, listener: F) where F: Fn(String) + Send + 'static {
105 let mut on_error_listener = self.on_error_listener.lock().unwrap();
106 *on_error_listener = Some(Box::new(listener));
107 }
108
109 pub fn state(&self) -> State {
110 let state = &self.state.lock().unwrap();
111 (*state).clone()
112 }
113
114 pub fn set_last_id(&self, id: String) {
115 let mut last_id = self.last_event_id.lock().unwrap();
116 *last_id = Some(id);
117 }
118}
119
120fn listen_stream(
121 url: Arc<Url>,
122 connection_url: Arc<Url>,
123 stream: StreamWrapper,
124 state: StateWrapper,
125 on_open: CallbackNoArgs,
126 on_message: Callback,
127 on_error: Callback,
128 failed_attempts: Arc<Mutex<u32>>,
129 last_event_id: LastIdWrapper
130) {
131 thread::spawn(move || {
132 let action = match connect_event_stream(&connection_url, &stream, &last_event_id) {
133 Ok(stream) => read_stream(stream, &state, &on_open, &on_message, &failed_attempts),
134 Err(error) => Err(StreamAction::Reconnect(error.to_string()))
135 };
136
137 if let Err(stream_action) = action {
138 match stream_action {
139 StreamAction::Reconnect(ref error) => {
140 let mut state_lock = state.lock().unwrap();
141 *state_lock = State::Connecting;
142 handle_error(error.to_string(), &on_error);
143 reconnect_stream(url, stream, Arc::clone(&state), on_open, on_message, on_error, failed_attempts, last_event_id);
144 },
145 StreamAction::Close(ref error) => {
146 let mut state_lock = state.lock().unwrap();
147 *state_lock = State::Closed;
148 handle_error(error.to_string(), &on_error);
149 },
150 StreamAction::Move(redirect_url) => {
151 let mut state_lock = state.lock().unwrap();
152 *state_lock = State::Connecting;
153
154 listen_stream(url, Arc::new(redirect_url), stream, Arc::clone(&state), on_open, on_message, on_error, failed_attempts, last_event_id);
155 },
156 StreamAction::MovePermanently(redirect_url) => {
157 let mut state_lock = state.lock().unwrap();
158 *state_lock = State::Connecting;
159
160 listen_stream(Arc::new(redirect_url.clone()), Arc::new(redirect_url), stream, Arc::clone(&state), on_open, on_message, on_error, failed_attempts, last_event_id);
161 }
162 };
163 }
164 });
165}
166
167fn connect_event_stream(url: &Url, stream: &StreamWrapper, last_event_id: &LastIdWrapper) -> Result<InnerStream, Error> {
168 let connection_stream = event_stream_handshake(url, last_event_id)?;
169 let mut stream = stream.lock().unwrap();
170
171 #[cfg(feature = "native-tls")]
172 { *stream = Some(connection_stream.clone_plain_handle().unwrap()); }
173
174 #[cfg(not(feature = "native-tls"))]
175 { *stream = Some(connection_stream.try_clone().unwrap()); }
176
177 Ok(connection_stream)
178}
179
180fn event_stream_handshake(url: &Url, last_event_id: &LastIdWrapper) -> Result<InnerStream, Error> {
181 let host = get_host(&url);
182 let host = host.as_str();
183
184 #[cfg(feature = "native-tls")]
185 let mut stream = {
186 let plain = TcpStream::connect(host)?;
187 if url.scheme() == "https" {
188 MaybeTlsStream::try_wrap_tls(plain, url.host_str().unwrap_or("localhost"))?
189 } else {
190 MaybeTlsStream::wrap_plain(plain)
191 }
192 };
193
194 #[cfg(not(feature = "native-tls"))]
195 let mut stream = TcpStream::connect(host)?;
196
197 stream.set_read_timeout(Some(Duration::from_millis(60000)))?;
198
199 let extra_headers = match *(last_event_id.lock().unwrap()) {
200 Some(ref last_id) => format!("Last-Event-ID: {}\r\n", last_id),
201 None => String::from("")
202 };
203
204 let request = format!(
205 "GET {} HTTP/1.1\r\nAccept: text/event-stream\r\nHost: {}\r\n{}\r\n",
206 get_path_with_query_params(url),
207 host,
208 extra_headers
209 );
210
211 stream.write(request.as_bytes())?;
212 stream.flush()?;
213
214 Ok(stream)
215}
216
217fn get_host(url: &Url) -> String {
218 let mut host = match url.host_str() {
219 Some(h) => String::from(h),
220 None => String::from("localhost")
221 };
222
223 if let Some(port) = url.port_or_known_default() {
224 host = format!("{}:{}", host, port);
225 }
226
227 host
228}
229
230fn get_path_with_query_params(url: &Url) -> String {
231 match url.query() {
232 Some(query) => format!("{}?{}", url.path(), query),
233 None => url.path().to_owned()
234 }
235}
236
237fn read_stream(
238 connection_stream: InnerStream,
239 state: &StateWrapper,
240 on_open: &CallbackNoArgs,
241 on_message: &Callback,
242 failed_attempts: &Arc<Mutex<u32>>
243) -> Result<(), StreamAction> {
244 let mut reader = BufReader::new(connection_stream);
245
246 let mut request_header = String::new();
247 reader.read_line(&mut request_header).unwrap();
248
249 let status_code = validate_status_code(request_header)?;
250
251 for line in reader.lines() {
252 let mut state = state.lock().unwrap();
253
254 let line = line.map_err(|error| {
255 StreamAction::Reconnect(error.to_string())
256 })?;
257
258 match *state {
259 State::Connecting => handle_headers(line.clone(), &mut state, &on_open, status_code, failed_attempts)?,
260 _ => handle_messages(line.clone(), &on_message)
261 }
262 }
263
264 match *(state.lock().unwrap()) {
265 State::Closed => Ok(()),
266 _ => Err(StreamAction::Reconnect(String::from("connection closed by server")))
267 }
268}
269
270fn handle_headers(
271 line: String,
272 state: &mut State,
273 on_open: &CallbackNoArgs,
274 status_code: i32,
275 failed_attempts: &Arc<Mutex<u32>>
276) -> Result<(), StreamAction> {
277 if line == "" {
278 handle_open_connection(state, on_open, failed_attempts)
279 } else if line.starts_with("Content-Type") {
280 validate_content_type(line)
281 } else if line.starts_with("Location:") {
282 handle_new_location(line, status_code)
283 } else {
284 Ok(())
285 }
286}
287
288fn handle_open_connection(state: &mut State, on_open: &CallbackNoArgs, failed_attempts: &Arc<Mutex<u32>>) -> Result<(), StreamAction> {
289 *state = State::Open;
290 let on_open = on_open.lock().unwrap();
291 if let Some(ref f) = *on_open {
292 f();
293 }
294 let mut failed_attempts = failed_attempts.lock().unwrap();
295 *failed_attempts = 0;
296 Ok(())
297}
298
299fn validate_content_type(line: String) -> Result<(), StreamAction> {
300 if line.contains("text/event-stream") {
301 Ok(())
302 } else {
303 Err(StreamAction::Close(String::from("Wrong Content-Type")))
304 }
305}
306
307fn validate_status_code(line: String) -> Result<i32, StreamAction> {
308 let status = &line[9..].trim_end();
309 let status_code: i32 = status[..3].parse().unwrap();
310
311 match status_code {
312 200 | 301 | 302 | 303 | 307 => Ok(status_code),
313 204 => Err(StreamAction::Close(status.to_string())),
314 201 ..= 203 | 205 ..= 299 => Err(StreamAction::Reconnect(status.to_string())),
315 _ => Err(StreamAction::Close(status.to_string()))
316 }
317}
318
319fn handle_new_location(line: String, status_code: i32) -> Result<(), StreamAction> {
320 let location = &line[10..];
321
322 match status_code {
323 301 => Err(StreamAction::MovePermanently(Url::parse(location).unwrap())),
324 302 | 303 | 307 => Err(StreamAction::Move(Url::parse(location).unwrap())),
325 _ => Ok(())
326 }
327}
328
329fn handle_messages(line: String, on_message: &Callback) {
330 let on_message = on_message.lock().unwrap();
331 if let Some(ref f) = *on_message {
332 f(line);
333 }
334}
335
336fn handle_error(message: String, on_error: &Callback) {
337 let on_error = on_error.lock().unwrap();
338 if let Some(ref f) = *on_error {
339 f(message);
340 }
341}
342
343fn reconnect_stream(
344 url: Arc<Url>,
345 stream: StreamWrapper,
346 state: StateWrapper,
347 on_open: CallbackNoArgs,
348 on_message: Callback,
349 on_error: Callback,
350 failed_attempts: Arc<Mutex<u32>>,
351 last_event_id: LastIdWrapper
352) {
353 let mut attempts = failed_attempts.lock().unwrap();
354 let base: u64 = 2;
355 let reconnection_time = INITIAL_RECONNECTION_TIME_IN_MS + (15 * (base.pow(*attempts) - 1));
356 *attempts += 1;
357
358 thread::sleep(Duration::from_millis(reconnection_time));
359 listen_stream(url.clone(), url, stream, Arc::clone(&state), on_open, on_message, on_error, Arc::clone(&failed_attempts), last_event_id);
360}
361
362
// Integration-style tests: each test talks to a local `http_test_server`
// instance and observes the `EventStream` through its listeners and state.
// Several tests depend on timing (fixed sleeps or polling loops).
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::time::Duration;
    use http_test_server::{ TestServer, Resource };
    use http_test_server::http::Status;

    // Starts a test server with a `/sub` resource that answers with the
    // `text/event-stream` content type, and returns the server, the resource
    // and the URL of the resource.
    fn setup() -> (TestServer, Resource, Url) {
        let server = TestServer::new().unwrap();
        let resource = server.create_resource("/sub");
        resource.header("Content-Type", "text/event-stream").stream();
        let address = format!("http://localhost:{}/sub", server.port());
        let url = Url::parse(address.as_str()).unwrap();
        (server, resource, url)
    }


    // Creating and closing a stream must not panic.
    #[test]
    fn should_create_stream_object() {
        let (_server, _stream_endpoint, address) = setup();
        let event_stream = EventStream::new(address).unwrap();
        event_stream.close();
    }

    // The open listener reports through a channel; `recv` blocks until it ran.
    #[test]
    fn should_trigger_on_open_listener() {
        let (tx, rx) = mpsc::channel();

        let (_server, stream_endpoint, address) = setup();
        stream_endpoint.send("Date: Thu, 24 May 2018 12:26:38 GMT\n");

        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_open(move || {
            tx.send("open").unwrap();
        });

        let _ = rx.recv().unwrap();

        event_stream.close();
    }

    // The state is read immediately after creation, before the background
    // thread is expected to have finished connecting.
    #[test]
    fn should_have_status_connecting_while_opening_connection() {
        let (_server, _stream_endpoint, address) = setup();
        let event_stream = EventStream::new(address).unwrap();

        let state = event_stream.state();
        assert_eq!(state, State::Connecting);

        event_stream.close();
    }

    // Gives the background thread 100 ms to connect before checking the state.
    #[test]
    fn should_have_status_open_after_connection_stabilished() {
        let (_server, _stream_endpoint, address) = setup();
        let event_stream = EventStream::new(address).unwrap();

        thread::sleep(Duration::from_millis(100));
        let state = event_stream.state();
        assert_eq!(state, State::Open);

        event_stream.close();
    }

    #[test]
    fn should_have_status_closed_after_closing_connection() {
        let (_server, _stream_endpoint, address) = setup();
        let event_stream = EventStream::new(address).unwrap();

        event_stream.close();

        let state = event_stream.state();
        assert_eq!(state, State::Closed);
    }

    // Waits for the stream to open, then checks that a line sent by the server
    // reaches the message listener unchanged.
    #[test]
    fn should_trigger_listeners_when_message_received() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_message(move |message| {
            tx.send(message).unwrap();
        });

        while event_stream.state() != State::Open {
            thread::sleep(Duration::from_millis(200));
        }

        stream_endpoint.send("data: some message\n\n");

        let message = rx.recv().unwrap();

        assert_eq!(message, "data: some message");

        event_stream.close();
    }

    // Dropping the connection on the server side must be reported to the error
    // listener.
    #[test]
    fn should_trigger_on_error_when_connection_closed_by_server() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_error(move |message| {
            tx.send(message).unwrap();
        });

        while event_stream.state() != State::Open {
            thread::sleep(Duration::from_millis(100));
        };

        stream_endpoint.close_open_connections();

        let message = rx.recv().unwrap();

        assert!(message.contains("connection closed by server"));
    }

    // A 500 response is reported with the status text as the error message.
    #[test]
    fn should_trigger_error_when_status_code_is_not_success() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        stream_endpoint.status(Status::InternalServerError);
        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_error(move |message| {
            tx.send(message).unwrap();
        });

        let message = rx.recv().unwrap();

        assert_eq!(message, "500 Internal Server Error");

        event_stream.close();
    }

    // After the server drops the connection the client must reconnect and keep
    // delivering messages.
    #[test]
    fn should_reconnect_when_connection_closed_by_server() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_message(move |message| {
            tx.send(message).unwrap();
        });

        stream_endpoint.close_open_connections();

        while event_stream.state() != State::Open {
            thread::sleep(Duration::from_millis(100));
        }

        stream_endpoint.send("data: some message\n\n");

        assert_eq!(rx.recv().unwrap(), "data: some message");

        event_stream.close();
    }

    // Assumes nothing is listening on localhost:7777, so the first connection
    // attempt is refused.
    #[test]
    fn should_trigger_error_when_first_connection_fails() {
        let url = Url::parse("http://localhost:7777/sub").unwrap();
        let mut event_stream = EventStream::new(url).unwrap();

        let (tx, rx) = mpsc::channel();

        event_stream.on_error(move |message| {
            tx.send(message).unwrap();
        });

        let message = rx.recv().unwrap();
        assert!(message.contains("Connection refused"));

        event_stream.close();
    }

    // Status 205 must lead to a new request: the test waits for two requests.
    #[test]
    fn should_reset_connection_on_status_205() {
        let (server, stream_endpoint, address) = setup();
        stream_endpoint.status(Status::ResetContent);

        let event_stream = EventStream::new(address).unwrap();

        server.requests().recv().unwrap();
        server.requests().recv().unwrap();

        assert!(stream_endpoint.request_count() >= 2);

        event_stream.close();
    }

    // A response with another content type is reported and closes the stream.
    #[test]
    fn should_close_connection_when_content_type_is_not_event_stream() {
        let (error_tx, error_rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        stream_endpoint.header("Content-Type", "application/json");
        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_error(move |message| {
            error_tx.send(message).unwrap();
        });


        let message = error_rx.recv().unwrap();
        assert_eq!(message, "Wrong Content-Type");

        let state = event_stream.state();
        assert_eq!(state, State::Closed);
    }

    // Status 202 must lead to a new request: the test waits for two requests.
    #[test]
    fn should_reconnect_when_status_in_range_2xx_but_not_200() {
        let (server, stream_endpoint, address) = setup();
        stream_endpoint.status(Status::Accepted);
        let event_stream = EventStream::new(address).unwrap();

        server.requests().recv().unwrap();
        server.requests().recv().unwrap();

        assert!(stream_endpoint.request_count() >= 2);

        event_stream.close();
    }

    // A status outside the handled set (500 here) is reported and closes the
    // stream.
    #[test]
    fn should_close_connection_when_returned_any_status_not_handled_in_previous_scenarios() {
        let (error_tx, error_rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        stream_endpoint.status(Status::InternalServerError);
        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_error(move |message| {
            error_tx.send(message).unwrap();
        });

        let message = error_rx.recv().unwrap();
        assert_eq!(message, "500 Internal Server Error");

        let state = event_stream.state();
        assert_eq!(state, State::Closed);
    }

    // The first server redirects (302) to the second one; the test polls until
    // the second server sees exactly one open connection.
    #[test]
    fn should_connect_to_provided_host_when_status_302() {
        let (_server, stream_endpoint, address) = setup();
        let (_server2, stream_endpoint2, address2) = setup();
        stream_endpoint
            .status(Status::Found)
            .header("Location", address2.as_str());

        let event_stream = EventStream::new(address).unwrap();

        while stream_endpoint2.open_connections_count() != 1 {
            thread::sleep(Duration::from_millis(200));
        }

        event_stream.close();
    }

    // After a temporary redirect, losing the redirected connection must bring
    // the client back to the original server.
    #[test]
    fn should_reconnect_to_original_host_when_connection_to_redirected_host_is_lost() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        let (_server2, stream_endpoint2, address2) = setup();
        stream_endpoint
            .status(Status::Found)
            .header("Location", address2.as_str());

        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_message(move |message| {
            tx.send(message).unwrap();
        });

        stream_endpoint.status(Status::OK);
        stream_endpoint2.close_open_connections();

        while event_stream.state() != State::Open {
            thread::sleep(Duration::from_millis(100));
        }

        stream_endpoint.send("data: from server 1\n\n");
        let message = rx.recv().unwrap();
        assert_eq!(message, "data: from server 1");

        event_stream.close();
    }

    // After a permanent redirect (301), reconnections must go to the new
    // server: the message is sent from the second server.
    #[test]
    fn should_reconnect_to_new_host_when_connection_lost_after_moved_permanently() {
        let (_server, stream_endpoint, address) = setup();
        let (_server2, stream_endpoint2, address2) = setup();
        stream_endpoint
            .status(Status::MovedPermanently)
            .header("Location", address2.as_str());

        let mut event_stream = EventStream::new(address).unwrap();

        let (tx, rx) = mpsc::channel();

        event_stream.on_message(move |message| {
            tx.send(message).unwrap();
        });

        stream_endpoint2
            .delay(Duration::from_millis(200))
            .close_open_connections();

        // Wait for the reconnection to start, then for it to complete.
        while event_stream.state() != State::Connecting {
            thread::sleep(Duration::from_millis(100));
        }
        while event_stream.state() != State::Open {
            thread::sleep(Duration::from_millis(100));
        }

        stream_endpoint2.send("data: from server 2\n");
        assert_eq!(rx.recv().unwrap(), "data: from server 2");

        event_stream.close();
    }

    // A 303 response makes the client connect to the second server, which then
    // delivers the message.
    #[test]
    fn should_connect_to_new_host_when_status_303() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        let (_server2, stream_endpoint2, address2) = setup();

        stream_endpoint
            .status(Status::SeeOther)
            .header("Location", address2.as_str());

        let mut event_stream = EventStream::new(address).unwrap();

        event_stream.on_message(move |message| {
            tx.send(message).unwrap();
        });

        while stream_endpoint2.open_connections_count() == 0 {
            thread::sleep(Duration::from_millis(100));
        }

        stream_endpoint2.send("data: from server 2\n\n");
        let message = rx.recv().unwrap();
        assert_eq!(message, "data: from server 2");

        event_stream.close();
    }

    // A 307 response makes the client connect to the second server.
    #[test]
    fn should_connect_to_new_host_when_status_307() {
        let (_server, stream_endpoint, address) = setup();
        let (_server2, stream_endpoint2, address2) = setup();

        stream_endpoint
            .delay(Duration::from_millis(200))
            .status(Status::TemporaryRedirect)
            .header("Location", address2.as_str());

        let event_stream = EventStream::new(address).unwrap();

        while stream_endpoint2.open_connections_count() == 0 {
            thread::sleep(Duration::from_millis(100));
        }

        event_stream.close();
    }

    // The first request is answered with 200; the connection is then dropped
    // and the reconnection is answered with 204, which must close the stream.
    #[test]
    fn should_stop_reconnection_when_status_204() {
        let (server, stream_endpoint, address) = setup();
        let event_stream = EventStream::new(address).unwrap();

        stream_endpoint.status(Status::OK);
        server.requests().recv().unwrap();

        stream_endpoint.status(Status::NoContent);
        stream_endpoint.close_open_connections();

        server.requests().recv().unwrap();
        assert_eq!(event_stream.state(), State::Closed);

        event_stream.close();
    }


    // With a server that always answers 202, fewer requests are expected in
    // the second two-second window than in the first one.
    #[test]
    fn should_try_to_reconnect_with_an_exponential_backoff() {
        let (_server, stream_endpoint, address) = setup();
        stream_endpoint.status(Status::Accepted);
        let event_stream = EventStream::new(address).unwrap();

        for _ in 0 .. 20 {
            thread::sleep(Duration::from_millis(100))
        }

        let retries_in_first_two_seconds = stream_endpoint.request_count();

        for _ in 0 .. 20 {
            thread::sleep(Duration::from_millis(100))
        }

        let retries_in_the_next_two_seconds = stream_endpoint.request_count() - retries_in_first_two_seconds;

        assert!(retries_in_first_two_seconds > retries_in_the_next_two_seconds);

        event_stream.close();
    }

    // A successful connection in between must reset the back-off, so both
    // two-second windows are expected to see the same number of requests.
    #[test]
    fn should_reset_exponential_backoff_after_success_connection() {
        let (_server, stream_endpoint, address) = setup();
        stream_endpoint.status(Status::Accepted);
        let event_stream = EventStream::new(address).unwrap();

        for _ in 0 .. 20 {
            thread::sleep(Duration::from_millis(100))
        }

        let retries_in_first_two_seconds = stream_endpoint.request_count();

        stream_endpoint.status(Status::OK);

        while event_stream.state() != State::Open {
            thread::sleep(Duration::from_millis(100));
        }

        stream_endpoint.status(Status::Accepted);
        stream_endpoint.close_open_connections();

        for _ in 0 .. 20 {
            thread::sleep(Duration::from_millis(100))
        }

        let retries_in_the_next_two_seconds = stream_endpoint.request_count() - retries_in_first_two_seconds;

        assert_eq!(retries_in_first_two_seconds, retries_in_the_next_two_seconds);

        event_stream.close();
    }

    #[test]
    fn should_use_port_80_as_default_when_no_port_provided() {
        let url = Url::parse("http://localhost").unwrap();
        let host = get_host(&url);

        assert_eq!(host, String::from("localhost:80"));
    }

    #[test]
    fn should_use_port_443_as_default_when_https_and_no_port_provided() {
        let url = Url::parse("https://localhost").unwrap();
        let host = get_host(&url);

        assert_eq!(host, String::from("localhost:443"));
    }

    // The resource is registered under the full path including the query
    // string, so the stream only opens if the query is sent in the request.
    #[test]
    fn should_keep_query_parameters_in_connection_url() {
        let resource_uri = "/sub?q=1&query=3&token=abcd";
        let server = TestServer::new().unwrap();
        let stream_endpoint = server.create_resource(resource_uri);
        let address = format!("http://localhost:{}{}", server.port(), resource_uri);
        let stream_url = Url::parse(address.as_str()).unwrap();

        stream_endpoint
            .status(Status::OK)
            .header("Content-Type", "text/event-stream")
            .stream();

        let event_stream = EventStream::new(stream_url).unwrap();

        thread::sleep(Duration::from_millis(100));

        let state = event_stream.state();
        assert_eq!(state, State::Open);


        event_stream.close();
    }
}
848