velesdb-core 1.7.2

High-performance vector database engine written in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
//! Storage compaction for reclaiming space from deleted vectors.
//!
//! This module provides compaction functionality for `MmapStorage`,
//! allowing reclamation of disk space from deleted vectors.
//!
//! # EPIC-033/US-003: Disk Hole-Punch
//!
//! Two strategies are available:
//! - **Full compaction**: Rewrites entire file (best for high fragmentation)
//! - **Hole-punch**: Releases disk blocks in-place (best for sparse deletions)
//!
//! Hole-punch uses:
//! - Linux: `fallocate(FALLOC_FL_PUNCH_HOLE)`
//! - Windows: `FSCTL_SET_ZERO_DATA`

// Reason: Numeric casts in this file are intentional and bounded.
// Each cast site carries an inline #[allow] with a per-site justification.

use super::sharded_index::ShardedIndex;
use memmap2::MmapMut;
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};

// =========================================================================
// EPIC-033/US-003: Hole-Punch Implementation
// =========================================================================

/// Punches a hole in a file, releasing disk blocks for the specified range.
///
/// This operation zeros the data and releases the underlying disk blocks
/// back to the filesystem, reducing actual disk usage without changing file size.
///
/// # Platform Support
///
/// - **Linux**: Uses `fallocate(FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)`
/// - **Windows**: Uses `FSCTL_SET_ZERO_DATA` DeviceIoControl
/// - **Other**: Falls back to writing zeros (no disk reclamation)
///
/// # Arguments
///
/// * `file` - Open file handle (must have write access)
/// * `offset` - Start offset of the hole
/// * `len` - Length of the hole in bytes
///
/// # Returns
///
/// `true` if disk space was actually reclaimed, `false` if only zeroed.
#[allow(unused_variables)]
pub fn punch_hole(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    // Zero-length punch is a no-op on every platform. Return early to avoid
    // EINVAL from fallocate(2) on Linux and undefined behaviour from
    // FSCTL_SET_ZERO_DATA when file_offset == beyond_final_zero on Windows.
    if len == 0 {
        return Ok(false);
    }

    #[cfg(target_os = "linux")]
    {
        punch_hole_linux(file, offset, len)
    }

    #[cfg(target_os = "windows")]
    {
        punch_hole_windows(file, offset, len)
    }

    #[cfg(not(any(target_os = "linux", target_os = "windows")))]
    {
        // Fallback: just zero the region (no disk reclamation)
        punch_hole_fallback(file, offset, len)
    }
}

/// Linux implementation using fallocate with FALLOC_FL_PUNCH_HOLE.
#[cfg(target_os = "linux")]
fn punch_hole_linux(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::os::unix::io::AsRawFd;

    // FALLOC_FL_PUNCH_HOLE = 0x02, FALLOC_FL_KEEP_SIZE = 0x01
    const FALLOC_FL_KEEP_SIZE: i32 = 0x01;
    const FALLOC_FL_PUNCH_HOLE: i32 = 0x02;

    let fd = file.as_raw_fd();
    let mode = FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE;
    let offset_off_t = libc::off_t::try_from(offset).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "offset does not fit in libc::off_t",
        )
    })?;
    let len_off_t = libc::off_t::try_from(len).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "len does not fit in libc::off_t",
        )
    })?;

    // SAFETY: `libc::fallocate` requires a valid fd and offsets.
    // - Condition 1: `fd` comes from `file.as_raw_fd()` on an open file handle.
    // - Condition 2: `offset`/`len` are caller-provided ranges for the same file.
    // Reason: Hole punching is only exposed through this syscall on Linux.
    let ret = unsafe { libc::fallocate(fd, mode, offset_off_t, len_off_t) };

    if ret == 0 {
        Ok(true) // Disk space reclaimed
    } else {
        let err = io::Error::last_os_error();
        // EOPNOTSUPP means filesystem doesn't support hole punching
        if err.raw_os_error() == Some(libc::EOPNOTSUPP) {
            // Fall back to zeroing
            punch_hole_fallback(file, offset, len)
        } else {
            Err(err)
        }
    }
}

