Skip to main content

dynomite/proto/redis/
ft.rs

1//! RediSearch FT.* command surface.
2//!
3//! This module is the in-process home of the
4//! `FT.CREATE` / `FT.SEARCH` / `FT.INFO` / `FT.LIST` /
5//! `FT.DROPINDEX` / `FT.AGGREGATE` / `FT.EXPLAIN` /
6//! `FT.ALTER` parsers and executors that route through the
7//! [`crate::vector::registry::VectorRegistry`] landed in
8//! Phase B. It also exposes [`maybe_index_hset`], the HSET
9//! interception helper that the Redis dispatcher consults so
10//! that a write to a key matching a registered prefix turns
11//! into a vector upsert in the corresponding index.
12//!
13//! The module is intentionally self-contained: it consumes a
14//! slice of byte strings (the parsed RESP arguments) and emits
15//! a RESP-encoded response (`Vec<u8>`). It performs no I/O of
16//! its own. That separation lets the FT.* surface be exercised
17//! by integration tests directly, with the wire-format parser
18//! and the dispatcher kept on the side.
19//!
20//! # Subset implemented today
21//!
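//! * `FT.CREATE <idx> ON HASH PREFIX <n> <prefix>... SCHEMA
//!   ( <field> TEXT | <field> NUMERIC | <field> GEO
//!     | <field> TAG [SEPARATOR <c>]
//!     | <field> VECTOR HNSW <nargs> TYPE FLOAT32 DIM <d>
//!       DISTANCE_METRIC COSINE|L2|EUCLIDEAN|IP|DOTPRODUCT )+`
//!   (exactly one `VECTOR` field is required; `<nargs>` is the
//!   number of attribute tokens that follow it).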
25//! * `FT.SEARCH <idx> "*=>[KNN <k> @<field> $<param>]"
26//!     PARAMS 2 <param> <bytes>
27//!     [RETURN <n> <field>...] [LIMIT <off> <cnt>]
28//!     [SORTBY @<field> [ASC|DESC]] [NOCONTENT]`
29//!   (vector k-NN form, optionally pre-filtered by a
30//!   filter expression on the LHS of the `=>`).
31//! * `FT.SEARCH <idx> "<filter>"` (filter-only form;
32//!   accepts numeric ranges `@f:[min max]`, tag sets
33//!   `@f:{a|b|c}`, text substrings `@f:word`, and the
34//!   boolean combinators AND / OR / NOT / grouping). See
35//!   [`super::ft_filter`].
36//! * `FT.SEARCH <idx> "@<field>:<substring>"` (text
37//!   substring form; routed through the trigram +
38//!   bloom-filter inverted index for any `TEXT`-typed schema
39//!   field). Accepts the same `RETURN` / `LIMIT` / `SORTBY`
40//!   / `NOCONTENT` projection clauses.
41//! * `FT.AGGREGATE <idx> <query> GROUPBY <n> @<field>...
42//!     ( REDUCE COUNT 0 AS <name>
43//!     | REDUCE SUM 1 @<field> AS <name>
44//!     | REDUCE AVG 1 @<field> AS <name> )+
45//!     [LIMIT <off> <cnt>]` (the basic aggregation pipeline).
46//! * `FT.EXPLAIN <idx> <query>` (textual query plan).
47//! * `FT.ALTER <idx> ADD <field> ( TEXT | TAG )` (schema
48//!   extension; rejects DROP / SCHEMA REPLACE and refuses
49//!   to add VECTOR fields, which require an index rebuild).
50//! * `FT.REGEX <idx> <field> <pattern> [K=<n>]` (Dynomite
51//!   extension; not standard RediSearch). `K=0` (default) is
52//!   the exact regex path; `K>=1` is the approximate regex
53//!   path through the TRE C library, allowing up to `K` edit
54//!   operations between the pattern and the matching text.
55//! * `FT.INFO <idx>`
56//! * `FT.LIST` (alias `FT._LIST`)
57//! * `FT.DROPINDEX <idx> [DD]`
58//!
59//! Out-of-scope shapes are rejected with
60//! `-ERR not supported in this build`. The full grammar is
61//! tracked under `docs/dynvec/fold-into-redis-path.md`.
62
63use std::collections::{BTreeMap, BTreeSet, HashMap};
64use std::io::Write;
65
66use thiserror::Error;
67
68use crate::proto::redis::ft_filter::{self, FilterExpr};
69use crate::vector::registry::{VectorRegistry, VectorTable};
70use crate::vector::schema::{
71    DistanceMetric, IndexAlgorithm, MetadataField, MetadataFieldType, VectorSchema, VectorType,
72};
73
/// Failure modes for the FT.* surface.
///
/// Each variant is mapped to a `-ERR ...` line by
/// [`render_error`]. The outward-facing wire shape is plain
/// RESP2 simple errors today; richer error replies (the
/// RediSearch `-WRONGTYPE` / `-NOINDEX` family) are a
/// follow-up.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum FtError {
    /// The command keyword is missing, or it does not start
    /// with `FT.`. Unrecognised keywords that *do* carry the
    /// `FT.` prefix are reported as [`Self::Unsupported`] by
    /// [`parse_command`] instead.
    #[error("unknown command: {0}")]
    UnknownCommand(String),
    /// The argument count or shape did not match the grammar.
    #[error("syntax error: {0}")]
    Syntax(String),
    /// The grammar shape is recognised but lands on a feature
    /// we do not implement in this build. Also used for
    /// `FT.*` keywords that this build does not know.
    #[error("not supported in this build: {0}")]
    Unsupported(String),
    /// No index registered under that name.
    #[error("index not found: {0}")]
    NotFound(String),
    /// An index already exists under that name.
    #[error("index already exists: {0}")]
    AlreadyExists(String),
    /// The vector dimension on a write or query does not match
    /// the index dimension.
    #[error("dimension mismatch: index={index_dim}, payload={payload_dim}")]
    DimensionMismatch {
        /// Index dimension.
        index_dim: usize,
        /// Payload dimension.
        payload_dim: usize,
    },
    /// Engine-level failure forwarded as a string.
    #[error("engine: {0}")]
    Engine(String),
}
113
/// Parsed FT.* command, ready for execution.
///
/// Values are produced by [`parse_command`] and consumed by
/// [`execute`]; each variant corresponds to one command shape.
#[derive(Clone, Debug, PartialEq)]
pub enum FtCommand {
    /// `FT.CREATE` parsed payload.
    Create(CreateRequest),
    /// `FT.SEARCH` k-NN form: `*=>[KNN k @field $param]`,
    /// optionally pre-filtered by a filter expression on the
    /// LHS of the `=>` operator.
    Search(SearchRequest),
    /// `FT.SEARCH` substring form: `@field:<substring>`.
    /// Preserved as a fast-path for the trivial single-field
    /// shape; richer expressions land on
    /// [`Self::SearchFilter`].
    SearchText(SearchTextRequest),
    /// `FT.SEARCH` filter-expression form (numeric ranges,
    /// tag sets, boolean combinators) without a KNN clause.
    SearchFilter(SearchFilterRequest),
    /// `FT.AGGREGATE` GROUPBY pipeline.
    Aggregate(AggregateRequest),
    /// `FT.EXPLAIN <idx> <query>`.
    Explain(ExplainRequest),
    /// `FT.ALTER <idx> ADD <field> <type>`.
    Alter(AlterRequest),
    /// `FT.REGEX` (Dynomite extension): exact or approximate
    /// regex over a `TEXT`-indexed field.
    Regex(RegexRequest),
    /// `FT.INFO <name>`.
    Info {
        /// Index name.
        name: String,
    },
    /// `FT.LIST` / `FT._LIST`. Carries no payload.
    List,
    /// `FT.DROPINDEX <name> [DD]`.
    DropIndex {
        /// Index name.
        name: String,
        /// True when the `DD` keyword was present.
        delete_documents: bool,
    },
}
155
/// Document-source family. Today only `HASH` is recognised;
/// the `FT.CREATE` parser rejects every other `ON <type>`
/// token with [`FtError::Unsupported`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DocType {
    /// Hash documents (`ON HASH`).
    Hash,
}
162
/// Parsed `FT.CREATE` request.
#[derive(Clone, Debug, PartialEq)]
pub struct CreateRequest {
    /// Index name.
    pub name: String,
    /// Document family.
    pub doc_type: DocType,
    /// Compiled schema (prefixes + vector + metadata fields).
    /// The `FT.CREATE` parser only builds a request when at
    /// least one prefix and a `VECTOR` field were supplied.
    pub schema: VectorSchema,
}
173
/// Sort direction for the `SORTBY` clause of `FT.SEARCH`
/// (the optional trailing `ASC` / `DESC` token).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SortDirection {
    /// Ascending order (smallest first).
    Asc,
    /// Descending order (largest first).
    Desc,
}
182
/// Parsed `FT.SEARCH` request restricted to the
/// `<filter>=>[KNN k @field $param]` form (where the LHS
/// `<filter>` is either `*` or a [`FilterExpr`] describing
/// a pre-filter over the candidate set). Carries the optional
/// projection clauses (`RETURN` / `LIMIT` / `SORTBY` /
/// `NOCONTENT`) so callers can pre-flight them at parse
/// time and the executor can apply them to the result set
/// before rendering.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchRequest {
    /// Index name.
    pub name: String,
    /// Number of results to return (the `k` of the `KNN`
    /// clause).
    pub k: usize,
    /// Vector field referenced in the query expression.
    pub vector_field: String,
    /// Raw query vector bytes (little-endian f32 stream).
    pub vector_bytes: Vec<u8>,
    /// Optional pre-filter expression. When `None` the query
    /// matches every indexed document (the legacy `*=>[KNN]`
    /// shape). When `Some(_)` the filter is applied to the
    /// candidate set first; the KNN ranker only sees the
    /// surviving rows.
    pub filter: Option<FilterExpr>,
    /// Optional `RETURN N field1 ... fieldN` projection. When
    /// `Some(list)` only those fields appear on each hit;
    /// when `None` every stored metadata field is emitted
    /// alongside the implicit `__vec_score`.
    pub return_fields: Option<Vec<String>>,
    /// Optional `LIMIT offset count` pagination window, stored
    /// as `(offset, count)`.
    /// `None` means "return up to `k` hits starting at 0".
    pub limit: Option<(usize, usize)>,
    /// Optional `SORTBY @field [ASC|DESC]` ordering, stored as
    /// `(field, direction)`. `None` preserves the engine's
    /// distance order.
    pub sortby: Option<(String, SortDirection)>,
    /// True when the client asked for `NOCONTENT`: hits are
    /// emitted as bare doc ids with no field arrays.
    pub nocontent: bool,
}
222
/// Parsed `FT.SEARCH` request for the filter-expression
/// form (no KNN clause). The expression may combine numeric
/// ranges, tag sets, text substrings, and boolean operators;
/// see [`super::ft_filter`] for the grammar.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchFilterRequest {
    /// Index name.
    pub name: String,
    /// Filter expression. Evaluated against the index's
    /// observed key set.
    pub filter: FilterExpr,
    /// Optional `RETURN` projection (see [`SearchRequest`]).
    pub return_fields: Option<Vec<String>>,
    /// Optional `LIMIT offset count` window, stored as
    /// `(offset, count)`.
    pub limit: Option<(usize, usize)>,
    /// Optional `SORTBY` ordering, stored as
    /// `(field, direction)`.
    pub sortby: Option<(String, SortDirection)>,
    /// True when the client asked for `NOCONTENT`.
    pub nocontent: bool,
}
243
/// Parsed `FT.SEARCH` request for the text-substring form
/// `@field:<substring>`. The substring is matched against the
/// TEXT field's trigram + bloom inverted index. Carries the
/// same projection clauses as the k-NN variant.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchTextRequest {
    /// Index name.
    pub name: String,
    /// `TEXT` schema field referenced in the query.
    pub field: String,
    /// Raw substring bytes to search for (kept as bytes; not
    /// required to be UTF-8 by this type).
    pub query: Vec<u8>,
    /// Optional `RETURN` projection (see [`SearchRequest`]).
    pub return_fields: Option<Vec<String>>,
    /// Optional `LIMIT offset count` window, stored as
    /// `(offset, count)`.
    pub limit: Option<(usize, usize)>,
    /// Optional `SORTBY` ordering, stored as
    /// `(field, direction)`.
    pub sortby: Option<(String, SortDirection)>,
    /// True when the client asked for `NOCONTENT`.
    pub nocontent: bool,
}
265
/// Reducer kind on an `FT.AGGREGATE` `REDUCE` clause. The
/// output column name lives on the enclosing [`ReducerSpec`],
/// not on the kind itself.
#[derive(Clone, Debug, PartialEq)]
pub enum ReducerKind {
    /// `REDUCE COUNT 0 AS <name>` -- per-group row count.
    Count,
    /// `REDUCE SUM 1 @<field> AS <name>` -- per-group sum of
    /// the named numeric field.
    Sum {
        /// Field name (without the leading `@`).
        field: String,
    },
    /// `REDUCE AVG 1 @<field> AS <name>` -- per-group mean of
    /// the named numeric field.
    Avg {
        /// Field name (without the leading `@`).
        field: String,
    },
}
284
/// One reducer in an `FT.AGGREGATE` pipeline: what to compute
/// plus the name of the column it is reported under.
#[derive(Clone, Debug, PartialEq)]
pub struct ReducerSpec {
    /// Reducer kind (`COUNT` / `SUM` / `AVG`).
    pub kind: ReducerKind,
    /// Output column name (the `AS <name>` token).
    pub alias: String,
}
293
/// Parsed `FT.AGGREGATE` request (GROUPBY / REDUCE / LIMIT
/// pipeline).
#[derive(Clone, Debug, PartialEq)]
pub struct AggregateRequest {
    /// Index name.
    pub name: String,
    /// Group-by fields (the `@<field>` tokens). At least
    /// one is required by this build's subset.
    pub group_by: Vec<String>,
    /// Pipeline of reducers, in declaration order.
    pub reducers: Vec<ReducerSpec>,
    /// Optional `LIMIT offset count` window applied after
    /// grouping, stored as `(offset, count)`.
    pub limit: Option<(usize, usize)>,
}
308
/// Parsed `FT.EXPLAIN <idx> <query>` request.
#[derive(Clone, Debug, PartialEq)]
pub struct ExplainRequest {
    /// Index name.
    pub name: String,
    /// The raw query expression to explain, kept as the
    /// unparsed argument bytes.
    pub query: Vec<u8>,
}
317
/// Parsed `FT.ALTER <idx> ADD <field> <type>` request. Only
/// ADD is supported in this build; DROP and SCHEMA REPLACE
/// are rejected at parse time.
#[derive(Clone, Debug, PartialEq)]
pub struct AlterRequest {
    /// Index name.
    pub name: String,
    /// Field name to add.
    pub field: String,
    /// New field type. Limited to TEXT and TAG; VECTOR is
    /// explicitly rejected at parse time.
    pub field_type: MetadataFieldType,
}
331
/// Parsed `FT.REGEX` request (Dynomite extension).
/// `max_errors` of `0` selects the exact-regex path; values
/// `>= 1` route through the TRE approximate-regex matcher
/// tolerating up to `max_errors` edit operations.
#[derive(Clone, Debug, PartialEq)]
pub struct RegexRequest {
    /// Index name.
    pub name: String,
    /// `TEXT` schema field referenced in the query.
    pub field: String,
    /// Regular expression in POSIX-extended syntax.
    pub pattern: String,
    /// Maximum number of allowed edit operations (the `K=<n>`
    /// argument; `0` means exact matching).
    pub max_errors: u16,
}
347
/// Outcome of an FT.* execution; the dispatch layer turns this
/// into RESP bytes via [`render_outcome`]. Tests can inspect
/// the structured result directly.
#[derive(Clone, Debug, PartialEq)]
pub enum FtOutcome {
    /// Simple status string (`+OK`).
    Ok,
    /// FT.LIST array of names.
    List(Vec<String>),
    /// FT.INFO key/value pairs.
    Info(Vec<(String, InfoValue)>),
    /// FT.SEARCH result set: total count plus one
    /// [`SearchHit`] (id, score, fields) per document.
    Search {
        /// Total number of matches returned.
        total: usize,
        /// Per-document hits, sorted closest-first.
        /// NOTE(review): a `SORTBY` clause presumably
        /// overrides this order -- confirm against the
        /// executor.
        hits: Vec<SearchHit>,
    },
    /// FT.SEARCH result set when the client passed
    /// `NOCONTENT`: only the matching doc ids are emitted,
    /// without per-hit metadata arrays.
    SearchNoContent {
        /// Total number of matches returned.
        total: usize,
        /// Doc keys, in the same order they would appear in
        /// the corresponding [`Self::Search`] response.
        doc_ids: Vec<Vec<u8>>,
    },
    /// FT.AGGREGATE result set: total group count plus one
    /// row of `(name, value)` pairs per group.
    Aggregate {
        /// Number of groups produced (before `LIMIT`).
        total_groups: usize,
        /// One row per group; each row is a flat list of
        /// `(name, bytes)` pairs that the renderer emits as
        /// a RESP array of bulk strings.
        rows: Vec<Vec<(String, Vec<u8>)>>,
    },
    /// FT.EXPLAIN textual query plan.
    Explain(String),
    /// `+OK` with side note: drop also affected `<n>`
    /// underlying documents.
    DropOk {
        /// True when `DD` was present (and the underlying keys
        /// were enumerated).
        deleted_documents: bool,
        /// Number of underlying keys reported.
        document_count: usize,
    },
}
399
/// One hit in an FT.SEARCH response.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchHit {
    /// Document key.
    pub doc_id: Vec<u8>,
    /// Distance score (smaller is closer).
    pub score: f32,
    /// Selected metadata fields as `(name, value bytes)`
    /// pairs. Without a `RETURN` projection the surface emits
    /// the score under `__vec_score` plus every metadata
    /// field stored on the row; with one, only the requested
    /// fields appear (see [`SearchRequest::return_fields`]).
    pub fields: Vec<(String, Vec<u8>)>,
}
412
/// Untyped FT.INFO value; mirrors the RESP shapes RediSearch
/// emits. Used as the value half of the pairs carried by
/// [`FtOutcome::Info`].
#[derive(Clone, Debug, PartialEq)]
pub enum InfoValue {
    /// Bulk string.
    String(String),
    /// Integer.
    Integer(i64),
    /// Nested array (elements may themselves be arrays).
    Array(Vec<InfoValue>),
}
424
425/// Parse a complete FT.* command (the keyword plus its args).
426///
427/// `args[0]` is the FT.* keyword (e.g. `FT.CREATE`). The rest
428/// is the command body. Case is normalised to uppercase ASCII
429/// for the keyword and per-clause tokens.
430///
431/// # Errors
432///
433/// [`FtError::UnknownCommand`] when `args[0]` is empty or not
434/// an FT.* keyword; [`FtError::Syntax`] /
435/// [`FtError::Unsupported`] for grammar issues.
436pub fn parse_command(args: &[&[u8]]) -> Result<FtCommand, FtError> {
437    let head = args
438        .first()
439        .ok_or_else(|| FtError::UnknownCommand(String::new()))?;
440    let cmd = ascii_upper(head);
441    let rest = &args[1..];
442    match cmd.as_slice() {
443        b"FT.CREATE" => parse_create(rest).map(FtCommand::Create),
444        b"FT.SEARCH" => parse_search(rest),
445        b"FT.AGGREGATE" => parse_aggregate(rest).map(FtCommand::Aggregate),
446        b"FT.EXPLAIN" => parse_explain(rest).map(FtCommand::Explain),
447        b"FT.ALTER" => parse_alter(rest).map(FtCommand::Alter),
448        b"FT.REGEX" => parse_regex(rest).map(FtCommand::Regex),
449        b"FT.INFO" => parse_info(rest),
450        b"FT.LIST" | b"FT._LIST" => parse_list(rest),
451        b"FT.DROPINDEX" => parse_dropindex(rest),
452        other => {
453            // Unknown FT.* keywords surface with the
454            // `not supported in this build` wording so the
455            // dispatcher can use the same shared response
456            // shape for genuinely-unimplemented commands.
457            // Non-FT.* keywords still yield UnknownCommand;
458            // the dispatcher only routes FT.* through here.
459            if other.starts_with(b"FT.") {
460                Err(FtError::Unsupported(
461                    String::from_utf8_lossy(other).into_owned(),
462                ))
463            } else {
464                Err(FtError::UnknownCommand(
465                    String::from_utf8_lossy(other).into_owned(),
466                ))
467            }
468        }
469    }
470}
471
472/// Execute a parsed [`FtCommand`] against `registry`.
473///
474/// # Errors
475///
476/// Surfaces [`RegistryError`](crate::vector::registry::RegistryError)
477/// translated into [`FtError`] for already-exists / not-found
478/// outcomes, or [`FtError::Engine`] for engine-side failures.
479pub fn execute(registry: &VectorRegistry, cmd: FtCommand) -> Result<FtOutcome, FtError> {
480    match cmd {
481        FtCommand::Create(req) => execute_create(registry, req),
482        FtCommand::Search(req) => execute_search(registry, &req),
483        FtCommand::SearchText(req) => execute_search_text(registry, &req),
484        FtCommand::SearchFilter(req) => execute_search_filter(registry, &req),
485        FtCommand::Aggregate(req) => execute_aggregate(registry, &req),
486        FtCommand::Explain(req) => execute_explain(registry, &req),
487        FtCommand::Alter(req) => execute_alter(registry, &req),
488        FtCommand::Regex(req) => execute_regex(registry, &req),
489        FtCommand::Info { name } => execute_info(registry, name),
490        FtCommand::List => Ok(FtOutcome::List(registry.list())),
491        FtCommand::DropIndex {
492            name,
493            delete_documents,
494        } => execute_dropindex(registry, name, delete_documents),
495    }
496}
497
498/// Parse and execute `args` in one call, returning the RESP
499/// bytes the wire wants to see.
500///
501/// This is the single public entry point the dispatcher reaches
502/// for after recognising an FT.* keyword. Callers that want
503/// structured results should call [`parse_command`] and
504/// [`execute`] separately.
505#[must_use]
506pub fn dispatch(registry: &VectorRegistry, args: &[&[u8]]) -> Vec<u8> {
507    match parse_command(args) {
508        Ok(cmd) => match execute(registry, cmd) {
509            Ok(outcome) => render_outcome(&outcome),
510            Err(err) => render_error(&err),
511        },
512        Err(err) => render_error(&err),
513    }
514}
515
516/// Inspect an HSET argument list and, if its key matches a
517/// registered prefix, route it into the corresponding index.
518///
519/// `args` is the raw HSET argument vector after the command
520/// keyword: `[key, f1, v1, f2, v2, ...]`. Returns the name of
521/// the index that absorbed the write, or `None` when no
522/// registered prefix matched.
523///
524/// # Errors
525///
526/// [`FtError::Syntax`] for malformed HSET argument lists,
527/// [`FtError::DimensionMismatch`] when the vector field does
528/// not carry the index's frozen dimension, and
529/// [`FtError::Engine`] when the engine refuses the upsert.
530pub fn maybe_index_hset(
531    registry: &VectorRegistry,
532    args: &[&[u8]],
533) -> Result<Option<String>, FtError> {
534    if args.is_empty() {
535        return Err(FtError::Syntax("HSET requires a key".to_string()));
536    }
537    let key = args[0];
538    let pairs = &args[1..];
539    if pairs.is_empty() || !pairs.len().is_multiple_of(2) {
540        return Err(FtError::Syntax(
541            "HSET requires field/value pairs".to_string(),
542        ));
543    }
544    for name in registry.list() {
545        let Some(table) = registry.get(&name) else {
546            continue;
547        };
548        if table.schema.prefixes.iter().any(|p| key.starts_with(p)) {
549            insert_into_index(&table, key, pairs)?;
550            return Ok(Some(name));
551        }
552    }
553    Ok(None)
554}
555
556// ---- FT.CREATE ----------------------------------------------------------
557
558fn parse_create(rest: &[&[u8]]) -> Result<CreateRequest, FtError> {
559    // Layout: <name> ON <doc-type> [PREFIX <n> <p>...] SCHEMA <field>+
560    let mut it = TokenCursor::new(rest);
561    let name = it.next_string("FT.CREATE: missing index name")?;
562
563    expect_keyword(it.next_required("FT.CREATE: expected ON")?, "ON")?;
564    let doc_type_tok = it.next_required("FT.CREATE: expected doc type")?;
565    let doc_type_up = ascii_upper(doc_type_tok);
566    let doc_type = match doc_type_up.as_slice() {
567        b"HASH" => DocType::Hash,
568        _ => {
569            return Err(FtError::Unsupported(format!(
570                "FT.CREATE doc type {}",
571                String::from_utf8_lossy(doc_type_tok)
572            )));
573        }
574    };
575
576    // PREFIX clause.
577    let mut prefixes: Vec<Vec<u8>> = Vec::new();
578    if matches_keyword(it.peek(), "PREFIX") {
579        it.advance();
580        let n_tok = it.next_required("FT.CREATE: PREFIX expects a count")?;
581        let n = parse_unsigned(n_tok, "FT.CREATE: PREFIX count")?;
582        for _ in 0..n {
583            let p = it.next_required("FT.CREATE: missing PREFIX value")?;
584            prefixes.push(p.to_vec());
585        }
586    }
587    if prefixes.is_empty() {
588        return Err(FtError::Syntax(
589            "FT.CREATE requires at least one PREFIX value".to_string(),
590        ));
591    }
592
593    expect_keyword(it.next_required("FT.CREATE: expected SCHEMA")?, "SCHEMA")?;
594
595    let (vector_field, metadata_fields) = parse_create_schema_body(&mut it)?;
596    let (vec_name, vec_type, dim, distance, algorithm) = vector_field.ok_or_else(|| {
597        FtError::Syntax("FT.CREATE: SCHEMA must declare a VECTOR field".to_string())
598    })?;
599
600    let schema = VectorSchema {
601        vector_field: vec_name,
602        vector_type: vec_type,
603        dim,
604        distance,
605        algorithm,
606        prefixes,
607        metadata_fields,
608    };
609    Ok(CreateRequest {
610        name,
611        doc_type,
612        schema,
613    })
614}
615
/// Tuple of compiled VECTOR clause attributes returned by
/// the FT.CREATE schema body parser. Carries the field name
/// alongside the codec and metric selectors so the outer
/// parser can assemble a [`VectorSchema`] without unpacking
/// nested `Option`s twice.
///
/// Element order: `(field name, vector type, dimension,
/// distance metric, index algorithm)`.
type CreateVectorClause = (String, VectorType, u16, DistanceMetric, IndexAlgorithm);
622
623/// Walk the SCHEMA body of an FT.CREATE invocation,
624/// accumulating the metadata field set and at most one
625/// VECTOR clause. The cursor must be positioned just after
626/// the `SCHEMA` keyword; on return it is at the end of the
627/// argument vector.
628fn parse_create_schema_body(
629    it: &mut TokenCursor<'_>,
630) -> Result<(Option<CreateVectorClause>, Vec<MetadataField>), FtError> {
631    let mut vector_field: Option<CreateVectorClause> = None;
632    let mut metadata_fields: Vec<MetadataField> = Vec::new();
633    while let Some(field_tok) = it.next() {
634        let field_name = utf8(field_tok, "FT.CREATE: field name")?;
635        let kind_tok = it.next_required("FT.CREATE: missing field kind")?;
636        let kind_up = ascii_upper(kind_tok);
637        match kind_up.as_slice() {
638            b"TEXT" => {
639                consume_field_modifiers(it);
640                metadata_fields.push(MetadataField {
641                    name: field_name,
642                    field_type: MetadataFieldType::Text,
643                    tag_separator: None,
644                });
645            }
646            b"NUMERIC" => {
647                consume_field_modifiers(it);
648                metadata_fields.push(MetadataField {
649                    name: field_name,
650                    field_type: MetadataFieldType::Numeric,
651                    tag_separator: None,
652                });
653            }
654            b"TAG" => {
655                let separator = parse_tag_modifiers(it, &field_name)?;
656                metadata_fields.push(MetadataField {
657                    name: field_name,
658                    field_type: MetadataFieldType::Tag,
659                    tag_separator: separator,
660                });
661            }
662            b"GEO" => {
663                consume_field_modifiers(it);
664                metadata_fields.push(MetadataField {
665                    name: field_name,
666                    field_type: MetadataFieldType::Geo,
667                    tag_separator: None,
668                });
669            }
670            b"VECTOR" => {
671                if vector_field.is_some() {
672                    return Err(FtError::Unsupported(
673                        "multiple VECTOR fields per index".to_string(),
674                    ));
675                }
676                let parsed = parse_vector_clause(it)?;
677                vector_field = Some((field_name, parsed.0, parsed.1, parsed.2, parsed.3));
678            }
679            other => {
680                return Err(FtError::Unsupported(format!(
681                    "FT.CREATE field kind {}",
682                    String::from_utf8_lossy(other)
683                )));
684            }
685        }
686    }
687    Ok((vector_field, metadata_fields))
688}
689
690/// Consume RediSearch per-field modifiers that this build
691/// recognises but does not act on (`SORTABLE`, `NOINDEX`,
692/// `UNF`, `WEIGHT <n>`, `PHONETIC <m>`). The cursor is
693/// advanced past every recognised modifier; the first
694/// unrecognised token (typically the next field name) is
695/// left in place for the outer loop.
696fn consume_field_modifiers(it: &mut TokenCursor<'_>) {
697    while let Some(tok) = it.peek() {
698        let up = ascii_upper(tok);
699        match up.as_slice() {
700            b"SORTABLE" | b"NOINDEX" | b"UNF" | b"CASESENSITIVE" | b"NOSTEM" => {
701                it.advance();
702            }
703            b"WEIGHT" | b"PHONETIC" => {
704                it.advance();
705                // Each takes a single value; if it is
706                // missing we let the outer loop surface a
707                // "missing field kind" error against the
708                // field that should have followed.
709                if it.peek().is_some() {
710                    it.advance();
711                }
712            }
713            _ => break,
714        }
715    }
716}
717
718/// Parse the optional `SEPARATOR <c>` modifier on a `TAG`
719/// field, plus any of the modifiers handled by
720/// [`consume_field_modifiers`]. Returns the configured
721/// separator byte (or `None` for the RediSearch default of
722/// `,`).
723fn parse_tag_modifiers(it: &mut TokenCursor<'_>, field_name: &str) -> Result<Option<u8>, FtError> {
724    let mut separator: Option<u8> = None;
725    while let Some(tok) = it.peek() {
726        let up = ascii_upper(tok);
727        match up.as_slice() {
728            b"SEPARATOR" => {
729                it.advance();
730                let sep_tok =
731                    it.next_required("FT.CREATE: TAG SEPARATOR expects a single-character value")?;
732                if sep_tok.len() != 1 {
733                    return Err(FtError::Syntax(format!(
734                        "FT.CREATE: TAG SEPARATOR for field {field_name} must be a single ASCII byte",
735                    )));
736                }
737                separator = Some(sep_tok[0]);
738            }
739            b"SORTABLE" | b"NOINDEX" | b"UNF" | b"CASESENSITIVE" => {
740                it.advance();
741            }
742            _ => break,
743        }
744    }
745    Ok(separator)
746}
747
748fn parse_vector_clause(
749    it: &mut TokenCursor<'_>,
750) -> Result<(VectorType, u16, DistanceMetric, IndexAlgorithm), FtError> {
751    let alg_tok = it.next_required("FT.CREATE: VECTOR missing algorithm")?;
752    let alg_up = ascii_upper(alg_tok);
753    let algorithm = match alg_up.as_slice() {
754        b"HNSW" => IndexAlgorithm::Hnsw,
755        b"FLAT" => {
756            return Err(FtError::Unsupported(
757                "FT.CREATE: FLAT vector index not supported in this build".to_string(),
758            ));
759        }
760        other => {
761            return Err(FtError::Unsupported(format!(
762                "FT.CREATE VECTOR algorithm {}",
763                String::from_utf8_lossy(other)
764            )));
765        }
766    };
767    let pair_count_tok = it.next_required("FT.CREATE: VECTOR missing parameter count")?;
768    let pair_count = parse_unsigned(pair_count_tok, "FT.CREATE VECTOR parameter count")?;
769    if !pair_count.is_multiple_of(2) {
770        return Err(FtError::Syntax(
771            "FT.CREATE VECTOR parameter count must be even".to_string(),
772        ));
773    }
774    let mut vec_type: Option<VectorType> = None;
775    let mut dim: Option<u16> = None;
776    let mut distance: Option<DistanceMetric> = None;
777    let pair_pairs = pair_count / 2;
778    for _ in 0..pair_pairs {
779        let key_tok = it.next_required("FT.CREATE: VECTOR missing parameter key")?;
780        let val_tok = it.next_required("FT.CREATE: VECTOR missing parameter value")?;
781        let key_up = ascii_upper(key_tok);
782        let val_up = ascii_upper(val_tok);
783        match key_up.as_slice() {
784            b"TYPE" => match val_up.as_slice() {
785                b"FLOAT32" => vec_type = Some(VectorType::Float32),
786                b"FLOAT16" => {
787                    return Err(FtError::Unsupported(
788                        "FT.CREATE VECTOR TYPE FLOAT16 not supported in this build".to_string(),
789                    ));
790                }
791                other => {
792                    return Err(FtError::Unsupported(format!(
793                        "FT.CREATE VECTOR TYPE {}",
794                        String::from_utf8_lossy(other)
795                    )));
796                }
797            },
798            b"DIM" => {
799                let d = parse_unsigned(val_tok, "FT.CREATE VECTOR DIM")?;
800                if d == 0 || d > usize::from(u16::MAX) {
801                    return Err(FtError::Syntax(
802                        "FT.CREATE VECTOR DIM out of range".to_string(),
803                    ));
804                }
805                dim = Some(u16::try_from(d).expect("dim fits u16"));
806            }
807            b"DISTANCE_METRIC" => {
808                distance = Some(match val_up.as_slice() {
809                    b"COSINE" => DistanceMetric::Cosine,
810                    b"L2" | b"EUCLIDEAN" => DistanceMetric::L2,
811                    b"IP" | b"DOTPRODUCT" | b"DOT_PRODUCT" => DistanceMetric::InnerProduct,
812                    other => {
813                        return Err(FtError::Unsupported(format!(
814                            "FT.CREATE DISTANCE_METRIC {}",
815                            String::from_utf8_lossy(other)
816                        )));
817                    }
818                });
819            }
820            // Other RediSearch knobs (M, EF_CONSTRUCTION, ...).
821            // Phase C ignores them (the engine uses defaults);
822            // a future phase will plug them into HnswParams.
823            _ => {}
824        }
825    }
826    let vec_type =
827        vec_type.ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing TYPE".to_string()))?;
828    let dim = dim.ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing DIM".to_string()))?;
829    let distance = distance
830        .ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing DISTANCE_METRIC".to_string()))?;
831    Ok((vec_type, dim, distance, algorithm))
832}
833
834fn execute_create(registry: &VectorRegistry, req: CreateRequest) -> Result<FtOutcome, FtError> {
835    use crate::vector::registry::RegistryError;
836    match registry.create(req.name.clone(), req.schema) {
837        Ok(()) => Ok(FtOutcome::Ok),
838        Err(RegistryError::AlreadyExists(name)) => Err(FtError::AlreadyExists(name)),
839        Err(RegistryError::UnsupportedAlgorithm(_)) => Err(FtError::Unsupported(
840            "FT.CREATE: unsupported VECTOR algorithm".to_string(),
841        )),
842        Err(other) => Err(FtError::Engine(other.to_string())),
843    }
844}
845
846// ---- FT.SEARCH ----------------------------------------------------------
847
848/// Top-level FT.SEARCH parser. Routes between three
849/// shapes based on the body of the query expression:
850///
851/// * `<filter>=>[KNN k @field $param]` -> [`FtCommand::Search`]
852///   (the LHS may be `*` for the legacy match-all form, or
853///   any [`FilterExpr`] from [`super::ft_filter`]).
854/// * `@field:<word>` (no boolean operators) ->
855///   [`FtCommand::SearchText`] (preserves the existing
856///   trigram fast-path).
857/// * Any other filter expression -> [`FtCommand::SearchFilter`].
858fn parse_search(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
859    let mut it = TokenCursor::new(rest);
860    let name = it.next_string("FT.SEARCH: missing index name")?;
861    let query = it.next_required("FT.SEARCH: missing query expression")?;
862
863    // Step 1: split off any `=>[KNN ...]` suffix.
864    let (filter_part, knn_part) = split_knn_suffix(query)?;
865
866    if let Some(knn_bytes) = knn_part {
867        // KNN form, optionally pre-filtered. Parse the LHS
868        // as either `*` (legacy match-all) or a filter
869        // expression.
870        let filter = parse_lhs_filter(filter_part)?;
871        let (k, vec_field, param_name) = parse_knn_clause(knn_bytes)?;
872        let mut params: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();
873        let mut opts = SearchClauseOptions::default();
874        consume_search_trailing_clauses_with_params(&mut it, &mut params, &mut opts)?;
875        let vector_bytes = params
876            .remove(param_name.as_bytes())
877            .ok_or_else(|| FtError::Syntax(format!("FT.SEARCH: PARAMS missing ${param_name}")))?;
878        return Ok(FtCommand::Search(SearchRequest {
879            name,
880            k,
881            vector_field: vec_field,
882            vector_bytes,
883            filter,
884            return_fields: opts.return_fields,
885            limit: opts.limit,
886            sortby: opts.sortby,
887            nocontent: opts.nocontent,
888        }));
889    }
890
891    // No KNN clause. Try the legacy single-field text
892    // substring fast-path first; otherwise fall through to
893    // the generic filter executor.
894    if let Some(parsed) = try_parse_simple_text_field_query(filter_part)? {
895        let (field, substring) = parsed;
896        let mut opts = SearchClauseOptions::default();
897        consume_search_trailing_clauses(&mut it, false, &mut opts)?;
898        return Ok(FtCommand::SearchText(SearchTextRequest {
899            name,
900            field,
901            query: substring,
902            return_fields: opts.return_fields,
903            limit: opts.limit,
904            sortby: opts.sortby,
905            nocontent: opts.nocontent,
906        }));
907    }
908
909    let filter = ft_filter::parse_expr(filter_part)?;
910    let mut opts = SearchClauseOptions::default();
911    consume_search_trailing_clauses(&mut it, false, &mut opts)?;
912    Ok(FtCommand::SearchFilter(SearchFilterRequest {
913        name,
914        filter,
915        return_fields: opts.return_fields,
916        limit: opts.limit,
917        sortby: opts.sortby,
918        nocontent: opts.nocontent,
919    }))
920}
921
922/// Split a query body into `(filter_part, Some(knn_inner))`
923/// when a `=>[KNN ...]` suffix is present, or
924/// `(filter_part, None)` otherwise. The `knn_inner` slice
925/// excludes the surrounding `[]`.
926fn split_knn_suffix(query: &[u8]) -> Result<(&[u8], Option<&[u8]>), FtError> {
927    let trimmed = trim_ascii_bytes(query);
928    let Some(arrow) = find_byte_subseq(trimmed, b"=>") else {
929        return Ok((trimmed, None));
930    };
931    let lhs = trim_ascii_bytes(&trimmed[..arrow]);
932    let rhs = trim_ascii_bytes(&trimmed[arrow + 2..]);
933    if !rhs.starts_with(b"[") || !rhs.ends_with(b"]") {
934        return Err(FtError::Syntax(
935            "FT.SEARCH query: expected '[KNN ...]' after '=>'".to_string(),
936        ));
937    }
938    let inner = &rhs[1..rhs.len() - 1];
939    Ok((lhs, Some(inner)))
940}
941
942/// Parse the LHS of a `=>[KNN ...]` query into either
943/// `None` (the literal `*` match-all form) or `Some(expr)`
944/// (any other filter shape).
945fn parse_lhs_filter(lhs: &[u8]) -> Result<Option<FilterExpr>, FtError> {
946    let trimmed = trim_ascii_bytes(lhs);
947    if trimmed.is_empty() || trimmed == b"*" {
948        return Ok(None);
949    }
950    let expr = ft_filter::parse_expr(trimmed)?;
951    if matches!(expr, FilterExpr::All) {
952        Ok(None)
953    } else {
954        Ok(Some(expr))
955    }
956}
957
/// Return `s` with leading and trailing ASCII whitespace
/// removed. The result is a sub-slice of `s`; nothing is
/// copied.
fn trim_ascii_bytes(s: &[u8]) -> &[u8] {
    let is_content = |b: &u8| !b.is_ascii_whitespace();
    let Some(first) = s.iter().position(is_content) else {
        // Empty or whitespace-only input.
        return &s[s.len()..];
    };
    // A first content byte exists, so a last one does too;
    // the fallback only keeps this expression total.
    let last = s.iter().rposition(is_content).unwrap_or(first);
    &s[first..=last]
}
969
970/// Try to interpret the LHS as the legacy single-field
971/// substring query `@field:<bytes>` (a single token, no
972/// brackets, braces, parens, or boolean operators). The
973/// fast-path preserves the trigram-index path for the
974/// existing wire tests.
975fn try_parse_simple_text_field_query(lhs: &[u8]) -> Result<Option<(String, Vec<u8>)>, FtError> {
976    let trimmed = trim_ascii_bytes(lhs);
977    if trimmed.is_empty() || trimmed[0] != b'@' {
978        return Ok(None);
979    }
980    // Reject any byte that would change parser dispatch:
981    // the simple form is exactly `@<ident>:<value>` with no
982    // syntactic controls.
983    for &b in trimmed {
984        if matches!(b, b'(' | b')' | b'[' | b']' | b'{' | b'}' | b'|' | b'"') {
985            return Ok(None);
986        }
987        if b.is_ascii_whitespace() {
988            return Ok(None);
989        }
990    }
991    let body = &trimmed[1..];
992    let colon = body
993        .iter()
994        .position(|&b| b == b':')
995        .ok_or_else(|| FtError::Syntax("FT.SEARCH text query: missing ':'".to_string()))?;
996    let field_bytes = &body[..colon];
997    let substring = &body[colon + 1..];
998    if field_bytes.is_empty() {
999        return Err(FtError::Syntax(
1000            "FT.SEARCH text query: empty field name".to_string(),
1001        ));
1002    }
1003    if substring.is_empty() {
1004        return Ok(None);
1005    }
1006    let field = std::str::from_utf8(field_bytes)
1007        .map(str::to_string)
1008        .map_err(|_| FtError::Syntax("FT.SEARCH text query: field is not UTF-8".to_string()))?;
1009    Ok(Some((field, substring.to_vec())))
1010}
1011
1012/// Parse the `KNN k @field $param` body of a `*=>[KNN ...]`
1013/// clause (without the surrounding brackets).
1014fn parse_knn_clause(body: &[u8]) -> Result<(usize, String, String), FtError> {
1015    let s = std::str::from_utf8(body)
1016        .map_err(|_| FtError::Syntax("FT.SEARCH KNN clause is not UTF-8".to_string()))?;
1017    let mut parts = s.split_ascii_whitespace();
1018    let knn_kw = parts
1019        .next()
1020        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: empty KNN clause".to_string()))?;
1021    if !knn_kw.eq_ignore_ascii_case("KNN") {
1022        return Err(FtError::Unsupported(format!(
1023            "FT.SEARCH query operator: {knn_kw}"
1024        )));
1025    }
1026    let k_str = parts
1027        .next()
1028        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects k".to_string()))?;
1029    let k: usize = k_str
1030        .parse()
1031        .map_err(|_| FtError::Syntax(format!("FT.SEARCH query: invalid k {k_str}")))?;
1032    let field_tok = parts
1033        .next()
1034        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects @field".to_string()))?;
1035    let field = field_tok
1036        .strip_prefix('@')
1037        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: field must start with @".to_string()))?;
1038    let param_tok = parts
1039        .next()
1040        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects $param".to_string()))?;
1041    let param = param_tok
1042        .strip_prefix('$')
1043        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: param must start with $".to_string()))?;
1044    if parts.next().is_some() {
1045        return Err(FtError::Unsupported(
1046            "FT.SEARCH query: extra tokens after KNN expression".to_string(),
1047        ));
1048    }
1049    if k == 0 {
1050        return Err(FtError::Syntax("FT.SEARCH KNN k must be > 0".to_string()));
1051    }
1052    Ok((k, field.to_string(), param.to_string()))
1053}
1054
1055/// Variant of [`consume_search_trailing_clauses`] that
1056/// captures `PARAMS` key/value pairs into `params`. Used by
1057/// the KNN parse path; the filter-only path forbids `PARAMS`
1058/// because the query body is self-contained.
1059fn consume_search_trailing_clauses_with_params(
1060    it: &mut TokenCursor<'_>,
1061    params: &mut HashMap<Vec<u8>, Vec<u8>>,
1062    opts: &mut SearchClauseOptions,
1063) -> Result<(), FtError> {
1064    loop {
1065        let Some(tok) = it.next() else { break };
1066        let up = ascii_upper(tok);
1067        match up.as_slice() {
1068            b"PARAMS" => {
1069                let n_tok = it.next_required("FT.SEARCH: PARAMS expects a count")?;
1070                let n = parse_unsigned(n_tok, "FT.SEARCH PARAMS count")?;
1071                if !n.is_multiple_of(2) {
1072                    return Err(FtError::Syntax(
1073                        "FT.SEARCH PARAMS count must be even".to_string(),
1074                    ));
1075                }
1076                for _ in 0..(n / 2) {
1077                    let k_tok = it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
1078                    let v_tok = it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
1079                    params.insert(k_tok.to_vec(), v_tok.to_vec());
1080                }
1081            }
1082            b"RETURN" => parse_return_clause(it, opts)?,
1083            b"SORTBY" => parse_sortby_clause(it, opts)?,
1084            b"LIMIT" => parse_limit_clause(it, opts)?,
1085            b"NOCONTENT" => opts.nocontent = true,
1086            b"DIALECT" => {
1087                it.next_required("FT.SEARCH: DIALECT expects a value")?;
1088            }
1089            b"WITHSCORES" => {}
1090            other => {
1091                return Err(FtError::Unsupported(format!(
1092                    "FT.SEARCH clause {}",
1093                    String::from_utf8_lossy(other)
1094                )));
1095            }
1096        }
1097    }
1098    Ok(())
1099}
1100
/// Mutable accumulator for the `RETURN` / `LIMIT` / `SORTBY`
/// / `NOCONTENT` projection clauses shared by every
/// `FT.SEARCH` shape (k-NN, text substring, and filter).
/// Held inline by [`parse_search`] and passed by reference to
/// the trailing-clause consumers so all parse paths stay in
/// lockstep. The `Default` value means "no clause seen".
#[derive(Default)]
struct SearchClauseOptions {
    /// Field names listed by `RETURN`, with any leading `@`
    /// already stripped; `None` when no `RETURN` was given.
    return_fields: Option<Vec<String>>,
    /// `(offset, count)` window from `LIMIT`; `None` when no
    /// `LIMIT` was given.
    limit: Option<(usize, usize)>,
    /// Sort field (leading `@` stripped) and direction from
    /// `SORTBY`; `None` when no `SORTBY` was given.
    sortby: Option<(String, SortDirection)>,
    /// `true` once `NOCONTENT` has been seen.
    nocontent: bool,
}
1113
1114fn parse_return_clause(
1115    it: &mut TokenCursor<'_>,
1116    opts: &mut SearchClauseOptions,
1117) -> Result<(), FtError> {
1118    let n_tok = it.next_required("FT.SEARCH: RETURN expects a count")?;
1119    let n = parse_unsigned(n_tok, "FT.SEARCH RETURN count")?;
1120    let mut fields: Vec<String> = Vec::with_capacity(n);
1121    for _ in 0..n {
1122        let f_tok = it.next_required("FT.SEARCH: RETURN expects field name")?;
1123        // RediSearch lets clients prefix RETURN field names
1124        // with `@`. Treat both forms identically by stripping
1125        // the marker before storing.
1126        let trimmed: &[u8] = if f_tok.first() == Some(&b'@') {
1127            &f_tok[1..]
1128        } else {
1129            f_tok
1130        };
1131        fields.push(utf8(trimmed, "FT.SEARCH RETURN field name")?);
1132    }
1133    opts.return_fields = Some(fields);
1134    Ok(())
1135}
1136
1137fn parse_limit_clause(
1138    it: &mut TokenCursor<'_>,
1139    opts: &mut SearchClauseOptions,
1140) -> Result<(), FtError> {
1141    let off_tok = it.next_required("FT.SEARCH: LIMIT expects offset")?;
1142    let cnt_tok = it.next_required("FT.SEARCH: LIMIT expects count")?;
1143    let off = parse_unsigned(off_tok, "FT.SEARCH LIMIT offset")?;
1144    let cnt = parse_unsigned(cnt_tok, "FT.SEARCH LIMIT count")?;
1145    opts.limit = Some((off, cnt));
1146    Ok(())
1147}
1148
1149fn parse_sortby_clause(
1150    it: &mut TokenCursor<'_>,
1151    opts: &mut SearchClauseOptions,
1152) -> Result<(), FtError> {
1153    let f_tok = it.next_required("FT.SEARCH: SORTBY expects @field")?;
1154    let field_bytes: &[u8] = if f_tok.first() == Some(&b'@') {
1155        &f_tok[1..]
1156    } else {
1157        f_tok
1158    };
1159    let field = utf8(field_bytes, "FT.SEARCH SORTBY field")?;
1160    // The direction token is optional. Peek; if the next
1161    // token is ASC/DESC, consume it; otherwise default to
1162    // ASC and leave the cursor untouched so the caller's
1163    // outer loop can dispatch it.
1164    let direction = if let Some(next) = it.peek() {
1165        let up = ascii_upper(next);
1166        match up.as_slice() {
1167            b"ASC" => {
1168                it.advance();
1169                SortDirection::Asc
1170            }
1171            b"DESC" => {
1172                it.advance();
1173                SortDirection::Desc
1174            }
1175            _ => SortDirection::Asc,
1176        }
1177    } else {
1178        SortDirection::Asc
1179    };
1180    opts.sortby = Some((field, direction));
1181    Ok(())
1182}
1183
1184/// Try to interpret the FT.SEARCH query expression as the
1185/// trigram-backed text form `@field:substring`. Returns
1186/// `Ok(Some((field, substring)))` when the shape matches,
1187/// `Ok(None)` to defer to the k-NN parser, and `Err` for
1188/// shapes that look like a text query but are malformed.
1189fn try_parse_text_field_query(query: &[u8]) -> Result<Option<(String, Vec<u8>)>, FtError> {
1190    if query.is_empty() || query[0] != b'@' {
1191        return Ok(None);
1192    }
1193    // The k-NN form `@title:foo=>[KNN k @vec $blob]` is not
1194    // implemented; surface the same `Unsupported` error the
1195    // legacy parser produced for that shape.
1196    if find_byte_subseq(query, b"=>").is_some() {
1197        return Err(FtError::Unsupported(format!(
1198            "FT.SEARCH query: {}",
1199            String::from_utf8_lossy(query)
1200        )));
1201    }
1202    let body = &query[1..];
1203    let colon = body
1204        .iter()
1205        .position(|&b| b == b':')
1206        .ok_or_else(|| FtError::Syntax("FT.SEARCH text query: missing ':'".to_string()))?;
1207    let field_bytes = &body[..colon];
1208    let substring = &body[colon + 1..];
1209    if field_bytes.is_empty() {
1210        return Err(FtError::Syntax(
1211            "FT.SEARCH text query: empty field name".to_string(),
1212        ));
1213    }
1214    let field = std::str::from_utf8(field_bytes)
1215        .map(str::to_string)
1216        .map_err(|_| FtError::Syntax("FT.SEARCH text query: field is not UTF-8".to_string()))?;
1217    Ok(Some((field, substring.to_vec())))
1218}
1219
/// Return the index of the first occurrence of `needle` in
/// `haystack`, or `None` when it does not occur. An empty
/// `needle` never matches. The scan is linear, which is fine
/// for the short FT.SEARCH query expressions it is used on.
fn find_byte_subseq(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let span = needle.len();
    if span == 0 || span > haystack.len() {
        return None;
    }
    let last_start = haystack.len() - span;
    (0..=last_start).find(|&at| &haystack[at..at + span] == needle)
}
1231
1232/// Walk the trailing FT.SEARCH clauses (RETURN, LIMIT,
1233/// DIALECT, NOCONTENT, WITHSCORES) without consuming PARAMS,
1234/// which is k-NN-only. The flag controls whether PARAMS is
1235/// permitted; today the substring path forbids it because
1236/// the query body is already self-contained.
1237fn consume_search_trailing_clauses(
1238    it: &mut TokenCursor<'_>,
1239    allow_params: bool,
1240    opts: &mut SearchClauseOptions,
1241) -> Result<(), FtError> {
1242    loop {
1243        let Some(tok) = it.next() else { break };
1244        let up = ascii_upper(tok);
1245        match up.as_slice() {
1246            b"PARAMS" if allow_params => {
1247                let n_tok = it.next_required("FT.SEARCH: PARAMS expects a count")?;
1248                let n = parse_unsigned(n_tok, "FT.SEARCH PARAMS count")?;
1249                if !n.is_multiple_of(2) {
1250                    return Err(FtError::Syntax(
1251                        "FT.SEARCH PARAMS count must be even".to_string(),
1252                    ));
1253                }
1254                for _ in 0..n {
1255                    it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
1256                }
1257            }
1258            b"RETURN" => parse_return_clause(it, opts)?,
1259            b"SORTBY" => parse_sortby_clause(it, opts)?,
1260            b"LIMIT" => parse_limit_clause(it, opts)?,
1261            b"NOCONTENT" => opts.nocontent = true,
1262            b"DIALECT" => {
1263                it.next_required("FT.SEARCH: DIALECT expects a value")?;
1264            }
1265            b"WITHSCORES" => {}
1266            other => {
1267                return Err(FtError::Unsupported(format!(
1268                    "FT.SEARCH clause {}",
1269                    String::from_utf8_lossy(other)
1270                )));
1271            }
1272        }
1273    }
1274    Ok(())
1275}
1276
1277/// Parse the `*=>[KNN k @field $param]` query expression.
1278fn parse_knn_query(query: &[u8]) -> Result<(usize, String, String), FtError> {
1279    let s = std::str::from_utf8(query)
1280        .map_err(|_| FtError::Syntax("FT.SEARCH query is not UTF-8".to_string()))?;
1281    let trimmed = s.trim();
1282    let stripped = trimmed
1283        .strip_prefix("*=>")
1284        .ok_or_else(|| FtError::Unsupported(format!("FT.SEARCH query: {trimmed}")))?
1285        .trim_start();
1286    let inner = stripped
1287        .strip_prefix('[')
1288        .and_then(|s| s.strip_suffix(']'))
1289        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: missing brackets".to_string()))?;
1290    let mut parts = inner.split_ascii_whitespace();
1291    let knn_kw = parts
1292        .next()
1293        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: empty".to_string()))?;
1294    if !knn_kw.eq_ignore_ascii_case("KNN") {
1295        return Err(FtError::Unsupported(format!(
1296            "FT.SEARCH query operator: {knn_kw}"
1297        )));
1298    }
1299    let k_str = parts
1300        .next()
1301        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects k".to_string()))?;
1302    let k: usize = k_str
1303        .parse()
1304        .map_err(|_| FtError::Syntax(format!("FT.SEARCH query: invalid k {k_str}")))?;
1305    let field_tok = parts
1306        .next()
1307        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects @field".to_string()))?;
1308    let field = field_tok
1309        .strip_prefix('@')
1310        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: field must start with @".to_string()))?;
1311    let param_tok = parts
1312        .next()
1313        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects $param".to_string()))?;
1314    let param = param_tok
1315        .strip_prefix('$')
1316        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: param must start with $".to_string()))?;
1317    if parts.next().is_some() {
1318        return Err(FtError::Unsupported(
1319            "FT.SEARCH query: extra tokens after KNN expression".to_string(),
1320        ));
1321    }
1322    if k == 0 {
1323        return Err(FtError::Syntax("FT.SEARCH KNN k must be > 0".to_string()));
1324    }
1325    Ok((k, field.to_string(), param.to_string()))
1326}
1327
1328fn execute_search(registry: &VectorRegistry, req: &SearchRequest) -> Result<FtOutcome, FtError> {
1329    let table = registry
1330        .get(&req.name)
1331        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
1332    if table.schema.vector_field != req.vector_field {
1333        return Err(FtError::Syntax(format!(
1334            "FT.SEARCH: query references @{} but index vector field is {}",
1335            req.vector_field, table.schema.vector_field
1336        )));
1337    }
1338    let dim = usize::from(table.schema.dim);
1339    let query = decode_le_f32(&req.vector_bytes, dim)?;
1340
1341    // When a pre-filter is supplied, evaluate it first and
1342    // restrict the KNN result to surviving keys. The HNSW
1343    // index does not accept a candidate filter directly, so
1344    // we oversample (cap at the tracked-row count) and
1345    // post-filter; the trim to `req.k` then takes the closest
1346    // survivors. For datasets larger than the cap a real
1347    // implementation would push the predicate into the
1348    // index walk; the brief explicitly accepts the
1349    // post-filter approach for the supported scale.
1350    let allowed: Option<BTreeSet<Vec<u8>>> = if let Some(filter) = &req.filter {
1351        let universe: BTreeSet<Vec<u8>> = table.indexed_keys().into_iter().collect();
1352        let matched = ft_filter::evaluate(filter, &table, &universe)?;
1353        Some(matched)
1354    } else {
1355        None
1356    };
1357
1358    let oversample_k = match allowed.as_ref() {
1359        // No filter: the engine's k is exactly what we want.
1360        None => req.k,
1361        Some(set) => {
1362            // With a filter: ask the engine for the surviving
1363            // count (clipped at the tracked-row count) so we
1364            // don't miss any candidate. `set.len()` is a
1365            // tight upper bound.
1366            set.len().max(req.k)
1367        }
1368    };
1369    let raw = if oversample_k == 0 {
1370        Vec::new()
1371    } else {
1372        table
1373            .engine
1374            .search(&query, oversample_k, None)
1375            .map_err(|e| FtError::Engine(e.to_string()))?
1376    };
1377
1378    let mut out = Vec::new();
1379    for (row, score) in raw {
1380        if let Some(allowed) = &allowed {
1381            if !allowed.contains(&row.key) {
1382                continue;
1383            }
1384        }
1385        let mut fields: Vec<(String, Vec<u8>)> = Vec::new();
1386        fields.push(("__vec_score".to_string(), format_float(score).into_bytes()));
1387        for (k, v) in &row.metadata {
1388            let value_bytes = match v {
1389                serde_json::Value::String(s) => s.clone().into_bytes(),
1390                other => other.to_string().into_bytes(),
1391            };
1392            fields.push((k.clone(), value_bytes));
1393        }
1394        out.push(SearchHit {
1395            doc_id: row.key,
1396            score,
1397            fields,
1398        });
1399        if out.len() >= req.k {
1400            break;
1401        }
1402    }
1403    Ok(finalize_search_outcome(
1404        out,
1405        req.sortby.as_ref(),
1406        req.limit,
1407        req.return_fields.as_deref(),
1408        req.nocontent,
1409    ))
1410}
1411
1412/// Execute a filter-expression search (no KNN clause). The
1413/// filter is evaluated against the index's observed key set;
1414/// each surviving key becomes a [`SearchHit`] populated with
1415/// its full metadata fan-out so the same projection pipeline
1416/// (`SORTBY` -> `LIMIT` -> `RETURN` -> `NOCONTENT`) applies
1417/// uniformly with the KNN path.
1418fn execute_search_filter(
1419    registry: &VectorRegistry,
1420    req: &SearchFilterRequest,
1421) -> Result<FtOutcome, FtError> {
1422    let table = registry
1423        .get(&req.name)
1424        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
1425    let universe: BTreeSet<Vec<u8>> = table.indexed_keys().into_iter().collect();
1426    let matched = ft_filter::evaluate(&req.filter, &table, &universe)?;
1427
1428    let mut out: Vec<SearchHit> = Vec::with_capacity(matched.len());
1429    for key in matched {
1430        let row = match table.engine.get(&key) {
1431            Ok(Some(r)) => r,
1432            Ok(None) => continue,
1433            Err(e) => return Err(FtError::Engine(e.to_string())),
1434        };
1435        let mut fields: Vec<(String, Vec<u8>)> = Vec::new();
1436        for (k, v) in &row.metadata {
1437            let value_bytes = match v {
1438                serde_json::Value::String(s) => s.clone().into_bytes(),
1439                other => other.to_string().into_bytes(),
1440            };
1441            fields.push((k.clone(), value_bytes));
1442        }
1443        out.push(SearchHit {
1444            doc_id: row.key,
1445            score: 0.0,
1446            fields,
1447        });
1448    }
1449    Ok(finalize_search_outcome(
1450        out,
1451        req.sortby.as_ref(),
1452        req.limit,
1453        req.return_fields.as_deref(),
1454        req.nocontent,
1455    ))
1456}
1457
1458fn execute_search_text(
1459    registry: &VectorRegistry,
1460    req: &SearchTextRequest,
1461) -> Result<FtOutcome, FtError> {
1462    let table = registry
1463        .get(&req.name)
1464        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
1465    if !table.has_text_field(&req.field) {
1466        return Err(FtError::Syntax(format!(
1467            "FT.SEARCH: index {} has no TEXT field {}",
1468            req.name, req.field
1469        )));
1470    }
1471    let raw_hits = table
1472        .search_text_substring(&req.field, &req.query)
1473        .ok_or_else(|| {
1474            FtError::Engine(format!(
1475                "text index for field {} not provisioned",
1476                req.field
1477            ))
1478        })?;
1479    let mut out = Vec::with_capacity(raw_hits.len());
1480    for (key, text) in raw_hits {
1481        let mut fields: Vec<(String, Vec<u8>)> = vec![(req.field.clone(), text)];
1482        // Mirror any sortby field that the user named so the
1483        // ranking pass can find it on each hit. The trigram
1484        // search returns only the matched body; pull the
1485        // remaining metadata from the dynvec engine row when
1486        // SORTBY references a different field.
1487        if let Some((sort_field, _)) = &req.sortby {
1488            if sort_field != &req.field {
1489                if let Ok(Some(row)) = table.engine.get(&key) {
1490                    if let Some(v) = row.metadata.get(sort_field) {
1491                        let bytes = match v {
1492                            serde_json::Value::String(s) => s.clone().into_bytes(),
1493                            other => other.to_string().into_bytes(),
1494                        };
1495                        fields.push((sort_field.clone(), bytes));
1496                    }
1497                }
1498            }
1499        }
1500        out.push(SearchHit {
1501            doc_id: key,
1502            score: 0.0,
1503            fields,
1504        });
1505    }
1506    Ok(finalize_search_outcome(
1507        out,
1508        req.sortby.as_ref(),
1509        req.limit,
1510        req.return_fields.as_deref(),
1511        req.nocontent,
1512    ))
1513}
1514
1515/// Apply the projection pipeline (`SORTBY` -> `LIMIT` ->
1516/// `RETURN` -> `NOCONTENT`) to a freshly-collected hit set
1517/// and pick the right [`FtOutcome`] variant.
1518///
1519/// `sortby` runs first because both `LIMIT` and `RETURN`
1520/// operate on the post-sort order. `LIMIT` then trims to the
1521/// (offset, count) window. `RETURN` projects each hit's
1522/// fields down to the requested subset. Finally, when
1523/// `nocontent` is set the field arrays are dropped entirely
1524/// and the renderer emits bare doc ids.
1525fn finalize_search_outcome(
1526    mut hits: Vec<SearchHit>,
1527    sortby: Option<&(String, SortDirection)>,
1528    limit: Option<(usize, usize)>,
1529    return_fields: Option<&[String]>,
1530    nocontent: bool,
1531) -> FtOutcome {
1532    if let Some((field, dir)) = sortby {
1533        sort_hits_by_field(&mut hits, field, *dir);
1534    }
1535    if let Some((offset, count)) = limit {
1536        if offset >= hits.len() {
1537            hits.clear();
1538        } else {
1539            let end = offset.saturating_add(count).min(hits.len());
1540            hits = hits.drain(offset..end).collect();
1541        }
1542    }
1543    if let Some(fields) = return_fields {
1544        for hit in &mut hits {
1545            hit.fields
1546                .retain(|(name, _)| fields.iter().any(|f| f == name));
1547        }
1548    }
1549    let total = hits.len();
1550    if nocontent {
1551        let doc_ids = hits.into_iter().map(|h| h.doc_id).collect();
1552        FtOutcome::SearchNoContent { total, doc_ids }
1553    } else {
1554        FtOutcome::Search { total, hits }
1555    }
1556}
1557
1558/// Sort `hits` in place by the metadata field `field` in
1559/// direction `dir`. Hits that are missing the field sort
1560/// last (in either direction) so they do not pollute the
1561/// ordering of the hits that do declare it.
1562fn sort_hits_by_field(hits: &mut [SearchHit], field: &str, dir: SortDirection) {
1563    hits.sort_by(|a, b| {
1564        let av = lookup_field(a, field);
1565        let bv = lookup_field(b, field);
1566        // Push None to the back regardless of direction.
1567        match (av, bv) {
1568            (Some(a_bytes), Some(b_bytes)) => {
1569                match (parse_sort_key(a_bytes), parse_sort_key(b_bytes)) {
1570                    (Some(a_num), Some(b_num)) => match dir {
1571                        SortDirection::Asc => a_num
1572                            .partial_cmp(&b_num)
1573                            .unwrap_or(std::cmp::Ordering::Equal),
1574                        SortDirection::Desc => b_num
1575                            .partial_cmp(&a_num)
1576                            .unwrap_or(std::cmp::Ordering::Equal),
1577                    },
1578                    _ => match dir {
1579                        SortDirection::Asc => a_bytes.cmp(b_bytes),
1580                        SortDirection::Desc => b_bytes.cmp(a_bytes),
1581                    },
1582                }
1583            }
1584            (Some(_), None) => std::cmp::Ordering::Less,
1585            (None, Some(_)) => std::cmp::Ordering::Greater,
1586            (None, None) => std::cmp::Ordering::Equal,
1587        }
1588    });
1589}
1590
1591fn lookup_field<'a>(hit: &'a SearchHit, name: &str) -> Option<&'a [u8]> {
1592    hit.fields
1593        .iter()
1594        .find(|(n, _)| n == name)
1595        .map(|(_, v)| v.as_slice())
1596}
1597
/// Interpret a sort key as a finite `f64`.
///
/// The bytes must be valid UTF-8 and, after trimming
/// surrounding whitespace, parse as a float. Infinities and
/// NaN are rejected. [`sort_hits_by_field`] uses this so
/// numeric fields sort by value rather than by byte order.
fn parse_sort_key(bytes: &[u8]) -> Option<f64> {
    std::str::from_utf8(bytes)
        .ok()
        .and_then(|text| text.trim().parse::<f64>().ok())
        .filter(|value| value.is_finite())
}
1610
1611// ---- FT.AGGREGATE / FT.EXPLAIN / FT.ALTER ------------------------------
1612
1613/// Parse `FT.AGGREGATE <idx> <query> GROUPBY <n> @field...`
1614/// `(REDUCE <kind> ...)+ [LIMIT <off> <cnt>]`. Other clauses
1615/// (`SORTBY`, `APPLY`, `FILTER`, `LOAD`, `WITHCURSOR`, ...) are
1616/// rejected with `not supported in this build` so the wire
1617/// surface keeps the brief's exit gate honest.
1618fn parse_aggregate(rest: &[&[u8]]) -> Result<AggregateRequest, FtError> {
1619    let mut it = TokenCursor::new(rest);
1620    let name = it.next_string("FT.AGGREGATE: missing index name")?;
1621    let _query = it.next_required("FT.AGGREGATE: missing query expression")?;
1622
1623    let mut group_by: Vec<String> = Vec::new();
1624    let mut reducers: Vec<ReducerSpec> = Vec::new();
1625    let mut limit: Option<(usize, usize)> = None;
1626    let mut saw_groupby = false;
1627
1628    while let Some(tok) = it.next() {
1629        let up = ascii_upper(tok);
1630        match up.as_slice() {
1631            b"GROUPBY" => {
1632                saw_groupby = true;
1633                parse_aggregate_groupby(&mut it, &mut group_by)?;
1634            }
1635            b"REDUCE" => {
1636                reducers.push(parse_aggregate_reduce(&mut it)?);
1637            }
1638            b"LIMIT" => {
1639                let off_tok = it.next_required("FT.AGGREGATE: LIMIT expects offset")?;
1640                let cnt_tok = it.next_required("FT.AGGREGATE: LIMIT expects count")?;
1641                let off = parse_unsigned(off_tok, "FT.AGGREGATE LIMIT offset")?;
1642                let cnt = parse_unsigned(cnt_tok, "FT.AGGREGATE LIMIT count")?;
1643                limit = Some((off, cnt));
1644            }
1645            other => {
1646                return Err(FtError::Unsupported(format!(
1647                    "FT.AGGREGATE clause {}",
1648                    String::from_utf8_lossy(other)
1649                )));
1650            }
1651        }
1652    }
1653
1654    if !saw_groupby {
1655        return Err(FtError::Unsupported(
1656            "FT.AGGREGATE without GROUPBY".to_string(),
1657        ));
1658    }
1659    if reducers.is_empty() {
1660        return Err(FtError::Syntax(
1661            "FT.AGGREGATE: GROUPBY requires at least one REDUCE".to_string(),
1662        ));
1663    }
1664    Ok(AggregateRequest {
1665        name,
1666        group_by,
1667        reducers,
1668        limit,
1669    })
1670}
1671
1672fn parse_aggregate_groupby(
1673    it: &mut TokenCursor<'_>,
1674    group_by: &mut Vec<String>,
1675) -> Result<(), FtError> {
1676    let n_tok = it.next_required("FT.AGGREGATE: GROUPBY expects a count")?;
1677    let n = parse_unsigned(n_tok, "FT.AGGREGATE GROUPBY count")?;
1678    if n == 0 {
1679        return Err(FtError::Syntax(
1680            "FT.AGGREGATE GROUPBY count must be > 0".to_string(),
1681        ));
1682    }
1683    for _ in 0..n {
1684        let f_tok = it.next_required("FT.AGGREGATE: GROUPBY expects @field")?;
1685        let bytes: &[u8] = if f_tok.first() == Some(&b'@') {
1686            &f_tok[1..]
1687        } else {
1688            f_tok
1689        };
1690        group_by.push(utf8(bytes, "FT.AGGREGATE GROUPBY field")?);
1691    }
1692    Ok(())
1693}
1694
1695fn parse_aggregate_reduce(it: &mut TokenCursor<'_>) -> Result<ReducerSpec, FtError> {
1696    let kind_tok = it.next_required("FT.AGGREGATE: REDUCE expects a kind")?;
1697    let kind_up = ascii_upper(kind_tok);
1698    let arg_count_tok = it.next_required("FT.AGGREGATE: REDUCE expects an argument count")?;
1699    let arg_count = parse_unsigned(arg_count_tok, "FT.AGGREGATE REDUCE arg count")?;
1700    let kind = match kind_up.as_slice() {
1701        b"COUNT" => {
1702            if arg_count != 0 {
1703                return Err(FtError::Syntax(
1704                    "FT.AGGREGATE REDUCE COUNT expects 0 args".to_string(),
1705                ));
1706            }
1707            ReducerKind::Count
1708        }
1709        b"SUM" => {
1710            if arg_count != 1 {
1711                return Err(FtError::Syntax(
1712                    "FT.AGGREGATE REDUCE SUM expects 1 arg".to_string(),
1713                ));
1714            }
1715            ReducerKind::Sum {
1716                field: take_field_arg(it, "FT.AGGREGATE: SUM expects @field")?,
1717            }
1718        }
1719        b"AVG" => {
1720            if arg_count != 1 {
1721                return Err(FtError::Syntax(
1722                    "FT.AGGREGATE REDUCE AVG expects 1 arg".to_string(),
1723                ));
1724            }
1725            ReducerKind::Avg {
1726                field: take_field_arg(it, "FT.AGGREGATE: AVG expects @field")?,
1727            }
1728        }
1729        other => {
1730            // Skip the per-reducer arguments before surfacing
1731            // the error so the cursor stays well-formed for
1732            // any caller that wants to keep parsing after a
1733            // soft failure. Today we bail immediately so this
1734            // is just hygiene, but it keeps the helper usable
1735            // for future relaxations.
1736            for _ in 0..arg_count {
1737                let _ = it.next();
1738            }
1739            return Err(FtError::Unsupported(format!(
1740                "FT.AGGREGATE REDUCE {}",
1741                String::from_utf8_lossy(other)
1742            )));
1743        }
1744    };
1745    let as_tok = it.next_required("FT.AGGREGATE: REDUCE expects AS <name>")?;
1746    if !as_tok.eq_ignore_ascii_case(b"AS") {
1747        return Err(FtError::Syntax(
1748            "FT.AGGREGATE REDUCE clause missing AS".to_string(),
1749        ));
1750    }
1751    let alias = it.next_string("FT.AGGREGATE: REDUCE AS expects a name")?;
1752    Ok(ReducerSpec { kind, alias })
1753}
1754
1755fn take_field_arg(it: &mut TokenCursor<'_>, msg: &str) -> Result<String, FtError> {
1756    let tok = it.next_required(msg)?;
1757    let bytes: &[u8] = if tok.first() == Some(&b'@') {
1758        &tok[1..]
1759    } else {
1760        tok
1761    };
1762    utf8(bytes, msg)
1763}
1764
1765fn parse_explain(rest: &[&[u8]]) -> Result<ExplainRequest, FtError> {
1766    let mut it = TokenCursor::new(rest);
1767    let name = it.next_string("FT.EXPLAIN: missing index name")?;
1768    let query_tok = it.next_required("FT.EXPLAIN: missing query expression")?;
1769    // The DIALECT clause is the only trailing modifier we
1770    // tolerate today; consume its argument if present so
1771    // clients that always emit it stay green.
1772    while let Some(tok) = it.next() {
1773        let up = ascii_upper(tok);
1774        match up.as_slice() {
1775            b"DIALECT" => {
1776                it.next_required("FT.EXPLAIN: DIALECT expects a value")?;
1777            }
1778            other => {
1779                return Err(FtError::Unsupported(format!(
1780                    "FT.EXPLAIN clause {}",
1781                    String::from_utf8_lossy(other)
1782                )));
1783            }
1784        }
1785    }
1786    Ok(ExplainRequest {
1787        name,
1788        query: query_tok.to_vec(),
1789    })
1790}
1791
1792fn parse_alter(rest: &[&[u8]]) -> Result<AlterRequest, FtError> {
1793    let mut it = TokenCursor::new(rest);
1794    let name = it.next_string("FT.ALTER: missing index name")?;
1795    let op_tok = it.next_required("FT.ALTER: expected ADD")?;
1796    let op_up = ascii_upper(op_tok);
1797    match op_up.as_slice() {
1798        b"ADD" => {}
1799        b"DROP" => {
1800            return Err(FtError::Unsupported("FT.ALTER DROP".to_string()));
1801        }
1802        b"SCHEMA" => {
1803            // FT.ALTER ... SCHEMA ADD <field> <type>...; the
1804            // SCHEMA-prefixed form is RediSearch 2.x syntax
1805            // and not implemented here, even when the inner
1806            // operation is ADD.
1807            return Err(FtError::Unsupported("FT.ALTER SCHEMA".to_string()));
1808        }
1809        other => {
1810            return Err(FtError::Syntax(format!(
1811                "FT.ALTER: expected ADD, got {}",
1812                String::from_utf8_lossy(other)
1813            )));
1814        }
1815    }
1816    let field = it.next_string("FT.ALTER ADD: missing field name")?;
1817    let type_tok = it.next_required("FT.ALTER ADD: missing field type")?;
1818    let type_up = ascii_upper(type_tok);
1819    let field_type = match type_up.as_slice() {
1820        b"TEXT" => MetadataFieldType::Text,
1821        b"TAG" => MetadataFieldType::Tag,
1822        b"VECTOR" => {
1823            return Err(FtError::Unsupported(
1824                "FT.ALTER ADD VECTOR (rebuild required)".to_string(),
1825            ));
1826        }
1827        b"NUMERIC" | b"GEO" => {
1828            return Err(FtError::Unsupported(format!(
1829                "FT.ALTER ADD {}",
1830                String::from_utf8_lossy(type_tok)
1831            )));
1832        }
1833        other => {
1834            return Err(FtError::Syntax(format!(
1835                "FT.ALTER ADD: unknown type {}",
1836                String::from_utf8_lossy(other)
1837            )));
1838        }
1839    };
1840    if it.peek().is_some() {
1841        return Err(FtError::Syntax(
1842            "FT.ALTER ADD: unexpected trailing tokens".to_string(),
1843        ));
1844    }
1845    Ok(AlterRequest {
1846        name,
1847        field,
1848        field_type,
1849    })
1850}
1851
1852fn execute_aggregate(
1853    registry: &VectorRegistry,
1854    req: &AggregateRequest,
1855) -> Result<FtOutcome, FtError> {
1856    let table = registry
1857        .get(&req.name)
1858        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
1859    // Walk every key the FT.* surface absorbed via HSET
1860    // interception. dynvec's storage layer does not expose
1861    // a public iterator over rows, but the registry tracks
1862    // every key it indexed in `indexed_keys`, so we re-fetch
1863    // the row via `engine.get` for each one. The set is
1864    // typically small (matches the number of HSETs that
1865    // landed against the index's prefixes); the cost is
1866    // dominated by the metadata fan-out, not the lookup.
1867    let mut groups: BTreeMap<Vec<u8>, GroupAccumulator> = BTreeMap::new();
1868    for key in table.indexed_keys() {
1869        let row = match table.engine.get(&key) {
1870            Ok(Some(r)) => r,
1871            Ok(None) => continue,
1872            Err(e) => return Err(FtError::Engine(e.to_string())),
1873        };
1874        let group_key = build_group_key(&req.group_by, &row.metadata);
1875        let entry = groups.entry(group_key).or_insert_with(|| {
1876            GroupAccumulator::new(
1877                req.group_by
1878                    .iter()
1879                    .map(|f| {
1880                        (
1881                            f.clone(),
1882                            metadata_string(row.metadata.get(f)).unwrap_or_default(),
1883                        )
1884                    })
1885                    .collect(),
1886                req.reducers.len(),
1887            )
1888        });
1889        entry.observe(&req.reducers, &row.metadata);
1890    }
1891
1892    // Emit groups in deterministic alphabetical order on
1893    // the composite group key so paginated callers see a
1894    // stable cursor without us having to expose one.
1895    let mut rows: Vec<Vec<(String, Vec<u8>)>> = Vec::with_capacity(groups.len());
1896    for (_, mut group) in groups {
1897        let mut row: Vec<(String, Vec<u8>)> = std::mem::take(&mut group.fields);
1898        for (i, reducer) in req.reducers.iter().enumerate() {
1899            let value = group.render_reducer(i, reducer);
1900            row.push((reducer.alias.clone(), value));
1901        }
1902        rows.push(row);
1903    }
1904
1905    if let Some((offset, count)) = req.limit {
1906        if offset >= rows.len() {
1907            rows.clear();
1908        } else {
1909            let end = offset.saturating_add(count).min(rows.len());
1910            rows = rows.drain(offset..end).collect();
1911        }
1912    }
1913
1914    let total_groups = rows.len();
1915    Ok(FtOutcome::Aggregate { total_groups, rows })
1916}
1917
1918/// Per-group accumulator state: the raw `(name, value)`
1919/// pairs that identify the group, plus one parallel slot
1920/// per reducer to track count + running sum.
1921struct GroupAccumulator {
1922    fields: Vec<(String, Vec<u8>)>,
1923    slots: Vec<ReducerAccum>,
1924}
1925
1926#[derive(Default)]
1927struct ReducerAccum {
1928    count: u64,
1929    sum: f64,
1930    /// True when at least one observed value parsed as a
1931    /// finite number; for non-numeric SUM/AVG fields this
1932    /// stays false and the renderer emits `0`.
1933    saw_numeric: bool,
1934}
1935
1936impl GroupAccumulator {
1937    fn new(fields: Vec<(String, Vec<u8>)>, n_reducers: usize) -> Self {
1938        let mut slots = Vec::with_capacity(n_reducers);
1939        for _ in 0..n_reducers {
1940            slots.push(ReducerAccum::default());
1941        }
1942        Self { fields, slots }
1943    }
1944
1945    fn observe(&mut self, reducers: &[ReducerSpec], metadata: &HashMap<String, serde_json::Value>) {
1946        for (i, reducer) in reducers.iter().enumerate() {
1947            let slot = &mut self.slots[i];
1948            slot.count = slot.count.saturating_add(1);
1949            match &reducer.kind {
1950                ReducerKind::Count => {}
1951                ReducerKind::Sum { field } | ReducerKind::Avg { field } => {
1952                    if let Some(v) = metadata.get(field) {
1953                        if let Some(n) = metadata_number(v) {
1954                            slot.sum += n;
1955                            slot.saw_numeric = true;
1956                        }
1957                    }
1958                }
1959            }
1960        }
1961    }
1962
1963    fn render_reducer(&self, i: usize, reducer: &ReducerSpec) -> Vec<u8> {
1964        let slot = &self.slots[i];
1965        match &reducer.kind {
1966            ReducerKind::Count => slot.count.to_string().into_bytes(),
1967            ReducerKind::Sum { .. } => {
1968                if slot.saw_numeric {
1969                    format_float_f64(slot.sum).into_bytes()
1970                } else {
1971                    b"0".to_vec()
1972                }
1973            }
1974            ReducerKind::Avg { .. } => {
1975                if slot.saw_numeric && slot.count > 0 {
1976                    // `f64::from(u32)` is lossless, so trim
1977                    // the count to u32 before the cast.
1978                    // `saturating_as_f64` is not available
1979                    // until Rust 1.87, so we round-trip
1980                    // through `u32::try_from` to preserve
1981                    // precision; values larger than `u32::MAX`
1982                    // saturate at `u32::MAX` for the divisor,
1983                    // which keeps the mean within float
1984                    // resolution for sane group sizes.
1985                    let denom = u32::try_from(slot.count).unwrap_or(u32::MAX);
1986                    let mean = slot.sum / f64::from(denom);
1987                    format_float_f64(mean).into_bytes()
1988                } else {
1989                    b"0".to_vec()
1990                }
1991            }
1992        }
1993    }
1994}
1995
1996fn build_group_key(group_by: &[String], metadata: &HashMap<String, serde_json::Value>) -> Vec<u8> {
1997    let mut key: Vec<u8> = Vec::new();
1998    for field in group_by {
1999        let v = metadata_string(metadata.get(field)).unwrap_or_default();
2000        // `\x1f` (Unit Separator) is a control byte that
2001        // never appears in field names or in stringified
2002        // numeric/JSON metadata, so it makes a safe
2003        // composite-key delimiter without escaping.
2004        if !key.is_empty() {
2005            key.push(0x1f);
2006        }
2007        key.extend_from_slice(&v);
2008    }
2009    key
2010}
2011
2012fn metadata_string(value: Option<&serde_json::Value>) -> Option<Vec<u8>> {
2013    match value? {
2014        serde_json::Value::String(s) => Some(s.clone().into_bytes()),
2015        other => Some(other.to_string().into_bytes()),
2016    }
2017}
2018
2019fn metadata_number(value: &serde_json::Value) -> Option<f64> {
2020    match value {
2021        serde_json::Value::Number(n) => n.as_f64(),
2022        serde_json::Value::String(s) => s.trim().parse::<f64>().ok(),
2023        _ => None,
2024    }
2025}
2026
2027fn execute_explain(registry: &VectorRegistry, req: &ExplainRequest) -> Result<FtOutcome, FtError> {
2028    let table = registry
2029        .get(&req.name)
2030        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
2031    // The brief asks for "a textual representation of how the
2032    // query would be planned". We classify the query into one
2033    // of three buckets and emit a stable, multi-line plan.
2034    // Stable wording lets clients regex-test the output.
2035    let plan = if let Some((field, substring)) = try_parse_text_field_query(&req.query)? {
2036        // Text substring path: renders the trigram set we'd
2037        // probe and the bloom-filter lane.
2038        let trigrams = trigram_preview(&substring);
2039        format!(
2040            "@{field}: SUBSTRING\n  field: {field}\n  query: {query}\n  index: trigram+bloom\n  trigrams: {trigrams}\n",
2041            field = field,
2042            query = String::from_utf8_lossy(&substring),
2043            trigrams = trigrams,
2044        )
2045    } else if let Ok((k, vec_field, param_name)) = parse_knn_query(&req.query) {
2046        let dim = table.schema.dim;
2047        let metric = format!("{:?}", table.schema.distance).to_uppercase();
2048        let alg = format!("{:?}", table.schema.algorithm).to_uppercase();
2049        format!(
2050            "VECTOR KNN\n  index: {idx}\n  algorithm: {alg}\n  metric: {metric}\n  field: {field}\n  k: {k}\n  dim: {dim}\n  param: ${param}\n",
2051            idx = req.name,
2052            alg = alg,
2053            metric = metric,
2054            field = vec_field,
2055            k = k,
2056            dim = dim,
2057            param = param_name,
2058        )
2059    } else {
2060        // Anything else (e.g. `*` or a free-form expression
2061        // that neither parser accepts) lands here. Surface a
2062        // best-effort plan so callers do not get a wire
2063        // error for a debug-only command.
2064        format!(
2065            "UNKNOWN QUERY\n  index: {idx}\n  query: {q}\n  note: only @field:substring and *=>[KNN ...] are planned in this build\n",
2066            idx = req.name,
2067            q = String::from_utf8_lossy(&req.query),
2068        )
2069    };
2070    Ok(FtOutcome::Explain(plan))
2071}
2072
/// Render a preview of the trigrams contained in `query`, for
/// FT.EXPLAIN output.
///
/// Queries shorter than three bytes yield the fixed marker
/// `<short-circuit: scan>`. Otherwise the first eight 3-byte
/// windows are listed, each quoted and lossily decoded as
/// UTF-8, followed by `, ...` when the query holds more than
/// eight windows.
fn trigram_preview(query: &[u8]) -> String {
    const TRIGRAM_LEN: usize = 3;
    const MAX_SHOWN: usize = 8;

    if query.len() < TRIGRAM_LEN {
        return "<short-circuit: scan>".to_string();
    }
    let shown: Vec<String> = query
        .windows(TRIGRAM_LEN)
        .take(MAX_SHOWN)
        .map(|window| format!("\"{}\"", String::from_utf8_lossy(window)))
        .collect();
    let window_count = query.len() - TRIGRAM_LEN + 1;
    let ellipsis = if window_count > MAX_SHOWN { ", ..." } else { "" };
    format!("[{}{}]", shown.join(", "), ellipsis)
}
2087
2088fn execute_alter(registry: &VectorRegistry, req: &AlterRequest) -> Result<FtOutcome, FtError> {
2089    let table = registry
2090        .get(&req.name)
2091        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
2092    match req.field_type {
2093        MetadataFieldType::Text => {
2094            // Provision a per-field trigram index. Subsequent
2095            // HSETs that touch this field will be picked up
2096            // by `maybe_index_hset` because
2097            // `has_text_field` consults the runtime
2098            // text_indexes map alongside the frozen schema.
2099            let _new = table.add_text_field(&req.field);
2100            Ok(FtOutcome::Ok)
2101        }
2102        MetadataFieldType::Tag => {
2103            // TAG fields land in the row metadata blob without
2104            // a dedicated index; the registry has no separate
2105            // per-tag state to provision, so the schema-level
2106            // record is sufficient. We still acknowledge the
2107            // ADD so clients can keep their schema scripts
2108            // declarative.
2109            Ok(FtOutcome::Ok)
2110        }
2111        MetadataFieldType::Numeric | MetadataFieldType::Geo => {
2112            // Reachable only via the parser's sealed table; if
2113            // a future dialect lands a NUMERIC/GEO branch the
2114            // sealed match in `parse_alter` already rejects it
2115            // before we get here. Defensive guard.
2116            Err(FtError::Unsupported(format!(
2117                "FT.ALTER ADD type {:?}",
2118                req.field_type
2119            )))
2120        }
2121    }
2122}
2123
/// Format `f` in fixed-point notation with exactly six digits
/// after the decimal point. Used for the SUM/AVG reducer
/// values.
fn format_float_f64(f: f64) -> String {
    const FRACTION_DIGITS: usize = 6;
    format!("{:.*}", FRACTION_DIGITS, f)
}
2131
2132// ---- FT.REGEX -----------------------------------------------------------
2133
2134/// Parse `FT.REGEX <idx> <field> <pattern> [K=<n>]`.
2135///
2136/// `K=` is the only recognised option today; an unknown
2137/// option surfaces as [`FtError::Unsupported`]. The `n` value
2138/// must fit into a [`u16`]; pragmatically the TRE engine
2139/// rejects anything beyond the pattern length anyway, so the
2140/// upper bound here is purely a parser-level guard rail.
2141fn parse_regex(rest: &[&[u8]]) -> Result<RegexRequest, FtError> {
2142    let mut it = TokenCursor::new(rest);
2143    let name = it.next_string("FT.REGEX: missing index name")?;
2144    let field = it.next_string("FT.REGEX: missing field name")?;
2145    let pattern_tok = it.next_required("FT.REGEX: missing pattern")?;
2146    let pattern = std::str::from_utf8(pattern_tok)
2147        .map(str::to_string)
2148        .map_err(|_| FtError::Syntax("FT.REGEX: pattern is not UTF-8".to_string()))?;
2149    let mut max_errors: u16 = 0;
2150    for tok in &mut it {
2151        let up = ascii_upper(tok);
2152        if let Some(rest) = up.strip_prefix(b"K=") {
2153            let s = std::str::from_utf8(rest)
2154                .map_err(|_| FtError::Syntax("FT.REGEX: K= value is not UTF-8".to_string()))?;
2155            let n: u32 = s
2156                .parse()
2157                .map_err(|_| FtError::Syntax(format!("FT.REGEX: invalid K= value {s}")))?;
2158            max_errors = u16::try_from(n)
2159                .map_err(|_| FtError::Syntax(format!("FT.REGEX: K= value {n} exceeds u16")))?;
2160        } else {
2161            return Err(FtError::Unsupported(format!(
2162                "FT.REGEX option {}",
2163                String::from_utf8_lossy(tok)
2164            )));
2165        }
2166    }
2167    Ok(RegexRequest {
2168        name,
2169        field,
2170        pattern,
2171        max_errors,
2172    })
2173}
2174
2175fn execute_regex(registry: &VectorRegistry, req: &RegexRequest) -> Result<FtOutcome, FtError> {
2176    let table = registry
2177        .get(&req.name)
2178        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
2179    if !table.has_text_field(&req.field) {
2180        return Err(FtError::Syntax(format!(
2181            "FT.REGEX: index {} has no TEXT field {}",
2182            req.name, req.field
2183        )));
2184    }
2185    let raw_hits = if req.max_errors == 0 {
2186        table
2187            .search_text_regex(&req.field, &req.pattern)
2188            .ok_or_else(|| {
2189                FtError::Engine(format!(
2190                    "text index for field {} not provisioned",
2191                    req.field
2192                ))
2193            })?
2194            .map_err(|e| FtError::Syntax(format!("FT.REGEX: invalid pattern: {e}")))?
2195    } else {
2196        table
2197            .search_text_regex_approx(&req.field, &req.pattern, req.max_errors)
2198            .ok_or_else(|| {
2199                FtError::Engine(format!(
2200                    "text index for field {} not provisioned",
2201                    req.field
2202                ))
2203            })?
2204            .map_err(|e| FtError::Engine(format!("FT.REGEX (approximate): {e}")))?
2205    };
2206    let mut out = Vec::with_capacity(raw_hits.len());
2207    for (key, text) in raw_hits {
2208        out.push(SearchHit {
2209            doc_id: key,
2210            score: 0.0,
2211            fields: vec![(req.field.clone(), text)],
2212        });
2213    }
2214    let total = out.len();
2215    Ok(FtOutcome::Search { total, hits: out })
2216}
2217
2218// ---- FT.INFO / FT.LIST / FT.DROPINDEX -----------------------------------
2219
2220fn parse_info(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
2221    let mut it = TokenCursor::new(rest);
2222    let name = it.next_string("FT.INFO: missing index name")?;
2223    if it.peek().is_some() {
2224        return Err(FtError::Syntax(
2225            "FT.INFO: unexpected trailing tokens".to_string(),
2226        ));
2227    }
2228    Ok(FtCommand::Info { name })
2229}
2230
2231fn parse_list(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
2232    if !rest.is_empty() {
2233        return Err(FtError::Syntax("FT.LIST: takes no arguments".to_string()));
2234    }
2235    Ok(FtCommand::List)
2236}
2237
2238fn parse_dropindex(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
2239    let mut it = TokenCursor::new(rest);
2240    let name = it.next_string("FT.DROPINDEX: missing index name")?;
2241    let mut delete_documents = false;
2242    loop {
2243        let Some(tok) = it.next() else { break };
2244        let up = ascii_upper(tok);
2245        match up.as_slice() {
2246            b"DD" => delete_documents = true,
2247            other => {
2248                return Err(FtError::Unsupported(format!(
2249                    "FT.DROPINDEX option {}",
2250                    String::from_utf8_lossy(other)
2251                )));
2252            }
2253        }
2254    }
2255    Ok(FtCommand::DropIndex {
2256        name,
2257        delete_documents,
2258    })
2259}
2260
2261fn execute_info(registry: &VectorRegistry, name: String) -> Result<FtOutcome, FtError> {
2262    let info = registry
2263        .info(&name)
2264        .ok_or_else(|| FtError::NotFound(name.clone()))?;
2265    let table = registry.get(&name).ok_or(FtError::NotFound(name))?;
2266    let mut out: Vec<(String, InfoValue)> = Vec::new();
2267    out.push(("index_name".to_string(), InfoValue::String(info.name)));
2268    out.push((
2269        "algorithm".to_string(),
2270        InfoValue::String(format!("{:?}", info.algorithm).to_uppercase()),
2271    ));
2272    out.push((
2273        "distance_metric".to_string(),
2274        InfoValue::String(format!("{:?}", info.distance).to_uppercase()),
2275    ));
2276    out.push((
2277        "vector_field".to_string(),
2278        InfoValue::String(table.schema.vector_field.clone()),
2279    ));
2280    out.push((
2281        "vector_type".to_string(),
2282        InfoValue::String(format!("{:?}", table.schema.vector_type).to_uppercase()),
2283    ));
2284    out.push(("dim".to_string(), InfoValue::Integer(i64::from(info.dim))));
2285    let prefixes_value = InfoValue::Array(
2286        table
2287            .schema
2288            .prefixes
2289            .iter()
2290            .map(|p| InfoValue::String(String::from_utf8_lossy(p).into_owned()))
2291            .collect(),
2292    );
2293    out.push(("prefixes".to_string(), prefixes_value));
2294    let metadata_value = InfoValue::Array(
2295        table
2296            .schema
2297            .metadata_fields
2298            .iter()
2299            .map(|f| {
2300                InfoValue::Array(vec![
2301                    InfoValue::String(f.name.clone()),
2302                    InfoValue::String(format!("{:?}", f.field_type).to_uppercase()),
2303                ])
2304            })
2305            .collect(),
2306    );
2307    out.push(("schema_fields".to_string(), metadata_value));
2308    out.push((
2309        "num_docs".to_string(),
2310        InfoValue::Integer(i64::try_from(info.live_rows).unwrap_or(i64::MAX)),
2311    ));
2312    out.push((
2313        "tracked_rows".to_string(),
2314        InfoValue::Integer(i64::try_from(info.tracked_rows).unwrap_or(i64::MAX)),
2315    ));
2316    Ok(FtOutcome::Info(out))
2317}
2318
2319fn execute_dropindex(
2320    registry: &VectorRegistry,
2321    name: String,
2322    delete_documents: bool,
2323) -> Result<FtOutcome, FtError> {
2324    use crate::vector::registry::RegistryError;
2325    if delete_documents {
2326        match registry.drop_with_dd(&name) {
2327            Ok(keys) => Ok(FtOutcome::DropOk {
2328                deleted_documents: true,
2329                document_count: keys.len(),
2330            }),
2331            Err(RegistryError::NotFound(_)) => Err(FtError::NotFound(name)),
2332            Err(other) => Err(FtError::Engine(other.to_string())),
2333        }
2334    } else {
2335        match registry.drop(&name) {
2336            Ok(_) => Ok(FtOutcome::DropOk {
2337                deleted_documents: false,
2338                document_count: 0,
2339            }),
2340            Err(RegistryError::NotFound(_)) => Err(FtError::NotFound(name)),
2341            Err(other) => Err(FtError::Engine(other.to_string())),
2342        }
2343    }
2344}
2345
2346// ---- HSET interception --------------------------------------------------
2347
2348fn insert_into_index(table: &VectorTable, key: &[u8], pairs: &[&[u8]]) -> Result<(), FtError> {
2349    let mut vector: Option<Vec<f32>> = None;
2350    let mut metadata: HashMap<String, serde_json::Value> = HashMap::new();
2351    let mut text_writes: Vec<(String, Vec<u8>)> = Vec::new();
2352    let mut chunks = pairs.chunks_exact(2);
2353    for chunk in &mut chunks {
2354        let field = chunk[0];
2355        let value = chunk[1];
2356        let field_str = std::str::from_utf8(field)
2357            .map_err(|_| FtError::Syntax("HSET field name is not UTF-8".to_string()))?;
2358        if field_str == table.schema.vector_field {
2359            let dim = usize::from(table.schema.dim);
2360            vector = Some(decode_le_f32(value, dim)?);
2361        } else {
2362            let value_str = String::from_utf8_lossy(value).into_owned();
2363            metadata.insert(field_str.to_string(), serde_json::Value::String(value_str));
2364            // Mirror the value into the trigram index when the
2365            // schema declares this field as TEXT-indexed. The
2366            // dynvec metadata copy stays so FT.SEARCH KNN /
2367            // FT.INFO can echo the field; the dyntext copy is
2368            // what powers @field:substring and FT.REGEX.
2369            if table.has_text_field(field_str) {
2370                text_writes.push((field_str.to_string(), value.to_vec()));
2371            }
2372        }
2373    }
2374    if !chunks.remainder().is_empty() {
2375        return Err(FtError::Syntax(
2376            "HSET requires a value for every field".to_string(),
2377        ));
2378    }
2379    let v = vector.ok_or_else(|| {
2380        FtError::Syntax(format!(
2381            "HSET into indexed prefix is missing the vector field '{}'",
2382            table.schema.vector_field
2383        ))
2384    })?;
2385    table
2386        .engine
2387        .upsert(key.to_vec(), &v, metadata)
2388        .map_err(|e| FtError::Engine(e.to_string()))?;
2389    for (field, bytes) in text_writes {
2390        table.upsert_text_field(&field, key, &bytes);
2391    }
2392    table.record_indexed_key(key.to_vec());
2393    Ok(())
2394}
2395
2396// ---- RESP rendering -----------------------------------------------------
2397
2398/// Render an [`FtOutcome`] as RESP2 bytes.
2399#[must_use]
2400pub fn render_outcome(outcome: &FtOutcome) -> Vec<u8> {
2401    let mut out = Vec::new();
2402    match outcome {
2403        FtOutcome::Ok | FtOutcome::DropOk { .. } => {
2404            out.extend_from_slice(b"+OK\r\n");
2405        }
2406        FtOutcome::List(names) => {
2407            write_array_header(&mut out, names.len());
2408            for name in names {
2409                write_bulk(&mut out, name.as_bytes());
2410            }
2411        }
2412        FtOutcome::Info(pairs) => {
2413            write_array_header(&mut out, pairs.len() * 2);
2414            for (k, v) in pairs {
2415                write_bulk(&mut out, k.as_bytes());
2416                write_info_value(&mut out, v);
2417            }
2418        }
2419        FtOutcome::Search { total, hits } => {
2420            // Layout: [total, doc_id_1, [field_pairs_1...], ...].
2421            let total_i64 = i64::try_from(*total).unwrap_or(i64::MAX);
2422            write_array_header(&mut out, 1 + hits.len() * 2);
2423            write_integer(&mut out, total_i64);
2424            for hit in hits {
2425                write_bulk(&mut out, &hit.doc_id);
2426                write_array_header(&mut out, hit.fields.len() * 2);
2427                for (fk, fv) in &hit.fields {
2428                    write_bulk(&mut out, fk.as_bytes());
2429                    write_bulk(&mut out, fv);
2430                }
2431            }
2432        }
2433        FtOutcome::SearchNoContent { total, doc_ids } => {
2434            // NOCONTENT layout: [total, doc_id_1, doc_id_2, ...].
2435            let total_i64 = i64::try_from(*total).unwrap_or(i64::MAX);
2436            write_array_header(&mut out, 1 + doc_ids.len());
2437            write_integer(&mut out, total_i64);
2438            for id in doc_ids {
2439                write_bulk(&mut out, id);
2440            }
2441        }
2442        FtOutcome::Aggregate { total_groups, rows } => {
2443            // FT.AGGREGATE layout (RediSearch shape):
2444            //   [total_groups, [k, v, k, v, ...], ...].
2445            let total_i64 = i64::try_from(*total_groups).unwrap_or(i64::MAX);
2446            write_array_header(&mut out, 1 + rows.len());
2447            write_integer(&mut out, total_i64);
2448            for row in rows {
2449                write_array_header(&mut out, row.len() * 2);
2450                for (name, value) in row {
2451                    write_bulk(&mut out, name.as_bytes());
2452                    write_bulk(&mut out, value);
2453                }
2454            }
2455        }
2456        FtOutcome::Explain(plan) => {
2457            write_bulk(&mut out, plan.as_bytes());
2458        }
2459    }
2460    out
2461}
2462
2463/// Render an [`FtError`] as a RESP2 simple-error line.
2464#[must_use]
2465pub fn render_error(err: &FtError) -> Vec<u8> {
2466    let mut buf = Vec::new();
2467    // Every variant currently maps to the generic `-ERR`
2468    // prefix; richer RESP error families (`-WRONGTYPE`,
2469    // `-NOINDEX`, ...) are tracked in the brief's
2470    // out-of-scope list.
2471    let _ = write!(buf, "-ERR {err}\r\n");
2472    buf
2473}
2474
/// Append the RESP2 array header `*<n>\r\n` to `out`.
fn write_array_header(out: &mut Vec<u8>, n: usize) {
    out.push(b'*');
    out.extend_from_slice(n.to_string().as_bytes());
    out.extend_from_slice(b"\r\n");
}
2478
/// Append `payload` to `out` as a RESP2 bulk string:
/// `$<len>\r\n<payload>\r\n`.
fn write_bulk(out: &mut Vec<u8>, payload: &[u8]) {
    const CRLF: &[u8] = b"\r\n";
    out.push(b'$');
    out.extend_from_slice(payload.len().to_string().as_bytes());
    out.extend_from_slice(CRLF);
    out.extend_from_slice(payload);
    out.extend_from_slice(CRLF);
}
2484
/// Append the RESP2 integer `:<n>\r\n` to `out`.
fn write_integer(out: &mut Vec<u8>, n: i64) {
    out.extend_from_slice(format!(":{n}\r\n").as_bytes());
}
2488
2489fn write_info_value(out: &mut Vec<u8>, value: &InfoValue) {
2490    match value {
2491        InfoValue::String(s) => write_bulk(out, s.as_bytes()),
2492        InfoValue::Integer(n) => write_integer(out, *n),
2493        InfoValue::Array(items) => {
2494            write_array_header(out, items.len());
2495            for it in items {
2496                write_info_value(out, it);
2497            }
2498        }
2499    }
2500}
2501
2502// ---- helpers ------------------------------------------------------------
2503
/// Return a copy of `bytes` with every ASCII lowercase letter
/// mapped to uppercase; all other bytes are copied unchanged.
fn ascii_upper(bytes: &[u8]) -> Vec<u8> {
    bytes.to_ascii_uppercase()
}
2507
/// Report whether `tok` is present and equals `kw`, ignoring
/// ASCII case. A missing token never matches.
fn matches_keyword(tok: Option<&[u8]>, kw: &str) -> bool {
    match tok {
        Some(candidate) => candidate.eq_ignore_ascii_case(kw.as_bytes()),
        None => false,
    }
}
2511
2512fn expect_keyword(tok: &[u8], kw: &str) -> Result<(), FtError> {
2513    if tok.eq_ignore_ascii_case(kw.as_bytes()) {
2514        Ok(())
2515    } else {
2516        Err(FtError::Syntax(format!(
2517            "expected {kw}, got {}",
2518            String::from_utf8_lossy(tok)
2519        )))
2520    }
2521}
2522
2523fn parse_unsigned(tok: &[u8], context: &str) -> Result<usize, FtError> {
2524    let s =
2525        std::str::from_utf8(tok).map_err(|_| FtError::Syntax(format!("{context}: not UTF-8")))?;
2526    s.parse::<usize>()
2527        .map_err(|_| FtError::Syntax(format!("{context}: not a non-negative integer ({s})")))
2528}
2529
2530fn utf8(tok: &[u8], context: &str) -> Result<String, FtError> {
2531    std::str::from_utf8(tok)
2532        .map(str::to_string)
2533        .map_err(|_| FtError::Syntax(format!("{context}: not UTF-8")))
2534}
2535
2536fn decode_le_f32(bytes: &[u8], expected_dim: usize) -> Result<Vec<f32>, FtError> {
2537    if !bytes.len().is_multiple_of(4) {
2538        return Err(FtError::Syntax(
2539            "vector payload length is not a multiple of 4 bytes".to_string(),
2540        ));
2541    }
2542    let dim = bytes.len() / 4;
2543    if dim != expected_dim {
2544        return Err(FtError::DimensionMismatch {
2545            index_dim: expected_dim,
2546            payload_dim: dim,
2547        });
2548    }
2549    let mut out = Vec::with_capacity(dim);
2550    for chunk in bytes.chunks_exact(4) {
2551        let arr = [chunk[0], chunk[1], chunk[2], chunk[3]];
2552        out.push(f32::from_le_bytes(arr));
2553    }
2554    Ok(out)
2555}
2556
/// Format `f` with a fixed six digits after the decimal point.
///
/// The fixed width keeps the rendered value parseable as a float
/// without trailing precision noise.
fn format_float(f: f32) -> String {
    const FRACTION_DIGITS: usize = 6;
    format!("{:.*}", FRACTION_DIGITS, f)
}
2563
2564/// Walk-cursor over `&[&[u8]]` argument lists. Used by the
2565/// hand-written FT.* parsers in this module.
2566struct TokenCursor<'a> {
2567    args: &'a [&'a [u8]],
2568    idx: usize,
2569}
2570
2571impl<'a> TokenCursor<'a> {
2572    fn new(args: &'a [&'a [u8]]) -> Self {
2573        Self { args, idx: 0 }
2574    }
2575
2576    fn peek(&self) -> Option<&'a [u8]> {
2577        self.args.get(self.idx).copied()
2578    }
2579
2580    fn advance(&mut self) {
2581        self.idx += 1;
2582    }
2583
2584    fn next_required(&mut self, msg: &str) -> Result<&'a [u8], FtError> {
2585        let tok = self
2586            .args
2587            .get(self.idx)
2588            .copied()
2589            .ok_or_else(|| FtError::Syntax(msg.to_string()))?;
2590        self.idx += 1;
2591        Ok(tok)
2592    }
2593
2594    fn next_string(&mut self, msg: &str) -> Result<String, FtError> {
2595        let tok = self.next_required(msg)?;
2596        utf8(tok, msg)
2597    }
2598}
2599
2600impl<'a> Iterator for TokenCursor<'a> {
2601    type Item = &'a [u8];
2602
2603    fn next(&mut self) -> Option<Self::Item> {
2604        let tok = self.args.get(self.idx).copied()?;
2605        self.idx += 1;
2606        Some(tok)
2607    }
2608}
2609
#[cfg(test)]
mod tests {
    use super::*;

    /// `ascii_upper` maps lowercase ASCII letters to uppercase and
    /// leaves punctuation alone.
    #[test]
    fn ascii_upper_lowercases_to_uppercase() {
        let upper = ascii_upper(b"ft.create");
        assert_eq!(upper, b"FT.CREATE".to_vec());
    }

    /// The smallest accepted `FT.CREATE`: one prefix and a single
    /// HNSW FLOAT32 vector field.
    #[test]
    fn parse_create_minimal() {
        let argv: Vec<&[u8]> = [
            "FT.CREATE",
            "idx",
            "ON",
            "HASH",
            "PREFIX",
            "1",
            "docs:",
            "SCHEMA",
            "vec",
            "VECTOR",
            "HNSW",
            "6",
            "TYPE",
            "FLOAT32",
            "DIM",
            "4",
            "DISTANCE_METRIC",
            "COSINE",
        ]
        .iter()
        .map(|tok| tok.as_bytes())
        .collect();

        let req = match parse_command(&argv).expect("parse ok") {
            FtCommand::Create(req) => req,
            _ => panic!("expected create"),
        };

        assert_eq!(req.name, "idx");
        assert_eq!(req.doc_type, DocType::Hash);
        assert_eq!(req.schema.dim, 4);
        assert_eq!(req.schema.vector_field, "vec");
        assert_eq!(req.schema.distance, DistanceMetric::Cosine);
        assert_eq!(req.schema.algorithm, IndexAlgorithm::Hnsw);
        assert_eq!(req.schema.prefixes, vec![b"docs:".to_vec()]);
    }

    /// A bare `*=>[KNN ...]` query yields k, the field name
    /// without `@`, and the parameter name without `$`.
    #[test]
    fn parse_knn_query_extracts_pieces() {
        let parsed = parse_knn_query(b"*=>[KNN 5 @vec $blob]").unwrap();
        let (k, field, param) = parsed;
        assert_eq!(k, 5);
        assert_eq!(field, "vec");
        assert_eq!(param, "blob");
    }

    /// `parse_knn_query` reports a non-`*` left-hand side as
    /// unsupported.
    #[test]
    fn parse_knn_query_rejects_filter() {
        let outcome = parse_knn_query(b"@title:foo=>[KNN 5 @vec $blob]");
        assert!(matches!(outcome.unwrap_err(), FtError::Unsupported(_)));
    }

    /// Encoding floats as little-endian bytes and decoding them
    /// again returns the same values.
    #[test]
    fn decode_le_f32_round_trips_a_short_vector() {
        let expected = vec![1.0_f32, -1.5, 2.25, 0.0];
        let bytes: Vec<u8> = expected.iter().flat_map(|x| x.to_le_bytes()).collect();
        let decoded = decode_le_f32(&bytes, 4).unwrap();
        assert_eq!(decoded, expected);
    }

    /// Eight bytes hold two floats, which must not satisfy an
    /// index dimension of four.
    #[test]
    fn decode_le_f32_rejects_dim_mismatch() {
        let two_floats = [0u8; 8];
        let err = decode_le_f32(&two_floats, 4).unwrap_err();
        assert!(matches!(err, FtError::DimensionMismatch { .. }));
    }
}