use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::io::Write;
use thiserror::Error;
use crate::proto::redis::ft_filter::{self, FilterExpr};
use crate::vector::registry::{VectorRegistry, VectorTable};
use crate::vector::schema::{
DistanceMetric, IndexAlgorithm, MetadataField, MetadataFieldType, VectorSchema, VectorType,
};
/// Errors produced while parsing or executing `FT.*` commands.
///
/// The `Display` text of each variant is defined by its `#[error(...)]` attribute.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum FtError {
    /// The command name is not an `FT.*` command, or the argument list was empty.
    #[error("unknown command: {0}")]
    UnknownCommand(String),
    /// Malformed arguments: missing tokens, invalid numbers, wrong keywords.
    #[error("syntax error: {0}")]
    Syntax(String),
    /// A recognised command, option or value that this build does not implement.
    #[error("not supported in this build: {0}")]
    Unsupported(String),
    /// No index is registered under the given name.
    #[error("index not found: {0}")]
    NotFound(String),
    /// `FT.CREATE` targeted a name that is already registered.
    #[error("index already exists: {0}")]
    AlreadyExists(String),
    /// A payload's dimension disagrees with the index dimension.
    /// NOTE(review): presumably produced by `decode_le_f32`, which is not visible here.
    #[error("dimension mismatch: index={index_dim}, payload={payload_dim}")]
    DimensionMismatch {
        index_dim: usize,
        payload_dim: usize,
    },
    /// A failure reported by the registry or vector engine, stringified.
    #[error("engine: {0}")]
    Engine(String),
}
/// A parsed `FT.*` command, produced by [`parse_command`] and consumed by [`execute`].
#[derive(Clone, Debug, PartialEq)]
pub enum FtCommand {
    /// `FT.CREATE`.
    Create(CreateRequest),
    /// `FT.SEARCH` with a `=>[KNN ...]` vector clause.
    Search(SearchRequest),
    /// `FT.SEARCH` with a simple `@field:substring` text query.
    SearchText(SearchTextRequest),
    /// `FT.SEARCH` with any other filter expression (no KNN clause).
    SearchFilter(SearchFilterRequest),
    /// `FT.AGGREGATE`.
    Aggregate(AggregateRequest),
    /// `FT.EXPLAIN`.
    Explain(ExplainRequest),
    /// `FT.ALTER`.
    Alter(AlterRequest),
    /// `FT.REGEX`.
    Regex(RegexRequest),
    /// `FT.INFO <name>`.
    Info {
        name: String,
    },
    /// `FT.LIST` / `FT._LIST`.
    List,
    /// `FT.DROPINDEX <name>`; `delete_documents` is presumably the `DD` flag
    /// (parsed by `parse_dropindex`, not visible here).
    DropIndex {
        name: String,
        delete_documents: bool,
    },
}
/// Document type named after `ON` in `FT.CREATE`; only `HASH` is accepted by the parser.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DocType {
    Hash,
}
/// Parsed `FT.CREATE` request.
#[derive(Clone, Debug, PartialEq)]
pub struct CreateRequest {
    /// Index name.
    pub name: String,
    /// Document type from the `ON` clause.
    pub doc_type: DocType,
    /// Vector field, metadata fields and key prefixes for the new index.
    pub schema: VectorSchema,
}
/// Direction of a `SORTBY` clause; the parser defaults to `Asc` when none is given.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SortDirection {
    Asc,
    Desc,
}
/// Parsed `FT.SEARCH` request carrying a `=>[KNN k @field $param]` vector clause.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchRequest {
    /// Index name.
    pub name: String,
    /// Number of nearest neighbours requested (the parser rejects 0).
    pub k: usize,
    /// Vector field named in the KNN clause (without the leading `@`).
    pub vector_field: String,
    /// Raw query vector taken from `PARAMS` under the clause's `$param` name.
    pub vector_bytes: Vec<u8>,
    /// Optional pre-filter from the left of `=>`; `None` means match-all.
    pub filter: Option<FilterExpr>,
    /// `RETURN` field list, if given.
    pub return_fields: Option<Vec<String>>,
    /// `LIMIT (offset, count)`, if given.
    pub limit: Option<(usize, usize)>,
    /// `SORTBY` field and direction, if given.
    pub sortby: Option<(String, SortDirection)>,
    /// `NOCONTENT`: return document ids only.
    pub nocontent: bool,
}
/// Parsed `FT.SEARCH` request whose query is a plain filter expression (no KNN).
#[derive(Clone, Debug, PartialEq)]
pub struct SearchFilterRequest {
    /// Index name.
    pub name: String,
    /// Filter expression evaluated over all indexed keys.
    pub filter: FilterExpr,
    /// `RETURN` field list, if given.
    pub return_fields: Option<Vec<String>>,
    /// `LIMIT (offset, count)`, if given.
    pub limit: Option<(usize, usize)>,
    /// `SORTBY` field and direction, if given.
    pub sortby: Option<(String, SortDirection)>,
    /// `NOCONTENT`: return document ids only.
    pub nocontent: bool,
}
/// Parsed `FT.SEARCH` request of the simple form `@field:substring`.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchTextRequest {
    /// Index name.
    pub name: String,
    /// TEXT field to search.
    pub field: String,
    /// Substring to look for (raw bytes after the `:`).
    pub query: Vec<u8>,
    /// `RETURN` field list, if given.
    pub return_fields: Option<Vec<String>>,
    /// `LIMIT (offset, count)`, if given.
    pub limit: Option<(usize, usize)>,
    /// `SORTBY` field and direction, if given.
    pub sortby: Option<(String, SortDirection)>,
    /// `NOCONTENT`: return document ids only.
    pub nocontent: bool,
}
/// Reducer applied to each `FT.AGGREGATE` group.
#[derive(Clone, Debug, PartialEq)]
pub enum ReducerKind {
    /// `REDUCE COUNT 0`.
    Count,
    /// `REDUCE SUM 1 @field`.
    Sum {
        field: String,
    },
    /// `REDUCE AVG 1 @field`.
    Avg {
        field: String,
    },
}
/// One `REDUCE` clause of `FT.AGGREGATE`.
#[derive(Clone, Debug, PartialEq)]
pub struct ReducerSpec {
    /// Which reduction to perform.
    pub kind: ReducerKind,
    /// Output column name. NOTE(review): presumably taken from an `AS <alias>`
    /// suffix (or a default) in `parse_aggregate_reduce` — confirm.
    pub alias: String,
}
/// Parsed `FT.AGGREGATE` request.
#[derive(Clone, Debug, PartialEq)]
pub struct AggregateRequest {
    /// Index name.
    pub name: String,
    /// `GROUPBY` field names (without `@`); never empty after parsing.
    pub group_by: Vec<String>,
    /// `REDUCE` clauses in the order given; never empty after parsing.
    pub reducers: Vec<ReducerSpec>,
    /// `LIMIT (offset, count)`, if given.
    pub limit: Option<(usize, usize)>,
}
/// Parsed `FT.EXPLAIN` request: index name plus the raw query expression.
#[derive(Clone, Debug, PartialEq)]
pub struct ExplainRequest {
    pub name: String,
    pub query: Vec<u8>,
}
/// Parsed `FT.ALTER` request: add `field` of `field_type` to index `name`.
/// NOTE(review): "add" is inferred from the field set; `parse_alter` is not visible here.
#[derive(Clone, Debug, PartialEq)]
pub struct AlterRequest {
    pub name: String,
    pub field: String,
    pub field_type: MetadataFieldType,
}
/// Parsed `FT.REGEX` request.
/// NOTE(review): `max_errors` presumably bounds approximate-match edits; confirm in `parse_regex`.
#[derive(Clone, Debug, PartialEq)]
pub struct RegexRequest {
    pub name: String,
    pub field: String,
    pub pattern: String,
    pub max_errors: u16,
}
/// Result of executing an [`FtCommand`], turned into bytes by `render_outcome`.
#[derive(Clone, Debug, PartialEq)]
pub enum FtOutcome {
    /// Plain success (e.g. `FT.CREATE`).
    Ok,
    /// Index names (`FT.LIST`).
    List(Vec<String>),
    /// Key/value pairs describing an index (`FT.INFO`).
    Info(Vec<(String, InfoValue)>),
    /// Search results with their fields.
    Search {
        total: usize,
        hits: Vec<SearchHit>,
    },
    /// Search results when `NOCONTENT` was requested: document ids only.
    SearchNoContent {
        total: usize,
        doc_ids: Vec<Vec<u8>>,
    },
    /// `FT.AGGREGATE` rows, each a list of `(column, value)` pairs.
    Aggregate {
        total_groups: usize,
        rows: Vec<Vec<(String, Vec<u8>)>>,
    },
    /// Human-readable query plan (`FT.EXPLAIN`).
    Explain(String),
    /// `FT.DROPINDEX` result. NOTE(review): field semantics are set by
    /// `execute_dropindex`, which is not visible here.
    DropOk {
        deleted_documents: bool,
        document_count: usize,
    },
}
/// One `FT.SEARCH` result row.
#[derive(Clone, Debug, PartialEq)]
pub struct SearchHit {
    /// Document key.
    pub doc_id: Vec<u8>,
    /// Engine score for KNN searches; `0.0` for text/filter searches.
    pub score: f32,
    /// `(field name, value bytes)` pairs in output order.
    pub fields: Vec<(String, Vec<u8>)>,
}
/// Value in an `FT.INFO` reply; `Array` allows nested values.
#[derive(Clone, Debug, PartialEq)]
pub enum InfoValue {
    String(String),
    Integer(i64),
    Array(Vec<InfoValue>),
}
/// Parses a raw argument vector (command name first) into an [`FtCommand`].
///
/// The command name is matched case-insensitively and the remaining arguments
/// are handed to the matching `parse_*` helper.
///
/// # Errors
/// - [`FtError::UnknownCommand`] when `args` is empty (with an empty name) or the
///   name does not start with `FT.`.
/// - [`FtError::Unsupported`] for an unrecognised `FT.*` name.
/// - Any error returned by the per-command parser.
pub fn parse_command(args: &[&[u8]]) -> Result<FtCommand, FtError> {
    let Some((head, rest)) = args.split_first() else {
        return Err(FtError::UnknownCommand(String::new()));
    };
    let upper = ascii_upper(head);
    match upper.as_slice() {
        b"FT.CREATE" => parse_create(rest).map(FtCommand::Create),
        b"FT.SEARCH" => parse_search(rest),
        b"FT.AGGREGATE" => parse_aggregate(rest).map(FtCommand::Aggregate),
        b"FT.EXPLAIN" => parse_explain(rest).map(FtCommand::Explain),
        b"FT.ALTER" => parse_alter(rest).map(FtCommand::Alter),
        b"FT.REGEX" => parse_regex(rest).map(FtCommand::Regex),
        b"FT.INFO" => parse_info(rest),
        b"FT.LIST" | b"FT._LIST" => parse_list(rest),
        b"FT.DROPINDEX" => parse_dropindex(rest),
        unknown => {
            let label = String::from_utf8_lossy(unknown).into_owned();
            if unknown.starts_with(b"FT.") {
                Err(FtError::Unsupported(label))
            } else {
                Err(FtError::UnknownCommand(label))
            }
        }
    }
}
/// Executes a parsed [`FtCommand`] against `registry`.
///
/// `List` is answered directly from `registry.list()`; every other command is
/// delegated to its `execute_*` handler, whose error is returned unchanged.
pub fn execute(registry: &VectorRegistry, cmd: FtCommand) -> Result<FtOutcome, FtError> {
    match cmd {
        FtCommand::List => Ok(FtOutcome::List(registry.list())),
        FtCommand::Info { name } => execute_info(registry, name),
        FtCommand::DropIndex {
            name,
            delete_documents,
        } => execute_dropindex(registry, name, delete_documents),
        FtCommand::Create(request) => execute_create(registry, request),
        FtCommand::Alter(request) => execute_alter(registry, &request),
        FtCommand::Search(request) => execute_search(registry, &request),
        FtCommand::SearchText(request) => execute_search_text(registry, &request),
        FtCommand::SearchFilter(request) => execute_search_filter(registry, &request),
        FtCommand::Aggregate(request) => execute_aggregate(registry, &request),
        FtCommand::Explain(request) => execute_explain(registry, &request),
        FtCommand::Regex(request) => execute_regex(registry, &request),
    }
}
/// Parses, executes and renders one `FT.*` command, returning the reply bytes.
///
/// A failure in either the parse or the execute phase is rendered with
/// `render_error`; success is rendered with `render_outcome`.
#[must_use]
pub fn dispatch(registry: &VectorRegistry, args: &[&[u8]]) -> Vec<u8> {
    let result = parse_command(args).and_then(|cmd| execute(registry, cmd));
    match result {
        Ok(outcome) => render_outcome(&outcome),
        Err(err) => render_error(&err),
    }
}
/// Routes an `HSET key field value [field value ...]` to an index by key prefix.
///
/// `args` starts at the key (the command name is not included). Indexes are
/// visited in `registry.list()` order; the first one with a prefix matching
/// `key` receives the document via `insert_into_index`, and its name is
/// returned. Returns `Ok(None)` when no index matches.
///
/// # Errors
/// `Syntax` when the key is missing or the field/value list is empty or odd
/// in length; any error from `insert_into_index`.
pub fn maybe_index_hset(
    registry: &VectorRegistry,
    args: &[&[u8]],
) -> Result<Option<String>, FtError> {
    let Some((&key, pairs)) = args.split_first() else {
        return Err(FtError::Syntax("HSET requires a key".to_string()));
    };
    let has_complete_pairs = !pairs.is_empty() && pairs.len().is_multiple_of(2);
    if !has_complete_pairs {
        return Err(FtError::Syntax(
            "HSET requires field/value pairs".to_string(),
        ));
    }
    for index_name in registry.list() {
        let Some(table) = registry.get(&index_name) else {
            continue;
        };
        let key_matches = table
            .schema
            .prefixes
            .iter()
            .any(|prefix| key.starts_with(prefix));
        if !key_matches {
            continue;
        }
        insert_into_index(&table, key, pairs)?;
        return Ok(Some(index_name));
    }
    Ok(None)
}
/// Parses the arguments of `FT.CREATE` (everything after the command name).
///
/// Shape consumed, in order:
/// `<name> ON HASH [PREFIX <n> <prefix>...] SCHEMA <field> <kind> [modifiers] ...`
///
/// # Errors
/// - `Unsupported` for a doc type other than `HASH`.
/// - `Syntax` for missing tokens, an empty prefix list, or a schema without a
///   `VECTOR` field.
/// - Any error from `parse_create_schema_body`.
fn parse_create(rest: &[&[u8]]) -> Result<CreateRequest, FtError> {
    let mut it = TokenCursor::new(rest);
    let name = it.next_string("FT.CREATE: missing index name")?;
    expect_keyword(it.next_required("FT.CREATE: expected ON")?, "ON")?;
    let doc_type_tok = it.next_required("FT.CREATE: expected doc type")?;
    let doc_type_up = ascii_upper(doc_type_tok);
    let doc_type = match doc_type_up.as_slice() {
        b"HASH" => DocType::Hash,
        _ => {
            return Err(FtError::Unsupported(format!(
                "FT.CREATE doc type {}",
                String::from_utf8_lossy(doc_type_tok)
            )));
        }
    };
    // PREFIX is syntactically optional, but an empty prefix list is rejected
    // below: `maybe_index_hset` routes keys to indexes by prefix, so an index
    // without prefixes would never match there.
    let mut prefixes: Vec<Vec<u8>> = Vec::new();
    if matches_keyword(it.peek(), "PREFIX") {
        it.advance();
        let n_tok = it.next_required("FT.CREATE: PREFIX expects a count")?;
        let n = parse_unsigned(n_tok, "FT.CREATE: PREFIX count")?;
        for _ in 0..n {
            let p = it.next_required("FT.CREATE: missing PREFIX value")?;
            prefixes.push(p.to_vec());
        }
    }
    if prefixes.is_empty() {
        return Err(FtError::Syntax(
            "FT.CREATE requires at least one PREFIX value".to_string(),
        ));
    }
    expect_keyword(it.next_required("FT.CREATE: expected SCHEMA")?, "SCHEMA")?;
    let (vector_field, metadata_fields) = parse_create_schema_body(&mut it)?;
    // Every index must carry exactly one vector field.
    let (vec_name, vec_type, dim, distance, algorithm) = vector_field.ok_or_else(|| {
        FtError::Syntax("FT.CREATE: SCHEMA must declare a VECTOR field".to_string())
    })?;
    let schema = VectorSchema {
        vector_field: vec_name,
        vector_type: vec_type,
        dim,
        distance,
        algorithm,
        prefixes,
        metadata_fields,
    };
    Ok(CreateRequest {
        name,
        doc_type,
        schema,
    })
}
/// Parsed `VECTOR` field from `FT.CREATE ... SCHEMA`:
/// `(field name, element type, dimension, distance metric, index algorithm)`.
type CreateVectorClause = (String, VectorType, u16, DistanceMetric, IndexAlgorithm);
/// Parses the field list after `SCHEMA` in `FT.CREATE`, consuming tokens until
/// the cursor is exhausted.
///
/// Each field is `<name> <kind> [modifiers]`, where `kind` (case-insensitive)
/// is `TEXT`, `NUMERIC`, `TAG`, `GEO` or `VECTOR`. Non-vector fields become
/// [`MetadataField`]s. At most one `VECTOR` field is allowed; it is returned
/// separately (`None` if absent).
///
/// # Errors
/// `Unsupported` for an unknown kind or a second `VECTOR` field, plus any error
/// from decoding the field name, the modifier helpers or `parse_vector_clause`.
fn parse_create_schema_body(
    it: &mut TokenCursor<'_>,
) -> Result<(Option<CreateVectorClause>, Vec<MetadataField>), FtError> {
    let mut vector_field: Option<CreateVectorClause> = None;
    let mut metadata_fields: Vec<MetadataField> = Vec::new();
    while let Some(field_tok) = it.next() {
        let field_name = utf8(field_tok, "FT.CREATE: field name")?;
        let kind_tok = it.next_required("FT.CREATE: missing field kind")?;
        let kind_up = ascii_upper(kind_tok);
        match kind_up.as_slice() {
            b"TEXT" => {
                // Modifiers (SORTABLE, WEIGHT x, ...) are skipped, not recorded.
                consume_field_modifiers(it);
                metadata_fields.push(MetadataField {
                    name: field_name,
                    field_type: MetadataFieldType::Text,
                    tag_separator: None,
                });
            }
            b"NUMERIC" => {
                consume_field_modifiers(it);
                metadata_fields.push(MetadataField {
                    name: field_name,
                    field_type: MetadataFieldType::Numeric,
                    tag_separator: None,
                });
            }
            b"TAG" => {
                // TAG is the only kind whose modifiers are kept (SEPARATOR).
                let separator = parse_tag_modifiers(it, &field_name)?;
                metadata_fields.push(MetadataField {
                    name: field_name,
                    field_type: MetadataFieldType::Tag,
                    tag_separator: separator,
                });
            }
            b"GEO" => {
                consume_field_modifiers(it);
                metadata_fields.push(MetadataField {
                    name: field_name,
                    field_type: MetadataFieldType::Geo,
                    tag_separator: None,
                });
            }
            b"VECTOR" => {
                if vector_field.is_some() {
                    return Err(FtError::Unsupported(
                        "multiple VECTOR fields per index".to_string(),
                    ));
                }
                // parse_vector_clause returns (type, dim, metric, algorithm).
                let parsed = parse_vector_clause(it)?;
                vector_field = Some((field_name, parsed.0, parsed.1, parsed.2, parsed.3));
            }
            other => {
                return Err(FtError::Unsupported(format!(
                    "FT.CREATE field kind {}",
                    String::from_utf8_lossy(other)
                )));
            }
        }
    }
    Ok((vector_field, metadata_fields))
}
/// Skips the optional modifiers that may follow a TEXT/NUMERIC/GEO field.
///
/// Flag modifiers (`SORTABLE`, `NOINDEX`, `UNF`, `CASESENSITIVE`, `NOSTEM`)
/// consume one token. `WEIGHT` and `PHONETIC` also consume the following value
/// token when there is one. Scanning stops at the first token that is not a
/// known modifier, leaving it for the caller.
fn consume_field_modifiers(it: &mut TokenCursor<'_>) {
    loop {
        let Some(tok) = it.peek() else {
            break;
        };
        let takes_value = match ascii_upper(tok).as_slice() {
            b"SORTABLE" | b"NOINDEX" | b"UNF" | b"CASESENSITIVE" | b"NOSTEM" => false,
            b"WEIGHT" | b"PHONETIC" => true,
            _ => break,
        };
        it.advance();
        if takes_value && it.peek().is_some() {
            it.advance();
        }
    }
}
/// Consumes the modifiers that may follow a `TAG` field in `FT.CREATE ... SCHEMA`.
///
/// Returns the byte given by `SEPARATOR <c>`, or `None` when no separator was
/// given; if `SEPARATOR` repeats, the last one wins. `SORTABLE`, `NOINDEX`,
/// `UNF` and `CASESENSITIVE` are skipped. Scanning stops at the first token
/// that is not a known modifier.
///
/// # Errors
/// `Syntax` when `SEPARATOR` has no value, or its value is not exactly one
/// ASCII byte.
fn parse_tag_modifiers(it: &mut TokenCursor<'_>, field_name: &str) -> Result<Option<u8>, FtError> {
    let mut separator: Option<u8> = None;
    while let Some(tok) = it.peek() {
        let up = ascii_upper(tok);
        match up.as_slice() {
            b"SEPARATOR" => {
                it.advance();
                let sep_tok: &[u8] =
                    it.next_required("FT.CREATE: TAG SEPARATOR expects a single-character value")?;
                // Enforce what the error message promises. A lone non-ASCII byte
                // (e.g. a UTF-8 lead byte) was previously accepted and could occur
                // inside multi-byte characters of tag values.
                match sep_tok {
                    [byte] if byte.is_ascii() => separator = Some(*byte),
                    _ => {
                        return Err(FtError::Syntax(format!(
                            "FT.CREATE: TAG SEPARATOR for field {field_name} must be a single ASCII byte",
                        )));
                    }
                }
            }
            b"SORTABLE" | b"NOINDEX" | b"UNF" | b"CASESENSITIVE" => {
                it.advance();
            }
            _ => break,
        }
    }
    Ok(separator)
}
/// Parses the body of a `VECTOR` field declaration in `FT.CREATE`.
///
/// Shape consumed: `<algorithm> <count> <key> <value> ...`, where `count` is
/// the number of key and value tokens (so it must be even). Only `HNSW` with
/// `TYPE FLOAT32` is accepted. `DIM` must be in `1..=u16::MAX`. Unrecognised
/// parameter keys are consumed and ignored.
///
/// Returns `(element type, dimension, distance metric, algorithm)`.
///
/// # Errors
/// - `Unsupported` for FLAT or unknown algorithms, non-FLOAT32 types, or
///   unknown distance metrics.
/// - `Syntax` for an odd count, out-of-range DIM, missing tokens, or a missing
///   TYPE/DIM/DISTANCE_METRIC.
fn parse_vector_clause(
    it: &mut TokenCursor<'_>,
) -> Result<(VectorType, u16, DistanceMetric, IndexAlgorithm), FtError> {
    let alg_tok = it.next_required("FT.CREATE: VECTOR missing algorithm")?;
    let alg_up = ascii_upper(alg_tok);
    let algorithm = match alg_up.as_slice() {
        b"HNSW" => IndexAlgorithm::Hnsw,
        b"FLAT" => {
            return Err(FtError::Unsupported(
                "FT.CREATE: FLAT vector index not supported in this build".to_string(),
            ));
        }
        other => {
            return Err(FtError::Unsupported(format!(
                "FT.CREATE VECTOR algorithm {}",
                String::from_utf8_lossy(other)
            )));
        }
    };
    // The count covers keys and values, e.g. `6 TYPE FLOAT32 DIM 4 DISTANCE_METRIC L2`.
    let pair_count_tok = it.next_required("FT.CREATE: VECTOR missing parameter count")?;
    let pair_count = parse_unsigned(pair_count_tok, "FT.CREATE VECTOR parameter count")?;
    if !pair_count.is_multiple_of(2) {
        return Err(FtError::Syntax(
            "FT.CREATE VECTOR parameter count must be even".to_string(),
        ));
    }
    let mut vec_type: Option<VectorType> = None;
    let mut dim: Option<u16> = None;
    let mut distance: Option<DistanceMetric> = None;
    let pair_pairs = pair_count / 2;
    for _ in 0..pair_pairs {
        let key_tok = it.next_required("FT.CREATE: VECTOR missing parameter key")?;
        let val_tok = it.next_required("FT.CREATE: VECTOR missing parameter value")?;
        let key_up = ascii_upper(key_tok);
        let val_up = ascii_upper(val_tok);
        match key_up.as_slice() {
            b"TYPE" => match val_up.as_slice() {
                b"FLOAT32" => vec_type = Some(VectorType::Float32),
                b"FLOAT16" => {
                    return Err(FtError::Unsupported(
                        "FT.CREATE VECTOR TYPE FLOAT16 not supported in this build".to_string(),
                    ));
                }
                other => {
                    return Err(FtError::Unsupported(format!(
                        "FT.CREATE VECTOR TYPE {}",
                        String::from_utf8_lossy(other)
                    )));
                }
            },
            b"DIM" => {
                let d = parse_unsigned(val_tok, "FT.CREATE VECTOR DIM")?;
                if d == 0 || d > usize::from(u16::MAX) {
                    return Err(FtError::Syntax(
                        "FT.CREATE VECTOR DIM out of range".to_string(),
                    ));
                }
                // Cannot fail: the range was checked just above.
                dim = Some(u16::try_from(d).expect("dim fits u16"));
            }
            b"DISTANCE_METRIC" => {
                distance = Some(match val_up.as_slice() {
                    b"COSINE" => DistanceMetric::Cosine,
                    b"L2" | b"EUCLIDEAN" => DistanceMetric::L2,
                    b"IP" | b"DOTPRODUCT" | b"DOT_PRODUCT" => DistanceMetric::InnerProduct,
                    other => {
                        return Err(FtError::Unsupported(format!(
                            "FT.CREATE DISTANCE_METRIC {}",
                            String::from_utf8_lossy(other)
                        )));
                    }
                });
            }
            // Other HNSW tuning keys are accepted but ignored.
            _ => {}
        }
    }
    let vec_type =
        vec_type.ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing TYPE".to_string()))?;
    let dim = dim.ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing DIM".to_string()))?;
    let distance = distance
        .ok_or_else(|| FtError::Syntax("FT.CREATE VECTOR missing DISTANCE_METRIC".to_string()))?;
    Ok((vec_type, dim, distance, algorithm))
}
fn execute_create(registry: &VectorRegistry, req: CreateRequest) -> Result<FtOutcome, FtError> {
use crate::vector::registry::RegistryError;
match registry.create(req.name.clone(), req.schema) {
Ok(()) => Ok(FtOutcome::Ok),
Err(RegistryError::AlreadyExists(name)) => Err(FtError::AlreadyExists(name)),
Err(RegistryError::UnsupportedAlgorithm(_)) => Err(FtError::Unsupported(
"FT.CREATE: unsupported VECTOR algorithm".to_string(),
)),
Err(other) => Err(FtError::Engine(other.to_string())),
}
}
/// Parses the arguments of `FT.SEARCH` into one of three command shapes.
///
/// The query expression (second token) decides which:
/// 1. `<filter> =>[KNN k @field $param]` → [`FtCommand::Search`]. The vector is
///    read from `PARAMS` under `param`.
/// 2. A simple `@field:substring` (see `try_parse_simple_text_field_query`)
///    → [`FtCommand::SearchText`].
/// 3. Any other filter expression → [`FtCommand::SearchFilter`].
///
/// Trailing clauses (RETURN, SORTBY, LIMIT, NOCONTENT, ...) are parsed for all
/// three shapes. `PARAMS` is only accepted in the KNN shape.
///
/// # Errors
/// `Syntax` when the KNN parameter is missing from `PARAMS`, plus any error from
/// the query/clause helpers or `ft_filter::parse_expr`.
fn parse_search(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
    let mut it = TokenCursor::new(rest);
    let name = it.next_string("FT.SEARCH: missing index name")?;
    let query = it.next_required("FT.SEARCH: missing query expression")?;
    let (filter_part, knn_part) = split_knn_suffix(query)?;
    if let Some(knn_bytes) = knn_part {
        let filter = parse_lhs_filter(filter_part)?;
        let (k, vec_field, param_name) = parse_knn_clause(knn_bytes)?;
        let mut params: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();
        let mut opts = SearchClauseOptions::default();
        consume_search_trailing_clauses_with_params(&mut it, &mut params, &mut opts)?;
        // The query vector is passed out-of-band through PARAMS.
        let vector_bytes = params
            .remove(param_name.as_bytes())
            .ok_or_else(|| FtError::Syntax(format!("FT.SEARCH: PARAMS missing ${param_name}")))?;
        return Ok(FtCommand::Search(SearchRequest {
            name,
            k,
            vector_field: vec_field,
            vector_bytes,
            filter,
            return_fields: opts.return_fields,
            limit: opts.limit,
            sortby: opts.sortby,
            nocontent: opts.nocontent,
        }));
    }
    if let Some(parsed) = try_parse_simple_text_field_query(filter_part)? {
        let (field, substring) = parsed;
        let mut opts = SearchClauseOptions::default();
        consume_search_trailing_clauses(&mut it, false, &mut opts)?;
        return Ok(FtCommand::SearchText(SearchTextRequest {
            name,
            field,
            query: substring,
            return_fields: opts.return_fields,
            limit: opts.limit,
            sortby: opts.sortby,
            nocontent: opts.nocontent,
        }));
    }
    let filter = ft_filter::parse_expr(filter_part)?;
    let mut opts = SearchClauseOptions::default();
    consume_search_trailing_clauses(&mut it, false, &mut opts)?;
    Ok(FtCommand::SearchFilter(SearchFilterRequest {
        name,
        filter,
        return_fields: opts.return_fields,
        limit: opts.limit,
        sortby: opts.sortby,
        nocontent: opts.nocontent,
    }))
}
/// Splits a search query at the first `=>` into `(filter part, KNN body)`.
///
/// Both sides are trimmed of ASCII whitespace. Without `=>` the whole trimmed
/// query is the filter part and the KNN body is `None`. With `=>`, the text
/// after it must be wrapped in `[...]`; the body is returned without the
/// brackets.
///
/// # Errors
/// `Syntax` when the text after `=>` is not bracketed.
fn split_knn_suffix(query: &[u8]) -> Result<(&[u8], Option<&[u8]>), FtError> {
    let query = trim_ascii_bytes(query);
    match find_byte_subseq(query, b"=>") {
        None => Ok((query, None)),
        Some(pos) => {
            let (before, after) = query.split_at(pos);
            let bracketed = trim_ascii_bytes(&after[2..]);
            let inner = bracketed
                .strip_prefix(b"[")
                .and_then(|rest| rest.strip_suffix(b"]"))
                .ok_or_else(|| {
                    FtError::Syntax(
                        "FT.SEARCH query: expected '[KNN ...]' after '=>'".to_string(),
                    )
                })?;
            Ok((trim_ascii_bytes(before), Some(inner)))
        }
    }
}
/// Parses the filter on the left of `=>` in a KNN query.
///
/// An empty filter, a bare `*`, or an expression that parses to
/// `FilterExpr::All` means "no pre-filter" and yields `None`.
fn parse_lhs_filter(lhs: &[u8]) -> Result<Option<FilterExpr>, FtError> {
    let body = trim_ascii_bytes(lhs);
    if body.is_empty() || body == b"*" {
        return Ok(None);
    }
    match ft_filter::parse_expr(body)? {
        FilterExpr::All => Ok(None),
        expr => Ok(Some(expr)),
    }
}
/// Returns `s` without leading and trailing ASCII whitespace.
///
/// Whitespace follows `u8::is_ascii_whitespace` (space, `\t`, `\n`, `\x0C`,
/// `\r`); the result borrows from `s`.
fn trim_ascii_bytes(s: &[u8]) -> &[u8] {
    s.trim_ascii()
}
/// Recognises the simple text query form `@field:substring`.
///
/// Returns `Ok(None)` (the caller falls back to the general filter parser)
/// when the trimmed query does not start with `@`, contains whitespace or any
/// of `()[]{}|"`, or has an empty substring.
///
/// # Errors
/// `Syntax` when the `:` is missing, the field name is empty, or the field
/// name is not valid UTF-8.
fn try_parse_simple_text_field_query(lhs: &[u8]) -> Result<Option<(String, Vec<u8>)>, FtError> {
    let trimmed = trim_ascii_bytes(lhs);
    let Some(body) = trimmed.strip_prefix(b"@") else {
        return Ok(None);
    };
    let needs_full_parser = trimmed.iter().any(|&b| {
        b.is_ascii_whitespace()
            || matches!(b, b'(' | b')' | b'[' | b']' | b'{' | b'}' | b'|' | b'"')
    });
    if needs_full_parser {
        return Ok(None);
    }
    let Some(colon) = body.iter().position(|&b| b == b':') else {
        return Err(FtError::Syntax("FT.SEARCH text query: missing ':'".to_string()));
    };
    let (field_bytes, after_field) = body.split_at(colon);
    if field_bytes.is_empty() {
        return Err(FtError::Syntax(
            "FT.SEARCH text query: empty field name".to_string(),
        ));
    }
    let substring = &after_field[1..];
    if substring.is_empty() {
        return Ok(None);
    }
    let field = match std::str::from_utf8(field_bytes) {
        Ok(text) => text.to_owned(),
        Err(_) => {
            return Err(FtError::Syntax(
                "FT.SEARCH text query: field is not UTF-8".to_string(),
            ));
        }
    };
    Ok(Some((field, substring.to_vec())))
}
/// Parses the inside of a `[KNN k @field $param]` clause.
///
/// Returns `(k, field, param)` with the `@` / `$` sigils removed.
///
/// # Errors
/// - `Syntax` for non-UTF-8 input, missing tokens, a non-numeric or zero `k`,
///   or missing sigils.
/// - `Unsupported` for an operator other than `KNN`, or trailing tokens
///   (e.g. `AS`, `EF_RUNTIME`).
fn parse_knn_clause(body: &[u8]) -> Result<(usize, String, String), FtError> {
    let Ok(text) = std::str::from_utf8(body) else {
        return Err(FtError::Syntax("FT.SEARCH KNN clause is not UTF-8".to_string()));
    };
    let mut tokens = text.split_ascii_whitespace();
    let Some(keyword) = tokens.next() else {
        return Err(FtError::Syntax("FT.SEARCH query: empty KNN clause".to_string()));
    };
    if !keyword.eq_ignore_ascii_case("KNN") {
        return Err(FtError::Unsupported(format!(
            "FT.SEARCH query operator: {keyword}"
        )));
    }
    let Some(k_text) = tokens.next() else {
        return Err(FtError::Syntax("FT.SEARCH query: KNN expects k".to_string()));
    };
    let Ok(k) = k_text.parse::<usize>() else {
        return Err(FtError::Syntax(format!("FT.SEARCH query: invalid k {k_text}")));
    };
    let Some(field_token) = tokens.next() else {
        return Err(FtError::Syntax("FT.SEARCH query: KNN expects @field".to_string()));
    };
    let Some(field) = field_token.strip_prefix('@') else {
        return Err(FtError::Syntax("FT.SEARCH query: field must start with @".to_string()));
    };
    let Some(param_token) = tokens.next() else {
        return Err(FtError::Syntax("FT.SEARCH query: KNN expects $param".to_string()));
    };
    let Some(param) = param_token.strip_prefix('$') else {
        return Err(FtError::Syntax("FT.SEARCH query: param must start with $".to_string()));
    };
    if tokens.next().is_some() {
        return Err(FtError::Unsupported(
            "FT.SEARCH query: extra tokens after KNN expression".to_string(),
        ));
    }
    if k == 0 {
        return Err(FtError::Syntax("FT.SEARCH KNN k must be > 0".to_string()));
    }
    Ok((k, field.to_owned(), param.to_owned()))
}
/// Consumes the trailing clauses of a KNN `FT.SEARCH`, collecting `PARAMS`.
///
/// Recognised clauses (case-insensitive): `PARAMS <n> <k> <v> ...` (entries are
/// inserted into `params`, so a repeated key overwrites the earlier one),
/// `RETURN`, `SORTBY`, `LIMIT`, `NOCONTENT`, `DIALECT <v>` (value consumed and
/// ignored) and `WITHSCORES` (accepted, no effect here). Options are written
/// into `opts`.
///
/// # Errors
/// `Syntax` for an odd PARAMS count or missing tokens; `Unsupported` for any
/// other clause; errors from the clause helpers.
fn consume_search_trailing_clauses_with_params(
    it: &mut TokenCursor<'_>,
    params: &mut HashMap<Vec<u8>, Vec<u8>>,
    opts: &mut SearchClauseOptions,
) -> Result<(), FtError> {
    loop {
        let Some(tok) = it.next() else { break };
        let up = ascii_upper(tok);
        match up.as_slice() {
            b"PARAMS" => {
                // <n> counts key and value tokens together, hence n / 2 pairs.
                let n_tok = it.next_required("FT.SEARCH: PARAMS expects a count")?;
                let n = parse_unsigned(n_tok, "FT.SEARCH PARAMS count")?;
                if !n.is_multiple_of(2) {
                    return Err(FtError::Syntax(
                        "FT.SEARCH PARAMS count must be even".to_string(),
                    ));
                }
                for _ in 0..(n / 2) {
                    let k_tok = it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
                    let v_tok = it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
                    params.insert(k_tok.to_vec(), v_tok.to_vec());
                }
            }
            b"RETURN" => parse_return_clause(it, opts)?,
            b"SORTBY" => parse_sortby_clause(it, opts)?,
            b"LIMIT" => parse_limit_clause(it, opts)?,
            b"NOCONTENT" => opts.nocontent = true,
            b"DIALECT" => {
                it.next_required("FT.SEARCH: DIALECT expects a value")?;
            }
            b"WITHSCORES" => {}
            other => {
                return Err(FtError::Unsupported(format!(
                    "FT.SEARCH clause {}",
                    String::from_utf8_lossy(other)
                )));
            }
        }
    }
    Ok(())
}
/// Accumulator for the optional trailing clauses of `FT.SEARCH`.
/// A later occurrence of the same clause overwrites the earlier one.
#[derive(Default)]
struct SearchClauseOptions {
    /// `RETURN` field names (leading `@` stripped).
    return_fields: Option<Vec<String>>,
    /// `LIMIT (offset, count)`.
    limit: Option<(usize, usize)>,
    /// `SORTBY` field and direction.
    sortby: Option<(String, SortDirection)>,
    /// Set by `NOCONTENT`.
    nocontent: bool,
}
/// Parses `RETURN <n> <field>...` (the `RETURN` keyword is already consumed).
///
/// A leading `@` on each field name is stripped. The list replaces any
/// earlier `RETURN` in `opts`.
///
/// # Errors
/// `Syntax` for a missing or invalid count, missing field names, or non-UTF-8
/// field names.
fn parse_return_clause(
    it: &mut TokenCursor<'_>,
    opts: &mut SearchClauseOptions,
) -> Result<(), FtError> {
    // `n` comes straight from the client. Pre-allocating `n` slots would let
    // `RETURN 18446744073709551615` panic with "capacity overflow", and a large
    // count could abort on allocation failure before the missing field tokens
    // are detected. Cap the up-front reservation; the Vec still grows as needed.
    const MAX_PREALLOC: usize = 64;
    let n_tok = it.next_required("FT.SEARCH: RETURN expects a count")?;
    let n = parse_unsigned(n_tok, "FT.SEARCH RETURN count")?;
    let mut fields: Vec<String> = Vec::with_capacity(n.min(MAX_PREALLOC));
    for _ in 0..n {
        let f_tok: &[u8] = it.next_required("FT.SEARCH: RETURN expects field name")?;
        let name = f_tok.strip_prefix(b"@").unwrap_or(f_tok);
        fields.push(utf8(name, "FT.SEARCH RETURN field name")?);
    }
    opts.return_fields = Some(fields);
    Ok(())
}
/// Parses `LIMIT <offset> <count>` (the keyword is already consumed).
///
/// Both tokens are taken before either is validated, so a missing count is
/// reported before an invalid offset.
fn parse_limit_clause(
    it: &mut TokenCursor<'_>,
    opts: &mut SearchClauseOptions,
) -> Result<(), FtError> {
    let offset_tok = it.next_required("FT.SEARCH: LIMIT expects offset")?;
    let count_tok = it.next_required("FT.SEARCH: LIMIT expects count")?;
    opts.limit = Some((
        parse_unsigned(offset_tok, "FT.SEARCH LIMIT offset")?,
        parse_unsigned(count_tok, "FT.SEARCH LIMIT count")?,
    ));
    Ok(())
}
/// Parses `SORTBY <@field> [ASC|DESC]` (the keyword is already consumed).
///
/// A leading `@` is stripped from the field. The direction token is optional
/// and case-insensitive; any other following token is left unconsumed and
/// the direction defaults to ascending.
fn parse_sortby_clause(
    it: &mut TokenCursor<'_>,
    opts: &mut SearchClauseOptions,
) -> Result<(), FtError> {
    let field_tok: &[u8] = it.next_required("FT.SEARCH: SORTBY expects @field")?;
    let field = utf8(
        field_tok.strip_prefix(b"@").unwrap_or(field_tok),
        "FT.SEARCH SORTBY field",
    )?;
    let mut direction = SortDirection::Asc;
    if let Some(next) = it.peek() {
        let keyword = ascii_upper(next);
        if keyword.as_slice() == b"DESC" {
            it.advance();
            direction = SortDirection::Desc;
        } else if keyword.as_slice() == b"ASC" {
            it.advance();
        }
    }
    opts.sortby = Some((field, direction));
    Ok(())
}
/// Older `@field:substring` recogniser (not called from the code visible here;
/// `parse_search` uses `try_parse_simple_text_field_query`).
///
/// Returns `Ok(None)` when `query` does not start with `@`. The substring may
/// be empty.
///
/// # Errors
/// `Unsupported` if the query contains `=>`; `Syntax` for a missing `:`, an
/// empty field name, or a non-UTF-8 field name.
fn try_parse_text_field_query(query: &[u8]) -> Result<Option<(String, Vec<u8>)>, FtError> {
    let Some(body) = query.strip_prefix(b"@") else {
        return Ok(None);
    };
    if find_byte_subseq(query, b"=>").is_some() {
        return Err(FtError::Unsupported(format!(
            "FT.SEARCH query: {}",
            String::from_utf8_lossy(query)
        )));
    }
    let Some(colon) = body.iter().position(|&b| b == b':') else {
        return Err(FtError::Syntax("FT.SEARCH text query: missing ':'".to_string()));
    };
    let (field_bytes, after_field) = body.split_at(colon);
    if field_bytes.is_empty() {
        return Err(FtError::Syntax(
            "FT.SEARCH text query: empty field name".to_string(),
        ));
    }
    let substring = &after_field[1..];
    let field = match std::str::from_utf8(field_bytes) {
        Ok(text) => text.to_owned(),
        Err(_) => {
            return Err(FtError::Syntax(
                "FT.SEARCH text query: field is not UTF-8".to_string(),
            ));
        }
    };
    Ok(Some((field, substring.to_vec())))
}
/// Returns the index of the first occurrence of `needle` in `haystack`.
///
/// An empty `needle` never matches (returns `None`). This also keeps
/// `windows(0)` from being called, which would panic.
fn find_byte_subseq(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    // `windows` yields nothing when the needle is longer than the haystack.
    haystack
        .windows(needle.len())
        .position(|candidate| candidate == needle)
}
/// Consumes the trailing clauses of a non-KNN `FT.SEARCH` into `opts`.
///
/// Accepts the same clauses as `consume_search_trailing_clauses_with_params`.
/// `PARAMS` is only recognised when `allow_params` is true, and its entries are
/// then skipped rather than stored. With `allow_params == false` (as in the
/// calls visible in `parse_search`), `PARAMS` is reported as an unsupported
/// clause.
///
/// # Errors
/// `Syntax` for an odd PARAMS count or missing tokens; `Unsupported` for an
/// unknown clause; errors from the clause helpers.
fn consume_search_trailing_clauses(
    it: &mut TokenCursor<'_>,
    allow_params: bool,
    opts: &mut SearchClauseOptions,
) -> Result<(), FtError> {
    loop {
        let Some(tok) = it.next() else { break };
        let up = ascii_upper(tok);
        match up.as_slice() {
            b"PARAMS" if allow_params => {
                let n_tok = it.next_required("FT.SEARCH: PARAMS expects a count")?;
                let n = parse_unsigned(n_tok, "FT.SEARCH PARAMS count")?;
                if !n.is_multiple_of(2) {
                    return Err(FtError::Syntax(
                        "FT.SEARCH PARAMS count must be even".to_string(),
                    ));
                }
                // <n> counts key and value tokens together; skip all of them.
                for _ in 0..n {
                    it.next_required("FT.SEARCH: PARAMS expects key/value pair")?;
                }
            }
            b"RETURN" => parse_return_clause(it, opts)?,
            b"SORTBY" => parse_sortby_clause(it, opts)?,
            b"LIMIT" => parse_limit_clause(it, opts)?,
            b"NOCONTENT" => opts.nocontent = true,
            b"DIALECT" => {
                it.next_required("FT.SEARCH: DIALECT expects a value")?;
            }
            b"WITHSCORES" => {}
            other => {
                return Err(FtError::Unsupported(format!(
                    "FT.SEARCH clause {}",
                    String::from_utf8_lossy(other)
                )));
            }
        }
    }
    Ok(())
}
/// Parses a whole `*=>[KNN k @field $param]` query into `(k, field, param)`.
///
/// Unlike `parse_knn_clause`, this requires the literal `*` pre-filter and the
/// brackets, and its validation after bracket removal mirrors
/// `parse_knn_clause`. It is not called from the code visible here.
/// NOTE(review): possibly a legacy entry point; confirm it is still needed.
///
/// # Errors
/// `Unsupported` if the query does not start with `*=>`, for a non-KNN operator,
/// or for trailing tokens; `Syntax` for missing brackets or tokens, a bad `k`,
/// or missing sigils.
fn parse_knn_query(query: &[u8]) -> Result<(usize, String, String), FtError> {
    let s = std::str::from_utf8(query)
        .map_err(|_| FtError::Syntax("FT.SEARCH query is not UTF-8".to_string()))?;
    let trimmed = s.trim();
    let stripped = trimmed
        .strip_prefix("*=>")
        .ok_or_else(|| FtError::Unsupported(format!("FT.SEARCH query: {trimmed}")))?
        .trim_start();
    let inner = stripped
        .strip_prefix('[')
        .and_then(|s| s.strip_suffix(']'))
        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: missing brackets".to_string()))?;
    let mut parts = inner.split_ascii_whitespace();
    let knn_kw = parts
        .next()
        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: empty".to_string()))?;
    if !knn_kw.eq_ignore_ascii_case("KNN") {
        return Err(FtError::Unsupported(format!(
            "FT.SEARCH query operator: {knn_kw}"
        )));
    }
    let k_str = parts
        .next()
        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects k".to_string()))?;
    let k: usize = k_str
        .parse()
        .map_err(|_| FtError::Syntax(format!("FT.SEARCH query: invalid k {k_str}")))?;
    let field_tok = parts
        .next()
        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects @field".to_string()))?;
    let field = field_tok
        .strip_prefix('@')
        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: field must start with @".to_string()))?;
    let param_tok = parts
        .next()
        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: KNN expects $param".to_string()))?;
    let param = param_tok
        .strip_prefix('$')
        .ok_or_else(|| FtError::Syntax("FT.SEARCH query: param must start with $".to_string()))?;
    if parts.next().is_some() {
        return Err(FtError::Unsupported(
            "FT.SEARCH query: extra tokens after KNN expression".to_string(),
        ));
    }
    // Checked last, so structural errors are reported before a zero k.
    if k == 0 {
        return Err(FtError::Syntax("FT.SEARCH KNN k must be > 0".to_string()));
    }
    Ok((k, field.to_string(), param.to_string()))
}
/// Executes a KNN vector search (`FT.SEARCH ... =>[KNN k @field $param]`).
///
/// Steps:
/// 1. Resolve the index and check that the query's `@field` is its vector field.
/// 2. Decode `vector_bytes` with `decode_le_f32` for the index dimension.
/// 3. If a pre-filter is present, evaluate it over all indexed keys.
/// 4. Fetch engine candidates, drop those outside the filter, and keep the
///    first `k` survivors in engine order.
/// 5. Apply SORTBY / LIMIT / RETURN / NOCONTENT via `finalize_search_outcome`.
///
/// Each hit's fields start with `__vec_score` (the engine score formatted by
/// `format_float`), followed by the row's metadata.
///
/// # Errors
/// `NotFound` for an unknown index; `Syntax` when the query names a field other
/// than the index's vector field; errors from `decode_le_f32` or filter
/// evaluation; `Engine` when the engine search fails.
fn execute_search(registry: &VectorRegistry, req: &SearchRequest) -> Result<FtOutcome, FtError> {
    let table = registry
        .get(&req.name)
        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
    if table.schema.vector_field != req.vector_field {
        return Err(FtError::Syntax(format!(
            "FT.SEARCH: query references @{} but index vector field is {}",
            req.vector_field, table.schema.vector_field
        )));
    }
    let dim = usize::from(table.schema.dim);
    let query = decode_le_f32(&req.vector_bytes, dim)?;
    // With a filter, also remember how many documents are indexed.
    let (allowed, indexed_count): (Option<BTreeSet<Vec<u8>>>, usize) = match &req.filter {
        Some(filter) => {
            let universe: BTreeSet<Vec<u8>> = table.indexed_keys().into_iter().collect();
            let matched = ft_filter::evaluate(filter, &table, &universe)?;
            (Some(matched), universe.len())
        }
        None => (None, 0),
    };
    // Post-filtering is only correct if every allowed document can appear among
    // the engine's candidates. The nearest `|allowed|` documents overall need
    // not include any allowed ones, so requesting that many can silently drop
    // matches (even return nothing). With a filter, request every indexed
    // document instead; an empty allowed set needs no engine call at all.
    // NOTE(review): if the engine's third `search` argument can take a filter,
    // pushing `allowed` down would avoid scanning the whole index.
    let candidate_k = match &allowed {
        None => req.k,
        Some(set) if set.is_empty() => 0,
        Some(_) => indexed_count.max(req.k),
    };
    let raw = if candidate_k == 0 {
        Vec::new()
    } else {
        table
            .engine
            .search(&query, candidate_k, None)
            .map_err(|e| FtError::Engine(e.to_string()))?
    };
    let mut out = Vec::new();
    for (row, score) in raw {
        if let Some(allowed) = &allowed {
            if !allowed.contains(&row.key) {
                continue;
            }
        }
        let mut fields: Vec<(String, Vec<u8>)> = Vec::new();
        fields.push(("__vec_score".to_string(), format_float(score).into_bytes()));
        for (k, v) in &row.metadata {
            // Strings are emitted raw; other JSON values use their JSON text.
            let value_bytes = match v {
                serde_json::Value::String(s) => s.clone().into_bytes(),
                other => other.to_string().into_bytes(),
            };
            fields.push((k.clone(), value_bytes));
        }
        out.push(SearchHit {
            doc_id: row.key,
            score,
            fields,
        });
        if out.len() >= req.k {
            break;
        }
    }
    Ok(finalize_search_outcome(
        out,
        req.sortby.as_ref(),
        req.limit,
        req.return_fields.as_deref(),
        req.nocontent,
    ))
}
/// Executes a non-vector `FT.SEARCH` whose query is a filter expression.
///
/// The filter is evaluated over all indexed keys. Each match is then fetched
/// from the engine; keys that no longer resolve to a row are skipped. Hits
/// carry a `0.0` score and the row's metadata as fields: JSON strings are
/// emitted raw, other values as JSON text.
///
/// # Errors
/// `NotFound` for an unknown index, filter-evaluation errors, and `Engine` when
/// a row lookup fails.
fn execute_search_filter(
    registry: &VectorRegistry,
    req: &SearchFilterRequest,
) -> Result<FtOutcome, FtError> {
    let table = registry
        .get(&req.name)
        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
    let universe = table
        .indexed_keys()
        .into_iter()
        .collect::<BTreeSet<Vec<u8>>>();
    let matched = ft_filter::evaluate(&req.filter, &table, &universe)?;
    let mut hits: Vec<SearchHit> = Vec::with_capacity(matched.len());
    for key in matched {
        let lookup = table
            .engine
            .get(&key)
            .map_err(|e| FtError::Engine(e.to_string()))?;
        let Some(row) = lookup else {
            continue;
        };
        let mut fields: Vec<(String, Vec<u8>)> = Vec::new();
        for (name, value) in &row.metadata {
            let bytes = if let serde_json::Value::String(text) = value {
                text.clone().into_bytes()
            } else {
                value.to_string().into_bytes()
            };
            fields.push((name.clone(), bytes));
        }
        hits.push(SearchHit {
            doc_id: row.key,
            score: 0.0,
            fields,
        });
    }
    Ok(finalize_search_outcome(
        hits,
        req.sortby.as_ref(),
        req.limit,
        req.return_fields.as_deref(),
        req.nocontent,
    ))
}
/// Executes an `@field:substring` text search.
///
/// Hits come from `table.search_text_substring`; each carries the matched text
/// under the searched field and a `0.0` score. If SORTBY names a different
/// field, that field's value is fetched from the row so it can be sorted on.
/// This lookup is best-effort: lookup errors or a missing value just leave the
/// field out.
///
/// # Errors
/// `NotFound` for an unknown index; `Syntax` if the field is not a TEXT field
/// of the index; `Engine` if the field's text index is not provisioned.
fn execute_search_text(
    registry: &VectorRegistry,
    req: &SearchTextRequest,
) -> Result<FtOutcome, FtError> {
    let table = registry
        .get(&req.name)
        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
    if !table.has_text_field(&req.field) {
        return Err(FtError::Syntax(format!(
            "FT.SEARCH: index {} has no TEXT field {}",
            req.name, req.field
        )));
    }
    let Some(raw_hits) = table.search_text_substring(&req.field, &req.query) else {
        return Err(FtError::Engine(format!(
            "text index for field {} not provisioned",
            req.field
        )));
    };
    // Sorting on the searched field itself can use the matched text directly.
    let extra_sort_field = req
        .sortby
        .as_ref()
        .map(|(field, _)| field)
        .filter(|field| *field != &req.field);
    let mut hits = Vec::with_capacity(raw_hits.len());
    for (key, text) in raw_hits {
        let mut fields: Vec<(String, Vec<u8>)> = vec![(req.field.clone(), text)];
        if let Some(sort_field) = extra_sort_field {
            if let Ok(Some(row)) = table.engine.get(&key) {
                if let Some(value) = row.metadata.get(sort_field) {
                    let bytes = if let serde_json::Value::String(s) = value {
                        s.clone().into_bytes()
                    } else {
                        value.to_string().into_bytes()
                    };
                    fields.push((sort_field.clone(), bytes));
                }
            }
        }
        hits.push(SearchHit {
            doc_id: key,
            score: 0.0,
            fields,
        });
    }
    Ok(finalize_search_outcome(
        hits,
        req.sortby.as_ref(),
        req.limit,
        req.return_fields.as_deref(),
        req.nocontent,
    ))
}
/// Applies the shared post-processing of every `FT.SEARCH` variant.
///
/// In order: optional SORTBY (see `sort_hits_by_field`), optional
/// `LIMIT offset count` window (an offset past the end yields no hits), and an
/// optional RETURN projection that keeps only the listed fields. Finally the
/// result is packaged as `SearchNoContent` (ids only) or `Search`.
///
/// NOTE(review): `total` is the number of hits after LIMIT. RediSearch reports
/// the pre-LIMIT match count; confirm how `render_outcome` uses `total` before
/// changing this.
fn finalize_search_outcome(
    mut hits: Vec<SearchHit>,
    sortby: Option<&(String, SortDirection)>,
    limit: Option<(usize, usize)>,
    return_fields: Option<&[String]>,
    nocontent: bool,
) -> FtOutcome {
    if let Some((field, dir)) = sortby {
        sort_hits_by_field(&mut hits, field, *dir);
    }
    if let Some((offset, count)) = limit {
        // Keep [offset, offset + count) in place: cut the tail, then the head.
        let end = offset.saturating_add(count).min(hits.len());
        hits.truncate(end);
        let start = offset.min(hits.len());
        hits.drain(..start);
    }
    if let Some(wanted) = return_fields {
        hits.iter_mut()
            .for_each(|hit| hit.fields.retain(|(name, _)| wanted.contains(name)));
    }
    let total = hits.len();
    if nocontent {
        FtOutcome::SearchNoContent {
            total,
            doc_ids: hits.into_iter().map(|hit| hit.doc_id).collect(),
        }
    } else {
        FtOutcome::Search { total, hits }
    }
}
/// Sorts `hits` in place (stable) by the value of `field`.
///
/// Ordering of hits that have the field:
/// - values that parse as finite numbers (`parse_sort_key`) compare numerically;
/// - other values compare as raw bytes;
/// - numeric values sort before non-numeric ones.
///
/// `Desc` reverses all of the above. Hits lacking `field` always go last,
/// regardless of direction.
fn sort_hits_by_field(hits: &mut [SearchHit], field: &str, dir: SortDirection) {
    use std::cmp::Ordering;

    /// Total order over present values: numbers (by value) < non-numbers (by bytes).
    ///
    /// A per-pair rule ("numeric only if both parse, else bytewise") is not
    /// transitive on mixed data: "9" < "10" numerically, yet "10" < "5x" and
    /// "5x" < "9" bytewise. Since Rust 1.81, `sort_by` may panic on such a
    /// comparator, so client-supplied values could crash the search.
    fn compare_values(a: &[u8], b: &[u8]) -> std::cmp::Ordering {
        match (parse_sort_key(a), parse_sort_key(b)) {
            // parse_sort_key only yields finite values, so partial_cmp is Some.
            (Some(x), Some(y)) => x.partial_cmp(&y).unwrap_or(std::cmp::Ordering::Equal),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            (None, None) => a.cmp(b),
        }
    }

    hits.sort_by(|a, b| match (lookup_field(a, field), lookup_field(b, field)) {
        (Some(x), Some(y)) => {
            let ord = compare_values(x, y);
            match dir {
                SortDirection::Asc => ord,
                SortDirection::Desc => ord.reverse(),
            }
        }
        (Some(_), None) => Ordering::Less,
        (None, Some(_)) => Ordering::Greater,
        (None, None) => Ordering::Equal,
    });
}
/// Returns the value of the first field named `name` in `hit`, if any.
fn lookup_field<'a>(hit: &'a SearchHit, name: &str) -> Option<&'a [u8]> {
    hit.fields
        .iter()
        .find_map(|(field_name, value)| (field_name == name).then_some(value.as_slice()))
}
/// Interprets `bytes` as a numeric sort key.
///
/// Returns `Some` only for valid UTF-8 that, after trimming whitespace,
/// parses as a finite `f64`. Infinities, NaN and non-numeric text yield `None`.
fn parse_sort_key(bytes: &[u8]) -> Option<f64> {
    std::str::from_utf8(bytes)
        .ok()
        .and_then(|text| text.trim().parse::<f64>().ok())
        .filter(|value| value.is_finite())
}
/// Parses the arguments of `FT.AGGREGATE` (everything after the command name).
///
/// Supported shape:
/// `FT.AGGREGATE <index> * GROUPBY <n> @f... REDUCE <kind> <nargs> ... AS <alias> [LIMIT <off> <cnt>]`.
/// Clauses may appear in any order and `GROUPBY` may repeat (its fields
/// accumulate).
///
/// # Errors
/// * [`FtError::Unsupported`] for a query other than `*`, an unknown clause,
///   or a request without `GROUPBY`.
/// * [`FtError::Syntax`] for missing or malformed arguments, or when no
///   `REDUCE` is given.
fn parse_aggregate(rest: &[&[u8]]) -> Result<AggregateRequest, FtError> {
    let mut it = TokenCursor::new(rest);
    let name = it.next_string("FT.AGGREGATE: missing index name")?;
    let query = it.next_required("FT.AGGREGATE: missing query expression")?;
    // `AggregateRequest` carries no query, and the executor groups every indexed
    // document. A filtering query would therefore be silently ignored and yield
    // wrong aggregates, so only the match-all query is accepted.
    if query.trim_ascii() != b"*" {
        return Err(FtError::Unsupported(format!(
            "FT.AGGREGATE query {}",
            String::from_utf8_lossy(query)
        )));
    }
    let mut group_by: Vec<String> = Vec::new();
    let mut reducers: Vec<ReducerSpec> = Vec::new();
    let mut limit: Option<(usize, usize)> = None;
    let mut saw_groupby = false;
    while let Some(tok) = it.next() {
        let up = ascii_upper(tok);
        match up.as_slice() {
            b"GROUPBY" => {
                saw_groupby = true;
                parse_aggregate_groupby(&mut it, &mut group_by)?;
            }
            b"REDUCE" => {
                reducers.push(parse_aggregate_reduce(&mut it)?);
            }
            b"LIMIT" => {
                let off_tok = it.next_required("FT.AGGREGATE: LIMIT expects offset")?;
                let cnt_tok = it.next_required("FT.AGGREGATE: LIMIT expects count")?;
                let off = parse_unsigned(off_tok, "FT.AGGREGATE LIMIT offset")?;
                let cnt = parse_unsigned(cnt_tok, "FT.AGGREGATE LIMIT count")?;
                limit = Some((off, cnt));
            }
            other => {
                return Err(FtError::Unsupported(format!(
                    "FT.AGGREGATE clause {}",
                    String::from_utf8_lossy(other)
                )));
            }
        }
    }
    if !saw_groupby {
        return Err(FtError::Unsupported(
            "FT.AGGREGATE without GROUPBY".to_string(),
        ));
    }
    if reducers.is_empty() {
        return Err(FtError::Syntax(
            "FT.AGGREGATE: GROUPBY requires at least one REDUCE".to_string(),
        ));
    }
    Ok(AggregateRequest {
        name,
        group_by,
        reducers,
        limit,
    })
}
/// Parses the body of a `GROUPBY <n> @field...` clause and appends the field
/// names (with one leading `@` removed, if present) to `group_by`.
///
/// # Errors
/// [`FtError::Syntax`] when the count is missing, malformed or zero, when fewer
/// than `n` field tokens follow, or when a field name is not UTF-8.
fn parse_aggregate_groupby(
    it: &mut TokenCursor<'_>,
    group_by: &mut Vec<String>,
) -> Result<(), FtError> {
    let count_tok = it.next_required("FT.AGGREGATE: GROUPBY expects a count")?;
    let count = parse_unsigned(count_tok, "FT.AGGREGATE GROUPBY count")?;
    if count == 0 {
        return Err(FtError::Syntax(
            "FT.AGGREGATE GROUPBY count must be > 0".to_string(),
        ));
    }
    for _ in 0..count {
        let raw = it.next_required("FT.AGGREGATE: GROUPBY expects @field")?;
        let field_name = raw.strip_prefix(b"@").unwrap_or(raw);
        group_by.push(utf8(field_name, "FT.AGGREGATE GROUPBY field")?);
    }
    Ok(())
}
/// Parses one `REDUCE <kind> <nargs> [args...] AS <alias>` clause (the
/// `REDUCE` keyword itself has already been consumed).
///
/// Supported kinds: `COUNT` (0 args), `SUM @field` (1 arg) and `AVG @field`
/// (1 arg). The kind and the `AS` keyword are case-insensitive.
///
/// # Errors
/// * [`FtError::Syntax`] for missing tokens, a malformed or mismatched argument
///   count, or a missing `AS <alias>`.
/// * [`FtError::Unsupported`] for any other reducer kind.
fn parse_aggregate_reduce(it: &mut TokenCursor<'_>) -> Result<ReducerSpec, FtError> {
    let kind_tok = it.next_required("FT.AGGREGATE: REDUCE expects a kind")?;
    let kind_up = ascii_upper(kind_tok);
    let arg_count_tok = it.next_required("FT.AGGREGATE: REDUCE expects an argument count")?;
    let arg_count = parse_unsigned(arg_count_tok, "FT.AGGREGATE REDUCE arg count")?;
    let kind = match kind_up.as_slice() {
        b"COUNT" => {
            if arg_count != 0 {
                return Err(FtError::Syntax(
                    "FT.AGGREGATE REDUCE COUNT expects 0 args".to_string(),
                ));
            }
            ReducerKind::Count
        }
        b"SUM" => {
            if arg_count != 1 {
                return Err(FtError::Syntax(
                    "FT.AGGREGATE REDUCE SUM expects 1 arg".to_string(),
                ));
            }
            ReducerKind::Sum {
                field: take_field_arg(it, "FT.AGGREGATE: SUM expects @field")?,
            }
        }
        b"AVG" => {
            if arg_count != 1 {
                return Err(FtError::Syntax(
                    "FT.AGGREGATE REDUCE AVG expects 1 arg".to_string(),
                ));
            }
            ReducerKind::Avg {
                field: take_field_arg(it, "FT.AGGREGATE: AVG expects @field")?,
            }
        }
        other => {
            // Skip the reducer's declared arguments. The count is client-supplied
            // (it may be as large as usize::MAX), so stop once the input runs
            // out instead of iterating `arg_count` times regardless.
            for _ in 0..arg_count {
                if it.next().is_none() {
                    break;
                }
            }
            return Err(FtError::Unsupported(format!(
                "FT.AGGREGATE REDUCE {}",
                String::from_utf8_lossy(other)
            )));
        }
    };
    let as_tok = it.next_required("FT.AGGREGATE: REDUCE expects AS <name>")?;
    if !as_tok.eq_ignore_ascii_case(b"AS") {
        return Err(FtError::Syntax(
            "FT.AGGREGATE REDUCE clause missing AS".to_string(),
        ));
    }
    let alias = it.next_string("FT.AGGREGATE: REDUCE AS expects a name")?;
    Ok(ReducerSpec { kind, alias })
}
/// Consumes one token naming a field, drops a single leading `@` if present,
/// and returns it as a `String`.
///
/// `msg` is used both when the token is missing and as the context of the
/// non-UTF-8 error.
fn take_field_arg(it: &mut TokenCursor<'_>, msg: &str) -> Result<String, FtError> {
    let raw = it.next_required(msg)?;
    utf8(raw.strip_prefix(b"@").unwrap_or(raw), msg)
}
/// Parses `FT.EXPLAIN <index> <query> [DIALECT <n>]...`.
///
/// The query bytes are kept verbatim. Any `DIALECT` value is consumed and
/// ignored.
///
/// # Errors
/// [`FtError::Syntax`] for missing arguments; [`FtError::Unsupported`] for any
/// clause other than `DIALECT`.
fn parse_explain(rest: &[&[u8]]) -> Result<ExplainRequest, FtError> {
    let mut cursor = TokenCursor::new(rest);
    let name = cursor.next_string("FT.EXPLAIN: missing index name")?;
    let query = cursor
        .next_required("FT.EXPLAIN: missing query expression")?
        .to_vec();
    while let Some(token) = cursor.next() {
        if !token.eq_ignore_ascii_case(b"DIALECT") {
            return Err(FtError::Unsupported(format!(
                "FT.EXPLAIN clause {}",
                String::from_utf8_lossy(&ascii_upper(token))
            )));
        }
        cursor.next_required("FT.EXPLAIN: DIALECT expects a value")?;
    }
    Ok(ExplainRequest { name, query })
}
fn parse_alter(rest: &[&[u8]]) -> Result<AlterRequest, FtError> {
let mut it = TokenCursor::new(rest);
let name = it.next_string("FT.ALTER: missing index name")?;
let op_tok = it.next_required("FT.ALTER: expected ADD")?;
let op_up = ascii_upper(op_tok);
match op_up.as_slice() {
b"ADD" => {}
b"DROP" => {
return Err(FtError::Unsupported("FT.ALTER DROP".to_string()));
}
b"SCHEMA" => {
return Err(FtError::Unsupported("FT.ALTER SCHEMA".to_string()));
}
other => {
return Err(FtError::Syntax(format!(
"FT.ALTER: expected ADD, got {}",
String::from_utf8_lossy(other)
)));
}
}
let field = it.next_string("FT.ALTER ADD: missing field name")?;
let type_tok = it.next_required("FT.ALTER ADD: missing field type")?;
let type_up = ascii_upper(type_tok);
let field_type = match type_up.as_slice() {
b"TEXT" => MetadataFieldType::Text,
b"TAG" => MetadataFieldType::Tag,
b"VECTOR" => {
return Err(FtError::Unsupported(
"FT.ALTER ADD VECTOR (rebuild required)".to_string(),
));
}
b"NUMERIC" | b"GEO" => {
return Err(FtError::Unsupported(format!(
"FT.ALTER ADD {}",
String::from_utf8_lossy(type_tok)
)));
}
other => {
return Err(FtError::Syntax(format!(
"FT.ALTER ADD: unknown type {}",
String::from_utf8_lossy(other)
)));
}
};
if it.peek().is_some() {
return Err(FtError::Syntax(
"FT.ALTER ADD: unexpected trailing tokens".to_string(),
));
}
Ok(AlterRequest {
name,
field,
field_type,
})
}
/// Executes `FT.AGGREGATE`: buckets every indexed document of `req.name` by its
/// `GROUPBY` values, folds each bucket through the requested reducers, and
/// returns one row per group.
///
/// Each row holds the `GROUPBY` `(field, value)` pairs, followed by one
/// `(alias, value)` pair per reducer. Rows are ordered by the group key built by
/// [`build_group_key`]. `total_groups` is the number of groups *before* `LIMIT`
/// is applied, so clients can page through the full result.
///
/// # Errors
/// * [`FtError::NotFound`] if the index does not exist.
/// * [`FtError::Engine`] if reading a row from the engine fails.
fn execute_aggregate(
    registry: &VectorRegistry,
    req: &AggregateRequest,
) -> Result<FtOutcome, FtError> {
    let table = registry
        .get(&req.name)
        .ok_or_else(|| FtError::NotFound(req.name.clone()))?;
    // BTreeMap keeps groups ordered by key, so the output order is deterministic.
    let mut groups: BTreeMap<Vec<u8>, GroupAccumulator> = BTreeMap::new();
    for key in table.indexed_keys() {
        let row = match table.engine.get(&key) {
            Ok(Some(r)) => r,
            // A tracked key with no stored row contributes nothing.
            Ok(None) => continue,
            Err(e) => return Err(FtError::Engine(e.to_string())),
        };
        let group_key = build_group_key(&req.group_by, &row.metadata);
        let entry = groups.entry(group_key).or_insert_with(|| {
            // The displayed GROUPBY values come from the first row of the group.
            GroupAccumulator::new(
                req.group_by
                    .iter()
                    .map(|f| {
                        (
                            f.clone(),
                            metadata_string(row.metadata.get(f)).unwrap_or_default(),
                        )
                    })
                    .collect(),
                req.reducers.len(),
            )
        });
        entry.observe(&req.reducers, &row.metadata);
    }
    // Count every group before paginating; LIMIT only restricts the rows returned.
    let total_groups = groups.len();
    let (offset, count) = req.limit.unwrap_or((0, usize::MAX));
    // Only groups inside the requested page are rendered.
    let rows: Vec<Vec<(String, Vec<u8>)>> = groups
        .into_values()
        .skip(offset)
        .take(count)
        .map(|mut group| {
            let mut row: Vec<(String, Vec<u8>)> = std::mem::take(&mut group.fields);
            for (i, reducer) in req.reducers.iter().enumerate() {
                row.push((reducer.alias.clone(), group.render_reducer(i, reducer)));
            }
            row
        })
        .collect();
    Ok(FtOutcome::Aggregate { total_groups, rows })
}
struct GroupAccumulator {
fields: Vec<(String, Vec<u8>)>,
slots: Vec<ReducerAccum>,
}
#[derive(Default)]
struct ReducerAccum {
count: u64,
sum: f64,
saw_numeric: bool,
}
impl GroupAccumulator {
fn new(fields: Vec<(String, Vec<u8>)>, n_reducers: usize) -> Self {
let mut slots = Vec::with_capacity(n_reducers);
for _ in 0..n_reducers {
slots.push(ReducerAccum::default());
}
Self { fields, slots }
}
fn observe(&mut self, reducers: &[ReducerSpec], metadata: &HashMap<String, serde_json::Value>) {
for (i, reducer) in reducers.iter().enumerate() {
let slot = &mut self.slots[i];
slot.count = slot.count.saturating_add(1);
match &reducer.kind {
ReducerKind::Count => {}
ReducerKind::Sum { field } | ReducerKind::Avg { field } => {
if let Some(v) = metadata.get(field) {
if let Some(n) = metadata_number(v) {
slot.sum += n;
slot.saw_numeric = true;
}
}
}
}
}
}
fn render_reducer(&self, i: usize, reducer: &ReducerSpec) -> Vec<u8> {
let slot = &self.slots[i];
match &reducer.kind {
ReducerKind::Count => slot.count.to_string().into_bytes(),
ReducerKind::Sum { .. } => {
if slot.saw_numeric {
format_float_f64(slot.sum).into_bytes()
} else {
b"0".to_vec()
}
}
ReducerKind::Avg { .. } => {
if slot.saw_numeric && slot.count > 0 {
let denom = u32::try_from(slot.count).unwrap_or(u32::MAX);
let mean = slot.sum / f64::from(denom);
format_float_f64(mean).into_bytes()
} else {
b"0".to_vec()
}
}
}
}
}
/// Encodes a row's `GROUPBY` values into the key used to bucket rows into groups.
///
/// Each value is rendered with [`metadata_string`]; a missing field counts as
/// the empty string. It is written with every `0x00` byte escaped as
/// `0x00 0xFF` and is terminated by `0x00 0x01`. After a `0x00`, the next byte
/// is always `0xFF` (escaped data) or `0x01` (end of value). Distinct value
/// tuples therefore never share a key, whereas a plain separator byte would let
/// values containing that byte collide and merge separate groups.
///
/// The encoding also preserves lexicographic order of the value tuples, so a
/// `BTreeMap` keyed by it still yields groups sorted by their field values.
fn build_group_key(group_by: &[String], metadata: &HashMap<String, serde_json::Value>) -> Vec<u8> {
    const ESCAPE: u8 = 0x00;
    const ESCAPED_ESCAPE: u8 = 0xFF;
    const END_OF_VALUE: u8 = 0x01;
    let mut key: Vec<u8> = Vec::new();
    for field in group_by {
        let value = metadata_string(metadata.get(field)).unwrap_or_default();
        for &byte in &value {
            key.push(byte);
            if byte == ESCAPE {
                key.push(ESCAPED_ESCAPE);
            }
        }
        key.extend_from_slice(&[ESCAPE, END_OF_VALUE]);
    }
    key
}
/// Renders a metadata value as bytes.
///
/// JSON strings yield their raw contents (no quotes); every other JSON value
/// yields its JSON text. Returns `None` only when `value` is `None`.
fn metadata_string(value: Option<&serde_json::Value>) -> Option<Vec<u8>> {
    value.map(|v| match v {
        serde_json::Value::String(text) => text.as_bytes().to_vec(),
        non_string => non_string.to_string().into_bytes(),
    })
}
/// Reads a metadata value as an `f64`.
///
/// JSON numbers convert via `as_f64`. JSON strings are trimmed and parsed.
/// Any other JSON type, or an unparsable string, yields `None`.
fn metadata_number(value: &serde_json::Value) -> Option<f64> {
    if let serde_json::Value::Number(number) = value {
        return number.as_f64();
    }
    value.as_str().and_then(|text| text.trim().parse().ok())
}
/// Executes `FT.EXPLAIN`, returning a human-readable plan for `req.query`.
///
/// Three plan shapes are produced:
/// * `@field:substring` queries -> a SUBSTRING plan with a trigram preview;
/// * `*=>[KNN k @field $param]` queries -> a VECTOR KNN plan built from the
///   index schema;
/// * anything else -> an UNKNOWN QUERY note.
///
/// # Errors
/// [`FtError::NotFound`] if the index does not exist; any error raised while
/// trying to parse the query as a text-field query is propagated.
fn execute_explain(registry: &VectorRegistry, req: &ExplainRequest) -> Result<FtOutcome, FtError> {
    let Some(table) = registry.get(&req.name) else {
        return Err(FtError::NotFound(req.name.clone()));
    };
    let idx = &req.name;
    let plan = match try_parse_text_field_query(&req.query)? {
        Some((field, substring)) => {
            let trigrams = trigram_preview(&substring);
            let query = String::from_utf8_lossy(&substring);
            format!(
                "@{field}: SUBSTRING\n field: {field}\n query: {query}\n index: trigram+bloom\n trigrams: {trigrams}\n"
            )
        }
        None => match parse_knn_query(&req.query) {
            Ok((k, field, param)) => {
                let dim = table.schema.dim;
                let metric = format!("{:?}", table.schema.distance).to_uppercase();
                let alg = format!("{:?}", table.schema.algorithm).to_uppercase();
                format!(
                    "VECTOR KNN\n index: {idx}\n algorithm: {alg}\n metric: {metric}\n field: {field}\n k: {k}\n dim: {dim}\n param: ${param}\n"
                )
            }
            Err(_) => {
                let q = String::from_utf8_lossy(&req.query);
                format!(
                    "UNKNOWN QUERY\n index: {idx}\n query: {q}\n note: only @field:substring and *=>[KNN ...] are planned in this build\n"
                )
            }
        },
    };
    Ok(FtOutcome::Explain(plan))
}
/// Renders up to the first eight byte trigrams of `query` for `FT.EXPLAIN`.
///
/// Queries shorter than three bytes have no trigrams, and a short-circuit
/// marker is returned instead. A trailing `, ...` marks that more than eight
/// trigrams exist. Each trigram is shown via lossy UTF-8 decoding.
fn trigram_preview(query: &[u8]) -> String {
    const GRAM_LEN: usize = 3;
    const MAX_SHOWN: usize = 8;
    if query.len() < GRAM_LEN {
        return "<short-circuit: scan>".to_string();
    }
    let shown: Vec<String> = query
        .windows(GRAM_LEN)
        .take(MAX_SHOWN)
        .map(|gram| format!("\"{}\"", String::from_utf8_lossy(gram)))
        .collect();
    // A query of length L has L - (GRAM_LEN - 1) trigrams.
    let total_grams = query.len() - (GRAM_LEN - 1);
    let ellipsis = if total_grams > MAX_SHOWN { ", ..." } else { "" };
    format!("[{}{ellipsis}]", shown.join(", "))
}
/// Executes `FT.ALTER ... ADD <field> <type>` against an existing index.
///
/// `TEXT` registers the field through `add_text_field`, whose return value is
/// discarded. `TAG` is acknowledged with OK.
/// NOTE(review): nothing is registered on the table for TAG here; confirm that
/// tag filtering relies only on the stored metadata. `NUMERIC`/`GEO` are
/// rejected.
///
/// # Errors
/// [`FtError::NotFound`] if the index does not exist; [`FtError::Unsupported`]
/// for `NUMERIC` or `GEO`.
fn execute_alter(registry: &VectorRegistry, req: &AlterRequest) -> Result<FtOutcome, FtError> {
    let Some(table) = registry.get(&req.name) else {
        return Err(FtError::NotFound(req.name.clone()));
    };
    match req.field_type {
        MetadataFieldType::Numeric | MetadataFieldType::Geo => Err(FtError::Unsupported(
            format!("FT.ALTER ADD type {:?}", req.field_type),
        )),
        MetadataFieldType::Tag => Ok(FtOutcome::Ok),
        MetadataFieldType::Text => {
            let _added = table.add_text_field(&req.field);
            Ok(FtOutcome::Ok)
        }
    }
}
/// Formats an `f64` with exactly six digits after the decimal point.
fn format_float_f64(f: f64) -> String {
    format!("{:.6}", f)
}
/// Parses `FT.REGEX <index> <field> <pattern> [K=<n>]...`.
///
/// `K=<n>` (case-insensitive keyword; may repeat, and the last one wins) sets
/// `max_errors`, which must fit in a `u16`. It defaults to `0`.
///
/// # Errors
/// [`FtError::Syntax`] for missing arguments, non-UTF-8 input, or an invalid or
/// oversized `K=` value; [`FtError::Unsupported`] for any other option.
fn parse_regex(rest: &[&[u8]]) -> Result<RegexRequest, FtError> {
    let mut cursor = TokenCursor::new(rest);
    let name = cursor.next_string("FT.REGEX: missing index name")?;
    let field = cursor.next_string("FT.REGEX: missing field name")?;
    let raw_pattern = cursor.next_required("FT.REGEX: missing pattern")?;
    let Ok(pattern) = std::str::from_utf8(raw_pattern) else {
        return Err(FtError::Syntax("FT.REGEX: pattern is not UTF-8".to_string()));
    };
    let pattern = pattern.to_string();
    let mut max_errors: u16 = 0;
    for option in cursor {
        let upper = ascii_upper(option);
        let Some(digits) = upper.strip_prefix(b"K=") else {
            return Err(FtError::Unsupported(format!(
                "FT.REGEX option {}",
                String::from_utf8_lossy(option)
            )));
        };
        let text = std::str::from_utf8(digits)
            .map_err(|_| FtError::Syntax("FT.REGEX: K= value is not UTF-8".to_string()))?;
        let n: u32 = text
            .parse()
            .map_err(|_| FtError::Syntax(format!("FT.REGEX: invalid K= value {text}")))?;
        max_errors = u16::try_from(n)
            .map_err(|_| FtError::Syntax(format!("FT.REGEX: K= value {n} exceeds u16")))?;
    }
    Ok(RegexRequest {
        name,
        field,
        pattern,
        max_errors,
    })
}
fn execute_regex(registry: &VectorRegistry, req: &RegexRequest) -> Result<FtOutcome, FtError> {
let table = registry
.get(&req.name)
.ok_or_else(|| FtError::NotFound(req.name.clone()))?;
if !table.has_text_field(&req.field) {
return Err(FtError::Syntax(format!(
"FT.REGEX: index {} has no TEXT field {}",
req.name, req.field
)));
}
let raw_hits = if req.max_errors == 0 {
table
.search_text_regex(&req.field, &req.pattern)
.ok_or_else(|| {
FtError::Engine(format!(
"text index for field {} not provisioned",
req.field
))
})?
.map_err(|e| FtError::Syntax(format!("FT.REGEX: invalid pattern: {e}")))?
} else {
table
.search_text_regex_approx(&req.field, &req.pattern, req.max_errors)
.ok_or_else(|| {
FtError::Engine(format!(
"text index for field {} not provisioned",
req.field
))
})?
.map_err(|e| FtError::Engine(format!("FT.REGEX (approximate): {e}")))?
};
let mut out = Vec::with_capacity(raw_hits.len());
for (key, text) in raw_hits {
out.push(SearchHit {
doc_id: key,
score: 0.0,
fields: vec![(req.field.clone(), text)],
});
}
let total = out.len();
Ok(FtOutcome::Search { total, hits: out })
}
/// Parses `FT.INFO <index>`; exactly one argument is accepted.
fn parse_info(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
    let mut cursor = TokenCursor::new(rest);
    let name = cursor.next_string("FT.INFO: missing index name")?;
    match cursor.peek() {
        Some(_) => Err(FtError::Syntax(
            "FT.INFO: unexpected trailing tokens".to_string(),
        )),
        None => Ok(FtCommand::Info { name }),
    }
}
/// Parses `FT._LIST`/`FT.LIST`, which takes no arguments.
fn parse_list(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
    match rest {
        [] => Ok(FtCommand::List),
        _ => Err(FtError::Syntax("FT.LIST: takes no arguments".to_string())),
    }
}
/// Parses `FT.DROPINDEX <index> [DD]`.
///
/// `DD` (case-insensitive, may repeat) requests deletion of the indexed
/// documents; any other option is rejected as unsupported.
fn parse_dropindex(rest: &[&[u8]]) -> Result<FtCommand, FtError> {
    let mut cursor = TokenCursor::new(rest);
    let name = cursor.next_string("FT.DROPINDEX: missing index name")?;
    let mut delete_documents = false;
    for option in cursor {
        if !option.eq_ignore_ascii_case(b"DD") {
            return Err(FtError::Unsupported(format!(
                "FT.DROPINDEX option {}",
                String::from_utf8_lossy(&ascii_upper(option))
            )));
        }
        delete_documents = true;
    }
    Ok(FtCommand::DropIndex {
        name,
        delete_documents,
    })
}
/// Executes `FT.INFO`, returning index properties as ordered key/value pairs.
///
/// Enum-like properties (algorithm, metric, vector type, field types) are
/// rendered as their upper-cased `Debug` form. Row counts saturate at
/// `i64::MAX`.
///
/// # Errors
/// [`FtError::NotFound`] if the registry has no info or no table for `name`.
fn execute_info(registry: &VectorRegistry, name: String) -> Result<FtOutcome, FtError> {
    let Some(info) = registry.info(&name) else {
        return Err(FtError::NotFound(name));
    };
    let Some(table) = registry.get(&name) else {
        return Err(FtError::NotFound(name));
    };
    let schema = &table.schema;
    let upper_debug = |value: &dyn std::fmt::Debug| format!("{value:?}").to_uppercase();
    let out: Vec<(String, InfoValue)> = vec![
        ("index_name".to_string(), InfoValue::String(info.name)),
        (
            "algorithm".to_string(),
            InfoValue::String(upper_debug(&info.algorithm)),
        ),
        (
            "distance_metric".to_string(),
            InfoValue::String(upper_debug(&info.distance)),
        ),
        (
            "vector_field".to_string(),
            InfoValue::String(schema.vector_field.clone()),
        ),
        (
            "vector_type".to_string(),
            InfoValue::String(upper_debug(&schema.vector_type)),
        ),
        ("dim".to_string(), InfoValue::Integer(i64::from(info.dim))),
        (
            "prefixes".to_string(),
            InfoValue::Array(
                schema
                    .prefixes
                    .iter()
                    .map(|p| InfoValue::String(String::from_utf8_lossy(p).into_owned()))
                    .collect(),
            ),
        ),
        (
            "schema_fields".to_string(),
            InfoValue::Array(
                schema
                    .metadata_fields
                    .iter()
                    .map(|f| {
                        InfoValue::Array(vec![
                            InfoValue::String(f.name.clone()),
                            InfoValue::String(upper_debug(&f.field_type)),
                        ])
                    })
                    .collect(),
            ),
        ),
        (
            "num_docs".to_string(),
            InfoValue::Integer(i64::try_from(info.live_rows).unwrap_or(i64::MAX)),
        ),
        (
            "tracked_rows".to_string(),
            InfoValue::Integer(i64::try_from(info.tracked_rows).unwrap_or(i64::MAX)),
        ),
    ];
    Ok(FtOutcome::Info(out))
}
/// Executes `FT.DROPINDEX`.
///
/// With `delete_documents` the registry's `drop_with_dd` is used, and the reply
/// reports how many keys it returned. Otherwise the index is dropped and
/// documents are left alone.
///
/// # Errors
/// [`FtError::NotFound`] when the registry reports the index missing;
/// [`FtError::Engine`] for any other registry error.
fn execute_dropindex(
    registry: &VectorRegistry,
    name: String,
    delete_documents: bool,
) -> Result<FtOutcome, FtError> {
    use crate::vector::registry::RegistryError;
    let dropped = if delete_documents {
        registry
            .drop_with_dd(&name)
            .map(|keys| FtOutcome::DropOk {
                deleted_documents: true,
                document_count: keys.len(),
            })
    } else {
        registry.drop(&name).map(|_| FtOutcome::DropOk {
            deleted_documents: false,
            document_count: 0,
        })
    };
    dropped.map_err(|err| match err {
        RegistryError::NotFound(_) => FtError::NotFound(name),
        other => FtError::Engine(other.to_string()),
    })
}
/// Indexes one `HSET key f1 v1 f2 v2 ...` write into `table`.
///
/// `pairs` alternates field names and values. The schema's vector field is
/// decoded as little-endian `f32`s of the schema dimension. Every other field
/// is stored as string metadata (lossy UTF-8) and is also written to the text
/// index when it is a TEXT field. The engine upsert happens first, then the
/// text-index writes, and finally the key is recorded as indexed.
///
/// # Errors
/// * [`FtError::Syntax`] for a non-UTF-8 field name, an odd number of items in
///   `pairs`, or a missing vector field.
/// * Errors from decoding the vector payload (bad length or dimension).
/// * [`FtError::Engine`] if the engine upsert fails.
fn insert_into_index(table: &VectorTable, key: &[u8], pairs: &[&[u8]]) -> Result<(), FtError> {
    let mut embedding: Option<Vec<f32>> = None;
    let mut metadata: HashMap<String, serde_json::Value> = HashMap::new();
    let mut text_writes: Vec<(String, Vec<u8>)> = Vec::new();
    let mut field_value_pairs = pairs.chunks_exact(2);
    for pair in field_value_pairs.by_ref() {
        let (raw_field, raw_value) = (pair[0], pair[1]);
        let Ok(field) = std::str::from_utf8(raw_field) else {
            return Err(FtError::Syntax("HSET field name is not UTF-8".to_string()));
        };
        if field != table.schema.vector_field {
            let lossy = String::from_utf8_lossy(raw_value).into_owned();
            metadata.insert(field.to_string(), serde_json::Value::String(lossy));
            if table.has_text_field(field) {
                text_writes.push((field.to_string(), raw_value.to_vec()));
            }
        } else {
            embedding = Some(decode_le_f32(raw_value, usize::from(table.schema.dim))?);
        }
    }
    if !field_value_pairs.remainder().is_empty() {
        return Err(FtError::Syntax(
            "HSET requires a value for every field".to_string(),
        ));
    }
    let Some(vector) = embedding else {
        return Err(FtError::Syntax(format!(
            "HSET into indexed prefix is missing the vector field '{}'",
            table.schema.vector_field
        )));
    };
    table
        .engine
        .upsert(key.to_vec(), &vector, metadata)
        .map_err(|e| FtError::Engine(e.to_string()))?;
    for (field, bytes) in text_writes {
        table.upsert_text_field(&field, key, &bytes);
    }
    table.record_indexed_key(key.to_vec());
    Ok(())
}
/// Serialises an [`FtOutcome`] into RESP2 reply bytes.
///
/// * `Ok` / `DropOk` -> simple string `+OK`.
/// * `List` -> array of bulk-string index names.
/// * `Info` -> flat array alternating bulk-string keys and info values.
/// * `Search` -> `[total, id, [field, value, ...], id, [...], ...]`; scores are
///   not emitted.
/// * `SearchNoContent` -> `[total, id, id, ...]`.
/// * `Aggregate` -> `[total_groups, [name, value, ...], ...]`.
/// * `Explain` -> a single bulk string holding the plan text.
///
/// Totals saturate at `i64::MAX`.
#[must_use]
pub fn render_outcome(outcome: &FtOutcome) -> Vec<u8> {
    let mut reply = Vec::new();
    match outcome {
        FtOutcome::Ok | FtOutcome::DropOk { .. } => reply.extend_from_slice(b"+OK\r\n"),
        FtOutcome::Explain(plan) => write_bulk(&mut reply, plan.as_bytes()),
        FtOutcome::List(names) => {
            write_array_header(&mut reply, names.len());
            for index_name in names {
                write_bulk(&mut reply, index_name.as_bytes());
            }
        }
        FtOutcome::Info(pairs) => {
            write_array_header(&mut reply, 2 * pairs.len());
            for (key, value) in pairs {
                write_bulk(&mut reply, key.as_bytes());
                write_info_value(&mut reply, value);
            }
        }
        FtOutcome::Search { total, hits } => {
            write_array_header(&mut reply, 2 * hits.len() + 1);
            write_integer(&mut reply, i64::try_from(*total).unwrap_or(i64::MAX));
            for hit in hits {
                write_bulk(&mut reply, &hit.doc_id);
                write_array_header(&mut reply, 2 * hit.fields.len());
                for (field_name, field_value) in &hit.fields {
                    write_bulk(&mut reply, field_name.as_bytes());
                    write_bulk(&mut reply, field_value);
                }
            }
        }
        FtOutcome::SearchNoContent { total, doc_ids } => {
            write_array_header(&mut reply, doc_ids.len() + 1);
            write_integer(&mut reply, i64::try_from(*total).unwrap_or(i64::MAX));
            for doc_id in doc_ids {
                write_bulk(&mut reply, doc_id);
            }
        }
        FtOutcome::Aggregate { total_groups, rows } => {
            write_array_header(&mut reply, rows.len() + 1);
            write_integer(&mut reply, i64::try_from(*total_groups).unwrap_or(i64::MAX));
            for row in rows {
                write_array_header(&mut reply, 2 * row.len());
                for (column, value) in row {
                    write_bulk(&mut reply, column.as_bytes());
                    write_bulk(&mut reply, value);
                }
            }
        }
    }
    reply
}
#[must_use]
pub fn render_error(err: &FtError) -> Vec<u8> {
let mut buf = Vec::new();
let _ = write!(buf, "-ERR {err}\r\n");
buf
}
/// Appends a RESP array header announcing `n` elements (`*<n>\r\n`).
fn write_array_header(out: &mut Vec<u8>, n: usize) {
    out.extend_from_slice(format!("*{n}\r\n").as_bytes());
}
/// Appends `payload` as a RESP bulk string (`$<len>\r\n<payload>\r\n`).
fn write_bulk(out: &mut Vec<u8>, payload: &[u8]) {
    let len = payload.len();
    let _ = write!(out, "${len}\r\n");
    out.extend(payload.iter().chain(b"\r\n"));
}
/// Appends a RESP integer reply (`:<n>\r\n`).
fn write_integer(out: &mut Vec<u8>, n: i64) {
    out.extend_from_slice(format!(":{n}\r\n").as_bytes());
}
/// Appends one `FT.INFO` value: strings as bulk strings, integers as RESP
/// integers, and arrays recursively as RESP arrays.
fn write_info_value(out: &mut Vec<u8>, value: &InfoValue) {
    match value {
        InfoValue::Integer(number) => write_integer(out, *number),
        InfoValue::String(text) => write_bulk(out, text.as_bytes()),
        InfoValue::Array(items) => {
            write_array_header(out, items.len());
            items.iter().for_each(|item| write_info_value(out, item));
        }
    }
}
/// Returns an ASCII-uppercased copy of `bytes`; non-ASCII bytes are unchanged.
fn ascii_upper(bytes: &[u8]) -> Vec<u8> {
    bytes.to_ascii_uppercase()
}
/// True when `tok` is present and equals `kw`, ignoring ASCII case.
fn matches_keyword(tok: Option<&[u8]>, kw: &str) -> bool {
    matches!(tok, Some(t) if t.eq_ignore_ascii_case(kw.as_bytes()))
}
/// Checks that `tok` equals `kw` (ASCII case-insensitive).
///
/// # Errors
/// [`FtError::Syntax`] naming the expected keyword and the token received.
fn expect_keyword(tok: &[u8], kw: &str) -> Result<(), FtError> {
    if !tok.eq_ignore_ascii_case(kw.as_bytes()) {
        return Err(FtError::Syntax(format!(
            "expected {kw}, got {}",
            String::from_utf8_lossy(tok)
        )));
    }
    Ok(())
}
/// Parses `tok` as a `usize`, using `context` as the error-message prefix.
///
/// # Errors
/// [`FtError::Syntax`] if `tok` is not UTF-8 or is not a valid `usize`.
fn parse_unsigned(tok: &[u8], context: &str) -> Result<usize, FtError> {
    let Ok(text) = std::str::from_utf8(tok) else {
        return Err(FtError::Syntax(format!("{context}: not UTF-8")));
    };
    text.parse::<usize>().map_err(|_| {
        FtError::Syntax(format!("{context}: not a non-negative integer ({text})"))
    })
}
/// Decodes `tok` as an owned UTF-8 `String`.
///
/// # Errors
/// [`FtError::Syntax`] prefixed with `context` when the bytes are not valid UTF-8.
fn utf8(tok: &[u8], context: &str) -> Result<String, FtError> {
    match std::str::from_utf8(tok) {
        Ok(text) => Ok(text.to_owned()),
        Err(_) => Err(FtError::Syntax(format!("{context}: not UTF-8"))),
    }
}
/// Decodes a packed little-endian `f32` vector of exactly `expected_dim` elements.
///
/// # Errors
/// * [`FtError::Syntax`] if the byte length is not a multiple of 4.
/// * [`FtError::DimensionMismatch`] if the element count differs from
///   `expected_dim`.
fn decode_le_f32(bytes: &[u8], expected_dim: usize) -> Result<Vec<f32>, FtError> {
    const F32_BYTES: usize = 4;
    if !bytes.len().is_multiple_of(F32_BYTES) {
        return Err(FtError::Syntax(
            "vector payload length is not a multiple of 4 bytes".to_string(),
        ));
    }
    let payload_dim = bytes.len() / F32_BYTES;
    if payload_dim != expected_dim {
        return Err(FtError::DimensionMismatch {
            index_dim: expected_dim,
            payload_dim,
        });
    }
    Ok(bytes
        .chunks_exact(F32_BYTES)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect())
}
/// Formats an `f32` with exactly six digits after the decimal point.
fn format_float(f: f32) -> String {
    format!("{:.6}", f)
}
/// Forward-only cursor over a command's argument tokens.
struct TokenCursor<'a> {
    /// All argument tokens.
    args: &'a [&'a [u8]],
    /// Index of the next token to hand out.
    idx: usize,
}
impl<'a> TokenCursor<'a> {
    /// Starts a cursor at the first token of `args`.
    fn new(args: &'a [&'a [u8]]) -> Self {
        Self { args, idx: 0 }
    }
    /// Returns the next token without consuming it.
    fn peek(&self) -> Option<&'a [u8]> {
        self.args.get(self.idx).copied()
    }
    /// Moves past the current position unconditionally.
    fn advance(&mut self) {
        self.idx += 1;
    }
    /// Consumes the next token, failing with `FtError::Syntax(msg)` when the
    /// input is exhausted.
    fn next_required(&mut self, msg: &str) -> Result<&'a [u8], FtError> {
        self.next().ok_or_else(|| FtError::Syntax(msg.to_string()))
    }
    /// Consumes the next token and decodes it as UTF-8. `msg` is used for both
    /// the missing-token and invalid-UTF-8 errors.
    fn next_string(&mut self, msg: &str) -> Result<String, FtError> {
        self.next_required(msg).and_then(|tok| utf8(tok, msg))
    }
}
impl<'a> Iterator for TokenCursor<'a> {
    type Item = &'a [u8];
    /// Yields the next token; the position only moves when a token exists.
    fn next(&mut self) -> Option<Self::Item> {
        let tok = self.peek()?;
        self.advance();
        Some(tok)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Copies a token list into an owned `Vec`, the form `parse_command` receives.
    fn slices<'a>(v: &'a [&'a [u8]]) -> Vec<&'a [u8]> {
        v.iter().copied().collect()
    }
    #[test]
    fn ascii_upper_lowercases_to_uppercase() {
        let upper = ascii_upper(b"ft.create");
        assert_eq!(upper, b"FT.CREATE".to_vec());
    }
    #[test]
    fn parse_create_minimal() {
        let argv: Vec<&[u8]> = vec![
            b"FT.CREATE",
            b"idx",
            b"ON",
            b"HASH",
            b"PREFIX",
            b"1",
            b"docs:",
            b"SCHEMA",
            b"vec",
            b"VECTOR",
            b"HNSW",
            b"6",
            b"TYPE",
            b"FLOAT32",
            b"DIM",
            b"4",
            b"DISTANCE_METRIC",
            b"COSINE",
        ];
        let parsed = parse_command(&slices(&argv)).expect("parse ok");
        let FtCommand::Create(create) = parsed else {
            panic!("expected create");
        };
        assert_eq!(create.name, "idx");
        assert_eq!(create.doc_type, DocType::Hash);
        let schema = &create.schema;
        assert_eq!(schema.dim, 4);
        assert_eq!(schema.vector_field, "vec");
        assert_eq!(schema.distance, DistanceMetric::Cosine);
        assert_eq!(schema.algorithm, IndexAlgorithm::Hnsw);
        assert_eq!(schema.prefixes, vec![b"docs:".to_vec()]);
    }
    #[test]
    fn parse_knn_query_extracts_pieces() {
        let (k, vector_field, param_name) =
            parse_knn_query(b"*=>[KNN 5 @vec $blob]").unwrap();
        assert_eq!(k, 5);
        assert_eq!(vector_field, "vec");
        assert_eq!(param_name, "blob");
    }
    #[test]
    fn parse_knn_query_rejects_filter() {
        let result = parse_knn_query(b"@title:foo=>[KNN 5 @vec $blob]");
        assert!(matches!(result.unwrap_err(), FtError::Unsupported(_)));
    }
    #[test]
    fn decode_le_f32_round_trips_a_short_vector() {
        let expected = [1.0_f32, -1.5, 2.25, 0.0];
        let packed: Vec<u8> = expected.iter().flat_map(|x| x.to_le_bytes()).collect();
        let decoded = decode_le_f32(&packed, 4).unwrap();
        assert_eq!(decoded, expected.to_vec());
    }
    #[test]
    fn decode_le_f32_rejects_dim_mismatch() {
        let two_floats = [0u8; 8];
        let err = decode_le_f32(&two_floats, 4).unwrap_err();
        assert!(matches!(err, FtError::DimensionMismatch { .. }));
    }
}