1use std::io;
16
17use snafu::{Location, Snafu};
18use tonic::{metadata::errors::InvalidMetadataValue, Status};
19
/// Errors produced by this client library.
///
/// The enum derives `Snafu`, which generates the `Display` and
/// `std::error::Error` implementations from the `#[snafu(display(...))]`
/// attributes below. `#[snafu(visibility(pub))]` makes the generated context
/// selectors public.
///
/// Most variants carry a `location` field marked `#[snafu(implicit)]`, so it
/// is filled in by snafu rather than supplied by the code raising the error.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    /// The client TLS configuration is invalid; `msg` describes the problem.
    #[snafu(display("Invalid client tls config, {}", msg))]
    InvalidTlsConfig {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// A configuration file path was rejected; wraps the underlying I/O error,
    /// which is also included in the display message.
    #[snafu(display("Invalid config file path, {}", source))]
    InvalidConfigFilePath {
        source: io::Error,
        #[snafu(implicit)]
        location: Location,
    },

    /// Creating the gRPC channel failed; wraps the tonic transport error.
    #[snafu(display("Failed to create gRPC channel"))]
    CreateChannel {
        #[snafu(implicit)]
        location: Location,
        source: tonic::transport::Error,
    },

    /// The gRPC client was in an illegal state; `err_msg` gives the details.
    #[snafu(display("Illegal gRPC client state: {}", err_msg))]
    IllegalGrpcClientState {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// A required protobuf field, named by `field`, was missing.
    #[snafu(display("Missing required field in protobuf, field: {}", field))]
    MissingField {
        field: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// An error carried by a tonic `Status`.
    ///
    /// Built by the `From<Status>` conversion in this file: `status` keeps the
    /// original status (boxed) and `msg` is the text shown by `Display`. This
    /// is the only variant without a `location` field.
    #[snafu(display("{}", msg))]
    Server { status: Box<Status>, msg: String },

    /// The database returned a response considered illegal; `err_msg` gives
    /// the details.
    #[snafu(display("Illegal Database response: {err_msg}"))]
    IllegalDatabaseResponse {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// A value could not be used as a tonic metadata value; wraps tonic's
    /// `InvalidMetadataValue`.
    #[snafu(display("Invalid Tonic metadata value"))]
    InvalidTonicMetadataValue {
        #[snafu(source)]
        error: InvalidMetadataValue,
        #[snafu(implicit)]
        location: Location,
    },

    /// A `serde_json` operation failed; wraps the `serde_json` error.
    #[snafu(display("Failed to serde Json"))]
    SerdeJson {
        #[snafu(source)]
        error: serde_json::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    /// Building an Arrow `RecordBatch` failed; wraps the `ArrowError`.
    #[snafu(display("Failed to create Arrow RecordBatch"))]
    CreateRecordBatch {
        #[snafu(source)]
        error: arrow_schema::ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A data type is not supported; `data_type` is its textual name and is
    /// rendered with `{:?}`, so the message shows it quoted.
    #[snafu(display("Unsupported data type: {:?}", data_type))]
    UnsupportedDataType {
        data_type: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Serializing metadata failed; wraps the `serde_json` error.
    #[snafu(display("Failed to serialize metadata"))]
    SerializeMetadata {
        #[snafu(source)]
        error: serde_json::Error,
        #[snafu(implicit)]
        location: Location,
    },

    /// Sending data to the stream failed; wraps the `futures` mpsc
    /// `SendError`, which is also included in the display message.
    #[snafu(display("Failed to send data to stream: {}", source))]
    SendData {
        source: futures::channel::mpsc::SendError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The response stream ended unexpectedly.
    #[snafu(display("Response stream ended unexpectedly"))]
    StreamEnded {
        #[snafu(implicit)]
        location: Location,
    },

    /// The response stream ended unexpectedly while the requests listed in
    /// `request_ids` were still pending.
    #[snafu(display(
        "Response stream ended unexpectedly with pending requests: {:?}",
        request_ids
    ))]
    StreamEndedWithPendingRequests {
        request_ids: Vec<i64>,
        #[snafu(implicit)]
        location: Location,
    },

    /// The requests listed in `request_ids` timed out after `timeout`.
    #[snafu(display(
        "Request timeout after {:?} for request IDs: {:?}",
        timeout,
        request_ids
    ))]
    RequestTimeout {
        request_ids: Vec<i64>,
        timeout: std::time::Duration,
        #[snafu(implicit)]
        location: Location,
    },

    /// The schema given to a `BulkStreamWriter` differs from the one it
    /// expects; both schemas are carried as strings.
    #[snafu(display(
        "Schema mismatch: BulkStreamWriter expects schema {} but got {}",
        expected,
        actual
    ))]
    SchemaMismatch {
        expected: String,
        actual: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// The number of columns supplied (`actual`) differs from the number
    /// required (`expected`).
    #[snafu(display("Invalid column count: expected {}, got {}", expected, actual))]
    InvalidColumnCount {
        expected: usize,
        actual: usize,
        #[snafu(implicit)]
        location: Location,
    },

    /// The column index `index` is invalid for a set of `total` columns.
    #[snafu(display("Invalid column index: {}, total columns: {}", index, total))]
    InvalidColumnIndex {
        index: usize,
        total: usize,
        #[snafu(implicit)]
        location: Location,
    },

    /// A write was attempted with no rows.
    #[snafu(display("Cannot write empty rows"))]
    EmptyRows {
        #[snafu(implicit)]
        location: Location,
    },

    /// A row was added to a `Rows` object that was created from a
    /// `RecordBatch`.
    #[snafu(display("Cannot add row to a Rows object that was created from a RecordBatch"))]
    AddRowToBuiltBatch {
        #[snafu(implicit)]
        location: Location,
    },

    /// A `Rows` object created from a `RecordBatch` was found to hold
    /// unflushed rows.
    #[snafu(display("Rows created from RecordBatch cannot have unflushed rows"))]
    UnflushedRows {
        #[snafu(implicit)]
        location: Location,
    },
}
189
/// Convenience alias for `std::result::Result` with this module's [`Error`]
/// as the error type.
pub type Result<T> = std::result::Result<T, Error>;
191
/// Metadata key that the `From<Status>` conversion looks up on a tonic
/// `Status` to obtain the error message for [`Error::Server`].
pub const INNER_ERROR_MSG: &str = "INNER_ERROR_MSG";
193
194impl From<Status> for Error {
195 fn from(e: Status) -> Self {
196 fn get_metadata_value(e: &Status, key: &str) -> Option<String> {
197 e.metadata()
198 .get(key)
199 .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
200 }
201
202 let msg = get_metadata_value(&e, INNER_ERROR_MSG).unwrap_or(e.to_string());
203
204 Self::Server {
205 status: Box::new(e),
206 msg,
207 }
208 }
209}
210
211impl Error {
212 pub fn is_retriable(&self) -> bool {
214 !matches!(
215 self,
216 Self::InvalidTlsConfig { .. }
217 | Self::MissingField { .. }
218 | Self::InvalidConfigFilePath { .. }
219 )
220 }
221}