rustradio/
sigmf.rs

//! SigMF (Signal Metadata Format) support: metadata types, a metadata writer, and a file source block.
2
/*
 * TODO:
 * create sink block.
 * add sigmf archive (tar) writing support (reading archives is already implemented).
 */
8use std::io::{Read, Seek, Write};
9
10use log::debug;
11use serde::{Deserialize, Serialize};
12
/// Datatype written by [`write`]: little-endian complex 32-bit float.
///
/// SigMF requires an explicit endianness suffix for all multi-byte types, and
/// `SigMFSource` only accepts `cf32_le` for complex `f32` samples, so the bare
/// `"cf32"` previously written here produced metadata this module could not read.
const DATATYPE_CF32: &str = "cf32_le";

/// SigMF specification version written into `core:version`.
pub const VERSION: &str = "1.1.0";
15
16use crate::block::{Block, BlockRet};
17use crate::stream::{ReadStream, WriteStream};
18use crate::{Complex, Error, Float, Repeat, Result, Sample};
19
20impl From<serde_json::Error> for Error {
21    fn from(e: serde_json::Error) -> Self {
22        Error::wrap(e, "sigmf")
23    }
24}
25
26/// Capture segment.
27#[allow(dead_code)]
28#[derive(Serialize, Deserialize, Debug, Default)]
29pub struct Capture {
30    /// Sample index in the dataset file at which this segment takes
31    /// effect.
32    #[serde(rename = "core:sample_start")]
33    pub core_sample_start: u64,
34
35    /// The index of the sample referenced by `sample_start` relative
36    /// to an original sample stream.
37    #[serde(rename = "core:global_index", skip_serializing_if = "Option::is_none")]
38    pub core_global_index: Option<u64>,
39
40    /// Header bytes to skip.
41    #[serde(rename = "core:header_bytes", skip_serializing_if = "Option::is_none")]
42    pub core_header_bytes: Option<u64>,
43
44    /// Frequency of capture.
45    #[serde(rename = "core:frequency", skip_serializing_if = "Option::is_none")]
46    pub core_frequency: Option<f64>,
47
48    /// ISO8601 string for when this was captured.
49    #[serde(rename = "core:datetime", skip_serializing_if = "Option::is_none")]
50    pub core_datetime: Option<String>,
51    // In my example, but not in the spec.
52    //#[serde(rename="core:length")]
53    //core_length: u64,
54}
55
56impl Capture {
57    #[must_use]
58    pub fn new(start: u64) -> Self {
59        Self {
60            core_sample_start: start,
61            ..Default::default()
62        }
63    }
64}
65
66/// Annotation segment.
67#[allow(dead_code)]
68#[derive(Serialize, Deserialize, Debug, Default)]
69pub struct Annotation {
70    /// Sample offset.
71    #[serde(rename = "core:sample_start")]
72    pub core_sample_start: u64,
73
74    /// Annotation width.
75    #[serde(rename = "core:sample_count", skip_serializing_if = "Option::is_none")]
76    pub core_sample_count: Option<u64>,
77
78    /// Annotation creator.
79    #[serde(rename = "core:generator", skip_serializing_if = "Option::is_none")]
80    pub core_generator: Option<String>,
81
82    /// Annotation label.
83    #[serde(rename = "core:label", skip_serializing_if = "Option::is_none")]
84    pub core_label: Option<String>,
85
86    /// Comment.
87    #[serde(rename = "core:comment", skip_serializing_if = "Option::is_none")]
88    pub core_comment: Option<String>,
89
90    /// Frequency lower edge.
91    #[serde(
92        rename = "core:freq_lower_edge",
93        skip_serializing_if = "Option::is_none"
94    )]
95    pub core_freq_lower_edge: Option<f64>,
96
97    /// Frequency upper edge.
98    #[serde(
99        rename = "core:freq_upper_edge",
100        skip_serializing_if = "Option::is_none"
101    )]
102    pub core_freq_upper_edge: Option<f64>,
103
104    /// UUID.
105    #[serde(rename = "core:uuid", skip_serializing_if = "Option::is_none")]
106    pub core_uuid: Option<String>,
107}
108
109/// Global object.
110#[allow(dead_code)]
111#[derive(Serialize, Deserialize, Debug, Default)]
112pub struct Global {
113    /// Data format.
114    #[serde(rename = "core:datatype")]
115    pub core_datatype: String,
116
117    /// Sample rate.
118    #[serde(rename = "core:sample_rate", skip_serializing_if = "Option::is_none")]
119    pub core_sample_rate: Option<f64>,
120
121    /// SigMF version.
122    #[serde(rename = "core:version")]
123    pub core_version: String,
124
125    /// Number of channels.
126    #[serde(rename = "core:num_channels", skip_serializing_if = "Option::is_none")]
127    pub core_num_channels: Option<u64>,
128
129    /// SHA512 of the data.
130    #[serde(rename = "core:sha512", skip_serializing_if = "Option::is_none")]
131    pub core_sha512: Option<String>,
132
133    // offset
134    /// Description.
135    #[serde(rename = "core:description", skip_serializing_if = "Option::is_none")]
136    pub core_description: Option<String>,
137
138    /// Author of the recording.
139    #[serde(rename = "core:author", skip_serializing_if = "Option::is_none")]
140    pub core_author: Option<String>,
141
142    // meta_doi
143    // data_doi
144    /// Recorder software.
145    #[serde(rename = "core:recorder", skip_serializing_if = "Option::is_none")]
146    pub core_recorder: Option<String>,
147
148    /// License of the data.
149    #[serde(rename = "core:license", skip_serializing_if = "Option::is_none")]
150    pub core_license: Option<String>,
151
152    /// Hardware used to make the recording.
153    #[serde(rename = "core:hw", skip_serializing_if = "Option::is_none")]
154    pub core_hw: Option<String>,
155    // dataset
156    // trailing_bytes
157    // metadata_only
158    // geolocation
159    // extensions
160    // collection
161}
162
163/// SigMF data.
164#[allow(dead_code)]
165#[derive(Serialize, Deserialize, Debug, Default)]
166pub struct SigMF {
167    /// Global information.
168    pub global: Global,
169
170    /// Capture segments.
171    #[serde()]
172    pub captures: Vec<Capture>,
173
174    /// Annotations on the data.
175    #[serde(default)]
176    pub annotations: Vec<Annotation>,
177}
178
179impl SigMF {
180    /// Create new SigMF object from a data type.
181    ///
182    /// TODO: Should probably not be done from outside the crate.
183    #[must_use]
184    pub fn new(typ: String) -> Self {
185        Self {
186            global: Global {
187                core_version: "1.1.0".to_owned(),
188                core_datatype: typ,
189                ..Default::default()
190            },
191            captures: vec![],
192            annotations: vec![],
193        }
194    }
195}
196
197/// Parse metadata for SigMF file.
198pub fn parse_meta(contents: &str) -> Result<SigMF> {
199    Ok(serde_json::from_str(contents)?)
200}
201
202/// Write metadata file.
203pub fn write<P: AsRef<std::path::Path>>(path: P, samp_rate: f64, freq: f64) -> Result<()> {
204    let data = SigMF {
205        global: Global {
206            core_version: VERSION.to_string(),
207            core_datatype: DATATYPE_CF32.to_string(),
208            core_sample_rate: Some(samp_rate),
209            ..Default::default()
210        },
211        captures: vec![Capture {
212            core_sample_start: 0,
213            core_frequency: Some(freq),
214            ..Default::default()
215        }],
216        annotations: Vec::new(),
217    };
218
219    // Serialize the data to a JSON string.
220    let serialized = serde_json::to_string(&data)?;
221
222    // Create a file and write the serialized string to it.
223    let mut file = std::fs::File::create(path)?;
224    file.write_all(serialized.as_bytes())?;
225    Ok(())
226}
227
228/// SigMF source builder.
229pub struct SigMFSourceBuilder<T> {
230    filename: std::path::PathBuf,
231    repeat: Repeat,
232    ignore_type_error: bool,
233    sample_rate: Option<f64>,
234    dummy: std::marker::PhantomData<T>,
235}
236
237impl<T: Sample + Type> SigMFSourceBuilder<T> {
238    /// Force a certain sample rate.
239    #[must_use]
240    pub fn sample_rate(mut self, rate: f64) -> Self {
241        self.sample_rate = Some(rate);
242        self
243    }
244    /// Force a certain sample rate.
245    #[must_use]
246    pub fn repeat(mut self, repeat: Repeat) -> Self {
247        self.repeat = repeat;
248        self
249    }
250    /// Ignore type error.
251    ///
252    /// This is used e.g. if you just want the bytes of the data stream, to
253    /// checksum or something.
254    #[must_use]
255    pub fn ignore_type_error(mut self) -> Self {
256        self.ignore_type_error = true;
257        self
258    }
259    /// Build a SigMFSource.
260    pub fn build(self) -> Result<(SigMFSource<T>, ReadStream<T>)> {
261        let mut ret = SigMFSource::new2(&self.filename, self.sample_rate, self.ignore_type_error)?;
262        ret.0.repeat = self.repeat;
263        Ok(ret)
264    }
265}
266
/// SigMF file source.
///
/// Reads samples of type `T` from a SigMF recording and writes them to its
/// output stream. The recording is either a tar archive or separate
/// `-meta`/`-data` files.
#[derive(rustradio_macros::Block)]
#[rustradio(crate)]
pub struct SigMFSource<T: Sample> {
    // Open file holding the sample data: the archive itself, or the `-data` file.
    file: std::fs::File,
    // Parsed metadata.
    meta: SigMF,
    // (byte offset, byte length) of the sample data within `file`.
    range: (u64, u64),
    // Bytes of sample data not yet read from `file` in the current pass.
    left: u64,
    // Playback repetition policy.
    repeat: Repeat,
    // Bytes read from `file` but not yet emitted as whole samples.
    buf: Vec<u8>,
    #[rustradio(out)]
    dst: WriteStream<T>,
}
280
/// Trait that needs implementing for all supported SigMF data types.
///
/// Maps a Rust sample type to its SigMF `core:datatype` name.
pub trait Type {
    /// Return the SigMF datatype name of the type, without any endianness
    /// suffix (e.g. `"cf32"`, not `"cf32_le"`).
    ///
    /// For multi-byte types, `SigMFSource` appends `_le` to this name to form
    /// the datatype it expects in the metadata.
    fn type_string() -> &'static str;
}
286
// Real signed 32-bit integer samples.
impl Type for i32 {
    fn type_string() -> &'static str {
        "ri32"
    }
}
292
// Real unsigned 8-bit samples.
impl Type for u8 {
    fn type_string() -> &'static str {
        "ru8"
    }
}
298
// Complex samples with signed 32-bit integer components.
impl Type for num_complex::Complex<i32> {
    fn type_string() -> &'static str {
        "ci32"
    }
}
304
305impl Type for Complex {
306    fn type_string() -> &'static str {
307        // TODO: support Float being 64bit.
308        assert_eq![std::mem::size_of::<Float>(), 4];
309        "cf32"
310    }
311}
312
313impl Type for Float {
314    fn type_string() -> &'static str {
315        // TODO: support Float being 64bit.
316        assert_eq![std::mem::size_of::<Float>(), 4];
317        "rf32"
318    }
319}
320
/// Append `s` to the final component of `path`, keeping the parent directory.
///
/// For example, `dir/rec.sigmf` + `"-meta"` gives `dir/rec.sigmf-meta`. If
/// `path` has no final file name component, `s` is appended to an empty name.
fn base_append<P: AsRef<std::path::Path>>(path: P, s: &str) -> std::path::PathBuf {
    let original = path.as_ref();
    // "Or default", or return error?
    let mut name = original
        .file_name()
        .map(std::ffi::OsStr::to_os_string)
        .unwrap_or_default();
    name.push(s);
    match original.parent() {
        Some(dir) => dir.join(name),
        None => name.into(),
    }
}
334
335impl<T: Sample + Type> SigMFSource<T> {
336    /// Create new SigMF source builder.
337    ///
338    /// If the exact file name exists, then treat it as an Archive.
339    /// If it does not, fall back to checking for separate Recording files.
340    #[must_use]
341    pub fn builder(filename: std::path::PathBuf) -> SigMFSourceBuilder<T> {
342        SigMFSourceBuilder {
343            filename,
344            ignore_type_error: false,
345            repeat: Repeat::finite(1),
346            sample_rate: None,
347            dummy: std::marker::PhantomData,
348        }
349    }
350
351    /// Create a new SigMF source block.
352    ///
353    /// If the exact file name exists, then treat it as an Archive.
354    /// If it does not, fall back to checking for separate Recording files.
355    ///
356    /// If `samp_rate` is provided, and the metadata also provides a sample rate,
357    /// then they *must* match, or an error is returned.
358    pub fn new<P: AsRef<std::path::Path>>(
359        path: P,
360        samp_rate: Option<f64>,
361    ) -> Result<(Self, ReadStream<T>)> {
362        Self::new2(path, samp_rate, false)
363    }
364
365    /// Internal creator used by Builder.
366    fn new2<P: AsRef<std::path::Path>>(
367        path: P,
368        samp_rate: Option<f64>,
369        ignore_type_error: bool,
370    ) -> Result<(Self, ReadStream<T>)> {
371        let (block, dst) = if std::fs::exists(&path)? {
372            Self::from_archive(&path)?
373        } else {
374            match Self::from_recording(&path) {
375                Err(e) => {
376                    return Err(Error::msg(format!(
377                        "SigMF Archive '{}' doesn't exist, and trying to read separated Recording files failed too: {e}",
378                        path.as_ref().display()
379                    )));
380                }
381                Ok(r) => r,
382            }
383        };
384        let meta = block.meta();
385        if let Some(samp_rate) = samp_rate
386            && let Some(t) = meta.global.core_sample_rate
387            && t != samp_rate
388        {
389            return Err(Error::msg(format!(
390                "sigmf file {} sample rate ({}) is not the expected {}",
391                path.as_ref().display(),
392                t,
393                samp_rate
394            )));
395        }
396        // TODO: support i8/u8 and _be.
397        if !ignore_type_error {
398            let expected_type = T::type_string().to_owned() + "_le";
399            if meta.global.core_datatype != expected_type {
400                return Err(Error::msg(format!(
401                    "sigmf file {} data type ({}) not the expected {}",
402                    path.as_ref().display(),
403                    meta.global.core_datatype,
404                    expected_type
405                )));
406            }
407        }
408        Ok((block, dst))
409    }
410    /// Create a new SigMF from separated Recording files.
411    ///
412    fn from_recording<P: AsRef<std::path::Path>>(base: P) -> Result<(Self, ReadStream<T>)> {
413        let meta: SigMF = {
414            let file = std::fs::File::open(base_append(&base, "-meta"))?;
415            let reader = std::io::BufReader::new(file);
416            serde_json::from_reader(reader)?
417        };
418        let file = std::fs::File::open(base_append(base, "-data"))?;
419        let range = (0, file.metadata()?.len());
420        let (dst, rx) = crate::stream::new_stream();
421        Ok((
422            Self {
423                file,
424                meta,
425                range,
426                repeat: Repeat::finite(1),
427                left: range.1,
428                buf: vec![],
429                dst,
430            },
431            rx,
432        ))
433    }
434    /// Create a new SigMF source block.
435    fn from_archive<P: AsRef<std::path::Path>>(filename: P) -> Result<(Self, ReadStream<T>)> {
436        let (mut file, mut archive) = {
437            let file = std::fs::File::open(&filename)?;
438            let file2 = file.try_clone()?;
439            let archive = tar::Archive::new(file);
440            (file2, archive)
441        };
442        let mut found = None;
443
444        // Find the sole metadata.
445        for entry in archive.entries_with_seek()? {
446            let mut entry = entry?;
447            if entry.path()?.extension().unwrap_or_default() != "sigmf-meta" {
448                continue;
449            }
450            debug!("Tar contents: {:?}", entry.path()?);
451            match entry.header().entry_type() {
452                tar::EntryType::Regular => {}
453                other => {
454                    return Err(Error::msg(format!("data file is of bad type {other:?}")));
455                }
456            }
457            let mut s = String::new();
458            entry.read_to_string(&mut s)?;
459            let metaname = {
460                let mut metaname = entry.path()?.into_owned();
461                // Not sure what to do with bad file names. Presumably we can't
462                // count on the encoding allowing us to remove "-meta"?
463                let new_filename = metaname
464                    .file_name()
465                    .expect("can't happen: we know it ends in sigmf-meta")
466                    .to_str()
467                    .ok_or(Error::msg("file name with bad UTF-8?"))?
468                    .to_owned();
469                let new_filename = &new_filename[..(new_filename.len() - 5)];
470                metaname.set_file_name(new_filename);
471                metaname
472            };
473            found = Some(match found {
474                Some(_) => {
475                    return Err(Error::msg(
476                        "sigmf doesn't yet support multiple recordings in an archive",
477                    ));
478                }
479                None => (metaname, s),
480            });
481        }
482        let (base, meta_string) = match found {
483            None => return Err(Error::msg("sigmf doesn't contain any recording")),
484            Some((b, m)) => (b, m),
485        };
486
487        // Find the matching data file.
488        let want = base_append(&base, "-data");
489        let range = {
490            let mut range = None;
491            let mut file = file.try_clone()?;
492            file.seek(std::io::SeekFrom::Start(0))?;
493            let mut archive = tar::Archive::new(file);
494            for e in archive.entries_with_seek()? {
495                let e = e?;
496                let got = e.path()?.into_owned().into_os_string();
497                if got != want {
498                    continue;
499                }
500                match e.header().entry_type() {
501                    tar::EntryType::Regular => {}
502                    tar::EntryType::GNUSparse => {
503                        return Err(Error::msg(
504                            "SigMF source block doesn't support sparse tar files",
505                        ));
506                    }
507                    other => {
508                        return Err(Error::msg(format!("data file is of bad type {other:?}")));
509                    }
510                }
511                range = match range {
512                    None => Some((e.raw_file_position(), e.size())),
513                    Some(_) => {
514                        return Err(Error::msg(format!(
515                            "Multiple files named '{}' in archive",
516                            want.display()
517                        )));
518                    }
519                };
520            }
521            range
522        };
523        let range = range.ok_or(Error::msg(format!(
524            "data file for base {} missing",
525            base.display()
526        )))?;
527        file.seek(std::io::SeekFrom::Start(range.0))?;
528        let meta = parse_meta(&meta_string)?;
529        let (dst, rx) = crate::stream::new_stream();
530        Ok((
531            Self {
532                file,
533                meta,
534                range,
535                repeat: Repeat::finite(1),
536                left: range.1,
537                buf: vec![],
538                dst,
539            },
540            rx,
541        ))
542    }
543    /// Get the sample rate from the meta file.
544    #[must_use]
545    pub fn sample_rate(&self) -> Option<f64> {
546        self.meta.global.core_sample_rate
547    }
548    /// Get the SigMF metadata.
549    #[must_use]
550    pub fn meta(&self) -> &SigMF {
551        &self.meta
552    }
553}
554
/// Convert `v` to `usize`, saturating at `usize::MAX` when it doesn't fit.
fn u64_to_clamped_usize(v: u64) -> usize {
    usize::try_from(v).unwrap_or(usize::MAX)
}
562
563impl<T> Block for SigMFSource<T>
564where
565    T: Sample<Type = T> + std::fmt::Debug + Type,
566{
567    fn work(&mut self) -> Result<BlockRet<'_>> {
568        if self.left == 0 {
569            if self.repeat.again() {
570                self.file.seek(std::io::SeekFrom::Start(self.range.0))?;
571                self.left = self.range.1;
572            } else {
573                return Ok(BlockRet::EOF);
574            }
575        }
576        let mut o = self.dst.write_buf()?;
577        if o.is_empty() {
578            return Ok(BlockRet::WaitForStream(&self.dst, 1));
579        }
580        let sample_size = T::size();
581        let have = self.buf.len() / sample_size;
582        let want = o.len();
583        if have == 0 {
584            let left = u64_to_clamped_usize(self.left);
585            let want_bytes = std::cmp::min(want * sample_size, left);
586            assert_ne!(want_bytes, 0);
587            let mut buffer = vec![0; want_bytes];
588            let n = self.file.read(&mut buffer)?;
589            assert!(n <= left);
590            // Can't get EOF here.
591            assert_ne!(n, 0);
592            self.left -= n as u64;
593            self.buf.extend(&buffer[..n]);
594        }
595        let have = self.buf.len() / sample_size;
596        let samples = std::cmp::min(have, want);
597        o.fill_from_iter(
598            self.buf
599                .chunks_exact(sample_size)
600                .take(samples)
601                .map(|d| T::parse(d).expect("failed to parse a sample")),
602        );
603        o.produce(samples, &[]);
604        self.buf.drain(..(samples * sample_size));
605        Ok(BlockRet::WaitForStream(&self.dst, 1))
606    }
607}