1use std::io::{Read, Write};
2
3use sbp::{
4    json::{Json2JsonEncoder, JsonEncoder},
5    HandleParseError, SbpEncoder,
6};
7use serde_json::ser::Formatter;
8
/// Result alias used by the converter functions in this file.
///
/// The error side is a boxed, thread-safe trait object, so any error type
/// that implements `std::error::Error + Send + Sync` can be propagated with `?`.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
10
/// Selects what [`ErrorHandler`] does when the wrapped iterator yields an `Err`.
///
/// In every mode the error is first printed to stderr.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ErrorHandlerOptions {
    /// End iteration (yield `None`) as soon as an error is seen.
    ReturnOnFirstError,
    /// Drop the failing item and continue with the next one.
    FilterOutErrors,
    /// Replace the failing item with the value produced by
    /// `HandleParseError::handle_parse_error` on the error.
    CoerceErrorsToInvalidMsg,
}
17
18impl std::str::FromStr for ErrorHandlerOptions {
19    type Err = String;
20
21    fn from_str(s: &str) -> std::result::Result<ErrorHandlerOptions, Self::Err> {
22        match s {
23            s if s.eq_ignore_ascii_case("Return") => Ok(ErrorHandlerOptions::ReturnOnFirstError),
24            s if s.eq_ignore_ascii_case("Skip") => Ok(ErrorHandlerOptions::FilterOutErrors),
25            s if s.eq_ignore_ascii_case("ToInvalid")
26                | s.eq_ignore_ascii_case("to-invalid")
27                | s.eq_ignore_ascii_case("to_invalid") =>
28            {
29                Ok(ErrorHandlerOptions::CoerceErrorsToInvalidMsg)
30            }
31
32            s => Err(format!(
33                "Unable to cast option {} to a valid error handler option. \
34                    Valid options are 'return', 'skip', & 'to-invalid')",
35                s
36            )),
37        }
38    }
39}
40
41impl From<bool> for ErrorHandlerOptions {
43    fn from(bool_flag: bool) -> Self {
44        if bool_flag {
45            Self::ReturnOnFirstError
46        } else {
47            Self::FilterOutErrors
48        }
49    }
50}
51
52pub struct ErrorHandler<'a, T, E: std::error::Error + Send + Sync + HandleParseError<T>> {
54    inner_iter: Box<dyn Iterator<Item = std::result::Result<T, E>> + 'a>,
55    opts: ErrorHandlerOptions,
56}
57
58impl<'a, T, E: std::error::Error + Send + Sync + HandleParseError<T>> ErrorHandler<'a, T, E> {
59    pub fn new(
60        inner_iter: impl Iterator<Item = std::result::Result<T, E>> + 'a,
61        opts: ErrorHandlerOptions,
62    ) -> Self {
63        Self {
64            inner_iter: Box::new(inner_iter),
65            opts,
66        }
67    }
68}
69
70impl<M, E: std::error::Error + Send + Sync + HandleParseError<M>> Iterator
71    for ErrorHandler<'_, M, E>
72{
73    type Item = M;
74
75    fn next(&mut self) -> Option<M> {
76        match (self.opts, self.inner_iter.next()) {
77            (_, None) => None,
78            (_, Some(Ok(x))) => Some(x),
79            (ErrorHandlerOptions::ReturnOnFirstError, Some(Err(e))) => {
80                eprintln!("{e}");
81                None
82            }
83            (ErrorHandlerOptions::FilterOutErrors, Some(Err(e))) => {
84                eprintln!("{e}");
85                self.next()
86            }
87            (ErrorHandlerOptions::CoerceErrorsToInvalidMsg, Some(Err(e))) => {
88                eprintln!("{e}");
89                Some(e.handle_parse_error())
90            }
91        }
92    }
93}
94
95pub fn json2sbp<R, W>(
96    input: R,
97    output: W,
98    buffered: bool,
99    error_handler_opt: impl Into<ErrorHandlerOptions>,
100) -> Result<()>
101where
102    R: Read,
103    W: Write,
104{
105    let source = ErrorHandler::new(sbp::json::iter_messages(input), error_handler_opt.into());
106    let mut sink = SbpEncoder::new(output);
107    if buffered {
108        sink.send_all(source)?;
109    } else {
110        for msg in source {
111            sink.send(&msg)?;
112        }
113    }
114    Ok(())
115}
116
117pub fn jsonfields2sbp<R, W>(
118    input: R,
119    output: W,
120    buffered: bool,
121    error_handler_opt: impl Into<ErrorHandlerOptions>,
122) -> Result<()>
123where
124    R: Read,
125    W: Write,
126{
127    let source = ErrorHandler::new(
128        sbp::json::iter_messages_from_fields(input),
129        error_handler_opt.into(),
130    );
131    let mut sink = SbpEncoder::new(output);
132    if buffered {
133        sink.send_all(source)?;
134    } else {
135        for msg in source {
136            sink.send(&msg)?;
137        }
138    }
139    Ok(())
140}
141
142pub fn json2json<R, W, F>(
143    input: R,
144    output: W,
145    formatter: F,
146    buffered: bool,
147    error_handler_opt: impl Into<ErrorHandlerOptions>,
148) -> Result<()>
149where
150    R: Read,
151    W: Write,
152    F: Formatter + Clone,
153{
154    let opt = error_handler_opt.into();
155    if matches!(opt, ErrorHandlerOptions::CoerceErrorsToInvalidMsg) {
156        unimplemented!(
157            "We do not yet support coverting to invalid messages \
158        in json2json."
159        );
160    }
161    let source = ErrorHandler::new(sbp::json::iter_json2json_messages(input), opt);
162    let mut sink = Json2JsonEncoder::new(output, formatter);
163    if buffered {
164        sink.send_all(source)?;
165    } else {
166        for msg in source {
167            sink.send(msg)?;
168        }
169    }
170    Ok(())
171}
172
173pub fn sbp2json<R, W, F>(
174    input: R,
175    output: W,
176    formatter: F,
177    buffered: bool,
178    error_handler_opt: impl Into<ErrorHandlerOptions>,
179) -> Result<()>
180where
181    R: Read,
182    W: Write,
183    F: Formatter + Clone,
184{
185    let source = ErrorHandler::new(sbp::iter_messages(input), error_handler_opt.into());
186    let mut sink = JsonEncoder::new(output, formatter);
187    if buffered {
188        sink.send_all(source)?;
189    } else {
190        for msg in source {
191            sink.send(&msg)?;
192        }
193    }
194    Ok(())
195}