Skip to main content

selene_graph/vector_index/
maintenance.rs

1use std::collections::BTreeSet;
2
3use selene_core::{DbString, LabelSet, PropertyMap, Value, VectorValue};
4
5use crate::error::{GraphError, GraphResult};
6
7use super::{VectorIndexKind, VectorIndexMap};
8
9/// Error returned when a value cannot be admitted to a vector index.
10#[derive(Clone, Debug, Eq, PartialEq, thiserror::Error)]
11pub enum VectorIndexValueError {
12    /// Value is not a vector.
13    #[error("kind mismatch: observed {observed}")]
14    KindMismatch {
15        /// Observed value kind.
16        observed: &'static str,
17    },
18    /// Vector dimensionality differs from the registration.
19    #[error("dimension mismatch: expected {expected}, observed {observed}")]
20    DimensionMismatch {
21        /// Expected vector dimensionality.
22        expected: u32,
23        /// Observed vector dimensionality.
24        observed: usize,
25    },
26    /// Vector is structurally valid but invalid for the index metric.
27    #[error("metric rejection: {observed}")]
28    MetricRejected {
29        /// Observed metric rejection reason.
30        observed: String,
31    },
32}
33
34impl VectorIndexValueError {
35    fn observed(&self) -> String {
36        match self {
37            Self::KindMismatch { observed } => (*observed).to_owned(),
38            Self::DimensionMismatch { observed, .. } => format!("VECTOR<{observed}>"),
39            Self::MetricRejected { observed } => observed.clone(),
40        }
41    }
42}
43
44pub(crate) fn apply_node_create(
45    indexes: &mut VectorIndexMap,
46    labels: &LabelSet,
47    props: &PropertyMap,
48    row: u32,
49) -> GraphResult<()> {
50    for label in labels.iter() {
51        for (property, value) in props.iter() {
52            if is_null(value) {
53                continue;
54            }
55            insert_commit(indexes, label.clone(), property.clone(), value, row)?;
56        }
57    }
58    Ok(())
59}
60
61pub(crate) fn apply_node_delete(
62    indexes: &mut VectorIndexMap,
63    labels: &LabelSet,
64    props: &PropertyMap,
65    row: u32,
66) -> GraphResult<()> {
67    for label in labels.iter() {
68        for (property, value) in props.iter() {
69            if is_null(value) {
70                continue;
71            }
72            remove_commit(indexes, label.clone(), property.clone(), value, row)?;
73        }
74    }
75    Ok(())
76}
77
78pub(crate) fn apply_node_update(
79    indexes: &mut VectorIndexMap,
80    old_labels: &LabelSet,
81    old_props: &PropertyMap,
82    new_labels: &LabelSet,
83    new_props: &PropertyMap,
84    row: u32,
85) -> GraphResult<()> {
86    let candidates = candidate_keys(indexes, old_labels, old_props, new_labels, new_props);
87    for (label, property) in candidates {
88        match (
89            indexable_value(old_labels, old_props, &label, &property),
90            indexable_value(new_labels, new_props, &label, &property),
91        ) {
92            (Some(old_value), Some(new_value)) => {
93                replace_commit(
94                    indexes,
95                    label.clone(),
96                    property.clone(),
97                    old_value,
98                    new_value,
99                    row,
100                )?;
101            }
102            (Some(value), None) => {
103                remove_commit(indexes, label.clone(), property.clone(), value, row)?;
104            }
105            (None, Some(value)) => {
106                insert_commit(indexes, label.clone(), property.clone(), value, row)?;
107            }
108            (None, None) => {}
109        }
110    }
111    Ok(())
112}
113
114fn candidate_keys(
115    indexes: &VectorIndexMap,
116    old_labels: &LabelSet,
117    old_props: &PropertyMap,
118    new_labels: &LabelSet,
119    new_props: &PropertyMap,
120) -> BTreeSet<(DbString, DbString)> {
121    if indexes.is_empty() {
122        return BTreeSet::new();
123    }
124    let mut labels: BTreeSet<DbString> = BTreeSet::new();
125    labels.extend(old_labels.iter().cloned());
126    labels.extend(new_labels.iter().cloned());
127
128    let mut properties: BTreeSet<DbString> = BTreeSet::new();
129    properties.extend(old_props.keys().cloned());
130    properties.extend(new_props.keys().cloned());
131
132    let mut candidates = BTreeSet::new();
133    for label in &labels {
134        for property in &properties {
135            let key = (label.clone(), property.clone());
136            if indexes.contains_key(&key) {
137                candidates.insert(key);
138            }
139        }
140    }
141    candidates
142}
143
144fn indexable_value<'a>(
145    labels: &LabelSet,
146    props: &'a PropertyMap,
147    label: &DbString,
148    property: &DbString,
149) -> Option<&'a Value> {
150    if !labels.contains(label) {
151        return None;
152    }
153    props.get(property).filter(|value| !is_null(value))
154}
155
156fn insert_commit(
157    indexes: &mut VectorIndexMap,
158    label: DbString,
159    property: DbString,
160    value: &Value,
161    row: u32,
162) -> GraphResult<()> {
163    if let Some(entry) = indexes.get_mut(&(label.clone(), property.clone())) {
164        let vector = admit(value, entry.kind(), entry.dimension())
165            .map_err(|err| index_rejection(label, property, entry.dimension(), err))?;
166        std::sync::Arc::make_mut(&mut entry.index).insert_value(row, vector)?;
167    }
168    Ok(())
169}
170
171fn remove_commit(
172    indexes: &mut VectorIndexMap,
173    label: DbString,
174    property: DbString,
175    value: &Value,
176    row: u32,
177) -> GraphResult<()> {
178    if let Some(entry) = indexes.get_mut(&(label.clone(), property.clone())) {
179        admit(value, entry.kind(), entry.dimension())
180            .map_err(|err| index_rejection(label, property, entry.dimension(), err))?;
181        std::sync::Arc::make_mut(&mut entry.index).remove_row(row);
182    }
183    Ok(())
184}
185
186fn replace_commit(
187    indexes: &mut VectorIndexMap,
188    label: DbString,
189    property: DbString,
190    old_value: &Value,
191    new_value: &Value,
192    row: u32,
193) -> GraphResult<()> {
194    if let Some(entry) = indexes.get_mut(&(label.clone(), property.clone())) {
195        admit(old_value, entry.kind(), entry.dimension()).map_err(|err| {
196            index_rejection(label.clone(), property.clone(), entry.dimension(), err)
197        })?;
198        let vector = admit(new_value, entry.kind(), entry.dimension())
199            .map_err(|err| index_rejection(label, property, entry.dimension(), err))?;
200        std::sync::Arc::make_mut(&mut entry.index).insert_value(row, vector)?;
201    }
202    Ok(())
203}
204
205pub(super) fn admit(
206    value: &Value,
207    kind: VectorIndexKind,
208    expected_dimension: u32,
209) -> Result<&VectorValue, VectorIndexValueError> {
210    let Value::Vector(vector) = value else {
211        return Err(VectorIndexValueError::KindMismatch {
212            observed: value_kind_name(value),
213        });
214    };
215    if vector.dimension() != expected_dimension as usize {
216        return Err(VectorIndexValueError::DimensionMismatch {
217            expected: expected_dimension,
218            observed: vector.dimension(),
219        });
220    }
221    if let Some(metric) = kind.ann_metric() {
222        metric
223            .distance(vector, vector)
224            .map_err(|err| VectorIndexValueError::MetricRejected {
225                observed: err.to_string(),
226            })?;
227    }
228    Ok(vector)
229}
230
231pub(super) fn index_rejection(
232    label: DbString,
233    property: DbString,
234    expected_dimension: u32,
235    err: VectorIndexValueError,
236) -> GraphError {
237    GraphError::VectorIndexValueRejected {
238        label,
239        property,
240        expected_dimension,
241        observed: err.observed(),
242    }
243}
244
245pub(super) fn warn_rejected(
246    op: &'static str,
247    label: DbString,
248    property: DbString,
249    row: u32,
250    err: &VectorIndexValueError,
251) {
252    tracing::warn!(
253        op,
254        %label,
255        %property,
256        row,
257        error = %err,
258        "skipped vector-index update for value that does not match the registered vector index"
259    );
260}
261
262pub(super) const fn is_null(value: &Value) -> bool {
263    matches!(value, Value::Null)
264}
265
266const fn value_kind_name(value: &Value) -> &'static str {
267    match value {
268        Value::Null => "Null",
269        Value::Bool(_) => "Bool",
270        Value::Int(_) => "Int",
271        Value::Uint(_) => "Uint",
272        Value::Int128(_) => "Int128",
273        Value::Uint128(_) => "Uint128",
274        Value::Float(_) => "Float",
275        Value::Float32(_) => "Float32",
276        Value::Decimal(_) => "Decimal",
277        Value::String(_) => "String",
278        Value::Bytes(_) => "Bytes",
279        Value::List(_) => "List",
280        Value::Record(_) => "Record",
281        Value::RecordTyped(_) => "RecordTyped",
282        Value::Path(_) => "Path",
283        Value::NodeRef(_) => "NodeRef",
284        Value::EdgeRef(_) => "EdgeRef",
285        Value::GraphRef(_) => "GraphRef",
286        Value::TableRef(_) => "TableRef",
287        Value::ZonedDateTime(_) => "ZonedDateTime",
288        Value::LocalDateTime(_) => "LocalDateTime",
289        Value::Date(_) => "Date",
290        Value::ZonedTime(_) => "ZonedTime",
291        Value::LocalTime(_) => "LocalTime",
292        Value::Duration(_) => "Duration",
293        Value::Extended { .. } => "Extended",
294        Value::Uuid(_) => "Uuid",
295        Value::Vector(_) => "Vector",
296        Value::Json(_) => "Json",
297        _ => "Unknown",
298    }
299}