// redis_vl/index.rs
1//! Search index lifecycle helpers and Redis transport adapters.
2//!
3//! [`SearchIndex`] provides blocking (sync) operations and
4//! [`AsyncSearchIndex`] provides Tokio-based async operations. Both manage
5//! the full index lifecycle: create, delete, load, fetch, search, query,
6//! batch operations, pagination, hybrid search, aggregate queries,
7//! multi-vector queries, and `from_existing` reconnection.
8//!
9//! # Example
10//!
11//! ```rust,no_run
12//! use redis_vl::{IndexSchema, SearchIndex, Vector, VectorQuery};
13//!
14//! let schema = IndexSchema::from_yaml_file("schema.yaml").unwrap();
15//! let index = SearchIndex::new(schema, "redis://127.0.0.1:6379");
16//! index.create().unwrap();
17//!
18//! let result = index.search(&VectorQuery::new(
19//!     Vector::new(&[0.1_f32; 128] as &[f32]), "embedding", 5
20//! )).unwrap();
21//! ```
22
23use std::collections::{HashMap, VecDeque};
24use std::ops::Index;
25
26use redis::Commands;
27use serde::Serialize;
28use serde_json::{Map, Value};
29
30use crate::{
31    error::{Error, Result},
32    filter::FilterExpression,
33    query::{PageableQuery, QueryKind, QueryParamValue, QueryString, SortDirection},
34    schema::{FieldKind, IndexDefinition, IndexSchema, StorageType, VectorAlgorithm},
35};
36
/// Redis connection settings for index operations.
///
/// Currently this only carries the connection URL; a fresh
/// [`redis::Client`] is opened from it on demand via `client()`.
#[derive(Debug, Clone)]
pub struct RedisConnectionInfo {
    /// Connection URL for Redis.
    pub redis_url: String,
}
43
44impl RedisConnectionInfo {
45    /// Creates a new connection descriptor.
46    pub fn new(redis_url: impl Into<String>) -> Self {
47        Self {
48            redis_url: redis_url.into(),
49        }
50    }
51
52    pub(crate) fn client(&self) -> Result<redis::Client> {
53        Ok(redis::Client::open(self.redis_url.as_str())?)
54    }
55}
56
/// A single parsed Redis Search document.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchDocument {
    // Redis key of the document; also mirrored into `fields` under "id".
    id: String,
    // Projected fields returned by the query, plus the injected "id" entry.
    fields: Map<String, Value>,
}
63
64impl SearchDocument {
65    /// Creates a parsed search document.
66    pub fn new(id: impl Into<String>, mut fields: Map<String, Value>) -> Self {
67        let id = id.into();
68        fields.insert("id".to_owned(), Value::String(id.clone()));
69        Self { id, fields }
70    }
71
72    /// Returns the Redis document identifier.
73    pub fn id(&self) -> &str {
74        &self.id
75    }
76
77    /// Returns the projected document fields.
78    pub fn fields(&self) -> &Map<String, Value> {
79        &self.fields
80    }
81
82    /// Returns a field by name.
83    pub fn get(&self, field: &str) -> Option<&Value> {
84        self.fields.get(field)
85    }
86
87    /// Returns the document as a JSON object, including the `id` field.
88    pub fn to_map(&self) -> Map<String, Value> {
89        self.fields.clone()
90    }
91
92    /// Consumes the document and returns it as a JSON object, including the `id` field.
93    pub fn into_map(self) -> Map<String, Value> {
94        self.fields
95    }
96}
97
98impl Index<&str> for SearchDocument {
99    type Output = Value;
100
101    fn index(&self, index: &str) -> &Self::Output {
102        self.fields
103            .get(index)
104            .unwrap_or_else(|| panic!("field '{index}' not found on search document"))
105    }
106}
107
/// Parsed Redis Search results.
///
/// `total` is reported by the server and is not necessarily equal to
/// `docs.len()` (e.g. when a LIMIT/paging clause restricts the page).
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// Total number of matching documents reported by Redis.
    pub total: usize,
    /// Parsed result documents.
    pub docs: Vec<SearchDocument>,
}
116
117impl SearchResult {
118    /// Creates a parsed search result.
119    pub fn new(total: usize, docs: Vec<SearchDocument>) -> Self {
120        Self { total, docs }
121    }
122}
123
/// Processed query output aligned with RedisVL query semantics.
///
/// Document-returning queries yield [`QueryOutput::Documents`]; count-style
/// queries yield [`QueryOutput::Count`].
#[derive(Debug, Clone, PartialEq)]
pub enum QueryOutput {
    /// A document-returning query.
    Documents(Vec<Map<String, Value>>),
    /// A count-only query.
    Count(usize),
}
132
133impl QueryOutput {
134    /// Returns the contained documents when present.
135    pub fn as_documents(&self) -> Option<&[Map<String, Value>]> {
136        match self {
137            Self::Documents(documents) => Some(documents),
138            Self::Count(_) => None,
139        }
140    }
141
142    /// Returns the contained count when present.
143    pub fn as_count(&self) -> Option<usize> {
144        match self {
145            Self::Count(count) => Some(*count),
146            Self::Documents(_) => None,
147        }
148    }
149}
150
/// Blocking search index client.
///
/// Pairs an [`IndexSchema`] with Redis connection settings; each operation
/// opens a fresh connection from the stored URL.
#[derive(Debug, Clone)]
pub struct SearchIndex {
    // Schema describing the index name, prefixes, storage type, and fields.
    schema: IndexSchema,
    // Connection settings used to open a client per operation.
    connection: RedisConnectionInfo,
}
157
158impl SearchIndex {
159    /// Creates a new blocking search index.
160    pub fn new(schema: IndexSchema, redis_url: impl Into<String>) -> Self {
161        Self {
162            schema,
163            connection: RedisConnectionInfo::new(redis_url),
164        }
165    }
166
167    /// Creates a blocking index from a YAML string.
168    pub fn from_yaml_str(yaml: &str, redis_url: impl Into<String>) -> Result<Self> {
169        Ok(Self::new(IndexSchema::from_yaml_str(yaml)?, redis_url))
170    }
171
172    /// Creates a blocking index from a YAML file.
173    pub fn from_yaml_file(
174        path: impl AsRef<std::path::Path>,
175        redis_url: impl Into<String>,
176    ) -> Result<Self> {
177        Ok(Self::new(IndexSchema::from_yaml_file(path)?, redis_url))
178    }
179
180    /// Creates a blocking index from a JSON value.
181    pub fn from_json_value(value: serde_json::Value, redis_url: impl Into<String>) -> Result<Self> {
182        Ok(Self::new(IndexSchema::from_json_value(value)?, redis_url))
183    }
184
185    /// Returns the schema attached to the index.
186    pub fn schema(&self) -> &IndexSchema {
187        &self.schema
188    }
189
190    /// Returns the Redis index name.
191    pub fn name(&self) -> &str {
192        &self.schema.index.name
193    }
194
195    /// Returns the first (or only) key prefix.
196    ///
197    /// For multi-prefix indexes, returns the first prefix. Use [`Self::prefixes`]
198    /// to access all configured prefixes.
199    pub fn prefix(&self) -> &str {
200        self.schema.index.prefix.first()
201    }
202
203    /// Returns all key prefixes configured for this index.
204    pub fn prefixes(&self) -> Vec<&str> {
205        self.schema.index.prefix.all()
206    }
207
208    /// Returns the separator between prefix and identifier.
209    pub fn key_separator(&self) -> &str {
210        &self.schema.index.key_separator
211    }
212
213    /// Returns the storage type.
214    pub fn storage_type(&self) -> StorageType {
215        self.schema.index.storage_type
216    }
217
218    /// Builds a document key from the index prefix and identifier.
219    pub fn key(&self, key_suffix: &str) -> String {
220        compose_key(self.prefix(), self.key_separator(), key_suffix)
221    }
222
223    /// Builds an `FT.CREATE` command for the current schema.
224    pub fn create_cmd(&self) -> redis::Cmd {
225        let mut cmd = redis::cmd("FT.CREATE");
226        let prefixes = self.schema.index.prefix.all();
227        cmd.arg(&self.schema.index.name)
228            .arg("ON")
229            .arg(self.schema.index.storage_type.redis_name())
230            .arg("PREFIX")
231            .arg(prefixes.len());
232        for pfx in &prefixes {
233            cmd.arg(*pfx);
234        }
235
236        if !self.schema.index.stopwords.is_empty() {
237            cmd.arg("STOPWORDS").arg(self.schema.index.stopwords.len());
238            for stopword in &self.schema.index.stopwords {
239                cmd.arg(stopword);
240            }
241        }
242
243        cmd.arg("SCHEMA");
244        for arg in self.schema.redis_schema_args() {
245            cmd.arg(arg);
246        }
247        cmd
248    }
249
250    /// Creates the Redis search index.
251    pub fn create(&self) -> Result<()> {
252        self.create_with_options(false, false)
253    }
254
255    /// Creates the Redis search index with overwrite controls similar to the
256    /// Python RedisVL client.
257    pub fn create_with_options(&self, overwrite: bool, drop_documents: bool) -> Result<()> {
258        if self.schema.fields.is_empty() {
259            return Err(Error::SchemaValidation(
260                "No fields defined for index".to_owned(),
261            ));
262        }
263
264        if self.exists()? {
265            if !overwrite {
266                return Ok(());
267            }
268            self.drop(drop_documents)?;
269        }
270
271        let client = self.connection.client()?;
272        let mut connection = client.get_connection()?;
273        let (): () = self.create_cmd().query(&mut connection)?;
274        Ok(())
275    }
276
277    /// Drops the Redis search index.
278    pub fn drop(&self, delete_documents: bool) -> Result<()> {
279        let client = self.connection.client()?;
280        let mut connection = client.get_connection()?;
281        let mut cmd = redis::cmd("FT.DROPINDEX");
282        cmd.arg(&self.schema.index.name);
283        if delete_documents {
284            cmd.arg("DD");
285        }
286        let (): () = cmd.query(&mut connection)?;
287        Ok(())
288    }
289
290    /// Deletes the search index, optionally dropping associated documents.
291    pub fn delete(&self, drop_documents: bool) -> Result<()> {
292        if !self.exists()? {
293            return Err(Error::InvalidInput(format!(
294                "index '{}' does not exist",
295                self.name()
296            )));
297        }
298        self.drop(drop_documents)
299    }
300
301    /// Returns raw Redis index metadata.
302    pub fn info(&self) -> Result<Map<String, Value>> {
303        let client = self.connection.client()?;
304        let mut connection = client.get_connection()?;
305        let value = redis::cmd("FT.INFO")
306            .arg(&self.schema.index.name)
307            .query(&mut connection)?;
308        parse_info_response(value)
309    }
310
311    /// Lists all Redis search indices.
312    pub fn listall(&self) -> Result<Vec<String>> {
313        let client = self.connection.client()?;
314        let mut connection = client.get_connection()?;
315        let value = redis::cmd("FT._LIST").query(&mut connection)?;
316        Ok(value)
317    }
318
319    /// Returns whether the Redis search index exists.
320    pub fn exists(&self) -> Result<bool> {
321        Ok(self.listall()?.iter().any(|name| name == self.name()))
322    }
323
324    /// Loads a JSON document into Redis.
325    pub fn load_json<T>(&self, key_suffix: &str, document: &T) -> Result<()>
326    where
327        T: Serialize,
328    {
329        let client = self.connection.client()?;
330        let mut connection = client.get_connection()?;
331        let payload = serde_json::to_string(document)?;
332        let (): () = redis::cmd("JSON.SET")
333            .arg(self.key(key_suffix))
334            .arg("$")
335            .arg(payload)
336            .query(&mut connection)?;
337        Ok(())
338    }
339
340    /// Loads a Redis hash document.
341    pub fn load_hash(&self, key_suffix: &str, values: &HashMap<String, String>) -> Result<()> {
342        let client = self.connection.client()?;
343        let mut connection = client.get_connection()?;
344        let mut cmd = redis::cmd("HSET");
345        cmd.arg(self.key(key_suffix));
346        for (field, value) in values {
347            cmd.arg(field).arg(value);
348        }
349        let _: i32 = cmd.query(&mut connection)?;
350        Ok(())
351    }
352
    /// Loads documents using the configured storage type and returns the written keys.
    ///
    /// Thin wrapper over [`Self::load_with_preprocess`] with an identity
    /// callback (each record is cloned unchanged).
    pub fn load(&self, data: &[Value], id_field: &str, ttl: Option<i64>) -> Result<Vec<String>> {
        self.load_with_preprocess(data, id_field, ttl, |record| Ok(record.clone()))
    }

    /// Loads documents after applying a preprocessing callback to each record.
    ///
    /// # Arguments
    /// * `data` - JSON records to write; every element must be a JSON object.
    /// * `id_field` - object key whose value becomes the document identifier.
    /// * `ttl` - optional per-document expiry in seconds, applied after each write.
    /// * `preprocess` - fallible transform applied to each record before writing.
    ///
    /// # Errors
    /// Fails if preprocessing fails, a record is not a JSON object, id
    /// extraction/validation fails, or any Redis command errors. Writes
    /// already issued before the failure are not rolled back.
    pub fn load_with_preprocess<F>(
        &self,
        data: &[Value],
        id_field: &str,
        ttl: Option<i64>,
        mut preprocess: F,
    ) -> Result<Vec<String>>
    where
        F: FnMut(&Value) -> Result<Value>,
    {
        // Preprocess and validate ids up-front, before any Redis writes.
        let prepared = prepare_load_records(data, &mut preprocess)?;
        validate_load_ids(&prepared, id_field)?;
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let mut written_keys = Vec::with_capacity(prepared.len());

        for record in &prepared {
            let object = record.as_object().ok_or_else(|| {
                Error::InvalidInput("load expects an array of JSON objects".to_owned())
            })?;
            // Key = prefix + separator + value of `id_field`.
            let id = extract_id(object, id_field)?;
            let key = self.key(id);

            match self.storage_type() {
                StorageType::Json => {
                    // JSON storage: write the whole record at the root path.
                    let payload = serde_json::to_string(record)?;
                    let (): () = redis::cmd("JSON.SET")
                        .arg(&key)
                        .arg("$")
                        .arg(payload)
                        .query(&mut connection)?;
                }
                StorageType::Hash => {
                    // Hash storage: fields are schema-encoded first; some
                    // encode to raw bytes (see `EncodedHashValue::Binary`).
                    let encoded = encode_hash_record(object, &self.schema)?;
                    let mut cmd = redis::cmd("HSET");
                    cmd.arg(&key);
                    for (field, value) in encoded {
                        cmd.arg(field);
                        match value {
                            EncodedHashValue::String(value) => {
                                cmd.arg(value);
                            }
                            EncodedHashValue::Binary(value) => {
                                cmd.arg(value);
                            }
                        }
                    }
                    let _: i32 = cmd.query(&mut connection)?;
                }
            }

            // Expiry is applied per key, after the document is written.
            if let Some(ttl) = ttl {
                let _: bool = redis::cmd("EXPIRE")
                    .arg(&key)
                    .arg(ttl)
                    .query(&mut connection)?;
            }

            written_keys.push(key);
        }

        Ok(written_keys)
    }
422
    /// Loads documents using caller-supplied Redis keys instead of generating
    /// keys from the index prefix.
    ///
    /// This mirrors the Python `index.load(data, keys=keys)` signature and is
    /// essential for multi-prefix indexes where documents must be written under
    /// different key prefixes.
    ///
    /// `keys` and `data` must have the same length.
    ///
    /// # Errors
    /// Fails if the lengths differ, a record is not a JSON object, or any
    /// Redis command errors. Writes already issued are not rolled back.
    pub fn load_with_keys(
        &self,
        data: &[Value],
        keys: &[String],
        ttl: Option<i64>,
    ) -> Result<Vec<String>> {
        if data.len() != keys.len() {
            return Err(Error::InvalidInput(format!(
                "data length ({}) must equal keys length ({})",
                data.len(),
                keys.len()
            )));
        }

        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;

        // Records and keys are paired positionally.
        for (record, key) in data.iter().zip(keys.iter()) {
            let object = record.as_object().ok_or_else(|| {
                Error::InvalidInput("load expects an array of JSON objects".to_owned())
            })?;

            match self.storage_type() {
                StorageType::Json => {
                    // JSON storage: write the whole record at the root path.
                    let payload = serde_json::to_string(record)?;
                    let (): () = redis::cmd("JSON.SET")
                        .arg(key)
                        .arg("$")
                        .arg(payload)
                        .query(&mut connection)?;
                }
                StorageType::Hash => {
                    // Hash storage: fields are schema-encoded first; some
                    // encode to raw bytes (see `EncodedHashValue::Binary`).
                    let encoded = encode_hash_record(object, &self.schema)?;
                    let mut cmd = redis::cmd("HSET");
                    cmd.arg(key);
                    for (field, value) in encoded {
                        cmd.arg(field);
                        match value {
                            EncodedHashValue::String(value) => {
                                cmd.arg(value);
                            }
                            EncodedHashValue::Binary(value) => {
                                cmd.arg(value);
                            }
                        }
                    }
                    let _: i32 = cmd.query(&mut connection)?;
                }
            }

            // Expiry is applied per key, after the document is written.
            if let Some(ttl) = ttl {
                let _: bool = redis::cmd("EXPIRE")
                    .arg(key)
                    .arg(ttl)
                    .query(&mut connection)?;
            }
        }

        Ok(keys.to_vec())
    }
491
492    /// Fetches a JSON document as raw JSON text.
493    pub fn fetch_json_raw(&self, key_suffix: &str) -> Result<String> {
494        let client = self.connection.client()?;
495        let mut connection = client.get_connection()?;
496        let value = redis::cmd("JSON.GET")
497            .arg(self.key(key_suffix))
498            .arg("$")
499            .query(&mut connection)?;
500        Ok(value)
501    }
502
503    /// Fetches a Redis hash as a string map.
504    pub fn fetch_hash(&self, key_suffix: &str) -> Result<HashMap<String, String>> {
505        let client = self.connection.client()?;
506        let mut connection = client.get_connection()?;
507        let value = connection.hgetall(self.key(key_suffix))?;
508        Ok(value)
509    }
510
    /// Fetches a document by its logical identifier.
    ///
    /// Returns `Ok(None)` when no document exists under the derived key.
    ///
    /// JSON storage: `JSON.GET $` yields a one-element array, which is
    /// unwrapped to the inner value; a missing key surfaces from the client
    /// as an `UnexpectedReturnType` error (nil reply decoded as `String`),
    /// which is mapped to `None`. Hash storage: `HGETALL` on a missing key
    /// yields an empty map, mapped to `None`; hash values are surfaced as
    /// JSON strings.
    pub fn fetch(&self, id: &str) -> Result<Option<Value>> {
        match self.storage_type() {
            StorageType::Json => {
                let raw = self.fetch_json_raw(id);
                match raw {
                    Ok(raw) => {
                        let parsed = serde_json::from_str::<Value>(&raw)?;
                        Ok(match parsed {
                            // `$` path responses arrive as a one-element array.
                            Value::Array(mut values) if values.len() == 1 => values.pop(),
                            other => Some(other),
                        })
                    }
                    // Missing key: nil reply fails String decoding with this kind.
                    Err(Error::Redis(err))
                        if err.kind() == redis::ErrorKind::UnexpectedReturnType =>
                    {
                        Ok(None)
                    }
                    Err(other) => Err(other),
                }
            }
            StorageType::Hash => {
                let map = self.fetch_hash(id)?;
                if map.is_empty() {
                    Ok(None)
                } else {
                    let mut object = Map::new();
                    for (key, value) in map {
                        object.insert(key, Value::String(value));
                    }
                    Ok(Some(Value::Object(object)))
                }
            }
        }
    }
546
547    /// Drops a single Redis key.
548    pub fn drop_key(&self, key: &str) -> Result<usize> {
549        let client = self.connection.client()?;
550        let mut connection = client.get_connection()?;
551        let count: usize = redis::cmd("DEL").arg(key).query(&mut connection)?;
552        Ok(count)
553    }
554
555    /// Drops multiple Redis keys.
556    pub fn drop_keys(&self, keys: &[String]) -> Result<usize> {
557        if keys.is_empty() {
558            return Ok(0);
559        }
560        let client = self.connection.client()?;
561        let mut connection = client.get_connection()?;
562        let mut cmd = redis::cmd("DEL");
563        for key in keys {
564            cmd.arg(key);
565        }
566        let count: usize = cmd.query(&mut connection)?;
567        Ok(count)
568    }
569
570    /// Drops a single logical document by identifier.
571    pub fn drop_document(&self, id: &str) -> Result<usize> {
572        self.drop_key(&self.key(id))
573    }
574
575    /// Drops multiple logical documents by identifier.
576    pub fn drop_documents(&self, ids: &[String]) -> Result<usize> {
577        if ids.is_empty() {
578            return Ok(0);
579        }
580        let keys = ids.iter().map(|id| self.key(id)).collect::<Vec<_>>();
581        self.drop_keys(&keys)
582    }
583
584    /// Applies a TTL to a single key.
585    pub fn expire_key(&self, key: &str, ttl_seconds: i64) -> Result<bool> {
586        let client = self.connection.client()?;
587        let mut connection = client.get_connection()?;
588        let applied: bool = redis::cmd("EXPIRE")
589            .arg(key)
590            .arg(ttl_seconds)
591            .query(&mut connection)?;
592        Ok(applied)
593    }
594
595    /// Applies a TTL to multiple keys.
596    pub fn expire_keys(&self, keys: &[String], ttl_seconds: i64) -> Result<Vec<bool>> {
597        let mut results = Vec::with_capacity(keys.len());
598        for key in keys {
599            results.push(self.expire_key(key, ttl_seconds)?);
600        }
601        Ok(results)
602    }
603
    /// Clears keys matching the index prefix while keeping the index itself.
    ///
    /// Repeatedly runs a match-all [`crate::query::FilterQuery`] for pages of
    /// up to 500 documents and deletes the returned keys until a page comes
    /// back empty. The offset stays at 0 because each round removes the
    /// previously returned documents from the index. Returns the total number
    /// of keys deleted.
    //
    // NOTE(review): termination relies on deletions shrinking the result set;
    // if the index ever kept reporting documents whose keys DEL cannot remove,
    // this loop would spin — confirm index updates are synchronous with DEL.
    pub fn clear(&self) -> Result<usize> {
        let mut total_deleted = 0;
        let query = crate::query::FilterQuery::new(FilterExpression::MatchAll).paging(0, 500);

        loop {
            let batch = self.search(&query)?;
            if batch.docs.is_empty() {
                break;
            }

            // Search result ids are used directly as the keys to delete.
            let keys = batch
                .docs
                .iter()
                .map(|doc| doc.id().to_owned())
                .collect::<Vec<_>>();
            total_deleted += self.drop_keys(&keys)?;
        }

        Ok(total_deleted)
    }
625
626    /// Executes a query and returns the parsed Redis Search response.
627    pub fn search<Q>(&self, query: &Q) -> Result<SearchResult>
628    where
629        Q: QueryString + ?Sized,
630    {
631        parse_search_result(self.search_raw(query)?)
632    }
633
634    /// Executes a query and returns processed documents or a count, depending on
635    /// the query type.
636    pub fn query<Q>(&self, query: &Q) -> Result<QueryOutput>
637    where
638        Q: QueryString + ?Sized,
639    {
640        let results = self.search(query)?;
641        process_search_result(results, query, self.storage_type())
642    }
643
644    /// Executes multiple queries in order and returns parsed Redis Search responses.
645    pub fn batch_search<'a, I, Q>(&self, queries: I) -> Result<Vec<SearchResult>>
646    where
647        I: IntoIterator<Item = &'a Q>,
648        Q: QueryString + ?Sized + 'a,
649    {
650        self.batch_search_with_size(queries, usize::MAX)
651    }
652
653    /// Executes multiple queries in order, processing them in fixed-size chunks.
654    pub fn batch_search_with_size<'a, I, Q>(
655        &self,
656        queries: I,
657        batch_size: usize,
658    ) -> Result<Vec<SearchResult>>
659    where
660        I: IntoIterator<Item = &'a Q>,
661        Q: QueryString + ?Sized + 'a,
662    {
663        if batch_size == 0 {
664            return Err(Error::InvalidInput(
665                "batch_size must be greater than zero".to_owned(),
666            ));
667        }
668
669        let queries = queries.into_iter().collect::<Vec<_>>();
670        let mut results = Vec::with_capacity(queries.len());
671        for chunk in queries.chunks(batch_size) {
672            for query in chunk {
673                results.push(self.search(*query)?);
674            }
675        }
676        Ok(results)
677    }
678
679    /// Executes multiple queries in order and processes each result according to
680    /// the corresponding query type.
681    pub fn batch_query<'a, I, Q>(&self, queries: I) -> Result<Vec<QueryOutput>>
682    where
683        I: IntoIterator<Item = &'a Q>,
684        Q: QueryString + ?Sized + 'a,
685    {
686        self.batch_query_with_size(queries, usize::MAX)
687    }
688
689    /// Executes multiple queries in order, processing them in fixed-size chunks.
690    pub fn batch_query_with_size<'a, I, Q>(
691        &self,
692        queries: I,
693        batch_size: usize,
694    ) -> Result<Vec<QueryOutput>>
695    where
696        I: IntoIterator<Item = &'a Q>,
697        Q: QueryString + ?Sized + 'a,
698    {
699        if batch_size == 0 {
700            return Err(Error::InvalidInput(
701                "batch_size must be greater than zero".to_owned(),
702            ));
703        }
704
705        let queries = queries.into_iter().collect::<Vec<_>>();
706        let mut results = Vec::with_capacity(queries.len());
707        for chunk in queries.chunks(batch_size) {
708            for query in chunk {
709                results.push(self.query(*query)?);
710            }
711        }
712        Ok(results)
713    }
714
    /// Executes a query in successive pages and returns the processed document batches.
    ///
    /// Pages the query `page_size` documents at a time, stopping when a page
    /// is empty or shorter than `page_size` (i.e. the final partial page).
    ///
    /// # Errors
    /// Returns [`Error::InvalidInput`] when `page_size` is zero or when the
    /// query produces a count instead of documents.
    pub fn paginate<Q>(&self, query: &Q, page_size: usize) -> Result<Vec<Vec<Map<String, Value>>>>
    where
        Q: PageableQuery + ?Sized,
    {
        if page_size == 0 {
            return Err(Error::InvalidInput(
                "page_size must be greater than zero".to_owned(),
            ));
        }

        let mut offset = 0;
        let mut batches = Vec::new();
        loop {
            // Re-render the query with the current window applied.
            let page = query.paged(offset, page_size);
            let documents = match self.query(&page)? {
                QueryOutput::Documents(documents) => documents,
                QueryOutput::Count(_) => {
                    return Err(Error::InvalidInput(
                        "paginate requires a document-returning query".to_owned(),
                    ));
                }
            };

            if documents.is_empty() {
                break;
            }

            // A short page means the result set is exhausted.
            let fetched = documents.len();
            batches.push(documents);
            if fetched < page_size {
                break;
            }
            offset += page_size;
        }

        Ok(batches)
    }
