1use std::io::{Read, Seek, Write};
9
10use log::debug;
11use serde::{Deserialize, Serialize};
12
13const DATATYPE_CF32: &str = "cf32";
14pub const VERSION: &str = "1.1.0";
15
16use crate::block::{Block, BlockRet};
17use crate::stream::{ReadStream, WriteStream};
18use crate::{Complex, Error, Float, Repeat, Result, Sample};
19
20impl From<serde_json::Error> for Error {
21 fn from(e: serde_json::Error) -> Self {
22 Error::wrap(e, "sigmf")
23 }
24}
25
26#[allow(dead_code)]
28#[derive(Serialize, Deserialize, Debug, Default)]
29pub struct Capture {
30 #[serde(rename = "core:sample_start")]
33 pub core_sample_start: u64,
34
35 #[serde(rename = "core:global_index", skip_serializing_if = "Option::is_none")]
38 pub core_global_index: Option<u64>,
39
40 #[serde(rename = "core:header_bytes", skip_serializing_if = "Option::is_none")]
42 pub core_header_bytes: Option<u64>,
43
44 #[serde(rename = "core:frequency", skip_serializing_if = "Option::is_none")]
46 pub core_frequency: Option<f64>,
47
48 #[serde(rename = "core:datetime", skip_serializing_if = "Option::is_none")]
50 pub core_datetime: Option<String>,
51 }
55
56impl Capture {
57 #[must_use]
58 pub fn new(start: u64) -> Self {
59 Self {
60 core_sample_start: start,
61 ..Default::default()
62 }
63 }
64}
65
66#[allow(dead_code)]
68#[derive(Serialize, Deserialize, Debug, Default)]
69pub struct Annotation {
70 #[serde(rename = "core:sample_start")]
72 pub core_sample_start: u64,
73
74 #[serde(rename = "core:sample_count", skip_serializing_if = "Option::is_none")]
76 pub core_sample_count: Option<u64>,
77
78 #[serde(rename = "core:generator", skip_serializing_if = "Option::is_none")]
80 pub core_generator: Option<String>,
81
82 #[serde(rename = "core:label", skip_serializing_if = "Option::is_none")]
84 pub core_label: Option<String>,
85
86 #[serde(rename = "core:comment", skip_serializing_if = "Option::is_none")]
88 pub core_comment: Option<String>,
89
90 #[serde(
92 rename = "core:freq_lower_edge",
93 skip_serializing_if = "Option::is_none"
94 )]
95 pub core_freq_lower_edge: Option<f64>,
96
97 #[serde(
99 rename = "core:freq_upper_edge",
100 skip_serializing_if = "Option::is_none"
101 )]
102 pub core_freq_upper_edge: Option<f64>,
103
104 #[serde(rename = "core:uuid", skip_serializing_if = "Option::is_none")]
106 pub core_uuid: Option<String>,
107}
108
109#[allow(dead_code)]
111#[derive(Serialize, Deserialize, Debug, Default)]
112pub struct Global {
113 #[serde(rename = "core:datatype")]
115 pub core_datatype: String,
116
117 #[serde(rename = "core:sample_rate", skip_serializing_if = "Option::is_none")]
119 pub core_sample_rate: Option<f64>,
120
121 #[serde(rename = "core:version")]
123 pub core_version: String,
124
125 #[serde(rename = "core:num_channels", skip_serializing_if = "Option::is_none")]
127 pub core_num_channels: Option<u64>,
128
129 #[serde(rename = "core:sha512", skip_serializing_if = "Option::is_none")]
131 pub core_sha512: Option<String>,
132
133 #[serde(rename = "core:description", skip_serializing_if = "Option::is_none")]
136 pub core_description: Option<String>,
137
138 #[serde(rename = "core:author", skip_serializing_if = "Option::is_none")]
140 pub core_author: Option<String>,
141
142 #[serde(rename = "core:recorder", skip_serializing_if = "Option::is_none")]
146 pub core_recorder: Option<String>,
147
148 #[serde(rename = "core:license", skip_serializing_if = "Option::is_none")]
150 pub core_license: Option<String>,
151
152 #[serde(rename = "core:hw", skip_serializing_if = "Option::is_none")]
154 pub core_hw: Option<String>,
155 }
162
163#[allow(dead_code)]
165#[derive(Serialize, Deserialize, Debug, Default)]
166pub struct SigMF {
167 pub global: Global,
169
170 #[serde()]
172 pub captures: Vec<Capture>,
173
174 #[serde(default)]
176 pub annotations: Vec<Annotation>,
177}
178
179impl SigMF {
180 #[must_use]
184 pub fn new(typ: String) -> Self {
185 Self {
186 global: Global {
187 core_version: "1.1.0".to_owned(),
188 core_datatype: typ,
189 ..Default::default()
190 },
191 captures: vec![],
192 annotations: vec![],
193 }
194 }
195}
196
197pub fn parse_meta(contents: &str) -> Result<SigMF> {
199 Ok(serde_json::from_str(contents)?)
200}
201
202pub fn write<P: AsRef<std::path::Path>>(path: P, samp_rate: f64, freq: f64) -> Result<()> {
204 let data = SigMF {
205 global: Global {
206 core_version: VERSION.to_string(),
207 core_datatype: DATATYPE_CF32.to_string(),
208 core_sample_rate: Some(samp_rate),
209 ..Default::default()
210 },
211 captures: vec![Capture {
212 core_sample_start: 0,
213 core_frequency: Some(freq),
214 ..Default::default()
215 }],
216 annotations: Vec::new(),
217 };
218
219 let serialized = serde_json::to_string(&data)?;
221
222 let mut file = std::fs::File::create(path)?;
224 file.write_all(serialized.as_bytes())?;
225 Ok(())
226}
227
228pub struct SigMFSourceBuilder<T> {
230 filename: std::path::PathBuf,
231 repeat: Repeat,
232 ignore_type_error: bool,
233 sample_rate: Option<f64>,
234 dummy: std::marker::PhantomData<T>,
235}
236
237impl<T: Sample + Type> SigMFSourceBuilder<T> {
238 #[must_use]
240 pub fn sample_rate(mut self, rate: f64) -> Self {
241 self.sample_rate = Some(rate);
242 self
243 }
244 #[must_use]
246 pub fn repeat(mut self, repeat: Repeat) -> Self {
247 self.repeat = repeat;
248 self
249 }
250 #[must_use]
255 pub fn ignore_type_error(mut self) -> Self {
256 self.ignore_type_error = true;
257 self
258 }
259 pub fn build(self) -> Result<(SigMFSource<T>, ReadStream<T>)> {
261 let mut ret = SigMFSource::new2(&self.filename, self.sample_rate, self.ignore_type_error)?;
262 ret.0.repeat = self.repeat;
263 Ok(ret)
264 }
265}
266
267#[derive(rustradio_macros::Block)]
269#[rustradio(crate)]
270pub struct SigMFSource<T: Sample> {
271 file: std::fs::File,
272 meta: SigMF,
273 range: (u64, u64),
274 left: u64,
275 repeat: Repeat,
276 buf: Vec<u8>,
277 #[rustradio(out)]
278 dst: WriteStream<T>,
279}
280
281pub trait Type {
283 fn type_string() -> &'static str;
285}
286
287impl Type for i32 {
288 fn type_string() -> &'static str {
289 "ri32"
290 }
291}
292
293impl Type for u8 {
294 fn type_string() -> &'static str {
295 "ru8"
296 }
297}
298
299impl Type for num_complex::Complex<i32> {
300 fn type_string() -> &'static str {
301 "ci32"
302 }
303}
304
305impl Type for Complex {
306 fn type_string() -> &'static str {
307 assert_eq![std::mem::size_of::<Float>(), 4];
309 "cf32"
310 }
311}
312
313impl Type for Float {
314 fn type_string() -> &'static str {
315 assert_eq![std::mem::size_of::<Float>(), 4];
317 "rf32"
318 }
319}
320
321fn base_append<P: AsRef<std::path::Path>>(path: P, s: &str) -> std::path::PathBuf {
322 let path_ref = path.as_ref();
323 let parent = path_ref.parent();
324 let filename = path_ref.file_name().unwrap_or_default();
326 let mut new_filename = filename.to_os_string();
327 new_filename.push(s);
328 if let Some(parent) = parent {
329 parent.join(new_filename)
330 } else {
331 std::path::PathBuf::from(new_filename)
332 }
333}
334
335impl<T: Sample + Type> SigMFSource<T> {
336 #[must_use]
341 pub fn builder(filename: std::path::PathBuf) -> SigMFSourceBuilder<T> {
342 SigMFSourceBuilder {
343 filename,
344 ignore_type_error: false,
345 repeat: Repeat::finite(1),
346 sample_rate: None,
347 dummy: std::marker::PhantomData,
348 }
349 }
350
351 pub fn new<P: AsRef<std::path::Path>>(
359 path: P,
360 samp_rate: Option<f64>,
361 ) -> Result<(Self, ReadStream<T>)> {
362 Self::new2(path, samp_rate, false)
363 }
364
365 fn new2<P: AsRef<std::path::Path>>(
367 path: P,
368 samp_rate: Option<f64>,
369 ignore_type_error: bool,
370 ) -> Result<(Self, ReadStream<T>)> {
371 let (block, dst) = if std::fs::exists(&path)? {
372 Self::from_archive(&path)?
373 } else {
374 match Self::from_recording(&path) {
375 Err(e) => {
376 return Err(Error::msg(format!(
377 "SigMF Archive '{}' doesn't exist, and trying to read separated Recording files failed too: {e}",
378 path.as_ref().display()
379 )));
380 }
381 Ok(r) => r,
382 }
383 };
384 let meta = block.meta();
385 if let Some(samp_rate) = samp_rate
386 && let Some(t) = meta.global.core_sample_rate
387 && t != samp_rate
388 {
389 return Err(Error::msg(format!(
390 "sigmf file {} sample rate ({}) is not the expected {}",
391 path.as_ref().display(),
392 t,
393 samp_rate
394 )));
395 }
396 if !ignore_type_error {
398 let expected_type = T::type_string().to_owned() + "_le";
399 if meta.global.core_datatype != expected_type {
400 return Err(Error::msg(format!(
401 "sigmf file {} data type ({}) not the expected {}",
402 path.as_ref().display(),
403 meta.global.core_datatype,
404 expected_type
405 )));
406 }
407 }
408 Ok((block, dst))
409 }
410 fn from_recording<P: AsRef<std::path::Path>>(base: P) -> Result<(Self, ReadStream<T>)> {
413 let meta: SigMF = {
414 let file = std::fs::File::open(base_append(&base, "-meta"))?;
415 let reader = std::io::BufReader::new(file);
416 serde_json::from_reader(reader)?
417 };
418 let file = std::fs::File::open(base_append(base, "-data"))?;
419 let range = (0, file.metadata()?.len());
420 let (dst, rx) = crate::stream::new_stream();
421 Ok((
422 Self {
423 file,
424 meta,
425 range,
426 repeat: Repeat::finite(1),
427 left: range.1,
428 buf: vec![],
429 dst,
430 },
431 rx,
432 ))
433 }
434 fn from_archive<P: AsRef<std::path::Path>>(filename: P) -> Result<(Self, ReadStream<T>)> {
436 let (mut file, mut archive) = {
437 let file = std::fs::File::open(&filename)?;
438 let file2 = file.try_clone()?;
439 let archive = tar::Archive::new(file);
440 (file2, archive)
441 };
442 let mut found = None;
443
444 for entry in archive.entries_with_seek()? {
446 let mut entry = entry?;
447 if entry.path()?.extension().unwrap_or_default() != "sigmf-meta" {
448 continue;
449 }
450 debug!("Tar contents: {:?}", entry.path()?);
451 match entry.header().entry_type() {
452 tar::EntryType::Regular => {}
453 other => {
454 return Err(Error::msg(format!("data file is of bad type {other:?}")));
455 }
456 }
457 let mut s = String::new();
458 entry.read_to_string(&mut s)?;
459 let metaname = {
460 let mut metaname = entry.path()?.into_owned();
461 let new_filename = metaname
464 .file_name()
465 .expect("can't happen: we know it ends in sigmf-meta")
466 .to_str()
467 .ok_or(Error::msg("file name with bad UTF-8?"))?
468 .to_owned();
469 let new_filename = &new_filename[..(new_filename.len() - 5)];
470 metaname.set_file_name(new_filename);
471 metaname
472 };
473 found = Some(match found {
474 Some(_) => {
475 return Err(Error::msg(
476 "sigmf doesn't yet support multiple recordings in an archive",
477 ));
478 }
479 None => (metaname, s),
480 });
481 }
482 let (base, meta_string) = match found {
483 None => return Err(Error::msg("sigmf doesn't contain any recording")),
484 Some((b, m)) => (b, m),
485 };
486
487 let want = base_append(&base, "-data");
489 let range = {
490 let mut range = None;
491 let mut file = file.try_clone()?;
492 file.seek(std::io::SeekFrom::Start(0))?;
493 let mut archive = tar::Archive::new(file);
494 for e in archive.entries_with_seek()? {
495 let e = e?;
496 let got = e.path()?.into_owned().into_os_string();
497 if got != want {
498 continue;
499 }
500 match e.header().entry_type() {
501 tar::EntryType::Regular => {}
502 tar::EntryType::GNUSparse => {
503 return Err(Error::msg(
504 "SigMF source block doesn't support sparse tar files",
505 ));
506 }
507 other => {
508 return Err(Error::msg(format!("data file is of bad type {other:?}")));
509 }
510 }
511 range = match range {
512 None => Some((e.raw_file_position(), e.size())),
513 Some(_) => {
514 return Err(Error::msg(format!(
515 "Multiple files named '{}' in archive",
516 want.display()
517 )));
518 }
519 };
520 }
521 range
522 };
523 let range = range.ok_or(Error::msg(format!(
524 "data file for base {} missing",
525 base.display()
526 )))?;
527 file.seek(std::io::SeekFrom::Start(range.0))?;
528 let meta = parse_meta(&meta_string)?;
529 let (dst, rx) = crate::stream::new_stream();
530 Ok((
531 Self {
532 file,
533 meta,
534 range,
535 repeat: Repeat::finite(1),
536 left: range.1,
537 buf: vec![],
538 dst,
539 },
540 rx,
541 ))
542 }
543 #[must_use]
545 pub fn sample_rate(&self) -> Option<f64> {
546 self.meta.global.core_sample_rate
547 }
548 #[must_use]
550 pub fn meta(&self) -> &SigMF {
551 &self.meta
552 }
553}
554
555fn u64_to_clamped_usize(v: u64) -> usize {
556 if v > (usize::MAX as u64) {
557 usize::MAX
558 } else {
559 v as usize
560 }
561}
562
563impl<T> Block for SigMFSource<T>
564where
565 T: Sample<Type = T> + std::fmt::Debug + Type,
566{
567 fn work(&mut self) -> Result<BlockRet<'_>> {
568 if self.left == 0 {
569 if self.repeat.again() {
570 self.file.seek(std::io::SeekFrom::Start(self.range.0))?;
571 self.left = self.range.1;
572 } else {
573 return Ok(BlockRet::EOF);
574 }
575 }
576 let mut o = self.dst.write_buf()?;
577 if o.is_empty() {
578 return Ok(BlockRet::WaitForStream(&self.dst, 1));
579 }
580 let sample_size = T::size();
581 let have = self.buf.len() / sample_size;
582 let want = o.len();
583 if have == 0 {
584 let left = u64_to_clamped_usize(self.left);
585 let want_bytes = std::cmp::min(want * sample_size, left);
586 assert_ne!(want_bytes, 0);
587 let mut buffer = vec![0; want_bytes];
588 let n = self.file.read(&mut buffer)?;
589 assert!(n <= left);
590 assert_ne!(n, 0);
592 self.left -= n as u64;
593 self.buf.extend(&buffer[..n]);
594 }
595 let have = self.buf.len() / sample_size;
596 let samples = std::cmp::min(have, want);
597 o.fill_from_iter(
598 self.buf
599 .chunks_exact(sample_size)
600 .take(samples)
601 .map(|d| T::parse(d).expect("failed to parse a sample")),
602 );
603 o.produce(samples, &[]);
604 self.buf.drain(..(samples * sample_size));
605 Ok(BlockRet::WaitForStream(&self.dst, 1))
606 }
607}