/// Windows implementation using FSCTL_SET_ZERO_DATA.
#[cfg(target_os = "windows")]
fn punch_hole_windows(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::os::windows::io::AsRawHandle;
    use windows_sys::Win32::Foundation::{FALSE, HANDLE};
    use windows_sys::Win32::System::Ioctl::FSCTL_SET_ZERO_DATA;
    use windows_sys::Win32::System::IO::DeviceIoControl;

    #[repr(C)]
    struct FileZeroDataInformation {
        file_offset: i64,
        beyond_final_zero: i64,
    }

    let handle = file.as_raw_handle() as HANDLE;
    // SAFETY: Win32 API requires i64 for file offsets. offset and len are typically < i64::MAX
    // on any realistic file system. Saturate to prevent undefined behavior on edge cases.
    #[allow(clippy::cast_possible_wrap)]
    let info = FileZeroDataInformation {
        file_offset: i64::try_from(offset).unwrap_or(i64::MAX),
        beyond_final_zero: i64::try_from(offset.saturating_add(len)).unwrap_or(i64::MAX),
    };

    let mut bytes_returned: u32 = 0;

    // SAFETY: `DeviceIoControl` requires valid handle/argument pointers.
    // - Condition 1: `handle` comes from `file.as_raw_handle()` for an open file.
    // - Condition 2: `info` and `bytes_returned` pointers are valid for the call duration.
    // Reason: Windows sparse-zero operation is only reachable via this API.
    let result = unsafe {
        DeviceIoControl(
            handle,
            FSCTL_SET_ZERO_DATA,
            std::ptr::addr_of!(info).cast(),
            // Reason: FileZeroDataInformation struct size is always <= 16 bytes; fits in u32.
            #[allow(clippy::cast_possible_truncation)]
            {
                std::mem::size_of::<FileZeroDataInformation>() as u32
            },
            std::ptr::null_mut(),
            0,
            std::ptr::addr_of_mut!(bytes_returned),
            std::ptr::null_mut(),
        )
    };

    if result == FALSE {
        // Fall back to zeroing if FSCTL fails
        punch_hole_fallback(file, offset, len)
    } else {
        Ok(true) // Disk space may be reclaimed (depends on filesystem)
    }
}

/// Fallback implementation: writes zeros (no disk reclamation).
#[cfg(any(
    not(any(target_os = "linux", target_os = "windows")),
    target_os = "linux",
    target_os = "windows"
))]
/// Chunk size for fallback zeroing (64KB).
const FALLBACK_CHUNK_SIZE: usize = 64 * 1024;

fn punch_hole_fallback(file: &File, offset: u64, len: u64) -> io::Result<bool> {
    use std::io::{Seek, SeekFrom, Write};

    let mut file = file.try_clone()?;
    file.seek(SeekFrom::Start(offset))?;

    // Write zeros in chunks to avoid large allocations
    let zeros = vec![0u8; FALLBACK_CHUNK_SIZE];
    // Reason: `len` represents a byte range within a single file; on supported
    // platforms (64-bit Linux/Windows) usize == u64, so no truncation occurs.
    // On 32-bit targets this function is only reachable for lengths <= usize::MAX.
    #[allow(clippy::cast_possible_truncation)]
    let mut remaining = len as usize;

    while remaining > 0 {
        let to_write = remaining.min(FALLBACK_CHUNK_SIZE);
        file.write_all(&zeros[..to_write])?;
        remaining -= to_write;
    }

    Ok(false) // No disk space reclaimed, only zeroed
}

