//! BINEX to RINEX deserialization
use std::io::Read;
use crate::{
prelude::{Duration, Epoch, Rinex},
production::{Postponing, SnapshotMode},
};
use binex::prelude::{Decoder, EphemerisFrame, Message, Record, SolutionsFrame, StreamElement};
#[cfg(feature = "log")]
use log::{error, info};
/// BIN2RNX is a RINEX producer from a BINEX stream.
/// It pulls elements from a BINEX [Decoder] and holds one pending
/// Navigation [Rinex] and one pending Observation [Rinex].
/// The production behavior is defined by [SnapshotMode], and production
/// may be held back at startup by the [Postponing] option.
pub struct BIN2RNX<'a, R: Read> {
/// True when collecting is feasible: set at construction when the
/// [Postponing] option is [Postponing::None], otherwise set once the
/// postponing condition has been met.
pub active: bool,
/// Postponing counter: accumulated encoded size or number of counted
/// messages, depending on the [Postponing] option in use.
size: usize,
/// Snapshot mode (production rate control)
pub snapshot_mode: SnapshotMode,
/// Postponing option, evaluated on open source messages
/// for as long as [Self::active] is false.
pub postponing: Postponing,
/// Deploy time: the "now" instant given at construction,
/// refreshed with the system time when postponing ends.
deploy_t: Epoch,
/// BINEX [Decoder] wrapping the [Read]able interface
decoder: Decoder<'a, R>,
/// Pending NAV [Rinex], exposed by [Self::nav_rinex]
nav_rinex: Rinex,
/// Pending OBS [Rinex], exposed by [Self::obs_rinex]
obs_rinex: Rinex,
}
impl<'a, R: Read> Iterator for BIN2RNX<'a, R> {
    type Item = Option<String>;