753
754    /// Executes a [`crate::query::HybridQuery`] via `FT.HYBRID` and returns processed
755    /// documents.
756    ///
757    /// Requires Redis 8.4.0+ with the hybrid search capability.
758    ///
759    /// FT.HYBRID returns a distinct response format (map-like with
760    /// `total_results`, `results`, `warnings`, `execution_time`) that differs
761    /// from the FT.SEARCH array format. This method uses
762    /// `parse_hybrid_result` to decode it.
763    pub fn hybrid_query(&self, query: &crate::query::HybridQuery<'_>) -> Result<QueryOutput> {
764        let client = self.connection.client()?;
765        let mut connection = client.get_connection()?;
766        let cmd = query.build_cmd(self.name());
767        let value: redis::Value = cmd.query(&mut connection)?;
768        let documents = parse_hybrid_result(value)?;
769        Ok(QueryOutput::Documents(documents))
770    }
771
772    /// Executes an [`crate::query::AggregateHybridQuery`] via `FT.AGGREGATE` and returns
773    /// processed documents.
774    ///
775    /// Mirrors the Python `_aggregate()` code path.
776    pub fn aggregate_query(
777        &self,
778        query: &crate::query::AggregateHybridQuery<'_>,
779    ) -> Result<QueryOutput> {
780        let client = self.connection.client()?;
781        let mut connection = client.get_connection()?;
782        let cmd = query.build_aggregate_cmd(self.name());
783        let value: redis::Value = cmd.query(&mut connection)?;
784        let documents = parse_aggregate_result(value)?;
785        Ok(QueryOutput::Documents(documents))
786    }
787
788    /// Executes an [`crate::query::SQLQuery`] and automatically dispatches to
789    /// `FT.SEARCH` or `FT.AGGREGATE` depending on the SQL statement.
790    ///
791    /// Aggregate queries (`COUNT`, `SUM`, `GROUP BY`, etc.) are translated to
792    /// `FT.AGGREGATE` commands. All other queries use the regular `FT.SEARCH`
793    /// path.
794    ///
795    /// This mirrors the Python `SearchIndex.query(SQLQuery(...))` behavior.
796    #[cfg(feature = "sql")]
797    pub fn sql_query(&self, query: &crate::query::SQLQuery) -> Result<QueryOutput> {
798        // Geo aggregate (geo_distance in SELECT) → FT.AGGREGATE.
799        if let Some(cmd) = query.build_geo_aggregate_cmd(self.name()) {
800            let client = self.connection.client()?;
801            let mut connection = client.get_connection()?;
802            let value: redis::Value = cmd.query(&mut connection)?;
803            let documents = parse_aggregate_result(value)?;
804            return Ok(QueryOutput::Documents(documents));
805        }
806        // Standard aggregate (COUNT, SUM, GROUP BY, etc.) → FT.AGGREGATE.
807        if let Some(cmd) = query.build_aggregate_cmd(self.name()) {
808            let client = self.connection.client()?;
809            let mut connection = client.get_connection()?;
810            let value: redis::Value = cmd.query(&mut connection)?;
811            let documents = parse_aggregate_result(value)?;
812            return Ok(QueryOutput::Documents(documents));
813        }
814        // Vector and geo WHERE queries use the regular FT.SEARCH path
815        // (QueryString implementation handles KNN + PARAMS / GEOFILTER).
816        self.query(query)
817    }
818
819    /// Executes a [`crate::query::MultiVectorQuery`] via `FT.AGGREGATE` and returns
820    /// processed documents.
821    pub fn multi_vector_query(
822        &self,
823        query: &crate::query::MultiVectorQuery<'_>,
824    ) -> Result<QueryOutput> {
825        let client = self.connection.client()?;
826        let mut connection = client.get_connection()?;
827        let cmd = query.build_aggregate_cmd(self.name());
828        let value: redis::Value = cmd.query(&mut connection)?;
829        let documents = parse_aggregate_result(value)?;
830        Ok(QueryOutput::Documents(documents))
831    }
832
833    /// Constructs a [`SearchIndex`] from an existing Redis index by reading
834    /// `FT.INFO` and reconstructing the schema.
835    ///
836    /// Mirrors Python `SearchIndex.from_existing(name, redis_url=...)`.
837    pub fn from_existing(name: &str, redis_url: impl Into<String>) -> Result<Self> {
838        let connection = RedisConnectionInfo::new(redis_url);
839        let client = connection.client()?;
840        let mut conn = client.get_connection()?;
841        let value = redis::cmd("FT.INFO").arg(name).query(&mut conn)?;
842        let info = parse_info_response(value)?;
843        let schema = schema_from_info(name, &info)?;
844        Ok(Self { schema, connection })
845    }
846
847    /// Executes a query and returns the raw Redis response.
848    pub fn search_raw<Q>(&self, query: &Q) -> Result<redis::Value>
849    where
850        Q: QueryString + ?Sized,
851    {
852        self.validate_query(query)?;
853        let client = self.connection.client()?;
854        let mut connection = client.get_connection()?;
855        let value = self.search_cmd(query).query(&mut connection)?;
856        Ok(value)
857    }
858
    /// Translates a rendered query into a complete `FT.SEARCH` command.
    ///
    /// Arguments are emitted in this order: index name, query string, SCORER,
    /// PARAMS, NOCONTENT, RETURN, SORTBY, INORDER, LIMIT, GEOFILTER, DIALECT.
    fn search_cmd<Q>(&self, query: &Q) -> redis::Cmd
    where
        Q: QueryString + ?Sized,
    {
        let render = query.render();
        let mut cmd = redis::cmd("FT.SEARCH");
        cmd.arg(&self.schema.index.name).arg(render.query_string);

        if let Some(scorer) = render.scorer {
            cmd.arg("SCORER").arg(scorer);
        }

        // PARAMS takes the total token count: two tokens per name/value pair.
        if !render.params.is_empty() {
            cmd.arg("PARAMS").arg(render.params.len() * 2);
            for param in render.params {
                cmd.arg(param.name);
                match param.value {
                    QueryParamValue::String(value) => {
                        cmd.arg(value);
                    }
                    // Binary values are passed through as raw bytes.
                    QueryParamValue::Binary(value) => {
                        cmd.arg(value);
                    }
                }
            }
        }

        if render.no_content {
            cmd.arg("NOCONTENT");
        }

        // RETURN takes the field count followed by the field names.
        if !render.return_fields.is_empty() {
            cmd.arg("RETURN").arg(render.return_fields.len());
            for field in render.return_fields {
                cmd.arg(field);
            }
        }

        if let Some(sort_by) = render.sort_by {
            cmd.arg("SORTBY").arg(sort_by.field);
            cmd.arg(match sort_by.direction {
                SortDirection::Asc => "ASC",
                SortDirection::Desc => "DESC",
            });
        }

        if render.in_order {
            cmd.arg("INORDER");
        }

        if let Some(limit) = render.limit {
            cmd.arg("LIMIT").arg(limit.offset).arg(limit.num);
        }

        if let Some(geofilter) = render.geofilter {
            cmd.arg("GEOFILTER")
                .arg(geofilter.field)
                .arg(geofilter.lon)
                .arg(geofilter.lat)
                .arg(geofilter.radius)
                .arg(geofilter.unit);
        }

        // DIALECT is always emitted last.
        cmd.arg("DIALECT").arg(render.dialect);
        cmd
    }
