sbp_settings/
client.rs

1use std::{
2    borrow::Cow,
3    convert::TryFrom,
4    io,
5    sync::{
6        atomic::{AtomicU16, Ordering},
7        Arc,
8    },
9    thread::JoinHandle,
10    time::Duration,
11};
12
13use crossbeam_channel::Receiver;
14use crossbeam_utils::thread;
15use log::trace;
16use parking_lot::Mutex;
17use sbp::{
18    link::{Link, LinkSource},
19    messages::settings::{
20        MsgSettingsReadByIndexDone, MsgSettingsReadByIndexReq, MsgSettingsReadByIndexResp,
21        MsgSettingsReadReq, MsgSettingsReadResp, MsgSettingsWrite, MsgSettingsWriteResp,
22    },
23    sbp_string::Multipart,
24    Sbp, SbpIterExt, SbpString,
25};
26
27use crate::context::Context;
28use crate::error::{BoxedError, Error};
29use crate::setting::{Setting, SettingValue};
30
/// Sender id placed in the `sender_id` field of every request this client builds.
const SENDER_ID: u16 = 0x42;
/// Number of worker threads spawned by `read_all_inner`; also the capacity of its
/// "done" notification channel.
const NUM_WORKERS: usize = 10;
33
34pub struct Client<'a> {
35    link: Link<'a, ()>,
36    sender: MsgSender,
37    handle: Option<JoinHandle<()>>,
38}
39
40impl<'a> Client<'a> {
41    pub fn new<R, W>(reader: R, mut writer: W) -> Client<'static>
42    where
43        R: io::Read + Send + 'static,
44        W: io::Write + Send + 'static,
45    {
46        let source = LinkSource::new();
47        let mut client = Client::<'static>::with_link(source.link(), move |msg| {
48            sbp::to_writer(&mut writer, &msg).map_err(Into::into)
49        });
50        client.handle = Some(std::thread::spawn(move || {
51            let messages = sbp::iter_messages(reader).log_errors(log::Level::Warn);
52            for msg in messages {
53                source.send(msg);
54            }
55        }));
56        client
57    }
58
59    pub fn with_link<F>(link: Link<'a, ()>, sender: F) -> Client<'a>
60    where
61        F: FnMut(Sbp) -> Result<(), BoxedError> + Send + 'static,
62    {
63        Self {
64            link,
65            sender: MsgSender(Arc::new(Mutex::new(Box::new(sender)))),
66            handle: None,
67        }
68    }
69
70    pub fn write_setting(
71        &mut self,
72        group: impl Into<String>,
73        name: impl Into<String>,
74        value: impl Into<String>,
75    ) -> Result<Entry, Error> {
76        let (ctx, _ctx_handle) = Context::new();
77        self.write_setting_ctx(group, name, value, ctx)
78    }
79
80    pub fn write_setting_ctx(
81        &mut self,
82        group: impl Into<String>,
83        name: impl Into<String>,
84        value: impl Into<String>,
85        ctx: Context,
86    ) -> Result<Entry, Error> {
87        self.write_setting_inner(group.into(), name.into(), value.into(), ctx)
88    }
89
90    pub fn write_setting_with_timeout(
91        &mut self,
92        group: impl Into<String>,
93        name: impl Into<String>,
94        value: impl Into<String>,
95        timeout: Duration,
96    ) -> Result<Entry, Error> {
97        let (ctx, _ctx_handle) = Context::with_timeout(timeout);
98        self.write_setting_ctx(group, name, value, ctx)
99    }
100
101    pub fn read_setting(
102        &mut self,
103        group: impl Into<String>,
104        name: impl Into<String>,
105    ) -> Result<Option<Entry>, Error> {
106        let (ctx, _ctx_handle) = Context::new();
107        self.read_setting_ctx(group, name, ctx)
108    }
109
110    pub fn read_setting_with_timeout(
111        &mut self,
112        group: impl Into<String>,
113        name: impl Into<String>,
114        timeout: Duration,
115    ) -> Result<Option<Entry>, Error> {
116        let (ctx, _ctx_handle) = Context::with_timeout(timeout);
117        self.read_setting_ctx(group, name, ctx)
118    }
119
120    pub fn read_setting_ctx(
121        &mut self,
122        group: impl Into<String>,
123        name: impl Into<String>,
124        ctx: Context,
125    ) -> Result<Option<Entry>, Error> {
126        self.read_setting_inner(group.into(), name.into(), ctx)
127    }
128
129    pub fn read_all(&mut self) -> (Vec<Entry>, Vec<Error>) {
130        let (ctx, _ctx_handle) = Context::new();
131        self.read_all_ctx(ctx)
132    }
133
134    pub fn read_all_ctx(&mut self, ctx: Context) -> (Vec<Entry>, Vec<Error>) {
135        self.read_all_inner(ctx)
136    }
137
138    fn read_all_inner(&mut self, ctx: Context) -> (Vec<Entry>, Vec<Error>) {
139        let (done_tx, done_rx) = crossbeam_channel::bounded(NUM_WORKERS);
140        let done_key = self.link.register(move |_: MsgSettingsReadByIndexDone| {
141            for _ in 0..NUM_WORKERS {
142                let _ = done_tx.try_send(());
143            }
144        });
145        let (settings, errors) = (Mutex::new(Vec::new()), Mutex::new(Vec::new()));
146        let idx = AtomicU16::new(0);
147        thread::scope(|scope| {
148            for _ in 0..NUM_WORKERS {
149                let this = &self;
150                let idx = &idx;
151                let settings = &settings;
152                let errors = &errors;
153                let done_rx = &done_rx;
154                let mut ctx = ctx.clone();
155                scope.spawn(move |_| loop {
156                    let idx = idx.fetch_add(1, Ordering::SeqCst);
157                    match this.read_by_index(idx, done_rx, &ctx) {
158                        Ok(Some(setting)) => {
159                            settings.lock().push((idx, setting));
160                            ctx.reset_timeout();
161                        }
162                        Ok(None) => break,
163                        Err(err) => {
164                            let exit = matches!(err, Error::TimedOut | Error::Canceled);
165                            errors.lock().push((idx, err));
166                            if exit {
167                                break;
168                            }
169                        }
170                    }
171                });
172            }
173        })
174        .expect("read_all worker thread panicked");
175        self.link.unregister(done_key);
176        settings.lock().sort_by_key(|(idx, _)| *idx);
177        errors.lock().sort_by_key(|(idx, _)| *idx);
178        (
179            settings.into_inner().into_iter().map(|e| e.1).collect(),
180            errors.into_inner().into_iter().map(|e| e.1).collect(),
181        )
182    }
183
184    fn read_by_index(
185        &self,
186        index: u16,
187        done_rx: &Receiver<()>,
188        ctx: &Context,
189    ) -> Result<Option<Entry>, Error> {
190        trace!("read_by_idx: {}", index);
191        let (tx, rx) = crossbeam_channel::bounded(1);
192        let key = self.link.register(move |msg: MsgSettingsReadByIndexResp| {
193            if index == msg.index {
194                let _ = tx.try_send(Entry::try_from(msg));
195            }
196        });
197        self.sender.send(MsgSettingsReadByIndexReq {
198            sender_id: Some(SENDER_ID),
199            index,
200        })?;
201        let res = crossbeam_channel::select! {
202            recv(rx) -> msg => msg.expect("read_by_index channel disconnected").map(Some),
203            recv(done_rx) -> _ => Ok(None),
204            recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut),
205            recv(ctx.cancel_rx) -> _ => Err(Error::Canceled),
206        };
207        self.link.unregister(key);
208        res
209    }
210
211    fn read_setting_inner(
212        &mut self,
213        group: String,
214        name: String,
215        ctx: Context,
216    ) -> Result<Option<Entry>, Error> {
217        trace!("read_setting: {} {}", group, name);
218        let req = MsgSettingsReadReq {
219            sender_id: Some(SENDER_ID),
220            setting: format!("{}\0{}\0", group, name).into(),
221        };
222        let (tx, rx) = crossbeam_channel::bounded(1);
223        let key = self.link.register(move |msg: MsgSettingsReadResp| {
224            if request_matches(&group, &name, &msg.setting) {
225                let _ = tx.try_send(Entry::try_from(msg).map(|e| {
226                    if e.value.is_some() {
227                        Some(e)
228                    } else {
229                        None
230                    }
231                }));
232            }
233        });
234        self.sender.send(req)?;
235        let res = crossbeam_channel::select! {
236            recv(rx) -> msg => msg.expect("read_setting_inner channel disconnected"),
237            recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut),
238            recv(ctx.cancel_rx) -> _ => Err(Error::Canceled),
239        };
240        self.link.unregister(key);
241        res
242    }
243
244    fn write_setting_inner(
245        &mut self,
246        group: String,
247        name: String,
248        value: String,
249        ctx: Context,
250    ) -> Result<Entry, Error> {
251        trace!("write_setting: {} {} {}", group, name, value);
252        let req = MsgSettingsWrite {
253            sender_id: Some(SENDER_ID),
254            setting: format!("{}\0{}\0{}\0", group, name, value).into(),
255        };
256        let (tx, rx) = crossbeam_channel::bounded(1);
257        let key = self.link.register(move |msg: MsgSettingsWriteResp| {
258            if request_matches(&group, &name, &msg.setting) {
259                let _ = tx.try_send(Entry::try_from(msg));
260            }
261        });
262        self.sender.send(req)?;
263        let res = crossbeam_channel::select! {
264            recv(rx) -> msg => msg.expect("write_setting_inner channel disconnected"),
265            recv(ctx.timeout_rx) -> _ => Err(Error::TimedOut),
266            recv(ctx.cancel_rx) -> _ => Err(Error::Canceled),
267        };
268        self.link.unregister(key);
269        res
270    }
271}
272
273#[derive(Debug, Clone, PartialEq)]
274pub struct Entry {
275    pub setting: Cow<'static, Setting>,
276    pub value: Option<SettingValue>,
277}
278
279impl TryFrom<MsgSettingsWriteResp> for Entry {
280    type Error = Error;
281
282    fn try_from(msg: MsgSettingsWriteResp) -> Result<Self, Self::Error> {
283        if msg.status != 0 {
284            return Err(Error::WriteError(msg.status.into()));
285        }
286        let fields = split_multipart(&msg.setting);
287        if let [group, name, value] = fields.as_slice() {
288            let setting = Setting::new(group, name);
289            let value = SettingValue::parse(value, setting.kind);
290            Ok(Entry { setting, value })
291        } else {
292            Err(Error::ParseError)
293        }
294    }
295}
296
297impl TryFrom<MsgSettingsReadResp> for Entry {
298    type Error = Error;
299
300    fn try_from(msg: MsgSettingsReadResp) -> Result<Self, Self::Error> {
301        let fields = split_multipart(&msg.setting);
302        match fields.as_slice() {
303            [group, name] => {
304                let setting = Setting::new(group, name);
305                Ok(Entry {
306                    setting,
307                    value: None,
308                })
309            }
310            [group, name, value] => {
311                let setting = Setting::new(group, name);
312                let value = SettingValue::parse(value, setting.kind);
313                Ok(Entry { setting, value })
314            }
315            _ => Err(Error::ParseError),
316        }
317    }
318}
319
320impl TryFrom<MsgSettingsReadByIndexResp> for Entry {
321    type Error = Error;
322
323    fn try_from(msg: MsgSettingsReadByIndexResp) -> Result<Self, Self::Error> {
324        let fields = split_multipart(&msg.setting);
325        match fields.as_slice() {
326            [group, name, value, fmt_type] => {
327                let setting = if fmt_type.is_empty() {
328                    Setting::new(group, name)
329                } else {
330                    Setting::with_fmt_type(group, name, fmt_type)
331                };
332                let value = SettingValue::parse(value, setting.kind);
333                Ok(Entry { setting, value })
334            }
335            [group, name, value] => {
336                let setting = Setting::new(group, name);
337                let value = SettingValue::parse(value, setting.kind);
338                Ok(Entry { setting, value })
339            }
340            _ => Err(Error::ParseError),
341        }
342    }
343}
344
345type SenderFunc = Box<dyn FnMut(Sbp) -> Result<(), BoxedError> + Send>;
346
347struct MsgSender(Arc<Mutex<SenderFunc>>);
348
349impl MsgSender {
350    const RETRIES: usize = 5;
351    const TIMEOUT: Duration = Duration::from_millis(100);
352
353    fn send(&self, msg: impl Into<Sbp>) -> Result<(), BoxedError> {
354        self.send_inner(msg.into(), 0)
355    }
356
357    fn send_inner(&self, msg: Sbp, tries: usize) -> Result<(), BoxedError> {
358        let res = (self.0.lock())(msg.clone());
359        if res.is_err() && tries < Self::RETRIES {
360            std::thread::sleep(Self::TIMEOUT);
361            self.send_inner(msg, tries + 1)
362        } else {
363            res
364        }
365    }
366}
367
368fn request_matches(group: &str, name: &str, setting: &SbpString<Vec<u8>, Multipart>) -> bool {
369    let fields = split_multipart(setting);
370    matches!(fields.as_slice(), [g, n, ..] if g == group && n == name)
371}
372
373fn split_multipart(s: &SbpString<Vec<u8>, Multipart>) -> Vec<Cow<'_, str>> {
374    let mut parts: Vec<_> = s
375        .as_bytes()
376        .split(|b| *b == 0)
377        .map(String::from_utf8_lossy)
378        .collect();
379    parts.pop();
380    parts
381}
382
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};
    use std::time::Instant;

    use super::*;

    use crossbeam_utils::thread::scope;
    use sbp::messages::settings::{MsgSettingsReadReq, MsgSettingsReadResp};
    use sbp::{SbpMessage, SbpString};

    /// Scripts `mock` to answer a read request for `group`/`name` with
    /// `raw_value`, performs the read through a fresh client and returns the
    /// resulting entry.
    fn read_mocked(mut mock: Mock, group: &str, name: &str, raw_value: &str) -> Entry {
        mock.req_reply(
            MsgSettingsReadReq {
                sender_id: Some(SENDER_ID),
                setting: SbpString::from(format!("{}\0{}\0", group, name)),
            },
            MsgSettingsReadResp {
                sender_id: Some(SENDER_ID),
                setting: SbpString::from(format!("{}\0{}\0{}\0", group, name, raw_value)),
            },
        );
        let (reader, writer) = mock.into_io();
        let mut client = Client::new(reader, writer);
        client.read_setting(group, name).unwrap().unwrap()
    }

    #[test]
    fn test_should_retry() {
        // The first five writes fail, so the request only gets out on a retry.
        let entry = read_mocked(Mock::with_errors(5), "sbp", "obs_msg_max_size", "10");
        assert!(matches!(entry.value, Some(SettingValue::Integer(10))));
    }

    #[test]
    fn read_setting_timeout() {
        let (group, name) = ("sbp", "obs_msg_max_size");
        let (reader, writer) = Mock::new().into_io();
        let mut client = Client::new(reader, writer);
        let (ctx, _ctx_handle) = Context::with_timeout(Duration::from_millis(100));
        let started = Instant::now();
        let mut response = Ok(None);
        scope(|s| {
            s.spawn(|_| {
                response = client.read_setting_ctx(group, name, ctx);
            });
        })
        .unwrap();
        assert!(started.elapsed() >= Duration::from_millis(100));
        assert!(matches!(response, Err(Error::TimedOut)));
    }

    #[test]
    fn read_setting_cancel() {
        let (group, name) = ("sbp", "obs_msg_max_size");
        let (reader, writer) = Mock::new().into_io();
        let mut client = Client::new(reader, writer);
        let (ctx, ctx_handle) = Context::new();
        let started = Instant::now();
        let mut response = Ok(None);
        scope(|s| {
            s.spawn(|_| {
                response = client.read_setting_ctx(group, name, ctx);
            });
            // Give the read time to start waiting before canceling it.
            std::thread::sleep(Duration::from_millis(100));
            ctx_handle.cancel();
        })
        .unwrap();
        assert!(started.elapsed() >= Duration::from_millis(100));
        assert!(matches!(response, Err(Error::Canceled)));
    }

    #[test]
    fn mock_read_setting_int() {
        let entry = read_mocked(Mock::new(), "sbp", "obs_msg_max_size", "10");
        assert!(matches!(entry.value, Some(SettingValue::Integer(10))));
    }

    #[test]
    fn mock_read_setting_bool() {
        let entry = read_mocked(Mock::new(), "surveyed_position", "broadcast", "True");
        assert!(matches!(entry.value, Some(SettingValue::Boolean(true))));
    }

    #[test]
    fn mock_read_setting_float() {
        let entry = read_mocked(Mock::new(), "ins", "filter_vel_half_life_alpha", "0.1");
        assert_eq!(entry.value, Some(SettingValue::Float(0.1)));
    }

    #[test]
    fn mock_read_setting_double() {
        let entry = read_mocked(Mock::new(), "surveyed_position", "surveyed_lat", "0.1");
        assert_eq!(entry.value, Some(SettingValue::Double(0.1)));
    }

    #[test]
    fn mock_read_setting_string() {
        let entry = read_mocked(Mock::new(), "rtcm_out", "ant_descriptor", "foo");
        assert_eq!(entry.value, Some(SettingValue::String("foo".into())));
    }

    #[test]
    fn mock_read_setting_enum() {
        let entry = read_mocked(Mock::new(), "frontend", "antenna_selection", "Secondary");
        assert_eq!(
            entry.value,
            Some(SettingValue::String("Secondary".into()))
        );
    }

    /// In-memory stream used as both reader and writer of a [`Client`].
    #[derive(Clone)]
    struct Mock {
        stream: mockstream::SyncMockStream,
        /// Number of upcoming `write` calls that fail.
        write_errors: u16,
    }

    impl Mock {
        /// Mock whose writes never fail artificially.
        fn new() -> Self {
            Self::with_errors(0)
        }

        /// Mock whose first `write_errors` writes return an error.
        fn with_errors(write_errors: u16) -> Self {
            let stream = mockstream::SyncMockStream::new();
            Self {
                stream,
                write_errors,
            }
        }

        /// Scripts a single request followed by the reply `res`.
        fn req_reply(&mut self, req: impl SbpMessage, res: impl SbpMessage) {
            self.reqs_reply(&[req], res)
        }

        /// Passes the serialized `reqs` to the stream's `wait_for`, then
        /// queues the serialized `res` as bytes to be read.
        fn reqs_reply(&mut self, reqs: &[impl SbpMessage], res: impl SbpMessage) {
            let mut expected: Vec<u8> = Vec::new();
            for req in reqs {
                expected.extend(sbp::to_vec(req).unwrap());
            }
            self.stream.wait_for(expected.as_slice());
            let reply = sbp::to_vec(&res).unwrap();
            self.stream.push_bytes_to_read(reply.as_slice());
        }

        /// Splits the mock into a reader half (a clone) and a writer half.
        fn into_io(self) -> (impl io::Read, impl io::Write) {
            let reader = self.clone();
            (reader, self)
        }
    }

    impl Read for Mock {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.stream.read(buf)
        }
    }

    impl Write for Mock {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            if self.write_errors == 0 {
                return self.stream.write(buf);
            }
            self.write_errors -= 1;
            Err(io::Error::new(io::ErrorKind::Other, "error"))
        }

        fn flush(&mut self) -> io::Result<()> {
            self.stream.flush()
        }
    }
}