Skip to main content

greptimedb_ingester/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io;
16
17use snafu::{Location, Snafu};
18use tonic::{metadata::errors::InvalidMetadataValue, Status};
19
20#[derive(Debug, Snafu)]
21#[snafu(visibility(pub))]
22pub enum Error {
23    #[snafu(display("Invalid client tls config, {}", msg))]
24    InvalidTlsConfig {
25        msg: String,
26        #[snafu(implicit)]
27        location: Location,
28    },
29
30    #[snafu(display("Invalid config file path, {}", source))]
31    InvalidConfigFilePath {
32        source: io::Error,
33        #[snafu(implicit)]
34        location: Location,
35    },
36
37    #[snafu(display("Failed to create gRPC channel"))]
38    CreateChannel {
39        #[snafu(implicit)]
40        location: Location,
41        source: tonic::transport::Error,
42    },
43
44    #[snafu(display("Illegal gRPC client state: {}", err_msg))]
45    IllegalGrpcClientState {
46        err_msg: String,
47        #[snafu(implicit)]
48        location: Location,
49    },
50
51    #[snafu(display("Missing required field in protobuf, field: {}", field))]
52    MissingField {
53        field: String,
54        #[snafu(implicit)]
55        location: Location,
56    },
57
58    // Server error carried in Tonic Status's metadata.
59    #[snafu(display("{}", msg))]
60    Server { status: Box<Status>, msg: String },
61
62    #[snafu(display("Illegal Database response: {err_msg}"))]
63    IllegalDatabaseResponse {
64        err_msg: String,
65        #[snafu(implicit)]
66        location: Location,
67    },
68
69    #[snafu(display("Invalid Tonic metadata value"))]
70    InvalidTonicMetadataValue {
71        #[snafu(source)]
72        error: InvalidMetadataValue,
73        #[snafu(implicit)]
74        location: Location,
75    },
76
77    #[snafu(display("Failed to serde Json"))]
78    SerdeJson {
79        #[snafu(source)]
80        error: serde_json::error::Error,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Failed to create Arrow RecordBatch"))]
86    CreateRecordBatch {
87        #[snafu(source)]
88        error: arrow_schema::ArrowError,
89        #[snafu(implicit)]
90        location: Location,
91    },
92
93    #[snafu(display("Unsupported data type: {:?}", data_type))]
94    UnsupportedDataType {
95        data_type: String,
96        #[snafu(implicit)]
97        location: Location,
98    },
99
100    #[snafu(display("Failed to serialize metadata"))]
101    SerializeMetadata {
102        #[snafu(source)]
103        error: serde_json::Error,
104        #[snafu(implicit)]
105        location: Location,
106    },
107
108    #[snafu(display("Failed to send data to stream: {}", source))]
109    SendData {
110        source: futures::channel::mpsc::SendError,
111        #[snafu(implicit)]
112        location: Location,
113    },
114
115    #[snafu(display("Response stream ended unexpectedly"))]
116    StreamEnded {
117        #[snafu(implicit)]
118        location: Location,
119    },
120
121    #[snafu(display(
122        "Response stream ended unexpectedly with pending requests: {:?}",
123        request_ids
124    ))]
125    StreamEndedWithPendingRequests {
126        request_ids: Vec<i64>,
127        #[snafu(implicit)]
128        location: Location,
129    },
130
131    #[snafu(display(
132        "Request timeout after {:?} for request IDs: {:?}",
133        timeout,
134        request_ids
135    ))]
136    RequestTimeout {
137        request_ids: Vec<i64>,
138        timeout: std::time::Duration,
139        #[snafu(implicit)]
140        location: Location,
141    },
142
143    #[snafu(display(
144        "Schema mismatch: BulkStreamWriter expects schema {} but got {}",
145        expected,
146        actual
147    ))]
148    SchemaMismatch {
149        expected: String,
150        actual: String,
151        #[snafu(implicit)]
152        location: Location,
153    },
154
155    #[snafu(display("Invalid column count: expected {}, got {}", expected, actual))]
156    InvalidColumnCount {
157        expected: usize,
158        actual: usize,
159        #[snafu(implicit)]
160        location: Location,
161    },
162
163    #[snafu(display("Invalid column index: {}, total columns: {}", index, total))]
164    InvalidColumnIndex {
165        index: usize,
166        total: usize,
167        #[snafu(implicit)]
168        location: Location,
169    },
170
171    #[snafu(display("Cannot write empty rows"))]
172    EmptyRows {
173        #[snafu(implicit)]
174        location: Location,
175    },
176
177    #[snafu(display("Cannot add row to a Rows object that was created from a RecordBatch"))]
178    AddRowToBuiltBatch {
179        #[snafu(implicit)]
180        location: Location,
181    },
182
183    #[snafu(display("Rows created from RecordBatch cannot have unflushed rows"))]
184    UnflushedRows {
185        #[snafu(implicit)]
186        location: Location,
187    },
188}
189
/// Shorthand for [`std::result::Result`] with this module's [`Error`] as the
/// error type.
pub type Result<T> = std::result::Result<T, Error>;

/// Metadata key that the `From<Status>` conversion for [`Error`] looks up on
/// a `tonic::Status` to obtain the message stored in [`Error::Server`].
pub const INNER_ERROR_MSG: &str = "INNER_ERROR_MSG";
193
194impl From<Status> for Error {
195    fn from(e: Status) -> Self {
196        fn get_metadata_value(e: &Status, key: &str) -> Option<String> {
197            e.metadata()
198                .get(key)
199                .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
200        }
201
202        let msg = get_metadata_value(&e, INNER_ERROR_MSG).unwrap_or(e.to_string());
203
204        Self::Server {
205            status: Box::new(e),
206            msg,
207        }
208    }
209}
210
211impl Error {
212    /// Indicate if the error is retriable
213    pub fn is_retriable(&self) -> bool {
214        !matches!(
215            self,
216            Self::InvalidTlsConfig { .. }
217                | Self::MissingField { .. }
218                | Self::InvalidConfigFilePath { .. }
219        )
220    }
221}