Skip to main content

eventsource_client/
client.rs

1use base64::prelude::*;
2
3use futures::{ready, Stream};
4use http::{HeaderMap, HeaderName, HeaderValue, Request, Uri};
5use log::{debug, info, trace, warn};
6use pin_project::pin_project;
7use std::{
8    boxed,
9    fmt::{self, Debug, Formatter},
10    future::Future,
11    io::ErrorKind,
12    pin::Pin,
13    str::FromStr,
14    sync::Arc,
15    task::{Context, Poll},
16    time::{Duration, Instant},
17};
18
19use tokio::time::Sleep;
20
21use crate::{
22    config::ReconnectOptions,
23    response::{ErrorBody, Response},
24};
25use crate::{
26    error::{Error, Result},
27    event_parser::ConnectionDetails,
28};
29use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture};
30
31use crate::event_parser::EventParser;
32use crate::event_parser::SSE;
33
34use crate::retry::{BackoffRetry, RetryStrategy};
35use std::error::Error as StdError;
36
/// Represents a [`Pin`]'d [`Send`] + [`Sync`] stream, returned by [`Client`]'s stream method.
///
/// The alias names a pinned, boxed trait object, so callers can hold and poll
/// the stream without knowing its concrete type.
pub type BoxStream<T> = Pin<boxed::Box<dyn Stream<Item = T> + Send + Sync>>;
39
/// Client is the Server-Sent-Events interface.
/// This trait is sealed and cannot be implemented for types outside this crate.
pub trait Client: Send + Sync + private::Sealed {
    /// Returns a boxed stream that yields [`SSE`] items wrapped in [`Result`].
    fn stream(&self) -> BoxStream<Result<SSE>>;
}
45
/*
 * TODO remove debug output
 * TODO specify list of statuses to not retry (e.g. 204)
 */
