sync_engine/coordinator/
search_api.rs

1//! Search API for SyncEngine
2//!
3//! Provides full-text search capabilities via RediSearch with SQL fallback.
4//!
5//! # Architecture
6//!
7//! ```text
8//! search(index, query)
9//!       │
10//!       ├─→ Check SearchCache (merkle-validated)
11//!       │        │
12//!       │        └─→ Hit? Return cached keys
13//!       │
14//!       ├─→ FT.SEARCH Redis (fast)
15//!       │        │
16//!       │        └─→ Results? Return
17//!       │
18//!       └─→ Durable tier? SQL fallback
19//!                │
20//!                └─→ Cache results with merkle root
21//! ```
22
23use std::time::Instant;
24use tracing::{debug, info};
25
26use crate::metrics;
27use crate::search::{
28    IndexManager, SearchIndex, SearchCache, SearchCacheStats,
29    Query, RediSearchTranslator, SqlTranslator,
30};
31use crate::sync_item::SyncItem;
32use crate::storage::traits::StorageError;
33
34use super::SyncEngine;
35
/// Strategy that decides which storage tiers a search is allowed to consult.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SearchTier {
    /// Query RediSearch only and never fall back to SQL
    /// (meant for ephemeral data under the `view:` prefix).
    RedisOnly,
    /// Query RediSearch first and fall back to SQL afterwards
    /// (meant for durable data under the `crdt:` prefix).
    ///
    /// This is the default tier.
    #[default]
    RedisWithSqlFallback,
}
45
46/// Search result with metadata
47#[derive(Debug, Clone)]
48pub struct SearchResult {
49    /// Matching items
50    pub items: Vec<SyncItem>,
51    /// Source of results
52    pub source: SearchSource,
53    /// Whether results came from cache
54    pub cached: bool,
55}
56
/// Identifies the tier that produced a set of search results.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SearchSource {
    /// Produced by a RediSearch `FT.SEARCH` call.
    Redis,
    /// Produced by a SQL query.
    Sql,
    /// Served from the `SearchCache`.
    Cache,
    /// Nothing matched in any tier that was consulted.
    Empty,
}
69
70/// Search-related state for SyncEngine
71pub struct SearchState {
72    /// Index manager
73    pub index_manager: IndexManager,
74    /// Search result cache
75    pub cache: SearchCache,
76}
77
78impl Default for SearchState {
79    fn default() -> Self {
80        Self {
81            index_manager: IndexManager::new(),
82            cache: SearchCache::default(),
83        }
84    }
85}
86
87impl SyncEngine {
88    // ═══════════════════════════════════════════════════════════════════════════
89    // Search API
90    // ═══════════════════════════════════════════════════════════════════════════
91
92    /// Register a search index.
93    ///
94    /// Creates the index in RediSearch using FT.CREATE. The index will
95    /// automatically index all JSON documents with matching key prefix.
96    ///
97    /// # Example
98    ///
99    /// ```rust,no_run
100    /// # use sync_engine::{SyncEngine, search::SearchIndex};
101    /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
102    /// // Define index schema
103    /// let index = SearchIndex::new("users", "crdt:users:")
104    ///     .text_sortable("name")
105    ///     .text("email")
106    ///     .numeric_sortable("age")
107    ///     .tag("roles");
108    ///
109    /// // Create in RediSearch
110    /// engine.create_search_index(index).await?;
111    /// # Ok(())
112    /// # }
113    /// ```
114    pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
115        let l2 = self.l2_store.as_ref().ok_or_else(|| {
116            StorageError::Connection("Redis not available for search index".into())
117        })?;
118
119        // Build FT.CREATE command with global redis prefix
120        let redis_prefix = self.config.redis_prefix.as_deref();
121        let args = index.to_ft_create_args_with_prefix(redis_prefix);
122        debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
123
124        // Execute FT.CREATE via raw Redis command
125        match l2.ft_create(&args).await {
126            Ok(()) => {
127                metrics::record_search_index_operation("create", true);
128                
129                // Register in index manager
130                if let Some(ref search_state) = self.search_state {
131                    search_state.write().index_manager.register(index);
132                }
133
134                info!(index = %args[0], "Search index created");
135                Ok(())
136            }
137            Err(e) => {
138                metrics::record_search_index_operation("create", false);
139                Err(e)
140            }
141        }
142    }
143
144    /// Drop a search index.
145    ///
146    /// Removes the index from RediSearch. Does not delete the indexed documents.
147    pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
148        let l2 = self.l2_store.as_ref().ok_or_else(|| {
149            StorageError::Connection("Redis not available".into())
150        })?;
151
152        let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
153        let index_name = format!("{}idx:{}", prefix, name);
154        
155        match l2.ft_dropindex(&index_name).await {
156            Ok(()) => {
157                metrics::record_search_index_operation("drop", true);
158                info!(index = %index_name, "Search index dropped");
159                Ok(())
160            }
161            Err(e) => {
162                metrics::record_search_index_operation("drop", false);
163                Err(e)
164            }
165        }
166    }
167
168    /// Search for items using RediSearch query syntax.
169    ///
170    /// Searches the specified index using FT.SEARCH. For durable data (crdt: prefix),
171    /// falls back to SQL if Redis returns no results.
172    ///
173    /// # Arguments
174    ///
175    /// * `index_name` - Name of the search index (without "idx:" prefix)
176    /// * `query` - Query AST built with `Query::` constructors
177    ///
178    /// # Example
179    ///
180    /// ```rust,no_run
181    /// # use sync_engine::{SyncEngine, search::Query};
182    /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
183    /// // Simple field query
184    /// let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;
185    ///
186    /// // Complex query with AND/OR
187    /// let query = Query::field_eq("status", "active")
188    ///     .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
189    /// let results = engine.search("users", &query).await?;
190    ///
191    /// for item in results.items {
192    ///     println!("Found: {}", item.object_id);
193    /// }
194    /// # Ok(())
195    /// # }
196    /// ```
197    pub async fn search(
198        &self,
199        index_name: &str,
200        query: &Query,
201    ) -> Result<SearchResult, StorageError> {
202        self.search_with_options(index_name, query, SearchTier::default(), 100).await
203    }
204
205    /// Search with explicit tier and limit options.
206    pub async fn search_with_options(
207        &self,
208        index_name: &str,
209        query: &Query,
210        tier: SearchTier,
211        limit: usize,
212    ) -> Result<SearchResult, StorageError> {
213        let start = Instant::now();
214        
215        // Get index info
216        let prefix = if let Some(ref search_state) = self.search_state {
217            search_state.read()
218                .index_manager
219                .get(index_name)
220                .map(|idx| idx.prefix.clone())
221        } else {
222            None
223        };
224
225        let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
226
227        // Check cache first (if we have merkle root)
228        if let Some(ref search_state) = self.search_state {
229            if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
230                let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
231                if let Some(keys) = cached_keys {
232                    debug!(index = %index_name, count = keys.len(), "Search cache hit");
233                    metrics::record_search_cache(true);
234                    metrics::record_search_latency("cache", start.elapsed());
235                    metrics::record_search_results(keys.len());
236                    let items = self.fetch_items_by_keys(&keys).await?;
237                    return Ok(SearchResult {
238                        items,
239                        source: SearchSource::Cache,
240                        cached: true,
241                    });
242                }
243            }
244        }
245
246        // Try RediSearch
247        let redis_start = Instant::now();
248        let redis_results = self.search_redis(index_name, query, limit).await;
249
250        match redis_results {
251            Ok(items) if !items.is_empty() => {
252                debug!(index = %index_name, count = items.len(), "RediSearch results");
253                metrics::record_search_query("redis", "success");
254                metrics::record_search_latency("redis", redis_start.elapsed());
255                metrics::record_search_results(items.len());
256                Ok(SearchResult {
257                    items,
258                    source: SearchSource::Redis,
259                    cached: false,
260                })
261            }
262            Ok(_) | Err(_) => {
263                // Record Redis attempt (empty or error)
264                if redis_results.is_err() {
265                    metrics::record_search_query("redis", "error");
266                } else {
267                    metrics::record_search_query("redis", "empty");
268                }
269                
270                // Empty or error - try SQL fallback for durable tier
271                if tier == SearchTier::RedisWithSqlFallback {
272                    let sql_start = Instant::now();
273                    let sql_results = self.search_sql(query, limit).await?;
274                    let is_empty = sql_results.is_empty();
275                    
276                    metrics::record_search_query("sql", "success");
277                    metrics::record_search_latency("sql", sql_start.elapsed());
278                    metrics::record_search_results(sql_results.len());
279
280                    // Cache results if we have merkle
281                    if !is_empty {
282                        if let Some(ref search_state) = self.search_state {
283                            if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
284                                let keys: Vec<String> = sql_results.iter()
285                                    .map(|item| item.object_id.clone())
286                                    .collect();
287                                search_state.write().cache.insert(&prefix, query, merkle_root, keys);
288                            }
289                        }
290                    }
291
292                    Ok(SearchResult {
293                        items: sql_results,
294                        source: if is_empty { SearchSource::Empty } else { SearchSource::Sql },
295                        cached: false,
296                    })
297                } else {
298                    // RedisOnly tier - return empty
299                    metrics::record_search_results(0);
300                    Ok(SearchResult {
301                        items: vec![],
302                        source: SearchSource::Empty,
303                        cached: false,
304                    })
305                }
306            }
307        }
308    }
309
310    /// Search using raw RediSearch query string (Redis-only, no SQL fallback).
311    ///
312    /// Use this when you need the full power of RediSearch syntax
313    /// without the Query AST. This is an **advanced API** with caveats:
314    ///
315    /// - **No SQL fallback**: If Redis is unavailable, this will fail
316    /// - **No search cache**: Results are not cached via the merkle system
317    /// - **Manual paths**: You must use `$.payload.{field}` paths in your query
318    /// - **No translation**: The query string is passed directly to FT.SEARCH
319    ///
320    /// Prefer `search()` or `search_with_options()` for most use cases.
321    ///
322    /// # Example
323    /// ```ignore
324    /// // Raw RediSearch query with explicit payload paths
325    /// let results = engine.search_raw(
326    ///     "users",
327    ///     "@name:(Alice Smith) @age:[25 35]",
328    ///     100
329    /// ).await?;
330    /// ```
331    pub async fn search_raw(
332        &self,
333        index_name: &str,
334        query_str: &str,
335        limit: usize,
336    ) -> Result<Vec<SyncItem>, StorageError> {
337        let l2 = self.l2_store.as_ref().ok_or_else(|| {
338            StorageError::Connection("Redis not available for raw search".into())
339        })?;
340
341        let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
342        let index = format!("{}idx:{}", prefix, index_name);
343        
344        metrics::record_search_query("redis_raw", "attempt");
345        let start = std::time::Instant::now();
346        
347        let keys = l2.ft_search(&index, query_str, limit).await?;
348        let items = self.fetch_items_by_keys(&keys).await?;
349        
350        metrics::record_search_query("redis_raw", "success");
351        metrics::record_search_latency("redis_raw", start.elapsed());
352        metrics::record_search_results(items.len());
353        
354        Ok(items)
355    }
356
357    /// Direct SQL search (bypasses Redis, SQL-only).
358    ///
359    /// Queries the SQL archive directly using JSON_EXTRACT.
360    /// This is an **advanced API** with specific use cases:
361    ///
362    /// - **No Redis**: Bypasses L2 cache entirely
363    /// - **No caching**: Results are not cached via the merkle system
364    /// - **Ground truth**: Queries the durable SQL archive
365    ///
366    /// Useful for:
367    /// - Analytics queries that need complete data
368    /// - When Redis is unavailable or not trusted
369    /// - Bulk operations on archived data
370    ///
371    /// Prefer `search()` or `search_with_options()` for most use cases.
372    pub async fn search_sql(
373        &self,
374        query: &Query,
375        limit: usize,
376    ) -> Result<Vec<SyncItem>, StorageError> {
377        let l3 = self.l3_store.as_ref().ok_or_else(|| {
378            StorageError::Connection("SQL not available".into())
379        })?;
380
381        metrics::record_search_query("sql_direct", "attempt");
382        let start = std::time::Instant::now();
383
384        let sql_query = SqlTranslator::translate(query, "data");
385        debug!(clause = %sql_query.clause, "SQL search");
386
387        let results = l3.search(&sql_query.clause, &sql_query.params, limit).await?;
388        
389        metrics::record_search_query("sql_direct", "success");
390        metrics::record_search_latency("sql_direct", start.elapsed());
391        metrics::record_search_results(results.len());
392        
393        Ok(results)
394    }
395
396    /// Get search cache statistics.
397    pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
398        self.search_state.as_ref().map(|s| s.read().cache.stats())
399    }
400
401    // ═══════════════════════════════════════════════════════════════════════════
402    // Internal helpers
403    // ═══════════════════════════════════════════════════════════════════════════
404
405    /// Fetch items by keys, filtering out None results
406    async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
407        if keys.is_empty() {
408            return Ok(vec![]);
409        }
410        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
411        let results = self.get_many(&key_refs).await;
412        Ok(results.into_iter().flatten().collect())
413    }
414
415    async fn search_redis(
416        &self,
417        index_name: &str,
418        query: &Query,
419        limit: usize,
420    ) -> Result<Vec<SyncItem>, StorageError> {
421        let l2 = self.l2_store.as_ref().ok_or_else(|| {
422            StorageError::Connection("Redis not available".into())
423        })?;
424
425        let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
426        let index = format!("{}idx:{}", prefix, index_name);
427        let query_str = RediSearchTranslator::translate(query);
428        debug!(index = %index, query = %query_str, "FT.SEARCH");
429
430        let keys = l2.ft_search(&index, &query_str, limit).await?;
431        self.fetch_items_by_keys(&keys).await
432    }
433
434    async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
435        // Extract the path segment from prefix (e.g., "crdt:users:" -> "crdt:users")
436        let path = prefix.trim_end_matches(':');
437
438        if let Some(ref redis_merkle) = self.redis_merkle {
439            if let Ok(Some(node)) = redis_merkle.get_node(path).await {
440                return Some(node.hash.to_vec());
441            }
442        }
443
444        None
445    }
446}