1use std::collections::{BTreeMap, BTreeSet, HashMap};
64use std::io::Write;
65
66use thiserror::Error;
67
68use crate::proto::redis::ft_filter::{self, FilterExpr};
69use crate::vector::registry::{VectorRegistry, VectorTable};
70use crate::vector::schema::{
71 DistanceMetric, IndexAlgorithm, MetadataField, MetadataFieldType, VectorSchema, VectorType,
72};
73
/// Errors produced while parsing or executing an `FT.*` command.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum FtError {
    /// The command name is not recognised. Also returned with an empty name
    /// when no arguments were supplied at all.
    #[error("unknown command: {0}")]
    UnknownCommand(String),
    /// The command's arguments are malformed (missing tokens, bad numbers,
    /// misplaced keywords, ...).
    #[error("syntax error: {0}")]
    Syntax(String),
    /// A recognised command, option, or value that this build does not
    /// implement (e.g. `FLAT` indexes or `FLOAT16` vectors).
    #[error("not supported in this build: {0}")]
    Unsupported(String),
    /// The named index does not exist.
    #[error("index not found: {0}")]
    NotFound(String),
    /// `FT.CREATE` named an index that already exists.
    #[error("index already exists: {0}")]
    AlreadyExists(String),
    /// A vector payload's dimension differs from the index's declared
    /// dimension. Not constructed in the visible code; presumably raised
    /// while decoding a query vector — confirm.
    #[error("dimension mismatch: index={index_dim}, payload={payload_dim}")]
    DimensionMismatch {
        /// Dimension declared by the index schema.
        index_dim: usize,
        /// Dimension of the supplied payload.
        payload_dim: usize,
    },
    /// An error reported by the registry or vector engine, carried as its
    /// rendered message.
    #[error("engine: {0}")]
    Engine(String),
}
113
/// A parsed `FT.*` command, ready to be run by [`execute`].
#[derive(Clone, Debug, PartialEq)]
pub enum FtCommand {
    /// `FT.CREATE`.
    Create(CreateRequest),
    /// `FT.SEARCH` whose query carries a `=>[KNN ...]` vector clause.
    Search(SearchRequest),
    /// `FT.SEARCH` whose whole query is a single `@field:substring` term.
    SearchText(SearchTextRequest),
    /// `FT.SEARCH` with a filter expression and no KNN clause.
    SearchFilter(SearchFilterRequest),
    /// `FT.AGGREGATE`.
    Aggregate(AggregateRequest),
    /// `FT.EXPLAIN`.
    Explain(ExplainRequest),
    /// `FT.ALTER`.
    Alter(AlterRequest),
    /// `FT.REGEX`.
    Regex(RegexRequest),
    /// `FT.INFO <name>`.
    Info {
        /// Index name.
        name: String,
    },
    /// `FT.LIST` / `FT._LIST`.
    List,
    /// `FT.DROPINDEX <name>`.
    DropIndex {
        /// Index name.
        name: String,
        /// Whether the indexed documents are to be deleted as well
        /// (presumably the `DD` flag; parsed outside this chunk — confirm).
        delete_documents: bool,
    },
}
155
/// Document type an index is declared over (`FT.CREATE ... ON <type>`).
///
/// Only `HASH` is accepted by the `FT.CREATE` parser.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DocType {
    /// Redis hash documents.
    Hash,
}

/// Parsed `FT.CREATE` request.
#[derive(Clone, Debug, PartialEq)]
pub struct CreateRequest {
    /// Name of the index to create.
    pub name: String,
    /// Document type from the `ON` clause.
    pub doc_type: DocType,
    /// Vector field, key prefixes and metadata fields gathered from the
    /// `PREFIX` and `SCHEMA` clauses.
    pub schema: VectorSchema,
}
173
/// Direction of a `SORTBY` clause. The parser defaults to `Asc` when no
/// direction keyword follows the field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SortDirection {
    /// Ascending (`ASC`).
    Asc,
    /// Descending (`DESC`).
    Desc,
}

/// Parsed KNN form of `FT.SEARCH`:
/// `FT.SEARCH <idx> "<filter>=>[KNN <k> @<field> $<param>]" PARAMS ...`.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchRequest {
    /// Index name.
    pub name: String,
    /// Maximum number of nearest neighbours to return (the parser rejects 0).
    pub k: usize,
    /// Vector field named in the KNN clause, without the `@`. Execution
    /// rejects it unless it equals the index's vector field.
    pub vector_field: String,
    /// Raw value bound to `$<param>` in `PARAMS`. It is handed to
    /// `decode_le_f32` at execution time (presumably packed little-endian
    /// `f32`s).
    pub vector_bytes: Vec<u8>,
    /// Pre-filter from the left of `=>`. `None` for `*`, an empty left-hand
    /// side, or an expression that parses to match-all.
    pub filter: Option<FilterExpr>,
    /// `RETURN` field names with any leading `@` stripped; `None` when no
    /// `RETURN` clause was given.
    pub return_fields: Option<Vec<String>>,
    /// `LIMIT <offset> <count>`.
    pub limit: Option<(usize, usize)>,
    /// `SORTBY <field> [ASC|DESC]`.
    pub sortby: Option<(String, SortDirection)>,
    /// `NOCONTENT` flag.
    pub nocontent: bool,
}

/// Parsed filter-only form of `FT.SEARCH` (no KNN clause).
#[derive(Clone, Debug, PartialEq)]
pub struct SearchFilterRequest {
    /// Index name.
    pub name: String,
    /// Filter expression parsed from the query string.
    pub filter: FilterExpr,
    /// `RETURN` field names with any leading `@` stripped.
    pub return_fields: Option<Vec<String>>,
    /// `LIMIT <offset> <count>`.
    pub limit: Option<(usize, usize)>,
    /// `SORTBY <field> [ASC|DESC]`.
    pub sortby: Option<(String, SortDirection)>,
    /// `NOCONTENT` flag.
    pub nocontent: bool,
}

/// Parsed single-term text form of `FT.SEARCH` (`@field:substring`).
#[derive(Clone, Debug, PartialEq)]
pub struct SearchTextRequest {
    /// Index name.
    pub name: String,
    /// TEXT field to search, without the `@`.
    pub field: String,
    /// Substring to look for (the part after `:`; never empty).
    pub query: Vec<u8>,
    /// `RETURN` field names with any leading `@` stripped.
    pub return_fields: Option<Vec<String>>,
    /// `LIMIT <offset> <count>`.
    pub limit: Option<(usize, usize)>,
    /// `SORTBY <field> [ASC|DESC]`.
    pub sortby: Option<(String, SortDirection)>,
    /// `NOCONTENT` flag.
    pub nocontent: bool,
}
265
/// Reducer applied per group in `FT.AGGREGATE` (presumably the `REDUCE`
/// clause of `GROUPBY` — parser not in this chunk).
#[derive(Clone, Debug, PartialEq)]
pub enum ReducerKind {
    /// Number of documents in the group.
    Count,
    /// Sum of a field over the group.
    Sum {
        /// Field to sum.
        field: String,
    },
    /// Average of a field over the group.
    Avg {
        /// Field to average.
        field: String,
    },
}

/// A reducer together with the output column name it is reported under.
#[derive(Clone, Debug, PartialEq)]
pub struct ReducerSpec {
    /// What to compute.
    pub kind: ReducerKind,
    /// Output name of the computed value.
    pub alias: String,
}

/// Parsed `FT.AGGREGATE` request.
#[derive(Clone, Debug, PartialEq)]
pub struct AggregateRequest {
    /// Index name.
    pub name: String,
    /// Fields to group by.
    pub group_by: Vec<String>,
    /// Reducers evaluated for each group.
    pub reducers: Vec<ReducerSpec>,
    /// Optional `(offset, count)` window (presumably over the groups — confirm).
    pub limit: Option<(usize, usize)>,
}

/// Parsed `FT.EXPLAIN` request.
#[derive(Clone, Debug, PartialEq)]
pub struct ExplainRequest {
    /// Index name.
    pub name: String,
    /// Raw query string to explain.
    pub query: Vec<u8>,
}

/// Parsed `FT.ALTER` request (presumably adds one metadata field to an
/// existing index's schema — confirm against `execute_alter`).
#[derive(Clone, Debug, PartialEq)]
pub struct AlterRequest {
    /// Index name.
    pub name: String,
    /// Field name.
    pub field: String,
    /// Field type.
    pub field_type: MetadataFieldType,
}

/// Parsed `FT.REGEX` request.
#[derive(Clone, Debug, PartialEq)]
pub struct RegexRequest {
    /// Index name.
    pub name: String,
    /// Field the pattern is matched against.
    pub field: String,
    /// Pattern text.
    pub pattern: String,
    /// NOTE(review): semantics not visible here; presumably an error budget
    /// for approximate matching — confirm.
    pub max_errors: u16,
}
347
/// Successful result of executing an [`FtCommand`]; turned into reply bytes
/// by `render_outcome`.
#[derive(Clone, Debug, PartialEq)]
pub enum FtOutcome {
    /// Plain acknowledgement (returned by `FT.CREATE`, among others).
    Ok,
    /// Index names (`FT.LIST`).
    List(Vec<String>),
    /// Key/value pairs describing an index (`FT.INFO`).
    Info(Vec<(String, InfoValue)>),
    /// Search results with their fields.
    Search {
        /// Total number of results (as computed by `finalize_search_outcome`;
        /// presumably before `LIMIT` is applied — confirm).
        total: usize,
        /// The returned hits.
        hits: Vec<SearchHit>,
    },
    /// Search results as document ids only (presumably produced for
    /// `NOCONTENT` — confirm).
    SearchNoContent {
        /// Total number of results.
        total: usize,
        /// Ids of the returned documents.
        doc_ids: Vec<Vec<u8>>,
    },
    /// `FT.AGGREGATE` result rows.
    Aggregate {
        /// Number of groups produced.
        total_groups: usize,
        /// One row per group, as `(column, value)` pairs.
        rows: Vec<Vec<(String, Vec<u8>)>>,
    },
    /// Human-readable query plan (`FT.EXPLAIN`).
    Explain(String),
    /// `FT.DROPINDEX` result.
    DropOk {
        /// Whether documents were deleted along with the index.
        deleted_documents: bool,
        /// Number of documents involved (semantics set by `execute_dropindex`,
        /// not visible here).
        document_count: usize,
    },
}

/// One search result.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchHit {
    /// Document key.
    pub doc_id: Vec<u8>,
    /// Engine score for KNN searches; `0.0` for filter and text searches.
    pub score: f32,
    /// `(field, value)` pairs. KNN hits start with a `__vec_score` entry.
    pub fields: Vec<(String, Vec<u8>)>,
}