50
/// Maximum number of redirects that the client will follow before
/// giving up, if not overridden via [`ClientBuilder::redirect_limit`].
pub const DEFAULT_REDIRECT_LIMIT: u32 = 16;
54
/// ClientBuilder provides a series of builder methods to easily construct a [`Client`].
pub struct ClientBuilder {
    /// Endpoint the client connects to, parsed from the string given to `for_url`.
    url: Uri,
    /// Headers sent with the request; `for_url` pre-populates `Accept` and `Cache-Control`.
    headers: HeaderMap,
    /// Reconnect behaviour handed to the built client.
    reconnect_opts: ReconnectOptions,
    /// Event id the stream starts with, if one was supplied.
    last_event_id: Option<String>,
    /// HTTP method used for the request; `for_url` sets it to `GET`.
    method: String,
    /// Optional request body.
    body: Option<String>,
    /// Redirect limit; `None` means [`DEFAULT_REDIRECT_LIMIT`] is used at build time.
    max_redirects: Option<u32>,
}
65
66impl ClientBuilder {
67    /// Create a builder for a given URL.
68    pub fn for_url(url: &str) -> Result<ClientBuilder> {
69        let url = url
70            .parse()
71            .map_err(|e| Error::InvalidParameter(Box::new(e)))?;
72
73        let mut header_map = HeaderMap::new();
74        header_map.insert("Accept", HeaderValue::from_static("text/event-stream"));
75        header_map.insert("Cache-Control", HeaderValue::from_static("no-cache"));
76
77        Ok(ClientBuilder {
78            url,
79            headers: header_map,
80            reconnect_opts: ReconnectOptions::default(),
81            last_event_id: None,
82            method: String::from("GET"),
83            max_redirects: None,
84            body: None,
85        })
86    }
87
88    /// Set the request method used for the initial connection to the SSE endpoint.
89    pub fn method(mut self, method: String) -> ClientBuilder {
90        self.method = method;
91        self
92    }
93
94    /// Set the request body used for the initial connection to the SSE endpoint.
95    pub fn body(mut self, body: String) -> ClientBuilder {
96        self.body = Some(body);
97        self
98    }
99
100    /// Set the last event id for a stream when it is created. If it is set, it will be sent to the
101    /// server in case it can replay missed events.
102    pub fn last_event_id(mut self, last_event_id: String) -> ClientBuilder {
103        self.last_event_id = Some(last_event_id);
104        self
105    }
106
107    /// Set a HTTP header on the SSE request.
108    pub fn header(mut self, name: &str, value: &str) -> Result<ClientBuilder> {
109        let name = HeaderName::from_str(name).map_err(|e| Error::InvalidParameter(Box::new(e)))?;
110
111        let value =
112            HeaderValue::from_str(value).map_err(|e| Error::InvalidParameter(Box::new(e)))?;
113
114        self.headers.insert(name, value);
115        Ok(self)
116    }
117
118    /// Set the Authorization header with the calculated basic authentication value.
119    pub fn basic_auth(self, username: &str, password: &str) -> Result<ClientBuilder> {
120        let auth = format!("{username}:{password}");
121        let encoded = BASE64_STANDARD.encode(auth);
122        let value = format!("Basic {encoded}");
123
124        self.header("Authorization", &value)
125    }
126
127    /// Configure the client's reconnect behaviour according to the supplied
128    /// [`ReconnectOptions`].
129    ///
130    /// [`ReconnectOptions`]: struct.ReconnectOptions.html
131    pub fn reconnect(mut self, opts: ReconnectOptions) -> ClientBuilder {
132        self.reconnect_opts = opts;
133        self
134    }
135
136    /// Customize the client's following behavior when served a redirect.
137    /// To disable following redirects, pass `0`.
138    /// By default, the limit is [`DEFAULT_REDIRECT_LIMIT`].
139    pub fn redirect_limit(mut self, limit: u32) -> ClientBuilder {
140        self.max_redirects = Some(limit);
141        self
142    }
143
144    /// Build a client with a custom HTTP transport implementation.
145    ///
146    /// # Arguments
147    ///
148    /// * `transport` - An implementation of the [`HttpTransport`] trait that will handle
149    ///   HTTP requests. See the `examples/` directory for reference implementations.
150    ///
151    /// # Example
152    ///
153    /// ```ignore
154    /// use eventsource_client::ClientBuilder;
155    ///
156    /// let transport = MyTransport::new();
157    /// let client = ClientBuilder::for_url("https://live-test-scores.herokuapp.com/scores")
158    ///     .expect("failed to create client builder")
159    ///     .build_with_transport(transport);
160    /// ```
161    pub fn build_with_transport<T>(self, transport: T) -> impl Client
162    where
163        T: HttpTransport,
164    {
165        ClientImpl {
166            transport: Arc::new(transport),
167            request_props: RequestProps {
168                url: self.url,
169                headers: self.headers,
170                method: self.method,
171                body: self.body,
172                reconnect_opts: self.reconnect_opts,
173                max_redirects: self.max_redirects.unwrap_or(DEFAULT_REDIRECT_LIMIT),
174            },
175            last_event_id: self.last_event_id,
176        }
177    }
178}
179
/// Settings shared by every request a stream makes. Cloned into each stream
/// created from a client.
#[derive(Clone)]
struct RequestProps {
    /// Original endpoint; the stream returns to it whenever redirects are reset.
    url: Uri,
    /// Headers copied onto each request.
    headers: HeaderMap,
    /// HTTP method of each request.
    method: String,
    /// Optional body sent with each request.
    body: Option<String>,
    /// Reconnect / retry configuration.
    reconnect_opts: ReconnectOptions,
    /// Number of redirects that may be followed before the stream gives up.
    max_redirects: u32,
}
189
/// A client implementation that connects to a server using the Server-Sent Events protocol
/// and consumes the event stream indefinitely.
struct ClientImpl<T: HttpTransport> {
    /// Transport shared (via `Arc`) with every stream this client creates.
    transport: Arc<T>,
    /// Request settings cloned into each new stream.
    request_props: RequestProps,
    /// Last event id each new stream starts with, if any.
    last_event_id: Option<String>,
}
197
198impl<T: HttpTransport> Client for ClientImpl<T> {
199    /// Connect to the server and begin consuming the stream. Produces a
200    /// [`Stream`] of [`Event`](crate::Event)s wrapped in [`Result`].
201    ///
202    /// Errors yielded by the stream are not terminal: keep polling.
203    /// When [`ReconnectOptions::reconnect`] is enabled (the default),
204    /// the stream schedules a reconnect on retryable errors and the
205    /// next poll resumes from a fresh connection.
206    ///
207    /// The stream is exhausted only when [`Stream::poll_next`] returns
208    /// [`Poll::Ready(None)`]. That happens when the underlying state
209    /// machine reaches `StreamClosed` (e.g. a redirect-limit overrun,
210    /// a malformed `Location` header, or an error during initial
211    /// connection while [`ReconnectOptions::retry_initial`] is
212    /// disabled), or after any error when reconnect is disabled.
213    ///
214    /// [`Poll::Ready(None)`]: std::task::Poll::Ready
215    /// [`Stream::poll_next`]: futures::Stream::poll_next
216    fn stream(&self) -> BoxStream<Result<SSE>> {
217        Box::pin(ReconnectingRequest::new(
218            Arc::clone(&self.transport),
219            self.request_props.clone(),
220            self.last_event_id.clone(),
221        ))
222    }
223}
224
/// States of the reconnecting request's state machine.
#[allow(clippy::large_enum_variant)] // false positive
#[pin_project(project = StateProj)]
enum State {
    /// Nothing has been sent yet; the next poll issues the request.
    New,
    /// A request is in flight.
    Connecting {
        /// Whether a failure schedules a retry instead of closing the stream.
        retry: bool,
        /// Pending response from the transport.
        #[pin]
        resp: ResponseFuture,
    },
    /// A successful response was received and its body is being read.
    Connected(#[pin] ByteStream),
    /// Waiting for the timer to elapse before issuing a new request.
    WaitingToReconnect(#[pin] Sleep),
    /// A redirect was received; holds the `location` header value, if one was present.
    FollowingRedirect(Option<HeaderValue>),
    /// Terminal state: polling yields `None`.
    StreamClosed,
}
239
240impl State {
241    fn name(&self) -> &'static str {
242        match self {
243            State::New => "new",
244            State::Connecting { retry: false, .. } => "connecting(no-retry)",
245            State::Connecting { retry: true, .. } => "connecting(retry)",
246            State::Connected(_) => "connected",
247            State::WaitingToReconnect(_) => "waiting-to-reconnect",
248            State::FollowingRedirect(_) => "following-redirect",
249            State::StreamClosed => "closed",
250        }
251    }
252}
253
254impl Debug for State {
255    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
256        write!(f, "{}", self.name())
257    }
258}
259
/// Stream that issues the SSE request when polled, parses the response body
/// into events, and follows redirects / reconnects as configured.
#[must_use = "streams do nothing unless polled"]
#[pin_project]
pub struct ReconnectingRequest<T: HttpTransport> {
    /// Transport used to issue each request.
    transport: Arc<T>,
    /// Request settings (method, headers, body, reconnect options, redirect limit).
    props: RequestProps,
    /// Current state of the connection state machine.
    #[pin]
    state: State,
    /// Supplies the delay to wait before each reconnect attempt.
    retry_strategy: Box<dyn RetryStrategy + Send + Sync>,
    /// URL of the next request; differs from `props.url` while following redirects.
    current_url: Uri,
    /// Redirects followed since the counter was last reset.
    redirect_count: u32,
    /// Parser that turns response bytes into events.
    event_parser: EventParser,
    /// Id copied from the most recently yielded event (or the initial value);
    /// sent as the `last-event-id` header when non-empty.
    last_event_id: Option<String>,
    /// True until the first successful response; selects `retry_initial`
    /// rather than `reconnect` when deciding whether a failed connect is retried.
    #[pin]
    initial_connection: bool,
}
275
276impl<T: HttpTransport> ReconnectingRequest<T> {
277    fn new(
278        transport: Arc<T>,
279        props: RequestProps,
280        last_event_id: Option<String>,
281    ) -> ReconnectingRequest<T> {
282        let reconnect_delay = props.reconnect_opts.delay;
283        let delay_max = props.reconnect_opts.delay_max;
284        let backoff_factor = props.reconnect_opts.backoff_factor;
285
286        let url = props.url.clone();
287        ReconnectingRequest {
288            props,
289            transport,
290            state: State::New,
291            retry_strategy: Box::new(BackoffRetry::new(
292                reconnect_delay,
293                delay_max,
294                backoff_factor,
295                true,
296            )),
297            redirect_count: 0,
298            current_url: url,
299            event_parser: EventParser::new(),
300            last_event_id,
301            initial_connection: true,
302        }
303    }
304
305    fn send_request(&self) -> Result<ResponseFuture> {
306        let mut request_builder = Request::builder()
307            .method(self.props.method.as_str())
308            .uri(&self.current_url);
309
310        for (name, value) in &self.props.headers {
311            request_builder = request_builder.header(name, value);
312        }
313
314        if let Some(id) = self.last_event_id.as_ref() {
315            if !id.is_empty() {
316                let id_as_header =
317                    HeaderValue::from_str(id).map_err(|e| Error::InvalidParameter(Box::new(e)))?;
318
319                request_builder = request_builder.header("last-event-id", id_as_header);
320            }
321        }
322
323        // Include the request body if set. Most SSE requests use GET and will have None,
324        // but some implementations (e.g., using REPORT method) may include a body.
325        let request = request_builder
326            .body(self.props.body.clone().map(|b| b.into()))
327            .map_err(|e| Error::InvalidParameter(Box::new(e)))?;
328
329        Ok(self.transport.request(request))
330    }
331
332    fn reset_redirects(self: Pin<&mut Self>) {
333        let url = self.props.url.clone();
334        let this = self.project();
335        *this.current_url = url;
336        *this.redirect_count = 0;
337    }
338
339    fn increment_redirect_counter(self: Pin<&mut Self>) -> bool {
340        if self.redirect_count == self.props.max_redirects {
341            return false;
342        }
343        *self.project().redirect_count += 1;
344        true
345    }
346}
347
348impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
349    type Item = Result<SSE>;
350
351    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352        trace!("ReconnectingRequest::poll({:?})", &self.state);
353
354        loop {
355            let this = self.as_mut().project();
356            if let Some(event) = this.event_parser.get_event() {
357                return match event {
358                    SSE::Connected(_) => Poll::Ready(Some(Ok(event))),
359                    SSE::Event(ref evt) => {
360                        this.last_event_id.clone_from(&evt.id);
361
362                        if let Some(retry) = evt.retry {
363                            this.retry_strategy
364                                .change_base_delay(Duration::from_millis(retry));
365                        }
366                        Poll::Ready(Some(Ok(event)))
367                    }
368                    SSE::Comment(_) => Poll::Ready(Some(Ok(event))),
369                };
370            }
371
372            trace!("ReconnectingRequest::poll loop({:?})", &this.state);
373
374            let state = this.state.project();
375            match state {
376                StateProj::StreamClosed => return Poll::Ready(None),
377                // New immediately transitions to Connecting, and exists only
378                // to ensure that we only connect when polled.
379                StateProj::New => {
380                    *self.as_mut().project().event_parser = EventParser::new();
381                    match self.send_request() {
382                        Ok(resp) => {
383                            let retry = if self.initial_connection {
384                                self.props.reconnect_opts.retry_initial
385                            } else {
386                                self.props.reconnect_opts.reconnect
387                            };
388                            self.as_mut()
389                                .project()
390                                .state
391                                .set(State::Connecting { resp, retry })
392                        }
393                        Err(e) => {
394                            // This error seems to be unrecoverable. So we should just shut down the
395                            // stream.
396                            self.as_mut().project().state.set(State::StreamClosed);
397                            return Poll::Ready(Some(Err(e)));
398                        }
399                    }
400                }
401                StateProj::Connecting { retry, resp } => match ready!(resp.poll(cx)) {
402                    Ok(resp) => {
403                        debug!(
404                            "HTTP response status: {}, headers: {:?}",
405                            resp.status(),
406                            resp.headers()
407                        );
408
409                        if resp.status().is_success() {
410                            self.as_mut().project().retry_strategy.reset(Instant::now());
411                            self.as_mut().reset_redirects();
412
413                            let status = resp.status();
414                            let headers = resp.headers().clone();
415
416                            self.as_mut()
417                                .project()
418                                .state
419                                .set(State::Connected(resp.into_body()));
420                            self.as_mut().project().initial_connection.set(false);
421
422                            return Poll::Ready(Some(Ok(SSE::Connected(ConnectionDetails::new(
423                                Response::new(status, headers),
424                            )))));
425                        }
426
427                        if resp.status() == 301 || resp.status() == 307 {
428                            debug!("got redirected ({})", resp.status());
429
430                            if self.as_mut().increment_redirect_counter() {
431                                debug!("following redirect {}", self.redirect_count);
432
433                                self.as_mut().project().state.set(State::FollowingRedirect(
434                                    resp.headers().get("location").cloned(),
435                                ));
436                                continue;
437                            } else {
438                                debug!("redirect limit reached ({})", self.props.max_redirects);
439
440                                self.as_mut().project().state.set(State::StreamClosed);
441                                return Poll::Ready(Some(Err(Error::MaxRedirectLimitReached(
442                                    self.props.max_redirects,
443                                ))));
444                            }
445                        }
446
447                        let status = resp.status();
448                        let headers = resp.headers().clone();
449                        let body = resp.into_body();
450
451                        let error = Error::UnexpectedResponse(
452                            Response::new(status, headers),
453                            ErrorBody::new(body),
454                        );
455
456                        if !*retry {
457                            self.as_mut().project().state.set(State::StreamClosed);
458                            return Poll::Ready(Some(Err(error)));
459                        }
460
461                        self.as_mut().reset_redirects();
462
463                        let duration = self
464                            .as_mut()
465                            .project()
466                            .retry_strategy
467                            .next_delay(Instant::now());
468
469                        self.as_mut()
470                            .project()
471                            .state
472                            .set(State::WaitingToReconnect(delay(duration, "retrying")));
473
474                        return Poll::Ready(Some(Err(error)));
475                    }
476                    Err(e) => {
477                        // This happens when the server is unreachable, e.g. connection refused.
478                        warn!("request returned an error: {e}");
479                        if !*retry {
480                            self.as_mut().project().state.set(State::StreamClosed);
481                            return Poll::Ready(Some(Err(Error::Transport(e))));
482                        }
483
484                        let duration = self
485                            .as_mut()
486                            .project()
487                            .retry_strategy
488                            .next_delay(Instant::now());
489
490                        self.as_mut()
491                            .project()
492                            .state
493                            .set(State::WaitingToReconnect(delay(duration, "retrying")));
494                    }
495                },
496                StateProj::FollowingRedirect(maybe_header) => match uri_from_header(maybe_header) {
497                    Ok(uri) => {
498                        *self.as_mut().project().current_url = uri;
499                        self.as_mut().project().state.set(State::New);
500                    }
501                    Err(e) => {
502                        self.as_mut().project().state.set(State::StreamClosed);
503                        return Poll::Ready(Some(Err(e)));
504                    }
505                },
506                StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) {
507                    Some(Ok(result)) => {
508                        if let Err(e) = this.event_parser.process_bytes(result) {
509                            // The current response body is unusable. Either
510                            // schedule a reconnect or close the stream so a
511                            // caller that disabled reconnect doesn't keep
512                            // reading from a poisoned parser.
513                            if self.props.reconnect_opts.reconnect {
514                                let duration = self
515                                    .as_mut()
516                                    .project()
517                                    .retry_strategy
518                                    .next_delay(Instant::now());
519                                self.as_mut().project().state.set(State::WaitingToReconnect(
520                                    delay(duration, "reconnecting"),
521                                ));
522                            } else {
523                                self.as_mut().project().state.set(State::StreamClosed);
524                            }
525                            return Poll::Ready(Some(Err(e)));
526                        }
527                        continue;
528                    }
529                    Some(Err(e)) => {
530                        if self.props.reconnect_opts.reconnect {
531                            let duration = self
532                                .as_mut()
533                                .project()
534                                .retry_strategy
535                                .next_delay(Instant::now());
536                            self.as_mut()
537                                .project()
538                                .state
539                                .set(State::WaitingToReconnect(delay(duration, "reconnecting")));
540                        }
541
542                        // Check if the underlying error is a timeout
543                        if let Some(cause) = e.source() {
544                            if let Some(downcast) = cause.downcast_ref::<std::io::Error>() {
545                                if let std::io::ErrorKind::TimedOut = downcast.kind() {
546                                    return Poll::Ready(Some(Err(Error::TimedOut)));
547                                }
548                            }
549                        }
550
551                        return Poll::Ready(Some(Err(Error::Transport(e))));
552                    }
553                    None => {
554                        if self.props.reconnect_opts.reconnect {
555                            let duration = self
556                                .as_mut()
557                                .project()
558                                .retry_strategy
559                                .next_delay(Instant::now());
560                            self.as_mut()
561                                .project()
562                                .state
563                                .set(State::WaitingToReconnect(delay(duration, "retrying")));
564                        } else {
565                            self.as_mut().project().state.set(State::StreamClosed);
566                        }
567
568                        if self.event_parser.was_processing() {
569                            return Poll::Ready(Some(Err(Error::UnexpectedEof)));
570                        }
571                        return Poll::Ready(Some(Err(Error::Eof)));
572                    }
573                },
574                StateProj::WaitingToReconnect(delay) => {
575                    ready!(delay.poll(cx));
576                    info!("Reconnecting");
577                    self.as_mut().project().state.set(State::New);
578                }
579            };
580        }
581    }
582}
583
584fn uri_from_header(maybe_header: &Option<HeaderValue>) -> Result<Uri> {
585    let header = maybe_header.as_ref().ok_or_else(|| {
586        Error::MalformedLocationHeader(Box::new(std::io::Error::new(
587            ErrorKind::NotFound,
588            "missing Location header",
589        )))
590    })?;
591
592    let header_string = header
593        .to_str()
594        .map_err(|e| Error::MalformedLocationHeader(Box::new(e)))?;
595
596    header_string
597        .parse::<Uri>()
598        .map_err(|e| Error::MalformedLocationHeader(Box::new(e)))
599}
600
/// Logs the upcoming wait and returns a [`Sleep`] future for `dur`.
///
/// `description` is only used in the log line (callers pass e.g. "retrying"
/// or "reconnecting").
fn delay(dur: Duration, description: &str) -> Sleep {
    info!("Waiting {dur:?} before {description}");
    tokio::time::sleep(dur)
}
605
/// Holds the `Sealed` supertrait of `Client`. The module is private, so code
/// outside this crate cannot implement `Sealed` and therefore cannot
/// implement `Client`.
mod private {
    use crate::client::ClientImpl;
    use launchdarkly_sdk_transport::HttpTransport;

