mod binaryvec;
mod halfvec;
mod productvec;
mod scalarvec;
mod sparsevec;
pub mod vector;
pub use binaryvec::BinaryVec;
pub use halfvec::HalfVec;
pub use productvec::ProductVec;
pub use scalarvec::ScalarVec;
pub use sparsevec::SparseVec;
pub use vector::RuVector;
use pgrx::prelude::*;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
/// Process-wide running total of bytes held by the vector cache.
static VECTOR_CACHE_BYTES: AtomicUsize = AtomicUsize::new(0);

/// Returns the current vector-cache usage converted from bytes to MiB.
pub fn get_vector_cache_memory_mb() -> f64 {
    let bytes = VECTOR_CACHE_BYTES.load(Ordering::Relaxed);
    bytes as f64 / (1024.0 * 1024.0)
}

/// Records that `bytes` were added to the vector cache.
pub(crate) fn track_allocation(bytes: usize) {
    VECTOR_CACHE_BYTES.fetch_add(bytes, Ordering::Relaxed);
}

/// Records that `bytes` were released from the vector cache.
/// Callers must not report more than was previously tracked
/// (unsigned subtraction would wrap).
pub(crate) fn track_deallocation(bytes: usize) {
    VECTOR_CACHE_BYTES.fetch_sub(bytes, Ordering::Relaxed);
}
/// Common read/write access to the `f32` payload of a vector type.
pub trait VectorData {
    /// Raw pointer to the first element of the payload.
    ///
    /// # Safety
    /// The pointer is only valid while `self` is alive and must point to at
    /// least `dimensions()` contiguous `f32` values.
    unsafe fn data_ptr(&self) -> *const f32;
    /// Mutable raw pointer to the first element of the payload.
    ///
    /// # Safety
    /// Same contract as [`VectorData::data_ptr`], plus the caller must have
    /// exclusive access to the payload for the duration of use.
    unsafe fn data_ptr_mut(&mut self) -> *mut f32;
    /// Number of `f32` elements in the vector.
    fn dimensions(&self) -> usize;
    /// Borrows the payload as a slice.
    fn as_slice(&self) -> &[f32];
    /// Mutably borrows the payload as a slice.
    fn as_mut_slice(&mut self) -> &mut [f32];
    /// Total bytes this value occupies in memory.
    fn memory_size(&self) -> usize;
    /// Bytes occupied by the `f32` payload alone.
    fn data_size(&self) -> usize {
        self.dimensions() * std::mem::size_of::<f32>()
    }
    /// Whether the payload starts on a 64-byte boundary (SIMD/cache-line friendly).
    fn is_simd_aligned(&self) -> bool {
        // SAFETY: the pointer is only inspected for its address, never dereferenced.
        const ALIGNMENT: usize = 64; unsafe { (self.data_ptr() as usize) % ALIGNMENT == 0 }
    }
    /// Whether the whole value is small enough to stay below the TOAST threshold.
    fn is_inline(&self) -> bool {
        self.memory_size() < TOAST_THRESHOLD
    }
}
/// Payloads at or above this many bytes are candidates for TOAST/out-of-line storage.
pub const TOAST_THRESHOLD: usize = 2000;
/// Payloads strictly below this many bytes always stay inline in the tuple.
pub const INLINE_THRESHOLD: usize = 512;
/// Thread-safe accounting for vector allocations: live bytes, live vector
/// count, and the high-water mark of total bytes.
#[repr(C)]
pub struct PgVectorContext {
    /// Bytes currently allocated and not yet freed.
    pub total_bytes: AtomicUsize,
    /// Number of live tracked vectors.
    pub vector_count: AtomicU32,
    /// Highest value `total_bytes` has ever reached.
    pub peak_bytes: AtomicUsize,
}

impl PgVectorContext {
    /// Creates an empty context with all counters at zero.
    pub fn new() -> Self {
        Self {
            total_bytes: AtomicUsize::new(0),
            vector_count: AtomicU32::new(0),
            peak_bytes: AtomicUsize::new(0),
        }
    }

