1use base64::prelude::*;
2
3use futures::{ready, Stream};
4use http::{HeaderMap, HeaderName, HeaderValue, Request, Uri};
5use log::{debug, info, trace, warn};
6use pin_project::pin_project;
7use std::{
8 boxed,
9 fmt::{self, Debug, Formatter},
10 future::Future,
11 io::ErrorKind,
12 pin::Pin,
13 str::FromStr,
14 sync::Arc,
15 task::{Context, Poll},
16 time::{Duration, Instant},
17};
18
19use tokio::time::Sleep;
20
21use crate::{
22 config::ReconnectOptions,
23 response::{ErrorBody, Response},
24};
25use crate::{
26 error::{Error, Result},
27 event_parser::ConnectionDetails,
28};
29use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture};
30
31use crate::event_parser::EventParser;
32use crate::event_parser::SSE;
33
34use crate::retry::{BackoffRetry, RetryStrategy};
35use std::error::Error as StdError;
36
/// A pinned, heap-allocated, type-erased [`Stream`] yielding items of type `T`.
///
/// The trait object is bounded by `Send + Sync`.
pub type BoxStream<T> = Pin<boxed::Box<dyn Stream<Item = T> + Send + Sync>>;
39
/// A handle that can open a stream of server-sent events.
///
/// The `private::Sealed` supertrait lives in a private module, so only types
/// inside this module can implement `Client`.
pub trait Client: Send + Sync + private::Sealed {
    /// Returns a boxed stream whose items are either an [`SSE`] value or an error.
    fn stream(&self) -> BoxStream<Result<SSE>>;
}
45
/// Redirect limit used by `ClientBuilder::build_with_transport` when no explicit
/// limit was set through `ClientBuilder::redirect_limit`.
pub const DEFAULT_REDIRECT_LIMIT: u32 = 16;
54
/// Collects the request settings for an SSE [`Client`].
///
/// Start with `ClientBuilder::for_url`, adjust the settings with the chained
/// setters, and finish with `ClientBuilder::build_with_transport`.
pub struct ClientBuilder {
    /// Endpoint the client connects to.
    url: Uri,
    /// Headers added to every request.
    headers: HeaderMap,
    /// Reconnect and backoff settings.
    reconnect_opts: ReconnectOptions,
    /// Event id sent as the `last-event-id` request header, if set and non-empty.
    last_event_id: Option<String>,
    /// HTTP method name; `for_url` initialises it to `"GET"`.
    method: String,
    /// Optional request body.
    body: Option<String>,
    /// Redirect limit; `None` falls back to `DEFAULT_REDIRECT_LIMIT` at build time.
    max_redirects: Option<u32>,
}
65
66impl ClientBuilder {
67 pub fn for_url(url: &str) -> Result<ClientBuilder> {
69 let url = url
70 .parse()
71 .map_err(|e| Error::InvalidParameter(Box::new(e)))?;
72
73 let mut header_map = HeaderMap::new();
74 header_map.insert("Accept", HeaderValue::from_static("text/event-stream"));
75 header_map.insert("Cache-Control", HeaderValue::from_static("no-cache"));
76
77 Ok(ClientBuilder {
78 url,
79 headers: header_map,
80 reconnect_opts: ReconnectOptions::default(),
81 last_event_id: None,
82 method: String::from("GET"),
83 max_redirects: None,
84 body: None,
85 })
86 }
87
88 pub fn method(mut self, method: String) -> ClientBuilder {
90 self.method = method;
91 self
92 }
93
94 pub fn body(mut self, body: String) -> ClientBuilder {
96 self.body = Some(body);
97 self
98 }
99
100 pub fn last_event_id(mut self, last_event_id: String) -> ClientBuilder {
103 self.last_event_id = Some(last_event_id);
104 self
105 }
106
107 pub fn header(mut self, name: &str, value: &str) -> Result<ClientBuilder> {
109 let name = HeaderName::from_str(name).map_err(|e| Error::InvalidParameter(Box::new(e)))?;
110
111 let value =
112 HeaderValue::from_str(value).map_err(|e| Error::InvalidParameter(Box::new(e)))?;
113
114 self.headers.insert(name, value);
115 Ok(self)
116 }
117
118 pub fn basic_auth(self, username: &str, password: &str) -> Result<ClientBuilder> {
120 let auth = format!("{username}:{password}");
121 let encoded = BASE64_STANDARD.encode(auth);
122 let value = format!("Basic {encoded}");
123
124 self.header("Authorization", &value)
125 }
126
127 pub fn reconnect(mut self, opts: ReconnectOptions) -> ClientBuilder {
132 self.reconnect_opts = opts;
133 self
134 }
135
136 pub fn redirect_limit(mut self, limit: u32) -> ClientBuilder {
140 self.max_redirects = Some(limit);
141 self
142 }
143
144 pub fn build_with_transport<T>(self, transport: T) -> impl Client
162 where
163 T: HttpTransport,
164 {
165 ClientImpl {
166 transport: Arc::new(transport),
167 request_props: RequestProps {
168 url: self.url,
169 headers: self.headers,
170 method: self.method,
171 body: self.body,
172 reconnect_opts: self.reconnect_opts,
173 max_redirects: self.max_redirects.unwrap_or(DEFAULT_REDIRECT_LIMIT),
174 },
175 last_event_id: self.last_event_id,
176 }
177 }
178}
179
/// Request settings captured from the builder and cloned into every stream
/// created by `ClientImpl::stream`.
#[derive(Clone)]
struct RequestProps {
    /// URL of the initial request; also restored when redirects are reset.
    url: Uri,
    /// Headers added to every request.
    headers: HeaderMap,
    /// HTTP method name.
    method: String,
    /// Optional request body.
    body: Option<String>,
    /// Reconnect and backoff settings.
    reconnect_opts: ReconnectOptions,
    /// Number of redirects that may be followed before giving up.
    max_redirects: u32,
}
189
/// The [`Client`] implementation returned by `ClientBuilder::build_with_transport`.
struct ClientImpl<T: HttpTransport> {
    /// Transport shared (through `Arc`) with every stream this client creates.
    transport: Arc<T>,
    /// Request settings cloned into every stream.
    request_props: RequestProps,
    /// Event id handed to each new stream as its starting `last-event-id`.
    last_event_id: Option<String>,
}
197
198impl<T: HttpTransport> Client for ClientImpl<T> {
199 fn stream(&self) -> BoxStream<Result<SSE>> {
217 Box::pin(ReconnectingRequest::new(
218 Arc::clone(&self.transport),
219 self.request_props.clone(),
220 self.last_event_id.clone(),
221 ))
222 }
223}
224
/// Connection state machine advanced by `ReconnectingRequest::poll_next`.
// NOTE(review): the lint is presumably silenced because one of the pinned
// payloads is much larger than the other variants and boxing it was not
// wanted — confirm.
#[allow(clippy::large_enum_variant)]
#[pin_project(project = StateProj)]
enum State {
    /// No request in flight; the next poll sends one.
    New,
    /// A request has been sent and its response is awaited.
    Connecting {
        /// Whether a failed attempt should be followed by a delayed retry
        /// instead of closing the stream.
        retry: bool,
        /// Pending response from the transport.
        #[pin]
        resp: ResponseFuture,
    },
    /// A successful response was received; its body is being read.
    Connected(#[pin] ByteStream),
    /// Waiting for the timer to elapse before starting a new attempt.
    WaitingToReconnect(#[pin] Sleep),
    /// A redirect response was received; holds its `location` header, if present.
    FollowingRedirect(Option<HeaderValue>),
    /// Terminal state; the stream yields `None` from here on.
    StreamClosed,
}
239
240impl State {
241 fn name(&self) -> &'static str {
242 match self {
243 State::New => "new",
244 State::Connecting { retry: false, .. } => "connecting(no-retry)",
245 State::Connecting { retry: true, .. } => "connecting(retry)",
246 State::Connected(_) => "connected",
247 State::WaitingToReconnect(_) => "waiting-to-reconnect",
248 State::FollowingRedirect(_) => "following-redirect",
249 State::StreamClosed => "closed",
250 }
251 }
252}
253
254impl Debug for State {
255 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
256 write!(f, "{}", self.name())
257 }
258}
259
/// A [`Stream`] of SSE items backed by an HTTP request that is re-issued
/// according to the configured reconnect options.
#[must_use = "streams do nothing unless polled"]
#[pin_project]
pub struct ReconnectingRequest<T: HttpTransport> {
    /// Transport used to issue requests.
    transport: Arc<T>,
    /// Request settings: URL, headers, method, body, reconnect options, redirect limit.
    props: RequestProps,
    /// Current position in the connection state machine.
    #[pin]
    state: State,
    /// Supplies the delay used before each retry or reconnect.
    retry_strategy: Box<dyn RetryStrategy + Send + Sync>,
    /// URL of the next request; differs from `props.url` while a redirect is followed.
    current_url: Uri,
    /// Redirects followed since the counter was last reset.
    redirect_count: u32,
    /// Parser that turns received bytes into SSE items.
    event_parser: EventParser,
    /// Id of the most recently yielded event; sent as `last-event-id` when non-empty.
    last_event_id: Option<String>,
    /// `true` until the first successful response. Selects whether
    /// `retry_initial` or `reconnect` decides if a failed attempt is retried.
    // NOTE(review): `#[pin]` on a `bool` is unusual; `poll_next` relies on it
    // to call `.set(false)` through the pinned projection.
    #[pin]
    initial_connection: bool,
}
275
276impl<T: HttpTransport> ReconnectingRequest<T> {
277 fn new(
278 transport: Arc<T>,
279 props: RequestProps,
280 last_event_id: Option<String>,
281 ) -> ReconnectingRequest<T> {
282 let reconnect_delay = props.reconnect_opts.delay;
283 let delay_max = props.reconnect_opts.delay_max;
284 let backoff_factor = props.reconnect_opts.backoff_factor;
285
286 let url = props.url.clone();
287 ReconnectingRequest {
288 props,
289 transport,
290 state: State::New,
291 retry_strategy: Box::new(BackoffRetry::new(
292 reconnect_delay,
293 delay_max,
294 backoff_factor,
295 true,
296 )),
297 redirect_count: 0,
298 current_url: url,
299 event_parser: EventParser::new(),
300 last_event_id,
301 initial_connection: true,
302 }
303 }
304
305 fn send_request(&self) -> Result<ResponseFuture> {
306 let mut request_builder = Request::builder()
307 .method(self.props.method.as_str())
308 .uri(&self.current_url);
309
310 for (name, value) in &self.props.headers {
311 request_builder = request_builder.header(name, value);
312 }
313
314 if let Some(id) = self.last_event_id.as_ref() {
315 if !id.is_empty() {
316 let id_as_header =
317 HeaderValue::from_str(id).map_err(|e| Error::InvalidParameter(Box::new(e)))?;
318
319 request_builder = request_builder.header("last-event-id", id_as_header);
320 }
321 }
322
323 let request = request_builder
326 .body(self.props.body.clone().map(|b| b.into()))
327 .map_err(|e| Error::InvalidParameter(Box::new(e)))?;
328
329 Ok(self.transport.request(request))
330 }
331
332 fn reset_redirects(self: Pin<&mut Self>) {
333 let url = self.props.url.clone();
334 let this = self.project();
335 *this.current_url = url;
336 *this.redirect_count = 0;
337 }
338
339 fn increment_redirect_counter(self: Pin<&mut Self>) -> bool {
340 if self.redirect_count == self.props.max_redirects {
341 return false;
342 }
343 *self.project().redirect_count += 1;
344 true
345 }
346}
347
348impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
349 type Item = Result<SSE>;
350
351 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352 trace!("ReconnectingRequest::poll({:?})", &self.state);
353
354 loop {
355 let this = self.as_mut().project();
356 if let Some(event) = this.event_parser.get_event() {
357 return match event {
358 SSE::Connected(_) => Poll::Ready(Some(Ok(event))),
359 SSE::Event(ref evt) => {
360 this.last_event_id.clone_from(&evt.id);
361
362 if let Some(retry) = evt.retry {
363 this.retry_strategy
364 .change_base_delay(Duration::from_millis(retry));
365 }
366 Poll::Ready(Some(Ok(event)))
367 }
368 SSE::Comment(_) => Poll::Ready(Some(Ok(event))),
369 };
370 }
371
372 trace!("ReconnectingRequest::poll loop({:?})", &this.state);
373
374 let state = this.state.project();
375 match state {
376 StateProj::StreamClosed => return Poll::Ready(None),
377 StateProj::New => {
380 *self.as_mut().project().event_parser = EventParser::new();
381 match self.send_request() {
382 Ok(resp) => {
383 let retry = if self.initial_connection {
384 self.props.reconnect_opts.retry_initial
385 } else {
386 self.props.reconnect_opts.reconnect
387 };
388 self.as_mut()
389 .project()
390 .state
391 .set(State::Connecting { resp, retry })
392 }
393 Err(e) => {
394 self.as_mut().project().state.set(State::StreamClosed);
397 return Poll::Ready(Some(Err(e)));
398 }
399 }
400 }
401 StateProj::Connecting { retry, resp } => match ready!(resp.poll(cx)) {
402 Ok(resp) => {
403 debug!(
404 "HTTP response status: {}, headers: {:?}",
405 resp.status(),
406 resp.headers()
407 );
408
409 if resp.status().is_success() {
410 self.as_mut().project().retry_strategy.reset(Instant::now());
411 self.as_mut().reset_redirects();
412
413 let status = resp.status();
414 let headers = resp.headers().clone();
415
416 self.as_mut()
417 .project()
418 .state
419 .set(State::Connected(resp.into_body()));
420 self.as_mut().project().initial_connection.set(false);
421
422 return Poll::Ready(Some(Ok(SSE::Connected(ConnectionDetails::new(
423 Response::new(status, headers),
424 )))));
425 }
426
427 if resp.status() == 301 || resp.status() == 307 {
428 debug!("got redirected ({})", resp.status());
429
430 if self.as_mut().increment_redirect_counter() {
431 debug!("following redirect {}", self.redirect_count);
432
433 self.as_mut().project().state.set(State::FollowingRedirect(
434 resp.headers().get("location").cloned(),
435 ));
436 continue;
437 } else {
438 debug!("redirect limit reached ({})", self.props.max_redirects);
439
440 self.as_mut().project().state.set(State::StreamClosed);
441 return Poll::Ready(Some(Err(Error::MaxRedirectLimitReached(
442 self.props.max_redirects,
443 ))));
444 }
445 }
446
447 let status = resp.status();
448 let headers = resp.headers().clone();
449 let body = resp.into_body();
450
451 let error = Error::UnexpectedResponse(
452 Response::new(status, headers),
453 ErrorBody::new(body),
454 );
455
456 if !*retry {
457 self.as_mut().project().state.set(State::StreamClosed);
458 return Poll::Ready(Some(Err(error)));
459 }
460
461 self.as_mut().reset_redirects();
462
463 let duration = self
464 .as_mut()
465 .project()
466 .retry_strategy
467 .next_delay(Instant::now());
468
469 self.as_mut()
470 .project()
471 .state
472 .set(State::WaitingToReconnect(delay(duration, "retrying")));
473
474 return Poll::Ready(Some(Err(error)));
475 }
476 Err(e) => {
477 warn!("request returned an error: {e}");
479 if !*retry {
480 self.as_mut().project().state.set(State::StreamClosed);
481 return Poll::Ready(Some(Err(Error::Transport(e))));
482 }
483
484 let duration = self
485 .as_mut()
486 .project()
487 .retry_strategy
488 .next_delay(Instant::now());
489
490 self.as_mut()
491 .project()
492 .state
493 .set(State::WaitingToReconnect(delay(duration, "retrying")));
494 }
495 },
496 StateProj::FollowingRedirect(maybe_header) => match uri_from_header(maybe_header) {
497 Ok(uri) => {
498 *self.as_mut().project().current_url = uri;
499 self.as_mut().project().state.set(State::New);
500 }
501 Err(e) => {
502 self.as_mut().project().state.set(State::StreamClosed);
503 return Poll::Ready(Some(Err(e)));
504 }
505 },
506 StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) {
507 Some(Ok(result)) => {
508 if let Err(e) = this.event_parser.process_bytes(result) {
509 if self.props.reconnect_opts.reconnect {
514 let duration = self
515 .as_mut()
516 .project()
517 .retry_strategy
518 .next_delay(Instant::now());
519 self.as_mut().project().state.set(State::WaitingToReconnect(
520 delay(duration, "reconnecting"),
521 ));
522 } else {
523 self.as_mut().project().state.set(State::StreamClosed);
524 }
525 return Poll::Ready(Some(Err(e)));
526 }
527 continue;
528 }
529 Some(Err(e)) => {
530 if self.props.reconnect_opts.reconnect {
531 let duration = self
532 .as_mut()
533 .project()
534 .retry_strategy
535 .next_delay(Instant::now());
536 self.as_mut()
537 .project()
538 .state
539 .set(State::WaitingToReconnect(delay(duration, "reconnecting")));
540 }
541
542 if let Some(cause) = e.source() {
544 if let Some(downcast) = cause.downcast_ref::<std::io::Error>() {
545 if let std::io::ErrorKind::TimedOut = downcast.kind() {
546 return Poll::Ready(Some(Err(Error::TimedOut)));
547 }
548 }
549 }
550
551 return Poll::Ready(Some(Err(Error::Transport(e))));
552 }
553 None => {
554 if self.props.reconnect_opts.reconnect {
555 let duration = self
556 .as_mut()
557 .project()
558 .retry_strategy
559 .next_delay(Instant::now());
560 self.as_mut()
561 .project()
562 .state
563 .set(State::WaitingToReconnect(delay(duration, "retrying")));
564 } else {
565 self.as_mut().project().state.set(State::StreamClosed);
566 }
567
568 if self.event_parser.was_processing() {
569 return Poll::Ready(Some(Err(Error::UnexpectedEof)));
570 }
571 return Poll::Ready(Some(Err(Error::Eof)));
572 }
573 },
574 StateProj::WaitingToReconnect(delay) => {
575 ready!(delay.poll(cx));
576 info!("Reconnecting");
577 self.as_mut().project().state.set(State::New);
578 }
579 };
580 }
581 }
582}
583
584fn uri_from_header(maybe_header: &Option<HeaderValue>) -> Result<Uri> {
585 let header = maybe_header.as_ref().ok_or_else(|| {
586 Error::MalformedLocationHeader(Box::new(std::io::Error::new(
587 ErrorKind::NotFound,
588 "missing Location header",
589 )))
590 })?;
591
592 let header_string = header
593 .to_str()
594 .map_err(|e| Error::MalformedLocationHeader(Box::new(e)))?;
595
596 header_string
597 .parse::<Uri>()
598 .map_err(|e| Error::MalformedLocationHeader(Box::new(e)))
599}
600
/// Logs the upcoming wait at `info` level and returns a Tokio timer for `dur`.
///
/// `description` only completes the log message ("before <description>").
fn delay(dur: Duration, description: &str) -> Sleep {
    info!("Waiting {dur:?} before {description}");
    tokio::time::sleep(dur)
}
605
/// Private home of the `Sealed` marker trait required by `Client`.
mod private {
    use crate::client::ClientImpl;
    use launchdarkly_sdk_transport::HttpTransport;

