qdrant_edge/edge/
update.rs1use std::fmt;
2
3use crate::common::counter::hardware_counter::HardwareCounterCell;
4use crate::segment::common::operation_error::{OperationError, OperationResult};
5use crate::shard::operations::CollectionUpdateOperations;
6use crate::shard::update::*;
7use crate::shard::wal::WalRawRecord;
8
9use crate::edge::EdgeShard;
10
11impl EdgeShard {
12 pub fn update(&self, operation: CollectionUpdateOperations) -> OperationResult<()> {
13 let record = WalRawRecord::new(&operation).map_err(service_error)?;
14
15 let mut wal = self.wal.lock();
16
17 let operation_id = wal.write(&record).map_err(service_error)?;
18 let hw_counter = HardwareCounterCell::disposable();
19 let _update_guard = self.segments.acquire_updates_lock();
20
21 let segments_guard = self.segments.read();
22
23 let result = match operation {
24 CollectionUpdateOperations::PointOperation(point_operation) => {
25 process_point_operation(&segments_guard, operation_id, point_operation, &hw_counter)
26 }
27 CollectionUpdateOperations::VectorOperation(vector_operation) => {
28 process_vector_operation(
29 &segments_guard,
30 operation_id,
31 vector_operation,
32 &hw_counter,
33 )
34 }
35 CollectionUpdateOperations::PayloadOperation(payload_operation) => {
36 process_payload_operation(
37 &segments_guard,
38 operation_id,
39 payload_operation,
40 &hw_counter,
41 )
42 }
43 CollectionUpdateOperations::FieldIndexOperation(index_operation) => {
44 process_field_index_operation(
45 &segments_guard,
46 operation_id,
47 &index_operation,
48 &hw_counter,
49 )
50 }
51 #[cfg(feature = "staging")]
52 CollectionUpdateOperations::StagingOperation(staging_operation) => {
53 crate::shard::update::process_staging_operation(
54 &segments_guard,
55 operation_id,
56 staging_operation,
57 )
58 }
59 };
60
61 result.map(|_| ())
62 }
63}
64
65fn service_error(err: impl fmt::Display) -> OperationError {
66 OperationError::service_error(err.to_string())
67}