    /// Records an allocation of `bytes` and raises the peak watermark if
    /// the new total exceeds it.
    pub fn track_alloc(&self, bytes: usize) {
        let new_total = self.total_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
        self.vector_count.fetch_add(1, Ordering::Relaxed);
        // `fetch_max` replaces the previous hand-rolled compare-exchange
        // loop: it atomically stores `new_total` only when it is larger
        // than the current peak, with identical semantics.
        self.peak_bytes.fetch_max(new_total, Ordering::Relaxed);
    }

    /// Records a deallocation of `bytes`. Callers must not release more
    /// than was tracked (unsigned subtraction would wrap).
    pub fn track_dealloc(&self, bytes: usize) {
        self.total_bytes.fetch_sub(bytes, Ordering::Relaxed);
        self.vector_count.fetch_sub(1, Ordering::Relaxed);
    }

    /// Bytes currently tracked as allocated.
    pub fn current_bytes(&self) -> usize {
        self.total_bytes.load(Ordering::Relaxed)
    }

    /// High-water mark of tracked bytes.
    pub fn peak_bytes(&self) -> usize {
        self.peak_bytes.load(Ordering::Relaxed)
    }

    /// Number of live tracked vectors.
    pub fn count(&self) -> u32 {
        self.vector_count.load(Ordering::Relaxed)
    }
}

impl Default for PgVectorContext {
    fn default() -> Self {
        Self::new()
    }
}
/// Process-wide allocation accounting shared by the `palloc_vector*` helpers
/// and the memory-stats SQL functions below.
static GLOBAL_VECTOR_CONTEXT: PgVectorContext = PgVectorContext {
    total_bytes: AtomicUsize::new(0),
    vector_count: AtomicU32::new(0),
    peak_bytes: AtomicUsize::new(0),
};
/// Allocates room for a `VectorHeader` plus `dims` `f32` slots from the
/// current Postgres memory context, recording the allocation in the global
/// accounting.
///
/// # Safety
/// Must be called inside a valid Postgres memory context; the returned
/// pointer is owned by that context and freed via `pfree_vector`.
pub unsafe fn palloc_vector(dims: usize) -> *mut u8 {
    let total_size =
        std::mem::size_of::<VectorHeader>() + dims * std::mem::size_of::<f32>();
    let ptr = pg_sys::palloc(total_size) as *mut u8;
    GLOBAL_VECTOR_CONTEXT.track_alloc(total_size);
    ptr
}
/// Allocates header + data with the returned pointer aligned to a 64-byte
/// boundary for SIMD loads.
///
/// Bug fix: the allocation is now over-sized by `ALIGNMENT - 1` bytes so
/// that bumping the palloc'd pointer up to the next 64-byte boundary always
/// leaves `total_size` usable bytes. Previously only the *size* was rounded
/// up to 64, so a non-64-aligned pointer from palloc could push the aligned
/// region past the end of the allocation (e.g. total=100 -> alloc=128, an
/// alignment offset of 56 needs 156 bytes).
///
/// NOTE(review): the aligned pointer generally differs from the pointer
/// palloc returned, so handing it to `pfree`/`pfree_vector` is unsound and
/// the byte count tracked here differs from what `pfree_vector` subtracts —
/// confirm how callers free these allocations.
///
/// # Safety
/// Must be called inside a valid Postgres memory context.
pub unsafe fn palloc_vector_aligned(dims: usize) -> *mut u8 {
    let data_size = dims * std::mem::size_of::<f32>();
    let header_size = std::mem::size_of::<VectorHeader>();
    let total_size = header_size + data_size;
    const ALIGNMENT: usize = 64;
    // Worst-case slack: the aligned start can be up to ALIGNMENT - 1 bytes
    // past the start of the allocation.
    let alloc_size = total_size + ALIGNMENT - 1;
    let ptr = pg_sys::palloc(alloc_size) as *mut u8;
    let aligned = (ptr as usize + ALIGNMENT - 1) & !(ALIGNMENT - 1);
    GLOBAL_VECTOR_CONTEXT.track_alloc(alloc_size);
    aligned as *mut u8
}
/// Frees a vector previously obtained from `palloc_vector` and reverses the
/// global accounting for it.
///
/// NOTE(review): only correct for pointers from `palloc_vector`. A pointer
/// from `palloc_vector_aligned` may not be the address palloc returned, and
/// the byte count tracked there is larger than `total_size`, so the global
/// counters would drift — confirm callers never route aligned allocations
/// through this function.
///
/// # Safety
/// `ptr` must be a live palloc'd allocation created for exactly `dims`
/// dimensions; a wrong `dims` or a double free corrupts the accounting
/// and/or the memory context.
pub unsafe fn pfree_vector(ptr: *mut u8, dims: usize) {
    let data_size = dims * std::mem::size_of::<f32>();
    let header_size = std::mem::size_of::<VectorHeader>();
    let total_size = header_size + data_size;
    pg_sys::pfree(ptr as *mut std::os::raw::c_void);
    GLOBAL_VECTOR_CONTEXT.track_dealloc(total_size);
}
/// Fixed-size header stored in front of every vector payload.
#[repr(C, align(8))]
#[derive(Clone, Copy)]
pub struct VectorHeader {
    /// Total length in bytes (header + data), varlena-style.
    pub vl_len: u32,
    /// Number of `f32` elements that follow the header.
    pub dimensions: u32,
}