/// Recovers from interrupted compaction on startup.
///
/// Issue #318: On Windows, `atomic_replace()` uses a two-step rename
/// (dst -> `.bak`, src -> dst). A crash between the two leaves either
/// a `.bak` or `.new` file on disk. This function detects and repairs
/// such states before the mmap file is opened.
///
/// # Recovery Rules
///
/// - `.bak` exists, original missing -> restore from `.bak`
/// - `.bak` exists, original exists -> remove `.bak` (compaction completed)
/// - `.new` exists -> remove it (incomplete compaction)
pub fn recover_compaction_artifacts(data_path: &Path) -> io::Result<()> {
    let bak_path = data_path.with_extension("dat.bak");
    let new_path = data_path.with_extension("dat.tmp");

    // Handle .bak file
    if bak_path.exists() {
        if data_path.exists() {
            // Both exist: previous compaction completed, clean up backup
            std::fs::remove_file(&bak_path)?;
        } else {
            // Only backup exists: compaction crashed after rename-to-backup
            std::fs::rename(&bak_path, data_path)?;
        }
    }

    // Handle incomplete .tmp file (new data that was never swapped in)
    if new_path.exists() {
        std::fs::remove_file(&new_path)?;
    }

    Ok(())
}

/// Cross-platform atomic file replacement.
///
/// On Unix, `rename()` atomically replaces the destination.
/// On Windows, `rename()` fails if destination exists, so we use a backup strategy.
fn atomic_replace(src: &Path, dst: &Path) -> io::Result<()> {
    #[cfg(unix)]
    {
        std::fs::rename(src, dst)
    }

    #[cfg(windows)]
    {
        // Windows: rename fails if dst exists
        // Strategy: dst -> backup, src -> dst, remove backup
        let backup = dst.with_extension("dat.bak");

        // Remove stale backup if exists
        let _ = std::fs::remove_file(&backup);

        // Move existing dst to backup (if exists)
        if dst.exists() {
            std::fs::rename(dst, &backup)?;
        }

        // Move src to dst
        match std::fs::rename(src, dst) {
            Ok(()) => {
                // Success: remove backup
                let _ = std::fs::remove_file(&backup);
                Ok(())
            }
            Err(e) => {
                // Failed: try to restore backup
                if backup.exists() {
                    let _ = std::fs::rename(&backup, dst);
                }
                Err(e)
            }
        }
    }

    #[cfg(not(any(unix, windows)))]
    {
        // Fallback for other platforms
        std::fs::rename(src, dst)
    }
}

/// Compaction configuration and state.
/// EPIC-033/US-004: Updated to use ShardedIndex for reduced lock contention.
pub(super) struct CompactionContext<'a> {
    pub path: &'a Path,
    pub dimension: usize,
    pub index: &'a ShardedIndex,
    pub mmap: &'a RwLock<MmapMut>,
    pub next_offset: &'a AtomicUsize,
    pub wal: &'a RwLock<io::BufWriter<File>>,
    pub initial_size: u64,
}