    /// Marker supertrait; implemented only for `ClientImpl` below.
    pub trait Sealed {}
    impl<T: HttpTransport> Sealed for ClientImpl<T> {}
}
613
614#[cfg(test)]
615mod tests {
616    use crate::ClientBuilder;
617    use http::HeaderValue;
618    use test_case::test_case;
619
620    #[test_case("user", "pass", "dXNlcjpwYXNz")]
621    #[test_case("user1", "password123", "dXNlcjE6cGFzc3dvcmQxMjM=")]
622    #[test_case("user2", "", "dXNlcjI6")]
623    #[test_case("user@name", "pass#word!", "dXNlckBuYW1lOnBhc3Mjd29yZCE=")]
624    #[test_case("user3", "my pass", "dXNlcjM6bXkgcGFzcw==")]
625    #[test_case(
626        "weird@-/:stuff",
627        "goes@-/:here",
628        "d2VpcmRALS86c3R1ZmY6Z29lc0AtLzpoZXJl"
629    )]
630    fn basic_auth_generates_correct_headers(username: &str, password: &str, expected: &str) {
631        let builder = ClientBuilder::for_url("http://example.com")
632            .expect("failed to build client")
633            .basic_auth(username, password)
634            .expect("failed to add authentication");
635
636        let actual = builder.headers.get("Authorization");
637        let expected = HeaderValue::from_str(format!("Basic {expected}").as_str())
638            .expect("unable to create expected header");
639
640        assert_eq!(Some(&expected), actual);
641    }
642
643    use std::{pin::pin, sync::Arc, time::Duration};
644
645    use bytes::Bytes;
646    use futures::{stream, TryStreamExt};
647    use http::HeaderMap;
648    use tokio::time::timeout;
649
650    use crate::{
651        client::{RequestProps, State},
652        ReconnectOptionsBuilder, ReconnectingRequest,
653    };
654    use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture, TransportError};
655
    // Mock transport for testing. It never performs network I/O: every request
    // resolves to either a transport error or a canned 404 response.
    #[derive(Clone)]
    struct MockTransport {
        // When true, requests fail with a connection-refused error;
        // otherwise they resolve to a 404 response.
        fail_request: bool,
    }
