1use anyhow::Result;
8use log::{trace, warn};
9
10use crate::block::{Block, BlockRet};
11use crate::iir_filter::CappedFilter;
12use crate::stream::{ReadStream, WriteStream};
13use crate::{Error, Float};
14
/// Marker trait for the detector object handed to `SymbolSync`.
///
/// The trait declares no methods; its only requirement is `Send`, so that a
/// boxed `dyn TED` can be stored in a block that is moved between threads.
/// NOTE(review): "TED" presumably stands for "timing error detector" —
/// confirm against the project documentation.
pub trait TED: Send {}
17
/// Field-less detector type that implements the `TED` marker trait.
///
/// The type carries no state, so every value is interchangeable; `new()` and
/// `Default::default()` produce the same thing.
/// NOTE(review): the name suggests a zero-crossing timing error detector, but
/// no detection logic lives on this type in this file.
#[derive(Default)]
pub struct TEDZeroCrossing {}

impl TEDZeroCrossing {
    /// Creates a new `TEDZeroCrossing`.
    pub fn new() -> Self {
        Self::default()
    }
}
33
// `TED` declares no methods, so opting `TEDZeroCrossing` in needs an empty
// impl only. This is what allows it to be passed as `Box<dyn TED>`.
impl TED for TEDZeroCrossing {}
37
/// Clock-recovery block: reads a stream of `Float` samples and writes one
/// sample per recovered symbol.
///
/// Symbol timing is tracked from sign changes in the input (see `work()`).
/// An optional second output stream carries the clock estimate that was in
/// effect for each emitted symbol.
#[derive(rustradio_macros::Block)]
#[rustradio(crate)]
pub struct SymbolSync {
    // Nominal samples per symbol; `new()` asserts it is > 1.0.
    sps: Float,
    // Half-width of the allowed clock range: `work()` caps the filtered
    // clock to `sps - max_deviation ..= sps + max_deviation`.
    max_deviation: Float,
    // Current estimate of samples per symbol; starts out equal to `sps`.
    clock: Float,
    // Detector supplied by the caller. Stored but never read in this file
    // (hence the leading underscore).
    _ted: Box<dyn TED>,
    // Filter applied to the measured deviation from `sps` to update `clock`.
    clock_filter: Box<dyn CappedFilter<Float>>,
    // Whether the previously processed sample was > 0.0.
    last_sign: bool,
    // Position (in samples) of the sample currently being processed.
    // Periodically shifted down together with the two positions below so
    // the values stay small.
    stream_pos: Float,
    // `stream_pos` at which the most recent sign change was seen.
    last_sym_boundary_pos: Float,
    // `stream_pos` at or after which the next output sample is taken.
    next_sym_middle: Float,
    // Input sample stream.
    #[rustradio(in)]
    src: ReadStream<Float>,
    // Output stream of per-symbol samples.
    #[rustradio(out)]
    dst: WriteStream<Float>,
    // Optional output stream of the clock estimate; `None` until
    // `out_clock()` is called.
    #[rustradio(out)]
    out_clock: Option<WriteStream<Float>>,
}
63
64impl SymbolSync {
65 pub fn new(
71 src: ReadStream<Float>,
72 sps: Float,
73 max_deviation: Float,
74 ted: Box<dyn TED>,
75 mut clock_filter: Box<dyn CappedFilter<Float>>,
76 ) -> (Self, ReadStream<Float>) {
77 assert!(sps > 1.0);
78 clock_filter.fill(sps);
79 let (dst, dr) = crate::stream::new_stream();
80 (
81 Self {
82 src,
83 dst,
84 sps,
85 clock: sps,
86 _ted: ted,
87 clock_filter,
88 max_deviation,
89 last_sign: false,
90 stream_pos: 0.0,
91 last_sym_boundary_pos: 0.0,
92 next_sym_middle: 0.0,
93 out_clock: None,
94 },
95 dr,
96 )
97 }
98
99 pub fn out_clock(&mut self) -> Option<ReadStream<Float>> {
104 if self.out_clock.is_some() {
105 warn!("SymbolSync::out_clock() called more than once");
106 return None;
107 }
108 let (tx, rx) = crate::stream::new_stream();
109 self.out_clock = Some(tx);
110 Some(rx)
111 }
112}
113
114impl Block for SymbolSync {
115 fn work(&mut self) -> Result<BlockRet, Error> {
116 let (input, _tags) = self.src.read_buf()?;
117 if input.is_empty() {
118 return Ok(BlockRet::WaitForStream(&self.src, 1));
119 }
120 let mut o = self.dst.write_buf()?;
121 if o.is_empty() {
122 return Ok(BlockRet::WaitForStream(&self.dst, 1));
123 }
124 let mut out_clock = self.out_clock.as_mut().map(|x| x.write_buf().unwrap());
126
127 let mut n = 0; let mut opos = 0; let olen = o.len();
130 let oslice = o.slice();
131 for sample in input.iter() {
132 n += 1;
133 if self.stream_pos >= self.next_sym_middle {
134 oslice[opos] = *sample;
136 if let Some(ref mut s) = out_clock {
137 s.slice()[opos] = self.clock;
138 }
139 opos += 1;
140 self.next_sym_middle += self.clock;
141 if opos == olen {
142 break;
143 }
144 }
145 let sign = *sample > 0.0;
146 if sign != self.last_sign {
147 if self.stream_pos > 0.0 && self.last_sym_boundary_pos > 0.0 {
148 assert!(
149 self.stream_pos > self.last_sym_boundary_pos,
150 "{} not > {}",
151 self.stream_pos,
152 self.last_sym_boundary_pos
153 );
154 let mi = self.sps - self.max_deviation;
155 let mx = self.sps + self.max_deviation;
156 let mut t = self.stream_pos - self.last_sym_boundary_pos;
157 let pre = self.clock;
158 while t > mx {
159 let t2 = t - self.clock;
160 if (t - self.clock).abs() < (t2 - self.clock).abs() {
161 break;
162 }
163 t = t2;
164 }
165 if t > mi * 0.8 && t < mx * 1.2 {
166 assert!(
167 t > 0.0,
168 "t negative {} {}",
169 self.stream_pos,
170 self.last_sym_boundary_pos
171 );
172 self.clock = self.clock_filter.filter_capped(
173 t - self.sps,
174 mi - self.sps,
175 mx - self.sps,
176 ) + self.sps;
177 self.next_sym_middle = self.last_sym_boundary_pos + self.clock / 2.0;
178 while self.next_sym_middle < self.stream_pos {
179 self.next_sym_middle += self.clock;
180 }
181 trace!(
182 "SymbolSync: clock@{} pre={pre} now={t} min={mi} max={mx} => {}",
183 self.stream_pos, self.clock
184 );
185 }
186 }
187 self.last_sym_boundary_pos = self.stream_pos;
188 self.last_sign = sign;
189 }
190 self.stream_pos += 1.0;
191 let step_back = 10.0 * self.clock;
193 if self.stream_pos > step_back
194 && self.last_sym_boundary_pos > step_back
195 && self.next_sym_middle > step_back
196 {
197 self.stream_pos -= step_back;
198 self.last_sym_boundary_pos -= step_back;
199 self.next_sym_middle -= step_back;
200 }
201 }
202 input.consume(n);
203 o.produce(opos, &[]);
204 if let Some(s) = out_clock {
205 s.produce(opos, &[]);
206 }
207 Ok(BlockRet::Ok)
208 }
209}
210