1use std::collections::HashMap;
2use std::time::Duration;
3
4use futures_core::Stream;
5use seedlink_rs_protocol::SequenceNumber;
6use tracing::{debug, info, warn};
7
8use crate::SeedLinkClient;
9use crate::error::{ClientError, Result};
10use crate::state::{ClientConfig, OwnedFrame, StationKey};
11
/// Backoff policy applied when a dropped connection is re-established.
#[derive(Clone, Debug)]
pub struct ReconnectConfig {
    /// Delay slept before the first reconnect attempt of a cycle.
    pub initial_backoff: Duration,
    /// Upper bound the delay is clamped to after each increase.
    pub max_backoff: Duration,
    /// Factor the delay is multiplied by after every failed attempt.
    pub multiplier: f64,
    /// Maximum number of attempts per reconnect cycle; `0` means retry without limit.
    pub max_attempts: u32,
}

impl Default for ReconnectConfig {
    /// Starts at one second, doubles after each failure, caps at sixty
    /// seconds and never gives up.
    fn default() -> Self {
        const FIRST_DELAY: Duration = Duration::from_secs(1);
        const DELAY_CAP: Duration = Duration::from_secs(60);

        Self {
            initial_backoff: FIRST_DELAY,
            max_backoff: DELAY_CAP,
            multiplier: 2.0,
            max_attempts: 0,
        }
    }
}
35
36#[derive(Clone, Debug)]
38enum SubscriptionStep {
39 Station { station: String, network: String },
40 Select { pattern: String },
41 Data,
42 DataFrom(SequenceNumber),
43 TimeWindow { start: String, end: Option<String> },
44}
45
46pub struct ReconnectingClient {
60 addr: String,
61 config: ClientConfig,
62 reconnect: ReconnectConfig,
63 subscriptions: Vec<SubscriptionStep>,
64 client: Option<SeedLinkClient>,
65 sequences: HashMap<StationKey, SequenceNumber>,
66}
67
68impl ReconnectingClient {
69 pub async fn connect(addr: &str) -> Result<Self> {
71 Self::connect_with_config(addr, ClientConfig::default(), ReconnectConfig::default()).await
72 }
73
74 pub async fn connect_with_config(
76 addr: &str,
77 config: ClientConfig,
78 reconnect: ReconnectConfig,
79 ) -> Result<Self> {
80 let client = SeedLinkClient::connect_with_config(addr, config.clone()).await?;
81 Ok(Self {
82 addr: addr.to_owned(),
83 config,
84 reconnect,
85 subscriptions: Vec::new(),
86 client: Some(client),
87 sequences: HashMap::new(),
88 })
89 }
90
91 pub async fn station(&mut self, station: &str, network: &str) -> Result<()> {
93 self.subscriptions.push(SubscriptionStep::Station {
94 station: station.to_owned(),
95 network: network.to_owned(),
96 });
97 self.client_mut()?.station(station, network).await
98 }
99
100 pub async fn select(&mut self, pattern: &str) -> Result<()> {
102 self.subscriptions.push(SubscriptionStep::Select {
103 pattern: pattern.to_owned(),
104 });
105 self.client_mut()?.select(pattern).await
106 }
107
108 pub async fn data(&mut self) -> Result<()> {
110 self.subscriptions.push(SubscriptionStep::Data);
111 self.client_mut()?.data().await
112 }
113
114 pub async fn data_from(&mut self, sequence: SequenceNumber) -> Result<()> {
116 self.subscriptions
117 .push(SubscriptionStep::DataFrom(sequence));
118 self.client_mut()?.data_from(sequence).await
119 }
120
121 pub async fn time_window(&mut self, start: &str, end: Option<&str>) -> Result<()> {
123 self.subscriptions.push(SubscriptionStep::TimeWindow {
124 start: start.to_owned(),
125 end: end.map(|s| s.to_owned()),
126 });
127 self.client_mut()?.time_window(start, end).await
128 }
129
130 pub async fn end_stream(&mut self) -> Result<()> {
132 self.client_mut()?.end_stream().await
133 }
134
135 pub async fn next_frame(&mut self) -> Result<Option<OwnedFrame>> {
144 loop {
145 let result = match self.client.as_mut() {
146 Some(client) => client.next_frame().await,
147 None => return Err(ClientError::Disconnected),
148 };
149
150 match result {
151 Ok(Some(frame)) => {
152 if let Some(key) = frame.station_key()
155 && let Some(&tracked) = self.sequences.get(&key)
156 && frame.sequence() <= tracked
157 {
158 debug!(
159 seq = %frame.sequence(),
160 tracked = %tracked,
161 station = ?key,
162 "skipping duplicate frame"
163 );
164 continue;
165 }
166
167 self.sync_sequences();
169 return Ok(Some(frame));
170 }
171 Ok(None) => {
172 debug!("stream ended, attempting reconnect");
174 match self.attempt_reconnect().await {
175 Ok(()) => {
176 continue;
178 }
179 Err(ClientError::ReconnectFailed { attempts }) => {
180 warn!(attempts, "reconnect failed, giving up");
181 return Err(ClientError::ReconnectFailed { attempts });
182 }
183 Err(e) => return Err(e),
184 }
185 }
186 Err(e) => return Err(e),
187 }
188 }
189 }
190
191 pub fn into_stream(self) -> impl Stream<Item = Result<OwnedFrame>> {
195 async_stream::try_stream! {
196 let mut this = self;
197 loop {
198 match this.next_frame().await {
199 Ok(Some(frame)) => yield frame,
200 Ok(None) => break,
201 Err(ClientError::ReconnectFailed { .. }) => break,
202 Err(e) => Err(e)?,
203 }
204 }
205 }
206 }
207
208 pub fn last_sequence(&self, network: &str, station: &str) -> Option<SequenceNumber> {
210 let key = StationKey {
211 network: network.to_owned(),
212 station: station.to_owned(),
213 };
214 self.sequences.get(&key).copied()
215 }
216
217 pub fn sequences(&self) -> &HashMap<StationKey, SequenceNumber> {
219 &self.sequences
220 }
221
222 fn client_mut(&mut self) -> Result<&mut SeedLinkClient> {
225 self.client.as_mut().ok_or(ClientError::Disconnected)
226 }
227
228 fn sync_sequences(&mut self) {
229 if let Some(client) = &self.client {
230 for (key, seq) in client.sequences() {
231 self.sequences.insert(key.clone(), *seq);
232 }
233 }
234 }
235
236 async fn attempt_reconnect(&mut self) -> Result<()> {
238 self.client = None;
239
240 let mut backoff = self.reconnect.initial_backoff;
241 let max_attempts = self.reconnect.max_attempts;
242
243 for attempt in 1.. {
244 if max_attempts > 0 && attempt > max_attempts {
245 return Err(ClientError::ReconnectFailed {
246 attempts: max_attempts,
247 });
248 }
249
250 info!(attempt, backoff_ms = backoff.as_millis(), "reconnecting");
251 tokio::time::sleep(backoff).await;
252
253 match SeedLinkClient::connect_with_config(&self.addr, self.config.clone()).await {
254 Ok(mut new_client) => {
255 if let Err(e) = self.replay_subscriptions(&mut new_client).await {
257 warn!(attempt, error = %e, "replay failed, retrying");
258 backoff = self.next_backoff(backoff);
259 continue;
260 }
261
262 if let Err(e) = new_client.end_stream().await {
264 warn!(attempt, error = %e, "end_stream failed, retrying");
265 backoff = self.next_backoff(backoff);
266 continue;
267 }
268
269 info!(attempt, "reconnected successfully");
270 self.client = Some(new_client);
271 return Ok(());
272 }
273 Err(e) => {
274 warn!(attempt, error = %e, "reconnect attempt failed");
275 backoff = self.next_backoff(backoff);
276 }
277 }
278 }
279
280 unreachable!()
281 }
282
283 fn next_backoff(&self, current: Duration) -> Duration {
284 let next = current.mul_f64(self.reconnect.multiplier);
285 next.min(self.reconnect.max_backoff)
286 }
287
288 async fn replay_subscriptions(&self, client: &mut SeedLinkClient) -> Result<()> {
293 let mut current_station: Option<StationKey> = None;
294
295 for step in &self.subscriptions {
296 match step {
297 SubscriptionStep::Station { station, network } => {
298 client.station(station, network).await?;
299 current_station = Some(StationKey {
300 network: network.clone(),
301 station: station.clone(),
302 });
303 }
304 SubscriptionStep::Select { pattern } => {
305 client.select(pattern).await?;
306 }
307 SubscriptionStep::Data => {
308 if let Some(ref key) = current_station {
310 if let Some(seq) = self.sequences.get(key) {
311 debug!(%seq, station = %key.station, network = %key.network, "resuming from sequence");
312 client.data_from(*seq).await?;
313 } else {
314 client.data().await?;
315 }
316 } else {
317 client.data().await?;
318 }
319 }
320 SubscriptionStep::DataFrom(seq) => {
321 if let Some(ref key) = current_station
323 && let Some(tracked) = self.sequences.get(key)
324 && *tracked > *seq
325 {
326 client.data_from(*tracked).await?;
327 continue;
328 }
329 client.data_from(*seq).await?;
330 }
331 SubscriptionStep::TimeWindow { start, end } => {
332 client.time_window(start, end.as_deref()).await?;
333 }
334 }
335 }
336
337 Ok(())
338 }
339}
340
341impl Clone for ClientConfig {
343 fn clone(&self) -> Self {
344 Self {
345 connect_timeout: self.connect_timeout,
346 read_timeout: self.read_timeout,
347 prefer_v4: self.prefer_v4,
348 }
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mock::{MockConfig, MockServer};
    use seedlink_rs_protocol::frame::v3;

    /// Asserts that the captured commands of one connection start with the
    /// given sequence of commands (extra trailing commands are not checked).
    macro_rules! assert_commands {
        ($captured:expr, [$($expected:expr),+ $(,)?]) => {{
            let captured = $captured;
            for (index, expected) in [$($expected),+].into_iter().enumerate() {
                assert_eq!(captured[index], expected, "command #{}", index);
            }
        }};
    }

    /// Builds a v3 frame with sequence `seq` whose payload carries the station
    /// code in bytes 8..13 and the network code in bytes 18..20, both
    /// space-padded and truncated to fit.
    fn make_v3_frame(seq: u64, station: &str, network: &str) -> Vec<u8> {
        /// Copies as much of `text` as fits into `field`, padding the rest with spaces.
        fn write_padded(field: &mut [u8], text: &str) {
            field.fill(b' ');
            let len = text.len().min(field.len());
            field[..len].copy_from_slice(&text.as_bytes()[..len]);
        }

        let mut payload = [0u8; v3::PAYLOAD_LEN];
        write_padded(&mut payload[8..13], station);
        write_padded(&mut payload[18..20], network);
        v3::write(SequenceNumber::new(seq), &payload).unwrap()
    }

    /// Mock configuration that accepts two connections, serves
    /// `connection_frames[i]` on the i-th one and closes it after streaming.
    fn scripted_config(connection_frames: Vec<Vec<Vec<u8>>>) -> MockConfig {
        MockConfig {
            close_after_stream: true,
            max_connections: 2,
            connection_frames: Some(connection_frames),
            ..MockConfig::v3_default(vec![])
        }
    }

    /// Connects a v3 reconnecting client to `addr` with a 10 ms initial
    /// backoff and the given cap and attempt limit.
    async fn connect_v3(addr: &str, max_backoff_ms: u64, max_attempts: u32) -> ReconnectingClient {
        let reconnect_config = ReconnectConfig {
            initial_backoff: Duration::from_millis(10),
            max_backoff: Duration::from_millis(max_backoff_ms),
            max_attempts,
            ..Default::default()
        };
        let client_config = ClientConfig {
            prefer_v4: false,
            ..Default::default()
        };

        ReconnectingClient::connect_with_config(addr, client_config, reconnect_config)
            .await
            .unwrap()
    }

    /// Issues `station` + `data` for every `(station, network)` pair and then
    /// finishes the handshake.
    async fn start_streaming(client: &mut ReconnectingClient, stations: &[(&str, &str)]) {
        for &(station, network) in stations {
            client.station(station, network).await.unwrap();
            client.data().await.unwrap();
        }
        client.end_stream().await.unwrap();
    }

    /// Reads the next frame and asserts its sequence number.
    async fn expect_sequence(client: &mut ReconnectingClient, seq: u64) {
        let frame = client.next_frame().await.unwrap().unwrap();
        assert_eq!(frame.sequence(), SequenceNumber::new(seq));
    }

    #[tokio::test]
    async fn reconnect_on_disconnect() {
        let config = scripted_config(vec![
            vec![make_v3_frame(1, "ANMO", "IU")],
            vec![make_v3_frame(2, "ANMO", "IU")],
        ]);
        let server = MockServer::start(config).await;

        let mut client = connect_v3(&server.addr().to_string(), 50, 3).await;
        start_streaming(&mut client, &[("ANMO", "IU")]).await;

        // First frame comes from the initial connection, second after reconnect.
        expect_sequence(&mut client, 1).await;
        expect_sequence(&mut client, 2).await;
    }

    #[tokio::test]
    async fn reconnect_max_attempts() {
        // Only one connection is accepted, so every reconnect attempt fails.
        let config = MockConfig {
            close_after_stream: true,
            max_connections: 1,
            ..MockConfig::v3_default(vec![make_v3_frame(1, "ANMO", "IU")])
        };
        let server = MockServer::start(config).await;

        let mut client = connect_v3(&server.addr().to_string(), 20, 2).await;
        start_streaming(&mut client, &[("ANMO", "IU")]).await;

        expect_sequence(&mut client, 1).await;

        let err = client.next_frame().await.unwrap_err();
        assert!(matches!(err, ClientError::ReconnectFailed { attempts: 2 }));
    }

    #[tokio::test]
    async fn reconnect_resumes_sequence_verified_on_wire() {
        let config = scripted_config(vec![
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
            ],
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
                make_v3_frame(12, "ANMO", "IU"),
            ],
        ]);
        let server = MockServer::start(config).await;

        let mut client = connect_v3(&server.addr().to_string(), 50, 3).await;
        start_streaming(&mut client, &[("ANMO", "IU")]).await;

        expect_sequence(&mut client, 10).await;
        expect_sequence(&mut client, 11).await;

        assert_eq!(
            client.last_sequence("IU", "ANMO"),
            Some(SequenceNumber::new(11))
        );

        // Frames 10 and 11 are re-sent after the reconnect and must be skipped.
        expect_sequence(&mut client, 12).await;

        assert_commands!(
            server.captured().connection(0),
            ["HELLO", "STATION ANMO IU", "DATA", "END"]
        );
        // The replayed DATA carries the last tracked sequence (11 == 0x00000B).
        assert_commands!(
            server.captured().connection(1),
            ["HELLO", "STATION ANMO IU", "DATA 00000B", "END"]
        );
    }

    #[tokio::test]
    async fn reconnect_multi_station_resumes_each_sequence() {
        let config = scripted_config(vec![
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
                make_v3_frame(4, "WLF", "GE"),
                make_v3_frame(5, "WLF", "GE"),
            ],
            vec![
                make_v3_frame(11, "ANMO", "IU"),
                make_v3_frame(12, "ANMO", "IU"),
                make_v3_frame(5, "WLF", "GE"),
                make_v3_frame(6, "WLF", "GE"),
            ],
        ]);
        let server = MockServer::start(config).await;

        let mut client = connect_v3(&server.addr().to_string(), 50, 3).await;
        start_streaming(&mut client, &[("ANMO", "IU"), ("WLF", "GE")]).await;

        // Drain everything served on the first connection.
        for _ in 0..4 {
            client.next_frame().await.unwrap().unwrap();
        }

        assert_eq!(
            client.last_sequence("IU", "ANMO"),
            Some(SequenceNumber::new(11))
        );
        assert_eq!(
            client.last_sequence("GE", "WLF"),
            Some(SequenceNumber::new(5))
        );

        // After the reconnect only the new frame of each station is delivered.
        expect_sequence(&mut client, 12).await;
        expect_sequence(&mut client, 6).await;

        assert_commands!(
            server.captured().connection(1),
            [
                "HELLO",
                "STATION ANMO IU",
                "DATA 00000B",
                "STATION WLF GE",
                "DATA 000005",
                "END",
            ]
        );
    }

    #[tokio::test]
    async fn reconnect_into_stream() {
        use std::pin::pin;
        use tokio_stream::StreamExt;

        let config = scripted_config(vec![
            vec![make_v3_frame(1, "ANMO", "IU")],
            vec![make_v3_frame(2, "ANMO", "IU")],
        ]);
        let server = MockServer::start(config).await;

        let mut client = connect_v3(&server.addr().to_string(), 50, 1).await;
        start_streaming(&mut client, &[("ANMO", "IU")]).await;

        let mut stream = pin!(client.into_stream());

        let first = stream.next().await.unwrap().unwrap();
        assert_eq!(first.sequence(), SequenceNumber::new(1));

        let second = stream.next().await.unwrap().unwrap();
        assert_eq!(second.sequence(), SequenceNumber::new(2));

        // The third connection is refused; the stream ends instead of yielding an error.
        let end = stream.next().await;
        assert!(end.is_none());
    }

    #[tokio::test]
    async fn reconnect_dedup_skips_all_duplicates() {
        let config = scripted_config(vec![
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
            ],
            vec![
                make_v3_frame(10, "ANMO", "IU"),
                make_v3_frame(11, "ANMO", "IU"),
            ],
        ]);
        let server = MockServer::start(config).await;

        let mut client = connect_v3(&server.addr().to_string(), 20, 1).await;
        start_streaming(&mut client, &[("ANMO", "IU")]).await;

        expect_sequence(&mut client, 10).await;
        expect_sequence(&mut client, 11).await;

        // The second connection only re-sends known frames; once it closes, the
        // next reconnect is refused and surfaces as an error.
        let err = client.next_frame().await.unwrap_err();
        assert!(matches!(err, ClientError::ReconnectFailed { attempts: 1 }));
    }
}