Skip to main content

qdrant_edge/edge/
update.rs

1use std::fmt;
2
3use crate::common::counter::hardware_counter::HardwareCounterCell;
4use crate::segment::common::operation_error::{OperationError, OperationResult};
5use crate::shard::operations::CollectionUpdateOperations;
6use crate::shard::update::*;
7use crate::shard::wal::WalRawRecord;
8
9use crate::edge::EdgeShard;
10
11impl EdgeShard {
12    pub fn update(&self, operation: CollectionUpdateOperations) -> OperationResult<()> {
13        let record = WalRawRecord::new(&operation).map_err(service_error)?;
14
15        let mut wal = self.wal.lock();
16
17        let operation_id = wal.write(&record).map_err(service_error)?;
18        let hw_counter = HardwareCounterCell::disposable();
19        let _update_guard = self.segments.acquire_updates_lock();
20
21        let segments_guard = self.segments.read();
22
23        let result = match operation {
24            CollectionUpdateOperations::PointOperation(point_operation) => {
25                process_point_operation(&segments_guard, operation_id, point_operation, &hw_counter)
26            }
27            CollectionUpdateOperations::VectorOperation(vector_operation) => {
28                process_vector_operation(
29                    &segments_guard,
30                    operation_id,
31                    vector_operation,
32                    &hw_counter,
33                )
34            }
35            CollectionUpdateOperations::PayloadOperation(payload_operation) => {
36                process_payload_operation(
37                    &segments_guard,
38                    operation_id,
39                    payload_operation,
40                    &hw_counter,
41                )
42            }
43            CollectionUpdateOperations::FieldIndexOperation(index_operation) => {
44                process_field_index_operation(
45                    &segments_guard,
46                    operation_id,
47                    &index_operation,
48                    &hw_counter,
49                )
50            }
51            #[cfg(feature = "staging")]
52            CollectionUpdateOperations::StagingOperation(staging_operation) => {
53                crate::shard::update::process_staging_operation(
54                    &segments_guard,
55                    operation_id,
56                    staging_operation,
57                )
58            }
59        };
60
61        result.map(|_| ())
62    }
63}
64
65fn service_error(err: impl fmt::Display) -> OperationError {
66    OperationError::service_error(err.to_string())
67}