1use std::{
2 borrow::Cow,
3 convert::TryFrom,
4 io,
5 sync::{
6 atomic::{AtomicU16, Ordering},
7 Arc,
8 },
9 thread::JoinHandle,
10 time::Duration,
11};
12
13use crossbeam_channel::Receiver;
14use crossbeam_utils::thread;
15use log::trace;
16use parking_lot::Mutex;
17use sbp::{
18 link::{Link, LinkSource},
19 messages::settings::{
20 MsgSettingsReadByIndexDone, MsgSettingsReadByIndexReq, MsgSettingsReadByIndexResp,
21 MsgSettingsReadReq, MsgSettingsReadResp, MsgSettingsWrite, MsgSettingsWriteResp,
22 },
23 sbp_string::Multipart,
24 Sbp, SbpIterExt, SbpString,
25};
26
27use crate::context::Context;
28use crate::error::{BoxedError, Error};
29use crate::setting::{Setting, SettingValue};
30
/// Sender id stamped on every settings request this client emits.
const SENDER_ID: u16 = 0x42;

/// Number of worker threads used by `read_all`; also the capacity of the channel that
/// broadcasts the "read by index done" notification to those workers.
const NUM_WORKERS: usize = 10;
33
34pub struct Client<'a> {
35 link: Link<'a, ()>,
36 sender: MsgSender,
37 handle: Option<JoinHandle<()>>,
38}
39
40impl<'a> Client<'a> {
41 pub fn new<R, W>(reader: R, mut writer: W) -> Client<'static>
42 where
43 R: io::Read + Send + 'static,
44 W: io::Write + Send + 'static,
45 {
46 let source = LinkSource::new();
47 let mut client = Client::<'static>::with_link(source.link(), move |msg| {
48 sbp::to_writer(&mut writer, &msg).map_err(Into::into)
49 });
50 client.handle = Some(std::thread::spawn(move || {
51 let messages = sbp::iter_messages(reader).log_errors(log::Level::Warn);
52 for msg in messages {
53 source.send(msg);
54 }
55 }));
56 client
57 }
58
59 pub fn with_link<F>(link: Link<'a, ()>, sender: F) -> Client<'a>
60 where
61 F: FnMut(Sbp) -> Result<(), BoxedError> + Send + 'static,
62 {
63 Self {
64 link,
65 sender: MsgSender(Arc::new(Mutex::new(Box::new(sender)))),
66 handle: None,
67 }
68 }
69
70 pub fn write_setting(
71 &mut self,
72 group: impl Into<String>,
73 name: impl Into<String>,
74 value: impl Into<String>,
75 ) -> Result<Entry, Error> {
76 let (ctx, _ctx_handle) = Context::new();
77 self.write_setting_ctx(group, name, value, ctx)
78 }
79
80 pub fn write_setting_ctx(
81 &mut self,
82 group: impl Into<String>,
83 name: impl Into<String>,
84 value: impl Into<String>,
85 ctx: Context,
86 ) -> Result<Entry, Error> {
87 self.write_setting_inner(group.into(), name.into(), value.into(), ctx)
88 }
89
90 pub fn write_setting_with_timeout(
91 &mut self,
92 group: impl Into<String>,
93 name: impl Into<String>,
94 value: impl Into<String>,
95 timeout: Duration,
96 ) -> Result<Entry, Error> {
97 let (ctx, _ctx_handle) = Context::with_timeout(timeout);
98 self.write_setting_ctx(group, name, value, ctx)
99 }
100
101 pub fn read_setting(
102 &mut self,
103 group: impl Into<String>,
104 name: impl Into<String>,
105 ) -> Result<Option<Entry>, Error> {
106 let (ctx, _ctx_handle) = Context::new();
107 self.read_setting_ctx(group, name, ctx)
108 }
109
110 pub fn read_setting_with_timeout(
111 &mut self,
112 group: impl Into<String>,
113 name: impl Into<String>,
114 timeout: Duration,
115 ) -> Result<Option<Entry>, Error> {
116 let (ctx, _ctx_handle) = Context::with_timeout(timeout);
117 self.read_setting_ctx(group, name, ctx)
118 }
119
120 pub fn read_setting_ctx(
121 &mut self,
122 group: impl Into<String>,
123 name: impl Into<String>,
124 ctx: Context,
125 ) -> Result<Option<Entry>, Error> {
126 self.read_setting_inner(group.into(), name.into(), ctx)
127 }
128
129 pub fn read_all(&mut self) -> (Vec<Entry>, Vec<Error>) {
130 let (ctx, _ctx_handle) = Context::new();
131 self.read_all_ctx(ctx)
132 }
133
134 pub fn read_all_ctx(&mut self, ctx: Context) -> (Vec<Entry>, Vec<Error>) {
135 self.read_all_inner(ctx)
136 }
137
138 fn read_all_inner(&mut self, ctx: Context) -> (Vec<Entry>, Vec<Error>) {
139 let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS);
140 let done_key = self.link.register(move |_: MsgSettingsReadByIndexDone| {
141 for _ in 0..NUM_WORKERS {
142 let _ = done_tx.try_send(());
143 }
144 });
145 let (settings, errors) = (Mutex::new(Vec::new()), Mutex::new(Vec::new()));
146 let idx = AtomicU16::new(0);
147 thread::scope(|scope| {
148 for _ in 0..NUM_WORKERS {
149 let this = &self;
150 let idx = &idx;
151 let settings = &settings;
152 let errors = &errors;
153 let done_rx = &done_rx;
154 let mut ctx = ctx.clone();
155 scope.spawn(move |_| loop {
156 let idx = idx.fetch_add(1, Ordering::SeqCst);
157 match this.read_by_index(idx, done_rx, &ctx) {
158 Ok(Some(setting)) => {
159 settings.lock().push((idx, setting));
160 ctx.reset_timeout();
161 }
162 Ok(None) => break,
163 Err(err) => {
164 let exit = matches!(err, Error::TimedOut | Error::Canceled);
165 errors.lock().push((idx, err));
166 if exit {
167 break;
168 }
169 }
170 }
171 });
172 }
173 })
174 .expect("read_all worker thread panicked");
175 self.link.unregister(done_key);
176 settings.lock().sort_by_key(|(idx, _)| *idx);
177 errors.lock().sort_by_key(|(idx, _)| *idx);
178 (
179 settings.into_inner().into_iter().map(|e| e.1).collect(),
180 errors.into_inner().into_iter().map(|e| e.1).collect(),
181 )
182 }
183
184 fn read_by_index(
185 &self,
186 index: u16,
187 done_rx: &Receiver<()>,
188 ctx: &Context,
189 ) -> Result<Option<Entry>, Error> {
190 trace!("read_by_idx: {}", index);
191 let (tx, rx) = crossbeam_channel::bounded(1);
192 let key = self.link.register(move |msg: MsgSettingsReadByIndexResp| {
193 if index == msg.index {
194 let _ = tx.try_send(Entry::try_from(msg));
195 }
196 });
197 self.sender.send(MsgSettingsReadByIndexReq {
198 sender_id: Some(SENDER_ID),
199 index,
200 })?;
201 let res = crossbeam_channel::select! {
202 recv(rx) -> msg => msg.expect("read_by_index channel disconnected").map(Some),
203 recv(done_rx) -> _ => Ok(None),
204 recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut),
205 recv(ctx.cancel_rx) -> _ => Err(Error::Canceled),
206 };
207 self.link.unregister(key);
208 res
209 }
210
211 fn read_setting_inner(
212 &mut self,
213 group: String,
214 name: String,
215 ctx: Context,
216 ) -> Result<Option<Entry>, Error> {
217 trace!("read_setting: {} {}", group, name);
218 let req = MsgSettingsReadReq {
219 sender_id: Some(SENDER_ID),
220 setting: format!("{}\0{}\0", group, name).into(),
221 };
222 let (tx, rx) = crossbeam_channel::bounded(1);
223 let key = self.link.register(move |msg: MsgSettingsReadResp| {
224 if request_matches(&group, &name, &msg.setting) {
225 let _ = tx.try_send(Entry::try_from(msg).map(|e| {
226 if e.value.is_some() {
227 Some(e)
228 } else {
229 None
230 }
231 }));
232 }
233 });
234 self.sender.send(req)?;
235 let res = crossbeam_channel::select! {
236 recv(rx) -> msg => msg.expect("read_setting_inner channel disconnected"),
237 recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut),
238 recv(ctx.cancel_rx) -> _ => Err(Error::Canceled),
239 };
240 self.link.unregister(key);
241 res
242 }
243
244 fn write_setting_inner(
245 &mut self,
246 group: String,
247 name: String,
248 value: String,
249 ctx: Context,
250 ) -> Result<Entry, Error> {
251 trace!("write_setting: {} {} {}", group, name, value);
252 let req = MsgSettingsWrite {
253 sender_id: Some(SENDER_ID),
254 setting: format!("{}\0{}\0{}\0", group, name, value).into(),
255 };
256 let (tx, rx) = crossbeam_channel::bounded(1);
257 let key = self.link.register(move |msg: MsgSettingsWriteResp| {
258 if request_matches(&group, &name, &msg.setting) {
259 let _ = tx.try_send(Entry::try_from(msg));
260 }
261 });
262 self.sender.send(req)?;
263 let res = crossbeam_channel::select! {
264 recv(rx) -> msg => msg.expect("write_setting_inner channel disconnected"),
265 recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut),
266 recv(ctx.cancel_rx) -> _ => Err(Error::Canceled),
267 };
268 self.link.unregister(key);
269 res
270 }
271}
272
273#[derive(Debug, Clone, PartialEq)]
274pub struct Entry {
275 pub setting: Cow<'static, Setting>,
276 pub value: Option<SettingValue>,
277}
278
279impl TryFrom<MsgSettingsWriteResp> for Entry {
280 type Error = Error;
281
282 fn try_from(msg: MsgSettingsWriteResp) -> Result<Self, Self::Error> {
283 if msg.status != 0 {
284 return Err(Error::WriteError(msg.status.into()));
285 }
286 let fields = split_multipart(&msg.setting);
287 if let [group, name, value] = fields.as_slice() {
288 let setting = Setting::new(group, name);
289 let value = SettingValue::parse(value, setting.kind);
290 Ok(Entry { setting, value })
291 } else {
292 Err(Error::ParseError)
293 }
294 }
295}
296
297impl TryFrom<MsgSettingsReadResp> for Entry {
298 type Error = Error;
299
300 fn try_from(msg: MsgSettingsReadResp) -> Result<Self, Self::Error> {
301 let fields = split_multipart(&msg.setting);
302 match fields.as_slice() {
303 [group, name] => {
304 let setting = Setting::new(group, name);
305 Ok(Entry {
306 setting,
307 value: None,
308 })
309 }
310 [group, name, value] => {
311 let setting = Setting::new(group, name);
312 let value = SettingValue::parse(value, setting.kind);
313 Ok(Entry { setting, value })
314 }
315 _ => Err(Error::ParseError),
316 }
317 }
318}
319
320impl TryFrom<MsgSettingsReadByIndexResp> for Entry {
321 type Error = Error;
322
323 fn try_from(msg: MsgSettingsReadByIndexResp) -> Result<Self, Self::Error> {
324 let fields = split_multipart(&msg.setting);
325 match fields.as_slice() {
326 [group, name, value, fmt_type] => {
327 let setting = if fmt_type.is_empty() {
328 Setting::new(group, name)
329 } else {
330 Setting::with_fmt_type(group, name, fmt_type)
331 };
332 let value = SettingValue::parse(value, setting.kind);
333 Ok(Entry { setting, value })
334 }
335 [group, name, value] => {
336 let setting = Setting::new(group, name);
337 let value = SettingValue::parse(value, setting.kind);
338 Ok(Entry { setting, value })
339 }
340 _ => Err(Error::ParseError),
341 }
342 }
343}
344
345type SenderFunc = Box<dyn FnMut(Sbp) -> Result<(), BoxedError> + Send>;
346
347struct MsgSender(Arc<Mutex<SenderFunc>>);
348
349impl MsgSender {
350 const RETRIES: usize = 5;
351 const TIMEOUT: Duration = Duration::from_millis(100);
352
353 fn send(&self, msg: impl Into<Sbp>) -> Result<(), BoxedError> {
354 self.send_inner(msg.into(), 0)
355 }
356
357 fn send_inner(&self, msg: Sbp, tries: usize) -> Result<(), BoxedError> {
358 let res = (self.0.lock())(msg.clone());
359 if res.is_err() && tries < Self::RETRIES {
360 std::thread::sleep(Self::TIMEOUT);
361 self.send_inner(msg, tries + 1)
362 } else {
363 res
364 }
365 }
366}
367
368fn request_matches(group: &str, name: &str, setting: &SbpString<Vec<u8>, Multipart>) -> bool {
369 let fields = split_multipart(setting);
370 matches!(fields.as_slice(), [g, n, ..] if g == group && n == name)
371}
372
373fn split_multipart(s: &SbpString<Vec<u8>, Multipart>) -> Vec<Cow<'_, str>> {
374 let mut parts: Vec<_> = s
375 .as_bytes()
376 .split(|b| *b == 0)
377 .map(String::from_utf8_lossy)
378 .collect();
379 parts.pop();
380 parts
381}
382
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::time::Instant;

    use super::*;

    use crossbeam_utils::thread::scope;
    use sbp::messages::settings::{MsgSettingsReadReq, MsgSettingsReadResp};
    use sbp::{SbpMessage, SbpString};

    /// Builds the read request the client is expected to send for `group`/`name`.
    fn read_req(group: &str, name: &str) -> MsgSettingsReadReq {
        MsgSettingsReadReq {
            sender_id: Some(SENDER_ID),
            setting: SbpString::from(format!("{}\0{}\0", group, name)),
        }
    }

    /// Builds the read response a device would send for `group`/`name` with `value`.
    fn read_resp(group: &str, name: &str, value: &str) -> MsgSettingsReadResp {
        MsgSettingsReadResp {
            sender_id: Some(SENDER_ID),
            setting: SbpString::from(format!("{}\0{}\0{}\0", group, name, value)),
        }
    }

    /// Scripts `mock` to answer a read of `group`/`name` with `value`, performs the
    /// read through a fresh client and returns the resulting entry.
    fn read_through(mut mock: Mock, group: &str, name: &str, value: &str) -> Entry {
        mock.req_reply(read_req(group, name), read_resp(group, name, value));
        let (reader, writer) = mock.into_io();
        let mut client = Client::new(reader, writer);
        client.read_setting(group, name).unwrap().unwrap()
    }

    #[test]
    fn test_should_retry() {
        // The writer fails five times before the request finally goes out.
        let entry = read_through(Mock::with_errors(5), "sbp", "obs_msg_max_size", "10");
        assert!(matches!(entry.value, Some(SettingValue::Integer(10))));
    }

    #[test]
    fn read_setting_timeout() {
        let (group, name) = ("sbp", "obs_msg_max_size");
        let (reader, writer) = Mock::new().into_io();
        let mut client = Client::new(reader, writer);
        let (ctx, _ctx_handle) = Context::with_timeout(Duration::from_millis(100));
        let started = Instant::now();
        let mut outcome = Ok(None);
        scope(|s| {
            s.spawn(|_| {
                outcome = client.read_setting_ctx(group, name, ctx);
            });
        })
        .unwrap();
        assert!(started.elapsed() >= Duration::from_millis(100));
        assert!(matches!(outcome, Err(Error::TimedOut)));
    }

    #[test]
    fn read_setting_cancel() {
        let (group, name) = ("sbp", "obs_msg_max_size");
        let (reader, writer) = Mock::new().into_io();
        let mut client = Client::new(reader, writer);
        let (ctx, ctx_handle) = Context::new();
        let started = Instant::now();
        let mut outcome = Ok(None);
        scope(|s| {
            s.spawn(|_| {
                outcome = client.read_setting_ctx(group, name, ctx);
            });
            // Let the read block for a while before canceling it.
            std::thread::sleep(Duration::from_millis(100));
            ctx_handle.cancel();
        })
        .unwrap();
        assert!(started.elapsed() >= Duration::from_millis(100));
        assert!(matches!(outcome, Err(Error::Canceled)));
    }

    #[test]
    fn mock_read_setting_int() {
        let entry = read_through(Mock::new(), "sbp", "obs_msg_max_size", "10");
        assert!(matches!(entry.value, Some(SettingValue::Integer(10))));
    }

    #[test]
    fn mock_read_setting_bool() {
        let entry = read_through(Mock::new(), "surveyed_position", "broadcast", "True");
        assert!(matches!(entry.value, Some(SettingValue::Boolean(true))));
    }

    #[test]
    fn mock_read_setting_float() {
        let entry = read_through(Mock::new(), "ins", "filter_vel_half_life_alpha", "0.1");
        assert_eq!(entry.value, Some(SettingValue::Float(0.1)));
    }

    #[test]
    fn mock_read_setting_double() {
        let entry = read_through(Mock::new(), "surveyed_position", "surveyed_lat", "0.1");
        assert_eq!(entry.value, Some(SettingValue::Double(0.1)));
    }

    #[test]
    fn mock_read_setting_string() {
        let entry = read_through(Mock::new(), "rtcm_out", "ant_descriptor", "foo");
        assert_eq!(entry.value, Some(SettingValue::String("foo".into())));
    }

    #[test]
    fn mock_read_setting_enum() {
        let entry = read_through(Mock::new(), "frontend", "antenna_selection", "Secondary");
        assert_eq!(
            entry.value,
            Some(SettingValue::String("Secondary".into()))
        );
    }

    /// In-memory stand-in for a device connection that can be scripted with expected
    /// requests and canned replies, and can fail a number of writes.
    #[derive(Clone)]
    struct Mock {
        stream: mockstream::SyncMockStream,
        /// Number of upcoming `write` calls that will fail.
        write_errors: u16,
    }

    impl Mock {
        /// Mock whose writes never fail.
        fn new() -> Self {
            Self::with_errors(0)
        }

        /// Mock whose first `write_errors` writes fail.
        fn with_errors(write_errors: u16) -> Self {
            Self {
                stream: mockstream::SyncMockStream::new(),
                write_errors,
            }
        }

        /// Scripts a single request followed by its reply.
        fn req_reply(&mut self, req: impl SbpMessage, res: impl SbpMessage) {
            self.reqs_reply(&[req], res)
        }

        /// Scripts the encoded `reqs` as the bytes to wait for, then queues the encoded
        /// `res` to be read.
        fn reqs_reply(&mut self, reqs: &[impl SbpMessage], res: impl SbpMessage) {
            let mut expected: Vec<u8> = Vec::new();
            for req in reqs {
                expected.extend(sbp::to_vec(req).unwrap());
            }
            self.stream.wait_for(expected.as_ref());
            let reply = sbp::to_vec(&res).unwrap();
            self.stream.push_bytes_to_read(reply.as_ref());
        }

        /// Splits the mock into a reader half (a clone) and a writer half.
        fn into_io(self) -> (impl io::Read, impl io::Write) {
            let reader = self.clone();
            (reader, self)
        }
    }

    impl Read for Mock {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.stream.read(buf)
        }
    }

    impl Write for Mock {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            if self.write_errors == 0 {
                return self.stream.write(buf);
            }
            self.write_errors -= 1;
            Err(io::Error::new(io::ErrorKind::Other, "error"))
        }

        fn flush(&mut self) -> io::Result<()> {
            self.stream.flush()
        }
    }
}