impl VectorHeader {
    /// Builds a header for `dimensions` elements whose payload occupies
    /// `data_size` bytes.
    pub fn new(dimensions: u32, data_size: usize) -> Self {
        Self {
            vl_len: (std::mem::size_of::<Self>() + data_size) as u32,
            dimensions,
        }
    }

    /// Header plus payload size, in bytes.
    pub fn total_size(&self) -> usize {
        self.vl_len as usize
    }

    /// Payload size alone, in bytes.
    pub fn data_size(&self) -> usize {
        self.total_size() - std::mem::size_of::<Self>()
    }

    /// Whether the high bit of the length word is set, marking a TOASTed value.
    pub fn is_toasted(&self) -> bool {
        self.vl_len & 0x8000_0000 != 0
    }
}
/// Cache-line-aligned shared-memory control block for an HNSW index.
#[repr(C, align(64))]
pub struct HnswSharedMem {
    /// Node id of the graph entry point; `u32::MAX` means "none yet".
    pub entry_point: AtomicU32,
    pub node_count: AtomicU32,
    pub max_layer: AtomicU32,
    pub m: AtomicU32,
    pub ef_construction: AtomicU32,
    pub memory_bytes: AtomicUsize,
    pub lock_exclusive: AtomicU32,
    pub lock_shared: AtomicU32,
    pub version: AtomicU32,
    pub flags: AtomicU32,
}

impl HnswSharedMem {
    /// Creates an empty control block configured with the given `m` and
    /// `ef_construction` build parameters.
    pub fn new(m: u32, ef_construction: u32) -> Self {
        Self {
            entry_point: AtomicU32::new(u32::MAX),
            node_count: AtomicU32::new(0),
            max_layer: AtomicU32::new(0),
            m: AtomicU32::new(m),
            ef_construction: AtomicU32::new(ef_construction),
            memory_bytes: AtomicUsize::new(0),
            lock_exclusive: AtomicU32::new(0),
            lock_shared: AtomicU32::new(0),
            version: AtomicU32::new(0),
            flags: AtomicU32::new(0),
        }
    }

