Skip to main content

qdrant_edge/edge/
mod.rs

1mod config;
2mod count;
3mod facet;
4mod info;
5mod optimize;
6mod query;
7mod reexports;
8mod retrieve;
9mod scroll;
10mod search;
11mod snapshots;
12mod types;
13pub use types::*;
14mod update;
15
16use std::num::NonZero;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::sync::atomic::AtomicBool;
20use std::time::Duration;
21
22use crate::common::save_on_disk::SaveOnDisk;
23pub use config::optimizers::EdgeOptimizersConfig;
24pub use config::shard::EdgeConfig;
25pub use config::vectors::{EdgeSparseVectorParams, EdgeVectorParams};
26use fs_err as fs;
27pub use info::ShardInfo;
28use parking_lot::Mutex;
29pub use reexports::*;
30use crate::segment::entry::ReadSegmentEntry as _;
31use crate::segment::segment_constructor::{load_segment, normalize_segment_dir};
32use crate::shard::files::{PAYLOAD_INDEX_CONFIG_FILE, SEGMENTS_PATH};
33use crate::shard::operations::CollectionUpdateOperations;
34use crate::shard::segment_holder::SegmentHolder;
35use crate::shard::segment_holder::locked::LockedSegmentHolder;
36use crate::shard::wal::SerdeWal;
37use crate::wal::WalOptions;
38
39use crate::edge::config::shard::EDGE_CONFIG_FILE;
40
/// An edge shard rooted at a single directory: its persisted config, its WAL and its segments.
///
/// Fields are dropped in declaration order after `Drop::drop` (which flushes the shard) runs.
#[derive(Debug)]
pub struct EdgeShard {
    /// Root directory of the shard, as passed to `new`/`load`.
    path: PathBuf,
    /// Shard config wrapped in `SaveOnDisk`, created at `<path>/EDGE_CONFIG_FILE`.
    config: SaveOnDisk<EdgeConfig>,
    /// WAL of update operations, opened from the `wal` subdirectory of `path`.
    wal: Mutex<SerdeWal<CollectionUpdateOperations>>,
    /// Segments loaded from (or created in) the segments subdirectory of `path`.
    segments: LockedSegmentHolder,
}

