1use nodedb_types::NodeDbError;
11use nodedb_types::collection_config::VectorPrimaryConfig;
12use nodedb_types::vector_ann::VectorQuantization;
13use nodedb_types::vector_distance::DistanceMetric;
14use nodedb_types::vector_dtype::VectorStorageDtype;
15
/// Quantization names accepted by `parse_quantization`, in the order they are
/// listed in its "unknown quantization" error message.
const VALID_QUANTIZATIONS: &[&str] = &[
    "none", "sq8", "pq", "rabitq", "bbq", "binary", "ternary", "opq",
];
20
21pub fn parse_vector_primary_options(sql: &str) -> Result<Option<VectorPrimaryConfig>, NodeDbError> {
28 let primary_val = extract_with_str(sql, "primary");
29
30 match primary_val.as_deref() {
31 None
32 | Some("document_schemaless")
33 | Some("document_strict")
34 | Some("kv")
35 | Some("columnar")
36 | Some("timeseries")
37 | Some("spatial") => return Ok(None),
38 Some("vector") => {}
39 Some(other) => {
40 return Err(NodeDbError::bad_request(format!(
41 "unknown primary engine '{other}'; valid values: \
42 document_schemaless, document_strict, kv, columnar, timeseries, spatial, vector"
43 )));
44 }
45 }
46
47 let vector_field = extract_with_str(sql, "vector_field")
49 .ok_or_else(|| NodeDbError::bad_request("primary='vector' requires vector_field option"))?;
50 if vector_field.is_empty() {
51 return Err(NodeDbError::bad_request(
52 "vector_field must be a non-empty column name",
53 ));
54 }
55
56 let dim = extract_with_u32(sql, "dim").ok_or_else(|| {
58 NodeDbError::bad_request("primary='vector' requires dim option (e.g. dim=1024)")
59 })?;
60
61 let quantization = match extract_with_str(sql, "quantization").as_deref() {
63 None => VectorQuantization::default(),
64 Some(q) => parse_quantization(q)?,
65 };
66
67 let m: u8 = extract_with_u32(sql, "m")
69 .and_then(|v| u8::try_from(v).ok())
70 .unwrap_or(16);
71
72 let ef_construction: u16 = extract_with_u32(sql, "ef_construction")
74 .and_then(|v| u16::try_from(v).ok())
75 .unwrap_or(200);
76
77 let metric = match extract_with_str(sql, "metric").as_deref() {
79 None => DistanceMetric::Cosine,
80 Some(m) => parse_metric(m)?,
81 };
82
83 let storage_dtype = parse_storage_dtype(extract_with_str(sql, "storage_dtype").as_deref())?;
85
86 let payload_indexes = extract_payload_indexes(sql)
90 .into_iter()
91 .map(|f| (f, nodedb_types::PayloadIndexKind::Equality))
92 .collect();
93
94 Ok(Some(VectorPrimaryConfig {
95 vector_field,
96 dim,
97 quantization,
98 m,
99 ef_construction,
100 metric,
101 storage_dtype,
102 payload_indexes,
103 }))
104}
105
106pub fn validate_vector_field(
112 cfg: &VectorPrimaryConfig,
113 columns: &[(String, String)],
114) -> Result<(), NodeDbError> {
115 let col = columns
116 .iter()
117 .find(|(name, _)| name.eq_ignore_ascii_case(&cfg.vector_field));
118
119 let (_, type_str) = col.ok_or_else(|| {
120 NodeDbError::bad_request(format!(
121 "vector_field '{}' does not exist in the collection's column list",
122 cfg.vector_field
123 ))
124 })?;
125
126 if !type_str.to_uppercase().starts_with("VECTOR") {
127 return Err(NodeDbError::bad_request(format!(
128 "vector_field '{}' is of type '{}'; must be VECTOR(n)",
129 cfg.vector_field, type_str
130 )));
131 }
132
133 Ok(())
134}
135
136fn infer_payload_kind(upper_type: &str) -> nodedb_types::PayloadIndexKind {
138 use nodedb_types::PayloadIndexKind as K;
139 let head = upper_type
140 .split_once('(')
141 .map(|(p, _)| p)
142 .unwrap_or(upper_type)
143 .trim();
144 match head {
145 "BIGINT" | "INT" | "INTEGER" | "SMALLINT" | "TINYINT" | "BIGSERIAL" | "SERIAL"
146 | "FLOAT" | "DOUBLE" | "REAL" | "NUMERIC" | "DECIMAL" | "TIMESTAMP" | "TIMESTAMPTZ"
147 | "DATE" | "TIME" | "INSTANT" | "DATETIME" => K::Range,
148 "BOOL" | "BOOLEAN" => K::Boolean,
149 _ => K::Equality,
150 }
151}
152
153pub fn validate_payload_indexes(
159 cfg: &mut VectorPrimaryConfig,
160 columns: &[(String, String)],
161) -> Result<(), NodeDbError> {
162 for slot in cfg.payload_indexes.iter_mut() {
163 let field = slot.0.clone();
164 let col = columns
165 .iter()
166 .find(|(name, _)| name.eq_ignore_ascii_case(&field));
167
168 let (_, type_str) = col.ok_or_else(|| {
169 NodeDbError::bad_request(format!(
170 "payload_indexes field '{field}' does not exist in the collection's column list"
171 ))
172 })?;
173
174 let upper_type = type_str.to_uppercase();
175 if upper_type.starts_with("VECTOR")
176 || upper_type == "BLOB"
177 || upper_type == "BYTES"
178 || upper_type == "BYTEA"
179 {
180 return Err(NodeDbError::bad_request(format!(
181 "payload_indexes field '{field}' has type '{type_str}' which is not bitmap-eligible; \
182 only text, integer, boolean, and timestamp types are supported"
183 )));
184 }
185 slot.1 = infer_payload_kind(&upper_type);
186 }
187 Ok(())
188}
189
190pub fn parse_vector_primary_options_from_kvs(
197 options: &[(String, String)],
198) -> Result<Option<VectorPrimaryConfig>, NodeDbError> {
199 let get = |key: &str| -> Option<String> {
200 options
201 .iter()
202 .find(|(k, _)| k.eq_ignore_ascii_case(key))
203 .map(|(_, v)| v.clone())
204 };
205
206 let primary_val = get("primary");
207 match primary_val.as_deref() {
208 None
209 | Some("document_schemaless")
210 | Some("document_strict")
211 | Some("kv")
212 | Some("columnar")
213 | Some("timeseries")
214 | Some("spatial") => return Ok(None),
215 Some("vector") => {}
216 Some(other) => {
217 return Err(NodeDbError::bad_request(format!(
218 "unknown primary engine '{other}'; valid values: \
219 document_schemaless, document_strict, kv, columnar, timeseries, spatial, vector"
220 )));
221 }
222 }
223
224 let vector_field = get("vector_field")
225 .ok_or_else(|| NodeDbError::bad_request("primary='vector' requires vector_field option"))?;
226 if vector_field.is_empty() {
227 return Err(NodeDbError::bad_request(
228 "vector_field must be a non-empty column name",
229 ));
230 }
231
232 let dim = get("dim")
233 .and_then(|v| v.parse::<u32>().ok())
234 .ok_or_else(|| {
235 NodeDbError::bad_request("primary='vector' requires dim option (e.g. dim=1024)")
236 })?;
237
238 let quantization = match get("quantization").as_deref() {
239 None => VectorQuantization::default(),
240 Some(q) => parse_quantization(q)?,
241 };
242
243 let m: u8 = get("m")
244 .and_then(|v| v.parse::<u32>().ok())
245 .and_then(|v| u8::try_from(v).ok())
246 .unwrap_or(16);
247
248 let ef_construction: u16 = get("ef_construction")
249 .and_then(|v| v.parse::<u32>().ok())
250 .and_then(|v| u16::try_from(v).ok())
251 .unwrap_or(200);
252
253 let metric = match get("metric").as_deref() {
254 None => DistanceMetric::Cosine,
255 Some(m) => parse_metric(m)?,
256 };
257
258 let storage_dtype = parse_storage_dtype(get("storage_dtype").as_deref())?;
259
260 let payload_indexes = get("payload_indexes")
263 .map(|v| {
264 v.split(',')
265 .filter_map(|s| {
266 let s = s
267 .trim()
268 .trim_matches('\'')
269 .trim_matches('"')
270 .trim()
271 .to_lowercase();
272 if s.is_empty() {
273 None
274 } else {
275 Some((s, nodedb_types::PayloadIndexKind::Equality))
276 }
277 })
278 .collect::<Vec<_>>()
279 })
280 .unwrap_or_default();
281
282 Ok(Some(VectorPrimaryConfig {
283 vector_field,
284 dim,
285 quantization,
286 m,
287 ef_construction,
288 metric,
289 storage_dtype,
290 payload_indexes,
291 }))
292}
293
/// Narrows `sql` to the contents of its `WITH (...)` clause.
///
/// The keyword is matched ignoring ASCII case and must stand alone as a word:
/// occurrences embedded in a longer identifier (for example `forthwith` or
/// `within`) are skipped and the search continues. The result is the text
/// between the first `(` after the keyword and the last `)` of the statement.
///
/// The whole of `sql` is returned when no stand-alone keyword is found or when
/// no `(` follows it. When the closing `)` is missing, everything after the
/// opening `(` is returned.
fn with_clause(sql: &str) -> &str {
    const KEYWORD: &str = "WITH";
    let is_ident = |b: u8| b.is_ascii_alphanumeric() || b == b'_';

    // ASCII-only folding keeps every byte offset identical to `sql`. Full
    // Unicode uppercasing can change byte lengths, which would make offsets
    // found in the folded copy invalid for slicing the original.
    let upper = sql.to_ascii_uppercase();
    let bytes = sql.as_bytes();

    let keyword_end = upper.match_indices(KEYWORD).find_map(|(pos, _)| {
        let end = pos + KEYWORD.len();
        let before_ok = pos == 0 || !is_ident(bytes[pos - 1]);
        let after_ok = bytes.get(end).map_or(true, |&b| !is_ident(b));
        (before_ok && after_ok).then_some(end)
    });
    let Some(keyword_end) = keyword_end else {
        return sql;
    };

    let after = &sql[keyword_end..];
    let Some(open) = after.find('(') else {
        return sql;
    };
    let inner = &after[open + 1..];
    match inner.rfind(')') {
        Some(close) => &inner[..close],
        None => inner,
    }
}
320
321fn extract_with_str(sql: &str, key: &str) -> Option<String> {
323 let scope = with_clause(sql);
324 let upper = scope.to_uppercase();
325 let key_upper = key.to_uppercase();
326
327 let mut start = 0usize;
330 let pos = loop {
331 let rel = upper[start..].find(&key_upper)?;
332 let abs = start + rel;
333 let before_ok = abs == 0 || {
334 let b = scope.as_bytes()[abs - 1];
335 !(b.is_ascii_alphanumeric() || b == b'_')
336 };
337 let after_byte = scope
338 .as_bytes()
339 .get(abs + key.len())
340 .copied()
341 .unwrap_or(b' ');
342 let after_ok = !(after_byte.is_ascii_alphanumeric() || after_byte == b'_');
343 if before_ok && after_ok {
344 break abs;
345 }
346 start = abs + key.len();
347 };
348
349 let after = scope[pos + key.len()..].trim_start();
350 let after = after.strip_prefix('=')?;
351 let after = after.trim_start();
352
353 if let Some(rest) = after.strip_prefix('\'') {
355 let end = rest.find('\'')?;
356 let v = rest[..end].trim().to_lowercase();
357 return if v.is_empty() { None } else { Some(v) };
358 }
359 if let Some(rest) = after.strip_prefix('"') {
360 let end = rest.find('"')?;
361 let v = rest[..end].trim().to_lowercase();
362 return if v.is_empty() { None } else { Some(v) };
363 }
364
365 let end = after
367 .find(|c: char| c == ',' || c == ')' || c.is_whitespace())
368 .unwrap_or(after.len());
369 let v = after[..end].trim().to_lowercase();
370 if v.is_empty() { None } else { Some(v) }
371}
372
373fn extract_with_u32(sql: &str, key: &str) -> Option<u32> {
375 let raw = extract_with_str(sql, key)?;
376 raw.parse::<u32>().ok()
377}
378
379fn extract_payload_indexes(sql: &str) -> Vec<String> {
383 let scope = with_clause(sql);
384 let upper = scope.to_uppercase();
385 let pos = match upper.find("PAYLOAD_INDEXES") {
386 Some(p) => p,
387 None => return Vec::new(),
388 };
389
390 let after = scope[pos + "payload_indexes".len()..].trim_start();
391 let after = match after.strip_prefix('=') {
392 Some(a) => a.trim_start(),
393 None => return Vec::new(),
394 };
395
396 let after = match after.strip_prefix('[') {
398 Some(a) => a,
399 None => return Vec::new(),
400 };
401 let end = match after.find(']') {
402 Some(e) => e,
403 None => return Vec::new(),
404 };
405 let inner = &after[..end];
406
407 inner
409 .split(',')
410 .filter_map(|s| {
411 let s = s.trim();
412 let s = s
413 .strip_prefix('\'')
414 .and_then(|s| s.strip_suffix('\''))
415 .or_else(|| s.strip_prefix('"').and_then(|s| s.strip_suffix('"')))
416 .unwrap_or(s);
417 let s = s.trim().to_lowercase();
418 if s.is_empty() { None } else { Some(s) }
419 })
420 .collect()
421}
422
423fn parse_quantization(q: &str) -> Result<VectorQuantization, NodeDbError> {
425 match q.to_lowercase().as_str() {
426 "none" => Ok(VectorQuantization::None),
427 "sq8" => Ok(VectorQuantization::Sq8),
428 "pq" => Ok(VectorQuantization::Pq),
429 "rabitq" => Ok(VectorQuantization::RaBitQ),
430 "bbq" => Ok(VectorQuantization::Bbq),
431 "binary" => Ok(VectorQuantization::Binary),
432 "ternary" => Ok(VectorQuantization::Ternary),
433 "opq" => Ok(VectorQuantization::Opq),
434 other => Err(NodeDbError::bad_request(format!(
435 "unknown quantization '{other}'; valid values: {}",
436 VALID_QUANTIZATIONS.join(", ")
437 ))),
438 }
439}
440
441fn parse_storage_dtype(s: Option<&str>) -> Result<VectorStorageDtype, NodeDbError> {
445 let Some(s) = s else {
446 return Ok(VectorStorageDtype::default());
447 };
448 VectorStorageDtype::parse(s).ok_or_else(|| {
449 NodeDbError::bad_request(format!(
450 "unknown storage_dtype '{s}'; valid values: f32, f16, bf16"
451 ))
452 })
453}
454
455fn parse_metric(m: &str) -> Result<DistanceMetric, NodeDbError> {
457 match m.to_lowercase().as_str() {
458 "l2" | "euclidean" => Ok(DistanceMetric::L2),
459 "cosine" => Ok(DistanceMetric::Cosine),
460 "ip" | "inner_product" | "innerproduct" | "dot" => Ok(DistanceMetric::InnerProduct),
461 "manhattan" | "l1" => Ok(DistanceMetric::Manhattan),
462 "chebyshev" | "linf" | "l_inf" => Ok(DistanceMetric::Chebyshev),
463 "hamming" => Ok(DistanceMetric::Hamming),
464 "jaccard" => Ok(DistanceMetric::Jaccard),
465 "pearson" => Ok(DistanceMetric::Pearson),
466 other => Err(NodeDbError::bad_request(format!(
467 "unknown distance metric '{other}'; valid values: l2, cosine, ip, manhattan, \
468 chebyshev, hamming, jaccard, pearson"
469 ))),
470 }
471}
472
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::PayloadIndexKind as Kind;

    /// Parses `sql` and unwraps the vector config, panicking on error or `None`.
    fn parsed_config(sql: &str) -> VectorPrimaryConfig {
        parse_vector_primary_options(sql)
            .expect("parse ok")
            .expect("should be Some")
    }

    /// Asserts that `sql` parses successfully without yielding a vector config.
    fn assert_not_vector(sql: &str) {
        let outcome = parse_vector_primary_options(sql).expect("parse ok");
        assert!(outcome.is_none());
    }

    /// Parses `sql`, expecting a failure, and returns the rendered error text.
    fn parse_failure(sql: &str) -> String {
        let err = parse_vector_primary_options(sql).expect_err("should error");
        err.to_string()
    }

    /// Builds an owned `(name, type)` column list from string literals.
    fn column_list(defs: &[(&str, &str)]) -> Vec<(String, String)> {
        defs.iter()
            .map(|&(name, ty)| (name.to_owned(), ty.to_owned()))
            .collect()
    }

    /// A 128-dimensional config whose vector field is `vector_field`.
    fn config_for(vector_field: &str) -> VectorPrimaryConfig {
        VectorPrimaryConfig {
            vector_field: vector_field.to_string(),
            dim: 128,
            ..VectorPrimaryConfig::default()
        }
    }

    /// A config on vector field `vec` with one `Equality` payload index on `field`.
    fn config_indexing(field: &str) -> VectorPrimaryConfig {
        VectorPrimaryConfig {
            payload_indexes: vec![(field.to_string(), Kind::Equality)],
            ..config_for("vec")
        }
    }

    // ---- parsing: accepted statements ----

    #[test]
    fn happy_path_full_options() {
        let cfg = parsed_config(
            "CREATE COLLECTION embeds \
             (id BIGINT PRIMARY KEY, vec VECTOR(1024), category TEXT) \
             WITH (primary='vector', vector_field='vec', dim=1024, \
             quantization='rabitq', m=32, ef_construction=200, \
             metric='cosine', payload_indexes=['category'])",
        );
        assert_eq!(cfg.vector_field, "vec");
        assert_eq!(cfg.dim, 1024);
        assert_eq!(cfg.quantization, VectorQuantization::RaBitQ);
        assert_eq!(cfg.m, 32);
        assert_eq!(cfg.ef_construction, 200);
        assert_eq!(cfg.metric, DistanceMetric::Cosine);
        assert_eq!(
            cfg.payload_indexes,
            vec![("category".to_string(), Kind::Equality)]
        );
    }

    #[test]
    fn happy_path_minimal_options() {
        let cfg = parsed_config(
            "CREATE COLLECTION v (id BIGINT PRIMARY KEY, vec VECTOR(128)) \
             WITH (primary='vector', vector_field='vec', dim=128)",
        );
        assert_eq!(cfg.vector_field, "vec");
        assert_eq!(cfg.dim, 128);
        assert_eq!(cfg.m, 16);
        assert_eq!(cfg.ef_construction, 200);
        assert_eq!(cfg.metric, DistanceMetric::Cosine);
        assert!(cfg.payload_indexes.is_empty());
    }

    #[test]
    fn happy_path_multiple_payload_indexes() {
        let cfg = parsed_config(
            "CREATE COLLECTION v (id BIGINT PRIMARY KEY, vec VECTOR(128), a TEXT, b INT) \
             WITH (primary='vector', vector_field='vec', dim=128, \
             payload_indexes=['a', 'b'])",
        );
        // Parsing alone does not look at column types, so both stay `Equality`.
        let expected = vec![
            ("a".to_string(), Kind::Equality),
            ("b".to_string(), Kind::Equality),
        ];
        assert_eq!(cfg.payload_indexes, expected);
    }

    // ---- parsing: non-vector engines ----

    #[test]
    fn no_primary_returns_none() {
        assert_not_vector("CREATE COLLECTION c (id BIGINT PRIMARY KEY)");
    }

    #[test]
    fn primary_document_returns_none() {
        assert_not_vector(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY) WITH (primary='document_schemaless')",
        );
    }

    #[test]
    fn primary_strict_returns_none() {
        assert_not_vector(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY) WITH (primary='document_strict')",
        );
    }

    #[test]
    fn primary_columnar_returns_none() {
        assert_not_vector("CREATE COLLECTION c (id BIGINT PRIMARY KEY) WITH (primary='columnar')");
    }

    // ---- parsing: rejected statements ----

    #[test]
    fn missing_vector_field_returns_error() {
        let msg = parse_failure(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY, v VECTOR(64)) \
             WITH (primary='vector', dim=64)",
        );
        assert!(
            msg.contains("vector_field"),
            "expected vector_field in error: {msg}"
        );
    }

    #[test]
    fn missing_dim_returns_error() {
        let msg = parse_failure(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY, v VECTOR(64)) \
             WITH (primary='vector', vector_field='v')",
        );
        assert!(msg.contains("dim"), "expected dim in error: {msg}");
    }

    #[test]
    fn unknown_quantization_returns_error() {
        let msg = parse_failure(
            "CREATE COLLECTION c (id BIGINT PRIMARY KEY, v VECTOR(64)) \
             WITH (primary='vector', vector_field='v', dim=64, quantization='ivfflat')",
        );
        assert!(
            msg.contains("ivfflat"),
            "expected codec name in error: {msg}"
        );
    }

    #[test]
    fn all_valid_quantizations_accepted() {
        for q in VALID_QUANTIZATIONS {
            let sql = format!(
                "CREATE COLLECTION c (id BIGINT PRIMARY KEY, v VECTOR(64)) \
                 WITH (primary='vector', vector_field='v', dim=64, quantization='{q}')"
            );
            let result = parse_vector_primary_options(&sql);
            assert!(
                result.is_ok(),
                "quantization '{q}' should be accepted, got: {result:?}"
            );
        }
    }

    // ---- validate_vector_field ----

    #[test]
    fn validate_vector_field_ok() {
        let cfg = config_for("vec");
        let cols = column_list(&[("id", "BIGINT"), ("vec", "VECTOR(128)")]);
        validate_vector_field(&cfg, &cols).expect("should be ok");
    }

    #[test]
    fn validate_vector_field_nonexistent_column_errors() {
        let cfg = config_for("missing");
        let cols = column_list(&[("id", "BIGINT")]);
        let msg = validate_vector_field(&cfg, &cols)
            .expect_err("should error")
            .to_string();
        assert!(
            msg.contains("missing"),
            "expected column name in error: {msg}"
        );
    }

    #[test]
    fn validate_vector_field_wrong_type_errors() {
        let cfg = config_for("name");
        let cols = column_list(&[("name", "TEXT")]);
        let msg = validate_vector_field(&cfg, &cols)
            .expect_err("should error")
            .to_string();
        assert!(
            msg.contains("VECTOR"),
            "expected VECTOR mention in error: {msg}"
        );
    }

    // ---- validate_payload_indexes ----

    #[test]
    fn validate_payload_indexes_ok() {
        let mut cfg = config_indexing("category");
        let cols = column_list(&[("vec", "VECTOR(128)"), ("category", "TEXT")]);
        validate_payload_indexes(&mut cfg, &cols).expect("should be ok");
    }

    #[test]
    fn validate_payload_indexes_nonexistent_errors() {
        let mut cfg = config_indexing("ghost");
        let cols = column_list(&[("vec", "VECTOR(128)")]);
        let msg = validate_payload_indexes(&mut cfg, &cols)
            .expect_err("should error")
            .to_string();
        assert!(msg.contains("ghost"), "expected field name in error: {msg}");
    }

    #[test]
    fn validate_payload_indexes_vector_type_rejected() {
        let mut cfg = config_indexing("vec");
        let cols = column_list(&[("vec", "VECTOR(128)")]);
        let msg = validate_payload_indexes(&mut cfg, &cols)
            .expect_err("should error")
            .to_string();
        assert!(
            msg.contains("bitmap-eligible"),
            "expected bitmap-eligible in error: {msg}"
        );
    }

    #[test]
    fn validate_payload_indexes_blob_type_rejected() {
        let mut cfg = config_indexing("data");
        let cols = column_list(&[("vec", "VECTOR(128)"), ("data", "BLOB")]);
        let msg = validate_payload_indexes(&mut cfg, &cols)
            .expect_err("should error")
            .to_string();
        assert!(
            msg.contains("bitmap-eligible"),
            "expected bitmap-eligible in error: {msg}"
        );
    }
}