661
    impl MockTransport {
        // Creates the mock. `_url` is accepted but unused; only `fail_request`
        // influences behaviour.
        fn new(_url: String, fail_request: bool) -> Self {
            Self { fail_request }
        }
    }
667
668    impl HttpTransport for MockTransport {
669        fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture {
670            if self.fail_request {
671                // Simulate a connection error
672                Box::pin(async {
673                    Err(TransportError::new(std::io::Error::new(
674                        std::io::ErrorKind::ConnectionRefused,
675                        "connection refused",
676                    )))
677                })
678            } else {
679                // Return a 404 response
680                Box::pin(async {
681                    let byte_stream: ByteStream =
682                        Box::pin(stream::iter(vec![Ok(Bytes::from("not found"))]));
683                    let response = http::Response::builder()
684                        .status(404)
685                        .body(byte_stream)
686                        .unwrap();
687                    Ok(response)
688                })
689            }
690        }
691    }
692
    // URL used by the failing-connection tests. The mock transport ignores the
    // request, so this host is never actually contacted.
    const INVALID_URI: &str = "http://mycrazyunexsistenturl.invaliddomainext";
694
    // A failed initial connection must close the stream when `retry_initial`
    // is false and schedule a reconnect when it is true. `expected` inspects
    // the state reached after polling once.
    #[test_case(INVALID_URI, false, |state| matches!(state, State::StreamClosed))]
    #[test_case(INVALID_URI, true, |state| matches!(state, State::WaitingToReconnect(_)))]
    #[tokio::test]
    async fn initial_connection(uri: &str, retry_initial: bool, expected: fn(&State) -> bool) {
        // NOTE(review): the `false` argument presumably disables reconnect — confirm
        // against `ReconnectOptionsBuilder::new`.
        let reconnect_opts = ReconnectOptionsBuilder::new(false)
            .backoff_factor(1)
            .delay(Duration::from_secs(1))
            .retry_initial(retry_initial)
            .build();

        // The mock is built with `fail_request = true`, so every request errors.
        let transport = Arc::new(MockTransport::new(uri.to_string(), true));
        let req_props = RequestProps {
            url: uri.parse().unwrap(),
            headers: HeaderMap::new(),
            method: "GET".to_string(),
            body: None,
            reconnect_opts,
            max_redirects: 10,
        };

        let mut reconnecting_request = ReconnectingRequest::new(transport.clone(), req_props, None);

        // sets initial state with a failing request
        let resp = transport.request(http::Request::builder().uri(uri).body(None).unwrap());

        reconnecting_request.state = State::Connecting {
            retry: reconnecting_request.props.reconnect_opts.retry_initial,
            resp,
        };

        let mut reconnecting_request = pin!(reconnecting_request);

        // The 500ms timeout is shorter than the 1s reconnect delay, so in the
        // retry case the poll is abandoned while the stream is still waiting.
        timeout(Duration::from_millis(500), reconnecting_request.try_next())
            .await
            .ok();

        assert!(expected(&reconnecting_request.state));
    }