    /// Attempts to take the exclusive flag; returns `false` if already held.
    pub fn try_lock_exclusive(&self) -> bool {
        matches!(
            self.lock_exclusive
                .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed),
            Ok(_)
        )
    }

    /// Releases the exclusive flag.
    pub fn unlock_exclusive(&self) {
        self.lock_exclusive.store(0, Ordering::Release);
    }

    /// Registers one shared reader.
    /// NOTE(review): readers are only counted — nothing here waits for or
    /// excludes an exclusive holder; confirm the intended locking protocol.
    pub fn lock_shared(&self) {
        self.lock_shared.fetch_add(1, Ordering::Acquire);
    }

    /// Unregisters one shared reader.
    pub fn unlock_shared(&self) {
        self.lock_shared.fetch_sub(1, Ordering::Release);
    }

    /// Whether the exclusive flag is currently set.
    pub fn is_locked_exclusive(&self) -> bool {
        self.lock_exclusive.load(Ordering::Relaxed) != 0
    }

    /// Current number of registered shared readers.
    pub fn shared_lock_count(&self) -> u32 {
        self.lock_shared.load(Ordering::Relaxed)
    }

    /// Bumps the version counter, returning the value it had before.
    pub fn increment_version(&self) -> u32 {
        self.version.fetch_add(1, Ordering::Release)
    }

    /// Current version counter.
    pub fn version(&self) -> u32 {
        self.version.load(Ordering::Acquire)
    }
}
/// Cache-line-aligned shared-memory control block for an IVF-flat index.
#[repr(C, align(64))]
pub struct IvfFlatSharedMem {
    pub nlists: AtomicU32,
    pub dimensions: AtomicU32,
    pub vector_count: AtomicU32,
    pub memory_bytes: AtomicUsize,
    pub lock_exclusive: AtomicU32,
    pub lock_shared: AtomicU32,
    pub version: AtomicU32,
    pub flags: AtomicU32,
}

impl IvfFlatSharedMem {
    /// Creates an empty control block for `nlists` partitions of
    /// `dimensions`-wide vectors.
    pub fn new(nlists: u32, dimensions: u32) -> Self {
        Self {
            nlists: AtomicU32::new(nlists),
            dimensions: AtomicU32::new(dimensions),
            vector_count: AtomicU32::new(0),
            memory_bytes: AtomicUsize::new(0),
            lock_exclusive: AtomicU32::new(0),
            lock_shared: AtomicU32::new(0),
            version: AtomicU32::new(0),
            flags: AtomicU32::new(0),
        }
    }

    /// Attempts to take the exclusive flag; returns `false` if already held.
    pub fn try_lock_exclusive(&self) -> bool {
        matches!(
            self.lock_exclusive
                .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed),
            Ok(_)
        )
    }

    /// Releases the exclusive flag.
    pub fn unlock_exclusive(&self) {
        self.lock_exclusive.store(0, Ordering::Release);
    }

    /// Registers one shared reader (counted only; see `HnswSharedMem`).
    pub fn lock_shared(&self) {
        self.lock_shared.fetch_add(1, Ordering::Acquire);
    }

    /// Unregisters one shared reader.
    pub fn unlock_shared(&self) {
        self.lock_shared.fetch_sub(1, Ordering::Release);
    }
}
/// How a vector payload should be stored on disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToastStrategy {
    /// Stored directly in the heap tuple.
    Inline,
    /// Compressed, stored in line.
    Compressed,
    /// Moved out of line, uncompressed.
    External,
    /// Moved out of line and compressed.
    ExtendedCompressed,
}