925
926    fn validate_query<Q>(&self, query: &Q) -> Result<()>
927    where
928        Q: QueryString + ?Sized,
929    {
930        let render = query.render();
931        if render.query_string.contains("EF_RUNTIME") {
932            let supports_ef_runtime = self.schema.fields.iter().any(|field| {
933                matches!(
934                    &field.kind,
935                    FieldKind::Vector { attrs }
936                        if matches!(attrs.algorithm, VectorAlgorithm::Hnsw)
937                )
938            });
939            if !supports_ef_runtime {
940                return Err(Error::SchemaValidation(
941                    "EF_RUNTIME requires an HNSW vector field".to_owned(),
942                ));
943            }
944        }
945        Ok(())
946    }
947}
948
/// Async search index client.
///
/// Tokio-based counterpart of [`SearchIndex`]: same lifecycle operations,
/// executed over a multiplexed async connection.
#[derive(Debug, Clone)]
pub struct AsyncSearchIndex {
    // Index layout: name, prefixes, storage type and field definitions.
    schema: IndexSchema,
    // Connection settings; a client/connection is opened per operation.
    connection: RedisConnectionInfo,
}
955
956impl AsyncSearchIndex {
    /// Creates a new async search index.
    pub fn new(schema: IndexSchema, redis_url: impl Into<String>) -> Self {
        Self {
            schema,
            connection: RedisConnectionInfo::new(redis_url),
        }
    }

    /// Creates an async index from a YAML string.
    ///
    /// # Errors
    ///
    /// Fails if the YAML cannot be parsed into an [`IndexSchema`].
    pub fn from_yaml_str(yaml: &str, redis_url: impl Into<String>) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_yaml_str(yaml)?, redis_url))
    }

    /// Creates an async index from a YAML file.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be read or parsed into an [`IndexSchema`].
    pub fn from_yaml_file(
        path: impl AsRef<std::path::Path>,
        redis_url: impl Into<String>,
    ) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_yaml_file(path)?, redis_url))
    }

    /// Creates an async index from a JSON value.
    ///
    /// # Errors
    ///
    /// Fails if the value does not describe a valid [`IndexSchema`].
    pub fn from_json_value(value: serde_json::Value, redis_url: impl Into<String>) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_json_value(value)?, redis_url))
    }
982
    /// Returns the schema attached to the index.
    pub fn schema(&self) -> &IndexSchema {
        &self.schema
    }

    /// Returns the Redis index name.
    pub fn name(&self) -> &str {
        &self.schema.index.name
    }

    /// Returns the first (or only) key prefix.
    ///
    /// For multi-prefix indexes, returns the first prefix. Use [`Self::prefixes`]
    /// to access all configured prefixes.
    pub fn prefix(&self) -> &str {
        self.schema.index.prefix.first()
    }

    /// Returns all key prefixes configured for this index.
    pub fn prefixes(&self) -> Vec<&str> {
        self.schema.index.prefix.all()
    }

    /// Returns the separator between prefix and identifier.
    pub fn key_separator(&self) -> &str {
        &self.schema.index.key_separator
    }

    /// Returns the storage type.
    pub fn storage_type(&self) -> StorageType {
        self.schema.index.storage_type
    }

    /// Builds a document key from the index prefix and identifier.
    ///
    /// Always uses the first prefix; for multi-prefix indexes use
    /// [`Self::load_with_keys`] with caller-supplied keys.
    pub fn key(&self, key_suffix: &str) -> String {
        compose_key(self.prefix(), self.key_separator(), key_suffix)
    }
1020
    /// Creates the Redis search index asynchronously.
    ///
    /// Equivalent to [`Self::create_with_options`] with `overwrite = false`
    /// and `drop_documents = false`: an existing index is left untouched.
    pub async fn create(&self) -> Result<()> {
        self.create_with_options(false, false).await
    }

    /// Creates the Redis search index asynchronously with overwrite controls.
    ///
    /// If the index already exists it is kept as-is unless `overwrite` is
    /// set, in which case it is dropped first (with its documents when
    /// `drop_documents` is also set).
    ///
    /// # Errors
    ///
    /// Fails when the schema defines no fields, or when any Redis command fails.
    pub async fn create_with_options(&self, overwrite: bool, drop_documents: bool) -> Result<()> {
        if self.schema.fields.is_empty() {
            return Err(Error::SchemaValidation(
                "No fields defined for index".to_owned(),
            ));
        }

        if self.exists().await? {
            if !overwrite {
                // Existing index and no overwrite requested: nothing to do.
                return Ok(());
            }
            self.drop(drop_documents).await?;
        }

        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        // Reuse the sync index's FT.CREATE command assembly; only the
        // transport differs here.
        let (): () = SearchIndex::new(self.schema.clone(), self.connection.redis_url.clone())
            .create_cmd()
            .query_async(&mut connection)
            .await?;
        Ok(())
    }
1049
1050    /// Drops the Redis search index asynchronously.
1051    pub async fn drop(&self, delete_documents: bool) -> Result<()> {
1052        let client = self.connection.client()?;
1053        let mut connection = client.get_multiplexed_async_connection().await?;
1054        let mut cmd = redis::cmd("FT.DROPINDEX");
1055        cmd.arg(&self.schema.index.name);
1056        if delete_documents {
1057            cmd.arg("DD");
1058        }
1059        let (): () = cmd.query_async(&mut connection).await?;
1060        Ok(())
1061    }
1062
1063    /// Deletes the search index asynchronously, optionally dropping associated documents.
1064    pub async fn delete(&self, drop_documents: bool) -> Result<()> {
1065        if !self.exists().await? {
1066            return Err(Error::InvalidInput(format!(
1067                "index '{}' does not exist",
1068                self.schema.index.name
1069            )));
1070        }
1071        self.drop(drop_documents).await
1072    }
1073
1074    /// Returns whether the Redis search index exists.
1075    pub async fn exists(&self) -> Result<bool> {
1076        Ok(self
1077            .listall()
1078            .await?
1079            .iter()
1080            .any(|name| name == &self.schema.index.name))
1081    }
1082
1083    /// Lists all Redis search indices asynchronously.
1084    pub async fn listall(&self) -> Result<Vec<String>> {
1085        let client = self.connection.client()?;
1086        let mut connection = client.get_multiplexed_async_connection().await?;
1087        let value = redis::cmd("FT._LIST").query_async(&mut connection).await?;
1088        Ok(value)
1089    }
1090
1091    /// Returns parsed Redis index metadata.
1092    pub async fn info(&self) -> Result<Map<String, Value>> {
1093        let client = self.connection.client()?;
1094        let mut connection = client.get_multiplexed_async_connection().await?;
1095        let value = redis::cmd("FT.INFO")
1096            .arg(&self.schema.index.name)
1097            .query_async(&mut connection)
1098            .await?;
1099        parse_info_response(value)
1100    }
1101
1102    /// Loads documents using the configured storage type and returns the written keys.
1103    pub async fn load(
1104        &self,
1105        data: &[Value],
1106        id_field: &str,
1107        ttl: Option<i64>,
1108    ) -> Result<Vec<String>> {
1109        self.load_with_preprocess(data, id_field, ttl, |record| Ok(record.clone()))
1110            .await
1111    }
1112
    /// Loads documents asynchronously after applying a preprocessing callback to each record.
    ///
    /// Each record must be a JSON object containing a string `id_field`; the
    /// written key is derived from the index prefix plus that id. Records are
    /// written one at a time (`JSON.SET` for JSON storage, `HSET` for hash
    /// storage), with an optional TTL applied per key afterwards.
    ///
    /// # Errors
    ///
    /// Fails if preprocessing fails, a record is not an object, an id is
    /// missing or non-string, or any Redis command fails. A mid-batch failure
    /// leaves earlier records written.
    pub async fn load_with_preprocess<F>(
        &self,
        data: &[Value],
        id_field: &str,
        ttl: Option<i64>,
        mut preprocess: F,
    ) -> Result<Vec<String>>
    where
        F: FnMut(&Value) -> Result<Value>,
    {
        let prepared = prepare_load_records(data, &mut preprocess)?;
        // Validate all ids up front so a bad record fails before any write.
        validate_load_ids(&prepared, id_field)?;
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let mut written_keys = Vec::with_capacity(prepared.len());

        for record in &prepared {
            let object = record.as_object().ok_or_else(|| {
                Error::InvalidInput("load expects an array of JSON objects".to_owned())
            })?;
            let id = extract_id(object, id_field)?;
            let key = self.key(id);

            match self.storage_type() {
                StorageType::Json => {
                    // Whole document stored at the JSON root path "$".
                    let payload = serde_json::to_string(record)?;
                    let (): () = redis::cmd("JSON.SET")
                        .arg(&key)
                        .arg("$")
                        .arg(payload)
                        .query_async(&mut connection)
                        .await?;
                }
                StorageType::Hash => {
                    // Fields are flattened to hash entries; vector fields are
                    // encoded to raw bytes per the schema.
                    let encoded = encode_hash_record(object, &self.schema)?;
                    let mut cmd = redis::cmd("HSET");
                    cmd.arg(&key);
                    for (field, value) in encoded {
                        cmd.arg(field);
                        match value {
                            EncodedHashValue::String(value) => {
                                cmd.arg(value);
                            }
                            EncodedHashValue::Binary(value) => {
                                cmd.arg(value);
                            }
                        }
                    }
                    let _: i32 = cmd.query_async(&mut connection).await?;
                }
            }

            if let Some(ttl) = ttl {
                let _: bool = redis::cmd("EXPIRE")
                    .arg(&key)
                    .arg(ttl)
                    .query_async(&mut connection)
                    .await?;
            }

            written_keys.push(key);
        }

        Ok(written_keys)
    }
