1use std::collections::BTreeSet;
2
3use selene_core::{DbString, LabelSet, PropertyMap, Value, VectorValue};
4
5use crate::error::{GraphError, GraphResult};
6
7use super::{VectorIndexKind, VectorIndexMap};
8
9#[derive(Clone, Debug, Eq, PartialEq, thiserror::Error)]
11pub enum VectorIndexValueError {
12 #[error("kind mismatch: observed {observed}")]
14 KindMismatch {
15 observed: &'static str,
17 },
18 #[error("dimension mismatch: expected {expected}, observed {observed}")]
20 DimensionMismatch {
21 expected: u32,
23 observed: usize,
25 },
26 #[error("metric rejection: {observed}")]
28 MetricRejected {
29 observed: String,
31 },
32}
33
34impl VectorIndexValueError {
35 fn observed(&self) -> String {
36 match self {
37 Self::KindMismatch { observed } => (*observed).to_owned(),
38 Self::DimensionMismatch { observed, .. } => format!("VECTOR<{observed}>"),
39 Self::MetricRejected { observed } => observed.clone(),
40 }
41 }
42}
43
44pub(crate) fn apply_node_create(
45 indexes: &mut VectorIndexMap,
46 labels: &LabelSet,
47 props: &PropertyMap,
48 row: u32,
49) -> GraphResult<()> {
50 for label in labels.iter() {
51 for (property, value) in props.iter() {
52 if is_null(value) {
53 continue;
54 }
55 insert_commit(indexes, label.clone(), property.clone(), value, row)?;
56 }
57 }
58 Ok(())
59}
60
61pub(crate) fn apply_node_delete(
62 indexes: &mut VectorIndexMap,
63 labels: &LabelSet,
64 props: &PropertyMap,
65 row: u32,
66) -> GraphResult<()> {
67 for label in labels.iter() {
68 for (property, value) in props.iter() {
69 if is_null(value) {
70 continue;
71 }
72 remove_commit(indexes, label.clone(), property.clone(), value, row)?;
73 }
74 }
75 Ok(())
76}
77
78pub(crate) fn apply_node_update(
79 indexes: &mut VectorIndexMap,
80 old_labels: &LabelSet,
81 old_props: &PropertyMap,
82 new_labels: &LabelSet,
83 new_props: &PropertyMap,
84 row: u32,
85) -> GraphResult<()> {
86 let candidates = candidate_keys(indexes, old_labels, old_props, new_labels, new_props);
87 for (label, property) in candidates {
88 match (
89 indexable_value(old_labels, old_props, &label, &property),
90 indexable_value(new_labels, new_props, &label, &property),
91 ) {
92 (Some(old_value), Some(new_value)) => {
93 replace_commit(
94 indexes,
95 label.clone(),
96 property.clone(),
97 old_value,
98 new_value,
99 row,
100 )?;
101 }
102 (Some(value), None) => {
103 remove_commit(indexes, label.clone(), property.clone(), value, row)?;
104 }
105 (None, Some(value)) => {
106 insert_commit(indexes, label.clone(), property.clone(), value, row)?;
107 }
108 (None, None) => {}
109 }
110 }
111 Ok(())
112}
113
114fn candidate_keys(
115 indexes: &VectorIndexMap,
116 old_labels: &LabelSet,
117 old_props: &PropertyMap,
118 new_labels: &LabelSet,
119 new_props: &PropertyMap,
120) -> BTreeSet<(DbString, DbString)> {
121 if indexes.is_empty() {
122 return BTreeSet::new();
123 }
124 let mut labels: BTreeSet<DbString> = BTreeSet::new();
125 labels.extend(old_labels.iter().cloned());
126 labels.extend(new_labels.iter().cloned());
127
128 let mut properties: BTreeSet<DbString> = BTreeSet::new();
129 properties.extend(old_props.keys().cloned());
130 properties.extend(new_props.keys().cloned());
131
132 let mut candidates = BTreeSet::new();
133 for label in &labels {
134 for property in &properties {
135 let key = (label.clone(), property.clone());
136 if indexes.contains_key(&key) {
137 candidates.insert(key);
138 }
139 }
140 }
141 candidates
142}
143
144fn indexable_value<'a>(
145 labels: &LabelSet,
146 props: &'a PropertyMap,
147 label: &DbString,
148 property: &DbString,
149) -> Option<&'a Value> {
150 if !labels.contains(label) {
151 return None;
152 }
153 props.get(property).filter(|value| !is_null(value))
154}
155
156fn insert_commit(
157 indexes: &mut VectorIndexMap,
158 label: DbString,
159 property: DbString,
160 value: &Value,
161 row: u32,
162) -> GraphResult<()> {
163 if let Some(entry) = indexes.get_mut(&(label.clone(), property.clone())) {
164 let vector = admit(value, entry.kind(), entry.dimension())
165 .map_err(|err| index_rejection(label, property, entry.dimension(), err))?;
166 std::sync::Arc::make_mut(&mut entry.index).insert_value(row, vector)?;
167 }
168 Ok(())
169}
170
171fn remove_commit(
172 indexes: &mut VectorIndexMap,
173 label: DbString,
174 property: DbString,
175 value: &Value,
176 row: u32,
177) -> GraphResult<()> {
178 if let Some(entry) = indexes.get_mut(&(label.clone(), property.clone())) {
179 admit(value, entry.kind(), entry.dimension())
180 .map_err(|err| index_rejection(label, property, entry.dimension(), err))?;
181 std::sync::Arc::make_mut(&mut entry.index).remove_row(row);
182 }
183 Ok(())
184}
185
186fn replace_commit(
187 indexes: &mut VectorIndexMap,
188 label: DbString,
189 property: DbString,
190 old_value: &Value,
191 new_value: &Value,
192 row: u32,
193) -> GraphResult<()> {
194 if let Some(entry) = indexes.get_mut(&(label.clone(), property.clone())) {
195 admit(old_value, entry.kind(), entry.dimension()).map_err(|err| {
196 index_rejection(label.clone(), property.clone(), entry.dimension(), err)
197 })?;
198 let vector = admit(new_value, entry.kind(), entry.dimension())
199 .map_err(|err| index_rejection(label, property, entry.dimension(), err))?;
200 std::sync::Arc::make_mut(&mut entry.index).insert_value(row, vector)?;
201 }
202 Ok(())
203}
204
205pub(super) fn admit(
206 value: &Value,
207 kind: VectorIndexKind,
208 expected_dimension: u32,
209) -> Result<&VectorValue, VectorIndexValueError> {
210 let Value::Vector(vector) = value else {
211 return Err(VectorIndexValueError::KindMismatch {
212 observed: value_kind_name(value),
213 });
214 };
215 if vector.dimension() != expected_dimension as usize {
216 return Err(VectorIndexValueError::DimensionMismatch {
217 expected: expected_dimension,
218 observed: vector.dimension(),
219 });
220 }
221 if let Some(metric) = kind.ann_metric() {
222 metric
223 .distance(vector, vector)
224 .map_err(|err| VectorIndexValueError::MetricRejected {
225 observed: err.to_string(),
226 })?;
227 }
228 Ok(vector)
229}
230
231pub(super) fn index_rejection(
232 label: DbString,
233 property: DbString,
234 expected_dimension: u32,
235 err: VectorIndexValueError,
236) -> GraphError {
237 GraphError::VectorIndexValueRejected {
238 label,
239 property,
240 expected_dimension,
241 observed: err.observed(),
242 }
243}
244
245pub(super) fn warn_rejected(
246 op: &'static str,
247 label: DbString,
248 property: DbString,
249 row: u32,
250 err: &VectorIndexValueError,
251) {
252 tracing::warn!(
253 op,
254 %label,
255 %property,
256 row,
257 error = %err,
258 "skipped vector-index update for value that does not match the registered vector index"
259 );
260}
261
262pub(super) const fn is_null(value: &Value) -> bool {
263 matches!(value, Value::Null)
264}
265
266const fn value_kind_name(value: &Value) -> &'static str {
267 match value {
268 Value::Null => "Null",
269 Value::Bool(_) => "Bool",
270 Value::Int(_) => "Int",
271 Value::Uint(_) => "Uint",
272 Value::Int128(_) => "Int128",
273 Value::Uint128(_) => "Uint128",
274 Value::Float(_) => "Float",
275 Value::Float32(_) => "Float32",
276 Value::Decimal(_) => "Decimal",
277 Value::String(_) => "String",
278 Value::Bytes(_) => "Bytes",
279 Value::List(_) => "List",
280 Value::Record(_) => "Record",
281 Value::RecordTyped(_) => "RecordTyped",
282 Value::Path(_) => "Path",
283 Value::NodeRef(_) => "NodeRef",
284 Value::EdgeRef(_) => "EdgeRef",
285 Value::GraphRef(_) => "GraphRef",
286 Value::TableRef(_) => "TableRef",
287 Value::ZonedDateTime(_) => "ZonedDateTime",
288 Value::LocalDateTime(_) => "LocalDateTime",
289 Value::Date(_) => "Date",
290 Value::ZonedTime(_) => "ZonedTime",
291 Value::LocalTime(_) => "LocalTime",
292 Value::Duration(_) => "Duration",
293 Value::Extended { .. } => "Extended",
294 Value::Uuid(_) => "Uuid",
295 Value::Vector(_) => "Vector",
296 Value::Json(_) => "Json",
297 _ => "Unknown",
298 }
299}