1use log::{trace, warn};
8
9use crate::block::{Block, BlockRet};
10use crate::iir_filter::ClampedFilter;
11use crate::stream::{ReadStream, WriteStream};
12use crate::{Float, Result};
13
14pub trait Ted: Send {}
16
17pub struct TedZeroCrossing {}
19
20impl TedZeroCrossing {
21 pub fn new() -> Self {
23 Self {}
24 }
25}
26
27impl Default for TedZeroCrossing {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl Ted for TedZeroCrossing {}
36
37#[derive(rustradio_macros::Block)]
44#[rustradio(crate)]
45pub struct SymbolSync {
46 sps: Float,
47 max_deviation: Float,
48 clock: Float,
49 _ted: Box<dyn Ted>,
50 clock_filter: Box<dyn ClampedFilter<Float>>,
51 last_sign: bool,
52 stream_pos: Float,
53 last_sym_boundary_pos: Float,
54 next_sym_middle: Float,
55 #[rustradio(in)]
56 src: ReadStream<Float>,
57 #[rustradio(out)]
58 dst: WriteStream<Float>,
59 #[rustradio(out)]
60 out_clock: Option<WriteStream<Float>>,
61}
62
63impl SymbolSync {
64 pub fn new(
70 src: ReadStream<Float>,
71 sps: Float,
72 max_deviation: Float,
73 ted: Box<dyn Ted>,
74 mut clock_filter: Box<dyn ClampedFilter<Float>>,
75 ) -> (Self, ReadStream<Float>) {
76 assert!(sps > 1.0);
77 clock_filter.fill(sps);
78 let (dst, dr) = crate::stream::new_stream();
79 (
80 Self {
81 src,
82 dst,
83 sps,
84 clock: sps,
85 _ted: ted,
86 clock_filter,
87 max_deviation,
88 last_sign: false,
89 stream_pos: 0.0,
90 last_sym_boundary_pos: 0.0,
91 next_sym_middle: 0.0,
92 out_clock: None,
93 },
94 dr,
95 )
96 }
97
98 pub fn out_clock(&mut self) -> Option<ReadStream<Float>> {
103 if self.out_clock.is_some() {
104 warn!("SymbolSync::out_clock() called more than once");
105 return None;
106 }
107 let (tx, rx) = crate::stream::new_stream();
108 self.out_clock = Some(tx);
109 Some(rx)
110 }
111}
112
113impl Block for SymbolSync {
114 fn work(&mut self) -> Result<BlockRet> {
115 let (input, _tags) = self.src.read_buf()?;
116 if input.is_empty() {
117 return Ok(BlockRet::WaitForStream(&self.src, 1));
118 }
119 let mut o = self.dst.write_buf()?;
120 if o.is_empty() {
121 return Ok(BlockRet::WaitForStream(&self.dst, 1));
122 }
123 let mut out_clock = self.out_clock.as_mut().map(|x| x.write_buf().unwrap());
125
126 let mut n = 0; let mut opos = 0; let olen = o.len();
129 let oslice = o.slice();
130 for sample in input.iter() {
131 n += 1;
132 if self.stream_pos >= self.next_sym_middle {
133 oslice[opos] = *sample;
135 if let Some(ref mut s) = out_clock {
136 s.slice()[opos] = self.clock;
137 }
138 opos += 1;
139 self.next_sym_middle += self.clock;
140 if opos == olen {
141 break;
142 }
143 }
144 let sign = *sample > 0.0;
145 if sign != self.last_sign {
146 if self.stream_pos > 0.0 && self.last_sym_boundary_pos > 0.0 {
147 assert!(
148 self.stream_pos > self.last_sym_boundary_pos,
149 "{} not > {}",
150 self.stream_pos,
151 self.last_sym_boundary_pos
152 );
153 let mi = self.sps - self.max_deviation;
154 let mx = self.sps + self.max_deviation;
155 let mut t = self.stream_pos - self.last_sym_boundary_pos;
156 let pre = self.clock;
157 while t > mx {
158 let t2 = t - self.clock;
159 if (t - self.clock).abs() < (t2 - self.clock).abs() {
160 break;
161 }
162 t = t2;
163 }
164 if t > mi * 0.8 && t < mx * 1.2 {
165 assert!(
166 t > 0.0,
167 "t negative {} {}",
168 self.stream_pos,
169 self.last_sym_boundary_pos
170 );
171 self.clock = self.clock_filter.filter_clamped(
172 t - self.sps,
173 mi - self.sps,
174 mx - self.sps,
175 ) + self.sps;
176 self.next_sym_middle = self.last_sym_boundary_pos + self.clock / 2.0;
177 while self.next_sym_middle < self.stream_pos {
178 self.next_sym_middle += self.clock;
179 }
180 trace!(
181 "SymbolSync: clock@{} pre={pre} now={t} min={mi} max={mx} => {}",
182 self.stream_pos, self.clock
183 );
184 }
185 }
186 self.last_sym_boundary_pos = self.stream_pos;
187 self.last_sign = sign;
188 }
189 self.stream_pos += 1.0;
190 let step_back = 10.0 * self.clock;
192 if self.stream_pos > step_back
193 && self.last_sym_boundary_pos > step_back
194 && self.next_sym_middle > step_back
195 {
196 self.stream_pos -= step_back;
197 self.last_sym_boundary_pos -= step_back;
198 self.next_sym_middle -= step_back;
199 }
200 }
201 input.consume(n);
202 o.produce(opos, &[]);
203 if let Some(s) = out_clock {
204 s.produce(opos, &[]);
205 }
206 Ok(BlockRet::Again)
207 }
208}
209