1179
    /// Loads documents using caller-supplied Redis keys instead of generating
    /// keys from the index prefix.
    ///
    /// This mirrors the Python `index.load(data, keys=keys)` signature and is
    /// essential for multi-prefix indexes where documents must be written under
    /// different key prefixes.
    ///
    /// `keys` and `data` must have the same length.
    ///
    /// # Errors
    ///
    /// Fails when the lengths differ, a record is not a JSON object, or any
    /// Redis command fails; a mid-batch failure leaves earlier records written.
    pub async fn load_with_keys(
        &self,
        data: &[Value],
        keys: &[String],
        ttl: Option<i64>,
    ) -> Result<Vec<String>> {
        if data.len() != keys.len() {
            return Err(Error::InvalidInput(format!(
                "data length ({}) must equal keys length ({})",
                data.len(),
                keys.len()
            )));
        }

        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;

        for (record, key) in data.iter().zip(keys.iter()) {
            let object = record.as_object().ok_or_else(|| {
                Error::InvalidInput("load expects an array of JSON objects".to_owned())
            })?;

            match self.storage_type() {
                StorageType::Json => {
                    // Whole document stored at the JSON root path "$".
                    let payload = serde_json::to_string(record)?;
                    let (): () = redis::cmd("JSON.SET")
                        .arg(key)
                        .arg("$")
                        .arg(payload)
                        .query_async(&mut connection)
                        .await?;
                }
                StorageType::Hash => {
                    // Fields are flattened to hash entries; vector fields are
                    // encoded to raw bytes per the schema.
                    let encoded = encode_hash_record(object, &self.schema)?;
                    let mut cmd = redis::cmd("HSET");
                    cmd.arg(key);
                    for (field, value) in encoded {
                        cmd.arg(field);
                        match value {
                            EncodedHashValue::String(value) => {
                                cmd.arg(value);
                            }
                            EncodedHashValue::Binary(value) => {
                                cmd.arg(value);
                            }
                        }
                    }
                    let _: i32 = cmd.query_async(&mut connection).await?;
                }
            }

            if let Some(ttl) = ttl {
                let _: bool = redis::cmd("EXPIRE")
                    .arg(key)
                    .arg(ttl)
                    .query_async(&mut connection)
                    .await?;
            }
        }

        Ok(keys.to_vec())
    }
1250
    /// Fetches a document by its logical identifier.
    ///
    /// Returns `Ok(None)` when the key does not exist. For JSON storage the
    /// root-path (`$`) wrapper array is unwrapped when it holds exactly one
    /// element; for hash storage every field value is returned as a JSON
    /// string.
    pub async fn fetch(&self, id: &str) -> Result<Option<Value>> {
        match self.storage_type() {
            StorageType::Json => {
                let client = self.connection.client()?;
                let mut connection = client.get_multiplexed_async_connection().await?;
                let raw: std::result::Result<String, redis::RedisError> = redis::cmd("JSON.GET")
                    .arg(self.key(id))
                    .arg("$")
                    .query_async(&mut connection)
                    .await;
                match raw {
                    Ok(raw) => {
                        let parsed = serde_json::from_str::<Value>(&raw)?;
                        Ok(match parsed {
                            // JSON.GET with "$" wraps results in an array;
                            // unwrap the single-document case.
                            Value::Array(mut values) if values.len() == 1 => values.pop(),
                            other => Some(other),
                        })
                    }
                    // A nil reply cannot convert to String and surfaces as
                    // UnexpectedReturnType; treat that as a missing key.
                    Err(err) if err.kind() == redis::ErrorKind::UnexpectedReturnType => Ok(None),
                    Err(err) => Err(Error::Redis(err)),
                }
            }
            StorageType::Hash => {
                let client = self.connection.client()?;
                let mut connection = client.get_multiplexed_async_connection().await?;
                let map: HashMap<String, String> = redis::cmd("HGETALL")
                    .arg(self.key(id))
                    .query_async(&mut connection)
                    .await?;
                // An empty HGETALL reply means the key does not exist.
                if map.is_empty() {
                    Ok(None)
                } else {
                    let mut object = Map::new();
                    for (key, value) in map {
                        object.insert(key, Value::String(value));
                    }
                    Ok(Some(Value::Object(object)))
                }
            }
        }
    }
1293
1294    /// Drops a single Redis key.
1295    pub async fn drop_key(&self, key: &str) -> Result<usize> {
1296        let client = self.connection.client()?;
1297        let mut connection = client.get_multiplexed_async_connection().await?;
1298        let count: usize = redis::cmd("DEL")
1299            .arg(key)
1300            .query_async(&mut connection)
1301            .await?;
1302        Ok(count)
1303    }
1304
1305    /// Drops multiple Redis keys.
1306    pub async fn drop_keys(&self, keys: &[String]) -> Result<usize> {
1307        if keys.is_empty() {
1308            return Ok(0);
1309        }
1310        let client = self.connection.client()?;
1311        let mut connection = client.get_multiplexed_async_connection().await?;
1312        let mut cmd = redis::cmd("DEL");
1313        for key in keys {
1314            cmd.arg(key);
1315        }
1316        let count: usize = cmd.query_async(&mut connection).await?;
1317        Ok(count)
1318    }
1319
1320    /// Drops a single logical document by identifier.
1321    pub async fn drop_document(&self, id: &str) -> Result<usize> {
1322        self.drop_key(&self.key(id)).await
1323    }
1324
1325    /// Drops multiple logical documents by identifier.
1326    pub async fn drop_documents(&self, ids: &[String]) -> Result<usize> {
1327        if ids.is_empty() {
1328            return Ok(0);
1329        }
1330        let keys = ids.iter().map(|id| self.key(id)).collect::<Vec<_>>();
1331        self.drop_keys(&keys).await
1332    }
1333
1334    /// Applies a TTL to a single key.
1335    pub async fn expire_key(&self, key: &str, ttl_seconds: i64) -> Result<bool> {
1336        let client = self.connection.client()?;
1337        let mut connection = client.get_multiplexed_async_connection().await?;
1338        let applied: bool = redis::cmd("EXPIRE")
1339            .arg(key)
1340            .arg(ttl_seconds)
1341            .query_async(&mut connection)
1342            .await?;
1343        Ok(applied)
1344    }
1345
1346    /// Applies a TTL to multiple keys.
1347    pub async fn expire_keys(&self, keys: &[String], ttl_seconds: i64) -> Result<Vec<bool>> {
1348        let mut results = Vec::with_capacity(keys.len());
1349        for key in keys {
1350            results.push(self.expire_key(key, ttl_seconds).await?);
1351        }
1352        Ok(results)
1353    }
1354
    /// Clears keys matching the index prefix while keeping the index itself.
    ///
    /// Repeatedly searches up to 500 matching documents and deletes them by
    /// key until the index reports no matches; returns the total number of
    /// keys deleted.
    pub async fn clear(&self) -> Result<usize> {
        let mut total_deleted = 0;
        // Always page from offset 0: each pass deletes what it fetched, so
        // the next search naturally returns the following batch.
        let query = crate::query::FilterQuery::new(FilterExpression::MatchAll).paging(0, 500);

        loop {
            let batch = self.search(&query).await?;
            if batch.docs.is_empty() {
                break;
            }

            // Document ids are the full Redis keys for this index.
            let keys = batch
                .docs
                .iter()
                .map(|doc| doc.id().to_owned())
                .collect::<Vec<_>>();
            total_deleted += self.drop_keys(&keys).await?;
        }

        Ok(total_deleted)
    }
1376
1377    /// Executes a query asynchronously.
1378    pub async fn search<Q>(&self, query: &Q) -> Result<SearchResult>
1379    where
1380        Q: QueryString + Send + Sync + ?Sized,
1381    {
1382        parse_search_result(self.search_raw(query).await?)
1383    }
1384
1385    /// Executes a query asynchronously and returns processed documents or a count.
1386    pub async fn query<Q>(&self, query: &Q) -> Result<QueryOutput>
1387    where
1388        Q: QueryString + Send + Sync + ?Sized,
1389    {
1390        let results = self.search(query).await?;
1391        process_search_result(results, query, self.schema.index.storage_type)
1392    }
1393
1394    /// Executes multiple queries asynchronously in order.
1395    pub async fn batch_search<'a, I, Q>(&self, queries: I) -> Result<Vec<SearchResult>>
1396    where
1397        I: IntoIterator<Item = &'a Q>,
1398        Q: QueryString + Send + Sync + ?Sized + 'a,
1399    {
1400        self.batch_search_with_size(queries, usize::MAX).await
1401    }
1402
1403    /// Executes multiple queries asynchronously in fixed-size chunks.
1404    pub async fn batch_search_with_size<'a, I, Q>(
1405        &self,
1406        queries: I,
1407        batch_size: usize,
1408    ) -> Result<Vec<SearchResult>>
1409    where
1410        I: IntoIterator<Item = &'a Q>,
1411        Q: QueryString + Send + Sync + ?Sized + 'a,
1412    {
1413        if batch_size == 0 {
1414            return Err(Error::InvalidInput(
1415                "batch_size must be greater than zero".to_owned(),
1416            ));
1417        }
1418
1419        let queries = queries.into_iter().collect::<Vec<_>>();
1420        let mut results = Vec::with_capacity(queries.len());
1421        for chunk in queries.chunks(batch_size) {
1422            for query in chunk {
1423                results.push(parse_search_result(self.search_raw(*query).await?)?);
1424            }
1425        }
1426        Ok(results)
1427    }
1428
1429    /// Executes multiple queries asynchronously in order and processes each result.
1430    pub async fn batch_query<'a, I, Q>(&self, queries: I) -> Result<Vec<QueryOutput>>
1431    where
1432        I: IntoIterator<Item = &'a Q>,
1433        Q: QueryString + Send + Sync + ?Sized + 'a,
1434    {
1435        self.batch_query_with_size(queries, usize::MAX).await
1436    }
1437
1438    /// Executes multiple queries asynchronously in fixed-size chunks.
1439    pub async fn batch_query_with_size<'a, I, Q>(
1440        &self,
1441        queries: I,
1442        batch_size: usize,
1443    ) -> Result<Vec<QueryOutput>>
1444    where
1445        I: IntoIterator<Item = &'a Q>,
1446        Q: QueryString + Send + Sync + ?Sized + 'a,
1447    {
1448        if batch_size == 0 {
1449            return Err(Error::InvalidInput(
1450                "batch_size must be greater than zero".to_owned(),
1451            ));
1452        }
1453
1454        let queries = queries.into_iter().collect::<Vec<_>>();
1455        let mut results = Vec::with_capacity(queries.len());
1456        for chunk in queries.chunks(batch_size) {
1457            for query in chunk {
1458                let parsed = parse_search_result(self.search_raw(*query).await?)?;
1459                results.push(process_search_result(
1460                    parsed,
1461                    *query,
1462                    self.schema.index.storage_type,
1463                )?);
1464            }
1465        }
1466        Ok(results)
1467    }
1468
    /// Executes a query asynchronously in successive pages.
    ///
    /// Issues the query repeatedly with increasing offsets, collecting each
    /// page of documents until a short or empty page signals the end.
    ///
    /// # Errors
    ///
    /// Fails when `page_size` is zero, when a page request fails, or when the
    /// query yields a count instead of documents.
    pub async fn paginate<Q>(
        &self,
        query: &Q,
        page_size: usize,
    ) -> Result<Vec<Vec<Map<String, Value>>>>
    where
        Q: PageableQuery + Send + Sync + ?Sized,
    {
        if page_size == 0 {
            return Err(Error::InvalidInput(
                "page_size must be greater than zero".to_owned(),
            ));
        }

        let mut offset = 0;
        let mut batches = Vec::new();
        loop {
            let page = query.paged(offset, page_size);
            let documents = match self.query(&page).await? {
                QueryOutput::Documents(documents) => documents,
                QueryOutput::Count(_) => {
                    return Err(Error::InvalidInput(
                        "paginate requires a document-returning query".to_owned(),
                    ));
                }
            };

            if documents.is_empty() {
                break;
            }

            let fetched = documents.len();
            batches.push(documents);
            // A short page means the result set is exhausted; stop without an
            // extra round trip for an empty page.
            if fetched < page_size {
                break;
            }
            offset += page_size;
        }

        Ok(batches)
    }
