sync_engine/coordinator/
search_api.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Search API for SyncEngine
5//!
6//! Provides full-text search capabilities via RediSearch with SQL fallback.
7//!
8//! # Architecture
9//!
10//! ```text
11//! search(index, query)
12//!       │
13//!       ├─→ Check SearchCache (merkle-validated)
14//!       │        │
15//!       │        └─→ Hit? Return cached keys
16//!       │
17//!       ├─→ FT.SEARCH Redis (fast)
18//!       │        │
19//!       │        └─→ Results? Return
20//!       │
21//!       └─→ Durable tier? SQL fallback
22//!                │
23//!                └─→ Cache results with merkle root
24//! ```
25
26use std::time::Instant;
27use tracing::{debug, info};
28
29use crate::metrics;
30use crate::search::{
31    IndexManager, SearchIndex, SearchCache, SearchCacheStats,
32    Query, RediSearchTranslator, SqlTranslator, SqlParam,
33};
34use crate::sync_item::SyncItem;
35use crate::storage::traits::StorageError;
36
37use super::SyncEngine;
38
/// How far a search is allowed to reach when looking for matches.
///
/// The default tier, [`SearchTier::RedisWithSqlFallback`], may consult SQL
/// when Redis yields no results or fails.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SearchTier {
    /// Query Redis only and never fall back to SQL (intended for `view:`
    /// prefix ephemeral data).
    RedisOnly,
    /// Query Redis first with SQL as the fallback (intended for `crdt:`
    /// prefix durable data). This is the default.
    #[default]
    RedisWithSqlFallback,
}
48
49/// Search result with metadata
50#[derive(Debug, Clone)]
51pub struct SearchResult {
52    /// Matching items
53    pub items: Vec<SyncItem>,
54    /// Source of results
55    pub source: SearchSource,
56    /// Whether results came from cache
57    pub cached: bool,
58}
59
/// Origin of the items in a [`SearchResult`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SearchSource {
    /// Served by the hot tier (RediSearch/Dragonfly).
    Hot,
    /// Served by the cold tier (SQL/MySQL).
    Cold,
    /// Served from the `SearchCache`.
    Cache,
    /// Nothing matched.
    Empty,
}
72
73/// Search-related state for SyncEngine
74#[derive(Default)]
75pub struct SearchState {
76    /// Index manager
77    pub index_manager: IndexManager,
78    /// Search result cache
79    pub cache: SearchCache,
80}
81
82impl SyncEngine {
83    // ═══════════════════════════════════════════════════════════════════════════
84    // Search API
85    // ═══════════════════════════════════════════════════════════════════════════
86
87    /// Register a search index.
88    ///
89    /// Creates the index in RediSearch using FT.CREATE. The index will
90    /// automatically index all JSON documents with matching key prefix.
91    ///
92    /// # Example
93    ///
94    /// ```rust,no_run
95    /// # use sync_engine::{SyncEngine, search::SearchIndex};
96    /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
97    /// // Define index schema
98    /// let index = SearchIndex::new("users", "crdt:users:")
99    ///     .text_sortable("name")
100    ///     .text("email")
101    ///     .numeric_sortable("age")
102    ///     .tag("roles");
103    ///
104    /// // Create in RediSearch
105    /// engine.create_search_index(index).await?;
106    /// # Ok(())
107    /// # }
108    /// ```
109    pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
110        let l2 = self.l2_store.as_ref().ok_or_else(|| {
111            StorageError::Connection("Redis not available for search index".into())
112        })?;
113
114        // Build FT.CREATE command with global redis prefix
115        let redis_prefix = self.config.read().redis_prefix.clone();
116        let args = index.to_ft_create_args_with_prefix(redis_prefix.as_deref());
117        debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
118
119        // Execute FT.CREATE via raw Redis command
120        match l2.ft_create(&args).await {
121            Ok(()) => {
122                metrics::record_search_index_operation("create", true);
123                
124                // Register in index manager
125                if let Some(ref search_state) = self.search_state {
126                    search_state.write().index_manager.register(index);
127                }
128
129                info!(index = %args[0], "Search index created");
130                Ok(())
131            }
132            Err(e) => {
133                metrics::record_search_index_operation("create", false);
134                Err(e)
135            }
136        }
137    }
138
139    /// Drop a search index.
140    ///
141    /// Removes the index from RediSearch. Does not delete the indexed documents.
142    pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
143        let l2 = self.l2_store.as_ref().ok_or_else(|| {
144            StorageError::Connection("Redis not available".into())
145        })?;
146
147        let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
148        let index_name = format!("{}idx:{}", prefix, name);
149        
150        match l2.ft_dropindex(&index_name).await {
151            Ok(()) => {
152                metrics::record_search_index_operation("drop", true);
153                info!(index = %index_name, "Search index dropped");
154                Ok(())
155            }
156            Err(e) => {
157                metrics::record_search_index_operation("drop", false);
158                Err(e)
159            }
160        }
161    }
162
163    /// Get the view prefix for a search index.
164    ///
165    /// Returns the prefix used for view: keys in SQL (e.g., "view:test:user:").
166    /// This is needed for SQL search filtering to only search materialized views.
167    pub fn get_index_view_prefix(&self, index_name: &str) -> String {
168        let crdt_prefix = if let Some(ref search_state) = self.search_state {
169            search_state.read()
170                .index_manager
171                .get(index_name)
172                .map(|idx| idx.prefix.clone())
173        } else {
174            None
175        };
176
177        let crdt_prefix = crdt_prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
178        crdt_prefix.replace("crdt:", "view:")
179    }
180
181    /// Search for items using RediSearch query syntax.
182    ///
183    /// Searches the specified index using FT.SEARCH. For durable data (crdt: prefix),
184    /// falls back to SQL if Redis returns no results.
185    ///
186    /// # Arguments
187    ///
188    /// * `index_name` - Name of the search index (without "idx:" prefix)
189    /// * `query` - Query AST built with `Query::` constructors
190    ///
191    /// # Example
192    ///
193    /// ```rust,no_run
194    /// # use sync_engine::{SyncEngine, search::Query};
195    /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
196    /// // Simple field query
197    /// let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;
198    ///
199    ///
200    /// // Complex query with AND/OR
201    /// let query = Query::field_eq("status", "active")
202    ///     .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
203    /// let results = engine.search("users", &query).await?;
204    ///
205    /// for item in results.items {
206    ///     println!("Found: {}", item.object_id);
207    /// }
208    /// # Ok(())
209    /// # }
210    /// ```
211    pub async fn search(
212        &self,
213        index_name: &str,
214        query: &Query,
215    ) -> Result<SearchResult, StorageError> {
216        self.search_with_options(index_name, query, SearchTier::default(), 100).await
217    }
218
219    /// Search with explicit tier and limit options.
220    pub async fn search_with_options(
221        &self,
222        index_name: &str,
223        query: &Query,
224        tier: SearchTier,
225        limit: usize,
226    ) -> Result<SearchResult, StorageError> {
227        let start = Instant::now();
228        
229        // Get index info
230        let prefix = if let Some(ref search_state) = self.search_state {
231            search_state.read()
232                .index_manager
233                .get(index_name)
234                .map(|idx| idx.prefix.clone())
235        } else {
236            None
237        };
238
239        let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
240
241        // Check cache first (if we have merkle root)
242        if let Some(ref search_state) = self.search_state {
243            if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
244                let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
245                if let Some(keys) = cached_keys {
246                    debug!(index = %index_name, count = keys.len(), "Search cache hit");
247                    metrics::record_search_cache(true);
248                    metrics::record_search_latency("cache", start.elapsed());
249                    metrics::record_search_results(keys.len());
250                    let items = self.fetch_items_by_keys(&keys).await?;
251                    return Ok(SearchResult {
252                        items,
253                        source: SearchSource::Cache,
254                        cached: true,
255                    });
256                }
257            }
258        }
259
260        // Try RediSearch
261        let redis_start = Instant::now();
262        let redis_results = self.search_redis(index_name, query, limit).await;
263
264        match redis_results {
265            Ok(items) if !items.is_empty() => {
266                debug!(index = %index_name, count = items.len(), "RediSearch results");
267                metrics::record_search_query("redis", "success");
268                metrics::record_search_latency("redis", redis_start.elapsed());
269                metrics::record_search_results(items.len());
270                Ok(SearchResult {
271                    items,
272                    source: SearchSource::Hot,
273                    cached: false,
274                })
275            }
276            Ok(_) | Err(_) => {
277                // Record Redis attempt (empty or error)
278                if redis_results.is_err() {
279                    metrics::record_search_query("redis", "error");
280                } else {
281                    metrics::record_search_query("redis", "empty");
282                }
283                
284                // Empty or error - try SQL fallback for durable tier
285                if tier == SearchTier::RedisWithSqlFallback {
286                    let sql_start = Instant::now();
287                    // Use view: prefix for SQL search (views have the materialized data)
288                    let view_prefix = prefix.replace("crdt:", "view:");
289                    let sql_results = self.search_sql_with_prefix(query, &view_prefix, limit).await?;
290                    let is_empty = sql_results.is_empty();
291                    
292                    metrics::record_search_query("sql", "success");
293                    metrics::record_search_latency("sql", sql_start.elapsed());
294                    metrics::record_search_results(sql_results.len());
295
296                    // Cache results if we have merkle
297                    if !is_empty {
298                        if let Some(ref search_state) = self.search_state {
299                            if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
300                                let keys: Vec<String> = sql_results.iter()
301                                    .map(|item| item.object_id.clone())
302                                    .collect();
303                                search_state.write().cache.insert(&prefix, query, merkle_root, keys);
304                            }
305                        }
306                    }
307
308                    Ok(SearchResult {
309                        items: sql_results,
310                        source: if is_empty { SearchSource::Empty } else { SearchSource::Cold },
311                        cached: false,
312                    })
313                } else {
314                    // RedisOnly tier - return empty
315                    metrics::record_search_results(0);
316                    Ok(SearchResult {
317                        items: vec![],
318                        source: SearchSource::Empty,
319                        cached: false,
320                    })
321                }
322            }
323        }
324    }
325
326    /// Search using raw RediSearch query string (Redis-only, no SQL fallback).
327    ///
328    /// Use this when you need the full power of RediSearch syntax
329    /// without the Query AST. This is an **advanced API** with caveats:
330    ///
331    /// - **No SQL fallback**: If Redis is unavailable, this will fail
332    /// - **No search cache**: Results are not cached via the merkle system
333    /// - **Manual paths**: You must use `$.payload.{field}` paths in your query
334    /// - **No translation**: The query string is passed directly to FT.SEARCH
335    ///
336    /// Prefer `search()` or `search_with_options()` for most use cases.
337    ///
338    /// # Example
339    /// ```ignore
340    /// // Raw RediSearch query with explicit payload paths
341    /// let results = engine.search_raw(
342    ///     "users",
343    ///     "@name:(Alice Smith) @age:[25 35]",
344    ///     100
345    /// ).await?;
346    /// ```
347    pub async fn search_raw(
348        &self,
349        index_name: &str,
350        query_str: &str,
351        limit: usize,
352    ) -> Result<Vec<SyncItem>, StorageError> {
353        let l2 = self.l2_store.as_ref().ok_or_else(|| {
354            StorageError::Connection("Redis not available for raw search".into())
355        })?;
356
357        let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
358        let index = format!("{}idx:{}", prefix, index_name);
359        
360        metrics::record_search_query("redis_raw", "attempt");
361        let start = std::time::Instant::now();
362        
363        let keys = l2.ft_search(&index, query_str, limit).await?;
364        let items = self.fetch_items_by_keys(&keys).await?;
365        
366        metrics::record_search_query("redis_raw", "success");
367        metrics::record_search_latency("redis_raw", start.elapsed());
368        metrics::record_search_results(items.len());
369        
370        Ok(items)
371    }
372
373    /// Direct SQL search (bypasses Redis, SQL-only).
374    ///
375    /// Queries the SQL archive directly using JSON_EXTRACT.
376    /// This is an **advanced API** with specific use cases:
377    ///
378    /// - **No Redis**: Bypasses L2 cache entirely
379    /// - **No caching**: Results are not cached via the merkle system
380    /// - **Ground truth**: Queries the durable SQL archive
381    ///
382    /// Useful for:
383    /// - Analytics queries that need complete data
384    /// - When Redis is unavailable or not trusted
385    /// - Bulk operations on archived data
386    ///
387    /// Prefer `search()` or `search_with_options()` for most use cases.
388    pub async fn search_sql(
389        &self,
390        query: &Query,
391        limit: usize,
392    ) -> Result<Vec<SyncItem>, StorageError> {
393        self.search_sql_with_prefix(query, "", limit).await
394    }
395
396    /// Search SQL with a key prefix filter.
397    ///
398    /// Only items whose `id` starts with the given prefix will be searched.
399    /// Uses the schema registry to route to the correct table for partitioned schemas.
400    pub async fn search_sql_with_prefix(
401        &self,
402        query: &Query,
403        key_prefix: &str,
404        limit: usize,
405    ) -> Result<Vec<SyncItem>, StorageError> {
406        let sql_store = self.sql_store.as_ref().ok_or_else(|| {
407            StorageError::Connection("SQL not available".into())
408        })?;
409
410        metrics::record_search_query("sql_direct", "attempt");
411        let start = std::time::Instant::now();
412
413        let sql_query = SqlTranslator::translate(query, "payload");
414        
415        // Determine which table to search based on prefix
416        // For "view:users:" prefix, look up in schema registry
417        let table = self.schema_registry.table_for_key(key_prefix);
418        
419        // Combine prefix filter with query clause
420        let (full_clause, full_params) = if key_prefix.is_empty() {
421            (sql_query.clause.clone(), sql_query.params.clone())
422        } else {
423            // Add prefix filter: id LIKE 'prefix%' AND (original query)
424            let mut params = vec![SqlParam::Text(format!("{}%", key_prefix))];
425            params.extend(sql_query.params.clone());
426            (format!("id LIKE ? AND ({})", sql_query.clause), params)
427        };
428        
429        debug!(clause = %full_clause, prefix = %key_prefix, table = %table, "SQL search");
430
431        let results = sql_store.search_in_table(table, &full_clause, &full_params, limit).await?;
432        
433        metrics::record_search_query("sql_direct", "success");
434        metrics::record_search_latency("sql_direct", start.elapsed());
435        metrics::record_search_results(results.len());
436        
437        Ok(results)
438    }
439
440    /// Count items matching a query in SQL (fast COUNT(*) without fetching data).
441    ///
442    /// Use this for exhaustiveness checks: compare Redis result count with SQL total.
443    /// This is much faster than fetching all results just to count them.
444    ///
445    /// # Example
446    /// ```ignore
447    /// let redis_results = engine.search("users", &query).await?;
448    /// let sql_count = engine.search_count_sql(&query).await?;
449    /// let exhaustive = redis_results.items.len() as u64 == sql_count;
450    /// ```
451    pub async fn search_count_sql(&self, query: &Query) -> Result<u64, StorageError> {
452        self.search_count_sql_with_prefix(query, "").await
453    }
454
455    /// Count items matching a query in SQL with a key prefix filter.
456    /// Uses the schema registry to route to the correct table for partitioned schemas.
457    pub async fn search_count_sql_with_prefix(&self, query: &Query, key_prefix: &str) -> Result<u64, StorageError> {
458        let sql_store = self.sql_store.as_ref().ok_or_else(|| {
459            StorageError::Connection("SQL not available".into())
460        })?;
461
462        let sql_query = SqlTranslator::translate(query, "payload");
463        
464        // Determine which table to search based on prefix
465        let table = self.schema_registry.table_for_key(key_prefix);
466        
467        // Combine prefix filter with query clause
468        let (full_clause, full_params) = if key_prefix.is_empty() {
469            (sql_query.clause.clone(), sql_query.params.clone())
470        } else {
471            let mut params = vec![SqlParam::Text(format!("{}%", key_prefix))];
472            params.extend(sql_query.params.clone());
473            (format!("id LIKE ? AND ({})", sql_query.clause), params)
474        };
475        
476        debug!(clause = %full_clause, prefix = %key_prefix, table = %table, "SQL count");
477
478        sql_store.count_where_in_table(table, &full_clause, &full_params).await
479    }
480
481    /// Get search cache statistics.
482    pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
483        self.search_state.as_ref().map(|s| s.read().cache.stats())
484    }
485
486    // ═══════════════════════════════════════════════════════════════════════════
487    // Internal helpers
488    // ═══════════════════════════════════════════════════════════════════════════
489
490    /// Fetch items by keys, filtering out None results
491    async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
492        if keys.is_empty() {
493            return Ok(vec![]);
494        }
495        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
496        let results = self.get_many(&key_refs).await;
497        Ok(results.into_iter().flatten().collect())
498    }
499
500    async fn search_redis(
501        &self,
502        index_name: &str,
503        query: &Query,
504        limit: usize,
505    ) -> Result<Vec<SyncItem>, StorageError> {
506        let l2 = self.l2_store.as_ref().ok_or_else(|| {
507            StorageError::Connection("Redis not available".into())
508        })?;
509
510        let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
511        let index = format!("{}idx:{}", prefix, index_name);
512        
513        // Translate query with potential vector parameters
514        let translated = RediSearchTranslator::translate_with_params(query);
515        debug!(index = %index, query = %translated.query, has_params = %translated.has_params(), "FT.SEARCH");
516
517        let keys = if translated.has_params() {
518            // Vector search with binary parameters
519            l2.ft_search_with_params(&index, &translated.query, &translated.params, limit).await?
520        } else {
521            // Regular search
522            l2.ft_search(&index, &translated.query, limit).await?
523        };
524        
525        self.fetch_items_by_keys(&keys).await
526    }
527
528    async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
529        // Extract the path segment from prefix (e.g., "crdt:users:" -> "crdt:users")
530        let path = prefix.trim_end_matches(':');
531
532        if let Some(ref merkle_cache) = self.merkle_cache {
533            if let Ok(Some(node)) = merkle_cache.get_node(path).await {
534                return Some(node.hash.to_vec());
535            }
536        }
537
538        None
539    }
540}