1use nodedb_types::NodeDbError;
11use nodedb_types::collection_config::VectorPrimaryConfig;
12use nodedb_types::vector_ann::VectorQuantization;
13use nodedb_types::vector_distance::DistanceMetric;
14
/// Quantization codec names accepted by the `quantization` option.
///
/// The list is interpolated into the "valid values" part of the error that
/// `parse_quantization` returns for an unknown codec, so it should mirror the
/// names that function matches on.
const VALID_QUANTIZATIONS: &[&str] = &[
    "none", "sq8", "pq", "rabitq", "bbq", "binary", "ternary", "opq",
];
19
20pub fn parse_vector_primary_options(sql: &str) -> Result<Option<VectorPrimaryConfig>, NodeDbError> {
27 let primary_val = extract_with_str(sql, "primary");
28
29 match primary_val.as_deref() {
30 None
31 | Some("document_schemaless")
32 | Some("document_strict")
33 | Some("kv")
34 | Some("columnar")
35 | Some("timeseries")
36 | Some("spatial") => return Ok(None),
37 Some("vector") => {}
38 Some(other) => {
39 return Err(NodeDbError::bad_request(format!(
40 "unknown primary engine '{other}'; valid values: \
41 document_schemaless, document_strict, kv, columnar, timeseries, spatial, vector"
42 )));
43 }
44 }
45
46 let vector_field = extract_with_str(sql, "vector_field")
48 .ok_or_else(|| NodeDbError::bad_request("primary='vector' requires vector_field option"))?;
49 if vector_field.is_empty() {
50 return Err(NodeDbError::bad_request(
51 "vector_field must be a non-empty column name",
52 ));
53 }
54
55 let dim = extract_with_u32(sql, "dim").ok_or_else(|| {
57 NodeDbError::bad_request("primary='vector' requires dim option (e.g. dim=1024)")
58 })?;
59
60 let quantization = match extract_with_str(sql, "quantization").as_deref() {
62 None => VectorQuantization::default(),
63 Some(q) => parse_quantization(q)?,
64 };
65
66 let m: u8 = extract_with_u32(sql, "m")
68 .and_then(|v| u8::try_from(v).ok())
69 .unwrap_or(16);
70
71 let ef_construction: u16 = extract_with_u32(sql, "ef_construction")
73 .and_then(|v| u16::try_from(v).ok())
74 .unwrap_or(200);
75
76 let metric = match extract_with_str(sql, "metric").as_deref() {
78 None => DistanceMetric::Cosine,
79 Some(m) => parse_metric(m)?,
80 };
81
82 let payload_indexes = extract_payload_indexes(sql)
86 .into_iter()
87 .map(|f| (f, nodedb_types::PayloadIndexKind::Equality))
88 .collect();
89
90 Ok(Some(VectorPrimaryConfig {
91 vector_field,
92 dim,
93 quantization,
94 m,
95 ef_construction,
96 metric,
97 payload_indexes,
98 }))
99}
100
101pub fn validate_vector_field(
107 cfg: &VectorPrimaryConfig,
108 columns: &[(String, String)],
109) -> Result<(), NodeDbError> {
110 let col = columns
111 .iter()
112 .find(|(name, _)| name.eq_ignore_ascii_case(&cfg.vector_field));
113
114 let (_, type_str) = col.ok_or_else(|| {
115 NodeDbError::bad_request(format!(
116 "vector_field '{}' does not exist in the collection's column list",
117 cfg.vector_field
118 ))
119 })?;
120
121 if !type_str.to_uppercase().starts_with("VECTOR") {
122 return Err(NodeDbError::bad_request(format!(
123 "vector_field '{}' is of type '{}'; must be VECTOR(n)",
124 cfg.vector_field, type_str
125 )));
126 }
127
128 Ok(())
129}
130
131fn infer_payload_kind(upper_type: &str) -> nodedb_types::PayloadIndexKind {
133 use nodedb_types::PayloadIndexKind as K;
134 let head = upper_type
135 .split_once('(')
136 .map(|(p, _)| p)
137 .unwrap_or(upper_type)
138 .trim();
139 match head {
140 "BIGINT" | "INT" | "INTEGER" | "SMALLINT" | "TINYINT" | "BIGSERIAL" | "SERIAL"
141 | "FLOAT" | "DOUBLE" | "REAL" | "NUMERIC" | "DECIMAL" | "TIMESTAMP" | "TIMESTAMPTZ"
142 | "DATE" | "TIME" | "INSTANT" | "DATETIME" => K::Range,
143 "BOOL" | "BOOLEAN" => K::Boolean,
144 _ => K::Equality,
145 }
146}
147
148pub fn validate_payload_indexes(
154 cfg: &mut VectorPrimaryConfig,
155 columns: &[(String, String)],
156) -> Result<(), NodeDbError> {
157 for slot in cfg.payload_indexes.iter_mut() {
158 let field = slot.0.clone();
159 let col = columns
160 .iter()
161 .find(|(name, _)| name.eq_ignore_ascii_case(&field));
162
163 let (_, type_str) = col.ok_or_else(|| {
164 NodeDbError::bad_request(format!(
165 "payload_indexes field '{field}' does not exist in the collection's column list"
166 ))
167 })?;
168
169 let upper_type = type_str.to_uppercase();
170 if upper_type.starts_with("VECTOR")
171 || upper_type == "BLOB"
172 || upper_type == "BYTES"
173 || upper_type == "BYTEA"
174 {
175 return Err(NodeDbError::bad_request(format!(
176 "payload_indexes field '{field}' has type '{type_str}' which is not bitmap-eligible; \
177 only text, integer, boolean, and timestamp types are supported"
178 )));
179 }
180 slot.1 = infer_payload_kind(&upper_type);
181 }
182 Ok(())
183}
184
185pub fn parse_vector_primary_options_from_kvs(
192 options: &[(String, String)],
193) -> Result<Option<VectorPrimaryConfig>, NodeDbError> {
194 let get = |key: &str| -> Option<String> {
195 options
196 .iter()
197 .find(|(k, _)| k.eq_ignore_ascii_case(key))
198 .map(|(_, v)| v.clone())
199 };
200
201 let primary_val = get("primary");
202 match primary_val.as_deref() {
203 None
204 | Some("document_schemaless")
205 | Some("document_strict")
206 | Some("kv")
207 | Some("columnar")
208 | Some("timeseries")
209 | Some("spatial") => return Ok(None),
210 Some("vector") => {}
211 Some(other) => {
212 return Err(NodeDbError::bad_request(format!(
213 "unknown primary engine '{other}'; valid values: \
214 document_schemaless, document_strict, kv, columnar, timeseries, spatial, vector"
215 )));
216 }
217 }
218
219 let vector_field = get("vector_field")
220 .ok_or_else(|| NodeDbError::bad_request("primary='vector' requires vector_field option"))?;
221 if vector_field.is_empty() {
222 return Err(NodeDbError::bad_request(
223 "vector_field must be a non-empty column name",
224 ));
225 }
226
227 let dim = get("dim")
228 .and_then(|v| v.parse::<u32>().ok())
229 .ok_or_else(|| {
230 NodeDbError::bad_request("primary='vector' requires dim option (e.g. dim=1024)")
231 })?;
232
233 let quantization = match get("quantization").as_deref() {
234 None => VectorQuantization::default(),
235 Some(q) => parse_quantization(q)?,
236 };
237
238 let m: u8 = get("m")
239 .and_then(|v| v.parse::<u32>().ok())
240 .and_then(|v| u8::try_from(v).ok())
241 .unwrap_or(16);
242
243 let ef_construction: u16 = get("ef_construction")
244 .and_then(|v| v.parse::<u32>().ok())
245 .and_then(|v| u16::try_from(v).ok())
246 .unwrap_or(200);
247
248 let metric = match get("metric").as_deref() {
249 None => DistanceMetric::Cosine,
250 Some(m) => parse_metric(m)?,
251 };
252
253 let payload_indexes = get("payload_indexes")
256 .map(|v| {
257 v.split(',')
258 .filter_map(|s| {
259 let s = s
260 .trim()
261 .trim_matches('\'')
262 .trim_matches('"')
263 .trim()
264 .to_lowercase();
265 if s.is_empty() {
266 None
267 } else {
268 Some((s, nodedb_types::PayloadIndexKind::Equality))
269 }
270 })
271 .collect::<Vec<_>>()
272 })
273 .unwrap_or_default();
274
275 Ok(Some(VectorPrimaryConfig {
276 vector_field,
277 dim,
278 quantization,
279 m,
280 ef_construction,
281 metric,
282 payload_indexes,
283 }))
284}
285
/// Returns the text between the parentheses of the statement's `WITH (...)`
/// clause, or the whole of `sql` when no such clause is found.
///
/// `WITH` is matched ASCII case-insensitively and only as a whole word, so
/// identifiers that merely contain it (`withdrawals`, `bandwith`) are
/// skipped and the search continues with the next occurrence. The clause
/// body runs from the first `(` after the keyword to the last `)` in the
/// statement; if there is no closing `)`, everything after the `(` is
/// returned.
fn with_clause(sql: &str) -> &str {
    // ASCII-only upper-casing never changes byte lengths, so offsets found in
    // `upper` are valid offsets into `sql`. Full Unicode upper-casing can
    // grow or shrink characters and would misalign the two strings.
    let upper = sql.to_ascii_uppercase();
    let bytes = sql.as_bytes();
    let is_ident = |b: u8| b.is_ascii_alphanumeric() || b == b'_';

    let mut from = 0usize;
    let keyword_end = loop {
        let Some(rel) = upper[from..].find("WITH") else {
            return sql;
        };
        let start = from + rel;
        let end = start + "WITH".len();
        let boundary_before = start == 0 || !is_ident(bytes[start - 1]);
        let boundary_after = !matches!(bytes.get(end), Some(&b) if is_ident(b));
        if boundary_before && boundary_after {
            break end;
        }
        from = end;
    };

    let after = &sql[keyword_end..];
    let Some(open) = after.find('(') else {
        return sql;
    };
    let inner = &after[open + 1..];
    match inner.rfind(')') {
        Some(close) => &inner[..close],
        None => inner,
    }
}
312
313fn extract_with_str(sql: &str, key: &str) -> Option<String> {
315 let scope = with_clause(sql);
316 let upper = scope.to_uppercase();
317 let key_upper = key.to_uppercase();
318
319 let mut start = 0usize;
322 let pos = loop {
323 let rel = upper[start..].find(&key_upper)?;
324 let abs = start + rel;
325 let before_ok = abs == 0 || {
326 let b = scope.as_bytes()[abs - 1];
327 !(b.is_ascii_alphanumeric() || b == b'_')
328 };
329 let after_byte = scope
330 .as_bytes()
331 .get(abs + key.len())
332 .copied()
333 .unwrap_or(b' ');
334 let after_ok = !(after_byte.is_ascii_alphanumeric() || after_byte == b'_');
335 if before_ok && after_ok {
336 break abs;
337 }
338 start = abs + key.len();
339 };
340
341 let after = scope[pos + key.len()..].trim_start();
342 let after = after.strip_prefix('=')?;
343 let after = after.trim_start();
344
345 if let Some(rest) = after.strip_prefix('\'') {
347 let end = rest.find('\'')?;
348 let v = rest[..end].trim().to_lowercase();
349 return if v.is_empty() { None } else { Some(v) };
350 }
351 if let Some(rest) = after.strip_prefix('"') {
352 let end = rest.find('"')?;
353 let v = rest[..end].trim().to_lowercase();
354 return if v.is_empty() { None } else { Some(v) };
355 }
356
357 let end = after
359 .find(|c: char| c == ',' || c == ')' || c.is_whitespace())
360 .unwrap_or(after.len());
361 let v = after[..end].trim().to_lowercase();
362 if v.is_empty() { None } else { Some(v) }
363}
364
365fn extract_with_u32(sql: &str, key: &str) -> Option<u32> {
367 let raw = extract_with_str(sql, key)?;
368 raw.parse::<u32>().ok()
369}
370
371fn extract_payload_indexes(sql: &str) -> Vec<String> {
375 let scope = with_clause(sql);
376 let upper = scope.to_uppercase();
377 let pos = match upper.find("PAYLOAD_INDEXES") {
378 Some(p) => p,
379 None => return Vec::new(),
380 };
381
382 let after = scope[pos + "payload_indexes".len()..].trim_start();
383 let after = match after.strip_prefix('=') {
384 Some(a) => a.trim_start(),
385 None => return Vec::new(),
386 };
387
388 let after = match after.strip_prefix('[') {
390 Some(a) => a,
391 None => return Vec::new(),
392 };
393 let end = match after.find(']') {
394 Some(e) => e,
395 None => return Vec::new(),
396 };
397 let inner = &after[..end];
398
399 inner
401 .split(',')
402 .filter_map(|s| {
403 let s = s.trim();
404 let s = s
405 .strip_prefix('\'')
406 .and_then(|s| s.strip_suffix('\''))
407 .or_else(|| s.strip_prefix('"').and_then(|s| s.strip_suffix('"')))
408 .unwrap_or(s);
409 let s = s.trim().to_lowercase();
410 if s.is_empty() { None } else { Some(s) }
411 })
412 .collect()
413}
414
415fn parse_quantization(q: &str) -> Result<VectorQuantization, NodeDbError> {
417 match q.to_lowercase().as_str() {
418 "none" => Ok(VectorQuantization::None),
419 "sq8" => Ok(VectorQuantization::Sq8),
420 "pq" => Ok(VectorQuantization::Pq),
421 "rabitq" => Ok(VectorQuantization::RaBitQ),
422 "bbq" => Ok(VectorQuantization::Bbq),
423 "binary" => Ok(VectorQuantization::Binary),
424 "ternary" => Ok(VectorQuantization::Ternary),
425 "opq" => Ok(VectorQuantization::Opq),
426 other => Err(NodeDbError::bad_request(format!(
427 "unknown quantization '{other}'; valid values: {}",
428 VALID_QUANTIZATIONS.join(", ")
429 ))),
430 }
431}
432
433fn parse_metric(m: &str) -> Result<DistanceMetric, NodeDbError> {
435 match m.to_lowercase().as_str() {
436 "l2" | "euclidean" => Ok(DistanceMetric::L2),
437 "cosine" => Ok(DistanceMetric::Cosine),
438 "ip" | "inner_product" | "innerproduct" | "dot" => Ok(DistanceMetric::InnerProduct),
439 "manhattan" | "l1" => Ok(DistanceMetric::Manhattan),
440 "chebyshev" | "linf" | "l_inf" => Ok(DistanceMetric::Chebyshev),
441 "hamming" => Ok(DistanceMetric::Hamming),
442 "jaccard" => Ok(DistanceMetric::Jaccard),
443 "pearson" => Ok(DistanceMetric::Pearson),
444 other => Err(NodeDbError::bad_request(format!(
445 "unknown distance metric '{other}'; valid values: l2, cosine, ip, manhattan, \
446 chebyshev, hamming, jaccard, pearson"
447 ))),
448 }
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::PayloadIndexKind as Kind;

    // ---- shared fixtures -------------------------------------------------

    /// Parses `sql` and unwraps the resulting vector-primary config.
    fn parse_vector(sql: &str) -> VectorPrimaryConfig {
        parse_vector_primary_options(sql)
            .expect("parse ok")
            .expect("should be Some")
    }

    /// Parses `sql`, expecting failure, and returns the rendered error.
    fn parse_error_message(sql: &str) -> String {
        let err = parse_vector_primary_options(sql).expect_err("should error");
        format!("{err}")
    }

    /// Asserts that `sql` parses successfully but is not a vector collection.
    fn assert_not_vector(sql: &str) {
        let result = parse_vector_primary_options(sql).expect("parse ok");
        assert!(result.is_none());
    }

    /// Builds an owned `(name, type)` column list from string literals.
    fn columns(defs: &[(&str, &str)]) -> Vec<(String, String)> {
        defs.iter()
            .map(|(name, ty)| (name.to_string(), ty.to_string()))
            .collect()
    }

    /// Builds a 128-dimension config with the given vector field and no
    /// payload indexes.
    fn config(vector_field: &str) -> VectorPrimaryConfig {
        VectorPrimaryConfig {
            vector_field: vector_field.to_string(),
            dim: 128,
            ..VectorPrimaryConfig::default()
        }
    }

    /// Builds a config on field `vec` with one equality payload index.
    fn config_with_payload(field: &str) -> VectorPrimaryConfig {
        VectorPrimaryConfig {
            payload_indexes: vec![(field.to_string(), Kind::Equality)],
            ..config("vec")
        }
    }

    // ---- parsing: happy paths --------------------------------------------

    #[test]
    fn happy_path_full_options() {
        let cfg = parse_vector(
            "CREATE COLLECTION embeds \
             (id BIGINT PRIMARY KEY, vec VECTOR(1024), category TEXT) \
             WITH (primary='vector', vector_field='vec', dim=1024, \
             quantization='rabitq', m=32, ef_construction=200, \
             metric='cosine', payload_indexes=['category'])",
        );
        assert_eq!(cfg.vector_field, "vec");
        assert_eq!(cfg.dim, 1024);
        assert_eq!(cfg.quantization, VectorQuantization::RaBitQ);
        assert_eq!(cfg.m, 32);
        assert_eq!(cfg.ef_construction, 200);
        assert_eq!(cfg.metric, DistanceMetric::Cosine);
        assert_eq!(
            cfg.payload_indexes,
            vec![("category".to_string(), Kind::Equality)]
        );
    }

    #[test]
    fn happy_path_minimal_options() {
        let cfg = parse_vector(
            "CREATE COLLECTION v (id BIGINT PRIMARY KEY, vec VECTOR(128)) \
             WITH (primary='vector', vector_field='vec', dim=128)",
        );
        assert_eq!(cfg.vector_field, "vec");
        assert_eq!(cfg.dim, 128);
        assert_eq!(cfg.m, 16);
        assert_eq!(cfg.ef_construction, 200);
        assert_eq!(cfg.metric, DistanceMetric::Cosine);
        assert!(cfg.payload_indexes.is_empty());
    }

    #[test]
    fn happy_path_multiple_payload_indexes() {
        let cfg = parse_vector(
            "CREATE COLLECTION v (id BIGINT PRIMARY KEY, vec VECTOR(128), a TEXT, b INT) \
             WITH (primary='vector', vector_field='vec', dim=128, \
             payload_indexes=['a', 'b'])",
        );
        assert_eq!(
            cfg.payload_indexes,
            vec![
                ("a".to_string(), Kind::Equality),
                ("b".to_string(), Kind::Equality)
            ]
        );
    }

    // ---- parsing: non-vector engines -------------------------------------

    #[test]
    fn no_primary_returns_none() {
        assert_not_vector("CREATE COLLECTION c (id BIGINT PRIMARY KEY)");
    }

    #[test]
    fn primary_document_returns_none() {
        assert_not_vector(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY) WITH (primary='document_schemaless')",
        );
    }

    #[test]
    fn primary_strict_returns_none() {
        assert_not_vector(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY) WITH (primary='document_strict')",
        );
    }

    #[test]
    fn primary_columnar_returns_none() {
        assert_not_vector("CREATE COLLECTION c (id BIGINT PRIMARY KEY) WITH (primary='columnar')");
    }

    // ---- parsing: missing required options -------------------------------

    #[test]
    fn missing_vector_field_returns_error() {
        let msg = parse_error_message(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY, v VECTOR(64)) \
             WITH (primary='vector', dim=64)",
        );
        assert!(
            msg.contains("vector_field"),
            "expected vector_field in error: {msg}"
        );
    }

    #[test]
    fn missing_dim_returns_error() {
        let msg = parse_error_message(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY, v VECTOR(64)) \
             WITH (primary='vector', vector_field='v')",
        );
        assert!(msg.contains("dim"), "expected dim in error: {msg}");
    }

    // ---- parsing: quantization names -------------------------------------

    #[test]
    fn unknown_quantization_returns_error() {
        let msg = parse_error_message(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY, v VECTOR(64)) \
             WITH (primary='vector', vector_field='v', dim=64, quantization='ivfflat')",
        );
        assert!(
            msg.contains("ivfflat"),
            "expected codec name in error: {msg}"
        );
    }

    #[test]
    fn all_valid_quantizations_accepted() {
        for q in VALID_QUANTIZATIONS {
            let sql = format!(
                "CREATE COLLECTION c (id BIGINT PRIMARY KEY, v VECTOR(64)) \
                 WITH (primary='vector', vector_field='v', dim=64, quantization='{q}')"
            );
            let result = parse_vector_primary_options(&sql);
            assert!(
                result.is_ok(),
                "quantization '{q}' should be accepted, got: {result:?}"
            );
        }
    }

    // ---- validate_vector_field -------------------------------------------

    #[test]
    fn validate_vector_field_ok() {
        let cfg = config("vec");
        let cols = columns(&[("id", "BIGINT"), ("vec", "VECTOR(128)")]);
        validate_vector_field(&cfg, &cols).expect("should be ok");
    }

    #[test]
    fn validate_vector_field_nonexistent_column_errors() {
        let cfg = config("missing");
        let cols = columns(&[("id", "BIGINT")]);
        let err = validate_vector_field(&cfg, &cols).expect_err("should error");
        let msg = format!("{err}");
        assert!(
            msg.contains("missing"),
            "expected column name in error: {msg}"
        );
    }

    #[test]
    fn validate_vector_field_wrong_type_errors() {
        let cfg = config("name");
        let cols = columns(&[("name", "TEXT")]);
        let err = validate_vector_field(&cfg, &cols).expect_err("should error");
        let msg = format!("{err}");
        assert!(
            msg.contains("VECTOR"),
            "expected VECTOR mention in error: {msg}"
        );
    }

    // ---- validate_payload_indexes ----------------------------------------

    #[test]
    fn validate_payload_indexes_ok() {
        let mut cfg = config_with_payload("category");
        let cols = columns(&[("vec", "VECTOR(128)"), ("category", "TEXT")]);
        validate_payload_indexes(&mut cfg, &cols).expect("should be ok");
    }

    #[test]
    fn validate_payload_indexes_nonexistent_errors() {
        let mut cfg = config_with_payload("ghost");
        let cols = columns(&[("vec", "VECTOR(128)")]);
        let err = validate_payload_indexes(&mut cfg, &cols).expect_err("should error");
        let msg = format!("{err}");
        assert!(msg.contains("ghost"), "expected field name in error: {msg}");
    }

    #[test]
    fn validate_payload_indexes_vector_type_rejected() {
        let mut cfg = config_with_payload("vec");
        let cols = columns(&[("vec", "VECTOR(128)")]);
        let err = validate_payload_indexes(&mut cfg, &cols).expect_err("should error");
        let msg = format!("{err}");
        assert!(
            msg.contains("bitmap-eligible"),
            "expected bitmap-eligible in error: {msg}"
        );
    }

    #[test]
    fn validate_payload_indexes_blob_type_rejected() {
        let mut cfg = config_with_payload("data");
        let cols = columns(&[("vec", "VECTOR(128)"), ("data", "BLOB")]);
        let err = validate_payload_indexes(&mut cfg, &cols).expect_err("should error");
        let msg = format!("{err}");
        assert!(
            msg.contains("bitmap-eligible"),
            "expected bitmap-eligible in error: {msg}"
        );
    }
}