futuresdr/blocks/
file_source.rs

1use crate::prelude::*;
2use std::path::Path;
3use std::path::PathBuf;
4
5/// Read samples from a file.
6///
7/// Samples are assumed to be encoded in the native format for the runtime. For
8/// example, on most machines, that means little endian. For complex samples,
9/// the real component must come before the imaginary component.
10///
11/// # Inputs
12///
13/// No inputs.
14///
15/// # Outputs
16///
17/// `out`: Output samples
18///
19/// # Usage
20/// ```no_run
21/// use futuresdr::blocks::FileSource;
22/// use futuresdr::runtime::Flowgraph;
23/// use num_complex::Complex;
24///
25/// let mut fg = Flowgraph::new();
26///
27/// // Loads 8-byte samples from the file
28/// let source = fg.add_block(FileSource::<Complex<f32>>::new("my_filename.cf32", false));
29/// ```
30#[derive(Block)]
31pub struct FileSource<T: Send + 'static, O: CpuBufferWriter<Item = T> = DefaultCpuWriter<T>> {
32    file_path: PathBuf,
33    file: Option<async_fs::File>,
34    repeat: bool,
35    #[output]
36    output: O,
37}
38
39impl<T: Send + 'static, O: CpuBufferWriter<Item = T>> FileSource<T, O> {
40    /// Create FileSource block
41    pub fn new(file_path: impl AsRef<Path>, repeat: bool) -> Self {
42        Self {
43            file_path: file_path.as_ref().to_path_buf(),
44            file: None,
45            repeat,
46            output: O::default(),
47        }
48    }
49}
50
51#[doc(hidden)]
52impl<T: Send + 'static, O: CpuBufferWriter<Item = T>> Kernel for FileSource<T, O> {
53    async fn work(
54        &mut self,
55        io: &mut WorkIo,
56        _mio: &mut MessageOutputs,
57        _meta: &mut BlockMeta,
58    ) -> Result<()> {
59        let out = self.output.slice();
60
61        let out_bytes = unsafe {
62            std::slice::from_raw_parts_mut(out.as_ptr() as *mut u8, std::mem::size_of_val(out))
63        };
64
65        let item_size = std::mem::size_of::<T>();
66        let mut i = 0;
67
68        while i < out_bytes.len() {
69            match self.file.as_mut().unwrap().read(&mut out_bytes[i..]).await {
70                Ok(0) => {
71                    if self.repeat {
72                        self.file =
73                            Some(async_fs::File::open(self.file_path.clone()).await.unwrap());
74                    } else {
75                        io.finished = true;
76                        break;
77                    }
78                }
79                Ok(written) => {
80                    i += written;
81                }
82                Err(e) => panic!("FileSource: Error reading from file: {e:?}"),
83            }
84        }
85
86        self.output.produce(i / item_size);
87
88        Ok(())
89    }
90
91    async fn init(&mut self, _mio: &mut MessageOutputs, _meta: &mut BlockMeta) -> Result<()> {
92        self.file = Some(async_fs::File::open(self.file_path.clone()).await.unwrap());
93        Ok(())
94    }
95}