converters/
lib.rs

1use std::io::{Read, Write};
2
3use sbp::{
4    json::{Json2JsonEncoder, JsonEncoder},
5    HandleParseError, SbpEncoder,
6};
7use serde_json::ser::Formatter;
8
9pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
10
11#[derive(Copy, Clone, Debug)]
12pub enum ErrorHandlerOptions {
13    ReturnOnFirstError,
14    FilterOutErrors,
15    CoerceErrorsToInvalidMsg,
16}
17
18impl std::str::FromStr for ErrorHandlerOptions {
19    type Err = String;
20
21    fn from_str(s: &str) -> std::result::Result<ErrorHandlerOptions, Self::Err> {
22        match s {
23            s if s.eq_ignore_ascii_case("Return") => Ok(ErrorHandlerOptions::ReturnOnFirstError),
24            s if s.eq_ignore_ascii_case("Skip") => Ok(ErrorHandlerOptions::FilterOutErrors),
25            s if s.eq_ignore_ascii_case("ToInvalid")
26                | s.eq_ignore_ascii_case("to-invalid")
27                | s.eq_ignore_ascii_case("to_invalid") =>
28            {
29                Ok(ErrorHandlerOptions::CoerceErrorsToInvalidMsg)
30            }
31
32            s => Err(format!(
33                "Unable to cast option {s} to a valid error handler option. \
34                    Valid options are 'return', 'skip', & 'to-invalid')",
35            )),
36        }
37    }
38}
39
40/// Impl from bool for backwards compatibility
41impl From<bool> for ErrorHandlerOptions {
42    fn from(bool_flag: bool) -> Self {
43        if bool_flag {
44            Self::ReturnOnFirstError
45        } else {
46            Self::FilterOutErrors
47        }
48    }
49}
50
51/// This struct is a wrapper of an iter that returns messages or errors.
52pub struct ErrorHandler<'a, T, E: std::error::Error + Send + Sync + HandleParseError<T>> {
53    inner_iter: Box<dyn Iterator<Item = std::result::Result<T, E>> + 'a>,
54    opts: ErrorHandlerOptions,
55}
56
57impl<'a, T, E: std::error::Error + Send + Sync + HandleParseError<T>> ErrorHandler<'a, T, E> {
58    pub fn new(
59        inner_iter: impl Iterator<Item = std::result::Result<T, E>> + 'a,
60        opts: ErrorHandlerOptions,
61    ) -> Self {
62        Self {
63            inner_iter: Box::new(inner_iter),
64            opts,
65        }
66    }
67}
68
69impl<M, E: std::error::Error + Send + Sync + HandleParseError<M>> Iterator
70    for ErrorHandler<'_, M, E>
71{
72    type Item = M;
73
74    fn next(&mut self) -> Option<M> {
75        match (self.opts, self.inner_iter.next()) {
76            (_, None) => None,
77            (_, Some(Ok(x))) => Some(x),
78            (ErrorHandlerOptions::ReturnOnFirstError, Some(Err(e))) => {
79                eprintln!("{e}");
80                None
81            }
82            (ErrorHandlerOptions::FilterOutErrors, Some(Err(e))) => {
83                eprintln!("{e}");
84                self.next()
85            }
86            (ErrorHandlerOptions::CoerceErrorsToInvalidMsg, Some(Err(e))) => {
87                eprintln!("{e}");
88                Some(e.handle_parse_error())
89            }
90        }
91    }
92}
93
94pub fn json2sbp<R, W>(
95    input: R,
96    output: W,
97    buffered: bool,
98    error_handler_opt: impl Into<ErrorHandlerOptions>,
99) -> Result<()>
100where
101    R: Read,
102    W: Write,
103{
104    let source = ErrorHandler::new(sbp::json::iter_messages(input), error_handler_opt.into());
105    let mut sink = SbpEncoder::new(output);
106    if buffered {
107        sink.send_all(source)?;
108    } else {
109        for msg in source {
110            sink.send(&msg)?;
111        }
112    }
113    Ok(())
114}
115
116pub fn jsonfields2sbp<R, W>(
117    input: R,
118    output: W,
119    buffered: bool,
120    error_handler_opt: impl Into<ErrorHandlerOptions>,
121) -> Result<()>
122where
123    R: Read,
124    W: Write,
125{
126    let source = ErrorHandler::new(
127        sbp::json::iter_messages_from_fields(input),
128        error_handler_opt.into(),
129    );
130    let mut sink = SbpEncoder::new(output);
131    if buffered {
132        sink.send_all(source)?;
133    } else {
134        for msg in source {
135            sink.send(&msg)?;
136        }
137    }
138    Ok(())
139}
140
141pub fn json2json<R, W, F>(
142    input: R,
143    output: W,
144    formatter: F,
145    buffered: bool,
146    error_handler_opt: impl Into<ErrorHandlerOptions>,
147) -> Result<()>
148where
149    R: Read,
150    W: Write,
151    F: Formatter + Clone,
152{
153    let opt = error_handler_opt.into();
154    if matches!(opt, ErrorHandlerOptions::CoerceErrorsToInvalidMsg) {
155        unimplemented!(
156            "We do not yet support coverting to invalid messages \
157        in json2json."
158        );
159    }
160    let source = ErrorHandler::new(sbp::json::iter_json2json_messages(input), opt);
161    let mut sink = Json2JsonEncoder::new(output, formatter);
162    if buffered {
163        sink.send_all(source)?;
164    } else {
165        for msg in source {
166            sink.send(msg)?;
167        }
168    }
169    Ok(())
170}
171
172pub fn sbp2json<R, W, F>(
173    input: R,
174    output: W,
175    formatter: F,
176    buffered: bool,
177    error_handler_opt: impl Into<ErrorHandlerOptions>,
178) -> Result<()>
179where
180    R: Read,
181    W: Write,
182    F: Formatter + Clone,
183{
184    let source = ErrorHandler::new(sbp::iter_messages(input), error_handler_opt.into());
185    let mut sink = JsonEncoder::new(output, formatter);
186    if buffered {
187        sink.send_all(source)?;
188    } else {
189        for msg in source {
190            sink.send(&msg)?;
191        }
192    }
193    Ok(())
194}