impl ToastStrategy {
    /// Picks a storage strategy from the payload size (`dims` f32 values)
    /// and an estimated compressibility score in `[0, 1]`.
    pub fn for_vector(dims: usize, compressibility: f32) -> Self {
        let size = dims * std::mem::size_of::<f32>();
        match size {
            // Small enough to always stay inline.
            s if s < INLINE_THRESHOLD => Self::Inline,
            // Below the TOAST threshold: compress only if it looks worthwhile.
            s if s < TOAST_THRESHOLD => {
                if compressibility > 0.3 {
                    Self::Compressed
                } else {
                    Self::Inline
                }
            }
            // Medium payloads: compress or push out of line.
            s if s < 8192 => {
                if compressibility > 0.2 {
                    Self::Compressed
                } else {
                    Self::External
                }
            }
            // Large payloads: out of line, compressed when it pays off.
            _ => {
                if compressibility > 0.15 {
                    Self::ExtendedCompressed
                } else {
                    Self::External
                }
            }
        }
    }
}
/// Estimates how compressible `data` is, as a score in `[0, 1]`, from the
/// fraction of zero elements and the fraction of consecutive repeats.
/// Returns `0.0` for an empty slice.
pub fn estimate_compressibility(data: &[f32]) -> f32 {
    if data.is_empty() {
        return 0.0;
    }
    let zero_count = data.iter().filter(|&&v| v == 0.0).count();
    // Adjacent-pair repeats; the first element has no predecessor.
    let repeated_count = data.windows(2).filter(|w| w[0] == w[1]).count();
    let len = data.len() as f32;
    let zero_ratio = zero_count as f32 / len;
    let repeat_ratio = repeated_count as f32 / len;
    (zero_ratio * 0.7 + repeat_ratio * 0.3).min(1.0)
}
/// Describes how one vector payload was actually stored.
#[derive(Debug, Clone)]
pub struct VectorStorage {
    pub strategy: ToastStrategy,
    /// Payload size before any compression, in bytes.
    pub original_size: usize,
    /// Bytes actually written to storage.
    pub stored_size: usize,
    pub compressed: bool,
    pub external: bool,
}

impl VectorStorage {
    /// Descriptor for a payload kept inline and uncompressed.
    pub fn inline(size: usize) -> Self {
        Self {
            strategy: ToastStrategy::Inline,
            original_size: size,
            stored_size: size,
            compressed: false,
            external: false,
        }
    }

    /// Descriptor for a payload stored compressed in line.
    pub fn compressed(original_size: usize, compressed_size: usize) -> Self {
        Self {
            strategy: ToastStrategy::Compressed,
            original_size,
            stored_size: compressed_size,
            compressed: true,
            external: false,
        }
    }

    /// Descriptor for a payload stored out of line, uncompressed.
    pub fn external(size: usize) -> Self {
        Self {
            strategy: ToastStrategy::External,
            original_size: size,
            stored_size: size,
            compressed: false,
            external: true,
        }
    }

    /// Stored-to-original size ratio; `1.0` when the original size is zero.
    pub fn compression_ratio(&self) -> f32 {
        match self.original_size {
            0 => 1.0,
            n => self.stored_size as f32 / n as f32,
        }
    }

    /// Bytes saved by compression, never negative.
    pub fn space_saved(&self) -> usize {
        self.original_size.saturating_sub(self.stored_size)
    }
}
/// Takes a point-in-time snapshot of the global allocation context and the
/// vector-cache counter. The four loads are independent relaxed reads, so
/// the snapshot is not atomic across fields.
pub fn get_memory_stats() -> MemoryStats {
    MemoryStats {
        current_bytes: GLOBAL_VECTOR_CONTEXT.current_bytes(),
        peak_bytes: GLOBAL_VECTOR_CONTEXT.peak_bytes(),
        vector_count: GLOBAL_VECTOR_CONTEXT.count(),
        cache_bytes: VECTOR_CACHE_BYTES.load(Ordering::Relaxed),
    }
}
/// Point-in-time snapshot of vector-memory accounting.
#[derive(Debug, Clone)]
pub struct MemoryStats {
    pub current_bytes: usize,
    pub peak_bytes: usize,
    pub vector_count: u32,
    pub cache_bytes: usize,
}

impl MemoryStats {
    /// Bytes per MiB, shared by the `_mb` conversions below.
    const BYTES_PER_MB: f64 = 1024.0 * 1024.0;

    /// Currently tracked allocation bytes, in MiB.
    pub fn current_mb(&self) -> f64 {
        self.current_bytes as f64 / Self::BYTES_PER_MB
    }

    /// High-water mark, in MiB.
    pub fn peak_mb(&self) -> f64 {
        self.peak_bytes as f64 / Self::BYTES_PER_MB
    }