    /// Marker trait; `ClientImpl` is the only implementor in this module.
    pub trait Sealed {}
    impl<T: HttpTransport> Sealed for ClientImpl<T> {}
}
613
#[cfg(test)]
mod tests {
    use crate::ClientBuilder;
    use http::HeaderValue;
    use test_case::test_case;

    // Checks that `basic_auth` stores `Basic <base64(username:password)>` under
    // the `Authorization` header for each credential pair.
    #[test_case("user", "pass", "dXNlcjpwYXNz")]
    #[test_case("user1", "password123", "dXNlcjE6cGFzc3dvcmQxMjM=")]
    #[test_case("user2", "", "dXNlcjI6")]
    #[test_case("user@name", "pass#word!", "dXNlckBuYW1lOnBhc3Mjd29yZCE=")]
    #[test_case("user3", "my pass", "dXNlcjM6bXkgcGFzcw==")]
    #[test_case(
        "weird@-/:stuff",
        "goes@-/:here",
        "d2VpcmRALS86c3R1ZmY6Z29lc0AtLzpoZXJl"
    )]
    fn basic_auth_generates_correct_headers(username: &str, password: &str, expected: &str) {
        let builder = ClientBuilder::for_url("http://example.com")
            .expect("failed to build client")
            .basic_auth(username, password)
            .expect("failed to add authentication");

        let actual = builder.headers.get("Authorization");
        let expected = HeaderValue::from_str(format!("Basic {expected}").as_str())
            .expect("unable to create expected header");

        assert_eq!(Some(&expected), actual);
    }

    use std::{pin::pin, sync::Arc, time::Duration};

    use bytes::Bytes;
    use futures::{stream, TryStreamExt};
    use http::HeaderMap;
    use tokio::time::timeout;

    use crate::{
        client::{RequestProps, State},
        ReconnectOptionsBuilder, ReconnectingRequest,
    };
    use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture, TransportError};

    // Transport stub: either fails every request with a "connection refused"
    // I/O error or answers every request with a 404 whose body is "not found".
    #[derive(Clone)]
    struct MockTransport {
        fail_request: bool,
    }

    impl MockTransport {
        // `_url` is accepted but not stored; only `fail_request` affects behaviour.
        fn new(_url: String, fail_request: bool) -> Self {
            Self { fail_request }
        }
    }

    impl HttpTransport for MockTransport {
        // The incoming request is ignored; the outcome depends only on `fail_request`.
        fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture {
            if self.fail_request {
                Box::pin(async {
                    // Simulated transport-level failure.
                    Err(TransportError::new(std::io::Error::new(
                        std::io::ErrorKind::ConnectionRefused,
                        "connection refused",
                    )))
                })
            } else {
                Box::pin(async {
                    // Simulated HTTP-level failure: a 404 with a one-chunk body.
                    let byte_stream: ByteStream =
                        Box::pin(stream::iter(vec![Ok(Bytes::from("not found"))]));
                    let response = http::Response::builder()
                        .status(404)
                        .body(byte_stream)
                        .unwrap();
                    Ok(response)
                })
            }
        }
    }

    const INVALID_URI: &str = "http://mycrazyunexsistenturl.invaliddomainext";

    // With a failing transport, the stream must end in `StreamClosed` when
    // `retry_initial` is false and in `WaitingToReconnect` when it is true.
    #[test_case(INVALID_URI, false, |state| matches!(state, State::StreamClosed))]
    #[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))]
    #[tokio::test]
    async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) {
        let reconnect_opts = ReconnectOptionsBuilder::new(false)
            .backoff_factor(1)
            .delay(Duration::from_secs(1))
            .retry_initial(retry_initial)
            .build();

        let transport = Arc::new(MockTransport::new(uri.to_string(), true));
        let req_props = RequestProps {
            url: uri.parse().unwrap(),
            headers: HeaderMap::new(),
            method: "GET".to_string(),
            body: None,
            reconnect_opts,
            max_redirects: 10,
        };

        let mut reconnecting_request = ReconnectingRequest::new(transport.clone(), req_props, None);

        let resp = transport.request(http::Request::builder().uri(uri).body(None).unwrap());

        // Put the request directly into `Connecting` with the failing response future.
        reconnecting_request.state = State::Connecting {
            retry: reconnecting_request.props.reconnect_opts.retry_initial,
            resp,
        };

        let mut reconnecting_request = pin!(reconnecting_request);

        // The timeout result is discarded; only the resulting state is checked.
        timeout(Duration::from_millis(500), reconnecting_request.try_next())
            .await
            .ok();

        assert!(expected(&reconnecting_request.state));
    }

    // Runs the `initial_connection` scenario against the URL of a mockito server
    // that answers `GET /` with 404.
    // NOTE(review): `initial_connection` uses `MockTransport`, which ignores the
    // URL and the request, so this mock server does not appear to be contacted
    // — confirm.
    #[test_case(false, |state| matches!(state, State::StreamClosed))]
    #[test_case(true, |state| matches!(state, State::WaitingToReconnect(_)))]
    #[tokio::test]
    async fn initial_connection_mocked_server(retry_initial: bool, expected: fn(&State) -> bool) {
        let mut mock_server = mockito::Server::new_async().await;
        let _mock = mock_server
            .mock("GET", "/")
            .with_status(404)
            .create_async()
            .await;

        initial_connection(&mock_server.url(), retry_initial, expected).await;
    }

    // With reconnects enabled, a body that makes the parser fail must yield
    // Connected, then an `InvalidLine` error, then Connected again.
    #[cfg(feature = "hyper")]
    #[tokio::test(flavor = "multi_thread")]
    async fn parser_error_schedules_reconnect_immediately() {
        use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
        use futures::StreamExt;
        use launchdarkly_sdk_transport::HyperTransport;

        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/")
            .with_status(200)
            .with_body(b"\xff\xfe:bad\n\n".as_ref())
            .create_async()
            .await;

        let transport = HyperTransport::new().expect("failed to build transport");
        let client = ClientBuilder::for_url(&server.url())
            .unwrap()
            .reconnect(
                ReconnectOptionsBuilder::new(true)
                    .delay(Duration::from_millis(10))
                    .delay_max(Duration::from_millis(10))
                    .retry_initial(true)
                    .build(),
            )
            .build_with_transport(transport);

        let mut stream = client.stream();

        let mut items = Vec::new();
        // Collect up to three items, failing the test if that takes over two seconds.
        tokio::time::timeout(Duration::from_secs(2), async {
            while items.len() < 3 {
                match stream.next().await {
                    Some(item) => items.push(item),
                    None => break,
                }
            }
        })
        .await
        .expect("timed out waiting for parse error and reconnect");

        assert!(
            matches!(items.first(), Some(Ok(SSE::Connected(_)))),
            "expected initial Connected, got {:?}",
            items.first()
        );
        assert!(
            matches!(items.get(1), Some(Err(Error::InvalidLine(_)))),
            "expected InvalidLine error after first connection, got {:?}",
            items.get(1)
        );
        assert!(
            matches!(items.get(2), Some(Ok(SSE::Connected(_)))),
            "expected reconnect (Connected) immediately after parse error, got {:?}",
            items.get(2)
        );
    }

    // With reconnects disabled, the same parser failure must be followed by the
    // end of the stream.
    #[cfg(feature = "hyper")]
    #[tokio::test(flavor = "multi_thread")]
    async fn parser_error_closes_stream_when_reconnect_disabled() {
        use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
        use futures::StreamExt;
        use launchdarkly_sdk_transport::HyperTransport;

        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/")
            .with_status(200)
            .with_body(b"\xff\xfe:bad\n\n".as_ref())
            .create_async()
            .await;

        let transport = HyperTransport::new().expect("failed to build transport");
        let client = ClientBuilder::for_url(&server.url())
            .unwrap()
            .reconnect(
                ReconnectOptionsBuilder::new(false)
                    .retry_initial(true)
                    .build(),
            )
            .build_with_transport(transport);

        let mut stream = client.stream();

        let mut items = Vec::new();
        tokio::time::timeout(Duration::from_secs(2), async {
            while items.len() < 3 {
                match stream.next().await {
                    Some(item) => items.push(item),
                    None => {
                        // Record end-of-stream as a sentinel comment so it can be asserted on.
                        items.push(Ok(SSE::Comment("__stream_ended__".into())));
                        break;
                    }
                }
            }
        })
        .await
        .expect("timed out waiting for stream to close");

        assert!(
            matches!(items.first(), Some(Ok(SSE::Connected(_)))),
            "expected initial Connected, got {:?}",
            items.first()
        );
        assert!(
            matches!(items.get(1), Some(Err(Error::InvalidLine(_)))),
            "expected InvalidLine error, got {:?}",
            items.get(1)
        );
        assert!(
            matches!(
                items.get(2),
                Some(Ok(SSE::Comment(s))) if s == "__stream_ended__"
            ),
            "expected stream to end (None) after parse error with reconnect disabled, got {:?}",
            items.get(2)
        );
    }

    // With reconnects disabled, a body that ends after one complete event must
    // yield Connected, the event, an `Eof` error, and then end the stream.
    #[cfg(feature = "hyper")]
    #[tokio::test(flavor = "multi_thread")]
    async fn eof_closes_stream_when_reconnect_disabled() {
        use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
        use futures::StreamExt;
        use launchdarkly_sdk_transport::HyperTransport;

        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/")
            .with_status(200)
            .with_body("event: hello\ndata: world\n\n")
            .create_async()
            .await;

        let transport = HyperTransport::new().expect("failed to build transport");
        let client = ClientBuilder::for_url(&server.url())
            .unwrap()
            .reconnect(
                ReconnectOptionsBuilder::new(false)
                    .retry_initial(true)
                    .build(),
            )
            .build_with_transport(transport);

        let mut stream = client.stream();

        // Each entry keeps the raw `Option` so the terminating `None` is observable.
        let mut items: Vec<Option<crate::Result<SSE>>> = Vec::new();
        tokio::time::timeout(Duration::from_secs(2), async {
            for _ in 0..4 {
                let item = stream.next().await;
                let is_terminal = item.is_none();
                items.push(item);
                if is_terminal {
                    break;
                }
            }
        })
        .await
        .expect("timed out waiting for stream to close");

        assert!(
            matches!(items.first(), Some(Some(Ok(SSE::Connected(_))))),
            "expected initial Connected, got {:?}",
            items.first()
        );
        assert!(
            matches!(items.get(1), Some(Some(Ok(SSE::Event(e)))) if e.event_type == "hello"),
            "expected hello event, got {:?}",
            items.get(1)
        );
        assert!(
            matches!(items.get(2), Some(Some(Err(Error::Eof)))),
            "expected Eof error after body ends, got {:?}",
            items.get(2)
        );
        assert!(
            matches!(items.get(3), Some(None)),
            "expected stream to end (None) after EOF with reconnect disabled, got {:?}",
            items.get(3)
        );
    }
}