Skip to main content

luci/
index.rs

1//! `Index` — unified facade for creating, writing, and searching a Luci index.
2//!
3//! This is the primary entry point for using Luci. It wires together
4//! storage, schema, analyzers, writer, and searcher into a single API.
5//!
6//! ```ignore
7//! use luci::search::expression::SearchExpression;
8//!
9//! let mut index = Index::create("/tmp/my_index", schema)?;
10//! index.add(json!({"title": "hello world", "status": "active"}))?;
11//! let expr = SearchExpression::from_json(json!({"match": {"title": "hello"}}), 10)?;
12//! let results = index.search(&expr)?;
13//! ```
14
15use std::path::Path;
16
17use crate::analysis::AnalyzerRegistry;
18use crate::analysis::config::AnalysisConfig;
19use crate::core::{LuciError, Result};
20use crate::mapping::Mapping;
21
22use crate::query::ast::ScoringExpression;
23use crate::query::parser::opt_str;
24use crate::search::{MissingValue, SortField, SortFieldType, SortOrder, SortValue};
25use crate::storage::SingleFileDirectory;
26
27use crate::search::expression::SearchExpression;
28
29use crate::query::Query as _;
30use crate::reader::IndexReader;
31use crate::search::searcher::Searcher;
32use crate::writer::IndexWriter;
33
34/// Parsed contents of the `user_metadata` blob written by
35/// [`IndexWriter::commit`].
36struct ParsedUserMetadata {
37    mapping: Mapping,
38    deletions: crate::deletion::DeletionMap,
39}
40
41/// Decode the `user_metadata` blob.
42///
43/// The blob holds only mapping + deletions; per-field vector indexes
44/// live in their own file extents managed by the storage layer (see
45/// [[global-vector-indices]]). The format is either
46/// length-prefixed `[mapping_len][mapping_json][deletion_bytes]` or,
47/// for the oldest indexes, raw mapping JSON with no deletions.
48fn parse_user_metadata(meta: &[u8]) -> Result<ParsedUserMetadata> {
49    if meta.is_empty() {
50        return Ok(ParsedUserMetadata {
51            mapping: Mapping::builder().build(),
52            deletions: crate::deletion::DeletionMap::new(),
53        });
54    }
55    if meta.first() == Some(&b'{') {
56        let json: serde_json::Value = serde_json::from_slice(meta)
57            .map_err(|e| LuciError::IndexCorrupted(format!("invalid mapping metadata: {e}")))?;
58        return Ok(ParsedUserMetadata {
59            mapping: Mapping::from_json(&json)?,
60            deletions: crate::deletion::DeletionMap::new(),
61        });
62    }
63    if meta.len() < 4 {
64        return Ok(ParsedUserMetadata {
65            mapping: Mapping::builder().build(),
66            deletions: crate::deletion::DeletionMap::new(),
67        });
68    }
69    let mapping_len = u32::from_le_bytes(meta[0..4].try_into().unwrap()) as usize;
70    let mapping_bytes = &meta[4..4 + mapping_len];
71    let json: serde_json::Value = serde_json::from_slice(mapping_bytes)
72        .map_err(|e| LuciError::IndexCorrupted(format!("invalid mapping metadata: {e}")))?;
73    let mapping = Mapping::from_json(&json)?;
74    let after_mapping = 4 + mapping_len;
75    let deletions = if after_mapping >= meta.len() {
76        crate::deletion::DeletionMap::new()
77    } else if meta.len() >= after_mapping + 4 {
78        // New format: deletion_bytes is length-prefixed.
79        let del_len =
80            u32::from_le_bytes(meta[after_mapping..after_mapping + 4].try_into().unwrap()) as usize;
81        let start = after_mapping + 4;
82        if start + del_len <= meta.len() {
83            crate::deletion::DeletionMap::from_bytes(&meta[start..start + del_len])?
84        } else {
85            // Older format wrote deletion bytes raw, no length prefix.
86            crate::deletion::DeletionMap::from_bytes(&meta[after_mapping..])?
87        }
88    } else {
89        crate::deletion::DeletionMap::from_bytes(&meta[after_mapping..])?
90    };
91    Ok(ParsedUserMetadata { mapping, deletions })
92}
93
94/// Parse `sort` from request JSON into sort field specs.
95///
96/// Returns `Ok(None)` only when the `sort` key is absent. A sort value
97/// of an unsupported type (e.g. a number), an unknown order/missing
98/// string, or an unknown per-field option returns `Err(InvalidQuery)`
99/// rather than silently dropping the sort — see
100/// [[fix-strict-search-parsing]] and [[code-must-not-lie]].
101pub fn parse_sort(
102    value: Option<&serde_json::Value>,
103) -> crate::core::Result<Option<Vec<crate::search::SortField>>> {
104    let arr = match value {
105        None => return Ok(None),
106        Some(serde_json::Value::Array(a)) => a,
107        Some(serde_json::Value::String(s)) => {
108            return Ok(Some(vec![parse_sort_item(&serde_json::Value::String(
109                s.clone(),
110            ))?]));
111        }
112        Some(obj @ serde_json::Value::Object(_)) => {
113            return Ok(Some(vec![parse_sort_item(obj)?]));
114        }
115        Some(other) => {
116            return Err(LuciError::InvalidQuery(format!(
117                "sort: must be a string, object, or array of those; got {other}"
118            )));
119        }
120    };
121
122    let items: std::result::Result<Vec<_>, _> = arr.iter().map(parse_sort_item).collect();
123    items.map(Some)
124}
125
126fn parse_sort_item(value: &serde_json::Value) -> crate::core::Result<crate::search::SortField> {
127    match value {
128        serde_json::Value::String(s) => {
129            let (field, default_order) = match s.as_str() {
130                "_score" => (SortFieldType::Score, SortOrder::Desc),
131                "_doc" => (SortFieldType::Doc, SortOrder::Asc),
132                name => (SortFieldType::Field(name.to_string()), SortOrder::Asc),
133            };
134            Ok(SortField {
135                field,
136                order: default_order,
137                missing: MissingValue::Last,
138            })
139        }
140        serde_json::Value::Object(obj) => {
141            let (name, spec) = obj.iter().next().ok_or_else(|| {
142                crate::core::LuciError::InvalidQuery("sort: entry must have a field name".into())
143            })?;
144            let (field, default_order) = match name.as_str() {
145                "_score" => (SortFieldType::Score, SortOrder::Desc),
146                "_doc" => (SortFieldType::Doc, SortOrder::Asc),
147                name => (SortFieldType::Field(name.to_string()), SortOrder::Asc),
148            };
149            let (order, missing) = match spec {
150                serde_json::Value::String(o) => {
151                    let ord = match o.as_str() {
152                        "asc" => SortOrder::Asc,
153                        "desc" => SortOrder::Desc,
154                        other => {
155                            return Err(LuciError::InvalidQuery(format!(
156                                "sort[{name}]: unknown order '{other}', expected 'asc' or 'desc'"
157                            )));
158                        }
159                    };
160                    (ord, MissingValue::Last)
161                }
162                serde_json::Value::Object(_) => {
163                    let ctx = format!("sort[{name}]");
164                    let opts = crate::search::expression::validate_obj_keys(
165                        spec,
166                        &["order", "missing"],
167                        &ctx,
168                    )?;
169                    let ord = match opt_str(opts, "order", &ctx)? {
170                        Some("asc") => SortOrder::Asc,
171                        Some("desc") => SortOrder::Desc,
172                        Some(other) => {
173                            return Err(LuciError::InvalidQuery(format!(
174                                "{ctx}: unknown order '{other}', expected 'asc' or 'desc'"
175                            )));
176                        }
177                        None => default_order,
178                    };
179                    let miss = match opt_str(opts, "missing", &ctx)? {
180                        Some("_first") => MissingValue::First,
181                        Some("_last") => MissingValue::Last,
182                        Some(other) => {
183                            return Err(LuciError::InvalidQuery(format!(
184                                "{ctx}: unknown missing '{other}', expected '_first' or '_last'"
185                            )));
186                        }
187                        None => MissingValue::Last,
188                    };
189                    (ord, miss)
190                }
191                _ => {
192                    return Err(LuciError::InvalidQuery(format!(
193                        "sort[{name}]: spec must be \"asc\"/\"desc\" or an object, got {spec}"
194                    )));
195                }
196            };
197            Ok(SortField {
198                field,
199                order,
200                missing,
201            })
202        }
203        _ => Err(LuciError::InvalidQuery(format!(
204            "sort: entry must be a field-name string or a {{field: spec}} object, got {value}"
205        ))),
206    }
207}
208
209/// Extract inner_hit specs from the query expression tree.
210pub(crate) fn extract_inner_hit_specs(
211    ast: &ScoringExpression,
212    searcher: &crate::search::searcher::Searcher,
213) -> crate::core::Result<Vec<crate::query::nested::InnerHitSpec>> {
214    let mut specs = Vec::new();
215    collect_inner_hit_specs(ast, searcher, &mut specs)?;
216    Ok(specs)
217}
218
219fn collect_inner_hit_specs(
220    ast: &ScoringExpression,
221    searcher: &crate::search::searcher::Searcher,
222    specs: &mut Vec<crate::query::nested::InnerHitSpec>,
223) -> crate::core::Result<()> {
224    match ast {
225        ScoringExpression::Nested {
226            path,
227            query,
228            inner_hits: Some(config),
229        } => {
230            // Bind a separate weight for inner hit resolution. A bind error
231            // (e.g. a bad-field knn under the nested query) must surface, not
232            // be swallowed into a missing inner_hits block. See
233            // [[feature-knn-query-type]] §4b.
234            let weight = query.bind(searcher, crate::core::ScoreMode::Complete)?;
235            let name = config.name.clone().unwrap_or_else(|| path.clone());
236            specs.push(crate::query::nested::InnerHitSpec {
237                name,
238                path: path.clone(),
239                config: config.clone(),
240                weight,
241            });
242        }
243        ScoringExpression::Bool {
244            must,
245            should,
246            must_not,
247            filter,
248            ..
249        } => {
250            for sub in must.iter().chain(should).chain(must_not).chain(filter) {
251                collect_inner_hit_specs(sub, searcher, specs)?;
252            }
253        }
254        ScoringExpression::Nested { query, .. } => {
255            collect_inner_hit_specs(query, searcher, specs)?;
256        }
257        _ => {}
258    }
259    Ok(())
260}
261
262/// Parse `search_after` cursor values from request JSON.
263pub fn parse_search_after(
264    value: Option<&serde_json::Value>,
265) -> crate::core::Result<Option<Vec<crate::search::SortValue>>> {
266    let arr = match value {
267        None | Some(serde_json::Value::Null) => return Ok(None),
268        Some(serde_json::Value::Array(a)) => a,
269        Some(other) => {
270            return Err(LuciError::InvalidQuery(format!(
271                "search_after: must be an array of cursor values, got {other}"
272            )));
273        }
274    };
275    let values = arr
276        .iter()
277        .map(|v| match v {
278            serde_json::Value::Number(n) => {
279                if let Some(i) = n.as_i64() {
280                    Ok(SortValue::I64(i))
281                } else if let Some(f) = n.as_f64() {
282                    Ok(SortValue::F64(f))
283                } else {
284                    Err(LuciError::InvalidQuery(format!(
285                        "search_after: numeric cursor value out of range: {n}"
286                    )))
287                }
288            }
289            serde_json::Value::String(s) => Ok(SortValue::Str(s.clone())),
290            serde_json::Value::Bool(b) => Ok(SortValue::Bool(*b)),
291            serde_json::Value::Null => Ok(SortValue::Null),
292            other => Err(LuciError::InvalidQuery(format!(
293                "search_after: cursor values must be a number, string, boolean, or null; \
294                 got {other}"
295            ))),
296        })
297        .collect::<crate::core::Result<Vec<_>>>()?;
298    Ok(Some(values))
299}
300
301/// Parse `_source` from request JSON into a `SourceFilter`.
302pub fn parse_source_filter(value: Option<&serde_json::Value>) -> crate::search::SourceFilter {
303    use crate::search::SourceFilter;
304    match value {
305        None | Some(serde_json::Value::Bool(true)) => SourceFilter::Enabled,
306        Some(serde_json::Value::Bool(false)) => SourceFilter::Disabled,
307        Some(serde_json::Value::String(s)) => SourceFilter::Fields(vec![s.clone()]),
308        Some(serde_json::Value::Array(arr)) => {
309            let fields: Vec<String> = arr
310                .iter()
311                .filter_map(|v| v.as_str().map(String::from))
312                .collect();
313            SourceFilter::Fields(fields)
314        }
315        Some(serde_json::Value::Object(obj)) => {
316            let includes = obj
317                .get("includes")
318                .and_then(|v| v.as_array())
319                .map(|arr| {
320                    arr.iter()
321                        .filter_map(|v| v.as_str().map(String::from))
322                        .collect()
323                })
324                .unwrap_or_default();
325            let excludes = obj
326                .get("excludes")
327                .and_then(|v| v.as_array())
328                .map(|arr| {
329                    arr.iter()
330                        .filter_map(|v| v.as_str().map(String::from))
331                        .collect()
332                })
333                .unwrap_or_default();
334            SourceFilter::IncludeExclude { includes, excludes }
335        }
336        _ => SourceFilter::Enabled,
337    }
338}
339
340/// A Luci search index.
341///
342/// Provides a unified API for indexing and searching JSON documents.
343/// Owns the storage, schema, writer, and search state.
344pub struct Index {
345    schema: Mapping,
346    analysis_config: Option<AnalysisConfig>,
347    /// Writer state — behind Mutex for exclusive write access.
348    writer: std::sync::Mutex<WriterState>,
349    /// Reader state — behind RwLock for concurrent read access.
350    reader: std::sync::RwLock<ReaderState>,
351    /// Shared file handle for reader refresh. Avoids opening a new fd,
352    /// which would break `fcntl` lock semantics.
353    /// See [[architecture-cross-process-locking#Critical Constraint]].
354    file_handle: std::sync::Arc<std::fs::File>,
355    /// Transaction state: true when a transaction is active.
356    /// Protected by `txn_mutex` + `txn_condvar` for blocking.
357    /// See [[architecture-concurrency-transactions]].
358    txn_mutex: std::sync::Mutex<bool>,
359    txn_condvar: std::sync::Condvar,
360}
361
362struct WriterState {
363    writer: IndexWriter,
364    commit_generation: u64,
365}
366
367struct ReaderState {
368    /// Committed segment state. `Arc`-wrapped so `SearchResults` can
369    /// hold a reference that outlives the next commit.
370    segment_store: Option<std::sync::Arc<crate::search::segment_store::SegmentStore>>,
371    store_generation: u64,
372}
373
374impl Index {
375    /// Create a new index at the given path with fully dynamic mappings.
376    ///
377    /// All field types are inferred from the first document that contains each
378    /// field. Mappings are persisted to disk on commit.
379    pub fn create(path: impl AsRef<Path>) -> Result<Self> {
380        Self::create_with_mapping(path, Mapping::builder().build())
381    }
382
383    /// Create a new index with explicit field mappings.
384    ///
385    /// Unknown fields are handled according to the mapping's `dynamic` mode
386    /// (default: `true`, meaning auto-map and index).
387    pub fn create_with_mapping(path: impl AsRef<Path>, mapping: Mapping) -> Result<Self> {
388        Self::create_with_settings(path, mapping, None)
389    }
390
391    /// Create a new index with explicit mappings and analysis settings.
392    ///
393    /// The `analysis_config` defines custom analyzers, tokenizers, char filters,
394    /// and token filters. It is persisted in the index metadata so that
395    /// [`open`](Self::open) can rebuild the analyzer registry.
396    ///
397    /// See [[feature-analysis-pipeline]].
398    pub fn create_with_settings(
399        path: impl AsRef<Path>,
400        mut mapping: Mapping,
401        analysis_config: Option<AnalysisConfig>,
402    ) -> Result<Self> {
403        // Validate cross-field references (e.g., copy_to targets must
404        // exist in the schema) before any work is done. Catches
405        // builder-API users who skipped Mapping::from_json — see
406        // [[code-must-not-lie]].
407        mapping.validate()?;
408        mapping.ensure_id_field();
409        let path = path.as_ref().to_path_buf();
410        let storage = SingleFileDirectory::create(&path)?;
411        let file_handle = storage.file_handle();
412        let writer_analyzers = match &analysis_config {
413            Some(config) => config
414                .build_registry()
415                .map_err(|e| LuciError::InvalidQuery(e))?,
416            None => AnalyzerRegistry::new(),
417        };
418        let mut writer = IndexWriter::new(storage, mapping.clone(), writer_analyzers);
419        // Persist analysis config so open() can rebuild the registry.
420        if let Some(ref config) = analysis_config {
421            writer.set_analysis_json(Some(config.to_json()));
422        }
423        Ok(Self {
424            schema: mapping,
425            analysis_config,
426            writer: std::sync::Mutex::new(WriterState {
427                writer,
428                commit_generation: 0,
429            }),
430            reader: std::sync::RwLock::new(ReaderState {
431                segment_store: None,
432                store_generation: 0,
433            }),
434            file_handle,
435            txn_mutex: std::sync::Mutex::new(false),
436            txn_condvar: std::sync::Condvar::new(),
437        })
438    }
439
440    /// Open an existing index at the given path.
441    ///
442    /// Mappings are loaded from disk (persisted on each commit). If the index
443    /// has no persisted mappings (pre-persistence format), an empty mapping
444    /// with `dynamic=true` is used.
445    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
446        let path = path.as_ref().to_path_buf();
447        let storage = SingleFileDirectory::open(&path)?;
448        let generation = storage.generation();
449
450        // Load persisted mapping + deletions + global HNSW from user
451        // metadata. The framing dispatch lives in `parse_user_metadata`
452        // — see [[global-vector-indices]] for the v2 format.
453        let meta = storage.user_metadata();
454        let parsed = parse_user_metadata(meta)?;
455        let mut mapping = parsed.mapping;
456        mapping.ensure_id_field();
457
458        // Reconstruct the global HNSW from per-field vector indexes
459        // managed by the storage layer (one extent per field).
460        let global_hnsw = crate::vector::global::GlobalHnsw::new(&mapping);
461        for field_id in storage.vector_index_fields() {
462            let Some(bytes) = storage.read_vector_index(field_id)? else {
463                continue;
464            };
465            global_hnsw.load_field(field_id, &bytes)?;
466        }
467
468        // Load analysis config from settings if present
469        let analysis_config = Self::load_analysis_config_from_storage(&storage);
470        let writer_analyzers = match &analysis_config {
471            Some(config) => config
472                .build_registry()
473                .unwrap_or_else(|_| AnalyzerRegistry::new()),
474            None => AnalyzerRegistry::new(),
475        };
476        let file_handle = storage.file_handle();
477        let mut writer = IndexWriter::new(storage, mapping.clone(), writer_analyzers);
478        if let Some(ref config) = analysis_config {
479            writer.set_analysis_json(Some(config.to_json()));
480        }
481        writer.load_deletions(parsed.deletions);
482        writer.load_global_hnsw(global_hnsw);
483        Ok(Self {
484            schema: mapping,
485            analysis_config,
486            writer: std::sync::Mutex::new(WriterState {
487                writer,
488                commit_generation: generation,
489            }),
490            reader: std::sync::RwLock::new(ReaderState {
491                segment_store: None,
492                store_generation: 0,
493            }),
494            file_handle,
495            txn_mutex: std::sync::Mutex::new(false),
496            txn_condvar: std::sync::Condvar::new(),
497        })
498    }
499
500    /// Load analysis config from storage metadata.
501    ///
502    /// Looks for a `"settings"."analysis"` key in the persisted mapping JSON.
503    fn load_analysis_config_from_storage(storage: &SingleFileDirectory) -> Option<AnalysisConfig> {
504        let meta = storage.user_metadata();
505        if meta.is_empty() {
506            return None;
507        }
508        let parsed = parse_user_metadata(meta).ok()?;
509        let json = parsed.mapping.to_json();
510        let analysis = json.get("settings")?.get("analysis")?;
511        AnalysisConfig::from_json(analysis).ok()
512    }
513
514    /// Add a document. Auto-commits — the document is immediately searchable.
515    ///
516    /// Blocks if a transaction is active (waits for it to complete).
517    /// See [[architecture-concurrency-transactions]].
518    pub fn add(&self, doc: serde_json::Value) -> Result<()> {
519        self.wait_for_transaction();
520        let mut w = self.writer.lock().unwrap();
521        w.writer.add(doc)?;
522        self.commit_inner(&mut w)
523    }
524
525    /// Add multiple documents in a single call. Auto-commits at the end —
526    /// all documents are searchable after this returns.
527    ///
528    /// Blocks if a transaction is active (waits for it to complete).
529    /// Reduces Python→Rust FFI overhead from one call per document to one
530    /// call per batch. Fails fast on the first invalid document.
531    ///
532    /// Returns `{"took": <ms>, "count": <n>}`.
533    ///
534    /// See [[feature-bulk-api]].
535    pub fn bulk(&self, docs: Vec<serde_json::Value>) -> Result<serde_json::Value> {
536        self.wait_for_transaction();
537        let start = std::time::Instant::now();
538        let total = docs.len();
539        let mut w = self.writer.lock().unwrap();
540        for (i, doc) in docs.into_iter().enumerate() {
541            w.writer
542                .add(doc)
543                .map_err(|e| LuciError::InvalidQuery(format!("bulk item {i}: {e}")))?;
544        }
545        self.commit_inner(&mut w)?;
546        Ok(serde_json::json!({
547            "took": start.elapsed().as_millis() as u64,
548            "count": total
549        }))
550    }
551
552    /// Get a document by its `_id`. Returns the source if found.
553    /// See [[feature-document-crud]].
554    pub fn get(&self, id: &str) -> Result<Option<serde_json::Value>> {
555        let expr = SearchExpression::from_json(serde_json::json!({"term": {"_id": id}}), 10)?;
556        let results = self.search(&expr)?;
557        Ok(results.hit(0).and_then(|h| h.source()))
558    }
559
560    /// Delete a document by its `_id`. Returns true if found and deleted.
561    /// See [[feature-document-crud]].
562    pub fn delete(&self, id: &str) -> Result<bool> {
563        // delete() auto-commits below, so block until any active transaction
564        // completes (consistent with add()/bulk()) — otherwise the commit
565        // would prematurely flush the transaction's buffered documents.
566        self.wait_for_transaction();
567        let expr = SearchExpression::from_json(serde_json::json!({"term": {"_id": id}}), 1)?;
568        let results = self.search(&expr)?;
569        if let Some(hit) = results.hit(0) {
570            let mut w = self.writer.lock().unwrap();
571            w.writer.mark_deleted(hit.segment_id(), hit.doc_id());
572            // Auto-commit so the deletion is durable on return, matching
573            // add()/bulk(). Without this a bare delete() is lost on process
574            // exit. See luci-index/tests/deletion_persistence.rs.
575            self.commit_inner(&mut w)?;
576            return Ok(true);
577        }
578        Ok(false)
579    }
580
581    /// Update a document by its `_id` with a partial doc merge.
582    /// Returns true if found and updated. See [[feature-document-crud]].
583    pub fn update(&self, id: &str, partial_doc: serde_json::Value) -> Result<bool> {
584        // Auto-commits below; wait out any active transaction (see delete()).
585        self.wait_for_transaction();
586        let expr = SearchExpression::from_json(serde_json::json!({"term": {"_id": id}}), 10)?;
587        let results = self.search(&expr)?;
588        let (seg_id, doc_id, mut source) = match results.hit(0) {
589            Some(hit) => match hit.source() {
590                Some(s) => (hit.segment_id(), hit.doc_id(), s),
591                None => return Ok(false),
592            },
593            None => return Ok(false),
594        };
595
596        // Merge partial doc into existing source
597        if let (Some(existing_obj), Some(partial_obj)) =
598            (source.as_object_mut(), partial_doc.as_object())
599        {
600            for (k, v) in partial_obj {
601                existing_obj.insert(k.clone(), v.clone());
602            }
603        }
604
605        // Preserve the _id
606        if let Some(obj) = source.as_object_mut() {
607            obj.insert("_id".to_string(), serde_json::Value::String(id.to_string()));
608        }
609
610        // Replace atomically: mark the old doc deleted and add the new version
611        // in a single commit, so a crash can't leave the doc deleted but not
612        // re-added (which separate self.delete()+self.add() would now risk,
613        // since delete() auto-commits).
614        let mut w = self.writer.lock().unwrap();
615        w.writer.mark_deleted(seg_id, doc_id);
616        w.writer.add(source)?;
617        self.commit_inner(&mut w)?;
618        Ok(true)
619    }
620
621    /// Delete all documents matching a query. Returns count deleted.
622    /// See [[feature-document-crud]].
623    pub fn delete_by_query(&self, query: serde_json::Value) -> Result<u64> {
624        // Auto-commits below; wait out any active transaction (see delete()).
625        self.wait_for_transaction();
626        let expr = SearchExpression::from_json(
627            serde_json::json!({"query": query, "size": 10000, "_source": false}),
628            10000,
629        )?;
630        let results = self.search(&expr)?;
631        let count = results.len() as u64;
632        let mut w = self.writer.lock().unwrap();
633        for hit in results.iter() {
634            w.writer.mark_deleted(hit.segment_id(), hit.doc_id());
635        }
636        // Commit once after marking the whole batch (durable on return,
637        // matching add()/bulk()). Skip the commit when nothing matched.
638        if count > 0 {
639            self.commit_inner(&mut w)?;
640        }
641        Ok(count)
642    }
643
644    /// Count documents matching a query.
645    /// See [[feature-document-crud]].
646    pub fn count(&self, query: serde_json::Value) -> Result<u64> {
647        let expr = SearchExpression::from_json(
648            serde_json::json!({"query": query, "_source": false, "size": 100000}),
649            100000,
650        )?;
651        let results = self.search(&expr)?;
652        Ok(results.len() as u64)
653    }
654
655    /// Flush and commit buffered documents. Internal — called automatically
656    /// by `add()` and `bulk()`. Not part of the public API.
657    /// Takes a mutable reference to WriterState (caller holds the Mutex).
658    fn commit_inner(&self, w: &mut WriterState) -> Result<()> {
659        w.writer.commit()?;
660        w.commit_generation += 1;
661        Ok(())
662    }
663
664    /// Ensure the reader's segment store is up-to-date with the latest commit.
665    /// If the reader is stale, rebuilds it under a write lock.
666    fn refresh_reader(&self) -> Result<()> {
667        let commit_gen = self.writer.lock().unwrap().commit_generation;
668        let store_gen = self.reader.read().unwrap().store_generation;
669
670        if store_gen == commit_gen && self.reader.read().unwrap().segment_store.is_some() {
671            return Ok(());
672        }
673
674        // Stale — rebuild using the shared file handle to avoid
675        // opening a new fd (which would break fcntl lock semantics).
676        let storage = SingleFileDirectory::open_from_handle(self.file_handle.clone())?;
677        let reader = IndexReader::open(&storage)?;
678        let store_analyzers = match &self.analysis_config {
679            Some(config) => config
680                .build_registry()
681                .unwrap_or_else(|_| AnalyzerRegistry::new()),
682            None => AnalyzerRegistry::new(),
683        };
684
685        // Snapshot the global HNSW from the same commit the writer just
686        // produced. Reading from storage rather than sharing an `Arc`
687        // with the writer keeps the read-side view frozen until the
688        // next commit, matching the existing reader-isolation semantics
689        // of `SegmentReader`.
690        let global_hnsw = {
691            let g = crate::vector::global::GlobalHnsw::new(&self.schema);
692            for field_id in storage.vector_index_fields() {
693                if let Some(bytes) = storage.read_vector_index(field_id)? {
694                    g.load_field(field_id, &bytes)?;
695                }
696            }
697            Some(std::sync::Arc::new(g))
698        };
699
700        let store = crate::search::segment_store::SegmentStore::new(
701            reader.into_segments(),
702            store_analyzers,
703            Some(self.schema.clone()),
704            global_hnsw,
705        );
706
707        let mut r = self.reader.write().unwrap();
708        r.segment_store = Some(std::sync::Arc::new(store));
709        r.store_generation = commit_gen;
710        Ok(())
711    }
712
713    /// Force-merge all segments down to at most `max_segments`.
714    ///
715    /// Call after bulk indexing is complete to optimize for query performance.
716    /// Invalidates the cached searcher.
717    pub fn force_merge(&self, max_segments: usize) -> Result<()> {
718        let mut w = self.writer.lock().unwrap();
719        w.writer.force_merge(max_segments)?;
720        w.commit_generation += 1;
721        Ok(())
722    }
723
724    /// Search the index with a native SearchExpression.
725    ///
726    /// Thin entry point: refreshes the reader, delegates to
727    /// `Searcher::execute_query`, then handles deletion filtering
728    /// and total_hits capping.
729    pub fn search(
730        &self,
731        expr: &crate::search::expression::SearchExpression,
732    ) -> Result<crate::search::results::SearchResults> {
733        use crate::query::ast::QueryExpression;
734
735        // Ensure reader is up-to-date with the latest commit
736        self.refresh_reader()?;
737
738        let r = self.reader.read().unwrap();
739        let store = r.segment_store.as_ref().unwrap();
740        let searcher = Searcher::new(store);
741
742        // Resolve the query — default to match_all
743        let match_all = QueryExpression::Scoring(ScoringExpression::MatchAll);
744        let query = expr.query.as_ref().unwrap_or(&match_all);
745
746        // Searcher handles all dispatch (scoring vs ranking). Errors
747        // from bind / scorer_supplier / scorer propagate — see
748        // [[fix-silent-scorer-errors]].
749        let mut results = searcher.execute_query(query, expr)?;
750
751        // Apply rescore
752        if let Some(ref rescore) = expr.rescore {
753            searcher.apply_rescore(
754                &mut results,
755                rescore.query.as_ref(),
756                rescore.window_size,
757                rescore.query_weight,
758                rescore.rescore_query_weight,
759                rescore.score_mode,
760            )?;
761        }
762
763        // Clone the Arc<SegmentStore> for the SearchResults, then drop the
764        // reader lock before acquiring the writer lock for deletion filtering.
765        // This maintains the lock ordering rule: writer before reader.
766        let store = r.segment_store.as_ref().unwrap().clone();
767        drop(r);
768
769        // Filter out deleted documents
770        let w = self.writer.lock().unwrap();
771        let deletions = w.writer.deletions();
772        let pre_len = results.hits.len() as u64;
773        results
774            .hits
775            .retain(|hit| !deletions.is_deleted(hit.segment_id, hit.doc_id));
776        let removed = pre_len - results.hits.len() as u64;
777        drop(w);
778
779        // Adjust total_hits
780        if removed > 0 || results.total_hits.value > results.hits.len() as u64 {
781            results.total_hits =
782                crate::search::TotalHits::exact(results.total_hits.value.saturating_sub(removed));
783        }
784
785        // Apply track_total_hits capping
786        results.total_hits =
787            crate::search::TotalHits::resolve(results.total_hits.value, expr.track_total_hits);
788
789        // Wrap ScoringResults with SegmentStore for lazy Hit access
790        Ok(crate::search::results::SearchResults::new(
791            results.hits,
792            results.total_hits,
793            results.aggregations,
794            store,
795            expr.query.clone(),
796        ))
797    }
798
799    /// Set the memory budget for auto-flush (bytes).
800    ///
801    /// When the in-memory buffer exceeds this size, it is automatically
802    /// flushed to a new segment. Smaller values produce more segments
803    /// (better parallelism, more merge work). Default: 64 MB.
804    pub fn set_memory_budget(&self, budget: usize) {
805        self.writer.lock().unwrap().writer.set_memory_budget(budget);
806    }
807
808    /// Set the timeout for acquiring the cross-process write lock.
809    ///
810    /// Default: 5 seconds. If another process holds the write lock,
811    /// retries until the timeout, then returns `WriterLocked`.
812    /// Not persisted — applies only to this session.
813    pub fn set_write_timeout(&self, timeout: std::time::Duration) {
814        self.writer
815            .lock()
816            .unwrap()
817            .writer
818            .set_write_timeout(timeout);
819    }
820
821    /// Get the schema.
822    pub fn schema(&self) -> &Mapping {
823        &self.schema
824    }
825
826    /// Number of buffered (uncommitted) documents.
827    pub fn buffered_doc_count(&self) -> u32 {
828        self.writer.lock().unwrap().writer.buffered_doc_count()
829    }
830
831    // --- Transaction API ---
832    // These methods support the Python Transaction context manager.
833    // See [[architecture-concurrency-transactions]].
834
835    /// Block until no transaction is active.
836    ///
837    /// Called by `add()` and `bulk()` to wait for any active transaction
838    /// to complete before proceeding with auto-commit writes.
839    fn wait_for_transaction(&self) {
840        let guard = self.txn_mutex.lock().unwrap();
841        // Drop the guard when the condition is met (txn not active).
842        let _guard = self
843            .txn_condvar
844            .wait_while(guard, |active| *active)
845            .unwrap();
846    }
847
848    /// Begin a transaction. Blocks if another transaction is active.
849    ///
850    /// While a transaction is active, `add()` and `bulk()` will block
851    /// until the transaction completes.
852    pub fn begin_transaction(&self) -> Result<()> {
853        let mut active = self.txn_mutex.lock().unwrap();
854        // Wait for any existing transaction to finish.
855        active = self.txn_condvar.wait_while(active, |a| *a).unwrap();
856        *active = true;
857        Ok(())
858    }
859
860    /// End a transaction, waking any blocked writers.
861    ///
862    /// Must only be called after a successful `begin_transaction()`.
863    pub fn end_transaction(&self) {
864        let mut active = self.txn_mutex.lock().unwrap();
865        *active = false;
866        self.txn_condvar.notify_all();
867    }
868
869    /// Whether a transaction is currently active.
870    pub fn is_transaction_active(&self) -> bool {
871        *self.txn_mutex.lock().unwrap()
872    }
873
874    /// Add a document without committing. For use inside a transaction.
875    ///
876    /// The caller must have called `begin_transaction()` first.
877    pub fn txn_add(&self, doc: serde_json::Value) -> Result<()> {
878        let mut w = self.writer.lock().unwrap();
879        w.writer.add(doc)?;
880        Ok(())
881    }
882
883    /// Commit all buffered documents. For use inside a transaction.
884    ///
885    /// The caller must have called `begin_transaction()` first.
886    pub fn txn_commit(&self) -> Result<()> {
887        let mut w = self.writer.lock().unwrap();
888        self.commit_inner(&mut w)
889    }
890
891    /// Discard all buffered (uncommitted) documents. For transaction rollback.
892    ///
893    /// Resets the writer's in-memory buffer without flushing to storage.
894    pub fn txn_rollback(&self) {
895        let mut w = self.writer.lock().unwrap();
896        w.writer.discard_buffer();
897    }
898}
899
900#[cfg(test)]
901mod tests {
902    use super::*;
903    use crate::mapping::FieldType;
904    use serde_json::json;
905
906    fn test_dir(name: &str) -> std::path::PathBuf {
907        let dir =
908            std::env::temp_dir().join(format!("luci_index_facade_{}_{name}", std::process::id()));
909        let _ = std::fs::remove_dir_all(&dir);
910        dir
911    }
912
913    fn cleanup(path: &Path) {
914        let _ = std::fs::remove_dir_all(path);
915    }
916
917    fn test_schema() -> Mapping {
918        Mapping::builder()
919            .field("title", FieldType::Text)
920            .field("body", FieldType::Text)
921            .field("status", FieldType::Keyword)
922            .build()
923    }
924
925    #[test]
926    fn create_add_commit_search() {
927        let path = test_dir("basic");
928        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
929
930        index
931            .add(json!({"title": "Hello World", "body": "A greeting", "status": "published"}))
932            .unwrap();
933        index
934            .add(json!({"title": "Search Engine", "body": "Building search", "status": "draft"}))
935            .unwrap();
936
937        let results = index
938            .search(&SearchExpression::from_json(json!({"match": {"title": "hello"}}), 10).unwrap())
939            .unwrap();
940        assert_eq!(results.total_hits().value, 1);
941        assert_eq!(
942            results.hit(0).unwrap().source().unwrap()["title"],
943            "Hello World"
944        );
945
946        cleanup(&path);
947    }
948
949    #[test]
950    fn search_with_query_wrapper() {
951        let path = test_dir("wrapper");
952        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
953
954        index
955            .add(json!({"title": "Rust Programming", "status": "published"}))
956            .unwrap();
957        index
958            .add(json!({"title": "Go Programming", "status": "published"}))
959            .unwrap();
960
961        let results = index
962            .search(
963                &SearchExpression::from_json(json!({"query": {"match": {"title": "rust"}}}), 10)
964                    .unwrap(),
965            )
966            .unwrap();
967        assert_eq!(results.total_hits().value, 1);
968
969        cleanup(&path);
970    }
971
972    #[test]
973    fn term_query_on_keyword() {
974        let path = test_dir("keyword");
975        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
976
977        index
978            .add(json!({"title": "A", "status": "published"}))
979            .unwrap();
980        index.add(json!({"title": "B", "status": "draft"})).unwrap();
981        index
982            .add(json!({"title": "C", "status": "published"}))
983            .unwrap();
984
985        let results = index
986            .search(
987                &SearchExpression::from_json(json!({"term": {"status": "published"}}), 10).unwrap(),
988            )
989            .unwrap();
990        assert_eq!(results.total_hits().value, 2);
991
992        cleanup(&path);
993    }
994
995    #[test]
996    fn bool_query_end_to_end() {
997        let path = test_dir("bool");
998        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
999
1000        index
1001            .add(json!({"title": "Search Engine Design", "status": "published"}))
1002            .unwrap();
1003        index
1004            .add(json!({"title": "Search Tips", "status": "draft"}))
1005            .unwrap();
1006        index
1007            .add(json!({"title": "Database Design", "status": "published"}))
1008            .unwrap();
1009
1010        let results = index
1011            .search(
1012                &SearchExpression::from_json(
1013                    json!({
1014                        "bool": {
1015                            "must": [{"match": {"title": "search"}}],
1016                            "filter": [{"term": {"status": "published"}}]
1017                        }
1018                    }),
1019                    10,
1020                )
1021                .unwrap(),
1022            )
1023            .unwrap();
1024
1025        assert_eq!(results.total_hits().value, 1);
1026        let title = results.hit(0).unwrap().source().unwrap()["title"]
1027            .as_str()
1028            .unwrap()
1029            .to_string();
1030        assert!(title.contains("Search Engine"));
1031
1032        cleanup(&path);
1033    }
1034
1035    #[test]
1036    fn phrase_query_end_to_end() {
1037        let path = test_dir("phrase");
1038        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1039
1040        index
1041            .add(json!({"body": "the quick brown fox jumps"}))
1042            .unwrap();
1043        index.add(json!({"body": "brown quick fox"})).unwrap();
1044
1045        let results = index
1046            .search(
1047                &SearchExpression::from_json(
1048                    json!({"match_phrase": {"body": "quick brown fox"}}),
1049                    10,
1050                )
1051                .unwrap(),
1052            )
1053            .unwrap();
1054        assert_eq!(results.total_hits().value, 1);
1055
1056        cleanup(&path);
1057    }
1058
1059    #[test]
1060    fn match_all_query() {
1061        let path = test_dir("match_all");
1062        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1063
1064        index.add(json!({"title": "A"})).unwrap();
1065        index.add(json!({"title": "B"})).unwrap();
1066        index.add(json!({"title": "C"})).unwrap();
1067
1068        let results = index
1069            .search(&SearchExpression::from_json(json!({"match_all": {}}), 10).unwrap())
1070            .unwrap();
1071        assert_eq!(results.total_hits().value, 3);
1072
1073        cleanup(&path);
1074    }
1075
1076    #[test]
1077    fn open_existing_index() {
1078        let path = test_dir("reopen");
1079
1080        {
1081            let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1082            index
1083                .add(json!({"title": "persistent doc", "status": "published"}))
1084                .unwrap();
1085        }
1086
1087        {
1088            let index = Index::open(&path).unwrap();
1089            let results = index
1090                .search(
1091                    &SearchExpression::from_json(json!({"match": {"title": "persistent"}}), 10)
1092                        .unwrap(),
1093                )
1094                .unwrap();
1095            assert_eq!(results.total_hits().value, 1);
1096        }
1097
1098        cleanup(&path);
1099    }
1100
1101    #[test]
1102    fn multiple_commits_searchable() {
1103        let path = test_dir("multi_commit");
1104        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1105
1106        index.add(json!({"title": "first batch"})).unwrap();
1107        index.add(json!({"title": "second batch"})).unwrap();
1108
1109        let results = index
1110            .search(&SearchExpression::from_json(json!({"match": {"title": "batch"}}), 10).unwrap())
1111            .unwrap();
1112        assert_eq!(results.total_hits().value, 2);
1113
1114        cleanup(&path);
1115    }
1116
1117    #[test]
1118    fn empty_index_search() {
1119        let path = test_dir("empty");
1120        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1121
1122        let results = index
1123            .search(&SearchExpression::from_json(json!({"match_all": {}}), 10).unwrap())
1124            .unwrap();
1125        assert_eq!(results.total_hits().value, 0);
1126
1127        cleanup(&path);
1128    }
1129
1130    #[test]
1131    fn search_no_results() {
1132        let path = test_dir("no_results");
1133        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1134        index.add(json!({"title": "hello"})).unwrap();
1135
1136        let results = index
1137            .search(
1138                &SearchExpression::from_json(json!({"term": {"status": "nonexistent"}}), 10)
1139                    .unwrap(),
1140            )
1141            .unwrap();
1142        assert_eq!(results.total_hits().value, 0);
1143
1144        cleanup(&path);
1145    }
1146
1147    #[test]
1148    fn constant_score_end_to_end() {
1149        let path = test_dir("const_score");
1150        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1151        index
1152            .add(json!({"title": "test", "status": "published"}))
1153            .unwrap();
1154
1155        let results = index
1156            .search(
1157                &SearchExpression::from_json(
1158                    json!({
1159                        "constant_score": {
1160                            "filter": {"term": {"status": "published"}},
1161                            "boost": 3.14
1162                        }
1163                    }),
1164                    10,
1165                )
1166                .unwrap(),
1167            )
1168            .unwrap();
1169        assert_eq!(results.total_hits().value, 1);
1170        assert!((results.hit(0).unwrap().score() - 3.14).abs() < 0.01);
1171
1172        cleanup(&path);
1173    }
1174
1175    #[test]
1176    fn from_offset_pagination() {
1177        let path = test_dir("from_offset");
1178        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1179
1180        // Index 20 docs with distinct titles (bulk to get one segment)
1181        let docs: Vec<_> = (0..20)
1182            .map(|i| json!({"title": format!("document {i}"), "status": "published"}))
1183            .collect();
1184        index.bulk(docs).unwrap();
1185
1186        // Page 1: from=0, size=5
1187        let page1 = index
1188            .search(
1189                &SearchExpression::from_json(
1190                    json!({"query": {"match_all": {}}, "from": 0, "size": 5}),
1191                    10,
1192                )
1193                .unwrap(),
1194            )
1195            .unwrap();
1196        assert_eq!(page1.len(), 5);
1197        assert_eq!(page1.total_hits().value, 20);
1198
1199        // Page 2: from=5, size=5
1200        let page2 = index
1201            .search(
1202                &SearchExpression::from_json(
1203                    json!({"query": {"match_all": {}}, "from": 5, "size": 5}),
1204                    10,
1205                )
1206                .unwrap(),
1207            )
1208            .unwrap();
1209        assert_eq!(page2.len(), 5);
1210        assert_eq!(page2.total_hits().value, 20);
1211
1212        // Pages should have no overlap
1213        let page1_ids: Vec<_> = page1.iter().map(|h| h.doc_id()).collect();
1214        let page2_ids: Vec<_> = page2.iter().map(|h| h.doc_id()).collect();
1215        for id in &page2_ids {
1216            assert!(!page1_ids.contains(id), "page 2 should not overlap page 1");
1217        }
1218
1219        // from beyond total_hits → empty
1220        let empty = index
1221            .search(
1222                &SearchExpression::from_json(
1223                    json!({"query": {"match_all": {}}, "from": 100, "size": 5}),
1224                    10,
1225                )
1226                .unwrap(),
1227            )
1228            .unwrap();
1229        assert_eq!(empty.len(), 0);
1230        assert_eq!(empty.total_hits().value, 20);
1231
1232        // from=0 should match default behavior
1233        let default = index
1234            .search(&SearchExpression::from_json(json!({"match_all": {}}), 5).unwrap())
1235            .unwrap();
1236        let explicit = index
1237            .search(
1238                &SearchExpression::from_json(
1239                    json!({"query": {"match_all": {}}, "from": 0, "size": 5}),
1240                    10,
1241                )
1242                .unwrap(),
1243            )
1244            .unwrap();
1245        assert_eq!(default.len(), explicit.len());
1246
1247        cleanup(&path);
1248    }
1249
1250    #[test]
1251    fn source_filtering() {
1252        let path = test_dir("source_filter");
1253        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1254
1255        index
1256            .add(json!({"title": "Hello World", "body": "Full text", "status": "published"}))
1257            .unwrap();
1258        index
1259            .add(json!({"title": "Search Engine", "body": "More text", "status": "draft"}))
1260            .unwrap();
1261
1262        // _source: false — no source at all
1263        let sf_disabled = crate::search::SourceFilter::Disabled;
1264        let results = index
1265            .search(
1266                &SearchExpression::from_json(
1267                    json!({"query": {"match_all": {}}, "_source": false}),
1268                    10,
1269                )
1270                .unwrap(),
1271            )
1272            .unwrap();
1273        assert_eq!(results.len(), 2);
1274        assert!(
1275            results
1276                .hit(0)
1277                .unwrap()
1278                .source_filtered(&sf_disabled)
1279                .is_none()
1280        );
1281        assert!(
1282            results
1283                .hit(1)
1284                .unwrap()
1285                .source_filtered(&sf_disabled)
1286                .is_none()
1287        );
1288
1289        // _source: ["title"] — only title field
1290        let sf_title = crate::search::SourceFilter::Fields(vec!["title".to_string()]);
1291        let results = index
1292            .search(
1293                &SearchExpression::from_json(
1294                    json!({"query": {"match_all": {}}, "_source": ["title"]}),
1295                    10,
1296                )
1297                .unwrap(),
1298            )
1299            .unwrap();
1300        for hit in results.iter() {
1301            let src = hit.source_filtered(&sf_title).unwrap();
1302            assert!(src.get("title").is_some());
1303            assert!(src.get("body").is_none());
1304            assert!(src.get("status").is_none());
1305        }
1306
1307        // _source: {"excludes": ["body"]} — all except body
1308        let sf_excl = crate::search::SourceFilter::IncludeExclude {
1309            includes: vec![],
1310            excludes: vec!["body".to_string()],
1311        };
1312        let results = index
1313            .search(
1314                &SearchExpression::from_json(
1315                    json!({
1316                        "query": {"match_all": {}},
1317                        "_source": {"excludes": ["body"]}
1318                    }),
1319                    10,
1320                )
1321                .unwrap(),
1322            )
1323            .unwrap();
1324        for hit in results.iter() {
1325            let src = hit.source_filtered(&sf_excl).unwrap();
1326            assert!(src.get("title").is_some());
1327            assert!(src.get("status").is_some());
1328            assert!(src.get("body").is_none());
1329        }
1330
1331        // _source: {"includes": ["title", "body"], "excludes": ["body"]} — exclude wins
1332        let sf_incl_excl = crate::search::SourceFilter::IncludeExclude {
1333            includes: vec!["title".to_string(), "body".to_string()],
1334            excludes: vec!["body".to_string()],
1335        };
1336        let results = index
1337            .search(
1338                &SearchExpression::from_json(
1339                    json!({
1340                        "query": {"match_all": {}},
1341                        "_source": {"includes": ["title", "body"], "excludes": ["body"]}
1342                    }),
1343                    10,
1344                )
1345                .unwrap(),
1346            )
1347            .unwrap();
1348        for hit in results.iter() {
1349            let src = hit.source_filtered(&sf_incl_excl).unwrap();
1350            assert!(src.get("title").is_some());
1351            assert!(src.get("body").is_none());
1352        }
1353
1354        // _source: true — full source (default)
1355        let results = index
1356            .search(
1357                &SearchExpression::from_json(
1358                    json!({"query": {"match_all": {}}, "_source": true}),
1359                    10,
1360                )
1361                .unwrap(),
1362            )
1363            .unwrap();
1364        for hit in results.iter() {
1365            let src = hit.source().unwrap();
1366            assert!(src.get("title").is_some());
1367            assert!(src.get("body").is_some());
1368            assert!(src.get("status").is_some());
1369        }
1370
1371        cleanup(&path);
1372    }
1373
1374    #[test]
1375    fn fields_retrieval() {
1376        let path = test_dir("fields_retrieval");
1377        let schema = Mapping::builder()
1378            .field("title", FieldType::Text)
1379            .field("tag", FieldType::Keyword)
1380            .field("price", FieldType::Float)
1381            .build();
1382        let index = Index::create_with_mapping(&path, schema).unwrap();
1383
1384        index
1385            .add(json!({"title": "Hello World", "tag": "tech", "price": 9.99}))
1386            .unwrap();
1387        index
1388            .add(json!({"title": "Search Engine", "tag": "science", "price": 19.99}))
1389            .unwrap();
1390
1391        // Retrieve keyword + numeric fields
1392        let field_names: Vec<String> = vec!["tag".to_string(), "price".to_string()];
1393        let results = index
1394            .search(
1395                &SearchExpression::from_json(
1396                    json!({
1397                        "query": {"match_all": {}},
1398                        "fields": ["tag", "price"],
1399                        "_source": false
1400                    }),
1401                    10,
1402                )
1403                .unwrap(),
1404            )
1405            .unwrap();
1406
1407        assert_eq!(results.len(), 2);
1408        let sf_disabled = crate::search::SourceFilter::Disabled;
1409        for hit in results.iter() {
1410            // _source disabled
1411            assert!(hit.source_filtered(&sf_disabled).is_none());
1412            // fields present
1413            let fields = hit.fields(&field_names);
1414            assert!(fields.get("tag").is_some(), "tag field should be present");
1415            assert!(
1416                fields.get("price").is_some(),
1417                "price field should be present"
1418            );
1419            // Values are arrays (ES compat)
1420            let tag = fields.get("tag").unwrap();
1421            assert!(tag.is_array(), "tag should be array");
1422        }
1423
1424        // Text field silently omitted (no columnar store)
1425        let title_fields: Vec<String> = vec!["title".to_string()];
1426        let results = index
1427            .search(
1428                &SearchExpression::from_json(
1429                    json!({
1430                        "query": {"match_all": {}},
1431                        "fields": ["title"],
1432                        "_source": false
1433                    }),
1434                    10,
1435                )
1436                .unwrap(),
1437            )
1438            .unwrap();
1439        for hit in results.iter() {
1440            let fields = hit.fields(&title_fields);
1441            assert!(
1442                fields.get("title").is_none(),
1443                "text field should be omitted"
1444            );
1445        }
1446
1447        // Fields + source together
1448        let tag_fields: Vec<String> = vec!["tag".to_string()];
1449        let results = index
1450            .search(
1451                &SearchExpression::from_json(
1452                    json!({
1453                        "query": {"match_all": {}},
1454                        "fields": ["tag"],
1455                        "_source": ["title"]
1456                    }),
1457                    10,
1458                )
1459                .unwrap(),
1460            )
1461            .unwrap();
1462        for hit in results.iter() {
1463            assert!(hit.source().is_some());
1464            assert!(!hit.fields(&tag_fields).is_empty());
1465        }
1466
1467        cleanup(&path);
1468    }
1469
1470    #[test]
1471    fn sort_by_field() {
1472        let path = test_dir("sort_by_field");
1473        let schema = Mapping::builder()
1474            .field("title", FieldType::Text)
1475            .field("tag", FieldType::Keyword)
1476            .field("price", FieldType::Float)
1477            .build();
1478        let index = Index::create_with_mapping(&path, schema).unwrap();
1479
1480        index
1481            .add(json!({"title": "Expensive", "tag": "b", "price": 99.99}))
1482            .unwrap();
1483        index
1484            .add(json!({"title": "Cheap", "tag": "a", "price": 1.99}))
1485            .unwrap();
1486        index
1487            .add(json!({"title": "Mid", "tag": "c", "price": 49.99}))
1488            .unwrap();
1489
1490        // Sort by price ascending
1491        let results = index
1492            .search(
1493                &SearchExpression::from_json(
1494                    json!({
1495                        "query": {"match_all": {}},
1496                        "sort": ["price"]
1497                    }),
1498                    10,
1499                )
1500                .unwrap(),
1501            )
1502            .unwrap();
1503        assert_eq!(results.len(), 3);
1504        let prices: Vec<f64> = results
1505            .iter()
1506            .map(|h| h.sort_values().unwrap()[0].to_json().as_f64().unwrap())
1507            .collect();
1508        assert!(
1509            prices[0] <= prices[1] && prices[1] <= prices[2],
1510            "prices should be ascending: {:?}",
1511            prices
1512        );
1513
1514        // Sort by price descending
1515        let results = index
1516            .search(
1517                &SearchExpression::from_json(
1518                    json!({
1519                        "query": {"match_all": {}},
1520                        "sort": [{"price": "desc"}]
1521                    }),
1522                    10,
1523                )
1524                .unwrap(),
1525            )
1526            .unwrap();
1527        let prices: Vec<f64> = results
1528            .iter()
1529            .map(|h| h.sort_values().unwrap()[0].to_json().as_f64().unwrap())
1530            .collect();
1531        assert!(
1532            prices[0] >= prices[1] && prices[1] >= prices[2],
1533            "prices should be descending: {:?}",
1534            prices
1535        );
1536
1537        // Sort by keyword field
1538        let results = index
1539            .search(
1540                &SearchExpression::from_json(
1541                    json!({
1542                        "query": {"match_all": {}},
1543                        "sort": [{"tag": "asc"}]
1544                    }),
1545                    10,
1546                )
1547                .unwrap(),
1548            )
1549            .unwrap();
1550        let tags: Vec<String> = results
1551            .iter()
1552            .map(|h| {
1553                h.sort_values().unwrap()[0]
1554                    .to_json()
1555                    .as_str()
1556                    .unwrap()
1557                    .to_string()
1558            })
1559            .collect();
1560        assert_eq!(tags, vec!["a", "b", "c"]);
1561
1562        // Sort values present in response
1563        let results = index
1564            .search(
1565                &SearchExpression::from_json(
1566                    json!({
1567                        "query": {"match_all": {}},
1568                        "sort": ["price"]
1569                    }),
1570                    10,
1571                )
1572                .unwrap(),
1573            )
1574            .unwrap();
1575        for hit in results.iter() {
1576            assert!(hit.sort_values().is_some(), "sort values should be present");
1577        }
1578
1579        // Sort by _score (should match default ordering)
1580        let results = index
1581            .search(
1582                &SearchExpression::from_json(
1583                    json!({
1584                        "query": {"match_all": {}},
1585                        "sort": ["_score"]
1586                    }),
1587                    10,
1588                )
1589                .unwrap(),
1590            )
1591            .unwrap();
1592        assert_eq!(results.len(), 3);
1593
1594        cleanup(&path);
1595    }
1596
1597    #[test]
1598    fn search_after_pagination() {
1599        let path = test_dir("search_after");
1600        let schema = Mapping::builder()
1601            .field("title", FieldType::Text)
1602            .field("price", FieldType::Float)
1603            .build();
1604        let index = Index::create_with_mapping(&path, schema).unwrap();
1605
1606        let docs: Vec<_> = (0..20)
1607            .map(|i| json!({"title": format!("item {i}"), "price": i as f64}))
1608            .collect();
1609        index.bulk(docs).unwrap();
1610
1611        // Page 1: sort by price, size=5
1612        let page1 = index
1613            .search(
1614                &SearchExpression::from_json(
1615                    json!({
1616                        "query": {"match_all": {}},
1617                        "sort": ["price"],
1618                        "size": 5
1619                    }),
1620                    10,
1621                )
1622                .unwrap(),
1623            )
1624            .unwrap();
1625        assert_eq!(page1.len(), 5);
1626        let last_hit = page1.hit(page1.len() - 1).unwrap();
1627        let last_sort = last_hit.sort_values().unwrap();
1628
1629        // Page 2: search_after with last sort values
1630        let page2 = index
1631            .search(
1632                &SearchExpression::from_json(
1633                    json!({
1634                        "query": {"match_all": {}},
1635                        "sort": ["price"],
1636                        "size": 5,
1637                        "search_after": [last_sort[0].to_json()]
1638                    }),
1639                    10,
1640                )
1641                .unwrap(),
1642            )
1643            .unwrap();
1644        assert_eq!(page2.len(), 5);
1645
1646        // No overlap between pages
1647        let p1_prices: Vec<f64> = page1
1648            .iter()
1649            .map(|h| h.sort_values().unwrap()[0].to_json().as_f64().unwrap())
1650            .collect();
1651        let p2_prices: Vec<f64> = page2
1652            .iter()
1653            .map(|h| h.sort_values().unwrap()[0].to_json().as_f64().unwrap())
1654            .collect();
1655        assert!(
1656            p1_prices.last().unwrap() < p2_prices.first().unwrap(),
1657            "page 2 should start after page 1: {:?} vs {:?}",
1658            p1_prices,
1659            p2_prices
1660        );
1661
1662        // Iterate all pages
1663        let mut all_prices = Vec::new();
1664        let mut cursor: Option<Vec<serde_json::Value>> = None;
1665        loop {
1666            let mut req = json!({
1667                "query": {"match_all": {}},
1668                "sort": ["price"],
1669                "size": 7
1670            });
1671            if let Some(ref c) = cursor {
1672                req["search_after"] = serde_json::json!(c);
1673            }
1674            let page = index
1675                .search(&SearchExpression::from_json(req, 10).unwrap())
1676                .unwrap();
1677            if page.is_empty() {
1678                break;
1679            }
1680            for hit in page.iter() {
1681                all_prices.push(hit.sort_values().unwrap()[0].to_json().as_f64().unwrap());
1682            }
1683            let last = page.hit(page.len() - 1).unwrap();
1684            cursor = Some(
1685                last.sort_values()
1686                    .unwrap()
1687                    .iter()
1688                    .map(|sv| sv.to_json())
1689                    .collect(),
1690            );
1691        }
1692        assert_eq!(all_prices.len(), 20, "should iterate all 20 docs");
1693        // Verify ascending order and no duplicates
1694        for i in 1..all_prices.len() {
1695            assert!(
1696                all_prices[i] > all_prices[i - 1],
1697                "should be strictly ascending: {} vs {}",
1698                all_prices[i - 1],
1699                all_prices[i]
1700            );
1701        }
1702
1703        // search_after beyond all results → empty
1704        let empty = index
1705            .search(
1706                &SearchExpression::from_json(
1707                    json!({
1708                        "query": {"match_all": {}},
1709                        "sort": ["price"],
1710                        "search_after": [999.0]
1711                    }),
1712                    10,
1713                )
1714                .unwrap(),
1715            )
1716            .unwrap();
1717        assert!(empty.is_empty());
1718
1719        cleanup(&path);
1720    }
1721
1722    #[test]
1723    fn explain_score() {
1724        let path = test_dir("explain");
1725        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1726
1727        index
1728            .bulk(vec![
1729                json!({"title": "hello world", "status": "published"}),
1730                json!({"title": "hello hello hello", "status": "draft"}),
1731            ])
1732            .unwrap();
1733
1734        // Explain is lazy — always available when a query is provided
1735        let results = index
1736            .search(
1737                &SearchExpression::from_json(json!({"query": {"match": {"title": "hello"}}}), 10)
1738                    .unwrap(),
1739            )
1740            .unwrap();
1741        assert_eq!(results.len(), 2);
1742        for hit in results.iter() {
1743            let exp = hit
1744                .explain()
1745                .expect("explain should not error")
1746                .expect("explain should be present");
1747            assert!(exp.value > 0.0, "score should be positive");
1748            assert!(
1749                !exp.description.is_empty(),
1750                "description should be non-empty"
1751            );
1752            assert!(!exp.details.is_empty(), "should have BM25 sub-details");
1753        }
1754
1755        // Higher TF should have higher explanation value
1756        let scores: Vec<f32> = results
1757            .iter()
1758            .map(|h| h.explain().unwrap().unwrap().value)
1759            .collect();
1760        // hits are score-descending, so first should have higher explain value
1761        assert!(scores[0] >= scores[1]);
1762
1763        cleanup(&path);
1764    }
1765
1766    #[test]
1767    fn collapse_by_field() {
1768        let path = test_dir("collapse");
1769        let schema = Mapping::builder()
1770            .field("title", FieldType::Text)
1771            .field("author", FieldType::Keyword)
1772            .build();
1773        let index = Index::create_with_mapping(&path, schema).unwrap();
1774
1775        // Multiple docs per author
1776        index
1777            .add(json!({"title": "post one by alice", "author": "alice"}))
1778            .unwrap();
1779        index
1780            .add(json!({"title": "post two by alice", "author": "alice"}))
1781            .unwrap();
1782        index
1783            .add(json!({"title": "post by bob", "author": "bob"}))
1784            .unwrap();
1785        index
1786            .add(json!({"title": "another by bob", "author": "bob"}))
1787            .unwrap();
1788        index
1789            .add(json!({"title": "post by carol", "author": "carol"}))
1790            .unwrap();
1791
1792        // Without collapse: all 5 docs
1793        let results = index
1794            .search(&SearchExpression::from_json(json!({"query": {"match_all": {}}}), 10).unwrap())
1795            .unwrap();
1796        assert_eq!(results.len(), 5);
1797
1798        // With collapse: 1 per author = 3 hits
1799        let author_fields: Vec<String> = vec!["author".to_string()];
1800        let results = index
1801            .search(
1802                &SearchExpression::from_json(
1803                    json!({
1804                        "query": {"match_all": {}},
1805                        "collapse": {"field": "author"}
1806                    }),
1807                    10,
1808                )
1809                .unwrap(),
1810            )
1811            .unwrap();
1812        assert_eq!(results.len(), 3, "should have 3 unique authors");
1813
1814        // Each hit should have the author field
1815        let mut authors: Vec<String> = results
1816            .iter()
1817            .map(|h| h.fields(&author_fields))
1818            .filter_map(|f| f.get("author").cloned())
1819            .filter_map(|v| v.as_array().cloned())
1820            .filter_map(|a| a.first().cloned())
1821            .filter_map(|v| v.as_str().map(String::from))
1822            .collect();
1823        authors.sort();
1824        assert_eq!(authors, vec!["alice", "bob", "carol"]);
1825
1826        // total_hits should be the uncollapsed count
1827        assert_eq!(results.total_hits().value, 5);
1828
1829        cleanup(&path);
1830    }
1831
1832    #[test]
1833    fn track_total_hits() {
1834        use crate::search::TotalHitsRelation;
1835
1836        let path = test_dir("track_total_hits");
1837        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
1838        for i in 0..100 {
1839            index
1840                .add(json!({"title": format!("document {i}"), "status": "published"}))
1841                .unwrap();
1842        }
1843
1844        // Default: exact count
1845        let results = index
1846            .search(&SearchExpression::from_json(json!({"query": {"match_all": {}}}), 10).unwrap())
1847            .unwrap();
1848        assert_eq!(results.total_hits().value, 100);
1849        assert_eq!(results.total_hits().relation, TotalHitsRelation::EqualTo);
1850
1851        // track_total_hits: true — same as default
1852        let results = index
1853            .search(
1854                &SearchExpression::from_json(
1855                    json!({
1856                        "query": {"match_all": {}},
1857                        "track_total_hits": true
1858                    }),
1859                    10,
1860                )
1861                .unwrap(),
1862            )
1863            .unwrap();
1864        assert_eq!(results.total_hits().value, 100);
1865        assert_eq!(results.total_hits().relation, TotalHitsRelation::EqualTo);
1866
1867        // track_total_hits: false — disabled
1868        let results = index
1869            .search(
1870                &SearchExpression::from_json(
1871                    json!({
1872                        "query": {"match_all": {}},
1873                        "track_total_hits": false
1874                    }),
1875                    10,
1876                )
1877                .unwrap(),
1878            )
1879            .unwrap();
1880        assert_eq!(results.total_hits().value, 0);
1881        assert_eq!(
1882            results.total_hits().relation,
1883            TotalHitsRelation::GreaterThanOrEqualTo
1884        );
1885
1886        // track_total_hits: 50 — cap below actual
1887        let results = index
1888            .search(
1889                &SearchExpression::from_json(
1890                    json!({
1891                        "query": {"match_all": {}},
1892                        "track_total_hits": 50
1893                    }),
1894                    10,
1895                )
1896                .unwrap(),
1897            )
1898            .unwrap();
1899        assert_eq!(results.total_hits().value, 50);
1900        assert_eq!(
1901            results.total_hits().relation,
1902            TotalHitsRelation::GreaterThanOrEqualTo
1903        );
1904
1905        // track_total_hits: 200 — cap above actual
1906        let results = index
1907            .search(
1908                &SearchExpression::from_json(
1909                    json!({
1910                        "query": {"match_all": {}},
1911                        "track_total_hits": 200
1912                    }),
1913                    10,
1914                )
1915                .unwrap(),
1916            )
1917            .unwrap();
1918        assert_eq!(results.total_hits().value, 100);
1919        assert_eq!(results.total_hits().relation, TotalHitsRelation::EqualTo);
1920
1921        cleanup(&path);
1922    }
1923
1924    #[test]
1925    fn inner_hits_nested() {
1926        let path = test_dir("inner_hits");
1927        let schema = Mapping::builder()
1928            .field("product", FieldType::Text)
1929            .field("offers", FieldType::Nested)
1930            .field("offers.seller", FieldType::Keyword)
1931            .field("offers.price", FieldType::Keyword)
1932            .build();
1933        let index = Index::create_with_mapping(&path, schema).unwrap();
1934
1935        index
1936            .add(json!({
1937                "product": "laptop",
1938                "offers": [
1939                    {"seller": "Alice", "price": "999"},
1940                    {"seller": "Bob", "price": "1299"}
1941                ]
1942            }))
1943            .unwrap();
1944
1945        // Nested query with inner_hits — should return which offer matched
1946        let results = index
1947            .search(
1948                &SearchExpression::from_json(
1949                    json!({
1950                        "nested": {
1951                            "path": "offers",
1952                            "query": {
1953                                "term": {"offers.seller": "Alice"}
1954                            },
1955                            "inner_hits": {}
1956                        }
1957                    }),
1958                    10,
1959                )
1960                .unwrap(),
1961            )
1962            .unwrap();
1963
1964        assert_eq!(results.total_hits().value, 1);
1965        assert_eq!(results.len(), 1);
1966
1967        // Check inner_hits are present
1968        let hit = results.hit(0).unwrap();
1969        let inner = hit
1970            .inner_hits()
1971            .expect("inner_hits should not error")
1972            .expect("inner_hits should be present");
1973        let offers = inner
1974            .get("offers")
1975            .expect("should have 'offers' inner_hits group");
1976
1977        let inner_hits_obj = offers.get("hits").unwrap();
1978        let inner_total = inner_hits_obj
1979            .get("total")
1980            .unwrap()
1981            .get("value")
1982            .unwrap()
1983            .as_u64()
1984            .unwrap();
1985        assert_eq!(inner_total, 1, "should have 1 matching inner hit");
1986
1987        let inner_docs = inner_hits_obj.get("hits").unwrap().as_array().unwrap();
1988        assert_eq!(inner_docs.len(), 1);
1989
1990        // The inner hit should have Alice's offer as _source
1991        let inner_source = inner_docs[0].get("_source").unwrap();
1992        assert_eq!(inner_source.get("seller").unwrap(), "Alice");
1993        assert_eq!(inner_source.get("price").unwrap(), "999");
1994
1995        cleanup(&path);
1996    }
1997
1998    #[test]
1999    fn rescore_reranks() {
2000        let path = test_dir("rescore");
2001        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
2002
2003        // Doc 0: has "hello" (matches initial query)
2004        // Doc 1: has "hello world" (matches both initial and rescore phrase)
2005        // Doc 2: has "hello there world" (matches initial, weak phrase match)
2006        index
2007            .add(json!({"title": "hello", "status": "published"}))
2008            .unwrap();
2009        index
2010            .add(json!({"title": "hello world", "status": "published"}))
2011            .unwrap();
2012        index
2013            .add(json!({"title": "hello there world", "status": "published"}))
2014            .unwrap();
2015
2016        // Initial search: match "hello" — all 3 docs match
2017        let without_rescore = index
2018            .search(
2019                &SearchExpression::from_json(
2020                    json!({
2021                        "query": {"match": {"title": "hello"}}
2022                    }),
2023                    10,
2024                )
2025                .unwrap(),
2026            )
2027            .unwrap();
2028        assert_eq!(without_rescore.len(), 3);
2029
2030        // With rescore: boost docs matching phrase "hello world"
2031        let with_rescore = index
2032            .search(
2033                &SearchExpression::from_json(
2034                    json!({
2035                        "query": {"match": {"title": "hello"}},
2036                        "rescore": {
2037                            "window_size": 10,
2038                            "query": {
2039                                "rescore_query": {"match_phrase": {"title": "hello world"}},
2040                                "query_weight": 0.5,
2041                                "rescore_query_weight": 1.5
2042                            }
2043                        }
2044                    }),
2045                    10,
2046                )
2047                .unwrap(),
2048            )
2049            .unwrap();
2050        assert_eq!(with_rescore.len(), 3);
2051
2052        // Doc with "hello world" should be boosted to top
2053        // (it matches both the initial match and the rescore phrase)
2054        let top_source = with_rescore.hit(0).unwrap().source().unwrap();
2055        assert_eq!(
2056            top_source["title"], "hello world",
2057            "phrase match should be boosted to top"
2058        );
2059
2060        // Rescore should change scores
2061        assert_ne!(
2062            without_rescore.hit(0).unwrap().score(),
2063            with_rescore.hit(0).unwrap().score(),
2064            "rescore should modify scores"
2065        );
2066
2067        cleanup(&path);
2068    }
2069
2070    #[test]
2071    fn multi_fields() {
2072        let path = test_dir("multi_fields");
2073        // title as text + title.raw as keyword
2074        let mapping_json = json!({
2075            "properties": {
2076                "title": {
2077                    "type": "text",
2078                    "fields": {
2079                        "raw": {"type": "keyword"}
2080                    }
2081                }
2082            }
2083        });
2084        let schema = Mapping::from_json(&mapping_json).unwrap();
2085        let index = Index::create_with_mapping(&path, schema).unwrap();
2086
2087        index.add(json!({"title": "Hello World"})).unwrap();
2088        index.add(json!({"title": "Hello Luci"})).unwrap();
2089        index.add(json!({"title": "Goodbye World"})).unwrap();
2090
2091        // Text search on parent field
2092        let results = index
2093            .search(&SearchExpression::from_json(json!({"match": {"title": "hello"}}), 10).unwrap())
2094            .unwrap();
2095        assert_eq!(
2096            results.total_hits().value,
2097            2,
2098            "text search should match 2 docs"
2099        );
2100
2101        // Exact match on sub-field (keyword)
2102        let results = index
2103            .search(
2104                &SearchExpression::from_json(json!({"term": {"title.raw": "Hello World"}}), 10)
2105                    .unwrap(),
2106            )
2107            .unwrap();
2108        assert_eq!(
2109            results.total_hits().value,
2110            1,
2111            "exact keyword match on sub-field"
2112        );
2113
2114        // Term query on sub-field shouldn't match partial text
2115        let results = index
2116            .search(
2117                &SearchExpression::from_json(json!({"term": {"title.raw": "hello"}}), 10).unwrap(),
2118            )
2119            .unwrap();
2120        assert_eq!(
2121            results.total_hits().value,
2122            0,
2123            "keyword sub-field is not analyzed"
2124        );
2125
2126        // Sort by sub-field
2127        let results = index
2128            .search(
2129                &SearchExpression::from_json(
2130                    json!({
2131                        "query": {"match_all": {}},
2132                        "sort": [{"title.raw": "asc"}]
2133                    }),
2134                    10,
2135                )
2136                .unwrap(),
2137            )
2138            .unwrap();
2139        assert_eq!(results.len(), 3);
2140        let titles: Vec<String> = results
2141            .iter()
2142            .map(|h| {
2143                h.sort_values().unwrap()[0]
2144                    .to_json()
2145                    .as_str()
2146                    .unwrap()
2147                    .to_string()
2148            })
2149            .collect();
2150        assert_eq!(titles[0], "Goodbye World");
2151        assert_eq!(titles[1], "Hello Luci");
2152        assert_eq!(titles[2], "Hello World");
2153
2154        // Aggregation on sub-field
2155        let results = index
2156            .search(
2157                &SearchExpression::from_json(
2158                    json!({
2159                        "query": {"match_all": {}},
2160                        "aggs": {"titles": {"terms": {"field": "title.raw"}}},
2161                        "size": 0
2162                    }),
2163                    10,
2164                )
2165                .unwrap(),
2166            )
2167            .unwrap();
2168        assert!(results.aggregations().contains_key("titles"));
2169
2170        cleanup(&path);
2171    }
2172
2173    #[test]
2174    fn copy_to() {
2175        let path = test_dir("copy_to");
2176        let schema = Mapping::from_json(&json!({
2177            "properties": {
2178                "title": {"type": "text", "copy_to": "all_text"},
2179                "body": {"type": "text", "copy_to": "all_text"},
2180                "tag": {"type": "keyword"},
2181                "all_text": {"type": "text"}
2182            }
2183        }))
2184        .unwrap();
2185        let index = Index::create_with_mapping(&path, schema).unwrap();
2186
2187        index
2188            .add(json!({"title": "search engine", "body": "fast and embedded", "tag": "tech"}))
2189            .unwrap();
2190        index
2191            .add(json!({"title": "database", "body": "columnar storage", "tag": "tech"}))
2192            .unwrap();
2193
2194        // Search on individual field
2195        let results = index
2196            .search(
2197                &SearchExpression::from_json(json!({"match": {"title": "search"}}), 10).unwrap(),
2198            )
2199            .unwrap();
2200        assert_eq!(results.total_hits().value, 1);
2201
2202        // Search on copy_to target — should find doc with "search" in title
2203        let results = index
2204            .search(
2205                &SearchExpression::from_json(json!({"match": {"all_text": "search"}}), 10).unwrap(),
2206            )
2207            .unwrap();
2208        assert_eq!(
2209            results.total_hits().value,
2210            1,
2211            "copy_to target should contain title content"
2212        );
2213
2214        // Search on copy_to target — should find doc with "embedded" in body
2215        let results = index
2216            .search(
2217                &SearchExpression::from_json(json!({"match": {"all_text": "embedded"}}), 10)
2218                    .unwrap(),
2219            )
2220            .unwrap();
2221        assert_eq!(
2222            results.total_hits().value,
2223            1,
2224            "copy_to target should contain body content"
2225        );
2226
2227        // Search on copy_to target — "columnar" from body of doc 2
2228        let results = index
2229            .search(
2230                &SearchExpression::from_json(json!({"match": {"all_text": "columnar"}}), 10)
2231                    .unwrap(),
2232            )
2233            .unwrap();
2234        assert_eq!(
2235            results.total_hits().value,
2236            1,
2237            "copy_to target should contain body of second doc"
2238        );
2239
2240        // all_text should NOT appear in _source
2241        let results = index
2242            .search(
2243                &SearchExpression::from_json(json!({"match": {"all_text": "search"}}), 10).unwrap(),
2244            )
2245            .unwrap();
2246        let src = results.hit(0).unwrap().source().unwrap();
2247        assert!(
2248            src.get("all_text").is_none(),
2249            "copy_to target should not be in _source"
2250        );
2251
2252        cleanup(&path);
2253    }
2254
2255    #[test]
2256    fn document_crud() {
2257        let path = test_dir("crud");
2258        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
2259
2260        // Add with user-provided _id
2261        index
2262            .add(json!({"_id": "doc1", "title": "Hello", "status": "published"}))
2263            .unwrap();
2264        index
2265            .add(json!({"_id": "doc2", "title": "World", "status": "draft"}))
2266            .unwrap();
2267
2268        // Get by ID
2269        let doc = index.get("doc1").unwrap().expect("doc1 should exist");
2270        assert_eq!(doc["title"], "Hello");
2271
2272        // Get missing
2273        assert!(index.get("nonexistent").unwrap().is_none());
2274
2275        // Count
2276        assert_eq!(index.count(json!({"match_all": {}})).unwrap(), 2);
2277
2278        // Delete by ID
2279        assert!(index.delete("doc1").unwrap());
2280        assert!(index.get("doc1").unwrap().is_none());
2281        assert_eq!(index.count(json!({"match_all": {}})).unwrap(), 1);
2282
2283        // Delete missing
2284        assert!(!index.delete("nonexistent").unwrap());
2285
2286        // Update by ID
2287        index
2288            .add(json!({"_id": "doc3", "title": "Original", "status": "published"}))
2289            .unwrap();
2290        assert!(index.update("doc3", json!({"title": "Updated"})).unwrap());
2291        let doc = index
2292            .get("doc3")
2293            .unwrap()
2294            .expect("doc3 should exist after update");
2295        assert_eq!(doc["title"], "Updated");
2296        assert_eq!(doc["status"], "published"); // preserved
2297
2298        // Delete by query
2299        index
2300            .add(json!({"_id": "d1", "title": "draft one", "status": "draft"}))
2301            .unwrap();
2302        index
2303            .add(json!({"_id": "d2", "title": "draft two", "status": "draft"}))
2304            .unwrap();
2305        let deleted = index
2306            .delete_by_query(json!({"term": {"status": "draft"}}))
2307            .unwrap();
2308        assert!(deleted >= 2);
2309
2310        // Auto-generated IDs
2311        index.add(json!({"title": "Auto ID doc"})).unwrap();
2312        let results = index
2313            .search(&SearchExpression::from_json(json!({"match": {"title": "auto"}}), 1).unwrap())
2314            .unwrap();
2315        assert_eq!(results.total_hits().value, 1);
2316
2317        cleanup(&path);
2318    }
2319
2320    #[test]
2321    fn filter_agg_counts_only_matching_docs() {
2322        let path = test_dir("filter_agg");
2323        let schema = Mapping::builder()
2324            .field("title", FieldType::Text)
2325            .field("status", FieldType::Keyword)
2326            .field("price", FieldType::Float)
2327            .build();
2328        let index = Index::create_with_mapping(&path, schema).unwrap();
2329
2330        // 3 active docs, 2 draft docs
2331        index
2332            .add(json!({"title": "doc1", "status": "active", "price": 10.0}))
2333            .unwrap();
2334        index
2335            .add(json!({"title": "doc2", "status": "active", "price": 20.0}))
2336            .unwrap();
2337        index
2338            .add(json!({"title": "doc3", "status": "active", "price": 30.0}))
2339            .unwrap();
2340        index
2341            .add(json!({"title": "doc4", "status": "draft", "price": 40.0}))
2342            .unwrap();
2343        index
2344            .add(json!({"title": "doc5", "status": "draft", "price": 50.0}))
2345            .unwrap();
2346
2347        // Filter agg: count only active docs
2348        let results = index
2349            .search(
2350                &SearchExpression::from_json(
2351                    json!({
2352                        "query": {"match_all": {}},
2353                        "aggs": {
2354                            "active_only": {
2355                                "filter": {"term": {"status": "active"}}
2356                            }
2357                        },
2358                        "size": 0
2359                    }),
2360                    10,
2361                )
2362                .unwrap(),
2363            )
2364            .unwrap();
2365
2366        // total_hits should be 5 (all docs match the top-level match_all)
2367        assert_eq!(results.total_hits().value, 5);
2368
2369        // filter agg should count only the 3 active docs, NOT all 5
2370        let aggs = results.aggregations();
2371        let active = aggs.get("active_only").expect("active_only agg missing");
2372        let active_json = active.to_json();
2373        let doc_count = active_json["buckets"][0]["doc_count"]
2374            .as_u64()
2375            .expect("filter agg should have doc_count in buckets[0]");
2376        assert_eq!(
2377            doc_count, 3,
2378            "filter agg should count 3 active docs, got {doc_count}"
2379        );
2380
2381        cleanup(&path);
2382    }
2383
2384    #[test]
2385    fn value_count_uses_stats_fast_path() {
2386        let path = test_dir("value_count_fast");
2387        let schema = Mapping::builder()
2388            .field("title", FieldType::Text)
2389            .field("price", FieldType::Float)
2390            .build();
2391        let index = Index::create_with_mapping(&path, schema).unwrap();
2392
2393        for i in 0..5 {
2394            index
2395                .add(json!({"title": format!("doc {i}"), "price": (i as f64) * 10.0}))
2396                .unwrap();
2397        }
2398
2399        let results = index
2400            .search(
2401                &SearchExpression::from_json(
2402                    json!({
2403                        "query": {"match_all": {}},
2404                        "aggs": {
2405                            "count_price": {"value_count": {"field": "price"}}
2406                        },
2407                        "size": 0
2408                    }),
2409                    10,
2410                )
2411                .unwrap(),
2412            )
2413            .unwrap();
2414
2415        let aggs = results.aggregations();
2416        let count_agg = aggs.get("count_price").expect("count_price missing");
2417        let value = count_agg.to_json()["value"].as_f64().unwrap();
2418        assert_eq!(value, 5.0, "value_count should be 5");
2419
2420        cleanup(&path);
2421    }
2422
2423    /// Regression test for [[investigation-20260405-04-multi-match-wrong-rewrite]].
2424    ///
2425    /// multi_match defaults to best_fields (dis_max), not most_fields
2426    /// (bool/should). A doc matching in one strong field should outscore
2427    /// a doc matching weakly in multiple fields.
2428    #[test]
2429    fn multi_match_best_fields_scoring() {
2430        let path = test_dir("multi_match_bf");
2431        let index = Index::create_with_mapping(&path, test_schema()).unwrap();
2432
2433        // Doc 0: "search" appears twice in title (high TF → high score), absent from body
2434        index
2435            .add(json!({"title": "search search", "body": "web application framework"}))
2436            .unwrap();
2437        // Doc 1: "search" once in title AND once in body (lower per-field BM25)
2438        index
2439            .add(json!({"title": "database search", "body": "search tools"}))
2440            .unwrap();
2441
2442        // multi_match with default best_fields: score = max(field_scores)
2443        let results = index
2444            .search(
2445                &SearchExpression::from_json(
2446                    json!({"multi_match": {"query": "search", "fields": ["title", "body"]}}),
2447                    10,
2448                )
2449                .unwrap(),
2450            )
2451            .unwrap();
2452
2453        assert_eq!(results.total_hits().value, 2);
2454
2455        // Also run individual match queries to get per-field scores for doc 1
2456        let title_results = index
2457            .search(
2458                &SearchExpression::from_json(json!({"match": {"title": "search"}}), 10).unwrap(),
2459            )
2460            .unwrap();
2461        let body_results = index
2462            .search(&SearchExpression::from_json(json!({"match": {"body": "search"}}), 10).unwrap())
2463            .unwrap();
2464
2465        // Find doc 1's per-field scores
2466        let doc1_title_score = title_results
2467            .iter()
2468            .find(|h| h.source().unwrap()["body"] == "search tools")
2469            .map(|h| h.score())
2470            .unwrap();
2471        let doc1_body_score = body_results
2472            .iter()
2473            .find(|h| h.source().unwrap()["body"] == "search tools")
2474            .map(|h| h.score())
2475            .unwrap();
2476        let doc1_max = doc1_title_score.max(doc1_body_score);
2477        let doc1_sum = doc1_title_score + doc1_body_score;
2478
2479        // Find doc 1's multi_match score
2480        let doc1_mm_score = results
2481            .iter()
2482            .find(|h| h.source().unwrap()["body"] == "search tools")
2483            .map(|h| h.score())
2484            .unwrap();
2485
2486        // With best_fields (correct): score ≈ max(field_scores)
2487        // With bool/should (bug): score ≈ sum(field_scores)
2488        assert!(
2489            (doc1_mm_score - doc1_max).abs() < 1e-5,
2490            "multi_match score ({doc1_mm_score}) should equal max of field scores ({doc1_max}), \
2491             not sum ({doc1_sum})"
2492        );
2493
2494        cleanup(&path);
2495    }
2496
2497    /// Regression test for [[fix-disjunction-heap-inefficiency]].
2498    ///
2499    /// PrefixQuery should produce constant scores per matching doc, not
2500    /// `count_of_matching_terms`. A doc matching 2 prefix-terms must
2501    /// score the same as a doc matching 1 prefix-term.
2502    #[test]
2503    fn prefix_query_constant_score() {
2504        let path = test_dir("prefix_const_score");
2505        let schema = Mapping::builder().field("body", FieldType::Text).build();
2506        let index = Index::create_with_mapping(&path, schema).unwrap();
2507
2508        // Doc 0: contains 2 prefix-matching terms ("search" and "searchable")
2509        index
2510            .add(json!({"body": "search searchable engines"}))
2511            .unwrap();
2512        // Doc 1: contains 1 prefix-matching term ("searching")
2513        index.add(json!({"body": "searching for tools"})).unwrap();
2514
2515        let results = index
2516            .search(&SearchExpression::from_json(json!({"prefix": {"body": "sear"}}), 10).unwrap())
2517            .unwrap();
2518        assert_eq!(results.total_hits().value, 2);
2519
2520        // Both docs should have identical scores under constant-score semantics.
2521        let doc0_score = results
2522            .iter()
2523            .find(|h| h.source().unwrap()["body"] == "search searchable engines")
2524            .map(|h| h.score())
2525            .unwrap();
2526        let doc1_score = results
2527            .iter()
2528            .find(|h| h.source().unwrap()["body"] == "searching for tools")
2529            .map(|h| h.score())
2530            .unwrap();
2531
2532        assert!(
2533            (doc0_score - doc1_score).abs() < 1e-5,
2534            "prefix scores must be constant: doc0 ({doc0_score}) vs doc1 ({doc1_score}). \
2535             ES uses CONSTANT_SCORE_BLENDED_REWRITE — every matching doc gets the same boost \
2536             regardless of how many prefix-terms match."
2537        );
2538
2539        cleanup(&path);
2540    }
2541
2542    /// Regression test for [[fix-wildcard-fuzzy-quadratic-dedup]].
2543    ///
2544    /// Multi-segment wildcard query must return the union of per-segment
2545    /// matches. Per-segment rewrite means each segment enumerates its own
2546    /// terms independently — assert no docs are missed and duplicate
2547    /// terms across segments don't cause spurious extras.
2548    #[test]
2549    fn wildcard_multi_segment_returns_all_matches() {
2550        let path = test_dir("wildcard_multi_seg");
2551        let schema = Mapping::builder().field("tag", FieldType::Keyword).build();
2552        let index = Index::create_with_mapping(&path, schema).unwrap();
2553
2554        // Each add() auto-commits, creating a segment per call (until merge).
2555        // The test is correct even if the writer merges eagerly — the
2556        // wildcard must still find all 5 matching docs.
2557        index.add(json!({"tag": "tech1"})).unwrap();
2558        index.add(json!({"tag": "tech2"})).unwrap();
2559        index.add(json!({"tag": "other"})).unwrap();
2560        index.add(json!({"tag": "tech3"})).unwrap();
2561        index.add(json!({"tag": "tech1"})).unwrap(); // duplicate across segments
2562        index.add(json!({"tag": "tech4"})).unwrap();
2563        index.add(json!({"tag": "unrelated"})).unwrap();
2564
2565        let results = index
2566            .search(
2567                &SearchExpression::from_json(json!({"wildcard": {"tag": "tech*"}}), 20).unwrap(),
2568            )
2569            .unwrap();
2570
2571        // Expected: 5 docs match (tech1 twice, tech2, tech3, tech4).
2572        assert_eq!(
2573            results.total_hits().value,
2574            5,
2575            "multi-segment wildcard missed matches"
2576        );
2577
2578        cleanup(&path);
2579    }
2580
2581    // --- E10: strict value types in sort / search_after. A malformed
2582    // spec must error, never silently sort-by-score or drop the cursor.
2583    // See [[code-must-not-lie]] and [[fix-strict-search-parsing]]. ---
2584
2585    #[test]
2586    fn parse_sort_unknown_order_rejected() {
2587        let err = parse_sort(Some(&json!([{"price": {"order": "ascending"}}]))).unwrap_err();
2588        assert!(format!("{err}").contains("order"), "{err}");
2589    }
2590
2591    #[test]
2592    fn parse_sort_non_string_order_rejected() {
2593        let err = parse_sort(Some(&json!([{"price": {"order": 1}}]))).unwrap_err();
2594        assert!(format!("{err}").contains("order"), "{err}");
2595    }
2596
2597    #[test]
2598    fn parse_sort_object_form_accepted() {
2599        // A single-object sort (ES-valid) now routes through
2600        // parse_sort_item instead of being silently dropped to no-sort.
2601        let sort = parse_sort(Some(&json!({"price": "desc"})))
2602            .unwrap()
2603            .unwrap();
2604        assert_eq!(sort.len(), 1);
2605    }
2606
2607    #[test]
2608    fn parse_sort_malformed_entry_rejected() {
2609        let err = parse_sort(Some(&json!([5]))).unwrap_err();
2610        assert!(format!("{err}").contains("sort"), "{err}");
2611    }
2612
2613    #[test]
2614    fn parse_search_after_object_element_rejected() {
2615        let err = parse_search_after(Some(&json!([1, {}]))).unwrap_err();
2616        assert!(format!("{err}").contains("search_after"), "{err}");
2617    }
2618
2619    #[test]
2620    fn parse_search_after_valid_cursor_accepted() {
2621        let cursor = parse_search_after(Some(&json!([9.99, "doc-42"])))
2622            .unwrap()
2623            .unwrap();
2624        assert_eq!(cursor.len(), 2);
2625    }
2626}