#[cfg(feature = "vector")]
use super::*;
#[cfg(feature = "vector")]
impl Keyspace {
#[allow(clippy::too_many_arguments)]
pub fn vadd(
&mut self,
key: &str,
element: String,
vector: Vec<f32>,
metric: crate::types::vector::DistanceMetric,
quantization: crate::types::vector::QuantizationType,
connectivity: usize,
expansion_add: usize,
) -> Result<VAddResult, VectorWriteError> {
use crate::types::vector::VectorSet;
self.remove_if_expired(key);
let is_new = match self.entries.get(key) {
None => true,
Some(e) if matches!(e.value, Value::Vector(_)) => false,
Some(_) => return Err(VectorWriteError::WrongType),
};
let dim = vector.len();
let per_vector = dim
.saturating_mul(quantization.bytes_per_element())
.saturating_add(connectivity.saturating_mul(16))
.saturating_add(element.len())
.saturating_add(80);
let estimated_increase = if is_new {
memory::ENTRY_OVERHEAD + key.len() + VectorSet::BASE_OVERHEAD + per_vector
} else {
per_vector
};
if !self.enforce_memory_limit(estimated_increase) {
return Err(VectorWriteError::OutOfMemory);
}
if is_new {
let vs = VectorSet::new(dim, metric, quantization, connectivity, expansion_add)
.map_err(|e| VectorWriteError::IndexError(e.to_string()))?;
let value = Value::Vector(vs);
self.memory.add(key, &value);
let entry = Entry::new(value, None);
self.entries.insert(CompactString::from(key), entry);
self.bump_version(key);
}
let entry = match self.entries.get_mut(key) {
Some(e) => e,
None => return Err(VectorWriteError::IndexError("entry missing".into())),
};
let old_entry_size = entry.entry_size(key);
let added = match entry.value {
Value::Vector(ref mut vs) => vs
.add(element.clone(), &vector)
.map_err(|e| VectorWriteError::IndexError(e.to_string()))?,
_ => return Err(VectorWriteError::WrongType),
};
entry.touch(self.track_access);
let new_value_size = memory::value_size(&entry.value);
entry.cached_value_size = new_value_size as u32;
let new_entry_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
self.memory.adjust(old_entry_size, new_entry_size);
self.bump_version(key);
Ok(VAddResult {
element,
vector,
added,
})
}
#[allow(clippy::too_many_arguments)]
pub fn vadd_batch(
&mut self,
key: &str,
entries: Vec<(String, Vec<f32>)>,
metric: crate::types::vector::DistanceMetric,
quantization: crate::types::vector::QuantizationType,
connectivity: usize,
expansion_add: usize,
) -> Result<VAddBatchResult, VectorWriteError> {
use crate::types::vector::VectorSet;
if entries.is_empty() {
return Ok(VAddBatchResult {
added_count: 0,
applied: Vec::new(),
});
}
self.remove_if_expired(key);
let is_new = match self.entries.get(key) {
None => true,
Some(e) if matches!(e.value, Value::Vector(_)) => false,
Some(_) => return Err(VectorWriteError::WrongType),
};
let dim = entries[0].1.len();
for (elem, vec) in &entries {
if vec.len() != dim {
return Err(VectorWriteError::IndexError(format!(
"dimension mismatch: expected {dim}, element '{elem}' has {}",
vec.len()
)));
}
for &v in vec {
if !v.is_finite() {
return Err(VectorWriteError::IndexError(format!(
"element '{elem}' contains NaN or infinity"
)));
}
}
}
let per_vector = dim
.saturating_mul(quantization.bytes_per_element())
.saturating_add(connectivity.saturating_mul(16))
.saturating_add(80);
let total_elem_names: usize = entries.iter().map(|(e, _)| e.len()).sum();
let vectors_cost = entries
.len()
.saturating_mul(per_vector)
.saturating_add(total_elem_names);
let estimated_increase = if is_new {
memory::ENTRY_OVERHEAD + key.len() + VectorSet::BASE_OVERHEAD + vectors_cost
} else {
vectors_cost
};
if !self.enforce_memory_limit(estimated_increase) {
return Err(VectorWriteError::OutOfMemory);
}
if is_new {
let vs = VectorSet::new(dim, metric, quantization, connectivity, expansion_add)
.map_err(|e| VectorWriteError::IndexError(e.to_string()))?;
let value = Value::Vector(vs);
self.memory.add(key, &value);
let new_entry = Entry::new(value, None);
self.entries.insert(CompactString::from(key), new_entry);
self.bump_version(key);
}
let batch_outcome = {
let entry = match self.entries.get_mut(key) {
Some(e) => e,
None => return Err(VectorWriteError::IndexError("entry missing".into())),
};
match entry.value {
Value::Vector(ref mut vs) => {
let per_elem = vs.per_element_bytes();
let result = vs
.add_batch_parallel(&entries)
.map_err(|e| VectorWriteError::IndexError(e.to_string()))?;
let bytes_added = if result.added_count > 0 {
let total_name_bytes: usize = entries.iter().map(|(e, _)| e.len()).sum();
let avg_name = total_name_bytes / entries.len();
result.added_count.saturating_mul(per_elem + avg_name)
} else {
0
};
entry.touch(self.track_access);
entry.cached_value_size =
(entry.cached_value_size as usize).saturating_add(bytes_added) as u32;
self.memory.grow_by(bytes_added);
if let Some(err) = result.error {
Err((err, result.added_count))
} else {
Ok((result.added_count, bytes_added))
}
}
_ => return Err(VectorWriteError::WrongType),
}
};
match batch_outcome {
Err((err, added_count)) => {
self.bump_version(key);
Err(VectorWriteError::PartialBatch {
message: format!(
"batch error: {err} ({added_count} vectors added before failure)",
),
applied: entries,
})
}
Ok((added_count, _bytes_added)) => {
self.bump_version(key);
Ok(VAddBatchResult {
added_count,
applied: entries,
})
}
}
}
pub fn vsim(
&mut self,
key: &str,
query: &[f32],
count: usize,
ef_search: usize,
) -> Result<Vec<crate::types::vector::SearchResult>, WrongType> {
if self.remove_if_expired(key) {
return Ok(Vec::new());
}
let entry = match self.entries.get_mut(key) {
Some(e) => e,
None => return Ok(Vec::new()),
};
entry.touch(self.track_access);
match entry.value {
Value::Vector(ref vs) => vs.search(query, count, ef_search).map_err(|_| WrongType),
_ => Err(WrongType),
}
}
pub fn vrem(&mut self, key: &str, element: &str) -> Result<bool, WrongType> {
if self.remove_if_expired(key) {
return Ok(false);
}
let entry = match self.entries.get_mut(key) {
Some(e) => e,
None => return Ok(false),
};
if !matches!(entry.value, Value::Vector(_)) {
return Err(WrongType);
}
let old_size = entry.entry_size(key);
let removed = match entry.value {
Value::Vector(ref mut vs) => vs.remove(element),
_ => return Err(WrongType),
};
if removed {
entry.touch(self.track_access);
let is_empty = matches!(entry.value, Value::Vector(ref vs) if vs.is_empty());
let new_vs = memory::value_size(&entry.value);
entry.cached_value_size = new_vs as u32;
let new_size = key.len() + new_vs + memory::ENTRY_OVERHEAD;
self.memory.adjust(old_size, new_size);
self.bump_version(key);
if is_empty {
self.memory.remove_with_size(new_size);
self.entries.remove(key);
}
}
Ok(removed)
}
pub fn vget(&mut self, key: &str, element: &str) -> Result<Option<Vec<f32>>, WrongType> {
if self.remove_if_expired(key) {
return Ok(None);
}
let entry = match self.entries.get_mut(key) {
Some(e) => e,
None => return Ok(None),
};
entry.touch(self.track_access);
match entry.value {
Value::Vector(ref vs) => Ok(vs.get(element)),
_ => Err(WrongType),
}
}
pub fn vcard(&mut self, key: &str) -> Result<usize, WrongType> {
if self.remove_if_expired(key) {
return Ok(0);
}
match self.entries.get(key) {
None => Ok(0),
Some(e) => match e.value {
Value::Vector(ref vs) => Ok(vs.len()),
_ => Err(WrongType),
},
}
}
pub fn vdim(&mut self, key: &str) -> Result<usize, WrongType> {
if self.remove_if_expired(key) {
return Ok(0);
}
match self.entries.get(key) {
None => Ok(0),
Some(e) => match e.value {
Value::Vector(ref vs) => Ok(vs.dim()),
_ => Err(WrongType),
},
}
}
pub fn vinfo(
&mut self,
key: &str,
) -> Result<Option<crate::types::vector::VectorSetInfo>, WrongType> {
if self.remove_if_expired(key) {
return Ok(None);
}
match self.entries.get(key) {
None => Ok(None),
Some(e) => match e.value {
Value::Vector(ref vs) => Ok(Some(vs.info())),
_ => Err(WrongType),
},
}
}
}