Skip to main content

qdrant_edge/segment/common/
operation_error.rs

1use std::backtrace::Backtrace;
2use std::collections::TryReserveError;
3use std::io::{Error as IoError, ErrorKind};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use atomicwrites::Error as AtomicIoError;
8use crate::common::fs::FileStorageError;
9use crate::common::mmap::Error as MmapError;
10use crate::common::universal_io::UniversalIoError;
11use crate::gridstore::error::GridstoreError;
12use rayon::ThreadPoolBuildError;
13use thiserror::Error;
14
15use crate::segment::types::{PayloadKeyType, PointIdType, SeqNumberType, VectorNameBuf};
16use crate::segment::utils::mem::Mem;
17
/// Description text used for [`OperationError::Cancelled`] when a process is cancelled
/// by the service itself (see the conversions from `CancelledError` and
/// `EncodingError::Stopped` in this file).
pub const PROCESS_CANCELLED_BY_SERVICE_MESSAGE: &str = "process cancelled by service";
19
20#[derive(Error, Debug, Clone, PartialEq)]
21#[error("{0}")]
22pub enum OperationError {
23    #[error("Vector dimension error: expected dim: {expected_dim}, got {received_dim}")]
24    WrongVectorDimension {
25        expected_dim: usize,
26        received_dim: usize,
27    },
28    #[error("Not existing vector name error: {received_name}")]
29    VectorNameNotExists { received_name: VectorNameBuf },
30    #[error("No point with id {missed_point_id}")]
31    PointIdError { missed_point_id: PointIdType },
32    #[error(
33        "Payload type does not match with previously given for field {field_name}. Expected: {expected_type}"
34    )]
35    TypeError {
36        field_name: PayloadKeyType,
37        expected_type: String,
38    },
39    #[error("Unable to infer type for the field '{field_name}'. Please specify `field_type`")]
40    TypeInferenceError { field_name: PayloadKeyType },
41    /// Service Error prevents further update of the collection until it is fixed.
42    /// Should only be used for hardware, data corruption, IO, or other unexpected internal errors.
43    #[error("Service runtime error: {description}")]
44    ServiceError {
45        description: String,
46        backtrace: Option<String>,
47    },
48    #[error("Inconsistent storage: {description}")]
49    InconsistentStorage { description: String },
50    #[error("Out of memory, free: {free}, {description}")]
51    OutOfMemory { description: String, free: u64 },
52    #[error("Operation cancelled: {description}")]
53    Cancelled { description: String },
54    #[error("Timeout error: {description}")]
55    Timeout { description: String },
56    #[error("Validation failed: {description}")]
57    ValidationError { description: String },
58    #[error("Wrong usage of sparse vectors")]
59    WrongSparse,
60    #[error("Wrong usage of multi vectors")]
61    WrongMulti,
62    #[error(
63        "No range index for `order_by` key: `{key}`. Please create one to use `order_by`. Check https://qdrant.tech/documentation/concepts/indexing/#payload-index to see which payload schemas support Range conditions"
64    )]
65    MissingRangeIndexForOrderBy { key: String },
66    #[error(
67        "No appropriate index for faceting: `{key}`. Please create one to facet on this field. Check https://qdrant.tech/documentation/concepts/indexing/#payload-index to see which payload schemas support Match conditions"
68    )]
69    MissingMapIndexForFacet { key: String },
70    #[error(
71        "Expected {expected_type} value for {field_name} in the payload and/or in the formula defaults. Error: {description}"
72    )]
73    VariableTypeError {
74        field_name: PayloadKeyType,
75        expected_type: String,
76        description: String,
77    },
78    #[error("The expression {expression} produced a non-finite number")]
79    NonFiniteNumber { expression: String },
80
81    // ToDo: Remove after RocksDB is deprecated
82    #[error("RocksDB column family {name} not found")]
83    RocksDbColumnFamilyNotFound { name: String },
84}
85
86impl OperationError {
87    /// Create a new service error with a description and a backtrace
88    /// Warning: capturing a backtrace can be an expensive operation on some platforms, so this should be used with caution in performance-sensitive parts of code.
89    pub fn service_error(description: impl Into<String>) -> Self {
90        Self::ServiceError {
91            description: description.into(),
92            backtrace: Some(Backtrace::force_capture().to_string()),
93        }
94    }
95
96    /// Create a new service error with a description and no backtrace
97    pub fn service_error_light(description: impl Into<String>) -> Self {
98        Self::ServiceError {
99            description: description.into(),
100            backtrace: None,
101        }
102    }
103
104    pub fn validation_error(description: impl Into<String>) -> Self {
105        Self::ValidationError {
106            description: description.into(),
107        }
108    }
109
110    pub fn inconsistent_storage(description: impl Into<String>) -> Self {
111        Self::InconsistentStorage {
112            description: description.into(),
113        }
114    }
115
116    pub fn cancelled(description: impl Into<String>) -> Self {
117        Self::Cancelled {
118            description: description.into(),
119        }
120    }
121
122    pub fn vector_name_not_exists(vector_name: impl Into<String>) -> Self {
123        Self::VectorNameNotExists {
124            received_name: vector_name.into(),
125        }
126    }
127
128    pub fn timeout(timeout: Duration, operation: impl Into<String>) -> Self {
129        Self::Timeout {
130            description: format!(
131                "Operation '{}' timed out after {timeout:?}",
132                operation.into(),
133            ),
134        }
135    }
136}
137
138/// Contains information regarding last operation error, which should be fixed before next operation could be processed
139#[derive(Debug, Clone)]
140pub struct SegmentFailedState {
141    pub version: SeqNumberType,
142    pub point_id: Option<PointIdType>,
143    pub error: OperationError,
144}
145
146impl From<ThreadPoolBuildError> for OperationError {
147    fn from(error: ThreadPoolBuildError) -> Self {
148        OperationError::ServiceError {
149            description: format!("{error}"),
150            backtrace: Some(Backtrace::force_capture().to_string()),
151        }
152    }
153}
154
155impl From<FileStorageError> for OperationError {
156    fn from(err: FileStorageError) -> Self {
157        Self::service_error(err.to_string())
158    }
159}
160
161impl From<MmapError> for OperationError {
162    fn from(err: MmapError) -> Self {
163        Self::service_error(err.to_string())
164    }
165}
166
167impl From<UniversalIoError> for OperationError {
168    fn from(err: UniversalIoError) -> Self {
169        match err {
170            UniversalIoError::Io(err) => OperationError::from(err),
171            UniversalIoError::Mmap(err) => OperationError::from(err),
172
173            UniversalIoError::IoUringNotSupported(_)
174            | UniversalIoError::NotFound { .. }
175            | UniversalIoError::OutOfBounds { .. }
176            | UniversalIoError::InvalidFileIndex { .. } => {
177                OperationError::service_error(err.to_string())
178            }
179            UniversalIoError::BytemuckCast(_) => OperationError::service_error(err.to_string()),
180            UniversalIoError::Uninitialized { .. } => {
181                OperationError::service_error(err.to_string())
182            }
183        }
184    }
185}
186
187impl From<serde_cbor::Error> for OperationError {
188    fn from(err: serde_cbor::Error) -> Self {
189        OperationError::service_error(format!("Failed to parse data: {err}"))
190    }
191}
192
193impl<E> From<AtomicIoError<E>> for OperationError {
194    fn from(err: AtomicIoError<E>) -> Self {
195        match err {
196            AtomicIoError::Internal(io_err) => OperationError::from(io_err),
197            AtomicIoError::User(_user_err) => {
198                OperationError::service_error("Unknown atomic write error")
199            }
200        }
201    }
202}
203
204impl From<IoError> for OperationError {
205    fn from(err: IoError) -> Self {
206        match err.kind() {
207            ErrorKind::OutOfMemory => {
208                let free_memory = Mem::new().available_memory_bytes();
209                OperationError::OutOfMemory {
210                    description: format!("IO Error: {err}"),
211                    free: free_memory,
212                }
213            }
214            _ => OperationError::service_error(format!("IO Error: {err}")),
215        }
216    }
217}
218
219impl From<serde_json::Error> for OperationError {
220    fn from(err: serde_json::Error) -> Self {
221        OperationError::service_error(format!("Json error: {err}"))
222    }
223}
224
225impl From<fs_extra::error::Error> for OperationError {
226    fn from(err: fs_extra::error::Error) -> Self {
227        OperationError::service_error(format!("File system error: {err}"))
228    }
229}
230
231impl From<geohash::GeohashError> for OperationError {
232    fn from(err: geohash::GeohashError) -> Self {
233        OperationError::service_error(format!("Geohash error: {err}"))
234    }
235}
236
237impl From<crate::quantization::EncodingError> for OperationError {
238    fn from(err: crate::quantization::EncodingError) -> Self {
239        match err {
240            crate::quantization::EncodingError::IOError(err)
241            | crate::quantization::EncodingError::EncodingError(err)
242            | crate::quantization::EncodingError::ArgumentsError(err) => {
243                OperationError::service_error(format!("Quantization encoding error: {err}"))
244            }
245            crate::quantization::EncodingError::Stopped => OperationError::Cancelled {
246                description: PROCESS_CANCELLED_BY_SERVICE_MESSAGE.to_string(),
247            },
248        }
249    }
250}
251
252impl From<TryReserveError> for OperationError {
253    fn from(err: TryReserveError) -> Self {
254        let free_memory = Mem::new().available_memory_bytes();
255        OperationError::OutOfMemory {
256            description: format!("Failed to reserve memory: {err}"),
257            free: free_memory,
258        }
259    }
260}
261
262impl From<GridstoreError> for OperationError {
263    fn from(err: GridstoreError) -> Self {
264        match err {
265            GridstoreError::ServiceError { description } => {
266                Self::service_error(format!("Gridstore error: {description}"))
267            }
268            GridstoreError::FlushCancelled => Self::Cancelled {
269                description: "Gridstore flushing was cancelled".to_string(),
270            },
271            GridstoreError::Io(_) | GridstoreError::Mmap(_) | GridstoreError::SerdeJson(_) => {
272                Self::service_error(err.to_string())
273            }
274            GridstoreError::ValidationError { message } => Self::validation_error(message),
275            GridstoreError::UniversalIo(err) => {
276                Self::service_error(format!("Gridstore IO error: {err}"))
277            }
278            GridstoreError::PageNotFound { .. } => Self::service_error(err.to_string()),
279        }
280    }
281}
282
/// Converts a GPU error into a service error (with backtrace), using the error's `Debug`
/// representation. Only compiled when the `gpu` feature is enabled.
#[cfg(feature = "gpu")]
impl From<gpu::GpuError> for OperationError {
    fn from(err: gpu::GpuError) -> Self {
        let description = format!("GPU error: {err:?}");
        Self::service_error(description)
    }
}
289
290pub type OperationResult<T> = Result<T, OperationError>;
291
292pub fn get_service_error<T>(err: &OperationResult<T>) -> Option<OperationError> {
293    match err {
294        Ok(_) => None,
295        Err(error) => match error {
296            OperationError::ServiceError { .. } => Some(error.clone()),
297            _ => None,
298        },
299    }
300}
301
/// Marker error signalling that a process was cancelled; it carries no data.
#[derive(Debug, Copy, Clone)]
pub struct CancelledError;
304
305pub type CancellableResult<T> = Result<T, CancelledError>;
306
307impl From<CancelledError> for OperationError {
308    fn from(CancelledError: CancelledError) -> Self {
309        OperationError::Cancelled {
310            description: PROCESS_CANCELLED_BY_SERVICE_MESSAGE.to_string(),
311        }
312    }
313}
314
315pub fn check_process_stopped(stopped: &AtomicBool) -> CancellableResult<()> {
316    if stopped.load(Ordering::Relaxed) {
317        return Err(CancelledError);
318    }
319    Ok(())
320}
321
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// The timeout error message must contain the duration in `Duration`'s `Debug` format.
    #[test]
    fn test_timeout_error_formatting() {
        // (timeout in milliseconds, fragment expected in the rendered message)
        let cases: [(u64, &str); 4] = [
            // Sub-second timeout
            (500, "500ms"),
            // Exact second timeout (1000ms = 1s)
            (1000, "1s"),
            // Multi-second timeout with sub-second precision (2500ms = 2.5s)
            (2500, "2.5s"),
            // Large timeout (60000ms = 60s)
            (60000, "60s"),
        ];

        for (millis, expected) in cases {
            let timeout = Duration::from_millis(millis);
            let error = OperationError::timeout(timeout, "test operation");
            let error_msg = error.to_string();
            assert!(
                error_msg.contains(expected),
                "Expected '{expected}' but got: {error_msg}"
            );
        }
    }
}