1#![doc = include_str!("../readme.md")]
2pub use error::{Error, Result};
12use futures::Stream;
13use num_complex::Complex;
14use serde::{Deserialize, Serialize};
15use std::path::PathBuf;
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19pub mod dsp;
20pub mod error;
21pub mod iqread;
22#[cfg(feature = "pluto")]
23pub mod pluto;
24#[cfg(feature = "rtlsdr")]
25pub mod rtlsdr;
26#[cfg(feature = "soapy")]
27pub mod soapy;
28
29pub fn expanduser(path: PathBuf) -> PathBuf {
46 if let Some(stripped) = path.to_str().and_then(|p| p.strip_prefix("~"))
48 && let Some(home_dir) = dirs::home_dir()
49 {
50 return home_dir.join(stripped.trim_start_matches('/'));
52 }
53 path
54}
55
56pub fn parse_si_value<T>(s: &str) -> Result<T>
75where
76 T: std::str::FromStr,
77 <T as std::str::FromStr>::Err: std::fmt::Display,
78{
79 let s = s.trim();
80
81 let (num_str, multiplier) = if let Some(stripped) = s.strip_suffix('G') {
83 (stripped, 1_000_000_000.0)
84 } else if let Some(stripped) = s.strip_suffix('M') {
85 (stripped, 1_000_000.0)
86 } else if let Some(stripped) = s.strip_suffix(['k', 'K']) {
87 (stripped, 1_000.0)
88 } else {
89 return s
91 .parse()
92 .map_err(|e| Error::other(format!("Invalid numeric value '{}': {}", s, e)));
93 };
94
95 let value_f64: f64 = num_str
97 .parse()
98 .map_err(|e| Error::other(format!("Invalid numeric value '{}': {}", num_str, e)))?;
99
100 let result = value_f64 * multiplier;
102
103 let result_str = format!("{:.0}", result);
105 result_str.parse().map_err(|e| {
106 Error::other(format!(
107 "Failed to convert '{}' to target type: {}",
108 result_str, e
109 ))
110 })
111}
112
113#[derive(Debug, Clone, PartialEq, Serialize)]
130pub enum Gain {
131 Auto,
133 Manual(f64),
135 Elements(Vec<GainElement>),
137}
138
139#[derive(Debug, Clone, PartialEq, Serialize)]
144pub struct GainElement {
145 pub name: GainElementName,
146 pub value_db: f64,
147}
148
149#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
154pub enum GainElementName {
155 Tuner,
157 Lna,
159 Mix,
160 Vga,
161 Pga,
163 Custom(String),
165}
166
167impl<'de> Deserialize<'de> for Gain {
168 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
169 where
170 D: serde::Deserializer<'de>,
171 {
172 use serde::de::{self, MapAccess, Visitor};
173 use std::fmt;
174
175 struct GainVisitor;
176
177 impl<'de> Visitor<'de> for GainVisitor {
178 type Value = Gain;
179
180 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
181 formatter.write_str(r#""auto", a number (e.g., 49.6), or a map of gain elements (e.g., { LNA = 5, VGA = 5 })"#)
182 }
183
184 fn visit_str<E>(self, value: &str) -> std::result::Result<Gain, E>
186 where
187 E: de::Error,
188 {
189 if value.eq_ignore_ascii_case("auto") {
190 Ok(Gain::Auto)
191 } else {
192 Err(de::Error::custom(format!(
193 "expected \"auto\", got \"{}\"",
194 value
195 )))
196 }
197 }
198
199 fn visit_i64<E>(self, value: i64) -> std::result::Result<Gain, E>
201 where
202 E: de::Error,
203 {
204 Ok(Gain::Manual(value as f64))
205 }
206
207 fn visit_u64<E>(self, value: u64) -> std::result::Result<Gain, E>
208 where
209 E: de::Error,
210 {
211 Ok(Gain::Manual(value as f64))
212 }
213
214 fn visit_f64<E>(self, value: f64) -> std::result::Result<Gain, E>
215 where
216 E: de::Error,
217 {
218 Ok(Gain::Manual(value))
219 }
220
221 fn visit_map<M>(self, mut map: M) -> std::result::Result<Gain, M::Error>
223 where
224 M: MapAccess<'de>,
225 {
226 let mut elements = Vec::new();
227
228 while let Some(key) = map.next_key::<String>()? {
229 let value: f64 = map.next_value()?;
230
231 let name = match key.to_uppercase().as_str() {
233 "TUNER" => GainElementName::Tuner,
234 "LNA" => GainElementName::Lna,
235 "MIX" => GainElementName::Mix,
236 "VGA" => GainElementName::Vga,
237 "PGA" => GainElementName::Pga,
238 custom => GainElementName::Custom(custom.to_string()),
239 };
240
241 elements.push(GainElement {
242 name,
243 value_db: value,
244 });
245 }
246
247 if elements.is_empty() {
248 Err(de::Error::custom("gain elements map cannot be empty"))
249 } else {
250 Ok(Gain::Elements(elements))
251 }
252 }
253 }
254
255 deserializer.deserialize_any(GainVisitor)
256 }
257}
258
259impl Gain {
260 pub fn parse(input: &str) -> error::Result<Self> {
277 let input = input.trim();
278
279 if input.eq_ignore_ascii_case("auto") {
280 return Ok(Gain::Auto);
281 }
282
283 if let Ok(value) = input.parse::<f64>() {
285 return Ok(Gain::Manual(value));
286 }
287
288 let mut elements = Vec::new();
290
291 for part in input.split(',') {
292 let part = part.trim();
293 if part.is_empty() {
294 continue;
295 }
296
297 let (name, value) = part.split_once('=').ok_or_else(|| {
298 Error::other(format!(
299 "Invalid gain element '{}', expected format NAME=VALUE",
300 part
301 ))
302 })?;
303
304 let value_db = value.trim().parse::<f64>().map_err(|_| {
305 Error::other(format!("Invalid gain value '{}', expected a number", value))
306 })?;
307
308 let name = match name.trim().to_ascii_uppercase().as_str() {
309 "TUNER" => GainElementName::Tuner,
310 "LNA" => GainElementName::Lna,
311 "MIX" => GainElementName::Mix,
312 "VGA" => GainElementName::Vga,
313 "PGA" => GainElementName::Pga,
314 other => GainElementName::Custom(other.to_string()),
315 };
316
317 elements.push(GainElement { name, value_db });
318 }
319
320 if elements.is_empty() {
321 return Err(Error::other(format!("Invalid gain setting: '{}'", input)));
322 }
323
324 Ok(Gain::Elements(elements))
325 }
326}
327
328#[derive(Debug, Copy, Clone, PartialEq, Eq)]
333pub enum IqFormat {
334 Cu8,
339 Cs8,
344 Cs16,
349 Cf32,
354}
355
356#[derive(Debug, Clone, PartialEq)]
357pub enum DeviceConfig {
358 #[cfg(feature = "pluto")]
359 Pluto(pluto::PlutoConfig),
360 #[cfg(feature = "rtlsdr")]
361 RtlSdr(rtlsdr::RtlSdrConfig),
362 #[cfg(feature = "soapy")]
363 Soapy(soapy::SoapyConfig),
364}
365
366impl std::str::FromStr for DeviceConfig {
367 type Err = Error;
368
369 fn from_str(s: &str) -> Result<Self> {
424 let parts: Vec<&str> = s.splitn(2, "://").collect();
426 if parts.len() != 2 {
427 return Err(Error::other(
428 "Invalid device URL: missing '://' separator".to_string(),
429 ));
430 }
431
432 let scheme = parts[0];
433 #[allow(unused_variables)]
434 let rest = parts[1];
435
436 match scheme {
437 #[cfg(feature = "rtlsdr")]
438 "rtlsdr" => {
439 let (device_part, query) = if let Some(q_pos) = rest.find('?') {
441 (&rest[..q_pos], &rest[q_pos + 1..])
442 } else {
443 (rest, "")
444 };
445
446 let device_index = if device_part.is_empty() {
447 0
448 } else {
449 device_part.parse::<usize>().map_err(|_| {
450 Error::other(format!("Invalid device index: {}", device_part))
451 })?
452 };
453
454 let mut center_freq: Option<u32> = None;
456 let mut sample_rate: Option<u32> = None;
457 let mut gain = Gain::Auto;
458 let mut bias_tee = false;
459
460 for param in query.split('&') {
461 if param.is_empty() {
462 continue;
463 }
464 let kv: Vec<&str> = param.splitn(2, '=').collect();
465 if kv.len() != 2 {
466 continue;
467 }
468 match kv[0] {
469 "freq" | "frequency" => {
470 center_freq = Some(parse_si_value(kv[1])?);
471 }
472 "rate" | "sample_rate" => {
473 sample_rate = Some(parse_si_value(kv[1])?);
474 }
475 "gain" => {
476 if kv[1].to_lowercase() == "auto" {
477 gain = Gain::Auto;
478 } else {
479 let gain_db: f64 = kv[1].parse().map_err(|_| {
480 Error::other(format!("Invalid gain: {}", kv[1]))
481 })?;
482 gain = Gain::Manual(gain_db);
483 }
484 }
485 "bias_tee" | "bias-tee" => {
486 bias_tee = kv[1].to_lowercase() == "true" || kv[1] == "1";
487 }
488 _ => {} }
490 }
491
492 let center_freq = center_freq
493 .ok_or_else(|| Error::other("Missing freq parameter".to_string()))?;
494 let sample_rate = sample_rate
495 .ok_or_else(|| Error::other("Missing rate parameter".to_string()))?;
496
497 Ok(DeviceConfig::RtlSdr(rtlsdr::RtlSdrConfig {
498 device: rtlsdr::DeviceSelector::Index(device_index),
499 center_freq,
500 sample_rate,
501 gain,
502 bias_tee,
503 }))
504 }
505 #[cfg(feature = "soapy")]
506 "soapy" => {
507 let (args_part, query) = if let Some(q_pos) = rest.find('?') {
509 (&rest[..q_pos], &rest[q_pos + 1..])
510 } else {
511 (rest, "")
512 };
513
514 let args = args_part.to_string();
515
516 let mut center_freq: Option<f64> = None;
518 let mut sample_rate: Option<f64> = None;
519 let mut gain = Gain::Auto;
520 let channel = 0;
521 let mut bias_tee = false;
522
523 for param in query.split('&') {
524 if param.is_empty() {
525 continue;
526 }
527 let kv: Vec<&str> = param.splitn(2, '=').collect();
528 if kv.len() != 2 {
529 continue;
530 }
531 match kv[0] {
532 "freq" | "frequency" => {
533 center_freq = Some(parse_si_value(kv[1])?);
534 }
535 "rate" | "sample_rate" => {
536 sample_rate = Some(parse_si_value(kv[1])?);
537 }
538 "gain" => {
539 gain = Gain::parse(kv[1])?;
540 }
541 "bias_tee" | "bias-tee" => {
542 bias_tee = kv[1].to_lowercase() == "true" || kv[1] == "1";
543 }
544 _ => {} }
546 }
547
548 let center_freq = center_freq
549 .ok_or_else(|| Error::other("Missing freq parameter".to_string()))?;
550 let sample_rate = sample_rate
551 .ok_or_else(|| Error::other("Missing rate parameter".to_string()))?;
552
553 Ok(DeviceConfig::Soapy(soapy::SoapyConfig {
554 args,
555 center_freq,
556 sample_rate,
557 channel,
558 gain,
559 bias_tee,
560 }))
561 }
562 #[cfg(feature = "pluto")]
563 "pluto" => {
564 let rest_trimmed = rest.strip_prefix('/').unwrap_or(rest);
570
571 let (uri_part, query) = if let Some(q_pos) = rest_trimmed.find('?') {
572 (&rest_trimmed[..q_pos], &rest_trimmed[q_pos + 1..])
573 } else {
574 (rest_trimmed, "")
575 };
576
577 let uri = uri_part.to_string();
578
579 let mut center_freq: Option<i64> = None;
581 let mut sample_rate: Option<i64> = None;
582 let mut gain = Gain::Manual(40.0); for param in query.split('&') {
585 if param.is_empty() {
586 continue;
587 }
588 let kv: Vec<&str> = param.splitn(2, '=').collect();
589 if kv.len() != 2 {
590 continue;
591 }
592 match kv[0] {
593 "freq" | "frequency" => {
594 center_freq = Some(parse_si_value(kv[1])?);
595 }
596 "rate" | "sample_rate" => {
597 sample_rate = Some(parse_si_value(kv[1])?);
598 }
599 "gain" => {
600 if kv[1].to_lowercase() == "auto" {
601 gain = Gain::Auto;
602 } else {
603 let gain_db: f64 = kv[1].parse().map_err(|_| {
604 Error::other(format!("Invalid gain: {}", kv[1]))
605 })?;
606 gain = Gain::Manual(gain_db);
607 }
608 }
609 _ => {} }
611 }
612
613 let center_freq = center_freq
614 .ok_or_else(|| Error::other("Missing freq parameter".to_string()))?;
615 let sample_rate = sample_rate
616 .ok_or_else(|| Error::other("Missing rate parameter".to_string()))?;
617
618 Ok(DeviceConfig::Pluto(pluto::PlutoConfig {
619 uri,
620 center_freq,
621 sample_rate,
622 gain,
623 }))
624 }
625 _ => Err(Error::other(format!("Unknown device scheme: {}", scheme))),
626 }
627 }
628}
629
630impl std::fmt::Display for IqFormat {
631 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632 match self {
633 IqFormat::Cu8 => write!(f, "cu8"),
634 IqFormat::Cs8 => write!(f, "cs8"),
635 IqFormat::Cs16 => write!(f, "cs16"),
636 IqFormat::Cf32 => write!(f, "cf32"),
637 }
638 }
639}
640
641impl std::str::FromStr for IqFormat {
642 type Err = Error;
643
644 fn from_str(s: &str) -> Result<Self> {
645 match s.to_lowercase().as_str() {
646 "cu8" => Ok(IqFormat::Cu8),
647 "cs8" => Ok(IqFormat::Cs8),
648 "cs16" => Ok(IqFormat::Cs16),
649 "cf32" => Ok(IqFormat::Cf32),
650 _ => Err(Error::format(format!("Invalid IQ format: '{}'", s))),
651 }
652 }
653}
654
655pub enum IqSource {
660 IqFile(iqread::IqRead<std::io::BufReader<std::fs::File>>),
662 IqStdin(iqread::IqRead<std::io::BufReader<std::io::Stdin>>),
664 IqTcp(iqread::IqRead<std::io::BufReader<std::net::TcpStream>>),
666 #[cfg(feature = "pluto")]
668 PlutoSdr(pluto::PlutoSdrReader),
669 #[cfg(feature = "rtlsdr")]
671 RtlSdr(rtlsdr::RtlSdrReader),
672 #[cfg(feature = "soapy")]
674 SoapySdr(soapy::SoapySdrReader),
675}
676
677impl Iterator for IqSource {
678 type Item = error::Result<Vec<Complex<f32>>>;
679
680 fn next(&mut self) -> Option<Self::Item> {
681 match self {
682 IqSource::IqFile(source) => source.next(),
683 IqSource::IqStdin(source) => source.next(),
684 IqSource::IqTcp(source) => source.next(),
685 #[cfg(feature = "pluto")]
686 IqSource::PlutoSdr(source) => source.next(),
687 #[cfg(feature = "rtlsdr")]
688 IqSource::RtlSdr(source) => source.next(),
689 #[cfg(feature = "soapy")]
690 IqSource::SoapySdr(source) => source.next(),
691 }
692 }
693}
694impl IqSource {
695 pub fn from_file<P: AsRef<std::path::Path>>(
717 path: P,
718 center_freq: u32,
719 sample_rate: u32,
720 chunk_size: usize,
721 iq_format: IqFormat,
722 ) -> error::Result<Self> {
723 let source =
724 iqread::IqRead::from_file(path, center_freq, sample_rate, chunk_size, iq_format)?;
725 Ok(IqSource::IqFile(source))
726 }
727
728 pub fn from_stdin(
745 center_freq: u32,
746 sample_rate: u32,
747 chunk_size: usize,
748 iq_format: IqFormat,
749 ) -> error::Result<IqSource> {
750 let source = iqread::IqRead::from_stdin(center_freq, sample_rate, chunk_size, iq_format);
751 Ok(IqSource::IqStdin(source))
752 }
753
754 pub fn from_tcp(
755 addr: &str,
756 port: u16,
757 center_freq: u32,
758 sample_rate: u32,
759 chunk_size: usize,
760 iq_format: IqFormat,
761 ) -> error::Result<Self> {
762 let source =
763 iqread::IqRead::from_tcp(addr, port, center_freq, sample_rate, chunk_size, iq_format)?;
764 Ok(IqSource::IqTcp(source))
765 }
766
767 pub fn from_device_config(config: DeviceConfig) -> error::Result<Self> {
769 match config {
770 #[cfg(feature = "pluto")]
771 DeviceConfig::Pluto(cfg) => {
772 let source = pluto::PlutoSdrReader::new(&cfg)?;
773 Ok(IqSource::PlutoSdr(source))
774 }
775 #[cfg(feature = "rtlsdr")]
776 DeviceConfig::RtlSdr(cfg) => {
777 let source = rtlsdr::RtlSdrReader::new(&cfg)?;
778 Ok(IqSource::RtlSdr(source))
779 }
780 #[cfg(feature = "soapy")]
781 DeviceConfig::Soapy(cfg) => {
782 let source = soapy::SoapySdrReader::new(&cfg)?;
783 Ok(IqSource::SoapySdr(source))
784 }
785 }
786 }
787
788 #[cfg(feature = "pluto")]
789 pub fn from_pluto(
791 uri: &str,
792 center_freq: i64,
793 sample_rate: i64,
794 gain: f64,
795 ) -> error::Result<Self> {
796 let config = pluto::PlutoConfig {
797 uri: uri.to_string(),
798 center_freq,
799 sample_rate,
800 gain: Gain::Manual(gain),
801 };
802 let source = pluto::PlutoSdrReader::new(&config)?;
803 Ok(IqSource::PlutoSdr(source))
804 }
805
806 #[cfg(feature = "rtlsdr")]
807 pub fn from_rtlsdr(
809 device_index: usize,
810 center_freq: u32,
811 sample_rate: u32,
812 gain: Option<i32>,
813 ) -> error::Result<Self> {
814 let config = rtlsdr::RtlSdrConfig {
815 device: rtlsdr::DeviceSelector::Index(device_index),
816 center_freq,
817 sample_rate,
818 gain: match gain {
819 Some(g) => Gain::Manual((g as f64) / 10.0),
820 None => Gain::Auto,
821 },
822 bias_tee: false,
823 };
824 let source = rtlsdr::RtlSdrReader::new(&config)?;
825 Ok(IqSource::RtlSdr(source))
826 }
827
828 #[cfg(feature = "soapy")]
829 pub fn from_soapy(
831 args: &str,
832 channel: usize,
833 center_freq: u32,
834 sample_rate: u32,
835 gain: Gain,
836 ) -> error::Result<Self> {
837 let config = soapy::SoapyConfig {
838 args: args.to_string(),
839 center_freq: center_freq as f64,
840 sample_rate: sample_rate as f64,
841 channel,
842 gain,
843 bias_tee: false,
844 };
845 let source = soapy::SoapySdrReader::new(&config)?;
846 Ok(IqSource::SoapySdr(source))
847 }
848}
849
850pub enum IqAsyncSource {
857 IqAsyncFile(iqread::IqAsyncRead<tokio::io::BufReader<tokio::fs::File>>),
859 IqAsyncStdin(iqread::IqAsyncRead<tokio::io::BufReader<tokio::io::Stdin>>),
861 IqAsyncTcp(iqread::IqAsyncRead<tokio::io::BufReader<tokio::net::TcpStream>>),
863 #[cfg(feature = "pluto")]
865 PlutoSdr(pluto::AsyncPlutoSdrReader),
866 #[cfg(feature = "rtlsdr")]
868 RtlSdr(rtlsdr::AsyncRtlSdrReader),
869 #[cfg(feature = "soapy")]
871 SoapySdr(soapy::AsyncSoapySdrReader),
872}
873
874impl IqAsyncSource {
875 pub async fn from_file<P: AsRef<std::path::Path>>(
901 path: P,
902 center_freq: u32,
903 sample_rate: u32,
904 chunk_size: usize,
905 iq_format: IqFormat,
906 ) -> error::Result<Self> {
907 let source =
908 iqread::IqAsyncRead::from_file(path, center_freq, sample_rate, chunk_size, iq_format)
909 .await?;
910 Ok(IqAsyncSource::IqAsyncFile(source))
911 }
912
913 pub fn from_stdin(
932 center_freq: u32,
933 sample_rate: u32,
934 chunk_size: usize,
935 iq_format: IqFormat,
936 ) -> Self {
937 let source =
938 iqread::IqAsyncRead::from_stdin(center_freq, sample_rate, chunk_size, iq_format);
939 IqAsyncSource::IqAsyncStdin(source)
940 }
941
942 pub async fn from_tcp(
944 addr: &str,
945 port: u16,
946 center_freq: u32,
947 sample_rate: u32,
948 chunk_size: usize,
949 iq_format: IqFormat,
950 ) -> error::Result<Self> {
951 let source = iqread::IqAsyncRead::from_tcp(
952 addr,
953 port,
954 center_freq,
955 sample_rate,
956 chunk_size,
957 iq_format,
958 )
959 .await?;
960 Ok(IqAsyncSource::IqAsyncTcp(source))
961 }
962
963 #[cfg(feature = "pluto")]
964 pub async fn from_pluto(
966 uri: &str,
967 center_freq: i64,
968 sample_rate: i64,
969 gain: f64,
970 ) -> error::Result<Self> {
971 let config = pluto::PlutoConfig {
972 uri: uri.to_string(),
973 center_freq,
974 sample_rate,
975 gain: Gain::Manual(gain),
976 };
977 let source = pluto::AsyncPlutoSdrReader::new(&config).await?;
978 Ok(IqAsyncSource::PlutoSdr(source))
979 }
980
981 #[cfg(feature = "rtlsdr")]
982 pub async fn from_rtlsdr(
984 device_index: usize,
985 center_freq: u32,
986 sample_rate: u32,
987 gain: Option<i32>,
988 ) -> error::Result<Self> {
989 let config = rtlsdr::RtlSdrConfig {
990 device: rtlsdr::DeviceSelector::Index(device_index),
991 center_freq,
992 sample_rate,
993 gain: match gain {
994 Some(g) => Gain::Manual((g as f64) / 10.0),
995 None => Gain::Auto,
996 },
997 bias_tee: false,
998 };
999 let async_reader = rtlsdr::AsyncRtlSdrReader::new(&config)?;
1000 Ok(IqAsyncSource::RtlSdr(async_reader))
1001 }
1002
1003 #[cfg(feature = "soapy")]
1004 pub async fn from_soapy(
1006 args: &str,
1007 channel: usize,
1008 center_freq: u32,
1009 sample_rate: u32,
1010 gain: Gain,
1011 ) -> error::Result<Self> {
1012 let config = soapy::SoapyConfig {
1013 args: args.to_string(),
1014 center_freq: center_freq as f64,
1015 sample_rate: sample_rate as f64,
1016 channel,
1017 gain,
1018 bias_tee: false,
1019 };
1020 let async_reader = soapy::AsyncSoapySdrReader::new(&config)?;
1021 Ok(IqAsyncSource::SoapySdr(async_reader))
1022 }
1023
1024 #[cfg(any(feature = "rtlsdr", feature = "pluto", feature = "soapy"))]
1029 pub async fn from_device_config(config: &DeviceConfig) -> error::Result<Self> {
1030 match config {
1031 #[cfg(feature = "rtlsdr")]
1032 DeviceConfig::RtlSdr(cfg) => {
1033 let async_reader = rtlsdr::AsyncRtlSdrReader::new(cfg)?;
1034 Ok(IqAsyncSource::RtlSdr(async_reader))
1035 }
1036 #[cfg(feature = "pluto")]
1037 DeviceConfig::Pluto(cfg) => {
1038 let async_reader = pluto::AsyncPlutoSdrReader::new(cfg).await?;
1039 Ok(IqAsyncSource::PlutoSdr(async_reader))
1040 }
1041 #[cfg(feature = "soapy")]
1042 DeviceConfig::Soapy(cfg) => {
1043 let async_reader = soapy::AsyncSoapySdrReader::new(cfg)?;
1044 Ok(IqAsyncSource::SoapySdr(async_reader))
1045 }
1046 }
1047 }
1048}
1049
1050impl Stream for IqAsyncSource {
1051 type Item = error::Result<Vec<Complex<f32>>>;
1052
1053 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1054 match self.get_mut() {
1055 IqAsyncSource::IqAsyncFile(source) => Pin::new(source).poll_next(cx),
1056 IqAsyncSource::IqAsyncStdin(source) => Pin::new(source).poll_next(cx),
1057 IqAsyncSource::IqAsyncTcp(source) => Pin::new(source).poll_next(cx),
1058 #[cfg(feature = "pluto")]
1059 IqAsyncSource::PlutoSdr(source) => Pin::new(source).poll_next(cx),
1060 #[cfg(feature = "rtlsdr")]
1061 IqAsyncSource::RtlSdr(source) => Pin::new(source).poll_next(cx),
1062 #[cfg(feature = "soapy")]
1063 IqAsyncSource::SoapySdr(source) => Pin::new(source).poll_next(cx),
1064 }
1065 }
1066}
1067
1068fn convert_bytes_to_complex(format: IqFormat, buffer: &[u8]) -> Vec<Complex<f32>> {
1069 match format {
1070 IqFormat::Cu8 => buffer
1071 .chunks_exact(2)
1072 .map(|c| Complex::new((c[0] as f32 - 127.5) / 128.0, (c[1] as f32 - 127.5) / 128.0))
1073 .collect(),
1074 IqFormat::Cs8 => buffer
1075 .chunks_exact(2)
1076 .map(|c| Complex::new((c[0] as i8) as f32 / 128.0, (c[1] as i8) as f32 / 128.0))
1077 .collect(),
1078 IqFormat::Cs16 => buffer
1079 .chunks_exact(4)
1080 .map(|c| {
1081 Complex::new(
1082 i16::from_le_bytes([c[0], c[1]]) as f32 / 32768.0,
1083 i16::from_le_bytes([c[2], c[3]]) as f32 / 32768.0,
1084 )
1085 })
1086 .collect(),
1087 IqFormat::Cf32 => buffer
1088 .chunks_exact(8)
1089 .map(|c| {
1090 Complex::new(
1091 f32::from_le_bytes([c[0], c[1], c[2], c[3]]),
1092 f32::from_le_bytes([c[4], c[5], c[6], c[7]]),
1093 )
1094 })
1095 .collect(),
1096 }
1097}
1098
1099#[cfg(test)]
1100mod tests {
1101 use super::*;
1102
1103 #[test]
1104 fn test_iqformat_display() {
1105 assert_eq!(IqFormat::Cu8.to_string(), "cu8");
1106 assert_eq!(IqFormat::Cs8.to_string(), "cs8");
1107 assert_eq!(IqFormat::Cs16.to_string(), "cs16");
1108 assert_eq!(IqFormat::Cf32.to_string(), "cf32");
1109 }
1110
1111 #[test]
1112 fn test_iqformat_fromstr() {
1113 assert_eq!("cu8".parse::<IqFormat>().unwrap(), IqFormat::Cu8);
1114 assert_eq!("cs8".parse::<IqFormat>().unwrap(), IqFormat::Cs8);
1115 assert_eq!("cs16".parse::<IqFormat>().unwrap(), IqFormat::Cs16);
1116 assert_eq!("cf32".parse::<IqFormat>().unwrap(), IqFormat::Cf32);
1117 }
1118
1119 #[test]
1120 fn test_iqformat_fromstr_case_insensitive() {
1121 assert_eq!("CU8".parse::<IqFormat>().unwrap(), IqFormat::Cu8);
1122 assert_eq!("Cs8".parse::<IqFormat>().unwrap(), IqFormat::Cs8);
1123 assert_eq!("CS16".parse::<IqFormat>().unwrap(), IqFormat::Cs16);
1124 assert_eq!("CF32".parse::<IqFormat>().unwrap(), IqFormat::Cf32);
1125 }
1126
1127 #[test]
1128 fn test_iqformat_fromstr_invalid() {
1129 assert!("invalid".parse::<IqFormat>().is_err());
1130 assert!("cu16".parse::<IqFormat>().is_err());
1131 assert!("".parse::<IqFormat>().is_err());
1132 }
1133
1134 #[test]
1135 fn test_iqformat_roundtrip() {
1136 let formats = [IqFormat::Cu8, IqFormat::Cs8, IqFormat::Cs16, IqFormat::Cf32];
1137 for format in formats {
1138 let s = format.to_string();
1139 let parsed: IqFormat = s.parse().unwrap();
1140 assert_eq!(parsed, format);
1141 }
1142 }
1143
1144 #[test]
1145 #[cfg(feature = "pluto")]
1146 fn test_pluto_uri_parsing() {
1147 use std::str::FromStr;
1148
1149 let config = DeviceConfig::from_str("pluto://192.168.2.1?freq=1090M&rate=2.4M").unwrap();
1151 if let DeviceConfig::Pluto(pluto_config) = config {
1152 assert_eq!(pluto_config.uri, "192.168.2.1");
1153 } else {
1154 panic!("Expected PlutoSDR config");
1155 }
1156
1157 let config = DeviceConfig::from_str("pluto://ip:192.168.2.1?freq=1090M&rate=2.4M").unwrap();
1159 if let DeviceConfig::Pluto(pluto_config) = config {
1160 assert_eq!(pluto_config.uri, "ip:192.168.2.1");
1161 } else {
1162 panic!("Expected PlutoSDR config");
1163 }
1164
1165 let config = DeviceConfig::from_str("pluto:///usb:1.18.5?freq=1090M&rate=2.4M").unwrap();
1167 if let DeviceConfig::Pluto(pluto_config) = config {
1168 assert_eq!(pluto_config.uri, "usb:1.18.5");
1169 } else {
1170 panic!("Expected PlutoSDR config");
1171 }
1172
1173 let config = DeviceConfig::from_str("pluto:///usb:?freq=1090M&rate=2.4M").unwrap();
1175 if let DeviceConfig::Pluto(pluto_config) = config {
1176 assert_eq!(pluto_config.uri, "usb:");
1177 } else {
1178 panic!("Expected PlutoSDR config");
1179 }
1180 }
1181}