Skip to main content

seedlink_rs_client/
reconnect.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use futures_core::Stream;
5use seedlink_rs_protocol::SequenceNumber;
6use tracing::{debug, info, warn};
7
8use crate::SeedLinkClient;
9use crate::error::{ClientError, Result};
10use crate::state::{ClientConfig, OwnedFrame, StationKey};
11
/// Tuning knobs for the exponential-backoff reconnect loop.
///
/// The default quoted on each field is the value produced by this type's
/// `Default` implementation.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Delay slept before the first reconnect attempt. Default: 1 second.
    pub initial_backoff: Duration,
    /// Upper bound on the delay between reconnect attempts. Default: 60 seconds.
    pub max_backoff: Duration,
    /// Factor the delay is multiplied by after each failed attempt. Default: 2.0.
    pub multiplier: f64,
    /// Cap on the number of reconnect attempts; `0` means unlimited. Default: 0.
    pub max_attempts: u32,
}
24
25impl Default for ReconnectConfig {
26    fn default() -> Self {
27        Self {
28            initial_backoff: Duration::from_secs(1),
29            max_backoff: Duration::from_secs(60),
30            multiplier: 2.0,
31            max_attempts: 0,
32        }
33    }
34}
35
36/// Records a subscription step for replay on reconnect.
37#[derive(Clone, Debug)]
38enum SubscriptionStep {
39    Station { station: String, network: String },
40    Select { pattern: String },
41    Data,
42    DataFrom(SequenceNumber),
43    TimeWindow { start: String, end: Option<String> },
44}
45
46/// A wrapper around [`SeedLinkClient`] that automatically reconnects on disconnect.
47///
48/// Records all subscription steps (STATION, SELECT, DATA, TIME) and replays them
49/// on reconnect. On resume, replaces DATA with DATA-from-sequence using the last
50/// tracked sequence numbers.
51///
52/// # Deduplication guarantee
53///
54/// SeedLink servers may resend the last frame at the requested sequence number
55/// when resuming with `DATA seq`. This client automatically deduplicates frames
56/// after reconnect: any frame whose sequence number is ≤ the last tracked
57/// sequence for its station is silently dropped. Downstream consumers are
58/// guaranteed to never see duplicate frames.
59pub struct ReconnectingClient {
60    addr: String,
61    config: ClientConfig,
62    reconnect: ReconnectConfig,
63    subscriptions: Vec<SubscriptionStep>,
64    client: Option<SeedLinkClient>,
65    sequences: HashMap<StationKey, SequenceNumber>,
66}
67
68impl ReconnectingClient {
69    /// Connect to a SeedLink server with reconnect support using default configs.
70    pub async fn connect(addr: &str) -> Result<Self> {
71        Self::connect_with_config(addr, ClientConfig::default(), ReconnectConfig::default()).await
72    }
73
74    /// Connect with custom client and reconnect configuration.
75    pub async fn connect_with_config(
76        addr: &str,
77        config: ClientConfig,
78        reconnect: ReconnectConfig,
79    ) -> Result<Self> {
80        let client = SeedLinkClient::connect_with_config(addr, config.clone()).await?;
81        Ok(Self {
82            addr: addr.to_owned(),
83            config,
84            reconnect,
85            subscriptions: Vec::new(),
86            client: Some(client),
87            sequences: HashMap::new(),
88        })
89    }
90
91    /// Select a station and network. Records the step for reconnect replay.
92    pub async fn station(&mut self, station: &str, network: &str) -> Result<()> {
93        self.subscriptions.push(SubscriptionStep::Station {
94            station: station.to_owned(),
95            network: network.to_owned(),
96        });
97        self.client_mut()?.station(station, network).await
98    }
99
100    /// Select channels. Records the step for reconnect replay.
101    pub async fn select(&mut self, pattern: &str) -> Result<()> {
102        self.subscriptions.push(SubscriptionStep::Select {
103            pattern: pattern.to_owned(),
104        });
105        self.client_mut()?.select(pattern).await
106    }
107
108    /// Arm with DATA. Records the step for reconnect replay.
109    pub async fn data(&mut self) -> Result<()> {
110        self.subscriptions.push(SubscriptionStep::Data);
111        self.client_mut()?.data().await
112    }
113
114    /// Arm with DATA from a specific sequence. Records the step for reconnect replay.
115    pub async fn data_from(&mut self, sequence: SequenceNumber) -> Result<()> {
116        self.subscriptions
117            .push(SubscriptionStep::DataFrom(sequence));
118        self.client_mut()?.data_from(sequence).await
119    }
120
121    /// Arm with TIME window. Records the step for reconnect replay.
122    pub async fn time_window(&mut self, start: &str, end: Option<&str>) -> Result<()> {
123        self.subscriptions.push(SubscriptionStep::TimeWindow {
124            start: start.to_owned(),
125            end: end.map(|s| s.to_owned()),
126        });
127        self.client_mut()?.time_window(start, end).await
128    }
129
130    /// Send END to start streaming. Does not record (replayed automatically).
131    pub async fn end_stream(&mut self) -> Result<()> {
132        self.client_mut()?.end_stream().await
133    }
134
135    /// Read the next frame, automatically reconnecting on EOF.
136    ///
137    /// Returns `Ok(Some(frame))` on success, `Ok(None)` when the stream truly ends
138    /// (max attempts exhausted or server sends clean EOF after reconnect),
139    /// or `Err` on non-recoverable errors.
140    ///
141    /// Frames with sequence ≤ the last tracked sequence for their station are
142    /// silently dropped (deduplication after reconnect).
143    pub async fn next_frame(&mut self) -> Result<Option<OwnedFrame>> {
144        loop {
145            let result = match self.client.as_mut() {
146                Some(client) => client.next_frame().await,
147                None => return Err(ClientError::Disconnected),
148            };
149
150            match result {
151                Ok(Some(frame)) => {
152                    // Dedup: skip frames we've already seen (server may resend
153                    // the last frame after reconnect with DATA seq)
154                    if let Some(key) = frame.station_key()
155                        && let Some(&tracked) = self.sequences.get(&key)
156                        && frame.sequence() <= tracked
157                    {
158                        debug!(
159                            seq = %frame.sequence(),
160                            tracked = %tracked,
161                            station = ?key,
162                            "skipping duplicate frame"
163                        );
164                        continue;
165                    }
166
167                    // Track sequence from the inner client
168                    self.sync_sequences();
169                    return Ok(Some(frame));
170                }
171                Ok(None) => {
172                    // EOF — attempt reconnect
173                    debug!("stream ended, attempting reconnect");
174                    match self.attempt_reconnect().await {
175                        Ok(()) => {
176                            // Reconnected — loop to read from new connection
177                            continue;
178                        }
179                        Err(ClientError::ReconnectFailed { attempts }) => {
180                            warn!(attempts, "reconnect failed, giving up");
181                            return Err(ClientError::ReconnectFailed { attempts });
182                        }
183                        Err(e) => return Err(e),
184                    }
185                }
186                Err(e) => return Err(e),
187            }
188        }
189    }
190
191    /// Consume this client and return a [`Stream`] of frames with auto-reconnect.
192    ///
193    /// Duplicate frames after reconnect are automatically filtered out.
194    pub fn into_stream(self) -> impl Stream<Item = Result<OwnedFrame>> {
195        async_stream::try_stream! {
196            let mut this = self;
197            loop {
198                match this.next_frame().await {
199                    Ok(Some(frame)) => yield frame,
200                    Ok(None) => break,
201                    Err(ClientError::ReconnectFailed { .. }) => break,
202                    Err(e) => Err(e)?,
203                }
204            }
205        }
206    }
207
208    /// Returns the last received sequence number for a given network/station pair.
209    pub fn last_sequence(&self, network: &str, station: &str) -> Option<SequenceNumber> {
210        let key = StationKey {
211            network: network.to_owned(),
212            station: station.to_owned(),
213        };
214        self.sequences.get(&key).copied()
215    }
216
217    /// Returns a reference to all tracked sequences.
218    pub fn sequences(&self) -> &HashMap<StationKey, SequenceNumber> {
219        &self.sequences
220    }
221
222    // -- Private helpers --
223
224    fn client_mut(&mut self) -> Result<&mut SeedLinkClient> {
225        self.client.as_mut().ok_or(ClientError::Disconnected)
226    }
227
228    fn sync_sequences(&mut self) {
229        if let Some(client) = &self.client {
230            for (key, seq) in client.sequences() {
231                self.sequences.insert(key.clone(), *seq);
232            }
233        }
234    }
235
236    /// Try to reconnect and replay subscriptions.
237    async fn attempt_reconnect(&mut self) -> Result<()> {
238        self.client = None;
239
240        let mut backoff = self.reconnect.initial_backoff;
241        let max_attempts = self.reconnect.max_attempts;
242
243        for attempt in 1.. {
244            if max_attempts > 0 && attempt > max_attempts {
245                return Err(ClientError::ReconnectFailed {
246                    attempts: max_attempts,
247                });
248            }
249
250            info!(attempt, backoff_ms = backoff.as_millis(), "reconnecting");
251            tokio::time::sleep(backoff).await;
252
253            match SeedLinkClient::connect_with_config(&self.addr, self.config.clone()).await {
254                Ok(mut new_client) => {
255                    // Replay subscriptions
256                    if let Err(e) = self.replay_subscriptions(&mut new_client).await {
257                        warn!(attempt, error = %e, "replay failed, retrying");
258                        backoff = self.next_backoff(backoff);
259                        continue;
260                    }
261
262                    // Send END to resume streaming
263                    if let Err(e) = new_client.end_stream().await {
264                        warn!(attempt, error = %e, "end_stream failed, retrying");
265                        backoff = self.next_backoff(backoff);
266                        continue;
267                    }
268
269                    info!(attempt, "reconnected successfully");
270                    self.client = Some(new_client);
271                    return Ok(());
272                }
273                Err(e) => {
274                    warn!(attempt, error = %e, "reconnect attempt failed");
275                    backoff = self.next_backoff(backoff);
276                }
277            }
278        }
279
280        unreachable!()
281    }
282
283    fn next_backoff(&self, current: Duration) -> Duration {
284        let next = current.mul_f64(self.reconnect.multiplier);
285        next.min(self.reconnect.max_backoff)
286    }
287
288    /// Replay all recorded subscription steps on a new client.
289    ///
290    /// Replaces bare `Data` steps with `DataFrom(last_seq)` when we have
291    /// a tracked sequence for the current station context.
292    async fn replay_subscriptions(&self, client: &mut SeedLinkClient) -> Result<()> {
293        let mut current_station: Option<StationKey> = None;
294
295        for step in &self.subscriptions {
296            match step {
297                SubscriptionStep::Station { station, network } => {
298                    client.station(station, network).await?;
299                    current_station = Some(StationKey {
300                        network: network.clone(),
301                        station: station.clone(),
302                    });
303                }
304                SubscriptionStep::Select { pattern } => {
305                    client.select(pattern).await?;
306                }
307                SubscriptionStep::Data => {
308                    // Try to resume from last known sequence
309                    if let Some(ref key) = current_station {
310                        if let Some(seq) = self.sequences.get(key) {
311                            debug!(%seq, station = %key.station, network = %key.network, "resuming from sequence");
312                            client.data_from(*seq).await?;
313                        } else {
314                            client.data().await?;
315                        }
316                    } else {
317                        client.data().await?;
318                    }
319                }
320                SubscriptionStep::DataFrom(seq) => {
321                    // If we have a newer sequence, use that instead
322                    if let Some(ref key) = current_station
323                        && let Some(tracked) = self.sequences.get(key)
324                        && *tracked > *seq
325                    {
326                        client.data_from(*tracked).await?;
327                        continue;
328                    }
329                    client.data_from(*seq).await?;
330                }
331                SubscriptionStep::TimeWindow { start, end } => {
332                    client.time_window(start, end.as_deref()).await?;
333                }
334            }
335        }
336
337        Ok(())
338    }
339}
340
341// Clone ClientConfig so we can reuse it across reconnects
342impl Clone for ClientConfig {
343    fn clone(&self) -> Self {
344        Self {
345            connect_timeout: self.connect_timeout,
346            read_timeout: self.read_timeout,
347            prefer_v4: self.prefer_v4,
348        }
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mock::{MockConfig, MockServer};
    use seedlink_rs_protocol::frame::v3;

    /// Build a v3 frame whose payload carries `station` at bytes 8..13 and
    /// `network` at bytes 18..20, space-padded and truncated to fit.
    fn make_v3_frame(seq: u64, station: &str, network: &str) -> Vec<u8> {
        fn write_padded(field: &mut [u8], text: &str) {
            field.fill(b' ');
            let len = text.len().min(field.len());
            field[..len].copy_from_slice(&text.as_bytes()[..len]);
        }

        let mut payload = [0u8; v3::PAYLOAD_LEN];
        write_padded(&mut payload[8..13], station);
        write_padded(&mut payload[18..20], network);
        v3::write(SequenceNumber::new(seq), &payload).unwrap()
    }

    /// Mock config for a server that accepts two connections, serves the given
    /// frames per connection, and closes each connection after streaming.
    fn two_connection_config(per_connection: Vec<Vec<Vec<u8>>>) -> MockConfig {
        MockConfig {
            close_after_stream: true,
            max_connections: 2,
            connection_frames: Some(per_connection),
            ..MockConfig::v3_default(vec![])
        }
    }

    /// Reconnect settings with a 10 ms initial backoff so tests stay fast.
    fn fast_reconnect(max_backoff_ms: u64, max_attempts: u32) -> ReconnectConfig {
        ReconnectConfig {
            initial_backoff: Duration::from_millis(10),
            max_backoff: Duration::from_millis(max_backoff_ms),
            max_attempts,
            ..Default::default()
        }
    }

    /// Connect a reconnecting client to `addr` with `prefer_v4` disabled.
    async fn connect_v3(addr: &str, reconnect: ReconnectConfig) -> ReconnectingClient {
        let client_config = ClientConfig {
            prefer_v4: false,
            ..Default::default()
        };
        ReconnectingClient::connect_with_config(addr, client_config, reconnect)
            .await
            .unwrap()
    }

    /// Issue STATION + DATA for each `(station, network)` pair, then END.
    async fn subscribe(client: &mut ReconnectingClient, stations: &[(&str, &str)]) {
        for &(station, network) in stations {
            client.station(station, network).await.unwrap();
            client.data().await.unwrap();
        }
        client.end_stream().await.unwrap();
    }

    /// Read one frame and assert its sequence number.
    async fn expect_seq(client: &mut ReconnectingClient, expected: u64) {
        let frame = client.next_frame().await.unwrap().unwrap();
        assert_eq!(frame.sequence(), SequenceNumber::new(expected));
    }

    #[tokio::test]
    async fn reconnect_on_disconnect() {
        // Connection 0: seq=1, Connection 1: seq=2 (new data after reconnect)
        let server = MockServer::start(two_connection_config(vec![
            vec![make_v3_frame(1, "ANMO", "IU")],
            vec![make_v3_frame(2, "ANMO", "IU")],
        ]))
        .await;

        let mut client = connect_v3(&server.addr().to_string(), fast_reconnect(50, 3)).await;
        subscribe(&mut client, &[("ANMO", "IU")]).await;

        // First frame from first connection
        expect_seq(&mut client, 1).await;

        // Connection closes → auto-reconnect → dedup-clean frame from second connection
        expect_seq(&mut client, 2).await;
    }

    #[tokio::test]
    async fn reconnect_max_attempts() {
        // Server accepts only 1 connection
        let frames = vec![make_v3_frame(1, "ANMO", "IU")];
        let config = MockConfig {
            close_after_stream: true,
            max_connections: 1,
            ..MockConfig::v3_default(frames)
        };
        let server = MockServer::start(config).await;

        let mut client = connect_v3(&server.addr().to_string(), fast_reconnect(20, 2)).await;
        subscribe(&mut client, &[("ANMO", "IU")]).await;

        // First frame OK
        expect_seq(&mut client, 1).await;

        // EOF → reconnect fails (server only accepts 1 connection)
        let err = client.next_frame().await.unwrap_err();
        assert!(matches!(err, ClientError::ReconnectFailed { attempts: 2 }));
    }

    #[tokio::test]
    async fn reconnect_resumes_sequence_verified_on_wire() {
        // Connection 0: seq=10,11. Connection 1: seq=10,11 (dupes) + seq=12 (new).
        let server = MockServer::start(two_connection_config(vec![
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
            ],
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
                make_v3_frame(12, "ANMO", "IU"),
            ],
        ]))
        .await;

        let mut client = connect_v3(&server.addr().to_string(), fast_reconnect(50, 3)).await;
        subscribe(&mut client, &[("ANMO", "IU")]).await;

        // Read both frames from first connection
        expect_seq(&mut client, 10).await;
        expect_seq(&mut client, 11).await;

        assert_eq!(
            client.last_sequence("IU", "ANMO"),
            Some(SequenceNumber::new(11))
        );

        // Auto-reconnect happens here — seq=10 and seq=11 are deduped, seq=12 returned
        expect_seq(&mut client, 12).await;

        // Verify what was sent on the wire for connection #0 (original)
        let conn0 = server.captured().connection(0);
        for (i, expected) in ["HELLO", "STATION ANMO IU", "DATA", "END"]
            .iter()
            .enumerate()
        {
            assert_eq!(conn0[i], *expected);
        }

        // Verify connection #1 (reconnect) used DATA with sequence.
        // Key assertion: DATA was replayed as DATA 00000B (hex for 11)
        let conn1 = server.captured().connection(1);
        for (i, expected) in ["HELLO", "STATION ANMO IU", "DATA 00000B", "END"]
            .iter()
            .enumerate()
        {
            assert_eq!(conn1[i], *expected);
        }
    }

    #[tokio::test]
    async fn reconnect_multi_station_resumes_each_sequence() {
        // Two stations: IU/ANMO (last seq=11) and GE/WLF (last seq=5)
        // Connection 1 includes dupes + new frames for each station
        let server = MockServer::start(two_connection_config(vec![
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
                make_v3_frame(4, "WLF", "GE"),
                make_v3_frame(5, "WLF", "GE"),
            ],
            vec![
                make_v3_frame(11, "ANMO", "IU"), // dupe
                make_v3_frame(12, "ANMO", "IU"), // new
                make_v3_frame(5, "WLF", "GE"),   // dupe
                make_v3_frame(6, "WLF", "GE"),   // new
            ],
        ]))
        .await;

        let mut client = connect_v3(&server.addr().to_string(), fast_reconnect(50, 3)).await;

        // Subscribe two stations
        subscribe(&mut client, &[("ANMO", "IU"), ("WLF", "GE")]).await;

        // Read all 4 frames from connection 0
        for _ in 0..4 {
            client.next_frame().await.unwrap().unwrap();
        }

        // Verify tracked sequences
        assert_eq!(
            client.last_sequence("IU", "ANMO"),
            Some(SequenceNumber::new(11))
        );
        assert_eq!(
            client.last_sequence("GE", "WLF"),
            Some(SequenceNumber::new(5))
        );

        // Trigger reconnect — dupes (seq=11/ANMO, seq=5/WLF) are skipped
        expect_seq(&mut client, 12).await; // first non-dupe
        expect_seq(&mut client, 6).await; // second non-dupe (WLF)

        // Verify reconnect commands on the wire: each station resumes from its
        // own last sequence (hex(11) = 00000B, hex(5) = 000005).
        let conn1 = server.captured().connection(1);
        let expected_commands = [
            "HELLO",
            "STATION ANMO IU",
            "DATA 00000B",
            "STATION WLF GE",
            "DATA 000005",
            "END",
        ];
        for (i, expected) in expected_commands.iter().enumerate() {
            assert_eq!(conn1[i], *expected);
        }
    }

    #[tokio::test]
    async fn reconnect_into_stream() {
        use std::pin::pin;
        use tokio_stream::StreamExt;

        // Connection 0: seq=1, Connection 1: seq=2 (new data)
        let server = MockServer::start(two_connection_config(vec![
            vec![make_v3_frame(1, "ANMO", "IU")],
            vec![make_v3_frame(2, "ANMO", "IU")],
        ]))
        .await;

        let mut client = connect_v3(&server.addr().to_string(), fast_reconnect(50, 1)).await;
        subscribe(&mut client, &[("ANMO", "IU")]).await;

        let mut stream = pin!(client.into_stream());

        // Frame from first connection
        let frame1 = stream.next().await.unwrap().unwrap();
        assert_eq!(frame1.sequence(), SequenceNumber::new(1));

        // Auto-reconnect, dedup-clean frame from second connection
        let frame2 = stream.next().await.unwrap().unwrap();
        assert_eq!(frame2.sequence(), SequenceNumber::new(2));

        // After second connection closes, max_attempts=1 exhausted → stream ends
        let end = stream.next().await;
        assert!(end.is_none());
    }

    #[tokio::test]
    async fn reconnect_dedup_skips_all_duplicates() {
        // Connection 0: seq=10,11. Connection 1: seq=10,11 (all dupes).
        // Since conn1 has NO new frames, all are skipped → EOF → reconnect fails.
        let server = MockServer::start(two_connection_config(vec![
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
            ],
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
            ],
        ]))
        .await;

        let mut client = connect_v3(&server.addr().to_string(), fast_reconnect(20, 1)).await;
        subscribe(&mut client, &[("ANMO", "IU")]).await;

        // Read both frames from first connection
        expect_seq(&mut client, 10).await;
        expect_seq(&mut client, 11).await;

        // EOF → reconnect → all frames are dupes → EOF → reconnect fails
        let err = client.next_frame().await.unwrap_err();
        assert!(matches!(err, ClientError::ReconnectFailed { attempts: 1 }));
    }
}
741}