733
    // Runs the `initial_connection` scenario against the URL of a mockito
    // server configured to answer `GET /` with 404.
    //
    // NOTE(review): `initial_connection` always builds `MockTransport` with
    // `fail_request = true`, and that transport ignores the request, so the
    // mockito server created here does not appear to be contacted. As written
    // this exercises the transport-error path, not the 404 path — confirm
    // whether that is intended.
    #[test_case(false, |state| matches!(state, State::StreamClosed))]
    #[test_case(true, |state| matches!(state, State::WaitingToReconnect(_)))]
    #[tokio::test]
    async fn initial_connection_mocked_server(retry_initial: bool, expected: fn(&State) -> bool) {
        let mut mock_server = mockito::Server::new_async().await;
        let _mock = mock_server
            .mock("GET", "/")
            .with_status(404)
            .create_async()
            .await;

        initial_connection(&mock_server.url(), retry_initial, expected).await;
    }
747
    // When a parse error happens during streaming and reconnect is
    // enabled, the next stream item should be a fresh `Connected` from
    // the reconnect, not another error from continuing to drain the
    // broken response body.
    #[cfg(feature = "hyper")]
    #[tokio::test(flavor = "multi_thread")]
    async fn parser_error_schedules_reconnect_immediately() {
        use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
        use futures::StreamExt;
        use launchdarkly_sdk_transport::HyperTransport;

        // The server answers 200 with a body the parser is expected to reject
        // (the assertions below expect `Error::InvalidLine`).
        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/")
            .with_status(200)
            .with_body(b"\xff\xfe:bad\n\n".as_ref())
            .create_async()
            .await;

        // Short delays keep the reconnect well inside the 2s timeout below.
        let transport = HyperTransport::new().expect("failed to build transport");
        let client = ClientBuilder::for_url(&server.url())
            .unwrap()
            .reconnect(
                ReconnectOptionsBuilder::new(true)
                    .delay(Duration::from_millis(10))
                    .delay_max(Duration::from_millis(10))
                    .retry_initial(true)
                    .build(),
            )
            .build_with_transport(transport);

        let mut stream = client.stream();

        // Expected order: Connected, parse error, Connected (reconnect).
        let mut items = Vec::new();
        tokio::time::timeout(Duration::from_secs(2), async {
            while items.len() < 3 {
                match stream.next().await {
                    Some(item) => items.push(item),
                    None => break,
                }
            }
        })
        .await
        .expect("timed out waiting for parse error and reconnect");

        assert!(
            matches!(items.first(), Some(Ok(SSE::Connected(_)))),
            "expected initial Connected, got {:?}",
            items.first()
        );
        assert!(
            matches!(items.get(1), Some(Err(Error::InvalidLine(_)))),
            "expected InvalidLine error after first connection, got {:?}",
            items.get(1)
        );
        assert!(
            matches!(items.get(2), Some(Ok(SSE::Connected(_)))),
            "expected reconnect (Connected) immediately after parse error, got {:?}",
            items.get(2)
        );
    }
