futuresdr/blocks/
file_source.rs1use crate::prelude::*;
2use std::path::Path;
3use std::path::PathBuf;
4
5#[derive(Block)]
31pub struct FileSource<T: Send + 'static, O: CpuBufferWriter<Item = T> = DefaultCpuWriter<T>> {
32 file_path: PathBuf,
33 file: Option<async_fs::File>,
34 repeat: bool,
35 #[output]
36 output: O,
37}
38
39impl<T: Send + 'static, O: CpuBufferWriter<Item = T>> FileSource<T, O> {
40 pub fn new(file_path: impl AsRef<Path>, repeat: bool) -> Self {
42 Self {
43 file_path: file_path.as_ref().to_path_buf(),
44 file: None,
45 repeat,
46 output: O::default(),
47 }
48 }
49}
50
51#[doc(hidden)]
52impl<T: Send + 'static, O: CpuBufferWriter<Item = T>> Kernel for FileSource<T, O> {
53 async fn work(
54 &mut self,
55 io: &mut WorkIo,
56 _mio: &mut MessageOutputs,
57 _meta: &mut BlockMeta,
58 ) -> Result<()> {
59 let out = self.output.slice();
60
61 let out_bytes = unsafe {
62 std::slice::from_raw_parts_mut(out.as_ptr() as *mut u8, std::mem::size_of_val(out))
63 };
64
65 let item_size = std::mem::size_of::<T>();
66 let mut i = 0;
67
68 while i < out_bytes.len() {
69 match self.file.as_mut().unwrap().read(&mut out_bytes[i..]).await {
70 Ok(0) => {
71 if self.repeat {
72 self.file =
73 Some(async_fs::File::open(self.file_path.clone()).await.unwrap());
74 } else {
75 io.finished = true;
76 break;
77 }
78 }
79 Ok(written) => {
80 i += written;
81 }
82 Err(e) => panic!("FileSource: Error reading from file: {e:?}"),
83 }
84 }
85
86 self.output.produce(i / item_size);
87
88 Ok(())
89 }
90
91 async fn init(&mut self, _mio: &mut MessageOutputs, _meta: &mut BlockMeta) -> Result<()> {
92 self.file = Some(async_fs::File::open(self.file_path.clone()).await.unwrap());
93 Ok(())
94 }
95}