Skip to main content

seedlink_rs_client/
client.rs

1use std::collections::HashMap;
2
3use futures_core::Stream;
4use seedlink_rs_protocol::{Command, InfoLevel, ProtocolVersion, Response, SequenceNumber};
5use tracing::{debug, info, trace, warn};
6
7use crate::connection::Connection;
8use crate::error::{ClientError, Result};
9use crate::negotiate;
10use crate::state::{ClientConfig, ClientState, OwnedFrame, ServerInfo, StationKey};
11
12/// Async SeedLink client for connecting to seismic data servers.
13///
14/// Implements the SeedLink v3/v4 protocol state machine:
15/// `Connected` → `Configured` → `Streaming` → `Disconnected`.
16///
17/// # Example
18///
19/// ```no_run
20/// # async fn example() -> seedlink_rs_client::Result<()> {
21/// use seedlink_rs_client::SeedLinkClient;
22///
23/// let mut client = SeedLinkClient::connect("rtserve.iris.washington.edu:18000").await?;
24/// client.station("ANMO", "IU").await?;
25/// client.select("BHZ").await?;
26/// client.data().await?;
27/// client.end_stream().await?;
28///
29/// while let Some(frame) = client.next_frame().await? {
30///     println!("seq={}, len={}", frame.sequence(), frame.payload().len());
31/// }
32/// # Ok(())
33/// # }
34/// ```
35pub struct SeedLinkClient {
36    connection: Connection,
37    state: ClientState,
38    version: ProtocolVersion,
39    server_info: ServerInfo,
40    sequences: HashMap<StationKey, SequenceNumber>,
41    config: ClientConfig,
42}
43
44impl SeedLinkClient {
45    /// Connect to a SeedLink server with default configuration.
46    ///
47    /// Performs TCP connect, sends HELLO, and negotiates v4 if supported.
48    /// On success the client is in [`ClientState::Connected`].
49    pub async fn connect(addr: &str) -> Result<Self> {
50        Self::connect_with_config(addr, ClientConfig::default()).await
51    }
52
53    /// Connect to a SeedLink server with custom [`ClientConfig`].
54    ///
55    /// Performs TCP connect, sends HELLO, and optionally negotiates v4.
56    /// On success the client is in [`ClientState::Connected`].
57    pub async fn connect_with_config(addr: &str, config: ClientConfig) -> Result<Self> {
58        info!(addr, "connecting");
59        let mut connection =
60            Connection::connect(addr, config.connect_timeout, config.read_timeout).await?;
61
62        // Send HELLO
63        connection
64            .send_command(&Command::Hello, ProtocolVersion::V3)
65            .await?;
66
67        // Read 2-line hello response
68        let line1 = connection.read_line().await?;
69        let line2 = connection.read_line().await?;
70        let hello = Response::parse_hello(&line1, &line2)?;
71
72        let (software, version_str, extra, organization) = match hello {
73            Response::Hello {
74                software,
75                version,
76                extra,
77                organization,
78            } => (software, version, extra, organization),
79            _ => {
80                return Err(ClientError::UnexpectedResponse(
81                    "expected HELLO response".into(),
82                ));
83            }
84        };
85
86        let capabilities = negotiate::parse_capabilities(&extra);
87        let mut protocol_version = ProtocolVersion::V3;
88
89        // Attempt v4 negotiation if preferred and supported
90        if config.prefer_v4 && negotiate::supports_v4(&capabilities) {
91            connection
92                .send_command(
93                    &Command::SlProto {
94                        version: "4.0".into(),
95                    },
96                    ProtocolVersion::V4,
97                )
98                .await?;
99
100            let response_line = connection.read_line().await?;
101            let response = Response::parse_line(&response_line)?;
102            match response {
103                Response::Ok => {
104                    protocol_version = ProtocolVersion::V4;
105                }
106                Response::Error { description, .. } => {
107                    warn!(%description, "v4 negotiation failed, falling back to v3");
108                }
109                _ => {
110                    return Err(ClientError::UnexpectedResponse(format!(
111                        "expected OK or ERROR for SLPROTO, got: {response_line:?}"
112                    )));
113                }
114            }
115        }
116
117        let server_info = ServerInfo {
118            software,
119            version: version_str,
120            organization,
121            capabilities,
122        };
123
124        info!(version = ?protocol_version, "connected");
125
126        Ok(Self {
127            connection,
128            state: ClientState::Connected,
129            version: protocol_version,
130            server_info,
131            sequences: HashMap::new(),
132            config,
133        })
134    }
135
136    // -- Accessors --
137
138    /// Returns the negotiated protocol version (V3 or V4).
139    pub fn version(&self) -> ProtocolVersion {
140        self.version
141    }
142
143    /// Returns information about the connected server (software, version, capabilities).
144    pub fn server_info(&self) -> &ServerInfo {
145        &self.server_info
146    }
147
148    /// Returns the current client state.
149    pub fn state(&self) -> ClientState {
150        self.state
151    }
152
153    /// Returns the configuration used for this connection.
154    pub fn config(&self) -> &ClientConfig {
155        &self.config
156    }
157
158    // -- Configuration (Connected|Configured → Configured) --
159
160    /// Select a station and network for data subscription.
161    ///
162    /// Requires state `Connected` or `Configured`. Transitions to `Configured`.
163    /// Server must reply OK; returns [`ClientError::ServerError`] on ERROR.
164    pub async fn station(&mut self, station: &str, network: &str) -> Result<()> {
165        self.require_state_in(
166            &[ClientState::Connected, ClientState::Configured],
167            "station",
168        )?;
169
170        debug!(station, network, "STATION");
171        let cmd = Command::Station {
172            station: station.to_owned(),
173            network: network.to_owned(),
174        };
175        self.connection.send_command(&cmd, self.version).await?;
176
177        // All modern servers reply OK/ERROR (EXTREPLY behavior)
178        self.read_ok_response("STATION").await?;
179
180        self.state = ClientState::Configured;
181        Ok(())
182    }
183
184    /// Select channels within the current station subscription.
185    ///
186    /// `pattern` is a SeedLink channel selector (e.g., `"BHZ"`, `"??.BHZ"`).
187    /// Requires state `Connected` or `Configured`. Transitions to `Configured`.
188    pub async fn select(&mut self, pattern: &str) -> Result<()> {
189        self.require_state_in(&[ClientState::Connected, ClientState::Configured], "select")?;
190
191        debug!(pattern, "SELECT");
192        let cmd = Command::Select {
193            pattern: pattern.to_owned(),
194        };
195        self.connection.send_command(&cmd, self.version).await?;
196
197        // All modern servers reply OK/ERROR (EXTREPLY behavior)
198        self.read_ok_response("SELECT").await?;
199
200        self.state = ClientState::Configured;
201        Ok(())
202    }
203
204    // -- Arming (Configured → Configured) --
205
206    /// Arm the current station subscription with DATA (stream from beginning).
207    ///
208    /// This does NOT start streaming — call [`end_stream()`](Self::end_stream) or
209    /// [`fetch()`](Self::fetch) after arming all stations.
210    /// Requires state `Configured`. State stays `Configured`.
211    pub async fn data(&mut self) -> Result<()> {
212        self.require_state_in(&[ClientState::Configured], "data")?;
213
214        debug!("DATA");
215        let cmd = Command::Data {
216            sequence: None,
217            start: None,
218            end: None,
219        };
220        self.connection.send_command(&cmd, self.version).await?;
221
222        // Server replies OK/ERROR
223        self.read_ok_response("DATA").await?;
224
225        // State stays Configured — END triggers streaming
226        Ok(())
227    }
228
229    /// Arm the current station subscription with DATA, resuming from `sequence`.
230    ///
231    /// Requires state `Configured`. State stays `Configured`.
232    pub async fn data_from(&mut self, sequence: SequenceNumber) -> Result<()> {
233        self.require_state_in(&[ClientState::Configured], "data_from")?;
234
235        debug!(%sequence, "DATA (resume)");
236        let cmd = Command::Data {
237            sequence: Some(sequence),
238            start: None,
239            end: None,
240        };
241        self.connection.send_command(&cmd, self.version).await?;
242
243        // Server replies OK/ERROR
244        self.read_ok_response("DATA").await?;
245
246        // State stays Configured — END triggers streaming
247        Ok(())
248    }
249
250    /// Arm the current station subscription with a time window (v3 only).
251    ///
252    /// Sends `TIME start [end]` to request data within a specific time range.
253    /// Requires state `Configured`. State stays `Configured`.
254    pub async fn time_window(&mut self, start: &str, end: Option<&str>) -> Result<()> {
255        self.require_state_in(&[ClientState::Configured], "time_window")?;
256
257        debug!(start, ?end, "TIME");
258        let cmd = Command::Time {
259            start: start.to_owned(),
260            end: end.map(|s| s.to_owned()),
261        };
262        self.connection.send_command(&cmd, self.version).await?;
263
264        self.read_ok_response("TIME").await?;
265
266        // State stays Configured — END triggers streaming
267        Ok(())
268    }
269
270    // -- Streaming (Configured → Streaming) --
271
272    /// Send END to trigger continuous binary streaming.
273    ///
274    /// The server begins sending frames immediately with no text response.
275    /// Requires state `Configured`. Transitions to `Streaming`.
276    pub async fn end_stream(&mut self) -> Result<()> {
277        self.require_state_in(&[ClientState::Configured], "end_stream")?;
278
279        self.connection
280            .send_command(&Command::End, self.version)
281            .await?;
282
283        // END has NO text response — binary streaming starts immediately
284        self.state = ClientState::Streaming;
285        Ok(())
286    }
287
288    /// Send FETCH to stream buffered data then close (v3 only).
289    ///
290    /// Unlike [`end_stream()`](Self::end_stream), FETCH delivers only what the
291    /// server has buffered, then the server closes the connection.
292    /// Requires state `Configured`. Transitions to `Streaming`.
293    pub async fn fetch(&mut self) -> Result<()> {
294        self.require_state_in(&[ClientState::Configured], "fetch")?;
295
296        let cmd = Command::Fetch { sequence: None };
297        self.connection.send_command(&cmd, self.version).await?;
298
299        self.state = ClientState::Streaming;
300        Ok(())
301    }
302
303    /// Send FETCH resuming from `sequence` (v3 only).
304    ///
305    /// Requires state `Configured`. Transitions to `Streaming`.
306    pub async fn fetch_from(&mut self, sequence: SequenceNumber) -> Result<()> {
307        self.require_state_in(&[ClientState::Configured], "fetch_from")?;
308
309        let cmd = Command::Fetch {
310            sequence: Some(sequence),
311        };
312        self.connection.send_command(&cmd, self.version).await?;
313
314        self.state = ClientState::Streaming;
315        Ok(())
316    }
317
318    // -- Frame reading (Streaming) --
319
320    /// Read the next SeedLink frame from the server.
321    ///
322    /// Returns `Ok(Some(frame))` on success, `Ok(None)` on clean EOF
323    /// (server closed connection), or `Err` on protocol/timeout errors.
324    /// On EOF, state transitions to `Disconnected`.
325    /// Requires state `Streaming`.
326    pub async fn next_frame(&mut self) -> Result<Option<OwnedFrame>> {
327        self.require_state_in(&[ClientState::Streaming], "next_frame")?;
328
329        let result = match self.version {
330            ProtocolVersion::V3 => self.connection.read_v3_frame().await,
331            ProtocolVersion::V4 => self.connection.read_v4_frame().await,
332        };
333
334        match result {
335            Ok(frame) => {
336                trace!(sequence = %frame.sequence(), "frame received");
337                self.track_sequence(&frame);
338                Ok(Some(frame))
339            }
340            Err(ClientError::Disconnected) => {
341                self.state = ClientState::Disconnected;
342                Ok(None)
343            }
344            Err(ClientError::Io(ref e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
345                self.state = ClientState::Disconnected;
346                Ok(None)
347            }
348            Err(e) => Err(e),
349        }
350    }
351
352    // -- Stream conversion --
353
354    /// Consume this client and return a [`Stream`] of frames.
355    ///
356    /// The client must be in `Streaming` state. The stream yields
357    /// `Ok(OwnedFrame)` per frame and ends with `None` on EOF.
358    pub fn into_stream(self) -> impl Stream<Item = Result<OwnedFrame>> {
359        crate::stream::frame_stream(self)
360    }
361
362    // -- Utility (any state) --
363
364    /// Request server information at the given detail level.
365    ///
366    /// Returns a vec of INFO response frames (typically XML payloads).
367    /// Can be called in any state.
368    pub async fn info(&mut self, level: InfoLevel) -> Result<Vec<OwnedFrame>> {
369        let cmd = Command::Info { level };
370        self.connection.send_command(&cmd, self.version).await?;
371
372        let mut frames = Vec::new();
373
374        // INFO response: SL frames containing XML, terminated by text line or
375        // last frame having '*' in header. Mock sends frames then "END\r\n".
376        loop {
377            let mut peek = [0u8; 2];
378            self.connection.read_exact(&mut peek).await?;
379
380            match &peek {
381                b"SL" => {
382                    let mut buf = [0u8; seedlink_rs_protocol::frame::v3::FRAME_LEN];
383                    buf[0..2].copy_from_slice(&peek);
384                    self.connection.read_exact(&mut buf[2..]).await?;
385                    let raw = seedlink_rs_protocol::frame::v3::parse(&buf)?;
386                    frames.push(OwnedFrame::from(raw));
387                }
388                b"SE" => {
389                    let mut header = [0u8; seedlink_rs_protocol::frame::v4::MIN_HEADER_LEN];
390                    header[0..2].copy_from_slice(&peek);
391                    self.connection.read_exact(&mut header[2..]).await?;
392                    let station_id_len = header[16] as usize;
393                    let payload_len =
394                        u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;
395                    let remaining = station_id_len + payload_len;
396                    let mut full = Vec::with_capacity(
397                        seedlink_rs_protocol::frame::v4::MIN_HEADER_LEN + remaining,
398                    );
399                    full.extend_from_slice(&header);
400                    full.resize(
401                        seedlink_rs_protocol::frame::v4::MIN_HEADER_LEN + remaining,
402                        0,
403                    );
404                    self.connection
405                        .read_exact(&mut full[seedlink_rs_protocol::frame::v4::MIN_HEADER_LEN..])
406                        .await?;
407                    let (raw, _) = seedlink_rs_protocol::frame::v4::parse(&full)?;
408                    frames.push(OwnedFrame::from(raw));
409                }
410                _ => {
411                    // Text line (END, ERROR, etc.) — read rest and stop
412                    let prefix = String::from_utf8_lossy(&peek).to_string();
413                    let rest = self.connection.read_line().await?;
414                    let _full_line = format!("{prefix}{rest}");
415                    break;
416                }
417            }
418        }
419
420        Ok(frames)
421    }
422
423    /// Send BYE and close the connection.
424    ///
425    /// Transitions to `Disconnected`. Can be called in any state.
426    pub async fn bye(&mut self) -> Result<()> {
427        self.connection
428            .send_command(&Command::Bye, self.version)
429            .await?;
430        self.connection.shutdown().await.ok();
431        self.state = ClientState::Disconnected;
432        Ok(())
433    }
434
435    // -- State (no I/O) --
436
437    /// Returns the last received sequence number for a given network/station pair.
438    ///
439    /// Returns `None` if no frames have been received for that station.
440    pub fn last_sequence(&self, network: &str, station: &str) -> Option<SequenceNumber> {
441        let key = StationKey {
442            network: network.to_owned(),
443            station: station.to_owned(),
444        };
445        self.sequences.get(&key).copied()
446    }
447
448    /// Returns a reference to all tracked network/station → sequence mappings.
449    pub fn sequences(&self) -> &HashMap<StationKey, SequenceNumber> {
450        &self.sequences
451    }
452
453    // -- Private helpers --
454
455    fn require_state_in(&self, allowed: &[ClientState], _method: &str) -> Result<()> {
456        if allowed.contains(&self.state) {
457            Ok(())
458        } else {
459            let expected_static: &'static str = match allowed {
460                [ClientState::Connected, ClientState::Configured] => "Connected|Configured",
461                [ClientState::Configured] => "Configured",
462                [ClientState::Streaming] => "Streaming",
463                _ => "valid state",
464            };
465            Err(ClientError::InvalidState {
466                expected: expected_static,
467                actual: self.state.as_str(),
468            })
469        }
470    }
471
472    async fn read_ok_response(&mut self, command_name: &str) -> Result<()> {
473        let line = self.connection.read_line().await?;
474        let response = Response::parse_line(&line)?;
475        match response {
476            Response::Ok => Ok(()),
477            Response::Error {
478                code, description, ..
479            } => {
480                let msg = if let Some(c) = code {
481                    format!("{command_name}: {} {description}", c.as_str())
482                } else {
483                    format!("{command_name}: {description}")
484                };
485                Err(ClientError::ServerError(msg))
486            }
487            _ => Err(ClientError::UnexpectedResponse(format!(
488                "expected OK for {command_name}, got: {line:?}"
489            ))),
490        }
491    }
492
493    fn track_sequence(&mut self, frame: &OwnedFrame) {
494        match frame {
495            OwnedFrame::V3 {
496                sequence, payload, ..
497            } => {
498                if payload.len() >= 20 {
499                    let station = std::str::from_utf8(&payload[8..13])
500                        .unwrap_or("")
501                        .trim()
502                        .to_owned();
503                    let network = std::str::from_utf8(&payload[18..20])
504                        .unwrap_or("")
505                        .trim()
506                        .to_owned();
507                    if !station.is_empty() && !network.is_empty() {
508                        self.sequences
509                            .insert(StationKey { network, station }, *sequence);
510                    }
511                }
512            }
513            OwnedFrame::V4 {
514                sequence,
515                station_id,
516                ..
517            } => {
518                if let Some((network, station)) = station_id.split_once('_') {
519                    self.sequences.insert(
520                        StationKey {
521                            network: network.to_owned(),
522                            station: station.to_owned(),
523                        },
524                        *sequence,
525                    );
526                }
527            }
528        }
529    }
530}
531
532#[cfg(test)]
533mod tests {
534    use super::*;
535    use crate::mock::{MockConfig, MockServer};
536    use seedlink_rs_protocol::frame::{PayloadFormat, PayloadSubformat, v3, v4};
537
538    fn make_v3_frame(seq: u64, station: &str, network: &str) -> Vec<u8> {
539        let mut payload = [0u8; v3::PAYLOAD_LEN];
540        let sta_bytes = station.as_bytes();
541        for (i, &b) in sta_bytes.iter().enumerate().take(5) {
542            payload[8 + i] = b;
543        }
544        for i in sta_bytes.len()..5 {
545            payload[8 + i] = b' ';
546        }
547        let net_bytes = network.as_bytes();
548        for (i, &b) in net_bytes.iter().enumerate().take(2) {
549            payload[18 + i] = b;
550        }
551        for i in net_bytes.len()..2 {
552            payload[18 + i] = b' ';
553        }
554        v3::write(SequenceNumber::new(seq), &payload).unwrap()
555    }
556
557    fn make_v4_frame(seq: u64, station_id: &str) -> Vec<u8> {
558        v4::write(
559            PayloadFormat::MiniSeed2,
560            PayloadSubformat::Data,
561            SequenceNumber::new(seq),
562            station_id,
563            &[0u8; 64],
564        )
565        .unwrap()
566    }
567
568    #[tokio::test]
569    async fn hello_v3() {
570        let frames = vec![make_v3_frame(1, "ANMO", "IU")];
571        let server = MockServer::start(MockConfig::v3_default(frames)).await;
572
573        let client = SeedLinkClient::connect(&server.addr().to_string())
574            .await
575            .unwrap();
576
577        assert_eq!(client.version(), ProtocolVersion::V3);
578        assert_eq!(client.server_info().software, "SeedLink");
579        assert_eq!(client.server_info().organization, "Mock Server");
580        assert_eq!(client.state(), ClientState::Connected);
581    }
582
583    #[tokio::test]
584    async fn hello_v4_negotiation() {
585        let frames = vec![make_v4_frame(1, "IU_ANMO")];
586        let server = MockServer::start(MockConfig::v4_default(frames)).await;
587
588        let client = SeedLinkClient::connect(&server.addr().to_string())
589            .await
590            .unwrap();
591
592        assert_eq!(client.version(), ProtocolVersion::V4);
593        assert_eq!(client.server_info().organization, "Mock Server v4");
594        assert_eq!(client.state(), ClientState::Connected);
595    }
596
597    #[tokio::test]
598    async fn v4_fallback_to_v3() {
599        let config = MockConfig {
600            version: ProtocolVersion::V3,
601            hello_line1: "SeedLink v3.1 :: SLPROTO:4.0".to_owned(),
602            hello_line2: "Fake v4 Server".to_owned(),
603            frames: vec![make_v3_frame(1, "ANMO", "IU")],
604            connection_frames: None,
605            accept_slproto: false,
606            close_after_stream: false,
607            max_connections: 1,
608        };
609        let server = MockServer::start(config).await;
610
611        let client = SeedLinkClient::connect(&server.addr().to_string())
612            .await
613            .unwrap();
614
615        assert_eq!(client.version(), ProtocolVersion::V3);
616    }
617
618    // -- v3 flow: STATION → DATA → END → frames --
619
620    #[tokio::test]
621    async fn v3_station_data_end_flow() {
622        let frames = vec![
623            make_v3_frame(1, "ANMO", "IU"),
624            make_v3_frame(2, "ANMO", "IU"),
625        ];
626        let server = MockServer::start(MockConfig::v3_default(frames)).await;
627
628        let mut client = SeedLinkClient::connect(&server.addr().to_string())
629            .await
630            .unwrap();
631
632        // STATION → OK, state → Configured
633        client.station("ANMO", "IU").await.unwrap();
634        assert_eq!(client.state(), ClientState::Configured);
635
636        // DATA → OK, state stays Configured
637        client.data().await.unwrap();
638        assert_eq!(client.state(), ClientState::Configured);
639
640        // END → no response, state → Streaming
641        client.end_stream().await.unwrap();
642        assert_eq!(client.state(), ClientState::Streaming);
643
644        let frame1 = client.next_frame().await.unwrap().unwrap();
645        assert_eq!(frame1.sequence(), SequenceNumber::new(1));
646
647        let frame2 = client.next_frame().await.unwrap().unwrap();
648        assert_eq!(frame2.sequence(), SequenceNumber::new(2));
649    }
650
651    #[tokio::test]
652    async fn v3_station_select_data_end_flow() {
653        let frames = vec![
654            make_v3_frame(1, "ANMO", "IU"),
655            make_v3_frame(2, "ANMO", "IU"),
656        ];
657        let server = MockServer::start(MockConfig::v3_default(frames)).await;
658
659        let mut client = SeedLinkClient::connect(&server.addr().to_string())
660            .await
661            .unwrap();
662
663        client.station("ANMO", "IU").await.unwrap();
664        assert_eq!(client.state(), ClientState::Configured);
665
666        client.select("BHZ").await.unwrap();
667        assert_eq!(client.state(), ClientState::Configured);
668
669        client.data().await.unwrap();
670        assert_eq!(client.state(), ClientState::Configured);
671
672        client.end_stream().await.unwrap();
673        assert_eq!(client.state(), ClientState::Streaming);
674
675        let frame1 = client.next_frame().await.unwrap().unwrap();
676        assert_eq!(frame1.sequence(), SequenceNumber::new(1));
677    }
678
679    // -- v4 flow: STATION → SELECT → DATA → END → frames --
680
681    #[tokio::test]
682    async fn v4_station_select_data_end_flow() {
683        let frames = vec![make_v4_frame(10, "IU_ANMO"), make_v4_frame(11, "IU_ANMO")];
684        let server = MockServer::start(MockConfig::v4_default(frames)).await;
685
686        let mut client = SeedLinkClient::connect(&server.addr().to_string())
687            .await
688            .unwrap();
689
690        assert_eq!(client.version(), ProtocolVersion::V4);
691
692        client.station("ANMO", "IU").await.unwrap();
693        client.select("BHZ").await.unwrap();
694        client.data().await.unwrap();
695        assert_eq!(client.state(), ClientState::Configured);
696
697        client.end_stream().await.unwrap();
698        assert_eq!(client.state(), ClientState::Streaming);
699
700        let frame1 = client.next_frame().await.unwrap().unwrap();
701        assert_eq!(frame1.sequence(), SequenceNumber::new(10));
702
703        let frame2 = client.next_frame().await.unwrap().unwrap();
704        assert_eq!(frame2.sequence(), SequenceNumber::new(11));
705    }
706
707    // -- Multi-station --
708
709    #[tokio::test]
710    async fn multi_station_end_stream() {
711        let frames = vec![
712            make_v3_frame(1, "ANMO", "IU"),
713            make_v3_frame(2, "WLF", "GE"),
714        ];
715        let server = MockServer::start(MockConfig::v3_default(frames)).await;
716
717        let mut client = SeedLinkClient::connect(&server.addr().to_string())
718            .await
719            .unwrap();
720
721        // STATION → DATA → STATION → DATA → END
722        client.station("ANMO", "IU").await.unwrap();
723        client.data().await.unwrap();
724
725        client.station("WLF", "GE").await.unwrap();
726        client.data().await.unwrap();
727
728        client.end_stream().await.unwrap();
729
730        let frame1 = client.next_frame().await.unwrap().unwrap();
731        assert_eq!(frame1.sequence(), SequenceNumber::new(1));
732
733        let frame2 = client.next_frame().await.unwrap().unwrap();
734        assert_eq!(frame2.sequence(), SequenceNumber::new(2));
735    }
736
737    // -- Resume from sequence --
738
739    #[tokio::test]
740    async fn data_from_resume() {
741        let frames = vec![make_v3_frame(100, "ANMO", "IU")];
742        let server = MockServer::start(MockConfig::v3_default(frames)).await;
743
744        let mut client = SeedLinkClient::connect(&server.addr().to_string())
745            .await
746            .unwrap();
747
748        client.station("ANMO", "IU").await.unwrap();
749        client.data_from(SequenceNumber::new(99)).await.unwrap();
750        assert_eq!(client.state(), ClientState::Configured);
751
752        client.end_stream().await.unwrap();
753
754        let frame = client.next_frame().await.unwrap().unwrap();
755        assert_eq!(frame.sequence(), SequenceNumber::new(100));
756    }
757
758    // -- State machine enforcement --
759
760    #[tokio::test]
761    async fn state_machine_enforcement() {
762        let server =
763            MockServer::start(MockConfig::v3_default(vec![make_v3_frame(1, "ANMO", "IU")])).await;
764
765        let mut client = SeedLinkClient::connect(&server.addr().to_string())
766            .await
767            .unwrap();
768
769        // Cannot call data() in Connected state (need Configured)
770        let err = client.data().await.unwrap_err();
771        assert!(matches!(err, ClientError::InvalidState { .. }));
772
773        // Cannot call end_stream() in Connected state
774        let err = client.end_stream().await.unwrap_err();
775        assert!(matches!(err, ClientError::InvalidState { .. }));
776
777        // Cannot call next_frame() in Connected state
778        let err = client.next_frame().await.unwrap_err();
779        assert!(matches!(err, ClientError::InvalidState { .. }));
780
781        // After station → Configured
782        client.station("ANMO", "IU").await.unwrap();
783
784        // Cannot call next_frame() in Configured state
785        let err = client.next_frame().await.unwrap_err();
786        assert!(matches!(err, ClientError::InvalidState { .. }));
787
788        // data() keeps state as Configured
789        client.data().await.unwrap();
790        assert_eq!(client.state(), ClientState::Configured);
791
792        // end_stream() → Streaming
793        client.end_stream().await.unwrap();
794        assert_eq!(client.state(), ClientState::Streaming);
795
796        // Cannot call station() in Streaming state
797        let err = client.station("WLF", "GE").await.unwrap_err();
798        assert!(matches!(err, ClientError::InvalidState { .. }));
799    }
800
801    // -- BYE --
802
803    #[tokio::test]
804    async fn bye() {
805        let server = MockServer::start(MockConfig::v3_default(vec![])).await;
806
807        let mut client = SeedLinkClient::connect(&server.addr().to_string())
808            .await
809            .unwrap();
810
811        client.bye().await.unwrap();
812        assert_eq!(client.state(), ClientState::Disconnected);
813    }
814
815    // -- Sequence tracking --
816
817    #[tokio::test]
818    async fn v3_sequence_tracking() {
819        let frames = vec![
820            make_v3_frame(5, "ANMO", "IU"),
821            make_v3_frame(10, "ANMO", "IU"),
822            make_v3_frame(3, "WLF", "GE"),
823        ];
824        let server = MockServer::start(MockConfig::v3_default(frames)).await;
825
826        let mut client = SeedLinkClient::connect(&server.addr().to_string())
827            .await
828            .unwrap();
829
830        client.station("ANMO", "IU").await.unwrap();
831        client.data().await.unwrap();
832        client.end_stream().await.unwrap();
833
834        client.next_frame().await.unwrap();
835        client.next_frame().await.unwrap();
836        client.next_frame().await.unwrap();
837
838        assert_eq!(
839            client.last_sequence("IU", "ANMO"),
840            Some(SequenceNumber::new(10))
841        );
842        assert_eq!(
843            client.last_sequence("GE", "WLF"),
844            Some(SequenceNumber::new(3))
845        );
846        assert_eq!(client.last_sequence("XX", "NONE"), None);
847        assert_eq!(client.sequences().len(), 2);
848    }
849
850    #[tokio::test]
851    async fn v4_sequence_tracking() {
852        let frames = vec![make_v4_frame(20, "IU_ANMO"), make_v4_frame(21, "IU_ANMO")];
853        let server = MockServer::start(MockConfig::v4_default(frames)).await;
854
855        let mut client = SeedLinkClient::connect(&server.addr().to_string())
856            .await
857            .unwrap();
858
859        client.station("ANMO", "IU").await.unwrap();
860        client.data().await.unwrap();
861        client.end_stream().await.unwrap();
862
863        client.next_frame().await.unwrap();
864        client.next_frame().await.unwrap();
865
866        assert_eq!(
867            client.last_sequence("IU", "ANMO"),
868            Some(SequenceNumber::new(21))
869        );
870    }
871
872    // -- Config --
873
874    #[tokio::test]
875    async fn connect_no_prefer_v4() {
876        let server = MockServer::start(MockConfig::v4_default(vec![])).await;
877
878        let config = ClientConfig {
879            prefer_v4: false,
880            ..ClientConfig::default()
881        };
882        let client = SeedLinkClient::connect_with_config(&server.addr().to_string(), config)
883            .await
884            .unwrap();
885
886        assert_eq!(client.version(), ProtocolVersion::V3);
887    }
888
889    // -- Server error handling --
890
891    #[tokio::test]
892    async fn server_error_on_station() {
893        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
894        let addr = listener.local_addr().unwrap();
895
896        tokio::spawn(async move {
897            let (stream, _) = listener.accept().await.unwrap();
898            let (read, mut write) = stream.into_split();
899            let mut reader = tokio::io::BufReader::new(read);
900            let mut line = String::new();
901
902            loop {
903                line.clear();
904                let n = tokio::io::AsyncBufReadExt::read_line(&mut reader, &mut line)
905                    .await
906                    .unwrap_or(0);
907                if n == 0 {
908                    break;
909                }
910                let trimmed = line.trim().to_uppercase();
911                if trimmed == "HELLO" {
912                    let _ = tokio::io::AsyncWriteExt::write_all(
913                        &mut write,
914                        b"SeedLink v3.3\r\nTest\r\n",
915                    )
916                    .await;
917                    let _ = tokio::io::AsyncWriteExt::flush(&mut write).await;
918                } else if trimmed.starts_with("STATION") {
919                    let _ = tokio::io::AsyncWriteExt::write_all(
920                        &mut write,
921                        b"ERROR ARGUMENTS bad station\r\n",
922                    )
923                    .await;
924                    let _ = tokio::io::AsyncWriteExt::flush(&mut write).await;
925                } else if trimmed == "BYE" {
926                    break;
927                }
928            }
929        });
930
931        let config = ClientConfig {
932            prefer_v4: false,
933            ..ClientConfig::default()
934        };
935        let mut client = SeedLinkClient::connect_with_config(&addr.to_string(), config)
936            .await
937            .unwrap();
938
939        let err = client.station("BAD", "XX").await.unwrap_err();
940        assert!(matches!(err, ClientError::ServerError(_)));
941    }
942
943    // -- EOF handling --
944
945    #[tokio::test]
946    async fn next_frame_returns_none_on_eof() {
947        let frames = vec![make_v3_frame(1, "ANMO", "IU")];
948        let config = MockConfig {
949            close_after_stream: true,
950            ..MockConfig::v3_default(frames)
951        };
952        let server = MockServer::start(config).await;
953
954        let mut client = SeedLinkClient::connect(&server.addr().to_string())
955            .await
956            .unwrap();
957
958        client.station("ANMO", "IU").await.unwrap();
959        client.data().await.unwrap();
960        client.end_stream().await.unwrap();
961
962        // First frame → Some
963        let frame = client.next_frame().await.unwrap();
964        assert!(frame.is_some());
965        assert_eq!(frame.unwrap().sequence(), SequenceNumber::new(1));
966
967        // Second frame → None (server closed)
968        let frame = client.next_frame().await.unwrap();
969        assert!(frame.is_none());
970
971        assert_eq!(client.state(), ClientState::Disconnected);
972    }
973
974    // -- Fetch --
975
976    #[tokio::test]
977    async fn v3_fetch_flow() {
978        let frames = vec![make_v3_frame(1, "ANMO", "IU")];
979        let config = MockConfig {
980            close_after_stream: true,
981            ..MockConfig::v3_default(frames)
982        };
983        let server = MockServer::start(config).await;
984
985        let mut client = SeedLinkClient::connect(&server.addr().to_string())
986            .await
987            .unwrap();
988
989        client.station("ANMO", "IU").await.unwrap();
990        client.data().await.unwrap();
991        client.fetch().await.unwrap();
992        assert_eq!(client.state(), ClientState::Streaming);
993
994        let frame = client.next_frame().await.unwrap();
995        assert!(frame.is_some());
996
997        let frame = client.next_frame().await.unwrap();
998        assert!(frame.is_none());
999        assert_eq!(client.state(), ClientState::Disconnected);
1000    }
1001
1002    // -- TIME window --
1003
1004    #[tokio::test]
1005    async fn time_window_v3_flow() {
1006        let frames = vec![make_v3_frame(1, "ANMO", "IU")];
1007        let server = MockServer::start(MockConfig::v3_default(frames)).await;
1008
1009        let mut client = SeedLinkClient::connect(&server.addr().to_string())
1010            .await
1011            .unwrap();
1012
1013        client.station("ANMO", "IU").await.unwrap();
1014        client.time_window("2024,1,0,0,0", None).await.unwrap();
1015        assert_eq!(client.state(), ClientState::Configured);
1016
1017        client.end_stream().await.unwrap();
1018        assert_eq!(client.state(), ClientState::Streaming);
1019
1020        let frame = client.next_frame().await.unwrap().unwrap();
1021        assert_eq!(frame.sequence(), SequenceNumber::new(1));
1022    }
1023
1024    #[tokio::test]
1025    async fn time_window_with_end() {
1026        let frames = vec![make_v3_frame(1, "ANMO", "IU")];
1027        let server = MockServer::start(MockConfig::v3_default(frames)).await;
1028
1029        let mut client = SeedLinkClient::connect(&server.addr().to_string())
1030            .await
1031            .unwrap();
1032
1033        client.station("ANMO", "IU").await.unwrap();
1034        client
1035            .time_window("2024,1,0,0,0", Some("2024,2,0,0,0"))
1036            .await
1037            .unwrap();
1038        assert_eq!(client.state(), ClientState::Configured);
1039    }
1040
1041    #[tokio::test]
1042    async fn time_window_requires_configured() {
1043        let server = MockServer::start(MockConfig::v3_default(vec![])).await;
1044
1045        let mut client = SeedLinkClient::connect(&server.addr().to_string())
1046            .await
1047            .unwrap();
1048
1049        // Connected, not Configured — should fail
1050        let err = client.time_window("2024,1,0,0,0", None).await.unwrap_err();
1051        assert!(matches!(err, ClientError::InvalidState { .. }));
1052    }
1053}