use std::collections::{HashMap, VecDeque};
use std::ops::Index;

use redis::Commands;
use serde::Serialize;
use serde_json::{Map, Value};

use crate::{
    error::{Error, Result},
    filter::FilterExpression,
    query::{PageableQuery, QueryKind, QueryParamValue, QueryString, SortDirection},
    schema::{FieldKind, IndexDefinition, IndexSchema, StorageType, VectorAlgorithm},
};

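/// Connection details for the Redis server that backs an index.
///
/// A minimal construction sketch (the URL is an illustrative assumption; any
/// URL accepted by `redis::Client::open` works):
///
/// ```ignore
/// let info = RedisConnectionInfo::new("redis://127.0.0.1:6379");
/// ```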
#[derive(Debug, Clone)]
pub struct RedisConnectionInfo {
    pub redis_url: String,
}

impl RedisConnectionInfo {
    pub fn new(redis_url: impl Into<String>) -> Self {
        Self {
            redis_url: redis_url.into(),
        }
    }

    pub(crate) fn client(&self) -> Result<redis::Client> {
        Ok(redis::Client::open(self.redis_url.as_str())?)
    }
}

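/// A single search hit: the Redis key (`id`) plus the returned fields as JSON
/// values. `new` mirrors the id into an `"id"` field so it reads like any
/// other field.
///
/// A small access sketch (field names and values are illustrative):
///
/// ```ignore
/// let mut fields = serde_json::Map::new();
/// fields.insert("title".to_owned(), serde_json::json!("Dune"));
/// let doc = SearchDocument::new("book:1", fields);
/// assert_eq!(doc.get("title"), Some(&serde_json::json!("Dune")));
/// assert_eq!(doc["id"], serde_json::json!("book:1")); // via Index<&str>
/// ```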
#[derive(Debug, Clone, PartialEq)]
pub struct SearchDocument {
    id: String,
    fields: Map<String, Value>,
}

impl SearchDocument {
    pub fn new(id: impl Into<String>, mut fields: Map<String, Value>) -> Self {
        let id = id.into();
        fields.insert("id".to_owned(), Value::String(id.clone()));
        Self { id, fields }
    }

    pub fn id(&self) -> &str {
        &self.id
    }

    pub fn fields(&self) -> &Map<String, Value> {
        &self.fields
    }

    pub fn get(&self, field: &str) -> Option<&Value> {
        self.fields.get(field)
    }

    pub fn to_map(&self) -> Map<String, Value> {
        self.fields.clone()
    }

    pub fn into_map(self) -> Map<String, Value> {
        self.fields
    }
}

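/// Bracket indexing panics when the field is absent, so prefer
/// [`SearchDocument::get`] for fields that may be missing. Sketch:
///
/// ```ignore
/// let title = &doc["title"];    // panics if "title" is absent
/// let title = doc.get("title"); // None if "title" is absent
/// ```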
impl Index<&str> for SearchDocument {
    type Output = Value;

    fn index(&self, index: &str) -> &Self::Output {
        self.fields
            .get(index)
            .unwrap_or_else(|| panic!("field '{index}' not found on search document"))
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    pub total: usize,
    pub docs: Vec<SearchDocument>,
}

impl SearchResult {
    pub fn new(total: usize, docs: Vec<SearchDocument>) -> Self {
        Self { total, docs }
    }
}

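/// The outcome of a processed query: the matching documents, or just the
/// total for count-style queries.
///
/// A typical consumption sketch:
///
/// ```ignore
/// match index.query(&some_query)? {
///     QueryOutput::Documents(docs) => println!("{} documents", docs.len()),
///     QueryOutput::Count(total) => println!("{total} matches"),
/// }
/// ```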
#[derive(Debug, Clone, PartialEq)]
pub enum QueryOutput {
    Documents(Vec<Map<String, Value>>),
    Count(usize),
}

impl QueryOutput {
    pub fn as_documents(&self) -> Option<&[Map<String, Value>]> {
        match self {
            Self::Documents(documents) => Some(documents),
            Self::Count(_) => None,
        }
    }

    pub fn as_count(&self) -> Option<usize> {
        match self {
            Self::Count(count) => Some(*count),
            Self::Documents(_) => None,
        }
    }
}

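/// A synchronous handle to a RediSearch index described by an [`IndexSchema`].
///
/// An end-to-end sketch. The YAML keys below are an assumption for
/// illustration; the authoritative shape is whatever
/// [`IndexSchema::from_yaml_str`] accepts:
///
/// ```ignore
/// let yaml = r#"
/// index:
///   name: books
///   prefix: book
/// fields:
///   - name: title
///     type: text
/// "#;
/// let index = SearchIndex::from_yaml_str(yaml, "redis://127.0.0.1:6379")?;
/// index.create()?;
/// ```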
#[derive(Debug, Clone)]
pub struct SearchIndex {
    schema: IndexSchema,
    connection: RedisConnectionInfo,
}

impl SearchIndex {
    pub fn new(schema: IndexSchema, redis_url: impl Into<String>) -> Self {
        Self {
            schema,
            connection: RedisConnectionInfo::new(redis_url),
        }
    }

    pub fn from_yaml_str(yaml: &str, redis_url: impl Into<String>) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_yaml_str(yaml)?, redis_url))
    }

    pub fn from_yaml_file(
        path: impl AsRef<std::path::Path>,
        redis_url: impl Into<String>,
    ) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_yaml_file(path)?, redis_url))
    }

    pub fn from_json_value(value: serde_json::Value, redis_url: impl Into<String>) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_json_value(value)?, redis_url))
    }

    pub fn schema(&self) -> &IndexSchema {
        &self.schema
    }

    pub fn name(&self) -> &str {
        &self.schema.index.name
    }

    pub fn prefix(&self) -> &str {
        self.schema.index.prefix.first()
    }

    pub fn prefixes(&self) -> Vec<&str> {
        self.schema.index.prefix.all()
    }

    pub fn key_separator(&self) -> &str {
        &self.schema.index.key_separator
    }

    pub fn storage_type(&self) -> StorageType {
        self.schema.index.storage_type
    }

    pub fn key(&self, key_suffix: &str) -> String {
        compose_key(self.prefix(), self.key_separator(), key_suffix)
    }

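    /// Builds the `FT.CREATE` command without executing it. The argument
    /// layout mirrors the RediSearch syntax:
    /// `FT.CREATE <name> ON <HASH|JSON> PREFIX <n> <prefixes...>
    /// [STOPWORDS <n> <words...>] SCHEMA <field args...>`.
    ///
    /// A debugging sketch; the packed bytes can be inspected before running
    /// the command:
    ///
    /// ```ignore
    /// let cmd = index.create_cmd();
    /// println!("{:?}", cmd.get_packed_command());
    /// ```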
    pub fn create_cmd(&self) -> redis::Cmd {
        let mut cmd = redis::cmd("FT.CREATE");
        let prefixes = self.schema.index.prefix.all();
        cmd.arg(&self.schema.index.name)
            .arg("ON")
            .arg(self.schema.index.storage_type.redis_name())
            .arg("PREFIX")
            .arg(prefixes.len());
        for pfx in &prefixes {
            cmd.arg(*pfx);
        }

        if !self.schema.index.stopwords.is_empty() {
            cmd.arg("STOPWORDS").arg(self.schema.index.stopwords.len());
            for stopword in &self.schema.index.stopwords {
                cmd.arg(stopword);
            }
        }

        cmd.arg("SCHEMA");
        for arg in self.schema.redis_schema_args() {
            cmd.arg(arg);
        }
        cmd
    }

    pub fn create(&self) -> Result<()> {
        self.create_with_options(false, false)
    }

    pub fn create_with_options(&self, overwrite: bool, drop_documents: bool) -> Result<()> {
        if self.schema.fields.is_empty() {
            return Err(Error::SchemaValidation(
                "No fields defined for index".to_owned(),
            ));
        }

        if self.exists()? {
            if !overwrite {
                return Ok(());
            }
            self.drop(drop_documents)?;
        }

        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let (): () = self.create_cmd().query(&mut connection)?;
        Ok(())
    }

    pub fn drop(&self, delete_documents: bool) -> Result<()> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let mut cmd = redis::cmd("FT.DROPINDEX");
        cmd.arg(&self.schema.index.name);
        if delete_documents {
            cmd.arg("DD");
        }
        let (): () = cmd.query(&mut connection)?;
        Ok(())
    }

    pub fn delete(&self, drop_documents: bool) -> Result<()> {
        if !self.exists()? {
            return Err(Error::InvalidInput(format!(
                "index '{}' does not exist",
                self.name()
            )));
        }
        self.drop(drop_documents)
    }

    pub fn info(&self) -> Result<Map<String, Value>> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let value = redis::cmd("FT.INFO")
            .arg(&self.schema.index.name)
            .query(&mut connection)?;
        parse_info_response(value)
    }

    pub fn listall(&self) -> Result<Vec<String>> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let value = redis::cmd("FT._LIST").query(&mut connection)?;
        Ok(value)
    }

    pub fn exists(&self) -> Result<bool> {
        Ok(self.listall()?.iter().any(|name| name == self.name()))
    }

    pub fn load_json<T>(&self, key_suffix: &str, document: &T) -> Result<()>
    where
        T: Serialize,
    {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let payload = serde_json::to_string(document)?;
        let (): () = redis::cmd("JSON.SET")
            .arg(self.key(key_suffix))
            .arg("$")
            .arg(payload)
            .query(&mut connection)?;
        Ok(())
    }

    pub fn load_hash(&self, key_suffix: &str, values: &HashMap<String, String>) -> Result<()> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let mut cmd = redis::cmd("HSET");
        cmd.arg(self.key(key_suffix));
        for (field, value) in values {
            cmd.arg(field).arg(value);
        }
        let _: i32 = cmd.query(&mut connection)?;
        Ok(())
    }

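    /// Writes a batch of JSON objects under keys derived from `id_field` and
    /// returns the written keys. A loading sketch (records and TTL are
    /// illustrative):
    ///
    /// ```ignore
    /// let records = vec![
    ///     serde_json::json!({ "id": "1", "title": "Dune" }),
    ///     serde_json::json!({ "id": "2", "title": "Hyperion" }),
    /// ];
    /// // Keys become e.g. "book:1"; entries expire after one hour.
    /// let keys = index.load(&records, "id", Some(3600))?;
    /// ```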
    pub fn load(&self, data: &[Value], id_field: &str, ttl: Option<i64>) -> Result<Vec<String>> {
        self.load_with_preprocess(data, id_field, ttl, |record| Ok(record.clone()))
    }

    pub fn load_with_preprocess<F>(
        &self,
        data: &[Value],
        id_field: &str,
        ttl: Option<i64>,
        mut preprocess: F,
    ) -> Result<Vec<String>>
    where
        F: FnMut(&Value) -> Result<Value>,
    {
        let prepared = prepare_load_records(data, &mut preprocess)?;
        validate_load_ids(&prepared, id_field)?;
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let mut written_keys = Vec::with_capacity(prepared.len());

        for record in &prepared {
            let object = record.as_object().ok_or_else(|| {
                Error::InvalidInput("load expects an array of JSON objects".to_owned())
            })?;
            let id = extract_id(object, id_field)?;
            let key = self.key(id);

            match self.storage_type() {
                StorageType::Json => {
                    let payload = serde_json::to_string(record)?;
                    let (): () = redis::cmd("JSON.SET")
                        .arg(&key)
                        .arg("$")
                        .arg(payload)
                        .query(&mut connection)?;
                }
                StorageType::Hash => {
                    let encoded = encode_hash_record(object, &self.schema)?;
                    let mut cmd = redis::cmd("HSET");
                    cmd.arg(&key);
                    for (field, value) in encoded {
                        cmd.arg(field);
                        match value {
                            EncodedHashValue::String(value) => {
                                cmd.arg(value);
                            }
                            EncodedHashValue::Binary(value) => {
                                cmd.arg(value);
                            }
                        }
                    }
                    let _: i32 = cmd.query(&mut connection)?;
                }
            }

            if let Some(ttl) = ttl {
                let _: bool = redis::cmd("EXPIRE")
                    .arg(&key)
                    .arg(ttl)
                    .query(&mut connection)?;
            }

            written_keys.push(key);
        }

        Ok(written_keys)
    }

    pub fn load_with_keys(
        &self,
        data: &[Value],
        keys: &[String],
        ttl: Option<i64>,
    ) -> Result<Vec<String>> {
        if data.len() != keys.len() {
            return Err(Error::InvalidInput(format!(
                "data length ({}) must equal keys length ({})",
                data.len(),
                keys.len()
            )));
        }

        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;

        for (record, key) in data.iter().zip(keys.iter()) {
            let object = record.as_object().ok_or_else(|| {
                Error::InvalidInput("load expects an array of JSON objects".to_owned())
            })?;

            match self.storage_type() {
                StorageType::Json => {
                    let payload = serde_json::to_string(record)?;
                    let (): () = redis::cmd("JSON.SET")
                        .arg(key)
                        .arg("$")
                        .arg(payload)
                        .query(&mut connection)?;
                }
                StorageType::Hash => {
                    let encoded = encode_hash_record(object, &self.schema)?;
                    let mut cmd = redis::cmd("HSET");
                    cmd.arg(key);
                    for (field, value) in encoded {
                        cmd.arg(field);
                        match value {
                            EncodedHashValue::String(value) => {
                                cmd.arg(value);
                            }
                            EncodedHashValue::Binary(value) => {
                                cmd.arg(value);
                            }
                        }
                    }
                    let _: i32 = cmd.query(&mut connection)?;
                }
            }

            if let Some(ttl) = ttl {
                let _: bool = redis::cmd("EXPIRE")
                    .arg(key)
                    .arg(ttl)
                    .query(&mut connection)?;
            }
        }

        Ok(keys.to_vec())
    }

    pub fn fetch_json_raw(&self, key_suffix: &str) -> Result<String> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let value = redis::cmd("JSON.GET")
            .arg(self.key(key_suffix))
            .arg("$")
            .query(&mut connection)?;
        Ok(value)
    }

    pub fn fetch_hash(&self, key_suffix: &str) -> Result<HashMap<String, String>> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let value = connection.hgetall(self.key(key_suffix))?;
        Ok(value)
    }

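    /// Fetches one document by id, returning `None` when the key is missing.
    /// JSON storage goes through `JSON.GET <key> $` (unwrapping the
    /// one-element array that path returns); hash storage reads `HGETALL` and
    /// exposes every value as a JSON string.
    ///
    /// ```ignore
    /// if let Some(doc) = index.fetch("1")? {
    ///     println!("{doc}");
    /// }
    /// ```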
    pub fn fetch(&self, id: &str) -> Result<Option<Value>> {
        match self.storage_type() {
            StorageType::Json => {
                let raw = self.fetch_json_raw(id);
                match raw {
                    Ok(raw) => {
                        let parsed = serde_json::from_str::<Value>(&raw)?;
                        Ok(match parsed {
                            Value::Array(mut values) if values.len() == 1 => values.pop(),
                            other => Some(other),
                        })
                    }
                    Err(Error::Redis(err))
                        if err.kind() == redis::ErrorKind::UnexpectedReturnType =>
                    {
                        Ok(None)
                    }
                    Err(other) => Err(other),
                }
            }
            StorageType::Hash => {
                let map = self.fetch_hash(id)?;
                if map.is_empty() {
                    Ok(None)
                } else {
                    let mut object = Map::new();
                    for (key, value) in map {
                        object.insert(key, Value::String(value));
                    }
                    Ok(Some(Value::Object(object)))
                }
            }
        }
    }

    pub fn drop_key(&self, key: &str) -> Result<usize> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let count: usize = redis::cmd("DEL").arg(key).query(&mut connection)?;
        Ok(count)
    }

    pub fn drop_keys(&self, keys: &[String]) -> Result<usize> {
        if keys.is_empty() {
            return Ok(0);
        }
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let mut cmd = redis::cmd("DEL");
        for key in keys {
            cmd.arg(key);
        }
        let count: usize = cmd.query(&mut connection)?;
        Ok(count)
    }

    pub fn drop_document(&self, id: &str) -> Result<usize> {
        self.drop_key(&self.key(id))
    }

    pub fn drop_documents(&self, ids: &[String]) -> Result<usize> {
        if ids.is_empty() {
            return Ok(0);
        }
        let keys = ids.iter().map(|id| self.key(id)).collect::<Vec<_>>();
        self.drop_keys(&keys)
    }

    pub fn expire_key(&self, key: &str, ttl_seconds: i64) -> Result<bool> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let applied: bool = redis::cmd("EXPIRE")
            .arg(key)
            .arg(ttl_seconds)
            .query(&mut connection)?;
        Ok(applied)
    }

    pub fn expire_keys(&self, keys: &[String], ttl_seconds: i64) -> Result<Vec<bool>> {
        let mut results = Vec::with_capacity(keys.len());
        for key in keys {
            results.push(self.expire_key(key, ttl_seconds)?);
        }
        Ok(results)
    }

    pub fn clear(&self) -> Result<usize> {
        let mut total_deleted = 0;
        let query = crate::query::FilterQuery::new(FilterExpression::MatchAll).paging(0, 500);

        loop {
            let batch = self.search(&query)?;
            if batch.docs.is_empty() {
                break;
            }

            let keys = batch
                .docs
                .iter()
                .map(|doc| doc.id().to_owned())
                .collect::<Vec<_>>();
            total_deleted += self.drop_keys(&keys)?;
        }

        Ok(total_deleted)
    }

    pub fn search<Q>(&self, query: &Q) -> Result<SearchResult>
    where
        Q: QueryString + ?Sized,
    {
        parse_search_result(self.search_raw(query)?)
    }

    pub fn query<Q>(&self, query: &Q) -> Result<QueryOutput>
    where
        Q: QueryString + ?Sized,
    {
        let results = self.search(query)?;
        process_search_result(results, query, self.storage_type())
    }

    pub fn batch_search<'a, I, Q>(&self, queries: I) -> Result<Vec<SearchResult>>
    where
        I: IntoIterator<Item = &'a Q>,
        Q: QueryString + ?Sized + 'a,
    {
        self.batch_search_with_size(queries, usize::MAX)
    }

    pub fn batch_search_with_size<'a, I, Q>(
        &self,
        queries: I,
        batch_size: usize,
    ) -> Result<Vec<SearchResult>>
    where
        I: IntoIterator<Item = &'a Q>,
        Q: QueryString + ?Sized + 'a,
    {
        if batch_size == 0 {
            return Err(Error::InvalidInput(
                "batch_size must be greater than zero".to_owned(),
            ));
        }

        let queries = queries.into_iter().collect::<Vec<_>>();
        let mut results = Vec::with_capacity(queries.len());
        for chunk in queries.chunks(batch_size) {
            for query in chunk {
                results.push(self.search(*query)?);
            }
        }
        Ok(results)
    }

    pub fn batch_query<'a, I, Q>(&self, queries: I) -> Result<Vec<QueryOutput>>
    where
        I: IntoIterator<Item = &'a Q>,
        Q: QueryString + ?Sized + 'a,
    {
        self.batch_query_with_size(queries, usize::MAX)
    }

    pub fn batch_query_with_size<'a, I, Q>(
        &self,
        queries: I,
        batch_size: usize,
    ) -> Result<Vec<QueryOutput>>
    where
        I: IntoIterator<Item = &'a Q>,
        Q: QueryString + ?Sized + 'a,
    {
        if batch_size == 0 {
            return Err(Error::InvalidInput(
                "batch_size must be greater than zero".to_owned(),
            ));
        }

        let queries = queries.into_iter().collect::<Vec<_>>();
        let mut results = Vec::with_capacity(queries.len());
        for chunk in queries.chunks(batch_size) {
            for query in chunk {
                results.push(self.query(*query)?);
            }
        }
        Ok(results)
    }

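    /// Re-issues the query with a sliding `LIMIT` window and collects each
    /// page. Note that offset-based pagination can skip or duplicate
    /// documents if the index changes between pages. A sketch over an assumed
    /// match-all filter:
    ///
    /// ```ignore
    /// let query = FilterQuery::new(FilterExpression::MatchAll);
    /// for page in index.paginate(&query, 100)? {
    ///     println!("page of {} documents", page.len());
    /// }
    /// ```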
    pub fn paginate<Q>(&self, query: &Q, page_size: usize) -> Result<Vec<Vec<Map<String, Value>>>>
    where
        Q: PageableQuery + ?Sized,
    {
        if page_size == 0 {
            return Err(Error::InvalidInput(
                "page_size must be greater than zero".to_owned(),
            ));
        }

        let mut offset = 0;
        let mut batches = Vec::new();
        loop {
            let page = query.paged(offset, page_size);
            let documents = match self.query(&page)? {
                QueryOutput::Documents(documents) => documents,
                QueryOutput::Count(_) => {
                    return Err(Error::InvalidInput(
                        "paginate requires a document-returning query".to_owned(),
                    ));
                }
            };

            if documents.is_empty() {
                break;
            }

            let fetched = documents.len();
            batches.push(documents);
            if fetched < page_size {
                break;
            }
            offset += page_size;
        }

        Ok(batches)
    }

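    /// Runs an `FT.HYBRID` query (combined full-text and vector scoring) and
    /// returns the parsed result rows as documents. A consumption sketch;
    /// constructing the [`crate::query::HybridQuery`] itself is out of scope
    /// here:
    ///
    /// ```ignore
    /// let output = index.hybrid_query(&hybrid)?;
    /// let docs = output.as_documents().unwrap_or(&[]);
    /// ```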
    pub fn hybrid_query(&self, query: &crate::query::HybridQuery<'_>) -> Result<QueryOutput> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let cmd = query.build_cmd(self.name());
        let value: redis::Value = cmd.query(&mut connection)?;
        let documents = parse_hybrid_result(value)?;
        Ok(QueryOutput::Documents(documents))
    }

    pub fn aggregate_query(
        &self,
        query: &crate::query::AggregateHybridQuery<'_>,
    ) -> Result<QueryOutput> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let cmd = query.build_aggregate_cmd(self.name());
        let value: redis::Value = cmd.query(&mut connection)?;
        let documents = parse_aggregate_result(value)?;
        Ok(QueryOutput::Documents(documents))
    }

    #[cfg(feature = "sql")]
    pub fn sql_query(&self, query: &crate::query::SQLQuery) -> Result<QueryOutput> {
        if let Some(cmd) = query.build_geo_aggregate_cmd(self.name()) {
            let client = self.connection.client()?;
            let mut connection = client.get_connection()?;
            let value: redis::Value = cmd.query(&mut connection)?;
            let documents = parse_aggregate_result(value)?;
            return Ok(QueryOutput::Documents(documents));
        }
        if let Some(cmd) = query.build_aggregate_cmd(self.name()) {
            let client = self.connection.client()?;
            let mut connection = client.get_connection()?;
            let value: redis::Value = cmd.query(&mut connection)?;
            let documents = parse_aggregate_result(value)?;
            return Ok(QueryOutput::Documents(documents));
        }
        self.query(query)
    }

    pub fn multi_vector_query(
        &self,
        query: &crate::query::MultiVectorQuery<'_>,
    ) -> Result<QueryOutput> {
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let cmd = query.build_aggregate_cmd(self.name());
        let value: redis::Value = cmd.query(&mut connection)?;
        let documents = parse_aggregate_result(value)?;
        Ok(QueryOutput::Documents(documents))
    }

    pub fn from_existing(name: &str, redis_url: impl Into<String>) -> Result<Self> {
        let connection = RedisConnectionInfo::new(redis_url);
        let client = connection.client()?;
        let mut conn = client.get_connection()?;
        let value = redis::cmd("FT.INFO").arg(name).query(&mut conn)?;
        let info = parse_info_response(value)?;
        let schema = schema_from_info(name, &info)?;
        Ok(Self { schema, connection })
    }

    pub fn search_raw<Q>(&self, query: &Q) -> Result<redis::Value>
    where
        Q: QueryString + ?Sized,
    {
        self.validate_query(query)?;
        let client = self.connection.client()?;
        let mut connection = client.get_connection()?;
        let value = self.search_cmd(query).query(&mut connection)?;
        Ok(value)
    }

    fn search_cmd<Q>(&self, query: &Q) -> redis::Cmd
    where
        Q: QueryString + ?Sized,
    {
        let render = query.render();
        let mut cmd = redis::cmd("FT.SEARCH");
        cmd.arg(&self.schema.index.name).arg(render.query_string);

        if let Some(scorer) = render.scorer {
            cmd.arg("SCORER").arg(scorer);
        }

        if !render.params.is_empty() {
            cmd.arg("PARAMS").arg(render.params.len() * 2);
            for param in render.params {
                cmd.arg(param.name);
                match param.value {
                    QueryParamValue::String(value) => {
                        cmd.arg(value);
                    }
                    QueryParamValue::Binary(value) => {
                        cmd.arg(value);
                    }
                }
            }
        }

        if render.no_content {
            cmd.arg("NOCONTENT");
        }

        if !render.return_fields.is_empty() {
            cmd.arg("RETURN").arg(render.return_fields.len());
            for field in render.return_fields {
                cmd.arg(field);
            }
        }

        if let Some(sort_by) = render.sort_by {
            cmd.arg("SORTBY").arg(sort_by.field);
            cmd.arg(match sort_by.direction {
                SortDirection::Asc => "ASC",
                SortDirection::Desc => "DESC",
            });
        }

        if render.in_order {
            cmd.arg("INORDER");
        }

        if let Some(limit) = render.limit {
            cmd.arg("LIMIT").arg(limit.offset).arg(limit.num);
        }

        if let Some(geofilter) = render.geofilter {
            cmd.arg("GEOFILTER")
                .arg(geofilter.field)
                .arg(geofilter.lon)
                .arg(geofilter.lat)
                .arg(geofilter.radius)
                .arg(geofilter.unit);
        }

        cmd.arg("DIALECT").arg(render.dialect);
        cmd
    }

    fn validate_query<Q>(&self, query: &Q) -> Result<()>
    where
        Q: QueryString + ?Sized,
    {
        let render = query.render();
        if render.query_string.contains("EF_RUNTIME") {
            let supports_ef_runtime = self.schema.fields.iter().any(|field| {
                matches!(
                    &field.kind,
                    FieldKind::Vector { attrs }
                        if matches!(attrs.algorithm, VectorAlgorithm::Hnsw)
                )
            });
            if !supports_ef_runtime {
                return Err(Error::SchemaValidation(
                    "EF_RUNTIME requires an HNSW vector field".to_owned(),
                ));
            }
        }
        Ok(())
    }
}

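/// The async counterpart of [`SearchIndex`], built on the `redis` crate's
/// multiplexed async connection.
///
/// A sketch under Tokio (the runtime choice is an assumption; any executor
/// supported by the `redis` crate's async features works):
///
/// ```ignore
/// // Inside an async fn:
/// let index = AsyncSearchIndex::from_yaml_str(yaml, "redis://127.0.0.1:6379")?;
/// index.create().await?;
/// let hits = index.search(&query).await?;
/// ```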
#[derive(Debug, Clone)]
pub struct AsyncSearchIndex {
    schema: IndexSchema,
    connection: RedisConnectionInfo,
}

impl AsyncSearchIndex {
    pub fn new(schema: IndexSchema, redis_url: impl Into<String>) -> Self {
        Self {
            schema,
            connection: RedisConnectionInfo::new(redis_url),
        }
    }

    pub fn from_yaml_str(yaml: &str, redis_url: impl Into<String>) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_yaml_str(yaml)?, redis_url))
    }

    pub fn from_yaml_file(
        path: impl AsRef<std::path::Path>,
        redis_url: impl Into<String>,
    ) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_yaml_file(path)?, redis_url))
    }

    pub fn from_json_value(value: serde_json::Value, redis_url: impl Into<String>) -> Result<Self> {
        Ok(Self::new(IndexSchema::from_json_value(value)?, redis_url))
    }

    pub fn schema(&self) -> &IndexSchema {
        &self.schema
    }

    pub fn name(&self) -> &str {
        &self.schema.index.name
    }

    pub fn prefix(&self) -> &str {
        self.schema.index.prefix.first()
    }

    pub fn prefixes(&self) -> Vec<&str> {
        self.schema.index.prefix.all()
    }

    pub fn key_separator(&self) -> &str {
        &self.schema.index.key_separator
    }

    pub fn storage_type(&self) -> StorageType {
        self.schema.index.storage_type
    }

    pub fn key(&self, key_suffix: &str) -> String {
        compose_key(self.prefix(), self.key_separator(), key_suffix)
    }

    pub async fn create(&self) -> Result<()> {
        self.create_with_options(false, false).await
    }

    pub async fn create_with_options(&self, overwrite: bool, drop_documents: bool) -> Result<()> {
        if self.schema.fields.is_empty() {
            return Err(Error::SchemaValidation(
                "No fields defined for index".to_owned(),
            ));
        }

        if self.exists().await? {
            if !overwrite {
                return Ok(());
            }
            self.drop(drop_documents).await?;
        }

        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let (): () = SearchIndex::new(self.schema.clone(), self.connection.redis_url.clone())
            .create_cmd()
            .query_async(&mut connection)
            .await?;
        Ok(())
    }

    pub async fn drop(&self, delete_documents: bool) -> Result<()> {
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let mut cmd = redis::cmd("FT.DROPINDEX");
        cmd.arg(&self.schema.index.name);
        if delete_documents {
            cmd.arg("DD");
        }
        let (): () = cmd.query_async(&mut connection).await?;
        Ok(())
    }

    pub async fn delete(&self, drop_documents: bool) -> Result<()> {
        if !self.exists().await? {
            return Err(Error::InvalidInput(format!(
                "index '{}' does not exist",
                self.schema.index.name
            )));
        }
        self.drop(drop_documents).await
    }

    pub async fn exists(&self) -> Result<bool> {
        Ok(self
            .listall()
            .await?
            .iter()
            .any(|name| name == &self.schema.index.name))
    }

    pub async fn listall(&self) -> Result<Vec<String>> {
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let value = redis::cmd("FT._LIST").query_async(&mut connection).await?;
        Ok(value)
    }

    pub async fn info(&self) -> Result<Map<String, Value>> {
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let value = redis::cmd("FT.INFO")
            .arg(&self.schema.index.name)
            .query_async(&mut connection)
            .await?;
        parse_info_response(value)
    }

    pub async fn load(
        &self,
        data: &[Value],
        id_field: &str,
        ttl: Option<i64>,
    ) -> Result<Vec<String>> {
        self.load_with_preprocess(data, id_field, ttl, |record| Ok(record.clone()))
            .await
    }

    pub async fn load_with_preprocess<F>(
        &self,
        data: &[Value],
        id_field: &str,
        ttl: Option<i64>,
        mut preprocess: F,
    ) -> Result<Vec<String>>
    where
        F: FnMut(&Value) -> Result<Value>,
    {
        let prepared = prepare_load_records(data, &mut preprocess)?;
        validate_load_ids(&prepared, id_field)?;
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let mut written_keys = Vec::with_capacity(prepared.len());

        for record in &prepared {
            let object = record.as_object().ok_or_else(|| {
                Error::InvalidInput("load expects an array of JSON objects".to_owned())
            })?;
            let id = extract_id(object, id_field)?;
            let key = self.key(id);

            match self.storage_type() {
                StorageType::Json => {
                    let payload = serde_json::to_string(record)?;
                    let (): () = redis::cmd("JSON.SET")
                        .arg(&key)
                        .arg("$")
                        .arg(payload)
                        .query_async(&mut connection)
                        .await?;
                }
                StorageType::Hash => {
                    let encoded = encode_hash_record(object, &self.schema)?;
                    let mut cmd = redis::cmd("HSET");
                    cmd.arg(&key);
                    for (field, value) in encoded {
                        cmd.arg(field);
                        match value {
                            EncodedHashValue::String(value) => {
                                cmd.arg(value);
                            }
                            EncodedHashValue::Binary(value) => {
                                cmd.arg(value);
                            }
                        }
                    }
                    let _: i32 = cmd.query_async(&mut connection).await?;
                }
            }

            if let Some(ttl) = ttl {
                let _: bool = redis::cmd("EXPIRE")
                    .arg(&key)
                    .arg(ttl)
                    .query_async(&mut connection)
                    .await?;
            }

            written_keys.push(key);
        }

        Ok(written_keys)
    }

    pub async fn load_with_keys(
        &self,
        data: &[Value],
        keys: &[String],
        ttl: Option<i64>,
    ) -> Result<Vec<String>> {
        if data.len() != keys.len() {
            return Err(Error::InvalidInput(format!(
                "data length ({}) must equal keys length ({})",
                data.len(),
                keys.len()
            )));
        }

        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;

        for (record, key) in data.iter().zip(keys.iter()) {
            let object = record.as_object().ok_or_else(|| {
                Error::InvalidInput("load expects an array of JSON objects".to_owned())
            })?;

            match self.storage_type() {
                StorageType::Json => {
                    let payload = serde_json::to_string(record)?;
                    let (): () = redis::cmd("JSON.SET")
                        .arg(key)
                        .arg("$")
                        .arg(payload)
                        .query_async(&mut connection)
                        .await?;
                }
                StorageType::Hash => {
                    let encoded = encode_hash_record(object, &self.schema)?;
                    let mut cmd = redis::cmd("HSET");
                    cmd.arg(key);
                    for (field, value) in encoded {
                        cmd.arg(field);
                        match value {
                            EncodedHashValue::String(value) => {
                                cmd.arg(value);
                            }
                            EncodedHashValue::Binary(value) => {
                                cmd.arg(value);
                            }
                        }
                    }
                    let _: i32 = cmd.query_async(&mut connection).await?;
                }
            }

            if let Some(ttl) = ttl {
                let _: bool = redis::cmd("EXPIRE")
                    .arg(key)
                    .arg(ttl)
                    .query_async(&mut connection)
                    .await?;
            }
        }

        Ok(keys.to_vec())
    }

    pub async fn fetch(&self, id: &str) -> Result<Option<Value>> {
        match self.storage_type() {
            StorageType::Json => {
                let client = self.connection.client()?;
                let mut connection = client.get_multiplexed_async_connection().await?;
                let raw: std::result::Result<String, redis::RedisError> = redis::cmd("JSON.GET")
                    .arg(self.key(id))
                    .arg("$")
                    .query_async(&mut connection)
                    .await;
                match raw {
                    Ok(raw) => {
                        let parsed = serde_json::from_str::<Value>(&raw)?;
                        Ok(match parsed {
                            Value::Array(mut values) if values.len() == 1 => values.pop(),
                            other => Some(other),
                        })
                    }
                    Err(err) if err.kind() == redis::ErrorKind::UnexpectedReturnType => Ok(None),
                    Err(err) => Err(Error::Redis(err)),
                }
            }
            StorageType::Hash => {
                let client = self.connection.client()?;
                let mut connection = client.get_multiplexed_async_connection().await?;
                let map: HashMap<String, String> = redis::cmd("HGETALL")
                    .arg(self.key(id))
                    .query_async(&mut connection)
                    .await?;
                if map.is_empty() {
                    Ok(None)
                } else {
                    let mut object = Map::new();
                    for (key, value) in map {
                        object.insert(key, Value::String(value));
                    }
                    Ok(Some(Value::Object(object)))
                }
            }
        }
    }

    pub async fn drop_key(&self, key: &str) -> Result<usize> {
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let count: usize = redis::cmd("DEL")
            .arg(key)
            .query_async(&mut connection)
            .await?;
        Ok(count)
    }

    pub async fn drop_keys(&self, keys: &[String]) -> Result<usize> {
        if keys.is_empty() {
            return Ok(0);
        }
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let mut cmd = redis::cmd("DEL");
        for key in keys {
            cmd.arg(key);
        }
        let count: usize = cmd.query_async(&mut connection).await?;
        Ok(count)
    }

    pub async fn drop_document(&self, id: &str) -> Result<usize> {
        self.drop_key(&self.key(id)).await
    }

    pub async fn drop_documents(&self, ids: &[String]) -> Result<usize> {
        if ids.is_empty() {
            return Ok(0);
        }
        let keys = ids.iter().map(|id| self.key(id)).collect::<Vec<_>>();
        self.drop_keys(&keys).await
    }

    pub async fn expire_key(&self, key: &str, ttl_seconds: i64) -> Result<bool> {
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let applied: bool = redis::cmd("EXPIRE")
            .arg(key)
            .arg(ttl_seconds)
            .query_async(&mut connection)
            .await?;
        Ok(applied)
    }

    pub async fn expire_keys(&self, keys: &[String], ttl_seconds: i64) -> Result<Vec<bool>> {
        let mut results = Vec::with_capacity(keys.len());
        for key in keys {
            results.push(self.expire_key(key, ttl_seconds).await?);
        }
        Ok(results)
    }

    pub async fn clear(&self) -> Result<usize> {
        let mut total_deleted = 0;
        let query = crate::query::FilterQuery::new(FilterExpression::MatchAll).paging(0, 500);

        loop {
            let batch = self.search(&query).await?;
            if batch.docs.is_empty() {
                break;
            }

            let keys = batch
                .docs
                .iter()
                .map(|doc| doc.id().to_owned())
                .collect::<Vec<_>>();
            total_deleted += self.drop_keys(&keys).await?;
        }

        Ok(total_deleted)
    }

    pub async fn search<Q>(&self, query: &Q) -> Result<SearchResult>
    where
        Q: QueryString + Send + Sync + ?Sized,
    {
        parse_search_result(self.search_raw(query).await?)
    }

    pub async fn query<Q>(&self, query: &Q) -> Result<QueryOutput>
    where
        Q: QueryString + Send + Sync + ?Sized,
    {
        let results = self.search(query).await?;
        process_search_result(results, query, self.schema.index.storage_type)
    }

    pub async fn batch_search<'a, I, Q>(&self, queries: I) -> Result<Vec<SearchResult>>
    where
        I: IntoIterator<Item = &'a Q>,
        Q: QueryString + Send + Sync + ?Sized + 'a,
    {
        self.batch_search_with_size(queries, usize::MAX).await
    }

    pub async fn batch_search_with_size<'a, I, Q>(
        &self,
        queries: I,
        batch_size: usize,
    ) -> Result<Vec<SearchResult>>
    where
        I: IntoIterator<Item = &'a Q>,
        Q: QueryString + Send + Sync + ?Sized + 'a,
    {
        if batch_size == 0 {
            return Err(Error::InvalidInput(
                "batch_size must be greater than zero".to_owned(),
            ));
        }

        let queries = queries.into_iter().collect::<Vec<_>>();
        let mut results = Vec::with_capacity(queries.len());
        for chunk in queries.chunks(batch_size) {
            for query in chunk {
                results.push(parse_search_result(self.search_raw(*query).await?)?);
            }
        }
        Ok(results)
    }

    pub async fn batch_query<'a, I, Q>(&self, queries: I) -> Result<Vec<QueryOutput>>
    where
        I: IntoIterator<Item = &'a Q>,
        Q: QueryString + Send + Sync + ?Sized + 'a,
    {
        self.batch_query_with_size(queries, usize::MAX).await
    }

    pub async fn batch_query_with_size<'a, I, Q>(
        &self,
        queries: I,
        batch_size: usize,
    ) -> Result<Vec<QueryOutput>>
    where
        I: IntoIterator<Item = &'a Q>,
        Q: QueryString + Send + Sync + ?Sized + 'a,
    {
        if batch_size == 0 {
            return Err(Error::InvalidInput(
                "batch_size must be greater than zero".to_owned(),
            ));
        }

        let queries = queries.into_iter().collect::<Vec<_>>();
        let mut results = Vec::with_capacity(queries.len());
        for chunk in queries.chunks(batch_size) {
            for query in chunk {
                let parsed = parse_search_result(self.search_raw(*query).await?)?;
                results.push(process_search_result(
                    parsed,
                    *query,
                    self.schema.index.storage_type,
                )?);
            }
        }
        Ok(results)
    }

    pub async fn paginate<Q>(
        &self,
        query: &Q,
        page_size: usize,
    ) -> Result<Vec<Vec<Map<String, Value>>>>
    where
        Q: PageableQuery + Send + Sync + ?Sized,
    {
        if page_size == 0 {
            return Err(Error::InvalidInput(
                "page_size must be greater than zero".to_owned(),
            ));
        }

        let mut offset = 0;
        let mut batches = Vec::new();
        loop {
            let page = query.paged(offset, page_size);
            let documents = match self.query(&page).await? {
                QueryOutput::Documents(documents) => documents,
                QueryOutput::Count(_) => {
                    return Err(Error::InvalidInput(
                        "paginate requires a document-returning query".to_owned(),
                    ));
                }
            };

            if documents.is_empty() {
                break;
            }

            let fetched = documents.len();
            batches.push(documents);
            if fetched < page_size {
                break;
            }
            offset += page_size;
        }

        Ok(batches)
    }

    pub async fn search_raw<Q>(&self, query: &Q) -> Result<redis::Value>
    where
        Q: QueryString + Send + Sync + ?Sized,
    {
        let sync_index = SearchIndex::new(self.schema.clone(), self.connection.redis_url.clone());
        sync_index.validate_query(query)?;
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let value = sync_index
            .search_cmd(query)
            .query_async(&mut connection)
            .await?;
        Ok(value)
    }

    pub async fn hybrid_query(&self, query: &crate::query::HybridQuery<'_>) -> Result<QueryOutput> {
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let cmd = query.build_cmd(self.name());
        let value: redis::Value = cmd.query_async(&mut connection).await?;
        let documents = parse_hybrid_result(value)?;
        Ok(QueryOutput::Documents(documents))
    }

    pub async fn aggregate_query(
        &self,
        query: &crate::query::AggregateHybridQuery<'_>,
    ) -> Result<QueryOutput> {
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let cmd = query.build_aggregate_cmd(self.name());
        let value: redis::Value = cmd.query_async(&mut connection).await?;
        let documents = parse_aggregate_result(value)?;
        Ok(QueryOutput::Documents(documents))
    }

    #[cfg(feature = "sql")]
    pub async fn sql_query(&self, query: &crate::query::SQLQuery) -> Result<QueryOutput> {
        if let Some(cmd) = query.build_geo_aggregate_cmd(self.name()) {
            let client = self.connection.client()?;
            let mut connection = client.get_multiplexed_async_connection().await?;
            let value: redis::Value = cmd.query_async(&mut connection).await?;
            let documents = parse_aggregate_result(value)?;
            return Ok(QueryOutput::Documents(documents));
        }
        if let Some(cmd) = query.build_aggregate_cmd(self.name()) {
            let client = self.connection.client()?;
            let mut connection = client.get_multiplexed_async_connection().await?;
            let value: redis::Value = cmd.query_async(&mut connection).await?;
            let documents = parse_aggregate_result(value)?;
            return Ok(QueryOutput::Documents(documents));
        }
        self.query(query).await
    }

    pub async fn multi_vector_query(
        &self,
        query: &crate::query::MultiVectorQuery<'_>,
    ) -> Result<QueryOutput> {
        let client = self.connection.client()?;
        let mut connection = client.get_multiplexed_async_connection().await?;
        let cmd = query.build_aggregate_cmd(self.name());
        let value: redis::Value = cmd.query_async(&mut connection).await?;
        let documents = parse_aggregate_result(value)?;
        Ok(QueryOutput::Documents(documents))
    }

    pub async fn from_existing(name: &str, redis_url: impl Into<String>) -> Result<Self> {
        let connection = RedisConnectionInfo::new(redis_url);
        let client = connection.client()?;
        let mut conn = client.get_multiplexed_async_connection().await?;
        let value: redis::Value = redis::cmd("FT.INFO")
            .arg(name)
            .query_async(&mut conn)
            .await?;
        let info = parse_info_response(value)?;
        let schema = schema_from_info(name, &info)?;
        Ok(Self { schema, connection })
    }
}

#[allow(dead_code)]
fn _storage_type_for_load(schema: &IndexSchema) -> StorageType {
    schema.index.storage_type
}

fn extract_id<'a>(object: &'a Map<String, Value>, id_field: &str) -> Result<&'a str> {
    object
        .get(id_field)
        .and_then(Value::as_str)
        .ok_or_else(|| Error::InvalidInput(format!("missing string id field '{id_field}'")))
}

fn validate_load_ids(records: &[Value], id_field: &str) -> Result<()> {
    for record in records {
        let object = record
            .as_object()
            .ok_or_else(|| Error::InvalidInput("load expects JSON object records".to_owned()))?;
        extract_id(object, id_field)?;
    }
    Ok(())
}

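/// Joins an index prefix and a key suffix without doubling the separator.
/// The intended behavior, as assertions:
///
/// ```ignore
/// assert_eq!(compose_key("book", ":", "1"), "book:1");
/// assert_eq!(compose_key("book:", ":", "1"), "book:1"); // trailing separator trimmed
/// assert_eq!(compose_key("", ":", "1"), "1");           // no prefix
/// assert_eq!(compose_key("book", "", "1"), "book1");    // no separator
/// ```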
fn compose_key(prefix: &str, key_separator: &str, key_suffix: &str) -> String {
    if prefix.is_empty() {
        return key_suffix.to_owned();
    }

    if key_separator.is_empty() {
        return format!("{prefix}{key_suffix}");
    }

    let normalized_prefix = prefix.trim_end_matches(key_separator);
    if normalized_prefix.is_empty() {
        key_suffix.to_owned()
    } else {
        format!("{normalized_prefix}{key_separator}{key_suffix}")
    }
}

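/// Redis hash fields are flat strings, so each JSON value needs an encoding:
/// scalars become their string form, nested arrays and objects are serialized
/// as JSON text, and an array backing a schema vector field is packed into
/// the little-endian binary blob RediSearch expects. Sketch of the mapping:
///
/// ```ignore
/// // Given a schema with a 2-dim float32 vector field "emb", the record
/// // { "title": "Dune", "emb": [0.1, 0.2] } encodes to the pairs
/// // ("title", String("Dune")) and ("emb", Binary(8 little-endian bytes)).
/// ```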
enum EncodedHashValue {
    String(String),
    Binary(Vec<u8>),
}

fn encode_hash_record(
    object: &Map<String, Value>,
    schema: &IndexSchema,
) -> Result<Vec<(String, EncodedHashValue)>> {
    let mut pairs = Vec::with_capacity(object.len());
    for (key, value) in object {
        let encoded_value = match value {
            Value::Array(values)
                if matches!(
                    schema.field(key),
                    Some(crate::schema::Field {
                        kind: FieldKind::Vector { .. },
                        ..
                    })
                ) =>
            {
                EncodedHashValue::Binary(encode_vector_hash_field(key, values, schema)?)
            }
            Value::Null => EncodedHashValue::String("null".to_owned()),
            Value::Bool(value) => EncodedHashValue::String(value.to_string()),
            Value::Number(value) => EncodedHashValue::String(value.to_string()),
            Value::String(value) => EncodedHashValue::String(value.clone()),
            Value::Array(_) | Value::Object(_) => {
                EncodedHashValue::String(serde_json::to_string(value)?)
            }
        };
        pairs.push((key.clone(), encoded_value));
    }
    Ok(pairs)
}

fn encode_vector_hash_field(
    field_name: &str,
    values: &[Value],
    schema: &IndexSchema,
) -> Result<Vec<u8>> {
    let Some(crate::schema::Field {
        kind: FieldKind::Vector { attrs },
        ..
    }) = schema.field(field_name)
    else {
        return Err(Error::SchemaValidation(format!(
            "vector field '{field_name}' not found in schema"
        )));
    };

    if values.len() != attrs.dims {
        return Err(Error::InvalidInput(format!(
            "vector field '{field_name}' expected {} elements, received {}",
            attrs.dims,
            values.len()
        )));
    }

    match attrs.datatype {
        crate::schema::VectorDataType::Bfloat16 => {
            let mut buffer = Vec::with_capacity(values.len() * 2);
            for value in values {
                let number = json_number_to_f64(value, field_name)? as f32;
                let bits = number.to_bits();
                // bfloat16 keeps the top 16 bits of the f32 representation.
                let bf16 = (bits >> 16) as u16;
                buffer.extend_from_slice(&bf16.to_le_bytes());
            }
            Ok(buffer)
        }
        crate::schema::VectorDataType::Float16 => {
            let mut buffer = Vec::with_capacity(values.len() * 2);
            for value in values {
                let number = json_number_to_f64(value, field_name)? as f32;
                buffer.extend_from_slice(&f32_to_f16_bytes(number).to_le_bytes());
            }
            Ok(buffer)
        }
        crate::schema::VectorDataType::Float32 => {
            let mut buffer = Vec::with_capacity(values.len() * std::mem::size_of::<f32>());
            for value in values {
                let number = json_number_to_f64(value, field_name)? as f32;
                buffer.extend_from_slice(&number.to_le_bytes());
            }
            Ok(buffer)
        }
        crate::schema::VectorDataType::Float64 => {
            let mut buffer = Vec::with_capacity(values.len() * std::mem::size_of::<f64>());
            for value in values {
                let number = json_number_to_f64(value, field_name)?;
                buffer.extend_from_slice(&number.to_le_bytes());
            }
            Ok(buffer)
        }
    }
}

fn json_number_to_f64(value: &Value, field_name: &str) -> Result<f64> {
    value.as_f64().ok_or_else(|| {
        Error::InvalidInput(format!(
            "vector field '{field_name}' must be encoded from numeric JSON values"
        ))
    })
}

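/// Converts an `f32` to IEEE 754 half-precision bits (truncating the
/// mantissa, i.e. rounding toward zero). Worked example: `1.0_f32` has bits
/// `0x3F80_0000` (sign 0, exponent 127, mantissa 0); re-biasing the exponent
/// to 15 yields `0x3C00`, the half-precision encoding of 1.0. Values above
/// the half range saturate to infinity (`0x7C00`) and values below the
/// subnormal range flush to signed zero.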
fn f32_to_f16_bytes(value: f32) -> u16 {
    let bits = value.to_bits();
    let sign = (bits >> 16) & 0x8000;
    let exponent = ((bits >> 23) & 0xFF) as i32;
    let mantissa = bits & 0x007F_FFFF;

    if exponent == 255 {
        // Infinity or NaN; keep a quiet-NaN payload bit when the mantissa is set.
        let m = if mantissa != 0 { 0x0200 } else { 0 };
        return (sign | 0x7C00 | m) as u16;
    }

    let unbiased = exponent - 127;
    if unbiased > 15 {
        // Overflows half precision: saturate to infinity.
        return (sign | 0x7C00) as u16;
    }
    if unbiased < -24 {
        // Too small even for a half-precision subnormal: flush to signed zero.
        return sign as u16;
    }
    if unbiased < -14 {
        // Subnormal range: shift the implicit leading bit into the mantissa.
        let shift = -14 - unbiased;
        let m = (mantissa | 0x0080_0000) >> (shift + 13);
        return (sign | m) as u16;
    }

    let exp16 = ((unbiased + 15) as u32) << 10;
    let m = mantissa >> 13;
    (sign | exp16 | m) as u16
}

fn prepare_load_records<F>(data: &[Value], preprocess: &mut F) -> Result<Vec<Value>>
where
    F: FnMut(&Value) -> Result<Value>,
{
    let mut prepared = Vec::with_capacity(data.len());
    for record in data {
        let processed = preprocess(record)?;
        if !processed.is_object() {
            return Err(Error::InvalidInput(
                "preprocess must return a JSON object".to_owned(),
            ));
        }
        prepared.push(processed);
    }
    Ok(prepared)
}

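/// Parses a raw `FT.SEARCH` reply. In RESP2 this is a flat array of
/// `[total, key1, fields1, key2, fields2, ...]`, where each `fieldsN` is an
/// alternating field/value array (absent under `NOCONTENT`). A shape sketch:
///
/// ```ignore
/// // [2, "book:1", ["title", "Dune"], "book:2", ["title", "Hyperion"]]
/// //   -> SearchResult { total: 2, docs: [ <book:1 doc>, <book:2 doc> ] }
/// ```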
fn parse_search_result(value: redis::Value) -> Result<SearchResult> {
    let entries = match value {
        redis::Value::Array(entries) => entries,
        redis::Value::Nil => return Ok(SearchResult::new(0, Vec::new())),
        other => {
            return Err(Error::InvalidInput(format!(
                "expected FT.SEARCH array response, received {other:?}"
            )));
        }
    };

    let mut entries = VecDeque::from(entries);
    let total = entries
        .pop_front()
        .map(redis_value_to_usize)
        .transpose()?
        .unwrap_or(0);

    let mut docs = Vec::new();
    while let Some(id_value) = entries.pop_front() {
        let id = redis_value_to_string(&id_value)?;
        let fields = match entries.front() {
            Some(next) if is_search_payload(next) => {
                let payload = entries.pop_front().expect("front element exists");
                decode_search_payload(payload)?
            }
            _ => Map::new(),
        };
        docs.push(SearchDocument::new(id, fields));
    }

    Ok(SearchResult::new(total, docs))
}

fn parse_hybrid_result(value: redis::Value) -> Result<Vec<Map<String, Value>>> {
    let entries = match value {
        redis::Value::Array(entries) => entries,
        redis::Value::Nil => return Ok(Vec::new()),
        other => {
            return Err(Error::InvalidInput(format!(
                "expected FT.HYBRID array response, received {other:?}"
            )));
        }
    };

    // The reply is a flat key/value list; only the "results" entry matters here.
    let mut results_value: Option<redis::Value> = None;
    let mut iter = entries.into_iter();
    while let Some(key) = iter.next() {
        let key_str = redis_value_to_string(&key).unwrap_or_default();
        let val = iter.next();
        match key_str.as_str() {
            "results" => {
                results_value = val;
            }
            _ => {}
        }
    }

    let results_array = match results_value {
        Some(redis::Value::Array(arr)) => arr,
        Some(redis::Value::Nil) | None => return Ok(Vec::new()),
        Some(other) => {
            return Err(Error::InvalidInput(format!(
                "expected results array in FT.HYBRID response, received {other:?}"
            )));
        }
    };

    let mut documents = Vec::with_capacity(results_array.len());
    for entry in results_array {
        match entry {
            redis::Value::Array(pairs) => {
                let mut map = Map::new();
                let mut pair_iter = pairs.into_iter();
                while let Some(field_val) = pair_iter.next() {
                    let field = redis_value_to_string(&field_val)?;
                    if let Some(value_val) = pair_iter.next() {
                        let json_val = redis_value_to_json(value_val)?;
                        // Skip internal fields such as "__score".
                        if !field.starts_with("__") {
                            map.insert(field, json_val);
                        }
                    }
                }
                documents.push(map);
            }
            _ => {}
        }
    }

    Ok(documents)
}

fn parse_info_response(value: redis::Value) -> Result<Map<String, Value>> {
    let entries = match value {
        redis::Value::Map(entries) => entries,
        redis::Value::Array(entries) => {
            let mut pairs = VecDeque::from(entries);
            let mut mapped = Vec::new();
            while let Some(key) = pairs.pop_front() {
                let Some(value) = pairs.pop_front() else {
                    return Err(Error::InvalidInput(
                        "FT.INFO response contained an odd number of elements".to_owned(),
                    ));
                };
                mapped.push((key, value));
            }
            mapped
        }
        other => {
            return Err(Error::InvalidInput(format!(
                "expected FT.INFO map response, received {other:?}"
            )));
        }
    };

    let mut info = Map::with_capacity(entries.len());
    for (key, value) in entries {
        info.insert(redis_value_to_string(&key)?, redis_value_to_json(value)?);
    }
    Ok(info)
}

fn parse_aggregate_result(value: redis::Value) -> Result<Vec<Map<String, Value>>> {
    let entries = match value {
        redis::Value::Array(entries) => entries,
        redis::Value::Nil => return Ok(Vec::new()),
        other => {
            return Err(Error::InvalidInput(format!(
                "expected FT.AGGREGATE array response, received {other:?}"
            )));
        }
    };

    let mut it = entries.into_iter();
    // The first element is the total count; the result rows follow.
    let _total = it.next();

    let mut documents = Vec::new();
    for row in it {
        let row_entries = match row {
            redis::Value::Array(entries) => entries,
            redis::Value::Map(entries) => entries
                .into_iter()
                .flat_map(|(k, v)| [k, v])
                .collect::<Vec<_>>(),
            _ => continue,
        };

        let mut pairs = VecDeque::from(row_entries);
        let mut map = Map::new();
        while let Some(key) = pairs.pop_front() {
            let Some(val) = pairs.pop_front() else { break };
            let field = redis_value_to_string(&key)?;
            if field == "__score" {
                // Internal scoring field; not part of the document.
                continue;
            }
            map.insert(field, redis_value_to_json(val)?);
        }
        documents.push(map);
    }

    Ok(documents)
}

1992fn schema_from_info(name: &str, info: &Map<String, Value>) -> Result<IndexSchema> {
1996 let index_def = info.get("index_definition").and_then(Value::as_array);
1998
1999 let mut storage_type = StorageType::Hash;
2000 let mut prefix = crate::schema::Prefix::default();
2001
2002 if let Some(def_arr) = index_def {
2003 let mut i = 0;
2005 while i + 1 < def_arr.len() {
2006 let key = def_arr[i].as_str().unwrap_or("");
2007 match key {
2008 "key_type" => {
2009 if let Some(v) = def_arr[i + 1].as_str() {
2010 storage_type = match v.to_uppercase().as_str() {
2011 "JSON" => StorageType::Json,
2012 _ => StorageType::Hash,
2013 };
2014 }
2015 }
2016 "prefixes" => {
2017 if let Some(arr) = def_arr[i + 1].as_array() {
2018 let prefixes: Vec<String> = arr
2019 .iter()
2020 .filter_map(Value::as_str)
2021 .map(String::from)
2022 .collect();
2023 prefix = if prefixes.len() == 1 {
2024 crate::schema::Prefix::Single(prefixes.into_iter().next().unwrap())
2025 } else {
2026 crate::schema::Prefix::Multi(prefixes)
2027 };
2028 }
2029 }
2030 _ => {}
2031 }
2032 i += 2;
2033 }
2034 }
2035
2036 let attributes = info.get("attributes").and_then(Value::as_array);
2038 let mut fields = Vec::new();
2039
2040 if let Some(attrs) = attributes {
2041 for attr_val in attrs {
2042 let attr_arr = match attr_val.as_array() {
2043 Some(arr) => arr,
2044 None => continue,
2045 };
2046
2047 if attr_arr.is_empty() {
2048 continue;
2049 }
2050
2051 let mut field_name = String::new();
2053 let mut field_type = String::new();
2054 let mut sortable = false;
2055 let mut no_index = false;
2056 let mut case_sensitive = false;
2057 let mut separator: Option<String> = None;
2058 let mut weight: Option<f32> = None;
2059 let mut no_stem = false;
2060 let mut with_suffix_trie = false;
2061 let mut phonetic: Option<String> = None;
2062 let mut algorithm = String::new();
2064 let mut dims: usize = 0;
2065 let mut distance_metric = String::new();
2066 let mut datatype = String::new();
2067
2068 let mut i = 0;
2069 while i < attr_arr.len() {
2070 let key = attr_arr[i].as_str().unwrap_or("");
2071 match key {
2072 "identifier" | "attribute" => {
2073 if i + 1 < attr_arr.len() {
2074 if let Some(v) = attr_arr[i + 1].as_str() {
2075 if key == "attribute" || field_name.is_empty() {
2076 field_name = v.to_owned();
2077 }
2078 }
2079 }
2080 i += 2;
2081 }
2082 "type" => {
2083 if i + 1 < attr_arr.len() {
2084 if let Some(v) = attr_arr[i + 1].as_str() {
2085 field_type = v.to_uppercase();
2086 }
2087 }
2088 i += 2;
2089 }
2090 "SORTABLE" => {
2091 sortable = true;
2092 i += 1;
2093 }
2094 "NOINDEX" => {
2095 no_index = true;
2096 i += 1;
2097 }
2098 "CASESENSITIVE" => {
2099 case_sensitive = true;
2100 i += 1;
2101 }
2102 "NOSTEM" => {
2103 no_stem = true;
2104 i += 1;
2105 }
2106 "WITHSUFFIXTRIE" => {
2107 with_suffix_trie = true;
2108 i += 1;
2109 }
2110 "SEPARATOR" => {
2111 if i + 1 < attr_arr.len() {
2112 separator = attr_arr[i + 1].as_str().map(String::from);
2113 }
2114 i += 2;
2115 }
2116 "WEIGHT" => {
2117 if i + 1 < attr_arr.len() {
2118 weight = attr_arr[i + 1]
2119 .as_str()
2120 .and_then(|s| s.parse::<f32>().ok())
2121 .or_else(|| attr_arr[i + 1].as_f64().map(|v| v as f32));
2122 }
2123 i += 2;
2124 }
2125 "PHONETIC" => {
2126 if i + 1 < attr_arr.len() {
2127 phonetic = attr_arr[i + 1].as_str().map(String::from);
2128 }
2129 i += 2;
2130 }
2131 _ if field_type == "VECTOR" => {
2132 let upper = key.to_uppercase();
2136 if upper == "FLAT" || upper == "HNSW" {
2137 algorithm = upper.to_lowercase();
2138 i += 1;
2139 if i < attr_arr.len() {
2141 if attr_arr[i]
2142 .as_str()
2143 .and_then(|s| s.parse::<usize>().ok())
2144 .is_some()
2145 || attr_arr[i].as_i64().is_some()
2146 {
2147 i += 1; }
2149 }
2150 } else if upper == "ALGORITHM" {
2151 if i + 1 < attr_arr.len() {
2152 algorithm =
2153 attr_arr[i + 1].as_str().unwrap_or("flat").to_lowercase();
2154 }
2155 i += 2;
2156 } else if upper == "DIM" || upper == "DIMS" {
2157 if i + 1 < attr_arr.len() {
2158 dims = attr_arr[i + 1]
2159 .as_str()
2160 .and_then(|s| s.parse().ok())
2161 .or_else(|| attr_arr[i + 1].as_u64().map(|v| v as usize))
2162 .unwrap_or(0);
2163 }
2164 i += 2;
2165 } else if upper == "DISTANCE_METRIC" {
2166 if i + 1 < attr_arr.len() {
2167 distance_metric =
2168 attr_arr[i + 1].as_str().unwrap_or("cosine").to_lowercase();
2169 }
2170 i += 2;
2171 } else if upper == "TYPE" || upper == "DATA_TYPE" || upper == "DATATYPE" {
2172 if i + 1 < attr_arr.len() {
2173 datatype =
2174 attr_arr[i + 1].as_str().unwrap_or("float32").to_lowercase();
2175 }
2176 i += 2;
2177 } else {
2178 i += 2;
2180 }
2181 }
2182 _ => {
2183 i += 1;
2184 }
2185 }
2186 }
2187
2188 let field_name = field_name
2190 .strip_prefix("$.")
2191 .unwrap_or(&field_name)
2192 .to_owned();
2193
2194 let separator = separator.filter(|s| s != ",");
2201 let weight = weight.filter(|w| (*w - 1.0).abs() > f32::EPSILON);
2202
            let kind = match field_type.as_str() {
                "TAG" => FieldKind::Tag {
                    attrs: crate::schema::TagFieldAttributes {
                        separator,
                        case_sensitive,
                        sortable,
                        no_index,
                        index_missing: false,
                        index_empty: false,
                    },
                },
                "TEXT" => FieldKind::Text {
                    attrs: crate::schema::TextFieldAttributes {
                        weight,
                        sortable,
                        no_stem,
                        no_index,
                        phonetic,
                        with_suffix_trie,
                        index_missing: false,
                        index_empty: false,
                    },
                },
                "NUMERIC" => FieldKind::Numeric {
                    attrs: crate::schema::NumericFieldAttributes {
                        sortable,
                        no_index,
                        index_missing: false,
                        index_empty: false,
                    },
                },
                "GEO" => FieldKind::Geo {
                    attrs: crate::schema::GeoFieldAttributes {
                        sortable,
                        no_index,
                        index_missing: false,
                        index_empty: false,
                    },
                },
                "VECTOR" => {
                    let algo = match algorithm.to_lowercase().as_str() {
                        "hnsw" => crate::schema::VectorAlgorithm::Hnsw,
                        "svs-vamana" | "svs_vamana" => crate::schema::VectorAlgorithm::SvsVamana,
                        _ => crate::schema::VectorAlgorithm::Flat,
                    };
                    let dm = match distance_metric.as_str() {
                        "l2" => crate::schema::VectorDistanceMetric::L2,
                        "ip" => crate::schema::VectorDistanceMetric::Ip,
                        _ => crate::schema::VectorDistanceMetric::Cosine,
                    };
                    let dt = match datatype.to_lowercase().as_str() {
                        "float64" => crate::schema::VectorDataType::Float64,
                        "float16" => crate::schema::VectorDataType::Float16,
                        "bfloat16" => crate::schema::VectorDataType::Bfloat16,
                        _ => crate::schema::VectorDataType::Float32,
                    };
                    FieldKind::Vector {
                        attrs: crate::schema::VectorFieldAttributes {
                            algorithm: algo,
                            dims,
                            distance_metric: dm,
                            datatype: dt,
                            initial_cap: None,
                            block_size: None,
                            m: None,
                            ef_construction: None,
                            ef_runtime: None,
                            epsilon: None,
                            graph_max_degree: None,
                            construction_window_size: None,
                            search_window_size: None,
                            compression: None,
                            reduce: None,
                            training_threshold: None,
                        },
                    }
                }
                // Skip attribute types this crate does not model.
                _ => continue,
            };

            fields.push(crate::schema::Field {
                name: field_name,
                path: None,
                kind,
            });
        }
    }

    Ok(IndexSchema {
        index: IndexDefinition {
            name: name.to_owned(),
            prefix,
            key_separator: ":".to_owned(),
            storage_type,
            stopwords: Vec::new(),
        },
        fields,
    })
}

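/// Shapes a raw `SearchResult` to match the query that produced it: count
/// queries collapse to `QueryOutput::Count`, everything else becomes a list of
/// document maps. JSON-backed documents are unpacked from their serialized
/// payload unless the query asked for specific return fields.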
fn process_search_result<Q>(
    results: SearchResult,
    query: &Q,
    storage_type: StorageType,
) -> Result<QueryOutput>
where
    Q: QueryString + ?Sized,
{
    if query.kind() == QueryKind::Count {
        return Ok(QueryOutput::Count(results.total));
    }

    let unpack_json = matches!(storage_type, StorageType::Json)
        && query.should_unpack_json()
        && query.render().return_fields.is_empty();
    let mut documents = Vec::with_capacity(results.docs.len());

    for document in results.docs {
        let mut map = document.into_map();
        if unpack_json {
            map = unpack_json_document(map)?;
        }
        map.remove("payload");
        documents.push(map);
    }

    Ok(QueryOutput::Documents(documents))
}

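/// Replaces a document's serialized `json` payload (the `$` projection that
/// FT.SEARCH returns for JSON indices) with its top-level fields, re-inserting
/// `id` first. Documents without a `json` entry pass through untouched; a
/// non-object payload is rejected.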
fn unpack_json_document(mut document: Map<String, Value>) -> Result<Map<String, Value>> {
    let Some(json_value) = document.remove("json") else {
        return Ok(document);
    };

    let parsed = match json_value {
        Value::String(raw) => serde_json::from_str::<Value>(&raw)?,
        value => value,
    };

    let mut unpacked = Map::new();
    if let Some(id) = document.remove("id") {
        unpacked.insert("id".to_owned(), id);
    }

    match parsed {
        Value::Object(object) => {
            unpacked.extend(object);
            Ok(unpacked)
        }
        other => Err(Error::InvalidInput(format!(
            "expected JSON object payload while unpacking search result, received {other:?}"
        ))),
    }
}

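/// Returns `true` for Redis value shapes that can carry an FT.SEARCH document
/// payload: arrays, maps, and attribute-wrapped values.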
fn is_search_payload(value: &redis::Value) -> bool {
    matches!(
        value,
        redis::Value::Array(_) | redis::Value::Map(_) | redis::Value::Attribute { .. }
    )
}

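/// Decodes a single FT.SEARCH document payload into a field map, flattening
/// RESP3-style maps into key/value pairs and unwrapping attribute frames
/// before delegating to `decode_search_pairs`.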
fn decode_search_payload(value: redis::Value) -> Result<Map<String, Value>> {
    match value {
        redis::Value::Array(entries) => decode_search_pairs(entries),
        redis::Value::Map(entries) => {
            let flat = entries
                .into_iter()
                .flat_map(|(key, value)| [key, value])
                .collect::<Vec<_>>();
            decode_search_pairs(flat)
        }
        redis::Value::Attribute { data, .. } => decode_search_payload(*data),
        other => Err(Error::InvalidInput(format!(
            "expected FT.SEARCH document payload, received {other:?}"
        ))),
    }
}

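/// Folds a flat `[key, value, key, value, ...]` sequence into a JSON map,
/// renaming the `$` root-path key to `json` so `unpack_json_document` can find
/// it later. An odd number of elements is an error.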
fn decode_search_pairs(entries: Vec<redis::Value>) -> Result<Map<String, Value>> {
    let mut pairs = VecDeque::from(entries);
    let mut fields = Map::new();
    while let Some(key) = pairs.pop_front() {
        let Some(value) = pairs.pop_front() else {
            return Err(Error::InvalidInput(
                "FT.SEARCH document payload contained an odd number of elements".to_owned(),
            ));
        };
        let field = redis_value_to_string(&key)?;
        let normalized = if field == "$" { "json" } else { field.as_str() };
        fields.insert(normalized.to_owned(), redis_value_to_json(value)?);
    }
    Ok(fields)
}

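/// Parses an integer Redis reply (integer, bulk string, or simple string)
/// into a `usize`, rejecting anything negative or non-numeric.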
fn redis_value_to_usize(value: redis::Value) -> Result<usize> {
    let number = match value {
        redis::Value::Int(value) => value,
        redis::Value::BulkString(bytes) => String::from_utf8_lossy(&bytes)
            .parse::<i64>()
            .map_err(|_| {
                Error::InvalidInput("unable to parse integer Redis response".to_owned())
            })?,
        redis::Value::SimpleString(value) => value.parse::<i64>().map_err(|_| {
            Error::InvalidInput("unable to parse integer Redis response".to_owned())
        })?,
        other => {
            return Err(Error::InvalidInput(format!(
                "expected integer Redis response, received {other:?}"
            )));
        }
    };

    usize::try_from(number)
        .map_err(|_| Error::InvalidInput("redis returned a negative integer".to_owned()))
}

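/// Renders a string-like Redis reply as an owned `String`, stringifying
/// integer, double, and boolean replies along the way.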
fn redis_value_to_string(value: &redis::Value) -> Result<String> {
    match value {
        redis::Value::BulkString(bytes) => Ok(String::from_utf8_lossy(bytes).into_owned()),
        redis::Value::SimpleString(value) => Ok(value.clone()),
        redis::Value::VerbatimString { text, .. } => Ok(text.clone()),
        redis::Value::Int(value) => Ok(value.to_string()),
        redis::Value::Double(value) => Ok(value.to_string()),
        redis::Value::Boolean(value) => Ok(value.to_string()),
        other => Err(Error::InvalidInput(format!(
            "expected string-like Redis response, received {other:?}"
        ))),
    }
}

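/// Recursively converts a Redis reply into a `serde_json::Value`, unwrapping
/// attribute frames and falling back to the `Debug` representation for frames
/// with no JSON analogue (pushes, server errors, and future variants).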
fn redis_value_to_json(value: redis::Value) -> Result<Value> {
    match value {
        redis::Value::Nil => Ok(Value::Null),
        redis::Value::Int(value) => Ok(Value::from(value)),
        redis::Value::Double(value) => Ok(Value::from(value)),
        redis::Value::Boolean(value) => Ok(Value::from(value)),
        redis::Value::BulkString(bytes) => {
            Ok(Value::String(String::from_utf8_lossy(&bytes).into_owned()))
        }
        redis::Value::SimpleString(value) => Ok(Value::String(value)),
        redis::Value::Okay => Ok(Value::String("OK".to_owned())),
        redis::Value::VerbatimString { text, .. } => Ok(Value::String(text)),
        redis::Value::Array(values) | redis::Value::Set(values) => {
            let mut array = Vec::with_capacity(values.len());
            for value in values {
                array.push(redis_value_to_json(value)?);
            }
            Ok(Value::Array(array))
        }
        redis::Value::Map(entries) => {
            let mut object = Map::with_capacity(entries.len());
            for (key, value) in entries {
                object.insert(redis_value_to_string(&key)?, redis_value_to_json(value)?);
            }
            Ok(Value::Object(object))
        }
        redis::Value::Attribute { data, .. } => redis_value_to_json(*data),
        redis::Value::BigNumber(number) => Ok(Value::String(number.to_string())),
        redis::Value::Push { .. } | redis::Value::ServerError(_) => {
            Ok(Value::String(format!("{value:?}")))
        }
        _ => Ok(Value::String(format!("{value:?}"))),
    }
}

#[cfg(test)]
mod tests {
    use super::{
        EncodedHashValue, QueryOutput, SearchDocument, SearchIndex, SearchResult, compose_key,
        encode_hash_record, parse_aggregate_result, parse_info_response, parse_search_result,
        prepare_load_records, process_search_result, schema_from_info,
    };
    use crate::{
        filter::Tag,
        query::{CountQuery, FilterQuery},
        schema::{IndexSchema, StorageType},
    };
    use serde_json::{Map, Value, json};

    #[test]
    fn search_index_properties_should_match_python_integration_test_search_index() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "my_index" },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        assert_eq!(index.name(), "my_index");
        assert_eq!(index.prefix(), "rvl");
        assert_eq!(index.key_separator(), ":");
        assert!(matches!(index.storage_type(), StorageType::Hash));
        assert_eq!(index.key("foo"), "rvl:foo");
    }

    #[test]
    fn search_index_should_honor_empty_prefix_like_python_integration_test_search_index() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "my_index", "prefix": "" },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        assert_eq!(index.prefix(), "");
        assert_eq!(index.key("foo"), "foo");
    }

    #[test]
    fn search_index_key_should_normalize_trailing_separator_like_python_key_separator_tests() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": {
                    "name": "my_index",
                    "prefix": "user::",
                    "key_separator": ":"
                },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        assert_eq!(index.key("456"), "user:456");
        assert!(!index.key("456").contains("::"));
    }

    #[test]
    fn search_index_key_should_use_custom_separator_consistently_like_python_key_separator_tests() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": {
                    "name": "my_index",
                    "prefix": "app:user",
                    "key_separator": "-"
                },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        assert_eq!(index.key("999"), "app:user-999");
        assert_eq!(compose_key("routes:", ":", "ref1"), "routes:ref1");
        assert_eq!(compose_key("data", "::", "id"), "data::id");
        assert_eq!(compose_key("data::", "::", "id"), "data::id");
    }

    #[test]
    fn search_index_multi_prefix_should_expose_all_prefixes_like_python_multi_prefix_tests() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": {
                    "name": "multi_pfx",
                    "prefix": ["pfx_a", "pfx_b"]
                },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        assert_eq!(index.prefix(), "pfx_a");
        assert_eq!(index.prefixes(), vec!["pfx_a", "pfx_b"]);
        assert_eq!(index.key("doc1"), "pfx_a:doc1");
    }

    #[test]
    fn compose_key_should_handle_special_separators_like_python_key_separator_tests() {
        for sep in &["_", "::", "->", ".", "/"] {
            let result = compose_key("data", sep, "id");
            assert_eq!(result, format!("data{sep}id"));
        }
    }

    #[test]
    fn trailing_separator_normalization_like_python_key_separator_tests() {
        let cases = [
            ("user:", ":", "123", "user:123"),
            ("user::", ":", "456", "user:456"),
            ("user", ":", "789", "user:789"),
            ("user-", "-", "abc", "user-abc"),
        ];
        for (prefix, sep, id, expected) in &cases {
            let result = compose_key(prefix, sep, id);
            assert_eq!(result, *expected, "prefix={prefix:?} sep={sep:?} id={id:?}");
        }
    }

    #[test]
    fn empty_prefix_compose_key_like_python_key_separator_tests() {
        let result = compose_key("", ":", "789");
        assert_eq!(result, "789");
    }

    #[test]
    fn hash_load_validation_should_require_string_id_field_like_python_search_index_tests() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "my_index" },
                "fields": [
                    { "name": "test", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        let error = index
            .load(
                &[serde_json::json!({ "wrong_key": "1", "value": "test" })],
                "id",
                None,
            )
            .expect_err("missing id field should error before redis usage");

        assert!(error.to_string().contains("missing string id field"));
    }

    #[test]
    fn search_result_parser_should_decode_hash_results_like_python_search() {
        let parsed = parse_search_result(redis::Value::Array(vec![
            redis::Value::Int(2),
            redis::Value::BulkString(b"users:john".to_vec()),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"john".to_vec()),
                redis::Value::BulkString(b"age".to_vec()),
                redis::Value::BulkString(b"18".to_vec()),
            ]),
            redis::Value::BulkString(b"users:mary".to_vec()),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"mary".to_vec()),
                redis::Value::BulkString(b"vector_distance".to_vec()),
                redis::Value::BulkString(b"0".to_vec()),
            ]),
        ]))
        .expect("result should parse");

        assert_eq!(parsed.total, 2);
        assert_eq!(parsed.docs.len(), 2);
        assert_eq!(parsed.docs[0].id(), "users:john");
        assert_eq!(
            parsed.docs[0].get("user"),
            Some(&Value::String("john".to_owned()))
        );
        assert_eq!(
            parsed.docs[1].to_map().get("vector_distance"),
            Some(&Value::String("0".to_owned()))
        );
    }

    #[test]
    fn process_search_result_should_unpack_json_for_filter_queries_without_projection() {
        let mut fields = Map::new();
        fields.insert(
            "json".to_owned(),
            Value::String(r#"{"user":"john","age":18,"credit_score":"high"}"#.to_owned()),
        );
        let results = SearchResult::new(1, vec![SearchDocument::new("users:john", fields)]);
        let query = FilterQuery::new(Tag::new("credit_score").eq("high"));

        let processed = process_search_result(results, &query, StorageType::Json)
            .expect("query should process");

        assert_eq!(
            processed,
            QueryOutput::Documents(vec![Map::from_iter([
                ("id".to_owned(), Value::String("users:john".to_owned())),
                ("user".to_owned(), Value::String("john".to_owned())),
                ("age".to_owned(), json!(18)),
                ("credit_score".to_owned(), Value::String("high".to_owned())),
            ])])
        );
    }

    #[test]
    fn process_search_result_should_return_count_for_count_queries() {
        let results = SearchResult::new(7, Vec::new());
        let query = CountQuery::new();

        let processed = process_search_result(results, &query, StorageType::Hash)
            .expect("count should process");

        assert_eq!(processed, QueryOutput::Count(7));
    }

    #[test]
    fn paginate_should_reject_zero_page_size_before_redis_usage() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "my_index" },
                "fields": [
                    { "name": "brand", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");
        let query = FilterQuery::new(Tag::new("brand").eq("Nike"));

        let error = index
            .paginate(&query, 0)
            .expect_err("zero page size should fail before redis usage");

        assert!(
            error
                .to_string()
                .contains("page_size must be greater than zero")
        );
    }

    #[test]
    fn create_with_options_should_reject_empty_schema_before_redis_usage() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": { "name": "empty_index" }
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        let error = index
            .create_with_options(true, true)
            .expect_err("empty schema should fail before redis usage");

        assert!(error.to_string().contains("No fields defined for index"));
    }

    #[test]
    fn prepare_load_records_should_apply_preprocess_like_python_search_index_tests() {
        let prepared = prepare_load_records(&[json!({"id": "1", "test": "foo"})], &mut |record| {
            let mut record = record.clone();
            let object = record
                .as_object_mut()
                .expect("record remains an object during preprocessing");
            object.insert("test".to_owned(), Value::String("bar".to_owned()));
            Ok(record)
        })
        .expect("preprocess should succeed");

        assert_eq!(prepared[0]["test"], Value::String("bar".to_owned()));
    }

    #[test]
    fn prepare_load_records_should_reject_non_object_preprocess_results() {
        let error = prepare_load_records(&[json!({"id": "1", "test": "foo"})], &mut |_| {
            Ok(Value::String("invalid".to_owned()))
        })
        .expect_err("non-object preprocess output should fail");

        assert!(
            error
                .to_string()
                .contains("preprocess must return a JSON object")
        );
    }

    #[test]
    fn parse_info_response_should_decode_ft_info_shape() {
        let info = parse_info_response(redis::Value::Array(vec![
            redis::Value::BulkString(b"index_name".to_vec()),
            redis::Value::BulkString(b"my_index".to_vec()),
            redis::Value::BulkString(b"num_docs".to_vec()),
            redis::Value::Int(3),
            redis::Value::BulkString(b"hash_indexing_failures".to_vec()),
            redis::Value::Int(0),
        ]))
        .expect("info should parse");

        assert_eq!(info["index_name"], Value::String("my_index".to_owned()));
        assert_eq!(info["num_docs"], json!(3));
        assert_eq!(info["hash_indexing_failures"], json!(0));
    }

    #[test]
    fn search_document_should_expose_id_through_indexing_like_python_results_docs() {
        let document = SearchDocument::new(
            "rvl:1",
            Map::from_iter([("test".to_owned(), Value::String("foo".to_owned()))]),
        );

        assert_eq!(document.id(), "rvl:1");
        assert_eq!(document["id"], Value::String("rvl:1".to_owned()));
        assert_eq!(document["test"], Value::String("foo".to_owned()));
    }

    #[test]
    fn encode_hash_record_should_pack_vector_arrays_for_hash_storage() {
        let schema = IndexSchema::from_json_value(json!({
            "index": { "name": "my_index", "storage_type": "hash" },
            "fields": [
                { "name": "id", "type": "tag" },
                {
                    "name": "embedding",
                    "type": "vector",
                    "attrs": {
                        "dims": 3,
                        "distance_metric": "COSINE",
                        "algorithm": "FLAT",
                        "datatype": "FLOAT32"
                    }
                }
            ]
        }))
        .expect("schema should parse");

        let encoded = encode_hash_record(
            &json!({
                "id": "1",
                "embedding": [0.1, 0.2, 0.3]
            })
            .as_object()
            .expect("record should be an object")
            .clone(),
            &schema,
        )
        .expect("hash record should encode");

        let embedding = encoded
            .into_iter()
            .find(|(field, _)| field == "embedding")
            .map(|(_, value)| value)
            .expect("embedding should be encoded");

        match embedding {
            // 3 dims × 4 bytes per f32 = 12 bytes.
            EncodedHashValue::Binary(bytes) => assert_eq!(bytes.len(), 12),
            EncodedHashValue::String(_) => panic!("vector field should encode to binary bytes"),
        }
    }

    #[test]
    fn parse_aggregate_result_should_produce_document_maps() {
        let value = redis::Value::Array(vec![
            // Leading element is the result count.
            redis::Value::Int(2),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"alice".to_vec()),
                redis::Value::BulkString(b"hybrid_score".to_vec()),
                redis::Value::BulkString(b"0.85".to_vec()),
            ]),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"bob".to_vec()),
                redis::Value::BulkString(b"hybrid_score".to_vec()),
                redis::Value::BulkString(b"0.72".to_vec()),
            ]),
        ]);

        let docs = parse_aggregate_result(value).expect("should parse");
        assert_eq!(docs.len(), 2);
        assert_eq!(docs[0]["user"], "alice");
        assert_eq!(docs[0]["hybrid_score"], "0.85");
        assert_eq!(docs[1]["user"], "bob");
    }

    #[test]
    fn parse_aggregate_result_should_strip_internal_score() {
        let value = redis::Value::Array(vec![
            redis::Value::Int(1),
            redis::Value::Array(vec![
                redis::Value::BulkString(b"__score".to_vec()),
                redis::Value::BulkString(b"1.0".to_vec()),
                redis::Value::BulkString(b"user".to_vec()),
                redis::Value::BulkString(b"alice".to_vec()),
            ]),
        ]);

        let docs = parse_aggregate_result(value).expect("should parse");
        assert_eq!(docs.len(), 1);
        assert!(
            !docs[0].contains_key("__score"),
            "internal __score should be stripped"
        );
        assert_eq!(docs[0]["user"], "alice");
    }

    #[test]
    fn parse_aggregate_result_should_handle_nil() {
        let docs = parse_aggregate_result(redis::Value::Nil).expect("should parse");
        assert!(docs.is_empty());
    }

    #[test]
    fn schema_from_info_should_reconstruct_basic_schema() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["rvl"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([
                ["identifier", "$.name", "attribute", "name", "type", "TAG"],
                ["identifier", "$.age", "attribute", "age", "type", "NUMERIC"],
            ]),
        );

        let schema = schema_from_info("test_index", &info).expect("should parse");
        assert_eq!(schema.index.name, "test_index");
        assert_eq!(schema.fields.len(), 2);
        assert_eq!(schema.fields[0].name, "name");
        assert_eq!(schema.fields[1].name, "age");
    }

    #[test]
    fn schema_from_info_should_detect_json_storage() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "JSON", "prefixes", ["myprefix"]]),
        );
        info.insert("attributes".to_owned(), json!([]));

        let schema = schema_from_info("json_idx", &info).expect("should parse");
        assert!(matches!(schema.index.storage_type, StorageType::Json));
    }

    #[test]
    fn schema_from_info_should_parse_vector_fields() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["rvl"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "embedding",
                "attribute",
                "embedding",
                "type",
                "VECTOR",
                "HNSW",
                "6",
                "DIM",
                "768",
                "DISTANCE_METRIC",
                "COSINE",
                "TYPE",
                "FLOAT32"
            ]]),
        );

        let schema = schema_from_info("vec_idx", &info).expect("should parse");
        assert_eq!(schema.fields.len(), 1);
        let field = &schema.fields[0];
        assert_eq!(field.name, "embedding");
        match &field.kind {
            crate::schema::FieldKind::Vector { attrs } => {
                assert_eq!(attrs.dims, 768);
                assert!(matches!(
                    attrs.distance_metric,
                    crate::schema::VectorDistanceMetric::Cosine
                ));
                assert!(matches!(
                    attrs.algorithm,
                    crate::schema::VectorAlgorithm::Hnsw
                ));
            }
            _ => panic!("expected vector field kind"),
        }
    }

    #[test]
    fn multi_prefix_index_should_report_correct_prefix_count_in_create_cmd() {
        let index = SearchIndex::from_json_value(
            serde_json::json!({
                "index": {
                    "name": "multi_test",
                    "prefix": ["pfx_a", "pfx_b"]
                },
                "fields": [
                    { "name": "tag", "type": "tag" }
                ]
            }),
            "redis://127.0.0.1:6379",
        )
        .expect("index should parse");

        // Both prefixes should be exposed, with the first doubling as the
        // default key prefix.
        assert_eq!(index.prefixes(), vec!["pfx_a", "pfx_b"]);
        assert_eq!(index.prefix(), "pfx_a");
        // The create command should build without touching Redis.
        let _cmd = index.create_cmd();
    }

    #[test]
    fn schema_from_info_should_normalize_tag_separator_default() {
        let mut info = Map::new();
        // FT.INFO reports SEPARATOR "," for TAG fields even when the schema
        // never set one; reconstruction should fold that default back to None
        // so schemas compare equal.
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["test"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "brand",
                "attribute",
                "brand",
                "type",
                "TAG",
                "SEPARATOR",
                ","
            ]]),
        );

        let schema = schema_from_info("norm_test", &info).expect("should parse");
        match &schema.fields[0].kind {
            crate::schema::FieldKind::Tag { attrs } => {
                assert!(
                    attrs.separator.is_none(),
                    "default separator ',' should be normalized to None, got {:?}",
                    attrs.separator
                );
            }
            other => panic!("expected tag field, got {other:?}"),
        }
    }

    #[test]
    fn schema_from_info_should_preserve_non_default_tag_separator() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["test"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "brand",
                "attribute",
                "brand",
                "type",
                "TAG",
                "SEPARATOR",
                "|"
            ]]),
        );

        let schema = schema_from_info("norm_test", &info).expect("should parse");
        match &schema.fields[0].kind {
            crate::schema::FieldKind::Tag { attrs } => {
                assert_eq!(attrs.separator.as_deref(), Some("|"));
            }
            other => panic!("expected tag field, got {other:?}"),
        }
    }

    #[test]
    fn schema_from_info_should_normalize_text_weight_default() {
        let mut info = Map::new();
        // WEIGHT "1" is the TEXT default reported by FT.INFO; it should
        // normalize to None on reconstruction.
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["test"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "content",
                "attribute",
                "content",
                "type",
                "TEXT",
                "WEIGHT",
                "1"
            ]]),
        );

        let schema = schema_from_info("norm_test", &info).expect("should parse");
        match &schema.fields[0].kind {
            crate::schema::FieldKind::Text { attrs } => {
                assert!(
                    attrs.weight.is_none(),
                    "default weight 1.0 should be normalized to None, got {:?}",
                    attrs.weight
                );
            }
            other => panic!("expected text field, got {other:?}"),
        }
    }

    #[test]
    fn schema_from_info_should_preserve_non_default_text_weight() {
        let mut info = Map::new();
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["test"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([[
                "identifier",
                "content",
                "attribute",
                "content",
                "type",
                "TEXT",
                "WEIGHT",
                "2.5"
            ]]),
        );

        let schema = schema_from_info("norm_test", &info).expect("should parse");
        match &schema.fields[0].kind {
            crate::schema::FieldKind::Text { attrs } => {
                assert_eq!(attrs.weight, Some(2.5));
            }
            other => panic!("expected text field, got {other:?}"),
        }
    }

    #[test]
    fn schema_from_info_json_roundtrip_should_match_original_schema() {
        // Declare a reference schema, then feed schema_from_info the FT.INFO
        // shape RediSearch would report for it and check that reconstruction
        // round-trips.
        let original = IndexSchema::from_json_value(json!({
            "index": {
                "name": "my_router",
                "prefix": "my_router",
                "storage_type": "hash"
            },
            "fields": [
                { "name": "ref_id", "type": "tag" },
                { "name": "route", "type": "tag" },
                { "name": "reference", "type": "text" },
                {
                    "name": "vector",
                    "type": "vector",
                    "attrs": {
                        "algorithm": "flat",
                        "dims": 3,
                        "datatype": "float32",
                        "distance_metric": "cosine"
                    }
                }
            ]
        }))
        .expect("original schema should parse");

        let mut info = Map::new();
        // Simulated FT.INFO reply for the schema above.
        info.insert(
            "index_definition".to_owned(),
            json!(["key_type", "HASH", "prefixes", ["my_router"]]),
        );
        info.insert(
            "attributes".to_owned(),
            json!([
                [
                    "identifier",
                    "ref_id",
                    "attribute",
                    "ref_id",
                    "type",
                    "TAG",
                    "SEPARATOR",
                    ","
                ],
                [
                    "identifier",
                    "route",
                    "attribute",
                    "route",
                    "type",
                    "TAG",
                    "SEPARATOR",
                    ","
                ],
                [
                    "identifier",
                    "reference",
                    "attribute",
                    "reference",
                    "type",
                    "TEXT",
                    "WEIGHT",
                    "1"
                ],
                [
                    "identifier",
                    "vector",
                    "attribute",
                    "vector",
                    "type",
                    "VECTOR",
                    "FLAT",
                    "6",
                    "TYPE",
                    "FLOAT32",
                    "DIM",
                    "3",
                    "DISTANCE_METRIC",
                    "COSINE"
                ]
            ]),
        );
        let reconstructed =
            schema_from_info("my_router", &info).expect("reconstructed schema should parse");

        let original_json = original.to_json_value().expect("original to_json_value");
        let reconstructed_json = reconstructed
            .to_json_value()
            .expect("reconstructed to_json_value");
        assert_eq!(
            original_json, reconstructed_json,
            "original and reconstructed schemas should match after normalization\n\
             original: {original_json:#}\n\
             reconstructed: {reconstructed_json:#}"
        );
    }

    #[test]
    fn f32_to_f16_basic_values() {
        use super::f32_to_f16_bytes;

        // Positive and negative zero.
        assert_eq!(f32_to_f16_bytes(0.0), 0x0000);
        assert_eq!(f32_to_f16_bytes(-0.0), 0x8000);
        // One and negative one.
        assert_eq!(f32_to_f16_bytes(1.0), 0x3C00);
        assert_eq!(f32_to_f16_bytes(-1.0), 0xBC00);
        // Infinities.
        assert_eq!(f32_to_f16_bytes(f32::INFINITY), 0x7C00);
        assert_eq!(f32_to_f16_bytes(f32::NEG_INFINITY), 0xFC00);
        // NaN keeps an all-ones exponent and a non-zero mantissa.
        let nan_bits = f32_to_f16_bytes(f32::NAN);
        assert_eq!(nan_bits & 0x7C00, 0x7C00, "NaN exponent should be all ones");
        assert_ne!(nan_bits & 0x03FF, 0, "NaN should have non-zero mantissa");
    }
}