qdrant_edge/segment/common/
operation_error.rs1use std::backtrace::Backtrace;
2use std::collections::TryReserveError;
3use std::io::{Error as IoError, ErrorKind};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use atomicwrites::Error as AtomicIoError;
8use crate::common::fs::FileStorageError;
9use crate::common::mmap::Error as MmapError;
10use crate::common::universal_io::UniversalIoError;
11use crate::gridstore::error::GridstoreError;
12use rayon::ThreadPoolBuildError;
13use thiserror::Error;
14
15use crate::segment::types::{PayloadKeyType, PointIdType, SeqNumberType, VectorNameBuf};
16use crate::segment::utils::mem::Mem;
17
/// Description text stored in [`OperationError::Cancelled`] by the conversions in this file
/// from `CancelledError` and from the quantization `Stopped` encoding error.
pub const PROCESS_CANCELLED_BY_SERVICE_MESSAGE: &str = "process cancelled by service";
19
/// Error type carried by [`OperationResult`].
///
/// Every variant defines its own display message through `thiserror`. Most `From`
/// conversions in this file map foreign error types onto [`OperationError::ServiceError`].
#[derive(Error, Debug, Clone, PartialEq)]
#[error("{0}")]
pub enum OperationError {
    /// A vector had a different dimension than the expected one.
    #[error("Vector dimension error: expected dim: {expected_dim}, got {received_dim}")]
    WrongVectorDimension {
        expected_dim: usize,
        received_dim: usize,
    },
    /// The given vector name does not exist.
    #[error("Not existing vector name error: {received_name}")]
    VectorNameNotExists { received_name: VectorNameBuf },
    /// No point with the given id was found.
    #[error("No point with id {missed_point_id}")]
    PointIdError { missed_point_id: PointIdType },
    /// The payload type of a field differs from the type previously given for that field.
    #[error(
        "Payload type does not match with previously given for field {field_name}. Expected: {expected_type}"
    )]
    TypeError {
        field_name: PayloadKeyType,
        expected_type: String,
    },
    /// The type of a field could not be inferred; the message asks for an explicit `field_type`.
    #[error("Unable to infer type for the field '{field_name}'. Please specify `field_type`")]
    TypeInferenceError { field_name: PayloadKeyType },
    /// Generic runtime failure.
    ///
    /// `backtrace` holds a rendered backtrace when the error was built with
    /// `OperationError::service_error` and is `None` when built with
    /// `OperationError::service_error_light`. Only `description` is part of the message.
    #[error("Service runtime error: {description}")]
    ServiceError {
        description: String,
        backtrace: Option<String>,
    },
    /// The storage was found to be in an inconsistent state.
    #[error("Inconsistent storage: {description}")]
    InconsistentStorage { description: String },
    /// An out-of-memory condition.
    ///
    /// The conversions in this file fill `free` from `Mem::available_memory_bytes()`.
    #[error("Out of memory, free: {free}, {description}")]
    OutOfMemory { description: String, free: u64 },
    /// The operation was cancelled.
    #[error("Operation cancelled: {description}")]
    Cancelled { description: String },
    /// The operation timed out.
    #[error("Timeout error: {description}")]
    Timeout { description: String },
    /// Validation failed.
    #[error("Validation failed: {description}")]
    ValidationError { description: String },
    /// Sparse vectors were used incorrectly.
    #[error("Wrong usage of sparse vectors")]
    WrongSparse,
    /// Multi vectors were used incorrectly.
    #[error("Wrong usage of multi vectors")]
    WrongMulti,
    /// `order_by` was requested on a key that has no range index.
    #[error(
        "No range index for `order_by` key: `{key}`. Please create one to use `order_by`. Check https://qdrant.tech/documentation/concepts/indexing/#payload-index to see which payload schemas support Range conditions"
    )]
    MissingRangeIndexForOrderBy { key: String },
    /// Faceting was requested on a key that has no appropriate index.
    #[error(
        "No appropriate index for faceting: `{key}`. Please create one to facet on this field. Check https://qdrant.tech/documentation/concepts/indexing/#payload-index to see which payload schemas support Match conditions"
    )]
    MissingMapIndexForFacet { key: String },
    /// A value (from the payload and/or the formula defaults) did not have the expected type.
    #[error(
        "Expected {expected_type} value for {field_name} in the payload and/or in the formula defaults. Error: {description}"
    )]
    VariableTypeError {
        field_name: PayloadKeyType,
        expected_type: String,
        description: String,
    },
    /// An expression evaluated to a non-finite number.
    #[error("The expression {expression} produced a non-finite number")]
    NonFiniteNumber { expression: String },

    /// The named RocksDB column family was not found.
    #[error("RocksDB column family {name} not found")]
    RocksDbColumnFamilyNotFound { name: String },
}
85
86impl OperationError {
87 pub fn service_error(description: impl Into<String>) -> Self {
90 Self::ServiceError {
91 description: description.into(),
92 backtrace: Some(Backtrace::force_capture().to_string()),
93 }
94 }
95
96 pub fn service_error_light(description: impl Into<String>) -> Self {
98 Self::ServiceError {
99 description: description.into(),
100 backtrace: None,
101 }
102 }
103
104 pub fn validation_error(description: impl Into<String>) -> Self {
105 Self::ValidationError {
106 description: description.into(),
107 }
108 }
109
110 pub fn inconsistent_storage(description: impl Into<String>) -> Self {
111 Self::InconsistentStorage {
112 description: description.into(),
113 }
114 }
115
116 pub fn cancelled(description: impl Into<String>) -> Self {
117 Self::Cancelled {
118 description: description.into(),
119 }
120 }
121
122 pub fn vector_name_not_exists(vector_name: impl Into<String>) -> Self {
123 Self::VectorNameNotExists {
124 received_name: vector_name.into(),
125 }
126 }
127
128 pub fn timeout(timeout: Duration, operation: impl Into<String>) -> Self {
129 Self::Timeout {
130 description: format!(
131 "Operation '{}' timed out after {timeout:?}",
132 operation.into(),
133 ),
134 }
135 }
136}
137
/// Pairs an [`OperationError`] with a sequence number and an optional point id.
///
/// NOTE(review): presumably describes the operation that failed on a segment; the exact
/// meaning of the fields is defined by code outside this file — confirm against callers.
#[derive(Debug, Clone)]
pub struct SegmentFailedState {
    /// Sequence number associated with the failure.
    pub version: SeqNumberType,
    /// Point id associated with the failure, if any.
    pub point_id: Option<PointIdType>,
    /// The error itself.
    pub error: OperationError,
}
145
146impl From<ThreadPoolBuildError> for OperationError {
147 fn from(error: ThreadPoolBuildError) -> Self {
148 OperationError::ServiceError {
149 description: format!("{error}"),
150 backtrace: Some(Backtrace::force_capture().to_string()),
151 }
152 }
153}
154
155impl From<FileStorageError> for OperationError {
156 fn from(err: FileStorageError) -> Self {
157 Self::service_error(err.to_string())
158 }
159}
160
161impl From<MmapError> for OperationError {
162 fn from(err: MmapError) -> Self {
163 Self::service_error(err.to_string())
164 }
165}
166
167impl From<UniversalIoError> for OperationError {
168 fn from(err: UniversalIoError) -> Self {
169 match err {
170 UniversalIoError::Io(err) => OperationError::from(err),
171 UniversalIoError::Mmap(err) => OperationError::from(err),
172
173 UniversalIoError::IoUringNotSupported(_)
174 | UniversalIoError::NotFound { .. }
175 | UniversalIoError::OutOfBounds { .. }
176 | UniversalIoError::InvalidFileIndex { .. } => {
177 OperationError::service_error(err.to_string())
178 }
179 UniversalIoError::BytemuckCast(_) => OperationError::service_error(err.to_string()),
180 UniversalIoError::Uninitialized { .. } => {
181 OperationError::service_error(err.to_string())
182 }
183 }
184 }
185}
186
187impl From<serde_cbor::Error> for OperationError {
188 fn from(err: serde_cbor::Error) -> Self {
189 OperationError::service_error(format!("Failed to parse data: {err}"))
190 }
191}
192
193impl<E> From<AtomicIoError<E>> for OperationError {
194 fn from(err: AtomicIoError<E>) -> Self {
195 match err {
196 AtomicIoError::Internal(io_err) => OperationError::from(io_err),
197 AtomicIoError::User(_user_err) => {
198 OperationError::service_error("Unknown atomic write error")
199 }
200 }
201 }
202}
203
204impl From<IoError> for OperationError {
205 fn from(err: IoError) -> Self {
206 match err.kind() {
207 ErrorKind::OutOfMemory => {
208 let free_memory = Mem::new().available_memory_bytes();
209 OperationError::OutOfMemory {
210 description: format!("IO Error: {err}"),
211 free: free_memory,
212 }
213 }
214 _ => OperationError::service_error(format!("IO Error: {err}")),
215 }
216 }
217}
218
219impl From<serde_json::Error> for OperationError {
220 fn from(err: serde_json::Error) -> Self {
221 OperationError::service_error(format!("Json error: {err}"))
222 }
223}
224
225impl From<fs_extra::error::Error> for OperationError {
226 fn from(err: fs_extra::error::Error) -> Self {
227 OperationError::service_error(format!("File system error: {err}"))
228 }
229}
230
231impl From<geohash::GeohashError> for OperationError {
232 fn from(err: geohash::GeohashError) -> Self {
233 OperationError::service_error(format!("Geohash error: {err}"))
234 }
235}
236
237impl From<crate::quantization::EncodingError> for OperationError {
238 fn from(err: crate::quantization::EncodingError) -> Self {
239 match err {
240 crate::quantization::EncodingError::IOError(err)
241 | crate::quantization::EncodingError::EncodingError(err)
242 | crate::quantization::EncodingError::ArgumentsError(err) => {
243 OperationError::service_error(format!("Quantization encoding error: {err}"))
244 }
245 crate::quantization::EncodingError::Stopped => OperationError::Cancelled {
246 description: PROCESS_CANCELLED_BY_SERVICE_MESSAGE.to_string(),
247 },
248 }
249 }
250}
251
252impl From<TryReserveError> for OperationError {
253 fn from(err: TryReserveError) -> Self {
254 let free_memory = Mem::new().available_memory_bytes();
255 OperationError::OutOfMemory {
256 description: format!("Failed to reserve memory: {err}"),
257 free: free_memory,
258 }
259 }
260}
261
262impl From<GridstoreError> for OperationError {
263 fn from(err: GridstoreError) -> Self {
264 match err {
265 GridstoreError::ServiceError { description } => {
266 Self::service_error(format!("Gridstore error: {description}"))
267 }
268 GridstoreError::FlushCancelled => Self::Cancelled {
269 description: "Gridstore flushing was cancelled".to_string(),
270 },
271 GridstoreError::Io(_) | GridstoreError::Mmap(_) | GridstoreError::SerdeJson(_) => {
272 Self::service_error(err.to_string())
273 }
274 GridstoreError::ValidationError { message } => Self::validation_error(message),
275 GridstoreError::UniversalIo(err) => {
276 Self::service_error(format!("Gridstore IO error: {err}"))
277 }
278 GridstoreError::PageNotFound { .. } => Self::service_error(err.to_string()),
279 }
280 }
281}
282
/// Converts a GPU error into a service error, rendering it with its `Debug` formatting.
#[cfg(feature = "gpu")]
impl From<gpu::GpuError> for OperationError {
    fn from(gpu_error: gpu::GpuError) -> Self {
        let description = format!("GPU error: {gpu_error:?}");
        OperationError::service_error(description)
    }
}
289
/// `Result` alias whose error type is [`OperationError`].
pub type OperationResult<T> = Result<T, OperationError>;
291
292pub fn get_service_error<T>(err: &OperationResult<T>) -> Option<OperationError> {
293 match err {
294 Ok(_) => None,
295 Err(error) => match error {
296 OperationError::ServiceError { .. } => Some(error.clone()),
297 _ => None,
298 },
299 }
300}
301
/// Unit error signalling cancellation.
///
/// Returned by `check_process_stopped` and convertible into [`OperationError::Cancelled`].
#[derive(Debug, Copy, Clone)]
pub struct CancelledError;
304
/// `Result` alias whose error type is [`CancelledError`].
pub type CancellableResult<T> = Result<T, CancelledError>;
306
307impl From<CancelledError> for OperationError {
308 fn from(CancelledError: CancelledError) -> Self {
309 OperationError::Cancelled {
310 description: PROCESS_CANCELLED_BY_SERVICE_MESSAGE.to_string(),
311 }
312 }
313}
314
315pub fn check_process_stopped(stopped: &AtomicBool) -> CancellableResult<()> {
316 if stopped.load(Ordering::Relaxed) {
317 return Err(CancelledError);
318 }
319 Ok(())
320}
321
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// The timeout message must render the duration with `Duration`'s `Debug` formatting.
    #[test]
    fn test_timeout_error_formatting() {
        // (timeout in milliseconds, fragment expected in the rendered message)
        let cases: [(u64, &str); 4] = [
            (500, "500ms"),
            (1000, "1s"),
            (2500, "2.5s"),
            (60000, "60s"),
        ];

        for &(millis, expected) in cases.iter() {
            let timeout = Duration::from_millis(millis);
            let error = OperationError::timeout(timeout, "test operation");
            let error_msg = error.to_string();
            assert!(
                error_msg.contains(expected),
                "Expected '{expected}' but got: {error_msg}"
            );
        }
    }
}