    /// Vector-cache usage, in MiB.
    pub fn cache_mb(&self) -> f64 {
        self.cache_bytes as f64 / Self::BYTES_PER_MB
    }

    /// Allocations plus cache, in MiB.
    pub fn total_mb(&self) -> f64 {
        (self.current_bytes + self.cache_bytes) as f64 / Self::BYTES_PER_MB
    }
}
/// SQL-callable function returning the memory snapshot as a JSONB object
/// with both MiB-scaled and raw byte counters.
#[pg_extern]
fn ruvector_memory_detailed() -> pgrx::JsonB {
    let stats = get_memory_stats();
    pgrx::JsonB(serde_json::json!({
        "current_mb": stats.current_mb(),
        "peak_mb": stats.peak_mb(),
        "cache_mb": stats.cache_mb(),
        "total_mb": stats.total_mb(),
        "vector_count": stats.vector_count,
        "current_bytes": stats.current_bytes,
        "peak_bytes": stats.peak_bytes,
        "cache_bytes": stats.cache_bytes,
    }))
}
/// SQL-callable function that resets the peak-bytes watermark to the current
/// usage. The read and the store are separate relaxed operations, so a
/// concurrent allocation between them can be missed; acceptable for a
/// monitoring reset.
#[pg_extern]
fn ruvector_reset_peak_memory() {
    GLOBAL_VECTOR_CONTEXT
        .peak_bytes
        .store(GLOBAL_VECTOR_CONTEXT.current_bytes(), Ordering::Relaxed);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Header round-trip: dimensions and data size survive construction.
    #[test]
    fn test_vector_header() {
        let hdr = VectorHeader::new(128, 512);
        assert_eq!(hdr.dimensions, 128);
        assert_eq!(hdr.data_size(), 512);
    }

    /// Build parameters are stored and the exclusive flag is mutually exclusive.
    #[test]
    fn test_hnsw_shared_mem() {
        let shmem = HnswSharedMem::new(16, 64);
        assert_eq!(shmem.m.load(Ordering::Relaxed), 16);
        assert_eq!(shmem.ef_construction.load(Ordering::Relaxed), 64);
        assert!(shmem.try_lock_exclusive());
        assert!(!shmem.try_lock_exclusive());
        shmem.unlock_exclusive();
        assert!(shmem.try_lock_exclusive());
    }

    /// Size/compressibility combinations map to the expected strategies.
    #[test]
    fn test_toast_strategy() {
        assert_eq!(ToastStrategy::for_vector(64, 0.0), ToastStrategy::Inline);
        assert_eq!(ToastStrategy::for_vector(1024, 0.5), ToastStrategy::Compressed);
        assert_eq!(ToastStrategy::for_vector(1024, 0.0), ToastStrategy::External);
    }

    /// All-zero data scores high; a strictly increasing ramp scores low.
    #[test]
    fn test_compressibility() {
        assert!(estimate_compressibility(&vec![0.0; 100]) > 0.6);
        let ramp: Vec<f32> = (0..100).map(|i| i as f32).collect();
        assert!(estimate_compressibility(&ramp) < 0.3);
    }

    /// Compression descriptor reports ratio and saved bytes.
    #[test]
    fn test_vector_storage() {
        let storage = VectorStorage::compressed(1000, 400);
        assert_eq!(storage.compression_ratio(), 0.4);
        assert_eq!(storage.space_saved(), 600);
    }

    /// Alloc/dealloc bookkeeping, including the sticky peak watermark.
    #[test]
    fn test_memory_context() {
        let ctx = PgVectorContext::new();
        ctx.track_alloc(1024);
        assert_eq!(ctx.current_bytes(), 1024);
        assert_eq!(ctx.count(), 1);
        ctx.track_alloc(512);
        assert_eq!(ctx.current_bytes(), 1536);
        assert_eq!(ctx.peak_bytes(), 1536);
        ctx.track_dealloc(1024);
        assert_eq!(ctx.current_bytes(), 512);
        assert_eq!(ctx.peak_bytes(), 1536);
    }
}