/// Name of the WAL subdirectory, relative to the shard root directory.
const WAL_PATH: &str = "wal";
50impl EdgeShard {
51    /// Create a new edge shard at `path` with the given configuration.
52    ///
53    /// Fails if the shard already exists (i.e. the segments directory contains any segment).
54    /// Configuration is required and is persisted to `edge_config.json`.
55    pub fn new(path: &Path, config: EdgeConfig) -> OperationResult<Self> {
56        if has_existing_segments(path) {
57            return Err(OperationError::service_error(
58                "cannot create edge shard: path already contains segment data",
59            ));
60        }
61
62        let (wal, segments_path) = ensure_dirs_and_open_wal(path)?;
63        config.save(path)?;
64
65        let mut segments = SegmentHolder::default();
66        ensure_appendable_segment(&mut segments, path, &segments_path, &config)?;
67
68        let config_path = path.join(EDGE_CONFIG_FILE);
69        let config = SaveOnDisk::new(&config_path, config)
70            .map_err(|e| OperationError::service_error(e.to_string()))?;
71
72        Ok(Self {
73            path: path.into(),
74            config,
75            wal: parking_lot::Mutex::new(wal),
76            segments: LockedSegmentHolder::new(segments),
77        })
78    }
79
80    /// Load an edge shard from existing files at `path`.
81    ///
82    /// * If `config` is `Some`: check compatibility with loaded segments, then overwrite
83    ///   `edge_config.json` with it.
84    /// * If `config` is `None`: load config from `edge_config.json`, or infer from segments;
85    ///   check compatibility, then persist so future loads have it.
86    ///
87    /// Fails if no segments exist and no config can be loaded or inferred.
88    pub fn load(path: &Path, config: Option<EdgeConfig>) -> OperationResult<Self> {
89        let (wal, segments_path) = ensure_dirs_and_open_wal(path)?;
90
91        let mut config = resolve_initial_config(path, config)?;
92        let mut segments = load_segments(path, &segments_path, &mut config)?;
93
94        ensure_appendable_segment(
95            &mut segments,
96            path,
97            &segments_path,
98            config.as_ref().ok_or_else(|| {
99                OperationError::service_error(
100                    "edge config is not provided and no segments were loaded",
101                )
102            })?,
103        )?;
104
105        let config = config.ok_or_else(|| {
106            OperationError::service_error("edge config is not provided and no segments were loaded")
107        })?;
108
109        let config_path = path.join(EDGE_CONFIG_FILE);
110        let config = SaveOnDisk::new(&config_path, config)
111            .map_err(|e| OperationError::service_error(e.to_string()))?;
112
113        Ok(Self {
114            path: path.into(),
115            config,
116            wal: parking_lot::Mutex::new(wal),
117            segments: LockedSegmentHolder::new(segments),
118        })
119    }
120
121    pub fn config(&self) -> parking_lot::RwLockReadGuard<'_, EdgeConfig> {
122        self.config.read()
123    }
124
125    pub fn path(&self) -> &Path {
126        &self.path
127    }
128
129    /// Update global HNSW config and persist. Does not change per-vector HNSW.
130    pub fn set_hnsw_config(&self, hnsw_config: crate::segment::types::HnswConfig) -> OperationResult<()> {
131        self.config
132            .write(|cfg| cfg.set_hnsw_config(hnsw_config))
133            .map_err(|e| OperationError::service_error(e.to_string()))
134    }
135
136    /// Update HNSW config for a named vector and persist.
137    /// Fails if the vector does not exist. Immutable fields (e.g. size, distance) cannot be changed.
138    pub fn set_vector_hnsw_config(
139        &self,
140        vector_name: &str,
141        hnsw_config: crate::segment::types::HnswConfig,
142    ) -> OperationResult<()> {
143        let mut cfg = self.config.read().clone();
144        cfg.set_vector_hnsw_config(vector_name, hnsw_config)?;
145        self.config
146            .write(|c| *c = cfg)
147            .map_err(|e| OperationError::service_error(e.to_string()))
148    }
149
150    /// Update optimizer config and persist.
151    pub fn set_optimizers_config(&self, optimizers: EdgeOptimizersConfig) -> OperationResult<()> {
152        self.config
153            .write(|cfg| cfg.set_optimizers_config(optimizers))
154            .map_err(|e| OperationError::service_error(e.to_string()))
155    }
156
157    pub fn flush(&self) {
158        self.wal
159            .try_lock()
160            .expect("WAL lock acquired")
161            .flush()
162            .expect("WAL flushed");
163
164        self.segments
165            .try_read()
166            .expect("segment holder lock acquired")
167            .flush_all(true, true)
168            .expect("segments flushed");
169    }
170
171    /// This function removes edge-specific config and closes the shard.
172    /// Removing config might be necessary to avoid incompatibilities on snapshot recovery.
173    pub fn drop_and_clean_config(self) -> OperationResult<()> {
174        let config_path = self.path.join(EDGE_CONFIG_FILE);
175        if config_path.exists() {
176            fs_err::remove_file(self.path.join(EDGE_CONFIG_FILE))?;
177        }
178        Ok(())
179    }
180}
181
182impl Drop for EdgeShard {
183    fn drop(&mut self) {
184        self.flush();
185    }
186}
187
188fn default_wal_options() -> WalOptions {
189    WalOptions {
190        segment_capacity: 32 * 1024 * 1024,
191        segment_queue_len: 0,
192        retain_closed: NonZero::new(1).unwrap(),
193    }
194}
195
196fn has_existing_segments(path: &Path) -> bool {
197    let segments_path = path.join(SEGMENTS_PATH);
198    let Ok(entries) = fs::read_dir(&segments_path) else {
199        return false;
200    };
201    for entry in entries.flatten() {
202        let p = entry.path();
203        if !p.is_dir() {
204            continue;
205        }
206        if p.file_name()
207            .and_then(|n| n.to_str())
208            .is_some_and(|n| n.starts_with('.'))
209        {
210            continue;
211        }
212        if normalize_segment_dir(&p).ok().flatten().is_some() {
213            return true;
214        }
215    }
216    false
217}
218
219fn ensure_dirs_and_open_wal(
220    path: &Path,
221) -> OperationResult<(SerdeWal<CollectionUpdateOperations>, PathBuf)> {
222    let wal_path = path.join(WAL_PATH);
223    if !wal_path.exists() {
224        fs::create_dir(&wal_path).map_err(|err| {
225            OperationError::service_error(format!("failed to create WAL directory: {err}"))
226        })?;
227    }
228
229    let wal = SerdeWal::new(&wal_path, default_wal_options()).map_err(|err| {
230        OperationError::service_error(format!("failed to open WAL {}: {err}", wal_path.display(),))
231    })?;
232
233    let segments_path = path.join(SEGMENTS_PATH);
234    if !segments_path.exists() {
235        fs::create_dir(&segments_path).map_err(|err| {
236            OperationError::service_error(format!("failed to create segments directory: {err}"))
237        })?;
238    }
239
240    Ok((wal, segments_path))
241}
242
243fn resolve_initial_config(
244    path: &Path,
245    config: Option<EdgeConfig>,
246) -> OperationResult<Option<EdgeConfig>> {
247    Ok(match config {
248        Some(c) => Some(c),
249        None => match EdgeConfig::load(path) {
250            Some(Ok(c)) => Some(c),
251            Some(Err(e)) => return Err(e),
252            None => None,
253        },
254    })
255}
256
257fn load_segments(
258    _path: &Path,
259    segments_path: &Path,
260    config: &mut Option<EdgeConfig>,
261) -> OperationResult<SegmentHolder> {
262    let segments_dir = fs::read_dir(segments_path).map_err(|err| {
263        OperationError::service_error(format!("failed to read segments directory: {err}"))
264    })?;
265
266    let mut segments = SegmentHolder::default();
267
268    for entry in segments_dir {
269        let entry = entry.map_err(|err| {
270            OperationError::service_error(format!(
271                "failed to read entry in segments directory: {err}",
272            ))
273        })?;
274
275        let segment_path = entry.path();
276
277        if !segment_path.is_dir() {
278            log::warn!(
279                "Skipping non-directory segment entry {}",
280                segment_path.display(),
281            );
282            continue;
283        }
284
285        if segment_path
286            .file_name()
287            .and_then(|n| n.to_str())
288            .is_some_and(|n| n.starts_with('.'))
289        {
290            log::warn!(
291                "Skipping hidden segment directory {}",
292                segment_path.display(),
293            );
294            continue;
295        }
296
297        let Some((segment_path, segment_uuid)) = normalize_segment_dir(&segment_path)? else {
298            continue;
299        };
300
301        let mut segment = load_segment(&segment_path, segment_uuid, None, &AtomicBool::new(false))
302            .map_err(|err| {
303                OperationError::service_error(format!(
304                    "failed to load segment {}: {err}",
305                    segment_path.display(),
306                ))
307            })?;
308
309        let segment_cfg = segment.config();
310        if let Some(cfg) = config.as_ref() {
311            cfg.check_compatible_with_segment_config(segment_cfg).map_err(
312                |err| OperationError::service_error(format!(
313                    "segment {} is incompatible with provided config or previously loaded segments: {err}",
314                    segment_path.display(),
315                ))
316            )?;
317        } else {
318            *config = Some(EdgeConfig::from_segment_config(segment_cfg));
319        }
320
321        segment.check_consistency_and_repair().map_err(|err| {
322            OperationError::service_error(format!(
323                "failed to repair segment {}: {err}",
324                segment_path.display(),
325            ))
326        })?;
327
328        segments.add_new(segment);
329    }
330
331    Ok(segments)
332}
333
334fn ensure_appendable_segment(
335    segments: &mut SegmentHolder,
336    path: &Path,
337    segments_path: &Path,
338    config: &EdgeConfig,
339) -> OperationResult<()> {
340    if segments.has_appendable_segment() {
341        return Ok(());
342    }
343
344    let payload_index_schema_path = path.join(PAYLOAD_INDEX_CONFIG_FILE);
345    let payload_index_schema = SaveOnDisk::load_or_init_default(&payload_index_schema_path)
346        .map_err(|err| {
347            OperationError::service_error(format!(
348                "failed to initialize payload index schema file {}: {err}",
349                payload_index_schema_path.display(),
350            ))
351        })?;
352
353    segments.create_appendable_segment(
354        segments_path,
355        config.plain_segment_config(),
356        Arc::new(payload_index_schema),
357        None,
358    )?;
359
360    debug_assert!(segments.has_appendable_segment());
361    Ok(())
362}
363
/// Default timeout of one hour, used as a placeholder in Edge.
pub(crate) const DEFAULT_EDGE_TIMEOUT: Duration = Duration::from_secs(60 * 60);