1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//! FFT stream.
//!
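//! Splits a stream of complex samples into consecutive blocks of the FFT size,
//! runs a forward FFT on each block, and outputs the results as a stream.
//! The output carries no framing, so the consumer needs to know the FFT size
//! to tell where each block of frequency bins starts.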
use crate::Result;
use rustfft::FftPlanner;
use crate::block::{Block, BlockRet};
use crate::stream::{ReadStream, WriteStream};
use crate::{Complex, Float};
/// Block computing a forward FFT over consecutive fixed-size blocks of a
/// complex sample stream.
///
/// Input is consumed, and output produced, only in whole multiples of `size`
/// samples. Each run of `size` output samples is the FFT of the corresponding
/// `size` input samples. The output stream carries no framing, so the
/// consumer of the stream needs to know what the FFT size is, or it won't be
/// able to make sense of it.
///
/// Stream tags on the input are not propagated to the output.
#[derive(rustradio_macros::Block)]
#[rustradio(crate)]
pub struct FftStream {
    // FFT length; samples are processed in chunks of exactly this many.
    size: usize,
    // Forward FFT plan for `size`-sample transforms, created in `new`.
    fft: std::sync::Arc<dyn rustfft::Fft<Float>>,
    // When true, `work` transforms the chunks in parallel using Rayon.
    threaded: bool,
    // NOTE(review): presumably `#[rustradio(in)]` / `#[rustradio(out)]` mark
    // the block's input and output streams for the derive macro — confirm.
    #[rustradio(in)]
    src: ReadStream<Complex>,
    #[rustradio(out)]
    dst: WriteStream<Complex>,
}
impl FftStream {
    /// Create a new `FftStream` that computes forward FFTs of `size` samples.
    ///
    /// Returns the block together with the stream its FFT output is written
    /// to. Multithreading is off by default; see [`FftStream::threaded`].
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero. The block only works in whole multiples of
    /// `size`, so a zero size is meaningless. Rejecting it here avoids a
    /// confusing division-by-zero panic later, inside `work`.
    #[must_use]
    pub fn new(src: ReadStream<Complex>, size: usize) -> (Self, ReadStream<Complex>) {
        assert!(size > 0, "FftStream: FFT size must be non-zero");
        let mut planner = FftPlanner::new();
        let fft = planner.plan_fft_forward(size);
        let (dst, dr) = crate::stream::new_stream();
        (
            Self {
                size,
                fft,
                src,
                dst,
                threaded: false,
            },
            dr,
        )
    }

    /// Turn on or off Rayon multithreading.
    ///
    /// When enabled, the FFT blocks handled by a single `work` call are
    /// transformed in parallel using Rayon.
    ///
    /// Initial benchmarks seem to indicate that this does not help. Maybe with
    /// bigger than default stream buffers for more concurrency.
    pub fn threaded(&mut self, onoff: bool) {
        self.threaded = onoff;
    }
}
impl Block for FftStream {
    /// Transform as many whole `size`-sample blocks as both the input and the
    /// output buffers allow.
    ///
    /// Waits on the input stream if fewer than `size` samples are available.
    /// Waits on the output stream if there is room for fewer than `size`
    /// samples. Input tags are discarded and no tags are written to the
    /// output.
    fn work(&mut self) -> Result<BlockRet<'_>> {
        let (input, _tags) = self.src.read_buf()?;
        let ii = input.slice();
        if ii.len() < self.size {
            return Ok(BlockRet::WaitForStream(&self.src, self.size));
        }
        let mut o = self.dst.write_buf()?;
        let oo = o.slice();
        if oo.len() < self.size {
            return Ok(BlockRet::WaitForStream(&self.dst, self.size));
        }
        // Largest whole number of FFT blocks that fits in both buffers. Both
        // hold at least `size` samples, so this is always >= `size`.
        let len = std::cmp::min(ii.len(), oo.len());
        let len = len - (len % self.size);

        // Only the first `len` output samples are produced below. Restrict all
        // work to them so that the rest of the (possibly much larger) output
        // buffer is not needlessly transformed.
        let out = &mut oo[..len];
        out.copy_from_slice(&ii[..len]);

        // It would be nice to use fft.process_outofplace_with_scratch(), but it
        // requires input also be scratch space, and therefore mutable.
        if self.threaded {
            use rayon::prelude::*;
            out.par_chunks_exact_mut(self.size).for_each(|chunk| {
                self.fft.process(chunk);
            });
        } else {
            // `process` accepts any whole multiple of the FFT length and
            // transforms each `size` chunk in turn. It allocates scratch space
            // once for the whole call rather than once per chunk.
            self.fft.process(out);
        }
        input.consume(len);
        o.produce(len, &[]);
        Ok(BlockRet::Again)
    }
}
/* vim: textwidth=80
*/