1use std::collections::HashMap;
2
3use futures_core::Stream;
4use seedlink_rs_protocol::{Command, InfoLevel, ProtocolVersion, Response, SequenceNumber};
5use tracing::{debug, info, trace, warn};
6
7use crate::connection::Connection;
8use crate::error::{ClientError, Result};
9use crate::negotiate;
10use crate::state::{ClientConfig, ClientState, OwnedFrame, ServerInfo, StationKey};
11
/// Client side of a single SeedLink connection.
///
/// Instances are produced by [`SeedLinkClient::connect`] or
/// [`SeedLinkClient::connect_with_config`], which perform the `HELLO`
/// exchange (and optional v4 negotiation) before handing the client back.
pub struct SeedLinkClient {
    /// Transport used to send commands and to read reply lines and frames.
    connection: Connection,
    /// Current position in the client state machine; command methods check it
    /// before touching the connection.
    state: ClientState,
    /// Protocol version settled on while connecting.
    version: ProtocolVersion,
    /// Software, version, organization and capabilities taken from the `HELLO` reply.
    server_info: ServerInfo,
    /// Sequence number of the most recent frame seen per station, maintained by
    /// `next_frame`.
    sequences: HashMap<StationKey, SequenceNumber>,
    /// Configuration this client was created with.
    config: ClientConfig,
}
43
44impl SeedLinkClient {
45 pub async fn connect(addr: &str) -> Result<Self> {
50 Self::connect_with_config(addr, ClientConfig::default()).await
51 }
52
53 pub async fn connect_with_config(addr: &str, config: ClientConfig) -> Result<Self> {
58 info!(addr, "connecting");
59 let mut connection =
60 Connection::connect(addr, config.connect_timeout, config.read_timeout).await?;
61
62 connection
64 .send_command(&Command::Hello, ProtocolVersion::V3)
65 .await?;
66
67 let line1 = connection.read_line().await?;
69 let line2 = connection.read_line().await?;
70 let hello = Response::parse_hello(&line1, &line2)?;
71
72 let (software, version_str, extra, organization) = match hello {
73 Response::Hello {
74 software,
75 version,
76 extra,
77 organization,
78 } => (software, version, extra, organization),
79 _ => {
80 return Err(ClientError::UnexpectedResponse(
81 "expected HELLO response".into(),
82 ));
83 }
84 };
85
86 let capabilities = negotiate::parse_capabilities(&extra);
87 let mut protocol_version = ProtocolVersion::V3;
88
89 if config.prefer_v4 && negotiate::supports_v4(&capabilities) {
91 connection
92 .send_command(
93 &Command::SlProto {
94 version: "4.0".into(),
95 },
96 ProtocolVersion::V4,
97 )
98 .await?;
99
100 let response_line = connection.read_line().await?;
101 let response = Response::parse_line(&response_line)?;
102 match response {
103 Response::Ok => {
104 protocol_version = ProtocolVersion::V4;
105 }
106 Response::Error { description, .. } => {
107 warn!(%description, "v4 negotiation failed, falling back to v3");
108 }
109 _ => {
110 return Err(ClientError::UnexpectedResponse(format!(
111 "expected OK or ERROR for SLPROTO, got: {response_line:?}"
112 )));
113 }
114 }
115 }
116
117 let server_info = ServerInfo {
118 software,
119 version: version_str,
120 organization,
121 capabilities,
122 };
123
124 info!(version = ?protocol_version, "connected");
125
126 Ok(Self {
127 connection,
128 state: ClientState::Connected,
129 version: protocol_version,
130 server_info,
131 sequences: HashMap::new(),
132 config,
133 })
134 }
135
136 pub fn version(&self) -> ProtocolVersion {
140 self.version
141 }
142
143 pub fn server_info(&self) -> &ServerInfo {
145 &self.server_info
146 }
147
148 pub fn state(&self) -> ClientState {
150 self.state
151 }
152
153 pub fn config(&self) -> &ClientConfig {
155 &self.config
156 }
157
158 pub async fn station(&mut self, station: &str, network: &str) -> Result<()> {
165 self.require_state_in(
166 &[ClientState::Connected, ClientState::Configured],
167 "station",
168 )?;
169
170 debug!(station, network, "STATION");
171 let cmd = Command::Station {
172 station: station.to_owned(),
173 network: network.to_owned(),
174 };
175 self.connection.send_command(&cmd, self.version).await?;
176
177 self.read_ok_response("STATION").await?;
179
180 self.state = ClientState::Configured;
181 Ok(())
182 }
183
184 pub async fn select(&mut self, pattern: &str) -> Result<()> {
189 self.require_state_in(&[ClientState::Connected, ClientState::Configured], "select")?;
190
191 debug!(pattern, "SELECT");
192 let cmd = Command::Select {
193 pattern: pattern.to_owned(),
194 };
195 self.connection.send_command(&cmd, self.version).await?;
196
197 self.read_ok_response("SELECT").await?;
199
200 self.state = ClientState::Configured;
201 Ok(())
202 }
203
204 pub async fn data(&mut self) -> Result<()> {
212 self.require_state_in(&[ClientState::Configured], "data")?;
213
214 debug!("DATA");
215 let cmd = Command::Data {
216 sequence: None,
217 start: None,
218 end: None,
219 };
220 self.connection.send_command(&cmd, self.version).await?;
221
222 self.read_ok_response("DATA").await?;
224
225 Ok(())
227 }
228
229 pub async fn data_from(&mut self, sequence: SequenceNumber) -> Result<()> {
233 self.require_state_in(&[ClientState::Configured], "data_from")?;
234
235 debug!(%sequence, "DATA (resume)");
236 let cmd = Command::Data {
237 sequence: Some(sequence),
238 start: None,
239 end: None,
240 };
241 self.connection.send_command(&cmd, self.version).await?;
242
243 self.read_ok_response("DATA").await?;
245
246 Ok(())
248 }
249
250 pub async fn time_window(&mut self, start: &str, end: Option<&str>) -> Result<()> {
255 self.require_state_in(&[ClientState::Configured], "time_window")?;
256
257 debug!(start, ?end, "TIME");
258 let cmd = Command::Time {
259 start: start.to_owned(),
260 end: end.map(|s| s.to_owned()),
261 };
262 self.connection.send_command(&cmd, self.version).await?;
263
264 self.read_ok_response("TIME").await?;
265
266 Ok(())
268 }
269
270 pub async fn end_stream(&mut self) -> Result<()> {
277 self.require_state_in(&[ClientState::Configured], "end_stream")?;
278
279 self.connection
280 .send_command(&Command::End, self.version)
281 .await?;
282
283 self.state = ClientState::Streaming;
285 Ok(())
286 }
287
288 pub async fn fetch(&mut self) -> Result<()> {
294 self.require_state_in(&[ClientState::Configured], "fetch")?;
295
296 let cmd = Command::Fetch { sequence: None };
297 self.connection.send_command(&cmd, self.version).await?;
298
299 self.state = ClientState::Streaming;
300 Ok(())
301 }
302
303 pub async fn fetch_from(&mut self, sequence: SequenceNumber) -> Result<()> {
307 self.require_state_in(&[ClientState::Configured], "fetch_from")?;
308
309 let cmd = Command::Fetch {
310 sequence: Some(sequence),
311 };
312 self.connection.send_command(&cmd, self.version).await?;
313
314 self.state = ClientState::Streaming;
315 Ok(())
316 }
317
318 pub async fn next_frame(&mut self) -> Result<Option<OwnedFrame>> {
327 self.require_state_in(&[ClientState::Streaming], "next_frame")?;
328
329 let result = match self.version {
330 ProtocolVersion::V3 => self.connection.read_v3_frame().await,
331 ProtocolVersion::V4 => self.connection.read_v4_frame().await,
332 };
333
334 match result {
335 Ok(frame) => {
336 trace!(sequence = %frame.sequence(), "frame received");
337 self.track_sequence(&frame);
338 Ok(Some(frame))
339 }
340 Err(ClientError::Disconnected) => {
341 self.state = ClientState::Disconnected;
342 Ok(None)
343 }
344 Err(ClientError::Io(ref e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
345 self.state = ClientState::Disconnected;
346 Ok(None)
347 }
348 Err(e) => Err(e),
349 }
350 }
351
352 pub fn into_stream(self) -> impl Stream<Item = Result<OwnedFrame>> {
359 crate::stream::frame_stream(self)
360 }
361
362 pub async fn info(&mut self, level: InfoLevel) -> Result<Vec<OwnedFrame>> {
369 let cmd = Command::Info { level };
370 self.connection.send_command(&cmd, self.version).await?;
371
372 let mut frames = Vec::new();
373
374 loop {
377 let mut peek = [0u8; 2];
378 self.connection.read_exact(&mut peek).await?;
379
380 match &peek {
381 b"SL" => {
382 let mut buf = [0u8; seedlink_rs_protocol::frame::v3::FRAME_LEN];
383 buf[0..2].copy_from_slice(&peek);
384 self.connection.read_exact(&mut buf[2..]).await?;
385 let raw = seedlink_rs_protocol::frame::v3::parse(&buf)?;
386 frames.push(OwnedFrame::from(raw));
387 }
388 b"SE" => {
389 let mut header = [0u8; seedlink_rs_protocol::frame::v4::MIN_HEADER_LEN];
390 header[0..2].copy_from_slice(&peek);
391 self.connection.read_exact(&mut header[2..]).await?;
392 let station_id_len = header[16] as usize;
393 let payload_len =
394 u32::from_le_bytes([header[4], header[5], header[6], header[7]]) as usize;
395 let remaining = station_id_len + payload_len;
396 let mut full = Vec::with_capacity(
397 seedlink_rs_protocol::frame::v4::MIN_HEADER_LEN + remaining,
398 );
399 full.extend_from_slice(&header);
400 full.resize(
401 seedlink_rs_protocol::frame::v4::MIN_HEADER_LEN + remaining,
402 0,
403 );
404 self.connection
405 .read_exact(&mut full[seedlink_rs_protocol::frame::v4::MIN_HEADER_LEN..])
406 .await?;
407 let (raw, _) = seedlink_rs_protocol::frame::v4::parse(&full)?;
408 frames.push(OwnedFrame::from(raw));
409 }
410 _ => {
411 let prefix = String::from_utf8_lossy(&peek).to_string();
413 let rest = self.connection.read_line().await?;
414 let _full_line = format!("{prefix}{rest}");
415 break;
416 }
417 }
418 }
419
420 Ok(frames)
421 }
422
423 pub async fn bye(&mut self) -> Result<()> {
427 self.connection
428 .send_command(&Command::Bye, self.version)
429 .await?;
430 self.connection.shutdown().await.ok();
431 self.state = ClientState::Disconnected;
432 Ok(())
433 }
434
435 pub fn last_sequence(&self, network: &str, station: &str) -> Option<SequenceNumber> {
441 let key = StationKey {
442 network: network.to_owned(),
443 station: station.to_owned(),
444 };
445 self.sequences.get(&key).copied()
446 }
447
448 pub fn sequences(&self) -> &HashMap<StationKey, SequenceNumber> {
450 &self.sequences
451 }
452
453 fn require_state_in(&self, allowed: &[ClientState], _method: &str) -> Result<()> {
456 if allowed.contains(&self.state) {
457 Ok(())
458 } else {
459 let expected_static: &'static str = match allowed {
460 [ClientState::Connected, ClientState::Configured] => "Connected|Configured",
461 [ClientState::Configured] => "Configured",
462 [ClientState::Streaming] => "Streaming",
463 _ => "valid state",
464 };
465 Err(ClientError::InvalidState {
466 expected: expected_static,
467 actual: self.state.as_str(),
468 })
469 }
470 }
471
472 async fn read_ok_response(&mut self, command_name: &str) -> Result<()> {
473 let line = self.connection.read_line().await?;
474 let response = Response::parse_line(&line)?;
475 match response {
476 Response::Ok => Ok(()),
477 Response::Error {
478 code, description, ..
479 } => {
480 let msg = if let Some(c) = code {
481 format!("{command_name}: {} {description}", c.as_str())
482 } else {
483 format!("{command_name}: {description}")
484 };
485 Err(ClientError::ServerError(msg))
486 }
487 _ => Err(ClientError::UnexpectedResponse(format!(
488 "expected OK for {command_name}, got: {line:?}"
489 ))),
490 }
491 }
492
493 fn track_sequence(&mut self, frame: &OwnedFrame) {
494 match frame {
495 OwnedFrame::V3 {
496 sequence, payload, ..
497 } => {
498 if payload.len() >= 20 {
499 let station = std::str::from_utf8(&payload[8..13])
500 .unwrap_or("")
501 .trim()
502 .to_owned();
503 let network = std::str::from_utf8(&payload[18..20])
504 .unwrap_or("")
505 .trim()
506 .to_owned();
507 if !station.is_empty() && !network.is_empty() {
508 self.sequences
509 .insert(StationKey { network, station }, *sequence);
510 }
511 }
512 }
513 OwnedFrame::V4 {
514 sequence,
515 station_id,
516 ..
517 } => {
518 if let Some((network, station)) = station_id.split_once('_') {
519 self.sequences.insert(
520 StationKey {
521 network: network.to_owned(),
522 station: station.to_owned(),
523 },
524 *sequence,
525 );
526 }
527 }
528 }
529 }
530}
531
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mock::{MockConfig, MockServer};
    use seedlink_rs_protocol::frame::{PayloadFormat, PayloadSubformat, v3, v4};

    /// Fills `field` with spaces and then copies as much of `text` as fits at
    /// the front.
    fn put_space_padded(field: &mut [u8], text: &str) {
        field.fill(b' ');
        let bytes = text.as_bytes();
        let len = bytes.len().min(field.len());
        field[..len].copy_from_slice(&bytes[..len]);
    }

    /// Builds a v3 frame whose payload carries `station` at bytes 8..13 and
    /// `network` at bytes 18..20, both space padded; everything else is zero.
    fn make_v3_frame(seq: u64, station: &str, network: &str) -> Vec<u8> {
        let mut payload = [0u8; v3::PAYLOAD_LEN];
        put_space_padded(&mut payload[8..13], station);
        put_space_padded(&mut payload[18..20], network);
        v3::write(SequenceNumber::new(seq), &payload).unwrap()
    }

    /// Builds a v4 frame for `station_id` with a 64-byte zero payload.
    fn make_v4_frame(seq: u64, station_id: &str) -> Vec<u8> {
        let payload = [0u8; 64];
        v4::write(
            PayloadFormat::MiniSeed2,
            PayloadSubformat::Data,
            SequenceNumber::new(seq),
            station_id,
            &payload,
        )
        .unwrap()
    }

    /// Connects to `server` with the default client configuration.
    async fn connect_default(server: &MockServer) -> SeedLinkClient {
        let addr = server.addr().to_string();
        SeedLinkClient::connect(&addr).await.unwrap()
    }

    /// Reads the next frame, which must exist, and returns its sequence number.
    async fn next_sequence(client: &mut SeedLinkClient) -> SequenceNumber {
        client.next_frame().await.unwrap().unwrap().sequence()
    }

    /// Runs `STATION ANMO IU`, `DATA`, `END` on `client`.
    async fn start_anmo_stream(client: &mut SeedLinkClient) {
        client.station("ANMO", "IU").await.unwrap();
        client.data().await.unwrap();
        client.end_stream().await.unwrap();
    }

    /// Asserts that `outcome` failed with `ClientError::InvalidState`.
    fn assert_invalid_state<T: std::fmt::Debug>(outcome: Result<T>) {
        let err = outcome.unwrap_err();
        assert!(matches!(err, ClientError::InvalidState { .. }));
    }

    /// A plain v3 server yields a v3 client in the `Connected` state.
    #[tokio::test]
    async fn hello_v3() {
        let config = MockConfig::v3_default(vec![make_v3_frame(1, "ANMO", "IU")]);
        let server = MockServer::start(config).await;

        let client = connect_default(&server).await;
        let info = client.server_info();

        assert_eq!(client.version(), ProtocolVersion::V3);
        assert_eq!(info.software, "SeedLink");
        assert_eq!(info.organization, "Mock Server");
        assert_eq!(client.state(), ClientState::Connected);
    }

    /// A v4-capable server is upgraded to v4 by default.
    #[tokio::test]
    async fn hello_v4_negotiation() {
        let config = MockConfig::v4_default(vec![make_v4_frame(1, "IU_ANMO")]);
        let server = MockServer::start(config).await;

        let client = connect_default(&server).await;

        assert_eq!(client.version(), ProtocolVersion::V4);
        assert_eq!(client.server_info().organization, "Mock Server v4");
        assert_eq!(client.state(), ClientState::Connected);
    }

    /// A server that advertises v4 but rejects `SLPROTO` leaves the client on v3.
    #[tokio::test]
    async fn v4_fallback_to_v3() {
        let server = MockServer::start(MockConfig {
            version: ProtocolVersion::V3,
            hello_line1: "SeedLink v3.1 :: SLPROTO:4.0".to_owned(),
            hello_line2: "Fake v4 Server".to_owned(),
            frames: vec![make_v3_frame(1, "ANMO", "IU")],
            connection_frames: None,
            accept_slproto: false,
            close_after_stream: false,
            max_connections: 1,
        })
        .await;

        let client = connect_default(&server).await;

        assert_eq!(client.version(), ProtocolVersion::V3);
    }

    /// STATION → DATA → END on v3, then two frames in order.
    #[tokio::test]
    async fn v3_station_data_end_flow() {
        let config = MockConfig::v3_default(vec![
            make_v3_frame(1, "ANMO", "IU"),
            make_v3_frame(2, "ANMO", "IU"),
        ]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        client.station("ANMO", "IU").await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.data().await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.end_stream().await.unwrap();
        assert_eq!(client.state(), ClientState::Streaming);

        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(1));
        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(2));
    }

    /// STATION → SELECT → DATA → END on v3, then the first frame.
    #[tokio::test]
    async fn v3_station_select_data_end_flow() {
        let config = MockConfig::v3_default(vec![
            make_v3_frame(1, "ANMO", "IU"),
            make_v3_frame(2, "ANMO", "IU"),
        ]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        client.station("ANMO", "IU").await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.select("BHZ").await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.data().await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.end_stream().await.unwrap();
        assert_eq!(client.state(), ClientState::Streaming);

        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(1));
    }

    /// STATION → SELECT → DATA → END on v4, then two frames in order.
    #[tokio::test]
    async fn v4_station_select_data_end_flow() {
        let config =
            MockConfig::v4_default(vec![make_v4_frame(10, "IU_ANMO"), make_v4_frame(11, "IU_ANMO")]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        assert_eq!(client.version(), ProtocolVersion::V4);

        client.station("ANMO", "IU").await.unwrap();
        client.select("BHZ").await.unwrap();
        client.data().await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.end_stream().await.unwrap();
        assert_eq!(client.state(), ClientState::Streaming);

        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(10));
        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(11));
    }

    /// Two STATION/DATA pairs may be issued before a single END.
    #[tokio::test]
    async fn multi_station_end_stream() {
        let config = MockConfig::v3_default(vec![
            make_v3_frame(1, "ANMO", "IU"),
            make_v3_frame(2, "WLF", "GE"),
        ]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        for (station, network) in [("ANMO", "IU"), ("WLF", "GE")] {
            client.station(station, network).await.unwrap();
            client.data().await.unwrap();
        }

        client.end_stream().await.unwrap();

        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(1));
        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(2));
    }

    /// `data_from` keeps the client `Configured` and streaming works afterwards.
    #[tokio::test]
    async fn data_from_resume() {
        let config = MockConfig::v3_default(vec![make_v3_frame(100, "ANMO", "IU")]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        client.station("ANMO", "IU").await.unwrap();
        client.data_from(SequenceNumber::new(99)).await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.end_stream().await.unwrap();

        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(100));
    }

    /// Commands issued in the wrong state fail with `InvalidState`.
    #[tokio::test]
    async fn state_machine_enforcement() {
        let config = MockConfig::v3_default(vec![make_v3_frame(1, "ANMO", "IU")]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        // Freshly connected: nothing that needs Configured/Streaming may run.
        assert_invalid_state(client.data().await);
        assert_invalid_state(client.end_stream().await);
        assert_invalid_state(client.next_frame().await);

        client.station("ANMO", "IU").await.unwrap();

        // Configured, but not yet streaming.
        assert_invalid_state(client.next_frame().await);

        client.data().await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.end_stream().await.unwrap();
        assert_eq!(client.state(), ClientState::Streaming);

        // Streaming: configuration commands are no longer accepted.
        assert_invalid_state(client.station("WLF", "GE").await);
    }

    /// `bye` leaves the client `Disconnected`.
    #[tokio::test]
    async fn bye() {
        let server = MockServer::start(MockConfig::v3_default(vec![])).await;
        let mut client = connect_default(&server).await;

        client.bye().await.unwrap();

        assert_eq!(client.state(), ClientState::Disconnected);
    }

    /// v3 frames update the per-station table; the latest frame wins.
    #[tokio::test]
    async fn v3_sequence_tracking() {
        let config = MockConfig::v3_default(vec![
            make_v3_frame(5, "ANMO", "IU"),
            make_v3_frame(10, "ANMO", "IU"),
            make_v3_frame(3, "WLF", "GE"),
        ]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        start_anmo_stream(&mut client).await;

        for _ in 0..3 {
            client.next_frame().await.unwrap();
        }

        let anmo = client.last_sequence("IU", "ANMO");
        let wlf = client.last_sequence("GE", "WLF");
        assert_eq!(anmo, Some(SequenceNumber::new(10)));
        assert_eq!(wlf, Some(SequenceNumber::new(3)));
        assert_eq!(client.last_sequence("XX", "NONE"), None);
        assert_eq!(client.sequences().len(), 2);
    }

    /// v4 frames update the table using the `NET_STA` station id.
    #[tokio::test]
    async fn v4_sequence_tracking() {
        let config =
            MockConfig::v4_default(vec![make_v4_frame(20, "IU_ANMO"), make_v4_frame(21, "IU_ANMO")]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        start_anmo_stream(&mut client).await;

        for _ in 0..2 {
            client.next_frame().await.unwrap();
        }

        assert_eq!(
            client.last_sequence("IU", "ANMO"),
            Some(SequenceNumber::new(21))
        );
    }

    /// With `prefer_v4` disabled the client stays on v3 even against a v4 server.
    #[tokio::test]
    async fn connect_no_prefer_v4() {
        let server = MockServer::start(MockConfig::v4_default(vec![])).await;

        let mut config = ClientConfig::default();
        config.prefer_v4 = false;

        let addr = server.addr().to_string();
        let client = SeedLinkClient::connect_with_config(&addr, config)
            .await
            .unwrap();

        assert_eq!(client.version(), ProtocolVersion::V3);
    }

    /// An `ERROR` reply to `STATION` surfaces as `ClientError::ServerError`.
    #[tokio::test]
    async fn server_error_on_station() {
        use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Minimal hand-rolled server: greets on HELLO, rejects every STATION.
        tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            let (read_half, mut write_half) = stream.into_split();
            let mut reader = BufReader::new(read_half);
            let mut line = String::new();

            loop {
                line.clear();
                let read = reader.read_line(&mut line).await.unwrap_or(0);
                if read == 0 {
                    break;
                }

                let command = line.trim().to_uppercase();
                let reply: &[u8] = if command == "HELLO" {
                    b"SeedLink v3.3\r\nTest\r\n"
                } else if command.starts_with("STATION") {
                    b"ERROR ARGUMENTS bad station\r\n"
                } else if command == "BYE" {
                    break
                } else {
                    continue
                };

                let _ = write_half.write_all(reply).await;
                let _ = write_half.flush().await;
            }
        });

        let mut config = ClientConfig::default();
        config.prefer_v4 = false;

        let mut client = SeedLinkClient::connect_with_config(&addr.to_string(), config)
            .await
            .unwrap();

        let err = client.station("BAD", "XX").await.unwrap_err();
        assert!(matches!(err, ClientError::ServerError(_)));
    }

    /// When the server closes after streaming, `next_frame` yields `None` and
    /// the client becomes `Disconnected`.
    #[tokio::test]
    async fn next_frame_returns_none_on_eof() {
        let mut config = MockConfig::v3_default(vec![make_v3_frame(1, "ANMO", "IU")]);
        config.close_after_stream = true;
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        start_anmo_stream(&mut client).await;

        let first = client.next_frame().await.unwrap();
        assert!(first.is_some());
        assert_eq!(first.unwrap().sequence(), SequenceNumber::new(1));

        let second = client.next_frame().await.unwrap();
        assert!(second.is_none());

        assert_eq!(client.state(), ClientState::Disconnected);
    }

    /// STATION → DATA → FETCH streams the frame and then ends.
    #[tokio::test]
    async fn v3_fetch_flow() {
        let mut config = MockConfig::v3_default(vec![make_v3_frame(1, "ANMO", "IU")]);
        config.close_after_stream = true;
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        client.station("ANMO", "IU").await.unwrap();
        client.data().await.unwrap();
        client.fetch().await.unwrap();
        assert_eq!(client.state(), ClientState::Streaming);

        let first = client.next_frame().await.unwrap();
        assert!(first.is_some());

        let second = client.next_frame().await.unwrap();
        assert!(second.is_none());
        assert_eq!(client.state(), ClientState::Disconnected);
    }

    /// STATION → TIME (open ended) → END, then the first frame.
    #[tokio::test]
    async fn time_window_v3_flow() {
        let config = MockConfig::v3_default(vec![make_v3_frame(1, "ANMO", "IU")]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        client.station("ANMO", "IU").await.unwrap();
        client.time_window("2024,1,0,0,0", None).await.unwrap();
        assert_eq!(client.state(), ClientState::Configured);

        client.end_stream().await.unwrap();
        assert_eq!(client.state(), ClientState::Streaming);

        assert_eq!(next_sequence(&mut client).await, SequenceNumber::new(1));
    }

    /// TIME with both bounds is accepted and keeps the client `Configured`.
    #[tokio::test]
    async fn time_window_with_end() {
        let config = MockConfig::v3_default(vec![make_v3_frame(1, "ANMO", "IU")]);
        let server = MockServer::start(config).await;
        let mut client = connect_default(&server).await;

        client.station("ANMO", "IU").await.unwrap();

        let window_end = Some("2024,2,0,0,0");
        client
            .time_window("2024,1,0,0,0", window_end)
            .await
            .unwrap();

        assert_eq!(client.state(), ClientState::Configured);
    }

    /// TIME before any STATION/SELECT is rejected with `InvalidState`.
    #[tokio::test]
    async fn time_window_requires_configured() {
        let server = MockServer::start(MockConfig::v3_default(vec![])).await;
        let mut client = connect_default(&server).await;

        assert_invalid_state(client.time_window("2024,1,0,0,0", None).await);
    }
}