    /// Pulls one element from the BINEX [Decoder] and dispatches it.
    ///
    /// - Open source messages are handed to the postponing mechanism for
    ///   as long as the producer is not active. Once active, the message
    ///   record is matched frame by frame, but every arm is still a
    ///   placeholder: nothing is collected yet.
    /// - Closed source elements and decoding errors are reported through
    ///   `error!` when the "log" feature is enabled, and silently dropped
    ///   otherwise.
    ///
    /// This method currently returns `None` on every call, whatever the
    /// decoder produced.
    fn next(&mut self) -> Option<Self::Item> {
        match self.decoder.next() {
            Some(Ok(StreamElement::OpenSource(message))) => {
                if !self.active {
                    // production is still postponed: evaluate the option
                    self.postponed(&message);
                } else {
                    match message.record {
                        // NAV collection is not implemented yet
                        Record::EphemerisFrame(frame) => match frame {
                            EphemerisFrame::GAL(_)
                            | EphemerisFrame::GLO(_)
                            | EphemerisFrame::GPS(_)
                            | EphemerisFrame::SBAS(_)
                            | EphemerisFrame::GPSRaw(_) => {},
                        },
                        // monument frames are visited but not used yet
                        Record::MonumentGeo(monument) => {
                            for _frame in monument.frames.iter() {}
                        },
                        // solutions frames are visited but not used yet
                        Record::Solutions(solutions) => {
                            for frame in solutions.frames.iter() {
                                match frame {
                                    SolutionsFrame::AntennaEcefPosition(_)
                                    | SolutionsFrame::AntennaGeoPosition(_)
                                    | SolutionsFrame::Comment(_)
                                    | SolutionsFrame::TemporalSolution(_)
                                    | SolutionsFrame::TimeSystem(_)
                                    | SolutionsFrame::AntennaEcefVelocity(_)
                                    | SolutionsFrame::AntennaGeoVelocity(_)
                                    | SolutionsFrame::Extra(_) => {},
                                }
                            }
                        },
                    }
                }
            },
            #[cfg(feature = "log")]
            Some(Ok(StreamElement::ClosedSource(closed))) => {
                error!(
                    "received closed source message: cannot interprate {:?}",
                    closed.closed_meta
                )
            },
            #[cfg(not(feature = "log"))]
            Some(Ok(StreamElement::ClosedSource(_))) => {},
            #[cfg(feature = "log")]
            Some(Err(e)) => {
                error!("binex decoding error: {:?}", e);
            },
            #[cfg(not(feature = "log"))]
            Some(Err(_)) => {},
            _ => {},
        }
        None
    }
}
impl<'a, R: Read> BIN2RNX<'a, R> {
/// Creates a new [BIN2RNX] working from a [Read]able interface,
/// deployed at the current system time.
///
/// NB:
/// - [BIN2RNX] needs the system time to be determined for the postponing
///   mechanism. We propose [Self::new_system_time] if you want to manually
///   define "now".
/// - since RINEX is fully open source, only open source BINEX messages
///   may be picked up and collected: closed source elements are discarded.
///
/// ## Inputs
/// - crinex: set to true if you want to use the CRINEX compression
///   algorithm when collecting Observation RINEX.
/// - production rate control as [SnapshotMode]
/// - [Postponing] option
/// - read: [Read]able interface
///
/// ## Panics
/// Panics if the system time cannot be determined.
pub fn new(crinex: bool, snapshot_mode: SnapshotMode, postponing: Postponing, read: R) -> Self {
    let now = match Epoch::now() {
        Ok(now) => now,
        Err(e) => panic!("system time determination failed with {}", e),
    };
    Self::new_system_time(crinex, now, snapshot_mode, postponing, read)
}
/// Infallible [BIN2RNX] creation, use this if you have no means to access system time.
/// Define it yourself with "now". Refer to [Self::new] for more information.
///
/// ## Inputs
/// - crinex: set to true if you want to use the CRINEX compression
///   algorithm when collecting Observation RINEX.
/// - now: instant recorded as deployment time
/// - production rate control as [SnapshotMode]
/// - [Postponing] option: production is active right away only
///   when this is [Postponing::None]
/// - read: [Read]able interface, handed to the BINEX [Decoder]
pub fn new_system_time(
    crinex: bool,
    now: Epoch,
    snapshot_mode: SnapshotMode,
    postponing: Postponing,
    read: R,
) -> Self {
    let nav_rinex = Rinex::basic_nav();
    let obs_rinex = if crinex {
        Rinex::basic_crinex()
    } else {
        Rinex::basic_obs()
    };
    let decoder = Decoder::new(read);
    // without any postponing, production may start immediately
    let active = postponing == Postponing::None;
    Self {
        active,
        size: 0,
        snapshot_mode,
        postponing,
        deploy_t: now,
        decoder,
        nav_rinex,
        obs_rinex,
    }
}
/// Creates a new [BIN2RNX] that will collect a [Rinex] once a day at midnight,
/// with deployment possibly postponed.
/// This is [Self::new] with [SnapshotMode::DailyMidnight].
///
/// ## Inputs
/// - crinex: set to true if you want to use the CRINEX compression
///   algorithm when collecting Observation RINEX.
/// - [Postponing] option
/// - read: [Read]able interface
///
/// ## Panics
/// Panics if the system time cannot be determined, like [Self::new].
pub fn new_daily(crinex: bool, postponing: Postponing, read: R) -> Self {
    let snapshot_mode = SnapshotMode::DailyMidnight;
    Self::new(crinex, snapshot_mode, postponing, read)
}
/// Creates a new [BIN2RNX] that will collect a [Rinex] twice a day at midnight and noon,
/// with deployment possibly postponed.
/// This is [Self::new] with [SnapshotMode::DailyMidnightNoon].
///
/// ## Inputs
/// - crinex: set to true if you want to use the CRINEX compression
///   algorithm when collecting Observation RINEX.
/// - [Postponing] option
/// - read: [Read]able interface
///
/// ## Panics
/// Panics if the system time cannot be determined, like [Self::new].
pub fn new_midnight_noon(crinex: bool, postponing: Postponing, read: R) -> Self {
    let snapshot_mode = SnapshotMode::DailyMidnightNoon;
    Self::new(crinex, snapshot_mode, postponing, read)
}
/// Creates a new [BIN2RNX] that will collect a [Rinex] hourly
/// with deployment possibly postponed.
/// This is [Self::new] with [SnapshotMode::Hourly].
///
/// ## Inputs
/// - crinex: set to true if you want to use the CRINEX compression
///   algorithm when collecting Observation RINEX.
/// - [Postponing] option
/// - read: [Read]able interface
///
/// ## Panics
/// Panics if the system time cannot be determined, like [Self::new].
pub fn new_hourly(crinex: bool, postponing: Postponing, read: R) -> Self {
    let snapshot_mode = SnapshotMode::Hourly;
    Self::new(crinex, snapshot_mode, postponing, read)
}
/// Creates a new [BIN2RNX] that will collect a [Rinex] periodically,
/// with deployment possibly postponed.
/// This is [Self::new] with [SnapshotMode::Periodic].
///
/// ## Inputs
/// - crinex: set to true if you want to use the CRINEX compression
///   algorithm when collecting Observation RINEX.
/// - period: production period, as [Duration]
/// - [Postponing] option
/// - read: [Read]able interface
///
/// ## Panics
/// Panics if the system time cannot be determined, like [Self::new].
pub fn new_periodic(crinex: bool, period: Duration, postponing: Postponing, read: R) -> Self {
    let snapshot_mode = SnapshotMode::Periodic(period);
    Self::new(crinex, snapshot_mode, postponing, read)
}
/// Evaluates the [Postponing] option against the open source message
/// that was just decoded, while production is not active yet.
///
/// - [Postponing::SystemTime]: compares the system time to the target instant
/// - [Postponing::Size]: accounts for the encoding size of `msg`
/// - [Postponing::Messages]: runs the message counting mechanism
///
/// ## Panics
/// Panics when the option is none of the above: this method is not
/// supposed to be called without a postponing option.
fn postponed(&mut self, msg: &Message) {
    if let Postponing::SystemTime(instant) = self.postponing {
        self.system_time_postponing(instant);
    } else if let Postponing::Size(threshold) = self.postponing {
        self.bytewise_postponing(msg.encoding_size(), threshold);
    } else if let Postponing::Messages(count) = self.postponing {
        self.protocol_postponing(count);
    } else {
        unreachable!("no postponing!");
    }
}
/// Holds production until system time has reached a specific instant.
///
/// Reads the current system time and, once it is strictly later than `t`,
/// activates production and records that reading as the deployment time.
/// Progress is reported with `info!` when the "log" feature is enabled,
/// like the other postponing mechanisms do.
///
/// ## Panics
/// Panics if the system time cannot be determined.
fn system_time_postponing(&mut self, t: Epoch) {
    let now =
        Epoch::now().unwrap_or_else(|e| panic!("system time determination failure: {}", e));
    if now > t {
        #[cfg(feature = "log")]
        info!("bin2rnx now deployed: production is pending");
        self.active = true;
        self.deploy_t = now;
    } else {
        #[cfg(feature = "log")]
        info!("binex postponing..");
    }
}
/// Collect "size" bytes until production is allowed.
///
/// Adds `msg_size` to the internal counter. While the counter is below
/// `size`, production stays on hold. Once it reaches `size`, production
/// is activated and the current system time becomes the deployment time.
///
/// ## Panics
/// Panics if the system time cannot be determined at activation.
fn bytewise_postponing(&mut self, msg_size: usize, size: usize) {
    self.size += msg_size;
    if self.size < size {
        // threshold not reached yet: stay on hold
        #[cfg(feature = "log")]
        info!("binex postponing..");
        return;
    }
    #[cfg(feature = "log")]
    info!("bin2rnx now deployed: production is pending");
    let now = match Epoch::now() {
        Ok(now) => now,
        Err(e) => panic!("system time determination failure: {}", e),
    };
    self.active = true;
    self.deploy_t = now;
}
/// Collect "size" messages until production is allowed.
///
/// This is invoked once per open source message decoded while production
/// is on hold, so each call accounts for exactly that one message.
/// The decoder is deliberately left untouched here: pulling another
/// element from it would silently drop that element, leave the message
/// that triggered the call uncounted, and possibly block on the reader.
///
/// Once `size` messages have been counted, production is activated and
/// the current system time becomes the deployment time.
///
/// ## Panics
/// Panics if the system time cannot be determined at activation.
fn protocol_postponing(&mut self, size: usize) {
    // account for the open source message that triggered this call
    self.size += 1;
    #[cfg(feature = "log")]
    info!("binex postponing {}/{} messages", self.size, size);
    if self.size >= size {
        let now =
            Epoch::now().unwrap_or_else(|e| panic!("system time determination failure: {}", e));
        self.active = true;
        self.deploy_t = now;
        #[cfg(feature = "log")]
        info!("bin2rnx now deployed: production is pending");
    }
}
/// Obtain a shared reference to the pending Observation [Rinex]
/// held by this producer. Nothing is copied or modified.
pub fn obs_rinex(&self) -> &Rinex {
&self.obs_rinex
}
/// Obtain a shared reference to the pending Navigation [Rinex]
/// held by this producer. Nothing is copied or modified.
pub fn nav_rinex(&self) -> &Rinex {
&self.nav_rinex
}
}