1use std::io::{Read, Write};
2
3use sbp::{
4 json::{Json2JsonEncoder, JsonEncoder},
5 HandleParseError, SbpEncoder,
6};
7use serde_json::ser::Formatter;
8
9pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
10
11#[derive(Copy, Clone, Debug)]
12pub enum ErrorHandlerOptions {
13 ReturnOnFirstError,
14 FilterOutErrors,
15 CoerceErrorsToInvalidMsg,
16}
17
18impl std::str::FromStr for ErrorHandlerOptions {
19 type Err = String;
20
21 fn from_str(s: &str) -> std::result::Result<ErrorHandlerOptions, Self::Err> {
22 match s {
23 s if s.eq_ignore_ascii_case("Return") => Ok(ErrorHandlerOptions::ReturnOnFirstError),
24 s if s.eq_ignore_ascii_case("Skip") => Ok(ErrorHandlerOptions::FilterOutErrors),
25 s if s.eq_ignore_ascii_case("ToInvalid")
26 | s.eq_ignore_ascii_case("to-invalid")
27 | s.eq_ignore_ascii_case("to_invalid") =>
28 {
29 Ok(ErrorHandlerOptions::CoerceErrorsToInvalidMsg)
30 }
31
32 s => Err(format!(
33 "Unable to cast option {} to a valid error handler option. \
34 Valid options are 'return', 'skip', & 'to-invalid')",
35 s
36 )),
37 }
38 }
39}
40
41impl From<bool> for ErrorHandlerOptions {
43 fn from(bool_flag: bool) -> Self {
44 if bool_flag {
45 Self::ReturnOnFirstError
46 } else {
47 Self::FilterOutErrors
48 }
49 }
50}
51
52pub struct ErrorHandler<'a, T, E: std::error::Error + Send + Sync + HandleParseError<T>> {
54 inner_iter: Box<dyn Iterator<Item = std::result::Result<T, E>> + 'a>,
55 opts: ErrorHandlerOptions,
56}
57
58impl<'a, T, E: std::error::Error + Send + Sync + HandleParseError<T>> ErrorHandler<'a, T, E> {
59 pub fn new(
60 inner_iter: impl Iterator<Item = std::result::Result<T, E>> + 'a,
61 opts: ErrorHandlerOptions,
62 ) -> Self {
63 Self {
64 inner_iter: Box::new(inner_iter),
65 opts,
66 }
67 }
68}
69
70impl<M, E: std::error::Error + Send + Sync + HandleParseError<M>> Iterator
71 for ErrorHandler<'_, M, E>
72{
73 type Item = M;
74
75 fn next(&mut self) -> Option<M> {
76 match (self.opts, self.inner_iter.next()) {
77 (_, None) => None,
78 (_, Some(Ok(x))) => Some(x),
79 (ErrorHandlerOptions::ReturnOnFirstError, Some(Err(e))) => {
80 eprintln!("{e}");
81 None
82 }
83 (ErrorHandlerOptions::FilterOutErrors, Some(Err(e))) => {
84 eprintln!("{e}");
85 self.next()
86 }
87 (ErrorHandlerOptions::CoerceErrorsToInvalidMsg, Some(Err(e))) => {
88 eprintln!("{e}");
89 Some(e.handle_parse_error())
90 }
91 }
92 }
93}
94
95pub fn json2sbp<R, W>(
96 input: R,
97 output: W,
98 buffered: bool,
99 error_handler_opt: impl Into<ErrorHandlerOptions>,
100) -> Result<()>
101where
102 R: Read,
103 W: Write,
104{
105 let source = ErrorHandler::new(sbp::json::iter_messages(input), error_handler_opt.into());
106 let mut sink = SbpEncoder::new(output);
107 if buffered {
108 sink.send_all(source)?;
109 } else {
110 for msg in source {
111 sink.send(&msg)?;
112 }
113 }
114 Ok(())
115}
116
117pub fn jsonfields2sbp<R, W>(
118 input: R,
119 output: W,
120 buffered: bool,
121 error_handler_opt: impl Into<ErrorHandlerOptions>,
122) -> Result<()>
123where
124 R: Read,
125 W: Write,
126{
127 let source = ErrorHandler::new(
128 sbp::json::iter_messages_from_fields(input),
129 error_handler_opt.into(),
130 );
131 let mut sink = SbpEncoder::new(output);
132 if buffered {
133 sink.send_all(source)?;
134 } else {
135 for msg in source {
136 sink.send(&msg)?;
137 }
138 }
139 Ok(())
140}
141
142pub fn json2json<R, W, F>(
143 input: R,
144 output: W,
145 formatter: F,
146 buffered: bool,
147 error_handler_opt: impl Into<ErrorHandlerOptions>,
148) -> Result<()>
149where
150 R: Read,
151 W: Write,
152 F: Formatter + Clone,
153{
154 let opt = error_handler_opt.into();
155 if matches!(opt, ErrorHandlerOptions::CoerceErrorsToInvalidMsg) {
156 unimplemented!(
157 "We do not yet support coverting to invalid messages \
158 in json2json."
159 );
160 }
161 let source = ErrorHandler::new(sbp::json::iter_json2json_messages(input), opt);
162 let mut sink = Json2JsonEncoder::new(output, formatter);
163 if buffered {
164 sink.send_all(source)?;
165 } else {
166 for msg in source {
167 sink.send(msg)?;
168 }
169 }
170 Ok(())
171}
172
173pub fn sbp2json<R, W, F>(
174 input: R,
175 output: W,
176 formatter: F,
177 buffered: bool,
178 error_handler_opt: impl Into<ErrorHandlerOptions>,
179) -> Result<()>
180where
181 R: Read,
182 W: Write,
183 F: Formatter + Clone,
184{
185 let source = ErrorHandler::new(sbp::iter_messages(input), error_handler_opt.into());
186 let mut sink = JsonEncoder::new(output, formatter);
187 if buffered {
188 sink.send_all(source)?;
189 } else {
190 for msg in source {
191 sink.send(&msg)?;
192 }
193 }
194 Ok(())
195}