1511
1512    /// Executes a query asynchronously and returns the raw Redis response.
1513    pub async fn search_raw<Q>(&self, query: &Q) -> Result<redis::Value>
1514    where
1515        Q: QueryString + Send + Sync + ?Sized,
1516    {
1517        let sync_index = SearchIndex::new(self.schema.clone(), self.connection.redis_url.clone());
1518        sync_index.validate_query(query)?;
1519        let client = self.connection.client()?;
1520        let mut connection = client.get_multiplexed_async_connection().await?;
1521        let value = sync_index
1522            .search_cmd(query)
1523            .query_async(&mut connection)
1524            .await?;
1525        Ok(value)
1526    }
1527
1528    /// Executes a [`crate::query::HybridQuery`] asynchronously via `FT.HYBRID` and returns
1529    /// processed documents.
1530    ///
1531    /// Requires Redis 8.4.0+ with the hybrid search capability.
1532    pub async fn hybrid_query(&self, query: &crate::query::HybridQuery<'_>) -> Result<QueryOutput> {
1533        let client = self.connection.client()?;
1534        let mut connection = client.get_multiplexed_async_connection().await?;
1535        let cmd = query.build_cmd(self.name());
1536        let value: redis::Value = cmd.query_async(&mut connection).await?;
1537        let documents = parse_hybrid_result(value)?;
1538        Ok(QueryOutput::Documents(documents))
1539    }
1540
1541    /// Executes an [`crate::query::AggregateHybridQuery`] asynchronously via `FT.AGGREGATE`
1542    /// and returns processed documents.
1543    pub async fn aggregate_query(
1544        &self,
1545        query: &crate::query::AggregateHybridQuery<'_>,
1546    ) -> Result<QueryOutput> {
1547        let client = self.connection.client()?;
1548        let mut connection = client.get_multiplexed_async_connection().await?;
1549        let cmd = query.build_aggregate_cmd(self.name());
1550        let value: redis::Value = cmd.query_async(&mut connection).await?;
1551        let documents = parse_aggregate_result(value)?;
1552        Ok(QueryOutput::Documents(documents))
1553    }
1554
1555    /// Executes an [`crate::query::SQLQuery`] asynchronously and automatically dispatches
1556    /// to `FT.SEARCH` or `FT.AGGREGATE` depending on the SQL statement.
1557    ///
1558    /// This mirrors the Python `AsyncSearchIndex.query(SQLQuery(...))` behavior.
1559    #[cfg(feature = "sql")]
1560    pub async fn sql_query(&self, query: &crate::query::SQLQuery) -> Result<QueryOutput> {
1561        // Geo aggregate (geo_distance in SELECT) → FT.AGGREGATE.
1562        if let Some(cmd) = query.build_geo_aggregate_cmd(self.name()) {
1563            let client = self.connection.client()?;
1564            let mut connection = client.get_multiplexed_async_connection().await?;
1565            let value: redis::Value = cmd.query_async(&mut connection).await?;
1566            let documents = parse_aggregate_result(value)?;
1567            return Ok(QueryOutput::Documents(documents));
1568        }
1569        // Standard aggregate (COUNT, SUM, GROUP BY, etc.) → FT.AGGREGATE.
1570        if let Some(cmd) = query.build_aggregate_cmd(self.name()) {
1571            let client = self.connection.client()?;
1572            let mut connection = client.get_multiplexed_async_connection().await?;
1573            let value: redis::Value = cmd.query_async(&mut connection).await?;
1574            let documents = parse_aggregate_result(value)?;
1575            return Ok(QueryOutput::Documents(documents));
1576        }
1577        // Vector and geo WHERE queries use the regular FT.SEARCH path.
1578        self.query(query).await
1579    }
1580
1581    /// Executes a [`crate::query::MultiVectorQuery`] asynchronously via `FT.AGGREGATE` and
1582    /// returns processed documents.
1583    pub async fn multi_vector_query(
1584        &self,
1585        query: &crate::query::MultiVectorQuery<'_>,
1586    ) -> Result<QueryOutput> {
1587        let client = self.connection.client()?;
1588        let mut connection = client.get_multiplexed_async_connection().await?;
1589        let cmd = query.build_aggregate_cmd(self.name());
1590        let value: redis::Value = cmd.query_async(&mut connection).await?;
1591        let documents = parse_aggregate_result(value)?;
1592        Ok(QueryOutput::Documents(documents))
1593    }
1594
1595    /// Constructs an [`AsyncSearchIndex`] from an existing Redis index by
1596    /// reading `FT.INFO` and reconstructing the schema.
1597    ///
1598    /// Mirrors Python `AsyncSearchIndex.from_existing(name, redis_url=...)`.
1599    pub async fn from_existing(name: &str, redis_url: impl Into<String>) -> Result<Self> {
1600        let connection = RedisConnectionInfo::new(redis_url);
1601        let client = connection.client()?;
1602        let mut conn = client.get_multiplexed_async_connection().await?;
1603        let value: redis::Value = redis::cmd("FT.INFO")
1604            .arg(name)
1605            .query_async(&mut conn)
1606            .await?;
1607        let info = parse_info_response(value)?;
1608        let schema = schema_from_info(name, &info)?;
1609        Ok(Self { schema, connection })
1610    }
1611}
1612
// NOTE(review): unused helper kept under #[allow(dead_code)]; consider
// removing it if no caller materializes.
#[allow(dead_code)]
fn _storage_type_for_load(schema: &IndexSchema) -> StorageType {
    schema.index.storage_type
}
1617
1618fn extract_id<'a>(object: &'a Map<String, Value>, id_field: &str) -> Result<&'a str> {
1619    object
1620        .get(id_field)
1621        .and_then(Value::as_str)
1622        .ok_or_else(|| Error::InvalidInput(format!("missing string id field '{id_field}'")))
1623}
1624
1625fn validate_load_ids(records: &[Value], id_field: &str) -> Result<()> {
1626    for record in records {
1627        let object = record
1628            .as_object()
1629            .ok_or_else(|| Error::InvalidInput("load expects JSON object records".to_owned()))?;
1630        extract_id(object, id_field)?;
1631    }
1632    Ok(())
1633}
1634
/// Joins an index prefix and a document identifier into a full Redis key.
///
/// An empty prefix yields the bare suffix; an empty separator concatenates
/// directly; otherwise trailing separators on the prefix are stripped first
/// so the result never contains a doubled separator.
fn compose_key(prefix: &str, key_separator: &str, key_suffix: &str) -> String {
    if prefix.is_empty() {
        return key_suffix.to_owned();
    }
    if key_separator.is_empty() {
        return format!("{prefix}{key_suffix}");
    }
    match prefix.trim_end_matches(key_separator) {
        // Prefix was nothing but separators: fall back to the bare suffix.
        "" => key_suffix.to_owned(),
        trimmed => format!("{trimmed}{key_separator}{key_suffix}"),
    }
}
1651
/// A hash-field value ready to be written via `HSET`.
enum EncodedHashValue {
    /// UTF-8 text form (scalars and JSON-serialized composites).
    String(String),
    /// Raw bytes (vector fields encoded per the schema's datatype).
    Binary(Vec<u8>),
}
1656
/// Converts a JSON object into `HSET` field/value pairs.
///
/// Arrays backing a schema vector field become raw binary blobs; scalar
/// values become their string form; other arrays and nested objects are
/// stored as their JSON text.
fn encode_hash_record(
    object: &Map<String, Value>,
    schema: &IndexSchema,
) -> Result<Vec<(String, EncodedHashValue)>> {
    let mut pairs = Vec::with_capacity(object.len());
    for (key, value) in object {
        let encoded_value = match value {
            // Only arrays whose field is declared as a vector get binary encoding.
            Value::Array(values)
                if matches!(
                    schema.field(key),
                    Some(crate::schema::Field {
                        kind: FieldKind::Vector { .. },
                        ..
                    })
                ) =>
            {
                EncodedHashValue::Binary(encode_vector_hash_field(key, values, schema)?)
            }
            Value::Null => EncodedHashValue::String("null".to_owned()),
            Value::Bool(value) => EncodedHashValue::String(value.to_string()),
            Value::Number(value) => EncodedHashValue::String(value.to_string()),
            Value::String(value) => EncodedHashValue::String(value.clone()),
            // Non-vector composites are serialized to JSON text.
            Value::Array(_) | Value::Object(_) => {
                EncodedHashValue::String(serde_json::to_string(value)?)
            }
        };
        pairs.push((key.clone(), encoded_value));
    }
    Ok(pairs)
}
1687
1688fn encode_vector_hash_field(
1689    field_name: &str,
1690    values: &[Value],
1691    schema: &IndexSchema,
1692) -> Result<Vec<u8>> {
1693    let Some(crate::schema::Field {
1694        kind: FieldKind::Vector { attrs },
1695        ..
1696    }) = schema.field(field_name)
1697    else {
1698        return Err(Error::SchemaValidation(format!(
1699            "vector field '{field_name}' not found in schema"
1700        )));
1701    };
1702
1703    if values.len() != attrs.dims {
1704        return Err(Error::InvalidInput(format!(
1705            "vector field '{field_name}' expected {} elements, received {}",
1706            attrs.dims,
1707            values.len()
1708        )));
1709    }
1710
1711    match attrs.datatype {
1712        crate::schema::VectorDataType::Bfloat16 => {
1713            let mut buffer = Vec::with_capacity(values.len() * 2);
1714            for value in values {
1715                let number = json_number_to_f64(value, field_name)? as f32;
1716                // BFloat16: upper 16 bits of f32 (truncate mantissa)
1717                let bits = number.to_bits();
1718                let bf16 = (bits >> 16) as u16;
1719                buffer.extend_from_slice(&bf16.to_le_bytes());
1720            }
1721            Ok(buffer)
1722        }
1723        crate::schema::VectorDataType::Float16 => {
1724            let mut buffer = Vec::with_capacity(values.len() * 2);
1725            for value in values {
1726                let number = json_number_to_f64(value, field_name)? as f32;
1727                buffer.extend_from_slice(&f32_to_f16_bytes(number).to_le_bytes());
1728            }
1729            Ok(buffer)
1730        }
1731        crate::schema::VectorDataType::Float32 => {
1732            let mut buffer = Vec::with_capacity(values.len() * std::mem::size_of::<f32>());
1733            for value in values {
1734                let number = json_number_to_f64(value, field_name)? as f32;
1735                buffer.extend_from_slice(&number.to_le_bytes());
1736            }
1737            Ok(buffer)
1738        }
1739        crate::schema::VectorDataType::Float64 => {
1740            let mut buffer = Vec::with_capacity(values.len() * std::mem::size_of::<f64>());
1741            for value in values {
1742                let number = json_number_to_f64(value, field_name)?;
1743                buffer.extend_from_slice(&number.to_le_bytes());
1744            }
1745            Ok(buffer)
1746        }
1747    }
1748}
1749
1750fn json_number_to_f64(value: &Value, field_name: &str) -> Result<f64> {
1751    value.as_f64().ok_or_else(|| {
1752        Error::InvalidInput(format!(
1753            "vector field '{field_name}' must be encoded from numeric JSON values"
1754        ))
1755    })
1756}
1757
/// Converts an `f32` to IEEE 754 half-precision (binary16) bits.
///
/// Rounding is by mantissa truncation. Magnitudes above the half range map
/// to signed infinity, magnitudes below the smallest subnormal flush to
/// signed zero, and every NaN becomes a quiet NaN with payload `0x0200`.
fn f32_to_f16_bytes(value: f32) -> u16 {
    let bits = value.to_bits();
    let sign16 = ((bits >> 16) & 0x8000) as u16;
    let exp32 = ((bits >> 23) & 0xFF) as i32;
    let frac32 = bits & 0x007F_FFFF;

    // Saturated f32 exponent → infinity or NaN.
    if exp32 == 0xFF {
        let payload = if frac32 == 0 { 0 } else { 0x0200 };
        return sign16 | 0x7C00 | payload;
    }

    let e = exp32 - 127; // unbiased exponent
    if e > 15 {
        // Too large for half precision → infinity.
        sign16 | 0x7C00
    } else if e < -24 {
        // Below the smallest half subnormal → zero.
        sign16
    } else if e < -14 {
        // Subnormal range: shift the implicit-one significand into place.
        let shift = (-14 - e) + 13;
        sign16 | ((frac32 | 0x0080_0000) >> shift) as u16
    } else {
        // Normal range: re-bias the exponent, truncate the mantissa.
        let exp16 = ((e + 15) as u16) << 10;
        sign16 | exp16 | (frac32 >> 13) as u16
    }
}
1791
1792fn prepare_load_records<F>(data: &[Value], preprocess: &mut F) -> Result<Vec<Value>>
1793where
1794    F: FnMut(&Value) -> Result<Value>,
1795{
1796    let mut prepared = Vec::with_capacity(data.len());
1797    for record in data {
1798        let processed = preprocess(record)?;
1799        if !processed.is_object() {
1800            return Err(Error::InvalidInput(
1801                "preprocess must return a JSON object".to_owned(),
1802            ));
1803        }
1804        prepared.push(processed);
1805    }
1806    Ok(prepared)
1807}
1808
1809fn parse_search_result(value: redis::Value) -> Result<SearchResult> {
1810    let entries = match value {
1811        redis::Value::Array(entries) => entries,
1812        redis::Value::Nil => return Ok(SearchResult::new(0, Vec::new())),
1813        other => {
1814            return Err(Error::InvalidInput(format!(
1815                "expected FT.SEARCH array response, received {other:?}"
1816            )));
1817        }
1818    };
1819
1820    let mut entries = VecDeque::from(entries);
1821    let total = entries
1822        .pop_front()
1823        .map(redis_value_to_usize)
1824        .transpose()?
1825        .unwrap_or(0);
1826
1827    let mut docs = Vec::new();
1828    while let Some(id_value) = entries.pop_front() {
1829        let id = redis_value_to_string(&id_value)?;
1830        let fields = match entries.front() {
1831            Some(next) if is_search_payload(next) => {
1832                let payload = entries.pop_front().expect("front element exists");
1833                decode_search_payload(payload)?
1834            }
1835            _ => Map::new(),
1836        };
1837        docs.push(SearchDocument::new(id, fields));
1838    }
1839
1840    Ok(SearchResult::new(total, docs))
1841}
1842
1843/// Parses an `FT.HYBRID` response.
1844///
1845/// FT.HYBRID returns a map-like array:
1846/// ```text
1847/// ["total_results", <int>, "results", [[field, value, ...], ...], "warnings", [...], "execution_time", "..."]
1848/// ```
1849///
1850/// Each result document is a flat array of `[field, value, field, value, ...]`
1851/// with no separate document ID.
1852fn parse_hybrid_result(value: redis::Value) -> Result<Vec<Map<String, Value>>> {
1853    let entries = match value {
1854        redis::Value::Array(entries) => entries,
1855        redis::Value::Nil => return Ok(Vec::new()),
1856        other => {
1857            return Err(Error::InvalidInput(format!(
1858                "expected FT.HYBRID array response, received {other:?}"
1859            )));
1860        }
1861    };
1862
1863    // Parse the top-level map: walk key-value pairs.
1864    let mut results_value: Option<redis::Value> = None;
1865    let mut iter = entries.into_iter();
1866    while let Some(key) = iter.next() {
1867        let key_str = redis_value_to_string(&key).unwrap_or_default();
1868        let val = iter.next();
1869        match key_str.as_str() {
1870            "results" => {
1871                results_value = val;
1872            }
1873            _ => {
1874                // Skip total_results, warnings, execution_time, etc.
1875            }
1876        }
1877    }
1878
1879    let results_array = match results_value {
1880        Some(redis::Value::Array(arr)) => arr,
1881        Some(redis::Value::Nil) | None => return Ok(Vec::new()),
1882        Some(other) => {
1883            return Err(Error::InvalidInput(format!(
1884                "expected results array in FT.HYBRID response, received {other:?}"
1885            )));
1886        }
1887    };
1888
1889    let mut documents = Vec::with_capacity(results_array.len());
1890    for entry in results_array {
1891        match entry {
1892            redis::Value::Array(pairs) => {
1893                let mut map = Map::new();
1894                let mut pair_iter = pairs.into_iter();
1895                while let Some(field_val) = pair_iter.next() {
1896                    let field = redis_value_to_string(&field_val)?;
1897                    if let Some(value_val) = pair_iter.next() {
1898                        let json_val = redis_value_to_json(value_val)?;
1899                        // Skip internal fields like __key, __score
1900                        if !field.starts_with("__") {
1901                            map.insert(field, json_val);
1902                        }
1903                    }
1904                }
1905                documents.push(map);
1906            }
1907            _ => {
1908                // Skip non-array entries
1909            }
1910        }
1911    }
1912
1913    Ok(documents)
1914}
1915
1916fn parse_info_response(value: redis::Value) -> Result<Map<String, Value>> {
1917    let entries = match value {
1918        redis::Value::Map(entries) => entries,
1919        redis::Value::Array(entries) => {
1920            let mut pairs = VecDeque::from(entries);
1921            let mut mapped = Vec::new();
1922            while let Some(key) = pairs.pop_front() {
1923                let Some(value) = pairs.pop_front() else {
1924                    return Err(Error::InvalidInput(
1925                        "FT.INFO response contained an odd number of elements".to_owned(),
1926                    ));
1927                };
1928                mapped.push((key, value));
1929            }
1930            mapped
1931        }
1932        other => {
1933            return Err(Error::InvalidInput(format!(
1934                "expected FT.INFO map response, received {other:?}"
1935            )));
1936        }
1937    };
1938
1939    let mut info = Map::with_capacity(entries.len());
1940    for (key, value) in entries {
1941        info.insert(redis_value_to_string(&key)?, redis_value_to_json(value)?);
1942    }
1943    Ok(info)
1944}
1945
1946/// Parses an `FT.AGGREGATE` response into a list of document maps.
1947///
1948/// Redis `FT.AGGREGATE` returns an array: `[total, [field, value, ...], ...]`.
1949fn parse_aggregate_result(value: redis::Value) -> Result<Vec<Map<String, Value>>> {
1950    let entries = match value {
1951        redis::Value::Array(entries) => entries,
1952        redis::Value::Nil => return Ok(Vec::new()),
1953        other => {
1954            return Err(Error::InvalidInput(format!(
1955                "expected FT.AGGREGATE array response, received {other:?}"
1956            )));
1957        }
1958    };
1959
1960    let mut it = entries.into_iter();
1961
1962    // First element is the total count (we skip it for doc processing)
1963    let _total = it.next();
1964
1965    let mut documents = Vec::new();
1966    for row in it {
1967        let row_entries = match row {
1968            redis::Value::Array(entries) => entries,
1969            redis::Value::Map(entries) => entries
1970                .into_iter()
1971                .flat_map(|(k, v)| [k, v])
1972                .collect::<Vec<_>>(),
1973            _ => continue,
1974        };
1975
1976        let mut pairs = VecDeque::from(row_entries);
1977        let mut map = Map::new();
1978        while let Some(key) = pairs.pop_front() {
1979            let Some(val) = pairs.pop_front() else { break };
1980            let field = redis_value_to_string(&key)?;
1981            if field == "__score" {
1982                continue; // Strip internal score like Python
1983            }
1984            map.insert(field, redis_value_to_json(val)?);
1985        }
1986        documents.push(map);
1987    }
1988
1989    Ok(documents)
1990}
1991
/// Reconstructs an [`IndexSchema`] from parsed `FT.INFO` output.
///
/// Mirrors Python `convert_index_info_to_schema`.
///
/// `info` is the map produced by `parse_info_response`: `index_definition`
/// supplies the storage type and key prefixes, and `attributes` supplies the
/// per-field definitions. Fields of unrecognized type are skipped, and
/// Redis-supplied defaults (TAG separator ",", TEXT weight 1) are normalized
/// back to `None` so reconstructed schemas compare equal to hand-written ones.
fn schema_from_info(name: &str, info: &Map<String, Value>) -> Result<IndexSchema> {
    // Extract storage type and prefixes from index_definition
    let index_def = info.get("index_definition").and_then(Value::as_array);

    // Defaults used when index_definition is absent or incomplete.
    let mut storage_type = StorageType::Hash;
    let mut prefix = crate::schema::Prefix::default();

    if let Some(def_arr) = index_def {
        // index_definition is a flat array: [key, value, key, value, ...]
        let mut i = 0;
        while i + 1 < def_arr.len() {
            let key = def_arr[i].as_str().unwrap_or("");
            match key {
                "key_type" => {
                    if let Some(v) = def_arr[i + 1].as_str() {
                        storage_type = match v.to_uppercase().as_str() {
                            "JSON" => StorageType::Json,
                            _ => StorageType::Hash,
                        };
                    }
                }
                "prefixes" => {
                    if let Some(arr) = def_arr[i + 1].as_array() {
                        let prefixes: Vec<String> = arr
                            .iter()
                            .filter_map(Value::as_str)
                            .map(String::from)
                            .collect();
                        // A single prefix round-trips as Prefix::Single so
                        // comparisons with authored schemas succeed.
                        prefix = if prefixes.len() == 1 {
                            crate::schema::Prefix::Single(prefixes.into_iter().next().unwrap())
                        } else {
                            crate::schema::Prefix::Multi(prefixes)
                        };
                    }
                }
                _ => {}
            }
            i += 2;
        }
    }

    // Parse attributes (fields)
    let attributes = info.get("attributes").and_then(Value::as_array);
    let mut fields = Vec::new();

    if let Some(attrs) = attributes {
        for attr_val in attrs {
            let attr_arr = match attr_val.as_array() {
                Some(arr) => arr,
                None => continue,
            };

            if attr_arr.is_empty() {
                continue;
            }

            // Parse the flat attribute array: [identifier, name, ...]
            // Accumulators for the generic field properties...
            let mut field_name = String::new();
            let mut field_type = String::new();
            let mut sortable = false;
            let mut no_index = false;
            let mut case_sensitive = false;
            let mut separator: Option<String> = None;
            let mut weight: Option<f32> = None;
            let mut no_stem = false;
            let mut with_suffix_trie = false;
            let mut phonetic: Option<String> = None;
            // Vector attrs
            let mut algorithm = String::new();
            let mut dims: usize = 0;
            let mut distance_metric = String::new();
            let mut datatype = String::new();

            // Walk the flat array; flags advance by 1, key-value pairs by 2.
            let mut i = 0;
            while i < attr_arr.len() {
                let key = attr_arr[i].as_str().unwrap_or("");
                match key {
                    "identifier" | "attribute" => {
                        if i + 1 < attr_arr.len() {
                            if let Some(v) = attr_arr[i + 1].as_str() {
                                // "attribute" (the alias) wins over
                                // "identifier" when both are present.
                                if key == "attribute" || field_name.is_empty() {
                                    field_name = v.to_owned();
                                }
                            }
                        }
                        i += 2;
                    }
                    "type" => {
                        if i + 1 < attr_arr.len() {
                            if let Some(v) = attr_arr[i + 1].as_str() {
                                field_type = v.to_uppercase();
                            }
                        }
                        i += 2;
                    }
                    "SORTABLE" => {
                        sortable = true;
                        i += 1;
                    }
                    "NOINDEX" => {
                        no_index = true;
                        i += 1;
                    }
                    "CASESENSITIVE" => {
                        case_sensitive = true;
                        i += 1;
                    }
                    "NOSTEM" => {
                        no_stem = true;
                        i += 1;
                    }
                    "WITHSUFFIXTRIE" => {
                        with_suffix_trie = true;
                        i += 1;
                    }
                    "SEPARATOR" => {
                        if i + 1 < attr_arr.len() {
                            separator = attr_arr[i + 1].as_str().map(String::from);
                        }
                        i += 2;
                    }
                    "WEIGHT" => {
                        if i + 1 < attr_arr.len() {
                            // Servers return the weight as a string or a
                            // number depending on protocol version.
                            weight = attr_arr[i + 1]
                                .as_str()
                                .and_then(|s| s.parse::<f32>().ok())
                                .or_else(|| attr_arr[i + 1].as_f64().map(|v| v as f32));
                        }
                        i += 2;
                    }
                    "PHONETIC" => {
                        if i + 1 < attr_arr.len() {
                            phonetic = attr_arr[i + 1].as_str().map(String::from);
                        }
                        i += 2;
                    }
                    _ if field_type == "VECTOR" => {
                        // Once we hit VECTOR type, remaining entries are vector params
                        // Format: algorithm, param_count, key, value, key, value, ...
                        // Or: key, value, key, value, ...
                        let upper = key.to_uppercase();
                        if upper == "FLAT" || upper == "HNSW" {
                            algorithm = upper.to_lowercase();
                            i += 1;
                            // Next might be a param count, skip it
                            if i < attr_arr.len() {
                                if attr_arr[i]
                                    .as_str()
                                    .and_then(|s| s.parse::<usize>().ok())
                                    .is_some()
                                    || attr_arr[i].as_i64().is_some()
                                {
                                    i += 1; // skip count
                                }
                            }
                        } else if upper == "ALGORITHM" {
                            if i + 1 < attr_arr.len() {
                                algorithm =
                                    attr_arr[i + 1].as_str().unwrap_or("flat").to_lowercase();
                            }
                            i += 2;
                        } else if upper == "DIM" || upper == "DIMS" {
                            if i + 1 < attr_arr.len() {
                                dims = attr_arr[i + 1]
                                    .as_str()
                                    .and_then(|s| s.parse().ok())
                                    .or_else(|| attr_arr[i + 1].as_u64().map(|v| v as usize))
                                    .unwrap_or(0);
                            }
                            i += 2;
                        } else if upper == "DISTANCE_METRIC" {
                            if i + 1 < attr_arr.len() {
                                distance_metric =
                                    attr_arr[i + 1].as_str().unwrap_or("cosine").to_lowercase();
                            }
                            i += 2;
                        } else if upper == "TYPE" || upper == "DATA_TYPE" || upper == "DATATYPE" {
                            if i + 1 < attr_arr.len() {
                                datatype =
                                    attr_arr[i + 1].as_str().unwrap_or("float32").to_lowercase();
                            }
                            i += 2;
                        } else {
                            // Skip unknown vector param
                            i += 2;
                        }
                    }
                    _ => {
                        i += 1;
                    }
                }
            }

            // Strip JSON path prefix from field name
            let field_name = field_name
                .strip_prefix("$.")
                .unwrap_or(&field_name)
                .to_owned();

            // Normalize Redis Search defaults back to None so that
            // schemas reconstructed from FT.INFO compare equal to
            // schemas built from JSON/YAML where optional defaults
            // are omitted.  Redis returns separator="," for TAG fields
            // and weight=1 for TEXT fields even when they were not
            // explicitly set during FT.CREATE.
            let separator = separator.filter(|s| s != ",");
            let weight = weight.filter(|w| (*w - 1.0).abs() > f32::EPSILON);

            let kind = match field_type.as_str() {
                "TAG" => FieldKind::Tag {
                    attrs: crate::schema::TagFieldAttributes {
                        separator,
                        case_sensitive,
                        sortable,
                        no_index,
                        index_missing: false,
                        index_empty: false,
                    },
                },
                "TEXT" => FieldKind::Text {
                    attrs: crate::schema::TextFieldAttributes {
                        weight,
                        sortable,
                        no_stem,
                        no_index,
                        phonetic,
                        with_suffix_trie,
                        index_missing: false,
                        index_empty: false,
                    },
                },
                "NUMERIC" => FieldKind::Numeric {
                    attrs: crate::schema::NumericFieldAttributes {
                        sortable,
                        no_index,
                        index_missing: false,
                        index_empty: false,
                    },
                },
                "GEO" => FieldKind::Geo {
                    attrs: crate::schema::GeoFieldAttributes {
                        sortable,
                        no_index,
                        index_missing: false,
                        index_empty: false,
                    },
                },
                "VECTOR" => {
                    let algo = match algorithm.to_lowercase().as_str() {
                        "hnsw" => crate::schema::VectorAlgorithm::Hnsw,
                        "svs-vamana" | "svs_vamana" => crate::schema::VectorAlgorithm::SvsVamana,
                        _ => crate::schema::VectorAlgorithm::Flat,
                    };
                    let dm = match distance_metric.as_str() {
                        "l2" => crate::schema::VectorDistanceMetric::L2,
                        "ip" => crate::schema::VectorDistanceMetric::Ip,
                        _ => crate::schema::VectorDistanceMetric::Cosine,
                    };
                    let dt = match datatype.to_lowercase().as_str() {
                        "float64" => crate::schema::VectorDataType::Float64,
                        "float16" => crate::schema::VectorDataType::Float16,
                        "bfloat16" => crate::schema::VectorDataType::Bfloat16,
                        _ => crate::schema::VectorDataType::Float32,
                    };
                    // Tuning params (m, ef_*, etc.) are not reported by
                    // FT.INFO in a portable way, so they reset to None.
                    FieldKind::Vector {
                        attrs: crate::schema::VectorFieldAttributes {
                            algorithm: algo,
                            dims,
                            distance_metric: dm,
                            datatype: dt,
                            initial_cap: None,
                            block_size: None,
                            m: None,
                            ef_construction: None,
                            ef_runtime: None,
                            epsilon: None,
                            graph_max_degree: None,
                            construction_window_size: None,
                            search_window_size: None,
                            compression: None,
                            reduce: None,
                            training_threshold: None,
                        },
                    }
                }
                _ => continue, // skip unknown field types
            };

            fields.push(crate::schema::Field {
                name: field_name,
                path: None,
                kind,
            });
        }
    }

    Ok(IndexSchema {
        index: IndexDefinition {
            name: name.to_owned(),
            prefix,
            // FT.INFO does not report the key separator; assume the default.
            key_separator: ":".to_owned(),
            storage_type,
            stopwords: Vec::new(),
        },
        fields,
    })
}
2302
2303fn process_search_result<Q>(
2304    results: SearchResult,
2305    query: &Q,
2306    storage_type: StorageType,
2307) -> Result<QueryOutput>
2308where
2309    Q: QueryString + ?Sized,
2310{
2311    if query.kind() == QueryKind::Count {
2312        return Ok(QueryOutput::Count(results.total));
2313    }
2314
2315    let unpack_json = matches!(storage_type, StorageType::Json)
2316        && query.should_unpack_json()
2317        && query.render().return_fields.is_empty();
2318    let mut documents = Vec::with_capacity(results.docs.len());
2319
2320    for document in results.docs {
2321        let mut map = document.into_map();
2322        if unpack_json {
2323            map = unpack_json_document(map)?;
2324        }
2325        map.remove("payload");
2326        documents.push(map);
2327    }
2328
2329    Ok(QueryOutput::Documents(documents))
2330}
2331
2332fn unpack_json_document(mut document: Map<String, Value>) -> Result<Map<String, Value>> {
2333    let Some(json_value) = document.remove("json") else {
2334        return Ok(document);
2335    };
2336
2337    let parsed = match json_value {
2338        Value::String(raw) => serde_json::from_str::<Value>(&raw)?,
2339        value => value,
2340    };
2341
2342    let mut unpacked = Map::new();
2343    if let Some(id) = document.remove("id") {
2344        unpacked.insert("id".to_owned(), id);
2345    }
2346
2347    match parsed {
2348        Value::Object(object) => {
2349            unpacked.extend(object);
2350            Ok(unpacked)
2351        }
2352        other => Err(Error::InvalidInput(format!(
2353            "expected JSON object payload while unpacking search result, received {other:?}"
2354        ))),
2355    }
2356}
2357
2358fn is_search_payload(value: &redis::Value) -> bool {
2359    matches!(
2360        value,
2361        redis::Value::Array(_) | redis::Value::Map(_) | redis::Value::Attribute { .. }
2362    )
2363}
2364
2365fn decode_search_payload(value: redis::Value) -> Result<Map<String, Value>> {
2366    match value {
2367        redis::Value::Array(entries) => decode_search_pairs(entries),
2368        redis::Value::Map(entries) => {
2369            let flat = entries
2370                .into_iter()
2371                .flat_map(|(key, value)| [key, value])
2372                .collect::<Vec<_>>();
2373            decode_search_pairs(flat)
2374        }
2375        redis::Value::Attribute { data, .. } => decode_search_payload(*data),
2376        other => Err(Error::InvalidInput(format!(
2377            "expected FT.SEARCH document payload, received {other:?}"
2378        ))),
2379    }
2380}
2381
2382fn decode_search_pairs(entries: Vec<redis::Value>) -> Result<Map<String, Value>> {
2383    let mut pairs = VecDeque::from(entries);
2384    let mut fields = Map::new();
2385    while let Some(key) = pairs.pop_front() {
2386        let Some(value) = pairs.pop_front() else {
2387            return Err(Error::InvalidInput(
2388                "FT.SEARCH document payload contained an odd number of elements".to_owned(),
2389            ));
2390        };
2391        let field = redis_value_to_string(&key)?;
2392        let normalized = if field == "$" { "json" } else { field.as_str() };
2393        fields.insert(normalized.to_owned(), redis_value_to_json(value)?);
2394    }
2395    Ok(fields)
2396}
2397
2398fn redis_value_to_usize(value: redis::Value) -> Result<usize> {
2399    let number =
2400        match value {
2401            redis::Value::Int(value) => value,
2402            redis::Value::BulkString(bytes) => String::from_utf8_lossy(&bytes)
2403                .parse::<i64>()
2404                .map_err(|_| {
2405                    Error::InvalidInput("unable to parse integer Redis response".to_owned())
2406                })?,
2407            redis::Value::SimpleString(value) => value.parse::<i64>().map_err(|_| {
2408                Error::InvalidInput("unable to parse integer Redis response".to_owned())
2409            })?,
2410            other => {
2411                return Err(Error::InvalidInput(format!(
2412                    "expected integer Redis response, received {other:?}"
2413                )));
2414            }
2415        };
2416
2417    usize::try_from(number)
2418        .map_err(|_| Error::InvalidInput("redis returned a negative integer".to_owned()))
2419}
2420
2421fn redis_value_to_string(value: &redis::Value) -> Result<String> {
2422    match value {
2423        redis::Value::BulkString(bytes) => Ok(String::from_utf8_lossy(bytes).into_owned()),
2424        redis::Value::SimpleString(value) => Ok(value.clone()),
2425        redis::Value::VerbatimString { text, .. } => Ok(text.clone()),
2426        redis::Value::Int(value) => Ok(value.to_string()),
2427        redis::Value::Double(value) => Ok(value.to_string()),
2428        redis::Value::Boolean(value) => Ok(value.to_string()),
2429        other => Err(Error::InvalidInput(format!(
2430            "expected string-like Redis response, received {other:?}"
2431        ))),
2432    }
2433}
2434
2435fn redis_value_to_json(value: redis::Value) -> Result<Value> {
2436    match value {
2437        redis::Value::Nil => Ok(Value::Null),
2438        redis::Value::Int(value) => Ok(Value::from(value)),
2439        redis::Value::Double(value) => Ok(Value::from(value)),
2440        redis::Value::Boolean(value) => Ok(Value::from(value)),
2441        redis::Value::BulkString(bytes) => {
2442            Ok(Value::String(String::from_utf8_lossy(&bytes).into_owned()))
2443        }
2444        redis::Value::SimpleString(value) => Ok(Value::String(value)),
2445        redis::Value::Okay => Ok(Value::String("OK".to_owned())),
2446        redis::Value::VerbatimString { text, .. } => Ok(Value::String(text)),
2447        redis::Value::Array(values) | redis::Value::Set(values) => {
2448            let mut array = Vec::with_capacity(values.len());
2449            for value in values {
2450                array.push(redis_value_to_json(value)?);
2451            }
2452            Ok(Value::Array(array))
2453        }
2454        redis::Value::Map(entries) => {
2455            let mut object = Map::with_capacity(entries.len());
2456            for (key, value) in entries {
2457                object.insert(redis_value_to_string(&key)?, redis_value_to_json(value)?);
2458            }
2459            Ok(Value::Object(object))
2460        }
2461        redis::Value::Attribute { data, .. } => redis_value_to_json(*data),
2462        redis::Value::BigNumber(number) => Ok(Value::String(number.to_string())),
2463        redis::Value::Push { .. } | redis::Value::ServerError(_) => {
2464            Ok(Value::String(format!("{value:?}")))
2465        }
2466        _ => Ok(Value::String(format!("{value:?}"))),
2467    }
2468}
2469
#[cfg(test)]
mod tests {
    //! Offline unit tests for the index helpers.
    //!
    //! Every test here runs without a live Redis server: the connection URLs
    //! are never dialed, and the cases exercise pure parsing, key
    //! composition, input validation, and schema-reconstruction logic. Many
    //! tests mirror cases from the Python redisvl test suite (noted in their
    //! names) to keep cross-implementation behavior parity.

