1mod config;
2mod count;
3mod facet;
4mod info;
5mod optimize;
6mod query;
7mod reexports;
8mod retrieve;
9mod scroll;
10mod search;
11mod snapshots;
12mod types;
13pub use types::*;
14mod update;
15
16use std::num::NonZero;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::sync::atomic::AtomicBool;
20use std::time::Duration;
21
22use crate::common::save_on_disk::SaveOnDisk;
23pub use config::optimizers::EdgeOptimizersConfig;
24pub use config::shard::EdgeConfig;
25pub use config::vectors::{EdgeSparseVectorParams, EdgeVectorParams};
26use fs_err as fs;
27pub use info::ShardInfo;
28use parking_lot::Mutex;
29pub use reexports::*;
30use crate::segment::entry::ReadSegmentEntry as _;
31use crate::segment::segment_constructor::{load_segment, normalize_segment_dir};
32use crate::shard::files::{PAYLOAD_INDEX_CONFIG_FILE, SEGMENTS_PATH};
33use crate::shard::operations::CollectionUpdateOperations;
34use crate::shard::segment_holder::SegmentHolder;
35use crate::shard::segment_holder::locked::LockedSegmentHolder;
36use crate::shard::wal::SerdeWal;
37use crate::wal::WalOptions;
38
39use crate::edge::config::shard::EDGE_CONFIG_FILE;
40
/// A shard rooted at a directory on disk.
///
/// Bundles the shard configuration, the write-ahead log (WAL) of update operations and
/// the holder of loaded segments. All mutation goes through interior locks, so most
/// methods take `&self`.
#[derive(Debug)]
pub struct EdgeShard {
    /// Root directory of the shard; the WAL directory, segments directory and config
    /// file are all located under it.
    path: PathBuf,
    /// Shard configuration, wrapped in a `SaveOnDisk` bound to `<path>/EDGE_CONFIG_FILE`.
    config: SaveOnDisk<EdgeConfig>,
    /// WAL of collection update operations, guarded by a `parking_lot` mutex.
    wal: Mutex<SerdeWal<CollectionUpdateOperations>>,
    /// Segments belonging to this shard, behind a lock.
    segments: LockedSegmentHolder,
}
48
/// Name of the WAL subdirectory created under the shard directory.
const WAL_PATH: &str = "wal";
50impl EdgeShard {
51 pub fn new(path: &Path, config: EdgeConfig) -> OperationResult<Self> {
56 if has_existing_segments(path) {
57 return Err(OperationError::service_error(
58 "cannot create edge shard: path already contains segment data",
59 ));
60 }
61
62 let (wal, segments_path) = ensure_dirs_and_open_wal(path)?;
63 config.save(path)?;
64
65 let mut segments = SegmentHolder::default();
66 ensure_appendable_segment(&mut segments, path, &segments_path, &config)?;
67
68 let config_path = path.join(EDGE_CONFIG_FILE);
69 let config = SaveOnDisk::new(&config_path, config)
70 .map_err(|e| OperationError::service_error(e.to_string()))?;
71
72 Ok(Self {
73 path: path.into(),
74 config,
75 wal: parking_lot::Mutex::new(wal),
76 segments: LockedSegmentHolder::new(segments),
77 })
78 }
79
80 pub fn load(path: &Path, config: Option<EdgeConfig>) -> OperationResult<Self> {
89 let (wal, segments_path) = ensure_dirs_and_open_wal(path)?;
90
91 let mut config = resolve_initial_config(path, config)?;
92 let mut segments = load_segments(path, &segments_path, &mut config)?;
93
94 ensure_appendable_segment(
95 &mut segments,
96 path,
97 &segments_path,
98 config.as_ref().ok_or_else(|| {
99 OperationError::service_error(
100 "edge config is not provided and no segments were loaded",
101 )
102 })?,
103 )?;
104
105 let config = config.ok_or_else(|| {
106 OperationError::service_error("edge config is not provided and no segments were loaded")
107 })?;
108
109 let config_path = path.join(EDGE_CONFIG_FILE);
110 let config = SaveOnDisk::new(&config_path, config)
111 .map_err(|e| OperationError::service_error(e.to_string()))?;
112
113 Ok(Self {
114 path: path.into(),
115 config,
116 wal: parking_lot::Mutex::new(wal),
117 segments: LockedSegmentHolder::new(segments),
118 })
119 }
120
121 pub fn config(&self) -> parking_lot::RwLockReadGuard<'_, EdgeConfig> {
122 self.config.read()
123 }
124
125 pub fn path(&self) -> &Path {
126 &self.path
127 }
128
129 pub fn set_hnsw_config(&self, hnsw_config: crate::segment::types::HnswConfig) -> OperationResult<()> {
131 self.config
132 .write(|cfg| cfg.set_hnsw_config(hnsw_config))
133 .map_err(|e| OperationError::service_error(e.to_string()))
134 }
135
136 pub fn set_vector_hnsw_config(
139 &self,
140 vector_name: &str,
141 hnsw_config: crate::segment::types::HnswConfig,
142 ) -> OperationResult<()> {
143 let mut cfg = self.config.read().clone();
144 cfg.set_vector_hnsw_config(vector_name, hnsw_config)?;
145 self.config
146 .write(|c| *c = cfg)
147 .map_err(|e| OperationError::service_error(e.to_string()))
148 }
149
150 pub fn set_optimizers_config(&self, optimizers: EdgeOptimizersConfig) -> OperationResult<()> {
152 self.config
153 .write(|cfg| cfg.set_optimizers_config(optimizers))
154 .map_err(|e| OperationError::service_error(e.to_string()))
155 }
156
157 pub fn flush(&self) {
158 self.wal
159 .try_lock()
160 .expect("WAL lock acquired")
161 .flush()
162 .expect("WAL flushed");
163
164 self.segments
165 .try_read()
166 .expect("segment holder lock acquired")
167 .flush_all(true, true)
168 .expect("segments flushed");
169 }
170
171 pub fn drop_and_clean_config(self) -> OperationResult<()> {
174 let config_path = self.path.join(EDGE_CONFIG_FILE);
175 if config_path.exists() {
176 fs_err::remove_file(self.path.join(EDGE_CONFIG_FILE))?;
177 }
178 Ok(())
179 }
180}
181
182impl Drop for EdgeShard {
183 fn drop(&mut self) {
184 self.flush();
185 }
186}
187
188fn default_wal_options() -> WalOptions {
189 WalOptions {
190 segment_capacity: 32 * 1024 * 1024,
191 segment_queue_len: 0,
192 retain_closed: NonZero::new(1).unwrap(),
193 }
194}
195
196fn has_existing_segments(path: &Path) -> bool {
197 let segments_path = path.join(SEGMENTS_PATH);
198 let Ok(entries) = fs::read_dir(&segments_path) else {
199 return false;
200 };
201 for entry in entries.flatten() {
202 let p = entry.path();
203 if !p.is_dir() {
204 continue;
205 }
206 if p.file_name()
207 .and_then(|n| n.to_str())
208 .is_some_and(|n| n.starts_with('.'))
209 {
210 continue;
211 }
212 if normalize_segment_dir(&p).ok().flatten().is_some() {
213 return true;
214 }
215 }
216 false
217}
218
219fn ensure_dirs_and_open_wal(
220 path: &Path,
221) -> OperationResult<(SerdeWal<CollectionUpdateOperations>, PathBuf)> {
222 let wal_path = path.join(WAL_PATH);
223 if !wal_path.exists() {
224 fs::create_dir(&wal_path).map_err(|err| {
225 OperationError::service_error(format!("failed to create WAL directory: {err}"))
226 })?;
227 }
228
229 let wal = SerdeWal::new(&wal_path, default_wal_options()).map_err(|err| {
230 OperationError::service_error(format!("failed to open WAL {}: {err}", wal_path.display(),))
231 })?;
232
233 let segments_path = path.join(SEGMENTS_PATH);
234 if !segments_path.exists() {
235 fs::create_dir(&segments_path).map_err(|err| {
236 OperationError::service_error(format!("failed to create segments directory: {err}"))
237 })?;
238 }
239
240 Ok((wal, segments_path))
241}
242
243fn resolve_initial_config(
244 path: &Path,
245 config: Option<EdgeConfig>,
246) -> OperationResult<Option<EdgeConfig>> {
247 Ok(match config {
248 Some(c) => Some(c),
249 None => match EdgeConfig::load(path) {
250 Some(Ok(c)) => Some(c),
251 Some(Err(e)) => return Err(e),
252 None => None,
253 },
254 })
255}
256
257fn load_segments(
258 _path: &Path,
259 segments_path: &Path,
260 config: &mut Option<EdgeConfig>,
261) -> OperationResult<SegmentHolder> {
262 let segments_dir = fs::read_dir(segments_path).map_err(|err| {
263 OperationError::service_error(format!("failed to read segments directory: {err}"))
264 })?;
265
266 let mut segments = SegmentHolder::default();
267
268 for entry in segments_dir {
269 let entry = entry.map_err(|err| {
270 OperationError::service_error(format!(
271 "failed to read entry in segments directory: {err}",
272 ))
273 })?;
274
275 let segment_path = entry.path();
276
277 if !segment_path.is_dir() {
278 log::warn!(
279 "Skipping non-directory segment entry {}",
280 segment_path.display(),
281 );
282 continue;
283 }
284
285 if segment_path
286 .file_name()
287 .and_then(|n| n.to_str())
288 .is_some_and(|n| n.starts_with('.'))
289 {
290 log::warn!(
291 "Skipping hidden segment directory {}",
292 segment_path.display(),
293 );
294 continue;
295 }
296
297 let Some((segment_path, segment_uuid)) = normalize_segment_dir(&segment_path)? else {
298 continue;
299 };
300
301 let mut segment = load_segment(&segment_path, segment_uuid, None, &AtomicBool::new(false))
302 .map_err(|err| {
303 OperationError::service_error(format!(
304 "failed to load segment {}: {err}",
305 segment_path.display(),
306 ))
307 })?;
308
309 let segment_cfg = segment.config();
310 if let Some(cfg) = config.as_ref() {
311 cfg.check_compatible_with_segment_config(segment_cfg).map_err(
312 |err| OperationError::service_error(format!(
313 "segment {} is incompatible with provided config or previously loaded segments: {err}",
314 segment_path.display(),
315 ))
316 )?;
317 } else {
318 *config = Some(EdgeConfig::from_segment_config(segment_cfg));
319 }
320
321 segment.check_consistency_and_repair().map_err(|err| {
322 OperationError::service_error(format!(
323 "failed to repair segment {}: {err}",
324 segment_path.display(),
325 ))
326 })?;
327
328 segments.add_new(segment);
329 }
330
331 Ok(segments)
332}
333
334fn ensure_appendable_segment(
335 segments: &mut SegmentHolder,
336 path: &Path,
337 segments_path: &Path,
338 config: &EdgeConfig,
339) -> OperationResult<()> {
340 if segments.has_appendable_segment() {
341 return Ok(());
342 }
343
344 let payload_index_schema_path = path.join(PAYLOAD_INDEX_CONFIG_FILE);
345 let payload_index_schema = SaveOnDisk::load_or_init_default(&payload_index_schema_path)
346 .map_err(|err| {
347 OperationError::service_error(format!(
348 "failed to initialize payload index schema file {}: {err}",
349 payload_index_schema_path.display(),
350 ))
351 })?;
352
353 segments.create_appendable_segment(
354 segments_path,
355 config.plain_segment_config(),
356 Arc::new(payload_index_schema),
357 None,
358 )?;
359
360 debug_assert!(segments.has_appendable_segment());
361 Ok(())
362}
363
/// Default edge timeout: one hour.
///
/// NOTE(review): presumably the default for edge operations that accept a timeout;
/// confirm at the use sites.
pub(crate) const DEFAULT_EDGE_TIMEOUT: Duration = Duration::from_secs(60 * 60);