810
// With reconnect disabled, a parse error should close the stream so the
// next poll returns `None` rather than continuing to read from a poisoned
// parser or reconnecting via the EOF arm.
//
// Every poll result is recorded as an `Option`, so the terminal `None` is
// captured directly instead of being encoded as a made-up SSE comment that
// a real server could, in principle, also send.
#[cfg(feature = "hyper")]
#[tokio::test(flavor = "multi_thread")]
async fn parser_error_closes_stream_when_reconnect_disabled() {
    use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
    use futures::StreamExt;
    use launchdarkly_sdk_transport::HyperTransport;

    // The response body starts with the bytes 0xFF 0xFE; the assertions below
    // expect the client to report that as `Error::InvalidLine`.
    let mut server = mockito::Server::new_async().await;
    let _mock = server
        .mock("GET", "/")
        .with_status(200)
        .with_body(b"\xff\xfe:bad\n\n".as_ref())
        .create_async()
        .await;

    let transport = HyperTransport::new().expect("failed to build transport");
    let client = ClientBuilder::for_url(&server.url())
        .unwrap()
        .reconnect(
            ReconnectOptionsBuilder::new(false)
                .retry_initial(true)
                .build(),
        )
        .build_with_transport(transport);

    let mut stream = client.stream();

    // Expected order: Connected, parse error, end of stream (`None`).
    let mut items: Vec<Option<crate::Result<SSE>>> = Vec::new();
    tokio::time::timeout(Duration::from_secs(2), async {
        for _ in 0..3 {
            let item = stream.next().await;
            let is_terminal = item.is_none();
            items.push(item);
            if is_terminal {
                break;
            }
        }
    })
    .await
    .expect("timed out waiting for stream to close");

    assert!(
        matches!(items.first(), Some(Some(Ok(SSE::Connected(_))))),
        "expected initial Connected, got {:?}",
        items.first()
    );
    assert!(
        matches!(items.get(1), Some(Some(Err(Error::InvalidLine(_))))),
        "expected InvalidLine error, got {:?}",
        items.get(1)
    );
    assert!(
        matches!(items.get(2), Some(None)),
        "expected stream to end (None) after parse error with reconnect disabled, got {:?}",
        items.get(2)
    );
}
875
// A response body that ends cleanly must terminate the stream when
// reconnect is turned off; no reconnect may be scheduled.
#[cfg(feature = "hyper")]
#[tokio::test(flavor = "multi_thread")]
async fn eof_closes_stream_when_reconnect_disabled() {
    use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
    use futures::StreamExt;
    use launchdarkly_sdk_transport::HyperTransport;

    // The mock serves exactly one well-formed event and then ends the body.
    let mut server = mockito::Server::new_async().await;
    let _single_event_mock = server
        .mock("GET", "/")
        .with_status(200)
        .with_body("event: hello\ndata: world\n\n")
        .create_async()
        .await;

    let no_reconnect = ReconnectOptionsBuilder::new(false)
        .retry_initial(true)
        .build();

    let transport = HyperTransport::new().expect("failed to build transport");
    let builder = ClientBuilder::for_url(&server.url()).unwrap();
    let client = builder
        .reconnect(no_reconnect)
        .build_with_transport(transport);

    let mut stream = client.stream();

    // Record every poll result, including the terminal `None`, and stop
    // after at most four polls.
    const MAX_POLLS: usize = 4;
    let mut items: Vec<Option<crate::Result<SSE>>> = Vec::new();
    tokio::time::timeout(Duration::from_secs(2), async {
        loop {
            let polled = stream.next().await;
            let ended = polled.is_none();
            items.push(polled);
            if ended || items.len() == MAX_POLLS {
                break;
            }
        }
    })
    .await
    .expect("timed out waiting for stream to close");

    let connected = items.first();
    assert!(
        matches!(connected, Some(Some(Ok(SSE::Connected(_))))),
        "expected initial Connected, got {:?}",
        connected
    );

    let hello = items.get(1);
    assert!(
        matches!(hello, Some(Some(Ok(SSE::Event(e)))) if e.event_type == "hello"),
        "expected hello event, got {:?}",
        hello
    );

    let eof = items.get(2);
    assert!(
        matches!(eof, Some(Some(Err(Error::Eof)))),
        "expected Eof error after body ends, got {:?}",
        eof
    );

    let closed = items.get(3);
    assert!(
        matches!(closed, Some(None)),
        "expected stream to end (None) after EOF with reconnect disabled, got {:?}",
        closed
    );
}
940}