1use std::io::{Read, Write};
2
3use sbp::{
4 json::{Json2JsonEncoder, JsonEncoder},
5 HandleParseError, SbpEncoder,
6};
7use serde_json::ser::Formatter;
8
/// Crate-wide result alias whose error side is a boxed, thread-safe error
/// trait object, so any `std::error::Error + Send + Sync` value can be
/// propagated with `?`.
///
/// The `'static` bound is what `Box<dyn Trait>` defaults to; it is only
/// written out here to make the alias self-describing.
pub type Result<T> =
    std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
10
/// Policy applied by [`ErrorHandler`] when the wrapped iterator yields an
/// `Err`.
///
/// Whichever variant is selected, `ErrorHandler` first prints the error to
/// stderr.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ErrorHandlerOptions {
    /// End iteration (`None`) as soon as an error is encountered.
    ReturnOnFirstError,
    /// Drop the erroneous item and carry on with the next one.
    FilterOutErrors,
    /// Replace the erroneous item with the value produced by the error's
    /// `HandleParseError::handle_parse_error` implementation.
    CoerceErrorsToInvalidMsg,
}
17
18impl std::str::FromStr for ErrorHandlerOptions {
19 type Err = String;
20
21 fn from_str(s: &str) -> std::result::Result<ErrorHandlerOptions, Self::Err> {
22 match s {
23 s if s.eq_ignore_ascii_case("Return") => Ok(ErrorHandlerOptions::ReturnOnFirstError),
24 s if s.eq_ignore_ascii_case("Skip") => Ok(ErrorHandlerOptions::FilterOutErrors),
25 s if s.eq_ignore_ascii_case("ToInvalid")
26 | s.eq_ignore_ascii_case("to-invalid")
27 | s.eq_ignore_ascii_case("to_invalid") =>
28 {
29 Ok(ErrorHandlerOptions::CoerceErrorsToInvalidMsg)
30 }
31
32 s => Err(format!(
33 "Unable to cast option {s} to a valid error handler option. \
34 Valid options are 'return', 'skip', & 'to-invalid')",
35 )),
36 }
37 }
38}
39
40impl From<bool> for ErrorHandlerOptions {
42 fn from(bool_flag: bool) -> Self {
43 if bool_flag {
44 Self::ReturnOnFirstError
45 } else {
46 Self::FilterOutErrors
47 }
48 }
49}
50
51pub struct ErrorHandler<'a, T, E: std::error::Error + Send + Sync + HandleParseError<T>> {
53 inner_iter: Box<dyn Iterator<Item = std::result::Result<T, E>> + 'a>,
54 opts: ErrorHandlerOptions,
55}
56
57impl<'a, T, E: std::error::Error + Send + Sync + HandleParseError<T>> ErrorHandler<'a, T, E> {
58 pub fn new(
59 inner_iter: impl Iterator<Item = std::result::Result<T, E>> + 'a,
60 opts: ErrorHandlerOptions,
61 ) -> Self {
62 Self {
63 inner_iter: Box::new(inner_iter),
64 opts,
65 }
66 }
67}
68
69impl<M, E: std::error::Error + Send + Sync + HandleParseError<M>> Iterator
70 for ErrorHandler<'_, M, E>
71{
72 type Item = M;
73
74 fn next(&mut self) -> Option<M> {
75 match (self.opts, self.inner_iter.next()) {
76 (_, None) => None,
77 (_, Some(Ok(x))) => Some(x),
78 (ErrorHandlerOptions::ReturnOnFirstError, Some(Err(e))) => {
79 eprintln!("{e}");
80 None
81 }
82 (ErrorHandlerOptions::FilterOutErrors, Some(Err(e))) => {
83 eprintln!("{e}");
84 self.next()
85 }
86 (ErrorHandlerOptions::CoerceErrorsToInvalidMsg, Some(Err(e))) => {
87 eprintln!("{e}");
88 Some(e.handle_parse_error())
89 }
90 }
91 }
92}
93
94pub fn json2sbp<R, W>(
95 input: R,
96 output: W,
97 buffered: bool,
98 error_handler_opt: impl Into<ErrorHandlerOptions>,
99) -> Result<()>
100where
101 R: Read,
102 W: Write,
103{
104 let source = ErrorHandler::new(sbp::json::iter_messages(input), error_handler_opt.into());
105 let mut sink = SbpEncoder::new(output);
106 if buffered {
107 sink.send_all(source)?;
108 } else {
109 for msg in source {
110 sink.send(&msg)?;
111 }
112 }
113 Ok(())
114}
115
116pub fn jsonfields2sbp<R, W>(
117 input: R,
118 output: W,
119 buffered: bool,
120 error_handler_opt: impl Into<ErrorHandlerOptions>,
121) -> Result<()>
122where
123 R: Read,
124 W: Write,
125{
126 let source = ErrorHandler::new(
127 sbp::json::iter_messages_from_fields(input),
128 error_handler_opt.into(),
129 );
130 let mut sink = SbpEncoder::new(output);
131 if buffered {
132 sink.send_all(source)?;
133 } else {
134 for msg in source {
135 sink.send(&msg)?;
136 }
137 }
138 Ok(())
139}
140
141pub fn json2json<R, W, F>(
142 input: R,
143 output: W,
144 formatter: F,
145 buffered: bool,
146 error_handler_opt: impl Into<ErrorHandlerOptions>,
147) -> Result<()>
148where
149 R: Read,
150 W: Write,
151 F: Formatter + Clone,
152{
153 let opt = error_handler_opt.into();
154 if matches!(opt, ErrorHandlerOptions::CoerceErrorsToInvalidMsg) {
155 unimplemented!(
156 "We do not yet support coverting to invalid messages \
157 in json2json."
158 );
159 }
160 let source = ErrorHandler::new(sbp::json::iter_json2json_messages(input), opt);
161 let mut sink = Json2JsonEncoder::new(output, formatter);
162 if buffered {
163 sink.send_all(source)?;
164 } else {
165 for msg in source {
166 sink.send(msg)?;
167 }
168 }
169 Ok(())
170}
171
172pub fn sbp2json<R, W, F>(
173 input: R,
174 output: W,
175 formatter: F,
176 buffered: bool,
177 error_handler_opt: impl Into<ErrorHandlerOptions>,
178) -> Result<()>
179where
180 R: Read,
181 W: Write,
182 F: Formatter + Clone,
183{
184 let source = ErrorHandler::new(sbp::iter_messages(input), error_handler_opt.into());
185 let mut sink = JsonEncoder::new(output, formatter);
186 if buffered {
187 sink.send_all(source)?;
188 } else {
189 for msg in source {
190 sink.send(&msg)?;
191 }
192 }
193 Ok(())
194}