impl CompactionContext<'_> {
    /// Compacts the storage by rewriting only active vectors.
    ///
    /// This reclaims disk space from deleted vectors by:
    /// 1. Writing all active vectors to a new temporary file
    /// 2. Atomically replacing the old file with the new one
    ///
    /// # TS-CORE-004: Storage Compaction
    ///
    /// This operation is quasi-atomic via `rename()` for crash safety.
    /// Reads remain available during compaction (copy-on-write pattern).
    ///
    /// # Returns
    ///
    /// The number of bytes reclaimed.
    ///
    /// # Errors
    ///
    /// Returns an error if file operations fail.
    pub fn compact(&self) -> io::Result<usize> {
        let vector_size = self.dimension * std::mem::size_of::<f32>();

        // 1. Get current state (EPIC-033/US-004: Use ShardedIndex)
        let active_count = self.index.len();

        if active_count == 0 {
            return Ok(0);
        }

        // Calculate space used vs allocated
        // M-2: Acquire ordering for cross-platform visibility of mmap writes
        let current_offset = self.next_offset.load(Ordering::Acquire);
        let active_size = active_count * vector_size;

        if current_offset <= active_size {
            return Ok(0);
        }

        let bytes_to_reclaim = current_offset - active_size;

        // 2. Create temporary file for compacted data
        let temp_path = self.path.join("vectors.dat.tmp");
        let temp_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&temp_path)?;

        // Size the temp file for active vectors
        // Reason: active_size = active_count * vector_size; both are bounded by
        // available memory (usize), so usize -> u64 widens and never truncates.
        #[allow(clippy::cast_possible_truncation)]
        let new_size = (active_size as u64).max(self.initial_size);
        temp_file.set_len(new_size)?;

        // SAFETY: `MmapMut::map_mut` requires a writable file sized for mapping.
        // - Condition 1: `temp_file` was opened read/write and resized via `set_len`.
        // - Condition 2: Mapping length is derived from the file's current size.
        // Reason: Compaction copies active bytes through a mutable mmap.
        let mut temp_mmap = unsafe { MmapMut::map_mut(&temp_file)? };

        // 3. Copy active vectors to new file with new offsets
        // EPIC-033/US-004: Snapshot index to HashMap for iteration
        let old_index = self.index.to_hashmap();
        let mmap = self.mmap.read();
        let mut new_index: FxHashMap<u64, usize> = FxHashMap::default();
        new_index.reserve(active_count);

        let mut new_offset = 0usize;
        for (&id, &old_offset) in &old_index {
            let src = &mmap[old_offset..old_offset + vector_size];
            temp_mmap[new_offset..new_offset + vector_size].copy_from_slice(src);
            new_index.insert(id, new_offset);
            new_offset += vector_size;
        }

        drop(mmap);

        // 4. Flush temp file
        temp_mmap.flush()?;
        drop(temp_mmap);
        drop(temp_file);

        // 5. Atomic swap: rename temp -> main (cross-platform)
        let data_path = self.path.join("vectors.dat");
        atomic_replace(&temp_path, &data_path)?;

        // 6. Reopen the compacted file
        let new_data_file = OpenOptions::new().read(true).write(true).open(&data_path)?;
        // SAFETY: `MmapMut::map_mut` requires a writable file sized for mapping.
        // - Condition 1: `new_data_file` is opened read/write after atomic replace.
        // - Condition 2: File contents are fully materialized by the preceding flush/rename flow.
        // Reason: Reloading mmap is required to switch storage to compacted bytes.
        let new_mmap = unsafe { MmapMut::map_mut(&new_data_file)? };

        // 7. Update internal state
        // Issue #316: Atomic index swap — replace mmap and index without
        // an intermediate empty state visible to concurrent readers.
        *self.mmap.write() = new_mmap;
        self.index.replace_all(new_index);
        // Reason: Release ordering pairs with the Acquire loads in
        // `should_compact` and `compact` to ensure readers on ARM/weak-memory
        // architectures observe the updated mmap and index before seeing the
        // new offset value.
        self.next_offset.store(new_offset, Ordering::Release);

        // 8. Write compaction marker to WAL
        {
            let mut wal = self.wal.write();
            wal.write_all(&[4u8])?;
            wal.flush()?;
        }

        Ok(bytes_to_reclaim)
    }

    /// Returns the fragmentation ratio (0.0 = no fragmentation, 1.0 = 100% fragmented).
    ///
    /// Use this to decide when to trigger compaction.
    /// A ratio > 0.3 (30% fragmentation) is a good threshold.
    #[must_use]
    pub fn fragmentation_ratio(&self) -> f64 {
        // EPIC-033/US-004: Use ShardedIndex directly
        let active_count = self.index.len();

        if active_count == 0 {
            return 0.0;
        }

        let vector_size = self.dimension * std::mem::size_of::<f32>();
        let active_size = active_count * vector_size;
        // M-2: Acquire ordering for cross-platform visibility
        let current_offset = self.next_offset.load(Ordering::Acquire);

        if current_offset == 0 {
            return 0.0;
        }

        #[allow(clippy::cast_precision_loss)]
        let ratio = 1.0 - (active_size as f64 / current_offset as f64);
        ratio.max(0.0)
    }
}