    use super::{
        EncodedHashValue, QueryOutput, SearchDocument, SearchIndex, SearchResult, compose_key,
        encode_hash_record, parse_aggregate_result, parse_info_response, parse_search_result,
        prepare_load_records, process_search_result, schema_from_info,
    };
    use crate::{
        filter::Tag,
        query::{CountQuery, FilterQuery},
        schema::{IndexSchema, StorageType},
    };
    use serde_json::{Map, Value, json};

    // ── index properties & key composition ──

    #[test]
    fn search_index_properties_should_match_python_integration_test_search_index() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "my_index" },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        // Schema defaults: prefix "rvl", separator ":", hash storage.
        assert_eq!(index.name(), "my_index");
        assert_eq!(index.prefix(), "rvl");
        assert_eq!(index.key_separator(), ":");
        assert!(matches!(index.storage_type(), StorageType::Hash));
        assert_eq!(index.key("foo"), "rvl:foo");
    }

    #[test]
    fn search_index_should_honor_empty_prefix_like_python_integration_test_search_index() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "my_index", "prefix": "" },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        // An explicitly empty prefix means keys are used verbatim (no separator).
        assert_eq!(index.prefix(), "");
        assert_eq!(index.key("foo"), "foo");
    }

    #[test]
    fn search_index_key_should_normalize_trailing_separator_like_python_key_separator_tests() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": {
                    "name": "my_index",
                    "prefix": "user::",
                    "key_separator": ":"
                },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        // Trailing separators in the prefix must not produce "user:::456".
        assert_eq!(index.key("456"), "user:456");
        assert!(!index.key("456").contains("::"));
    }

    #[test]
    fn search_index_key_should_use_custom_separator_consistently_like_python_key_separator_tests() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": {
                    "name": "my_index",
                    "prefix": "app:user",
                    "key_separator": "-"
                },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        assert_eq!(index.key("999"), "app:user-999");
        // compose_key dedupes a prefix-trailing separator but keeps multi-char
        // separators intact.
        assert_eq!(compose_key("routes:", ":", "ref1"), "routes:ref1");
        assert_eq!(compose_key("data", "::", "id"), "data::id");
        assert_eq!(compose_key("data::", "::", "id"), "data::id");
    }

    #[test]
    fn search_index_multi_prefix_should_expose_all_prefixes_like_python_multi_prefix_tests() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": {
                    "name": "multi_pfx",
                    "prefix": ["pfx_a", "pfx_b"]
                },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        // prefix() returns the first entry; prefixes() exposes all of them.
        assert_eq!(index.prefix(), "pfx_a");
        assert_eq!(index.prefixes(), vec!["pfx_a", "pfx_b"]);
        assert_eq!(index.key("doc1"), "pfx_a:doc1");
    }

    #[test]
    fn compose_key_should_handle_special_separators_like_python_key_separator_tests() {
        for sep in &["_", "::", "->", ".", "/"] {
            let result = compose_key("data", sep, "id");
            assert_eq!(result, format!("data{sep}id"));
        }
    }

    /// Tests from Python test_trailing_separator_normalization
    #[test]
    fn trailing_separator_normalization_like_python_key_separator_tests() {
        let cases = [
            ("user:", ":", "123", "user:123"),
            ("user::", ":", "456", "user:456"),
            ("user", ":", "789", "user:789"),
            ("user-", "-", "abc", "user-abc"),
        ];
        for (prefix, sep, id, expected) in &cases {
            let result = compose_key(prefix, sep, id);
            assert_eq!(result, *expected, "prefix={prefix:?} sep={sep:?} id={id:?}");
        }
    }

    /// Tests from Python test_empty_prefix_handled_correctly
    #[test]
    fn empty_prefix_compose_key_like_python_key_separator_tests() {
        let result = compose_key("", ":", "789");
        assert_eq!(result, "789");
    }

    // ── load validation & result parsing ──

    #[test]
    fn hash_load_validation_should_require_string_id_field_like_python_search_index_tests() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "my_index" },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        // Validation must fire before any connection is attempted.
        let error = index
            .load(
                &[serde_json::json!({ "wrong_key": "1", "value": "test" })],
                "id",
                None,
            )
            .expect_err("missing id field should error before redis usage");

        assert!(error.to_string().contains("missing string id field"));
    }

    #[test]
    fn search_result_parser_should_decode_hash_results_like_python_search() {
        // Simulated FT.SEARCH hash reply: [total, key, [field, value, ...], ...]
        let parsed = parse_search_result(redis::Value::Array(vec![
            redis::Value::Int(2),
            redis::Value::BulkString(b"users:john".to_vec()),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"john".to_vec()),
                redis::Value::BulkString(b"age".to_vec()),
                redis::Value::BulkString(b"18".to_vec()),
            ]),
            redis::Value::BulkString(b"users:mary".to_vec()),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"mary".to_vec()),
                redis::Value::BulkString(b"vector_distance".to_vec()),
                redis::Value::BulkString(b"0".to_vec()),
            ]),
        ]))
        .expect("result should parse");

        assert_eq!(parsed.total, 2);
        assert_eq!(parsed.docs.len(), 2);
        assert_eq!(parsed.docs[0].id(), "users:john");
        assert_eq!(
            parsed.docs[0].get("user"),
            Some(&Value::String("john".to_owned()))
        );
        assert_eq!(
            parsed.docs[1].to_map().get("vector_distance"),
            Some(&Value::String("0".to_owned()))
        );
    }

    #[test]
    fn process_search_result_should_unpack_json_for_filter_queries_without_projection() {
        // JSON storage returns the whole document under a single "json" field;
        // processing should inline its members next to the document id.
        let mut fields = Map::new();
        fields.insert(
            "json".to_owned(),
            Value::String(r#"{"user":"john","age":18,"credit_score":"high"}"#.to_owned()),
        );
        let results = SearchResult::new(1, vec![SearchDocument::new("users:john", fields)]);
        let query = FilterQuery::new(Tag::new("credit_score").eq("high"));

        let processed = process_search_result(results, &query, StorageType::Json)
            .expect("query should process");

        assert_eq!(
            processed,
            QueryOutput::Documents(vec![Map::from_iter([
                ("id".to_owned(), Value::String("users:john".to_owned())),
                ("user".to_owned(), Value::String("john".to_owned())),
                ("age".to_owned(), json!(18)),
                ("credit_score".to_owned(), Value::String("high".to_owned())),
            ])])
        );
    }

    #[test]
    fn process_search_result_should_return_count_for_count_queries() {
        let results = SearchResult::new(7, Vec::new());
        let query = CountQuery::new();

        let processed = process_search_result(results, &query, StorageType::Hash)
            .expect("count should process");

        assert_eq!(processed, QueryOutput::Count(7));
    }

    #[test]
    fn paginate_should_reject_zero_page_size_before_redis_usage() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "my_index" },
                "fields": [
                    { "name": "brand", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");
        let query = FilterQuery::new(Tag::new("brand").eq("Nike"));

        let error = index
            .paginate(&query, 0)
            .expect_err("zero page size should fail before redis usage");

        assert!(
            error
                .to_string()
                .contains("page_size must be greater than zero")
        );
    }

    #[test]
    fn create_with_options_should_reject_empty_schema_before_redis_usage() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "empty_index" }
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        let error = index
            .create_with_options(true, true)
            .expect_err("empty schema should fail before redis usage");

        assert!(error.to_string().contains("No fields defined for index"));
    }

    #[test]
    fn prepare_load_records_should_apply_preprocess_like_python_search_index_tests() {
        let prepared = prepare_load_records(&[json!({"id": "1", "test": "foo"})], &mut |record| {
            let mut record = record.clone();
            let object = record
                .as_object_mut()
                .expect("record remains an object during preprocessing");
            object.insert("test".to_owned(), Value::String("bar".to_owned()));
            Ok(record)
        })
        .expect("preprocess should succeed");

        assert_eq!(prepared[0]["test"], Value::String("bar".to_owned()));
    }

    #[test]
    fn prepare_load_records_should_reject_non_object_preprocess_results() {
        let error = prepare_load_records(&[json!({"id": "1", "test": "foo"})], &mut |_| {
            Ok(Value::String("invalid".to_owned()))
        })
        .expect_err("non-object preprocess output should fail");

        assert!(
            error
                .to_string()
                .contains("preprocess must return a JSON object")
        );
    }

    #[test]
    fn parse_info_response_should_decode_ft_info_shape() {
        // FT.INFO replies as a flat [key, value, key, value, ...] array.
        let info = parse_info_response(redis::Value::Array(vec![
            redis::Value::BulkString(b"index_name".to_vec()),
            redis::Value::BulkString(b"my_index".to_vec()),
            redis::Value::BulkString(b"num_docs".to_vec()),
            redis::Value::Int(3),
            redis::Value::BulkString(b"hash_indexing_failures".to_vec()),
            redis::Value::Int(0),
        ]))
        .expect("info should parse");

        assert_eq!(info["index_name"], Value::String("my_index".to_owned()));
        assert_eq!(info["num_docs"], json!(3));
        assert_eq!(info["hash_indexing_failures"], json!(0));
    }

    #[test]
    fn search_document_should_expose_id_through_indexing_like_python_results_docs() {
        let document = SearchDocument::new(
            "rvl:1",
            Map::from_iter([("test".to_owned(), Value::String("foo".to_owned()))]),
        );

        // Indexing with "id" returns the document key; other keys hit fields.
        assert_eq!(document.id(), "rvl:1");
        assert_eq!(document["id"], Value::String("rvl:1".to_owned()));
        assert_eq!(document["test"], Value::String("foo".to_owned()));
    }

    #[test]
    fn encode_hash_record_should_pack_vector_arrays_for_hash_storage() {
        let schema = IndexSchema::from_json_value(json!({
            "index": { "name": "my_index", "storage_type": "hash" },
            "fields": [
                { "name": "id", "type": "tag" },
                {
                    "name": "embedding",
                    "type": "vector",
                    "attrs": {
                        "dims": 3,
                        "distance_metric": "COSINE",
                        "algorithm": "FLAT",
                        "datatype": "FLOAT32"
                    }
                }
            ]
        }))
        .expect("schema should parse");

        let encoded = encode_hash_record(
            &json!({
                "id": "1",
                "embedding": [0.1, 0.2, 0.3]
            })
            .as_object()
            .expect("record should be an object")
            .clone(),
            &schema,
        )
        .expect("hash record should encode");

        let embedding = encoded
            .into_iter()
            .find(|(field, _)| field == "embedding")
            .map(|(_, value)| value)
            .expect("embedding should be encoded");

        match embedding {
            // 3 dims × 4-byte FLOAT32 = 12 bytes of packed vector data.
            EncodedHashValue::Binary(bytes) => assert_eq!(bytes.len(), 12),
            EncodedHashValue::String(_) => panic!("vector field should encode to binary bytes"),
        }
    }

    // ── parse_aggregate_result parity tests ──

    #[test]
    fn parse_aggregate_result_should_produce_document_maps() {
        // Simulate FT.AGGREGATE response: [total, [k, v, ...], [k, v, ...]]
        let value = redis::Value::Array(vec![
            redis::Value::Int(2),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"alice".to_vec()),
                redis::Value::BulkString(b"hybrid_score".to_vec()),
                redis::Value::BulkString(b"0.85".to_vec()),
            ]),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"bob".to_vec()),
                redis::Value::BulkString(b"hybrid_score".to_vec()),
                redis::Value::BulkString(b"0.72".to_vec()),
            ]),
        ]);

        let docs = parse_aggregate_result(value).expect("should parse");
        assert_eq!(docs.len(), 2);
        assert_eq!(docs[0]["user"], "alice");
        assert_eq!(docs[0]["hybrid_score"], "0.85");
        assert_eq!(docs[1]["user"], "bob");
    }

    #[test]
    fn parse_aggregate_result_should_strip_internal_score() {
        let value = redis::Value::Array(vec![
            redis::Value::Int(1),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"__score".to_vec()),
                redis::Value::BulkString(b"1.0".to_vec()),
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"alice".to_vec()),
            ]),
        ]);

        let docs = parse_aggregate_result(value).expect("should parse");
        assert_eq!(docs.len(), 1);
        assert!(
            !docs[0].contains_key("__score"),
            "internal __score should be stripped"
        );
        assert_eq!(docs[0]["user"], "alice");
    }

    #[test]
    fn parse_aggregate_result_should_handle_nil() {
        let docs = parse_aggregate_result(redis::Value::Nil).expect("should parse");
        assert!(docs.is_empty());
    }

    // ── schema_from_info parity tests ──

    #[test]
    fn schema_from_info_should_reconstruct_basic_schema() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["rvl"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([
                ["identifier", "$.name", "attribute", "name", "type", "TAG"],
                ["identifier", "$.age", "attribute", "age", "type", "NUMERIC"],
            ]),
        );

        let schema = schema_from_info("test_index", &info).expect("should parse");
        assert_eq!(schema.index.name, "test_index");
        assert_eq!(schema.fields.len(), 2);
        assert_eq!(schema.fields[0].name, "name");
        assert_eq!(schema.fields[1].name, "age");
    }

    #[test]
    fn schema_from_info_should_detect_json_storage() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "JSON", "prefixes", ["myprefix"]]),
        );
        info.insert("attributes".to_owned(), json!([]));

        let schema = schema_from_info("json_idx", &info).expect("should parse");
        assert!(matches!(schema.index.storage_type, StorageType::Json));
    }

    #[test]
    fn schema_from_info_should_parse_vector_fields() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["rvl"]]),
        );
        // Vector attributes arrive as a flat token list after the algorithm
        // name and its argument count ("HNSW", "6", ...).
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "embedding",
                "attribute",
                "embedding",
                "type",
                "VECTOR",
                "HNSW",
                "6",
                "DIM",
                "768",
                "DISTANCE_METRIC",
                "COSINE",
                "TYPE",
                "FLOAT32"
            ]]),
        );

        let schema = schema_from_info("vec_idx", &info).expect("should parse");
        assert_eq!(schema.fields.len(), 1);
        let field = &schema.fields[0];
        assert_eq!(field.name, "embedding");
        match &field.kind {
            crate::schema::FieldKind::Vector { attrs } => {
                assert_eq!(attrs.dims, 768);
                assert!(matches!(
                    attrs.distance_metric,
                    crate::schema::VectorDistanceMetric::Cosine
                ));
                assert!(matches!(
                    attrs.algorithm,
                    crate::schema::VectorAlgorithm::Hnsw
                ));
            }
            _ => panic!("expected vector field kind"),
        }
    }

    #[test]
    fn multi_prefix_index_should_report_correct_prefix_count_in_create_cmd() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": {
                    "name": "multi_test",
                    "prefix": ["pfx_a", "pfx_b"]
                },
                "fields": [
                    { "name": "tag", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        // Verify the schema round-trips all prefixes — `create_cmd` iterates
        // over `self.schema.index.prefix.all()` so if these pass the command
        // will carry both prefixes.
        assert_eq!(index.prefixes(), vec!["pfx_a", "pfx_b"]);
        assert_eq!(index.prefix(), "pfx_a");
        // create_cmd is exercised end-to-end via integration tests.
        let _cmd = index.create_cmd();
    }

    // ── schema_from_info default-normalization tests ──

    #[test]
    fn schema_from_info_should_normalize_tag_separator_default() {
        // Redis FT.INFO always reports SEPARATOR "," for tag fields even when
        // the field was created without an explicit separator. The reconstructed
        // schema must treat the Redis default (,) as None so that comparison
        // with an original JSON-built schema succeeds.
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["test"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "brand",
                "attribute",
                "brand",
                "type",
                "TAG",
                "SEPARATOR",
                ","
            ]]),
        );

        let schema = schema_from_info("norm_test", &info).expect("should parse");
        match &schema.fields[0].kind {
            crate::schema::FieldKind::Tag { attrs } => {
                assert!(
                    attrs.separator.is_none(),
                    "default separator ',' should be normalized to None, got {:?}",
                    attrs.separator
                );
            }
            other => panic!("expected tag field, got {other:?}"),
        }
    }

    #[test]
    fn schema_from_info_should_preserve_non_default_tag_separator() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["test"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "brand",
                "attribute",
                "brand",
                "type",
                "TAG",
                "SEPARATOR",
                "|"
            ]]),
        );

        let schema = schema_from_info("norm_test", &info).expect("should parse");
        match &schema.fields[0].kind {
            crate::schema::FieldKind::Tag { attrs } => {
                assert_eq!(attrs.separator.as_deref(), Some("|"));
            }
            other => panic!("expected tag field, got {other:?}"),
        }
    }

    #[test]
    fn schema_from_info_should_normalize_text_weight_default() {
        // Redis FT.INFO always reports WEIGHT 1 for text fields even when the
        // field was created without an explicit weight.
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["test"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "content",
                "attribute",
                "content",
                "type",
                "TEXT",
                "WEIGHT",
                "1"
            ]]),
        );

        let schema = schema_from_info("norm_test", &info).expect("should parse");
        match &schema.fields[0].kind {
            crate::schema::FieldKind::Text { attrs } => {
                assert!(
                    attrs.weight.is_none(),
                    "default weight 1.0 should be normalized to None, got {:?}",
                    attrs.weight
                );
            }
            other => panic!("expected text field, got {other:?}"),
        }
    }

    #[test]
    fn schema_from_info_should_preserve_non_default_text_weight() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["test"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "content",
                "attribute",
                "content",
                "type",
                "TEXT",
                "WEIGHT",
                "2.5"
            ]]),
        );

        let schema = schema_from_info("norm_test", &info).expect("should parse");
        match &schema.fields[0].kind {
            crate::schema::FieldKind::Text { attrs } => {
                assert_eq!(attrs.weight, Some(2.5));
            }
            other => panic!("expected text field, got {other:?}"),
        }
    }

    #[test]
    fn schema_from_info_json_roundtrip_should_match_original_schema() {
        // Simulates the semantic router / message history reconnect scenario:
        // an original schema built from JSON should match a schema reconstructed
        // from FT.INFO output where Redis adds default separator and weight.
        let original = IndexSchema::from_json_value(json!({
            "index": {
                "name": "my_router",
                "prefix": "my_router",
                "storage_type": "hash"
            },
            "fields": [
                { "name": "ref_id", "type": "tag" },
                { "name": "route", "type": "tag" },
                { "name": "reference", "type": "text" },
                {
                    "name": "vector",
                    "type": "vector",
                    "attrs": {
                        "algorithm": "flat",
                        "dims": 3,
                        "datatype": "float32",
                        "distance_metric": "cosine"
                    }
                }
            ]
        }))
        .expect("original schema should parse");

        // Simulate FT.INFO output with Redis defaults explicitly present
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["my_router"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([
                [
                    "identifier",
                    "ref_id",
                    "attribute",
                    "ref_id",
                    "type",
                    "TAG",
                    "SEPARATOR",
                    ","
                ],
                [
                    "identifier",
                    "route",
                    "attribute",
                    "route",
                    "type",
                    "TAG",
                    "SEPARATOR",
                    ","
                ],
                [
                    "identifier",
                    "reference",
                    "attribute",
                    "reference",
                    "type",
                    "TEXT",
                    "WEIGHT",
                    "1"
                ],
                [
                    "identifier",
                    "vector",
                    "attribute",
                    "vector",
                    "type",
                    "VECTOR",
                    "FLAT",
                    "6",
                    "TYPE",
                    "FLOAT32",
                    "DIM",
                    "3",
                    "DISTANCE_METRIC",
                    "COSINE"
                ]
            ]),
        );
        let reconstructed =
            schema_from_info("my_router", &info).expect("reconstructed schema should parse");

        // Compare via serialized JSON so field ordering and defaults are
        // normalized identically on both sides.
        let original_json = original.to_json_value().expect("original to_json_value");
        let reconstructed_json = reconstructed
            .to_json_value()
            .expect("reconstructed to_json_value");
        assert_eq!(
            original_json, reconstructed_json,
            "original and reconstructed schemas should match after normalization\n\
             original:      {original_json:#}\n\
             reconstructed: {reconstructed_json:#}"
        );
    }

    // ── f16 encoding helpers ──

    #[test]
    fn f32_to_f16_basic_values() {
        use super::f32_to_f16_bytes;

        // Zero → 0x0000
        assert_eq!(f32_to_f16_bytes(0.0), 0x0000);
        // Negative zero → 0x8000
        assert_eq!(f32_to_f16_bytes(-0.0), 0x8000);
        // 1.0 → 0x3C00
        assert_eq!(f32_to_f16_bytes(1.0), 0x3C00);
        // -1.0 → 0xBC00
        assert_eq!(f32_to_f16_bytes(-1.0), 0xBC00);
        // Infinity → 0x7C00
        assert_eq!(f32_to_f16_bytes(f32::INFINITY), 0x7C00);
        // Negative infinity → 0xFC00
        assert_eq!(f32_to_f16_bytes(f32::NEG_INFINITY), 0xFC00);
        // NaN → sign | 0x7C00 | some mantissa bits
        let nan_bits = f32_to_f16_bytes(f32::NAN);
        assert_eq!(nan_bits & 0x7C00, 0x7C00, "NaN exponent should be all ones");
        assert_ne!(nan_bits & 0x03FF, 0, "NaN should have non-zero mantissa");
    }
}