/// Value in an `FT.INFO` reply.
#[derive(Clone, Debug, PartialEq)]
pub enum InfoValue {
    /// A string value.
    String(String),
    /// An integer value.
    Integer(i64),
    /// A nested list of values.
    Array(Vec<InfoValue>),
}
424
425pub fn parse_command(args: &[&[u8]]) -> Result<FtCommand, FtError> {
437 let head = args
438 .first()
439 .ok_or_else(|| FtError::UnknownCommand(String::new()))?;
440 let cmd = ascii_upper(head);
441 let rest = &args[1..];
442 match cmd.as_slice() {
443 b"FT.CREATE" => parse_create(rest).map(FtCommand::Create),
444 b"FT.SEARCH" => parse_search(rest),
445 b"FT.AGGREGATE" => parse_aggregate(rest).map(FtCommand::Aggregate),
446 b"FT.EXPLAIN" => parse_explain(rest).map(FtCommand::Explain),
447 b"FT.ALTER" => parse_alter(rest).map(FtCommand::Alter),
448 b"FT.REGEX" => parse_regex(rest).map(FtCommand::Regex),
449 b"FT.INFO" => parse_info(rest),
450 b"FT.LIST" | b"FT._LIST" => parse_list(rest),
451 b"FT.DROPINDEX" => parse_dropindex(rest),
452 other => {
453 if other.starts_with(b"FT.") {
460 Err(FtError::Unsupported(
461 String::from_utf8_lossy(other).into_owned(),
462 ))
463 } else {
464 Err(FtError::UnknownCommand(
465 String::from_utf8_lossy(other).into_owned(),
466 ))
467 }
468 }
469 }
470}
471
472pub fn execute(registry: &VectorRegistry, cmd: FtCommand) -> Result<FtOutcome, FtError> {
480 match cmd {
481 FtCommand::Create(req) => execute_create(registry, req),
482 FtCommand::Search(req) => execute_search(registry, &req),
483 FtCommand::SearchText(req) => execute_search_text(registry, &req),
484 FtCommand::SearchFilter(req) => execute_search_filter(registry, &req),
485 FtCommand::Aggregate(req) => execute_aggregate(registry, &req),
486 FtCommand::Explain(req) => execute_explain(registry, &req),
487 FtCommand::Alter(req) => execute_alter(registry, &req),
488 FtCommand::Regex(req) => execute_regex(registry, &req),
489 FtCommand::Info { name } => execute_info(registry, name),
490 FtCommand::List => Ok(FtOutcome::List(registry.list())),
491 FtCommand::DropIndex {
492 name,
493 delete_documents,
494 } => execute_dropindex(registry, name, delete_documents),
495 }
496}
497
498#[must_use]
506pub fn dispatch(registry: &VectorRegistry, args: &[&[u8]]) -> Vec<u8> {
507 match parse_command(args) {
508 Ok(cmd) => match execute(registry, cmd) {
509 Ok(outcome) => render_outcome(&outcome),
510 Err(err) => render_error(&err),
511 },
512 Err(err) => render_error(&err),
513 }
514}
515
516pub fn maybe_index_hset(
531 registry: &VectorRegistry,
532 args: &[&[u8]],
533) -> Result<Option<String>, FtError> {
534 if args.is_empty() {
535 return Err(FtError::Syntax("HSET requires a key".to_string()));
536 }
537 let key = args[0];
538 let pairs = &args[1..];
539 if pairs.is_empty() || !pairs.len().is_multiple_of(2) {
540 return Err(FtError::Syntax(
541 "HSET requires field/value pairs".to_string(),
542 ));
543 }
544 for name in registry.list() {
545 let Some(table) = registry.get(&name) else {
546 continue;
547 };
548 if table.schema.prefixes.iter().any(|p| key.starts_with(p)) {
549 insert_into_index(&table, key, pairs)?;
550 return Ok(Some(name));
551 }
552 }
553 Ok(None)
554}
555
556fn parse_create(rest: &[&[u8]]) -> Result<CreateRequest, FtError> {
559 let mut it = TokenCursor::new(rest);
561 let name = it.next_string("FT.CREATE: missing index name")?;
562
563 expect_keyword(it.next_required("FT.CREATE: expected ON")?, "ON")?;
564 let doc_type_tok = it.next_required("FT.CREATE: expected doc type")?;
565 let doc_type_up = ascii_upper(doc_type_tok);
566 let doc_type = match doc_type_up.as_slice() {
567 b"HASH" => DocType::Hash,
568 _ => {
569 return Err(FtError::Unsupported(format!(
570 "FT.CREATE doc type {}",
571 String::from_utf8_lossy(doc_type_tok)
572 )));
573 }
574 };
575
576 let mut prefixes: Vec<Vec<u8>> = Vec::new();
578 if matches_keyword(it.peek(), "PREFIX") {
579 it.advance();
580 let n_tok = it.next_required("FT.CREATE: PREFIX expects a count")?;
581 let n = parse_unsigned(n_tok, "FT.CREATE: PREFIX count")?;
582 for _ in 0..n {
583 let p = it.next_required("FT.CREATE: missing PREFIX value")?;
584 prefixes.push(p.to_vec());
585 }
586 }
587 if prefixes.is_empty() {
588 return Err(FtError::Syntax(
589 "FT.CREATE requires at least one PREFIX value".to_string(),
590 ));
591 }
592
593 expect_keyword(it.next_required("FT.CREATE: expected SCHEMA")?, "SCHEMA")?;
594
595 let (vector_field, metadata_fields) = parse_create_schema_body(&mut it)?;
596 let (vec_name, vec_type, dim, distance, algorithm) = vector_field.ok_or_else(|| {
597 FtError::Syntax("FT.CREATE: SCHEMA must declare a VECTOR field".to_string())
598 })?;
599
600 let schema = VectorSchema {
601 vector_field: vec_name,
602 vector_type: vec_type,
603 dim,
604 distance,
605 algorithm,
606 prefixes,
607 metadata_fields,
608 };
609 Ok(CreateRequest {
610 name,
611 doc_type,
612 schema,
613 })
614}
615
616type CreateVectorClause = (String, VectorType, u16, DistanceMetric, IndexAlgorithm);
622
623fn parse_create_schema_body(
629 it: &mut TokenCursor<'_>,
630) -> Result<(Option<CreateVectorClause>, Vec<MetadataField>), FtError> {
631 let mut vector_field: Option<CreateVectorClause> = None;
632 let mut metadata_fields: Vec<MetadataField> = Vec::new();
633 while let Some(field_tok) = it.next() {
634 let field_name = utf8(field_tok, "FT.CREATE: field name")?;
635 let kind_tok = it.next_required("FT.CREATE: missing field kind")?;
636 let kind_up = ascii_upper(kind_tok);
637 match kind_up.as_slice() {
638 b"TEXT" => {
639 consume_field_modifiers(it);
640 metadata_fields.push(MetadataField {
641 name: field_name,
642 field_type: MetadataFieldType::Text,
643 tag_separator: None,
644 });
645 }
646 b"NUMERIC" => {
647 consume_field_modifiers(it);
648 metadata_fields.push(MetadataField {
649 name: field_name,
650 field_type: MetadataFieldType::Numeric,
651 tag_separator: None,
652 });
653 }
654 b"TAG" => {
655 let separator = parse_tag_modifiers(it, &field_name)?;
656 metadata_fields.push(MetadataField {
657 name: field_name,
658 field_type: MetadataFieldType::Tag,
659 tag_separator: separator,
660 });
661 }
662 b"GEO" => {
663 consume_field_modifiers(it);
664 metadata_fields.push(MetadataField {
665 name: field_name,
666 field_type: MetadataFieldType::Geo,
667 tag_separator: None,
668 });
669 }
670 b"VECTOR" => {
671 if vector_field.is_some() {
672 return Err(FtError::Unsupported(
673 "multiple VECTOR fields per index".to_string(),
674 ));
675 }
676 let parsed = parse_vector_clause(it)?;
677 vector_field = Some((field_name, parsed.0, parsed.1, parsed.2, parsed.3));
678 }
679 other => {
680 return Err(FtError::Unsupported(format!(
681 "FT.CREATE field kind {}",
682 String::from_utf8_lossy(other)
683 )));
684 }
685 }
686 }
687 Ok((vector_field, metadata_fields))
688}
689
690fn consume_field_modifiers(it: &mut TokenCursor<'_>) {
697 while let Some(tok) = it.peek() {
698 let up = ascii_upper(tok);
699 match up.as_slice() {
700 b"SORTABLE" | b"NOINDEX" | b"UNF" | b"CASESENSITIVE" | b"NOSTEM" => {
701 it.advance();
702 }
703 b"WEIGHT" | b"PHONETIC" => {
704 it.advance();
705 if it.peek().is_some() {
710 it.advance();
711 }
712 }
713 _ => break,
714 }
715 }
716}
717
718fn parse_tag_modifiers(it: &mut TokenCursor<'_>, field_name: &str) -> Result<Option<u8>, FtError> {
724 let mut separator: Option<u8> = None;
725 while let Some(tok) = it.peek() {
726 let up = ascii_upper(tok);
727 match up.as_slice() {
728 b"SEPARATOR" => {
729 it.advance();
730 let sep_tok =
731 it.next_required("FT.CREATE: TAG SEPARATOR expects a single-character value")?;
732 if sep_tok.len() != 1 {
733 return Err(FtError::Syntax(format!(
734 "FT.CREATE: TAG SEPARATOR for field {field_name} must be a single ASCII byte",
735 )));
736 }
737 separator = Some(sep_tok[0]);
738 }
739 b"SORTABLE" | b"NOINDEX" | b"UNF" | b"CASESENSITIVE" => {
740 it.advance();
741 }
742 _ => break,
743 }
744 }
745 Ok(separator)
746}
747
748fn parse_vector_clause(
749 it: &mut TokenCursor<'_>,
750) -> Result<(VectorType, u16, DistanceMetric, IndexAlgorithm), FtError> {
751 let alg_tok = it.next_required("FT.CREATE: VECTOR missing algorithm")?;
752 let alg_up = ascii_upper(alg_tok);
753 let algorithm = match alg_up.as_slice() {
754 b"HNSW" => IndexAlgorithm::Hnsw,
755 b"FLAT" => {
756 return Err(FtError::Unsupported(
757 "FT.CREATE: FLAT vector index not supported in this build".to_string(),
758 ));
759 }
760 other => {
761 return Err(FtError::Unsupported(format!(
762 "FT.CREATE VECTOR algorithm {}",
763 String::from_utf8_lossy(other)
764 )));
765 }
766 };
767 let pair_count_tok = it.next_required("FT.CREATE: VECTOR missing parameter count")?;
768 let pair_count = parse_unsigned(pair_count_tok, "FT.CREATE VECTOR parameter count")?;
769 if !pair_count.is_multiple_of(2) {
770 return Err(FtError::Syntax(
771 "FT.CREATE VECTOR parameter count must be even".to_string(),
772 ));
773 }
774 let mut vec_type: Option<VectorType> = None;
775 let mut dim: Option<u16> = None;
776 let mut distance: Option<DistanceMetric> = None;
777 let pair_pairs = pair_count / 2;
778 for _ in 0..pair_pairs {
779 let key_tok = it.next_required("FT.CREATE: VECTOR missing parameter key")?;
780 let val_tok = it.next_required("FT.CREATE: VECTOR missing parameter value")?;
781 let key_up = ascii_upper(key_tok);
782 let val_up = ascii_upper(val_tok);
783 match key_up.as_slice() {
784 b"TYPE" => match val_up.as_slice() {
785 b"FLOAT32" => vec_type = Some(VectorType::Float32),
786 b"FLOAT16" => {
787 return Err(FtError::Unsupported(
788 "FT.CREATE VECTOR TYPE FLOAT16 not supported in this build".to_string(),
789 ));
790 }
791 other => {
792 return Err(FtError::Unsupported(format!(
793 "FT.CREATE VECTOR TYPE {}",
794 String::from_utf8_lossy(other)
795 )));
796 }
797 },
798 b"DIM" => {
799 let d = parse_unsigned(val_tok, "FT.CREATE VECTOR DIM")?;
800 if d == 0 || d > usize::from(u16::MAX) {
801 return Err(FtError::Syntax(
802 "FT.CREATE VECTOR DIM out of range".to_string(),
803 ));
804 }
805 dim = Some(u16::try_from(d).expect("dim fits u16"));
806 }
807 b"DISTANCE_METRIC" => {
808 distance = Some(match val_up.as_slice() {
809 b"COSINE" => DistanceMetric::Cosine,
810 b"L2" | b"EUCLIDEAN" => DistanceMetric::L2,
811 b"IP" | b"DOTPRODUCT" | b"DOT_PRODUCT" => DistanceMetric::InnerProduct,
812 other => {
813 return Err(FtError::Unsupported(format!(
814 "FT.CREATE DISTANCE_METRIC {}",
815 String::from_utf8_lossy(other)
816 )));
817 }
818 });
819 }
820 _ => {}
824 }
825 }
826 let vec_type =
827 vec_type.ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing TYPE".to_string()))?;
828 let dim = dim.ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing DIM".to_string()))?;
829 let distance = distance
830 .ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing DISTANCE_METRIC".to_string()))?;
831 Ok((vec_type, dim, distance, algorithm))
832}
833
834fn execute_create(registry: &VectorRegistry, req: CreateRequest) -> Result<FtOutcome, FtError> {
835 use crate::vector::registry::RegistryError;
836 match registry.create(req.name.clone(), req.schema) {
837 Ok(()) => Ok(FtOutcome::Ok),
838 Err(RegistryError::AlreadyExists(name)) => Err(FtError::AlreadyExists(name)),
839 Err(RegistryError::UnsupportedAlgorithm(_)) => Err(FtError::Unsupported(
840 "FT.CREATE: unsupported VECTOR algorithm".to_string(),
841 )),
842 Err(other) => Err(FtError::Engine(other.to_string())),
843 }
844}
845
846fn parse_search(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
859 let mut it = TokenCursor::new(rest);
860 let name = it.next_string("FT.SEARCH: missing index name")?;
861 let query = it.next_required("FT.SEARCH: missing query expression")?;
862
863 let (filter_part, knn_part) = split_knn_suffix(query)?;
865
866 if let Some(knn_bytes) = knn_part {
867 let filter = parse_lhs_filter(filter_part)?;
871 let (k, vec_field, param_name) = parse_knn_clause(knn_bytes)?;
872 let mut params: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();
873 let mut opts = SearchClauseOptions::default();
874 consume_search_trailing_clauses_with_params(&mut it, &mut params, &mut opts)?;
875 let vector_bytes = params
876 .remove(param_name.as_bytes())
877 .ok_or_else(|| FtError::Syntax(format!("FT.SEARCH: PARAMS missing ${param_name}")))?;
878 return Ok(FtCommand::Search(SearchRequest {
879 name,
880 k,
881 vector_field: vec_field,
882 vector_bytes,
883 filter,
884 return_fields: opts.return_fields,
885 limit: opts.limit,
886 sortby: opts.sortby,
887 nocontent: opts.nocontent,
888 }));
889 }
890
891 if let Some(parsed) = try_parse_simple_text_field_query(filter_part)? {
895 let (field, substring) = parsed;
896 let mut opts = SearchClauseOptions::default();
897 consume_search_trailing_clauses(&mut it, false, &mut opts)?;
898 return Ok(FtCommand::SearchText(SearchTextRequest {
899 name,
900 field,
901 query: substring,
902 return_fields: opts.return_fields,
903 limit: opts.limit,
904 sortby: opts.sortby,
905 nocontent: opts.nocontent,
906 }));
907 }
908
909 let filter = ft_filter::parse_expr(filter_part)?;
910 let mut opts = SearchClauseOptions::default();
911 consume_search_trailing_clauses(&mut it, false, &mut opts)?;
912 Ok(FtCommand::SearchFilter(SearchFilterRequest {
913 name,
914 filter,
915 return_fields: opts.return_fields,
916 limit: opts.limit,
917 sortby: opts.sortby,
918 nocontent: opts.nocontent,
919 }))
920}
921
922fn split_knn_suffix(query: &[u8]) -> Result<(&[u8], Option<&[u8]>), FtError> {
927 let trimmed = trim_ascii_bytes(query);
928 let Some(arrow) = find_byte_subseq(trimmed, b"=>") else {
929 return Ok((trimmed, None));
930 };
931 let lhs = trim_ascii_bytes(&trimmed[..arrow]);
932 let rhs = trim_ascii_bytes(&trimmed[arrow + 2..]);
933 if !rhs.starts_with(b"[") || !rhs.ends_with(b"]") {
934 return Err(FtError::Syntax(
935 "FT.SEARCH query: expected '[KNN ...]' after '=>'".to_string(),
936 ));
937 }
938 let inner = &rhs[1..rhs.len() - 1];
939 Ok((lhs, Some(inner)))
940}
941
942fn parse_lhs_filter(lhs: &[u8]) -> Result<Option<FilterExpr>, FtError> {
946 let trimmed = trim_ascii_bytes(lhs);
947 if trimmed.is_empty() || trimmed == b"*" {
948 return Ok(None);
949 }
950 let expr = ft_filter::parse_expr(trimmed)?;
951 if matches!(expr, FilterExpr::All) {
952 Ok(None)
953 } else {
954 Ok(Some(expr))
955 }
956}
957
/// Returns `s` without leading and trailing ASCII whitespace.
///
/// Whitespace means `u8::is_ascii_whitespace`: space, `\t`, `\n`, `\x0C` and
/// `\r`. Vertical tab is not included. The result borrows from `s`.
fn trim_ascii_bytes(s: &[u8]) -> &[u8] {
    s.trim_ascii()
}
969
970fn try_parse_simple_text_field_query(lhs: &[u8]) -> Result<Option<(String, Vec<u8>)>, FtError> {
976 let trimmed = trim_ascii_bytes(lhs);
977 if trimmed.is_empty() || trimmed[0] != b'@' {
978 return Ok(None);
979 }
980 for &b in trimmed {
984 if matches!(b, b'(' | b')' | b'[' | b']' | b'{' | b'}' | b'|' | b'"') {
985 return Ok(None);
986 }
987 if b.is_ascii_whitespace() {
988 return Ok(None);
989 }
990 }
991 let body = &trimmed[1..];
992 let colon = body
993 .iter()
994 .position(|&b| b == b':')
995 .ok_or_else(|| FtError::Syntax("FT.SEARCH text query: missing ':'".to_string()))?;
996 let field_bytes = &body[..colon];
997 let substring = &body[colon + 1..];
998 if field_bytes.is_empty() {
999 return Err(FtError::Syntax(
1000 "FT.SEARCH text query: empty field name".to_string(),
1001 ));
1002 }
1003 if substring.is_empty() {
1004 return Ok(None);
1005 }
1006 let field = std::str::from_utf8(field_bytes)
1007 .map(str::to_string)
1008 .map_err(|_| FtError::Syntax("FT.SEARCH text query: field is not UTF-8".to_string()))?;
1009 Ok(Some((field, substring.to_vec())))
1010}
1011
1012fn parse_knn_clause(body: &[u8]) -> Result<(usize, String, String), FtError> {
1015 let s = std::str::from_utf8(body)
1016 .map_err(|_| FtError::Syntax("FT.SEARCH KNN clause is not UTF-8".to_string()))?;
1017 let mut parts = s.split_ascii_whitespace();
1018 let knn_kw = parts
1019 .next()
1020 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: empty KNN clause".to_string()))?;
1021 if !knn_kw.eq_ignore_ascii_case("KNN") {
1022 return Err(FtError::Unsupported(format!(
1023 "FT.SEARCH query operator: {knn_kw}"
1024 )));
1025 }
1026 let k_str = parts
1027 .next()
1028 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects k".to_string()))?;
1029 let k: usize = k_str
1030 .parse()
1031 .map_err(|_| FtError::Syntax(format!("FT.SEARCH query: invalid k {k_str}")))?;
1032 let field_tok = parts
1033 .next()
1034 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects @field".to_string()))?;
1035 let field = field_tok
1036 .strip_prefix('@')
1037 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: field must start with @".to_string()))?;
1038 let param_tok = parts
1039 .next()
1040 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects $param".to_string()))?;
1041 let param = param_tok
1042 .strip_prefix('$')
1043 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: param must start with $".to_string()))?;
1044 if parts.next().is_some() {
1045 return Err(FtError::Unsupported(
1046 "FT.SEARCH query: extra tokens after KNN expression".to_string(),
1047 ));
1048 }
1049 if k == 0 {
1050 return Err(FtError::Syntax("FT.SEARCH KNN k must be > 0".to_string()));
1051 }
1052 Ok((k, field.to_string(), param.to_string()))
1053}
1054
1055fn consume_search_trailing_clauses_with_params(
1060 it: &mut TokenCursor<'_>,
1061 params: &mut HashMap<Vec<u8>, Vec<u8>>,
1062 opts: &mut SearchClauseOptions,
1063) -> Result<(), FtError> {
1064 loop {
1065 let Some(tok) = it.next() else { break };
1066 let up = ascii_upper(tok);
1067 match up.as_slice() {
1068 b"PARAMS" => {
1069 let n_tok = it.next_required("FT.SEARCH: PARAMS expects a count")?;
1070 let n = parse_unsigned(n_tok, "FT.SEARCH PARAMS count")?;
1071 if !n.is_multiple_of(2) {
1072 return Err(FtError::Syntax(
1073 "FT.SEARCH PARAMS count must be even".to_string(),
1074 ));
1075 }
1076 for _ in 0..(n / 2) {
1077 let k_tok = it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
1078 let v_tok = it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
1079 params.insert(k_tok.to_vec(), v_tok.to_vec());
1080 }
1081 }
1082 b"RETURN" => parse_return_clause(it, opts)?,
1083 b"SORTBY" => parse_sortby_clause(it, opts)?,
1084 b"LIMIT" => parse_limit_clause(it, opts)?,
1085 b"NOCONTENT" => opts.nocontent = true,
1086 b"DIALECT" => {
1087 it.next_required("FT.SEARCH: DIALECT expects a value")?;
1088 }
1089 b"WITHSCORES" => {}
1090 other => {
1091 return Err(FtError::Unsupported(format!(
1092 "FT.SEARCH clause {}",
1093 String::from_utf8_lossy(other)
1094 )));
1095 }
1096 }
1097 }
1098 Ok(())
1099}
1100
/// Options collected from the trailing clauses of an `FT.SEARCH` command.
///
/// All fields start out unset (`Default`); each clause parser overwrites its own
/// field, so a repeated clause keeps only its last occurrence.
#[derive(Default)]
struct SearchClauseOptions {
    /// `RETURN` field names with any leading `@` removed.
    return_fields: Option<Vec<String>>,
    /// `LIMIT <offset> <count>`.
    limit: Option<(usize, usize)>,
    /// `SORTBY <field> [ASC|DESC]`; the direction defaults to ascending.
    sortby: Option<(String, SortDirection)>,
    /// Set when `NOCONTENT` was given.
    nocontent: bool,
}
1113
1114fn parse_return_clause(
1115 it: &mut TokenCursor<'_>,
1116 opts: &mut SearchClauseOptions,
1117) -> Result<(), FtError> {
1118 let n_tok = it.next_required("FT.SEARCH: RETURN expects a count")?;
1119 let n = parse_unsigned(n_tok, "FT.SEARCH RETURN count")?;
1120 let mut fields: Vec<String> = Vec::with_capacity(n);
1121 for _ in 0..n {
1122 let f_tok = it.next_required("FT.SEARCH: RETURN expects field name")?;
1123 let trimmed: &[u8] = if f_tok.first() == Some(&b'@') {
1127 &f_tok[1..]
1128 } else {
1129 f_tok
1130 };
1131 fields.push(utf8(trimmed, "FT.SEARCH RETURN field name")?);
1132 }
1133 opts.return_fields = Some(fields);
1134 Ok(())
1135}
1136
1137fn parse_limit_clause(
1138 it: &mut TokenCursor<'_>,
1139 opts: &mut SearchClauseOptions,
1140) -> Result<(), FtError> {
1141 let off_tok = it.next_required("FT.SEARCH: LIMIT expects offset")?;
1142 let cnt_tok = it.next_required("FT.SEARCH: LIMIT expects count")?;
1143 let off = parse_unsigned(off_tok, "FT.SEARCH LIMIT offset")?;
1144 let cnt = parse_unsigned(cnt_tok, "FT.SEARCH LIMIT count")?;
1145 opts.limit = Some((off, cnt));
1146 Ok(())
1147}
1148
1149fn parse_sortby_clause(
1150 it: &mut TokenCursor<'_>,
1151 opts: &mut SearchClauseOptions,
1152) -> Result<(), FtError> {
1153 let f_tok = it.next_required("FT.SEARCH: SORTBY expects @field")?;
1154 let field_bytes: &[u8] = if f_tok.first() == Some(&b'@') {
1155 &f_tok[1..]
1156 } else {
1157 f_tok
1158 };
1159 let field = utf8(field_bytes, "FT.SEARCH SORTBY field")?;
1160 let direction = if let Some(next) = it.peek() {
1165 let up = ascii_upper(next);
1166 match up.as_slice() {
1167 b"ASC" => {
1168 it.advance();
1169 SortDirection::Asc
1170 }
1171 b"DESC" => {
1172 it.advance();
1173 SortDirection::Desc
1174 }
1175 _ => SortDirection::Asc,
1176 }
1177 } else {
1178 SortDirection::Asc
1179 };
1180 opts.sortby = Some((field, direction));
1181 Ok(())
1182}
1183
1184fn try_parse_text_field_query(query: &[u8]) -> Result<Option<(String, Vec<u8>)>, FtError> {
1190 if query.is_empty() || query[0] != b'@' {
1191 return Ok(None);
1192 }
1193 if find_byte_subseq(query, b"=>").is_some() {
1197 return Err(FtError::Unsupported(format!(
1198 "FT.SEARCH query: {}",
1199 String::from_utf8_lossy(query)
1200 )));
1201 }
1202 let body = &query[1..];
1203 let colon = body
1204 .iter()
1205 .position(|&b| b == b':')
1206 .ok_or_else(|| FtError::Syntax("FT.SEARCH text query: missing ':'".to_string()))?;
1207 let field_bytes = &body[..colon];
1208 let substring = &body[colon + 1..];
1209 if field_bytes.is_empty() {
1210 return Err(FtError::Syntax(
1211 "FT.SEARCH text query: empty field name".to_string(),
1212 ));
1213 }
1214 let field = std::str::from_utf8(field_bytes)
1215 .map(str::to_string)
1216 .map_err(|_| FtError::Syntax("FT.SEARCH text query: field is not UTF-8".to_string()))?;
1217 Ok(Some((field, substring.to_vec())))
1218}
1219
/// Returns the index of the first occurrence of `needle` in `haystack`.
///
/// An empty `needle`, or one longer than `haystack`, yields `None`. Uses a
/// naive O(n·m) scan, which is fine for short query strings.
fn find_byte_subseq(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    let last_start = haystack.len().checked_sub(needle.len())?;
    (0..=last_start).find(|&start| haystack[start..].starts_with(needle))
}
1231
1232fn consume_search_trailing_clauses(
1238 it: &mut TokenCursor<'_>,
1239 allow_params: bool,
1240 opts: &mut SearchClauseOptions,
1241) -> Result<(), FtError> {
1242 loop {
1243 let Some(tok) = it.next() else { break };
1244 let up = ascii_upper(tok);
1245 match up.as_slice() {
1246 b"PARAMS" if allow_params => {
1247 let n_tok = it.next_required("FT.SEARCH: PARAMS expects a count")?;
1248 let n = parse_unsigned(n_tok, "FT.SEARCH PARAMS count")?;
1249 if !n.is_multiple_of(2) {
1250 return Err(FtError::Syntax(
1251 "FT.SEARCH PARAMS count must be even".to_string(),
1252 ));
1253 }
1254 for _ in 0..n {
1255 it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
1256 }
1257 }
1258 b"RETURN" => parse_return_clause(it, opts)?,
1259 b"SORTBY" => parse_sortby_clause(it, opts)?,
1260 b"LIMIT" => parse_limit_clause(it, opts)?,
1261 b"NOCONTENT" => opts.nocontent = true,
1262 b"DIALECT" => {
1263 it.next_required("FT.SEARCH: DIALECT expects a value")?;
1264 }
1265 b"WITHSCORES" => {}
1266 other => {
1267 return Err(FtError::Unsupported(format!(
1268 "FT.SEARCH clause {}",
1269 String::from_utf8_lossy(other)
1270 )));
1271 }
1272 }
1273 }
1274 Ok(())
1275}
1276
1277fn parse_knn_query(query: &[u8]) -> Result<(usize, String, String), FtError> {
1279 let s = std::str::from_utf8(query)
1280 .map_err(|_| FtError::Syntax("FT.SEARCH query is not UTF-8".to_string()))?;
1281 let trimmed = s.trim();
1282 let stripped = trimmed
1283 .strip_prefix("*=>")
1284 .ok_or_else(|| FtError::Unsupported(format!("FT.SEARCH query: {trimmed}")))?
1285 .trim_start();
1286 let inner = stripped
1287 .strip_prefix('[')
1288 .and_then(|s| s.strip_suffix(']'))
1289 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: missing brackets".to_string()))?;
1290 let mut parts = inner.split_ascii_whitespace();
1291 let knn_kw = parts
1292 .next()
1293 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: empty".to_string()))?;
1294 if !knn_kw.eq_ignore_ascii_case("KNN") {
1295 return Err(FtError::Unsupported(format!(
1296 "FT.SEARCH query operator: {knn_kw}"
1297 )));
1298 }
1299 let k_str = parts
1300 .next()
1301 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects k".to_string()))?;
1302 let k: usize = k_str
1303 .parse()
1304 .map_err(|_| FtError::Syntax(format!("FT.SEARCH query: invalid k {k_str}")))?;
1305 let field_tok = parts
1306 .next()
1307 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects @field".to_string()))?;
1308 let field = field_tok
1309 .strip_prefix('@')
1310 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: field must start with @".to_string()))?;
1311 let param_tok = parts
1312 .next()
1313 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects $param".to_string()))?;
1314 let param = param_tok
1315 .strip_prefix('$')
1316 .ok_or_else(|| FtError::Syntax("FT.SEARCH query: param must start with $".to_string()))?;
1317 if parts.next().is_some() {
1318 return Err(FtError::Unsupported(
1319 "FT.SEARCH query: extra tokens after KNN expression".to_string(),
1320 ));
1321 }
1322 if k == 0 {
1323 return Err(FtError::Syntax("FT.SEARCH KNN k must be > 0".to_string()));
1324 }
1325 Ok((k, field.to_string(), param.to_string()))
1326}
1327
1328fn execute_search(registry: &VectorRegistry, req: &SearchRequest) -> Result<FtOutcome, FtError> {
1329 let table = registry
1330 .get(&req.name)
1331 .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
1332 if table.schema.vector_field != req.vector_field {
1333 return Err(FtError::Syntax(format!(
1334 "FT.SEARCH: query references @{} but index vector field is {}",
1335 req.vector_field, table.schema.vector_field
1336 )));
1337 }
1338 let dim = usize::from(table.schema.dim);
1339 let query = decode_le_f32(&req.vector_bytes, dim)?;
1340
1341 let allowed: Option<BTreeSet<Vec<u8>>> = if let Some(filter) = &req.filter {
1351 let universe: BTreeSet<Vec<u8>> = table.indexed_keys().into_iter().collect();
1352 let matched = ft_filter::evaluate(filter, &table, &universe)?;
1353 Some(matched)
1354 } else {
1355 None
1356 };
1357
1358 let oversample_k = match allowed.as_ref() {
1359 None => req.k,
1361 Some(set) => {
1362 set.len().max(req.k)
1367 }
1368 };
1369 let raw = if oversample_k == 0 {
1370 Vec::new()
1371 } else {
1372 table
1373 .engine
1374 .search(&query, oversample_k, None)
1375 .map_err(|e| FtError::Engine(e.to_string()))?
1376 };
1377
1378 let mut out = Vec::new();
1379 for (row, score) in raw {
1380 if let Some(allowed) = &allowed {
1381 if !allowed.contains(&row.key) {
1382 continue;
1383 }
1384 }
1385 let mut fields: Vec<(String, Vec<u8>)> = Vec::new();
1386 fields.push(("__vec_score".to_string(), format_float(score).into_bytes()));
1387 for (k, v) in &row.metadata {
1388 let value_bytes = match v {
1389 serde_json::Value::String(s) => s.clone().into_bytes(),
1390 other => other.to_string().into_bytes(),
1391 };
1392 fields.push((k.clone(), value_bytes));
1393 }
1394 out.push(SearchHit {
1395 doc_id: row.key,
1396 score,
1397 fields,
1398 });
1399 if out.len() >= req.k {
1400 break;
1401 }
1402 }
1403 Ok(finalize_search_outcome(
1404 out,
1405 req.sortby.as_ref(),
1406 req.limit,
1407 req.return_fields.as_deref(),
1408 req.nocontent,
1409 ))
1410}
1411
1412fn execute_search_filter(
1419 registry: &VectorRegistry,
1420 req: &SearchFilterRequest,
1421) -> Result<FtOutcome, FtError> {
1422 let table = registry
1423 .get(&req.name)
1424 .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
1425 let universe: BTreeSet<Vec<u8>> = table.indexed_keys().into_iter().collect();
1426 let matched = ft_filter::evaluate(&req.filter, &table, &universe)?;
1427
1428 let mut out: Vec<SearchHit> = Vec::with_capacity(matched.len());
1429 for key in matched {
1430 let row = match table.engine.get(&key) {
1431 Ok(Some(r)) => r,
1432 Ok(None) => continue,
1433 Err(e) => return Err(FtError::Engine(e.to_string())),
1434 };
1435 let mut fields: Vec<(String, Vec<u8>)> = Vec::new();
1436 for (k, v) in &row.metadata {
1437 let value_bytes = match v {
1438 serde_json::Value::String(s) => s.clone().into_bytes(),
1439 other => other.to_string().into_bytes(),
1440 };
1441 fields.push((k.clone(), value_bytes));
1442 }
1443 out.push(SearchHit {
1444 doc_id: row.key,
1445 score: 0.0,
1446 fields,
1447 });
1448 }
1449 Ok(finalize_search_outcome(
1450 out,
1451 req.sortby.as_ref(),
1452 req.limit,
1453 req.return_fields.as_deref(),
1454 req.nocontent,
1455 ))
1456}
1457
1458fn execute_search_text(
1459 registry: &VectorRegistry,
1460 req: &SearchTextRequest,
1461) -> Result<FtOutcome, FtError> {
1462 let table = registry
1463 .get(&req.name)
1464 .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
1465 if !table.has_text_field(&req.field) {
1466 return Err(FtError::Syntax(format!(
1467 "FT.SEARCH: index {} has no TEXT field {}",
1468 req.name, req.field
1469 )));
1470 }
1471 let raw_hits = table
1472 .search_text_substring(&req.field, &req.query)
1473 .ok_or_else(|| {
1474 FtError::Engine(format!(
1475 "text index for field {} not provisioned",
1476 req.field
1477 ))
1478 })?;
1479 let mut out = Vec::with_capacity(raw_hits.len());
1480 for (key, text) in raw_hits {
1481 let mut fields: Vec<(String, Vec<u8>)> = vec![(req.field.clone(), text)];
1482 if let Some((sort_field, _)) = &req.sortby {
1488 if sort_field != &req.field {
1489 if let Ok(Some(row)) = table.engine.get(&key) {
1490 if let Some(v) = row.metadata.get(sort_field) {
1491 let bytes = match v {
1492 serde_json::Value::String(s) => s.clone().into_bytes(),
1493 other => other.to_string().into_bytes(),
1494 };
1495 fields.push((sort_field.clone(), bytes));
1496 }
1497 }
1498 }
1499 }
1500 out.push(SearchHit {
1501 doc_id: key,
1502 score: 0.0,
1503 fields,
1504 });
1505 }
1506 Ok(finalize_search_outcome(
1507 out,
1508 req.sortby.as_ref(),
1509 req.limit,
1510 req.return_fields.as_deref(),
1511 req.nocontent,
1512 ))
1513}
1514
1515fn finalize_search_outcome(
1526 mut hits: Vec<SearchHit>,
1527 sortby: Option<&(String, SortDirection)>,
1528 limit: Option<(usize, usize)>,
1529 return_fields: Option<&[String]>,
1530 nocontent: bool,
1531) -> FtOutcome {
1532 if let Some((field, dir)) = sortby {
1533 sort_hits_by_field(&mut hits, field, *dir);
1534 }
1535 if let Some((offset, count)) = limit {
1536 if offset >= hits.len() {
1537 hits.clear();
1538 } else {
1539 let end = offset.saturating_add(count).min(hits.len());
1540 hits = hits.drain(offset..end).collect();
1541 }
1542 }
1543 if let Some(fields) = return_fields {
1544 for hit in &mut hits {
1545 hit.fields
1546 .retain(|(name, _)| fields.iter().any(|f| f == name));
1547 }
1548 }
1549 let total = hits.len();
1550 if nocontent {
1551 let doc_ids = hits.into_iter().map(|h| h.doc_id).collect();
1552 FtOutcome::SearchNoContent { total, doc_ids }
1553 } else {
1554 FtOutcome::Search { total, hits }
1555 }
1556}
1557
1558fn sort_hits_by_field(hits: &mut [SearchHit], field: &str, dir: SortDirection) {
1563 hits.sort_by(|a, b| {
1564 let av = lookup_field(a, field);
1565 let bv = lookup_field(b, field);
1566 match (av, bv) {
1568 (Some(a_bytes), Some(b_bytes)) => {
1569 match (parse_sort_key(a_bytes), parse_sort_key(b_bytes)) {
1570 (Some(a_num), Some(b_num)) => match dir {
1571 SortDirection::Asc => a_num
1572 .partial_cmp(&b_num)
1573 .unwrap_or(std::cmp::Ordering::Equal),
1574 SortDirection::Desc => b_num
1575 .partial_cmp(&a_num)
1576 .unwrap_or(std::cmp::Ordering::Equal),
1577 },
1578 _ => match dir {
1579 SortDirection::Asc => a_bytes.cmp(b_bytes),
1580 SortDirection::Desc => b_bytes.cmp(a_bytes),
1581 },
1582 }
1583 }
1584 (Some(_), None) => std::cmp::Ordering::Less,
1585 (None, Some(_)) => std::cmp::Ordering::Greater,
1586 (None, None) => std::cmp::Ordering::Equal,
1587 }
1588 });
1589}
1590
1591fn lookup_field<'a>(hit: &'a SearchHit, name: &str) -> Option<&'a [u8]> {
1592 hit.fields
1593 .iter()
1594 .find(|(n, _)| n == name)
1595 .map(|(_, v)| v.as_slice())
1596}
1597
1598fn parse_sort_key(bytes: &[u8]) -> Option<f64> {
1602 let s = std::str::from_utf8(bytes).ok()?;
1603 let parsed: f64 = s.trim().parse().ok()?;
1604 if parsed.is_finite() {
1605 Some(parsed)
1606 } else {
1607 None
1608 }
1609}
1610
1611fn parse_aggregate(rest: &[&[u8]]) -> Result<AggregateRequest, FtError> {
1619 let mut it = TokenCursor::new(rest);
1620 let name = it.next_string("FT.AGGREGATE: missing index name")?;
1621 let _query = it.next_required("FT.AGGREGATE: missing query expression")?;
1622
1623 let mut group_by: Vec<String> = Vec::new();
1624 let mut reducers: Vec<ReducerSpec> = Vec::new();
1625 let mut limit: Option<(usize, usize)> = None;
1626 let mut saw_groupby = false;
1627
1628 while let Some(tok) = it.next() {
1629 let up = ascii_upper(tok);
1630 match up.as_slice() {
1631 b"GROUPBY" => {
1632 saw_groupby = true;
1633 parse_aggregate_groupby(&mut it, &mut group_by)?;
1634 }
1635 b"REDUCE" => {
1636 reducers.push(parse_aggregate_reduce(&mut it)?);
1637 }
1638 b"LIMIT" => {
1639 let off_tok = it.next_required("FT.AGGREGATE: LIMIT expects offset")?;
1640 let cnt_tok = it.next_required("FT.AGGREGATE: LIMIT expects count")?;
1641 let off = parse_unsigned(off_tok, "FT.AGGREGATE LIMIT offset")?;
1642 let cnt = parse_unsigned(cnt_tok, "FT.AGGREGATE LIMIT count")?;
1643 limit = Some((off, cnt));
1644 }
1645 other => {
1646 return Err(FtError::Unsupported(format!(
1647 "FT.AGGREGATE clause {}",
1648 String::from_utf8_lossy(other)
1649 )));
1650 }
1651 }
1652 }
1653
1654 if !saw_groupby {
1655 return Err(FtError::Unsupported(
1656 "FT.AGGREGATE without GROUPBY".to_string(),
1657 ));
1658 }
1659 if reducers.is_empty() {
1660 return Err(FtError::Syntax(
1661 "FT.AGGREGATE: GROUPBY requires at least one REDUCE".to_string(),
1662 ));
1663 }
1664 Ok(AggregateRequest {
1665 name,
1666 group_by,
1667 reducers,
1668 limit,
1669 })
1670}
1671
1672fn parse_aggregate_groupby(
1673 it: &mut TokenCursor<'_>,
1674 group_by: &mut Vec<String>,
1675) -> Result<(), FtError> {
1676 let n_tok = it.next_required("FT.AGGREGATE: GROUPBY expects a count")?;
1677 let n = parse_unsigned(n_tok, "FT.AGGREGATE GROUPBY count")?;
1678 if n == 0 {
1679 return Err(FtError::Syntax(
1680 "FT.AGGREGATE GROUPBY count must be > 0".to_string(),
1681 ));
1682 }
1683 for _ in 0..n {
1684 let f_tok = it.next_required("FT.AGGREGATE: GROUPBY expects @field")?;
1685 let bytes: &[u8] = if f_tok.first() == Some(&b'@') {
1686 &f_tok[1..]
1687 } else {
1688 f_tok
1689 };
1690 group_by.push(utf8(bytes, "FT.AGGREGATE GROUPBY field")?);
1691 }
1692 Ok(())
1693}
1694
1695fn parse_aggregate_reduce(it: &mut TokenCursor<'_>) -> Result<ReducerSpec, FtError> {
1696 let kind_tok = it.next_required("FT.AGGREGATE: REDUCE expects a kind")?;
1697 let kind_up = ascii_upper(kind_tok);
1698 let arg_count_tok = it.next_required("FT.AGGREGATE: REDUCE expects an argument count")?;
1699 let arg_count = parse_unsigned(arg_count_tok, "FT.AGGREGATE REDUCE arg count")?;
1700 let kind = match kind_up.as_slice() {
1701 b"COUNT" => {
1702 if arg_count != 0 {
1703 return Err(FtError::Syntax(
1704 "FT.AGGREGATE REDUCE COUNT expects 0 args".to_string(),
1705 ));
1706 }
1707 ReducerKind::Count
1708 }
1709 b"SUM" => {
1710 if arg_count != 1 {
1711 return Err(FtError::Syntax(
1712 "FT.AGGREGATE REDUCE SUM expects 1 arg".to_string(),
1713 ));
1714 }
1715 ReducerKind::Sum {
1716 field: take_field_arg(it, "FT.AGGREGATE: SUM expects @field")?,
1717 }
1718 }
1719 b"AVG" => {
1720 if arg_count != 1 {
1721 return Err(FtError::Syntax(
1722 "FT.AGGREGATE REDUCE AVG expects 1 arg".to_string(),
1723 ));
1724 }
1725 ReducerKind::Avg {
1726 field: take_field_arg(it, "FT.AGGREGATE: AVG expects @field")?,
1727 }
1728 }
1729 other => {
1730 for _ in 0..arg_count {
1737 let _ = it.next();
1738 }
1739 return Err(FtError::Unsupported(format!(
1740 "FT.AGGREGATE REDUCE {}",
1741 String::from_utf8_lossy(other)
1742 )));
1743 }
1744 };
1745 let as_tok = it.next_required("FT.AGGREGATE: REDUCE expects AS <name>")?;
1746 if !as_tok.eq_ignore_ascii_case(b"AS") {
1747 return Err(FtError::Syntax(
1748 "FT.AGGREGATE REDUCE clause missing AS".to_string(),
1749 ));
1750 }
1751 let alias = it.next_string("FT.AGGREGATE: REDUCE AS expects a name")?;
1752 Ok(ReducerSpec { kind, alias })
1753}
1754
1755fn take_field_arg(it: &mut TokenCursor<'_>, msg: &str) -> Result<String, FtError> {
1756 let tok = it.next_required(msg)?;
1757 let bytes: &[u8] = if tok.first() == Some(&b'@') {
1758 &tok[1..]
1759 } else {
1760 tok
1761 };
1762 utf8(bytes, msg)
1763}
1764
1765fn parse_explain(rest: &[&[u8]]) -> Result<ExplainRequest, FtError> {
1766 let mut it = TokenCursor::new(rest);
1767 let name = it.next_string("FT.EXPLAIN: missing index name")?;
1768 let query_tok = it.next_required("FT.EXPLAIN: missing query expression")?;
1769 while let Some(tok) = it.next() {
1773 let up = ascii_upper(tok);
1774 match up.as_slice() {
1775 b"DIALECT" => {
1776 it.next_required("FT.EXPLAIN: DIALECT expects a value")?;
1777 }
1778 other => {
1779 return Err(FtError::Unsupported(format!(
1780 "FT.EXPLAIN clause {}",
1781 String::from_utf8_lossy(other)
1782 )));
1783 }
1784 }
1785 }
1786 Ok(ExplainRequest {
1787 name,
1788 query: query_tok.to_vec(),
1789 })
1790}
1791
1792fn parse_alter(rest: &[&[u8]]) -> Result<AlterRequest, FtError> {
1793 let mut it = TokenCursor::new(rest);
1794 let name = it.next_string("FT.ALTER: missing index name")?;
1795 let op_tok = it.next_required("FT.ALTER: expected ADD")?;
1796 let op_up = ascii_upper(op_tok);
1797 match op_up.as_slice() {
1798 b"ADD" => {}
1799 b"DROP" => {
1800 return Err(FtError::Unsupported("FT.ALTER DROP".to_string()));
1801 }
1802 b"SCHEMA" => {
1803 return Err(FtError::Unsupported("FT.ALTER SCHEMA".to_string()));
1808 }
1809 other => {
1810 return Err(FtError::Syntax(format!(
1811 "FT.ALTER: expected ADD, got {}",
1812 String::from_utf8_lossy(other)
1813 )));
1814 }
1815 }
1816 let field = it.next_string("FT.ALTER ADD: missing field name")?;
1817 let type_tok = it.next_required("FT.ALTER ADD: missing field type")?;
1818 let type_up = ascii_upper(type_tok);
1819 let field_type = match type_up.as_slice() {
1820 b"TEXT" => MetadataFieldType::Text,
1821 b"TAG" => MetadataFieldType::Tag,
1822 b"VECTOR" => {
1823 return Err(FtError::Unsupported(
1824 "FT.ALTER ADD VECTOR (rebuild required)".to_string(),
1825 ));
1826 }
1827 b"NUMERIC" | b"GEO" => {
1828 return Err(FtError::Unsupported(format!(
1829 "FT.ALTER ADD {}",
1830 String::from_utf8_lossy(type_tok)
1831 )));
1832 }
1833 other => {
1834 return Err(FtError::Syntax(format!(
1835 "FT.ALTER ADD: unknown type {}",
1836 String::from_utf8_lossy(other)
1837 )));
1838 }
1839 };
1840 if it.peek().is_some() {
1841 return Err(FtError::Syntax(
1842 "FT.ALTER ADD: unexpected trailing tokens".to_string(),
1843 ));
1844 }
1845 Ok(AlterRequest {
1846 name,
1847 field,
1848 field_type,
1849 })
1850}
1851
1852fn execute_aggregate(
1853 registry: &VectorRegistry,
1854 req: &AggregateRequest,
1855) -> Result<FtOutcome, FtError> {
1856 let table = registry
1857 .get(&req.name)
1858 .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
1859 let mut groups: BTreeMap<Vec<u8>, GroupAccumulator> = BTreeMap::new();
1868 for key in table.indexed_keys() {
1869 let row = match table.engine.get(&key) {
1870 Ok(Some(r)) => r,
1871 Ok(None) => continue,
1872 Err(e) => return Err(FtError::Engine(e.to_string())),
1873 };
1874 let group_key = build_group_key(&req.group_by, &row.metadata);
1875 let entry = groups.entry(group_key).or_insert_with(|| {
1876 GroupAccumulator::new(
1877 req.group_by
1878 .iter()
1879 .map(|f| {
1880 (
1881 f.clone(),
1882 metadata_string(row.metadata.get(f)).unwrap_or_default(),
1883 )
1884 })
1885 .collect(),
1886 req.reducers.len(),
1887 )
1888 });
1889 entry.observe(&req.reducers, &row.metadata);
1890 }
1891
1892 let mut rows: Vec<Vec<(String, Vec<u8>)>> = Vec::with_capacity(groups.len());
1896 for (_, mut group) in groups {
1897 let mut row: Vec<(String, Vec<u8>)> = std::mem::take(&mut group.fields);
1898 for (i, reducer) in req.reducers.iter().enumerate() {
1899 let value = group.render_reducer(i, reducer);
1900 row.push((reducer.alias.clone(), value));
1901 }
1902 rows.push(row);
1903 }
1904
1905 if let Some((offset, count)) = req.limit {
1906 if offset >= rows.len() {
1907 rows.clear();
1908 } else {
1909 let end = offset.saturating_add(count).min(rows.len());
1910 rows = rows.drain(offset..end).collect();
1911 }
1912 }
1913
1914 let total_groups = rows.len();
1915 Ok(FtOutcome::Aggregate { total_groups, rows })
1916}
1917
1918struct GroupAccumulator {
1922 fields: Vec<(String, Vec<u8>)>,
1923 slots: Vec<ReducerAccum>,
1924}
1925
1926#[derive(Default)]
1927struct ReducerAccum {
1928 count: u64,
1929 sum: f64,
1930 saw_numeric: bool,
1934}
1935
1936impl GroupAccumulator {
1937 fn new(fields: Vec<(String, Vec<u8>)>, n_reducers: usize) -> Self {
1938 let mut slots = Vec::with_capacity(n_reducers);
1939 for _ in 0..n_reducers {
1940 slots.push(ReducerAccum::default());
1941 }
1942 Self { fields, slots }
1943 }
1944
1945 fn observe(&mut self, reducers: &[ReducerSpec], metadata: &HashMap<String, serde_json::Value>) {
1946 for (i, reducer) in reducers.iter().enumerate() {
1947 let slot = &mut self.slots[i];
1948 slot.count = slot.count.saturating_add(1);
1949 match &reducer.kind {
1950 ReducerKind::Count => {}
1951 ReducerKind::Sum { field } | ReducerKind::Avg { field } => {
1952 if let Some(v) = metadata.get(field) {
1953 if let Some(n) = metadata_number(v) {
1954 slot.sum += n;
1955 slot.saw_numeric = true;
1956 }
1957 }
1958 }
1959 }
1960 }
1961 }
1962
1963 fn render_reducer(&self, i: usize, reducer: &ReducerSpec) -> Vec<u8> {
1964 let slot = &self.slots[i];
1965 match &reducer.kind {
1966 ReducerKind::Count => slot.count.to_string().into_bytes(),
1967 ReducerKind::Sum { .. } => {
1968 if slot.saw_numeric {
1969 format_float_f64(slot.sum).into_bytes()
1970 } else {
1971 b"0".to_vec()
1972 }
1973 }
1974 ReducerKind::Avg { .. } => {
1975 if slot.saw_numeric && slot.count > 0 {
1976 let denom = u32::try_from(slot.count).unwrap_or(u32::MAX);
1986 let mean = slot.sum / f64::from(denom);
1987 format_float_f64(mean).into_bytes()
1988 } else {
1989 b"0".to_vec()
1990 }
1991 }
1992 }
1993 }
1994}
1995
1996fn build_group_key(group_by: &[String], metadata: &HashMap<String, serde_json::Value>) -> Vec<u8> {
1997 let mut key: Vec<u8> = Vec::new();
1998 for field in group_by {
1999 let v = metadata_string(metadata.get(field)).unwrap_or_default();
2000 if !key.is_empty() {
2005 key.push(0x1f);
2006 }
2007 key.extend_from_slice(&v);
2008 }
2009 key
2010}
2011
2012fn metadata_string(value: Option<&serde_json::Value>) -> Option<Vec<u8>> {
2013 match value? {
2014 serde_json::Value::String(s) => Some(s.clone().into_bytes()),
2015 other => Some(other.to_string().into_bytes()),
2016 }
2017}
2018
2019fn metadata_number(value: &serde_json::Value) -> Option<f64> {
2020 match value {
2021 serde_json::Value::Number(n) => n.as_f64(),
2022 serde_json::Value::String(s) => s.trim().parse::<f64>().ok(),
2023 _ => None,
2024 }
2025}
2026
2027fn execute_explain(registry: &VectorRegistry, req: &ExplainRequest) -> Result<FtOutcome, FtError> {
2028 let table = registry
2029 .get(&req.name)
2030 .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
2031 let plan = if let Some((field, substring)) = try_parse_text_field_query(&req.query)? {
2036 let trigrams = trigram_preview(&substring);
2039 format!(
2040 "@{field}: SUBSTRING\n field: {field}\n query: {query}\n index: trigram+bloom\n trigrams: {trigrams}\n",
2041 field = field,
2042 query = String::from_utf8_lossy(&substring),
2043 trigrams = trigrams,
2044 )
2045 } else if let Ok((k, vec_field, param_name)) = parse_knn_query(&req.query) {
2046 let dim = table.schema.dim;
2047 let metric = format!("{:?}", table.schema.distance).to_uppercase();
2048 let alg = format!("{:?}", table.schema.algorithm).to_uppercase();
2049 format!(
2050 "VECTOR KNN\n index: {idx}\n algorithm: {alg}\n metric: {metric}\n field: {field}\n k: {k}\n dim: {dim}\n param: ${param}\n",
2051 idx = req.name,
2052 alg = alg,
2053 metric = metric,
2054 field = vec_field,
2055 k = k,
2056 dim = dim,
2057 param = param_name,
2058 )
2059 } else {
2060 format!(
2065 "UNKNOWN QUERY\n index: {idx}\n query: {q}\n note: only @field:substring and *=>[KNN ...] are planned in this build\n",
2066 idx = req.name,
2067 q = String::from_utf8_lossy(&req.query),
2068 )
2069 };
2070 Ok(FtOutcome::Explain(plan))
2071}
2072
2073fn trigram_preview(query: &[u8]) -> String {
2077 if query.len() < 3 {
2078 return "<short-circuit: scan>".to_string();
2079 }
2080 let mut parts: Vec<String> = Vec::new();
2081 for window in query.windows(3).take(8) {
2082 parts.push(format!("\"{}\"", String::from_utf8_lossy(window)));
2083 }
2084 let suffix = if query.len() > 8 + 3 - 1 { ", ..." } else { "" };
2085 format!("[{}{}]", parts.join(", "), suffix)
2086}
2087
2088fn execute_alter(registry: &VectorRegistry, req: &AlterRequest) -> Result<FtOutcome, FtError> {
2089 let table = registry
2090 .get(&req.name)
2091 .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
2092 match req.field_type {
2093 MetadataFieldType::Text => {
2094 let _new = table.add_text_field(&req.field);
2100 Ok(FtOutcome::Ok)
2101 }
2102 MetadataFieldType::Tag => {
2103 Ok(FtOutcome::Ok)
2110 }
2111 MetadataFieldType::Numeric | MetadataFieldType::Geo => {
2112 Err(FtError::Unsupported(format!(
2117 "FT.ALTER ADD type {:?}",
2118 req.field_type
2119 )))
2120 }
2121 }
2122}
2123
2124fn format_float_f64(f: f64) -> String {
2125 format!("{f:.6}")
2130}
2131
2132fn parse_regex(rest: &[&[u8]]) -> Result<RegexRequest, FtError> {
2142 let mut it = TokenCursor::new(rest);
2143 let name = it.next_string("FT.REGEX: missing index name")?;
2144 let field = it.next_string("FT.REGEX: missing field name")?;
2145 let pattern_tok = it.next_required("FT.REGEX: missing pattern")?;
2146 let pattern = std::str::from_utf8(pattern_tok)
2147 .map(str::to_string)
2148 .map_err(|_| FtError::Syntax("FT.REGEX: pattern is not UTF-8".to_string()))?;
2149 let mut max_errors: u16 = 0;
2150 for tok in &mut it {
2151 let up = ascii_upper(tok);
2152 if let Some(rest) = up.strip_prefix(b"K=") {
2153 let s = std::str::from_utf8(rest)
2154 .map_err(|_| FtError::Syntax("FT.REGEX: K= value is not UTF-8".to_string()))?;
2155 let n: u32 = s
2156 .parse()
2157 .map_err(|_| FtError::Syntax(format!("FT.REGEX: invalid K= value {s}")))?;
2158 max_errors = u16::try_from(n)
2159 .map_err(|_| FtError::Syntax(format!("FT.REGEX: K= value {n} exceeds u16")))?;
2160 } else {
2161 return Err(FtError::Unsupported(format!(
2162 "FT.REGEX option {}",
2163 String::from_utf8_lossy(tok)
2164 )));
2165 }
2166 }
2167 Ok(RegexRequest {
2168 name,
2169 field,
2170 pattern,
2171 max_errors,
2172 })
2173}
2174
2175fn execute_regex(registry: &VectorRegistry, req: &RegexRequest) -> Result<FtOutcome, FtError> {
2176 let table = registry
2177 .get(&req.name)
2178 .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
2179 if !table.has_text_field(&req.field) {
2180 return Err(FtError::Syntax(format!(
2181 "FT.REGEX: index {} has no TEXT field {}",
2182 req.name, req.field
2183 )));
2184 }
2185 let raw_hits = if req.max_errors == 0 {
2186 table
2187 .search_text_regex(&req.field, &req.pattern)
2188 .ok_or_else(|| {
2189 FtError::Engine(format!(
2190 "text index for field {} not provisioned",
2191 req.field
2192 ))
2193 })?
2194 .map_err(|e| FtError::Syntax(format!("FT.REGEX: invalid pattern: {e}")))?
2195 } else {
2196 table
2197 .search_text_regex_approx(&req.field, &req.pattern, req.max_errors)
2198 .ok_or_else(|| {
2199 FtError::Engine(format!(
2200 "text index for field {} not provisioned",
2201 req.field
2202 ))
2203 })?
2204 .map_err(|e| FtError::Engine(format!("FT.REGEX (approximate): {e}")))?
2205 };
2206 let mut out = Vec::with_capacity(raw_hits.len());
2207 for (key, text) in raw_hits {
2208 out.push(SearchHit {
2209 doc_id: key,
2210 score: 0.0,
2211 fields: vec![(req.field.clone(), text)],
2212 });
2213 }
2214 let total = out.len();
2215 Ok(FtOutcome::Search { total, hits: out })
2216}
2217
2218fn parse_info(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
2221 let mut it = TokenCursor::new(rest);
2222 let name = it.next_string("FT.INFO: missing index name")?;
2223 if it.peek().is_some() {
2224 return Err(FtError::Syntax(
2225 "FT.INFO: unexpected trailing tokens".to_string(),
2226 ));
2227 }
2228 Ok(FtCommand::Info { name })
2229}
2230
2231fn parse_list(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
2232 if !rest.is_empty() {
2233 return Err(FtError::Syntax("FT.LIST: takes no arguments".to_string()));
2234 }
2235 Ok(FtCommand::List)
2236}
2237
2238fn parse_dropindex(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
2239 let mut it = TokenCursor::new(rest);
2240 let name = it.next_string("FT.DROPINDEX: missing index name")?;
2241 let mut delete_documents = false;
2242 loop {
2243 let Some(tok) = it.next() else { break };
2244 let up = ascii_upper(tok);
2245 match up.as_slice() {
2246 b"DD" => delete_documents = true,
2247 other => {
2248 return Err(FtError::Unsupported(format!(
2249 "FT.DROPINDEX option {}",
2250 String::from_utf8_lossy(other)
2251 )));
2252 }
2253 }
2254 }
2255 Ok(FtCommand::DropIndex {
2256 name,
2257 delete_documents,
2258 })
2259}
2260
2261fn execute_info(registry: &VectorRegistry, name: String) -> Result<FtOutcome, FtError> {
2262 let info = registry
2263 .info(&name)
2264 .ok_or_else(|| FtError::NotFound(name.clone()))?;
2265 let table = registry.get(&name).ok_or(FtError::NotFound(name))?;
2266 let mut out: Vec<(String, InfoValue)> = Vec::new();
2267 out.push(("index_name".to_string(), InfoValue::String(info.name)));
2268 out.push((
2269 "algorithm".to_string(),
2270 InfoValue::String(format!("{:?}", info.algorithm).to_uppercase()),
2271 ));
2272 out.push((
2273 "distance_metric".to_string(),
2274 InfoValue::String(format!("{:?}", info.distance).to_uppercase()),
2275 ));
2276 out.push((
2277 "vector_field".to_string(),
2278 InfoValue::String(table.schema.vector_field.clone()),
2279 ));
2280 out.push((
2281 "vector_type".to_string(),
2282 InfoValue::String(format!("{:?}", table.schema.vector_type).to_uppercase()),
2283 ));
2284 out.push(("dim".to_string(), InfoValue::Integer(i64::from(info.dim))));
2285 let prefixes_value = InfoValue::Array(
2286 table
2287 .schema
2288 .prefixes
2289 .iter()
2290 .map(|p| InfoValue::String(String::from_utf8_lossy(p).into_owned()))
2291 .collect(),
2292 );
2293 out.push(("prefixes".to_string(), prefixes_value));
2294 let metadata_value = InfoValue::Array(
2295 table
2296 .schema
2297 .metadata_fields
2298 .iter()
2299 .map(|f| {
2300 InfoValue::Array(vec![
2301 InfoValue::String(f.name.clone()),
2302 InfoValue::String(format!("{:?}", f.field_type).to_uppercase()),
2303 ])
2304 })
2305 .collect(),
2306 );
2307 out.push(("schema_fields".to_string(), metadata_value));
2308 out.push((
2309 "num_docs".to_string(),
2310 InfoValue::Integer(i64::try_from(info.live_rows).unwrap_or(i64::MAX)),
2311 ));
2312 out.push((
2313 "tracked_rows".to_string(),
2314 InfoValue::Integer(i64::try_from(info.tracked_rows).unwrap_or(i64::MAX)),
2315 ));
2316 Ok(FtOutcome::Info(out))
2317}
2318
2319fn execute_dropindex(
2320 registry: &VectorRegistry,
2321 name: String,
2322 delete_documents: bool,
2323) -> Result<FtOutcome, FtError> {
2324 use crate::vector::registry::RegistryError;
2325 if delete_documents {
2326 match registry.drop_with_dd(&name) {
2327 Ok(keys) => Ok(FtOutcome::DropOk {
2328 deleted_documents: true,
2329 document_count: keys.len(),
2330 }),
2331 Err(RegistryError::NotFound(_)) => Err(FtError::NotFound(name)),
2332 Err(other) => Err(FtError::Engine(other.to_string())),
2333 }
2334 } else {
2335 match registry.drop(&name) {
2336 Ok(_) => Ok(FtOutcome::DropOk {
2337 deleted_documents: false,
2338 document_count: 0,
2339 }),
2340 Err(RegistryError::NotFound(_)) => Err(FtError::NotFound(name)),
2341 Err(other) => Err(FtError::Engine(other.to_string())),
2342 }
2343 }
2344}
2345
2346fn insert_into_index(table: &VectorTable, key: &[u8], pairs: &[&[u8]]) -> Result<(), FtError> {
2349 let mut vector: Option<Vec<f32>> = None;
2350 let mut metadata: HashMap<String, serde_json::Value> = HashMap::new();
2351 let mut text_writes: Vec<(String, Vec<u8>)> = Vec::new();
2352 let mut chunks = pairs.chunks_exact(2);
2353 for chunk in &mut chunks {
2354 let field = chunk[0];
2355 let value = chunk[1];
2356 let field_str = std::str::from_utf8(field)
2357 .map_err(|_| FtError::Syntax("HSET field name is not UTF-8".to_string()))?;
2358 if field_str == table.schema.vector_field {
2359 let dim = usize::from(table.schema.dim);
2360 vector = Some(decode_le_f32(value, dim)?);
2361 } else {
2362 let value_str = String::from_utf8_lossy(value).into_owned();
2363 metadata.insert(field_str.to_string(), serde_json::Value::String(value_str));
2364 if table.has_text_field(field_str) {
2370 text_writes.push((field_str.to_string(), value.to_vec()));
2371 }
2372 }
2373 }
2374 if !chunks.remainder().is_empty() {
2375 return Err(FtError::Syntax(
2376 "HSET requires a value for every field".to_string(),
2377 ));
2378 }
2379 let v = vector.ok_or_else(|| {
2380 FtError::Syntax(format!(
2381 "HSET into indexed prefix is missing the vector field '{}'",
2382 table.schema.vector_field
2383 ))
2384 })?;
2385 table
2386 .engine
2387 .upsert(key.to_vec(), &v, metadata)
2388 .map_err(|e| FtError::Engine(e.to_string()))?;
2389 for (field, bytes) in text_writes {
2390 table.upsert_text_field(&field, key, &bytes);
2391 }
2392 table.record_indexed_key(key.to_vec());
2393 Ok(())
2394}
2395
2396#[must_use]
2400pub fn render_outcome(outcome: &FtOutcome) -> Vec<u8> {
2401 let mut out = Vec::new();
2402 match outcome {
2403 FtOutcome::Ok | FtOutcome::DropOk { .. } => {
2404 out.extend_from_slice(b"+OK\r\n");
2405 }
2406 FtOutcome::List(names) => {
2407 write_array_header(&mut out, names.len());
2408 for name in names {
2409 write_bulk(&mut out, name.as_bytes());
2410 }
2411 }
2412 FtOutcome::Info(pairs) => {
2413 write_array_header(&mut out, pairs.len() * 2);
2414 for (k, v) in pairs {
2415 write_bulk(&mut out, k.as_bytes());
2416 write_info_value(&mut out, v);
2417 }
2418 }
2419 FtOutcome::Search { total, hits } => {
2420 let total_i64 = i64::try_from(*total).unwrap_or(i64::MAX);
2422 write_array_header(&mut out, 1 + hits.len() * 2);
2423 write_integer(&mut out, total_i64);
2424 for hit in hits {
2425 write_bulk(&mut out, &hit.doc_id);
2426 write_array_header(&mut out, hit.fields.len() * 2);
2427 for (fk, fv) in &hit.fields {
2428 write_bulk(&mut out, fk.as_bytes());
2429 write_bulk(&mut out, fv);
2430 }
2431 }
2432 }
2433 FtOutcome::SearchNoContent { total, doc_ids } => {
2434 let total_i64 = i64::try_from(*total).unwrap_or(i64::MAX);
2436 write_array_header(&mut out, 1 + doc_ids.len());
2437 write_integer(&mut out, total_i64);
2438 for id in doc_ids {
2439 write_bulk(&mut out, id);
2440 }
2441 }
2442 FtOutcome::Aggregate { total_groups, rows } => {
2443 let total_i64 = i64::try_from(*total_groups).unwrap_or(i64::MAX);
2446 write_array_header(&mut out, 1 + rows.len());
2447 write_integer(&mut out, total_i64);
2448 for row in rows {
2449 write_array_header(&mut out, row.len() * 2);
2450 for (name, value) in row {
2451 write_bulk(&mut out, name.as_bytes());
2452 write_bulk(&mut out, value);
2453 }
2454 }
2455 }
2456 FtOutcome::Explain(plan) => {
2457 write_bulk(&mut out, plan.as_bytes());
2458 }
2459 }
2460 out
2461}
2462
2463#[must_use]
2465pub fn render_error(err: &FtError) -> Vec<u8> {
2466 let mut buf = Vec::new();
2467 let _ = write!(buf, "-ERR {err}\r\n");
2472 buf
2473}
2474
2475fn write_array_header(out: &mut Vec<u8>, n: usize) {
2476 let _ = write!(out, "*{n}\r\n");
2477}
2478
2479fn write_bulk(out: &mut Vec<u8>, payload: &[u8]) {
2480 let _ = write!(out, "${}\r\n", payload.len());
2481 out.extend_from_slice(payload);
2482 out.extend_from_slice(b"\r\n");
2483}
2484
2485fn write_integer(out: &mut Vec<u8>, n: i64) {
2486 let _ = write!(out, ":{n}\r\n");
2487}
2488
2489fn write_info_value(out: &mut Vec<u8>, value: &InfoValue) {
2490 match value {
2491 InfoValue::String(s) => write_bulk(out, s.as_bytes()),
2492 InfoValue::Integer(n) => write_integer(out, *n),
2493 InfoValue::Array(items) => {
2494 write_array_header(out, items.len());
2495 for it in items {
2496 write_info_value(out, it);
2497 }
2498 }
2499 }
2500}
2501
2502fn ascii_upper(bytes: &[u8]) -> Vec<u8> {
2505 bytes.iter().map(u8::to_ascii_uppercase).collect()
2506}
2507
2508fn matches_keyword(tok: Option<&[u8]>, kw: &str) -> bool {
2509 tok.is_some_and(|t| t.eq_ignore_ascii_case(kw.as_bytes()))
2510}
2511
2512fn expect_keyword(tok: &[u8], kw: &str) -> Result<(), FtError> {
2513 if tok.eq_ignore_ascii_case(kw.as_bytes()) {
2514 Ok(())
2515 } else {
2516 Err(FtError::Syntax(format!(
2517 "expected {kw}, got {}",
2518 String::from_utf8_lossy(tok)
2519 )))
2520 }
2521}
2522
2523fn parse_unsigned(tok: &[u8], context: &str) -> Result<usize, FtError> {
2524 let s =
2525 std::str::from_utf8(tok).map_err(|_| FtError::Syntax(format!("{context}: not UTF-8")))?;
2526 s.parse::<usize>()
2527 .map_err(|_| FtError::Syntax(format!("{context}: not a non-negative integer ({s})")))
2528}
2529
2530fn utf8(tok: &[u8], context: &str) -> Result<String, FtError> {
2531 std::str::from_utf8(tok)
2532 .map(str::to_string)
2533 .map_err(|_| FtError::Syntax(format!("{context}: not UTF-8")))
2534}
2535
2536fn decode_le_f32(bytes: &[u8], expected_dim: usize) -> Result<Vec<f32>, FtError> {
2537 if !bytes.len().is_multiple_of(4) {
2538 return Err(FtError::Syntax(
2539 "vector payload length is not a multiple of 4 bytes".to_string(),
2540 ));
2541 }
2542 let dim = bytes.len() / 4;
2543 if dim != expected_dim {
2544 return Err(FtError::DimensionMismatch {
2545 index_dim: expected_dim,
2546 payload_dim: dim,
2547 });
2548 }
2549 let mut out = Vec::with_capacity(dim);
2550 for chunk in bytes.chunks_exact(4) {
2551 let arr = [chunk[0], chunk[1], chunk[2], chunk[3]];
2552 out.push(f32::from_le_bytes(arr));
2553 }
2554 Ok(out)
2555}
2556
2557fn format_float(f: f32) -> String {
2558 format!("{f:.6}")
2562}
2563
2564struct TokenCursor<'a> {
2567 args: &'a [&'a [u8]],
2568 idx: usize,
2569}
2570
2571impl<'a> TokenCursor<'a> {
2572 fn new(args: &'a [&'a [u8]]) -> Self {
2573 Self { args, idx: 0 }
2574 }
2575
2576 fn peek(&self) -> Option<&'a [u8]> {
2577 self.args.get(self.idx).copied()
2578 }
2579
2580 fn advance(&mut self) {
2581 self.idx += 1;
2582 }
2583
2584 fn next_required(&mut self, msg: &str) -> Result<&'a [u8], FtError> {
2585 let tok = self
2586 .args
2587 .get(self.idx)
2588 .copied()
2589 .ok_or_else(|| FtError::Syntax(msg.to_string()))?;
2590 self.idx += 1;
2591 Ok(tok)
2592 }
2593
2594 fn next_string(&mut self, msg: &str) -> Result<String, FtError> {
2595 let tok = self.next_required(msg)?;
2596 utf8(tok, msg)
2597 }
2598}
2599
2600impl<'a> Iterator for TokenCursor<'a> {
2601 type Item = &'a [u8];
2602
2603 fn next(&mut self) -> Option<Self::Item> {
2604 let tok = self.args.get(self.idx).copied()?;
2605 self.idx += 1;
2606 Some(tok)
2607 }
2608}
2609
2610#[cfg(test)]
2611mod tests {
2612 use super::*;
2613
2614 fn slices<'a>(v: &'a [&'a [u8]]) -> Vec<&'a [u8]> {
2615 v.to_vec()
2616 }
2617
2618 #[test]
2619 fn ascii_upper_lowercases_to_uppercase() {
2620 assert_eq!(ascii_upper(b"ft.create"), b"FT.CREATE".to_vec());
2621 }
2622
2623 #[test]
2624 fn parse_create_minimal() {
2625 let v: Vec<&[u8]> = vec![
2626 b"FT.CREATE",
2627 b"idx",
2628 b"ON",
2629 b"HASH",
2630 b"PREFIX",
2631 b"1",
2632 b"docs:",
2633 b"SCHEMA",
2634 b"vec",
2635 b"VECTOR",
2636 b"HNSW",
2637 b"6",
2638 b"TYPE",
2639 b"FLOAT32",
2640 b"DIM",
2641 b"4",
2642 b"DISTANCE_METRIC",
2643 b"COSINE",
2644 ];
2645 let cmd = parse_command(&slices(&v)).expect("parse ok");
2646 let FtCommand::Create(req) = cmd else {
2647 panic!("expected create");
2648 };
2649 assert_eq!(req.name, "idx");
2650 assert_eq!(req.doc_type, DocType::Hash);
2651 assert_eq!(req.schema.dim, 4);
2652 assert_eq!(req.schema.vector_field, "vec");
2653 assert_eq!(req.schema.distance, DistanceMetric::Cosine);
2654 assert_eq!(req.schema.algorithm, IndexAlgorithm::Hnsw);
2655 assert_eq!(req.schema.prefixes, vec![b"docs:".to_vec()]);
2656 }
2657
2658 #[test]
2659 fn parse_knn_query_extracts_pieces() {
2660 let (k, field, param) = parse_knn_query(b"*=>[KNN 5 @vec $blob]").unwrap();
2661 assert_eq!(k, 5);
2662 assert_eq!(field, "vec");
2663 assert_eq!(param, "blob");
2664 }
2665
2666 #[test]
2667 fn parse_knn_query_rejects_filter() {
2668 let err = parse_knn_query(b"@title:foo=>[KNN 5 @vec $blob]").unwrap_err();
2669 assert!(matches!(err, FtError::Unsupported(_)));
2670 }
2671
2672 #[test]
2673 fn decode_le_f32_round_trips_a_short_vector() {
2674 let mut bytes = Vec::new();
2675 for v in [1.0_f32, -1.5, 2.25, 0.0] {
2676 bytes.extend_from_slice(&v.to_le_bytes());
2677 }
2678 let out = decode_le_f32(&bytes, 4).unwrap();
2679 assert_eq!(out, vec![1.0_f32, -1.5, 2.25, 0.0]);
2680 }
2681
2682 #[test]
2683 fn decode_le_f32_rejects_dim_mismatch() {
2684 let bytes = vec![0u8; 8];
2685 let err = decode_le_f32(&bytes, 4).unwrap_err();
2686 assert!(matches!(err, FtError::DimensionMismatch { .. }));
2687 }
2688}