sync_engine/coordinator/
search_api.rs

1//! Search API for SyncEngine
2//!
3//! Provides full-text search capabilities via RediSearch with SQL fallback.
4//!
5//! # Architecture
6//!
7//! ```text
8//! search(index, query)
9//!       │
10//!       ├─→ Check SearchCache (merkle-validated)
11//!       │        │
12//!       │        └─→ Hit? Return cached keys
13//!       │
14//!       ├─→ FT.SEARCH Redis (fast)
15//!       │        │
16//!       │        └─→ Results? Return
17//!       │
18//!       └─→ Durable tier? SQL fallback
19//!                │
20//!                └─→ Cache results with merkle root
21//! ```
22
use std::time::Instant;
use tracing::{debug, info, warn};

use crate::metrics;
use crate::search::{
    IndexManager, SearchIndex, SearchCache, SearchCacheStats,
    Query, RediSearchTranslator, SqlTranslator,
};
use crate::sync_item::SyncItem;
use crate::storage::traits::StorageError;

use super::SyncEngine;
35
/// Which storage tiers a search is allowed to consult.
///
/// Defaults to [`SearchTier::RedisWithSqlFallback`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SearchTier {
    /// Query Redis only, never SQL (intended for ephemeral `view:`-prefixed data).
    RedisOnly,
    /// Query Redis first and fall back to SQL when Redis yields nothing
    /// (intended for durable `crdt:`-prefixed data).
    RedisWithSqlFallback,
}

impl Default for SearchTier {
    fn default() -> Self {
        Self::RedisWithSqlFallback
    }
}
45
46/// Search result with metadata
47#[derive(Debug, Clone)]
48pub struct SearchResult {
49    /// Matching items
50    pub items: Vec<SyncItem>,
51    /// Source of results
52    pub source: SearchSource,
53    /// Whether results came from cache
54    pub cached: bool,
55}
56
/// Origin of the items in a [`SearchResult`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SearchSource {
    /// Produced by a RediSearch `FT.SEARCH` query.
    Redis,
    /// Produced by a SQL query.
    Sql,
    /// Resolved from keys held in the `SearchCache`.
    Cache,
    /// Nothing matched.
    Empty,
}
69
70/// Search-related state for SyncEngine
71#[derive(Default)]
72pub struct SearchState {
73    /// Index manager
74    pub index_manager: IndexManager,
75    /// Search result cache
76    pub cache: SearchCache,
77}
78
79impl SyncEngine {
80    // ═══════════════════════════════════════════════════════════════════════════
81    // Search API
82    // ═══════════════════════════════════════════════════════════════════════════
83
84    /// Register a search index.
85    ///
86    /// Creates the index in RediSearch using FT.CREATE. The index will
87    /// automatically index all JSON documents with matching key prefix.
88    ///
89    /// # Example
90    ///
91    /// ```rust,no_run
92    /// # use sync_engine::{SyncEngine, search::SearchIndex};
93    /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
94    /// // Define index schema
95    /// let index = SearchIndex::new("users", "crdt:users:")
96    ///     .text_sortable("name")
97    ///     .text("email")
98    ///     .numeric_sortable("age")
99    ///     .tag("roles");
100    ///
101    /// // Create in RediSearch
102    /// engine.create_search_index(index).await?;
103    /// # Ok(())
104    /// # }
105    /// ```
106    pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
107        let l2 = self.l2_store.as_ref().ok_or_else(|| {
108            StorageError::Connection("Redis not available for search index".into())
109        })?;
110
111        // Build FT.CREATE command with global redis prefix
112        let redis_prefix = self.config.redis_prefix.as_deref();
113        let args = index.to_ft_create_args_with_prefix(redis_prefix);
114        debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
115
116        // Execute FT.CREATE via raw Redis command
117        match l2.ft_create(&args).await {
118            Ok(()) => {
119                metrics::record_search_index_operation("create", true);
120                
121                // Register in index manager
122                if let Some(ref search_state) = self.search_state {
123                    search_state.write().index_manager.register(index);
124                }
125
126                info!(index = %args[0], "Search index created");
127                Ok(())
128            }
129            Err(e) => {
130                metrics::record_search_index_operation("create", false);
131                Err(e)
132            }
133        }
134    }
135
136    /// Drop a search index.
137    ///
138    /// Removes the index from RediSearch. Does not delete the indexed documents.
139    pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
140        let l2 = self.l2_store.as_ref().ok_or_else(|| {
141            StorageError::Connection("Redis not available".into())
142        })?;
143
144        let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
145        let index_name = format!("{}idx:{}", prefix, name);
146        
147        match l2.ft_dropindex(&index_name).await {
148            Ok(()) => {
149                metrics::record_search_index_operation("drop", true);
150                info!(index = %index_name, "Search index dropped");
151                Ok(())
152            }
153            Err(e) => {
154                metrics::record_search_index_operation("drop", false);
155                Err(e)
156            }
157        }
158    }
159
160    /// Search for items using RediSearch query syntax.
161    ///
162    /// Searches the specified index using FT.SEARCH. For durable data (crdt: prefix),
163    /// falls back to SQL if Redis returns no results.
164    ///
165    /// # Arguments
166    ///
167    /// * `index_name` - Name of the search index (without "idx:" prefix)
168    /// * `query` - Query AST built with `Query::` constructors
169    ///
170    /// # Example
171    ///
172    /// ```rust,no_run
173    /// # use sync_engine::{SyncEngine, search::Query};
174    /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
175    /// // Simple field query
176    /// let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;
177    ///
178    /// // Complex query with AND/OR
179    /// let query = Query::field_eq("status", "active")
180    ///     .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
181    /// let results = engine.search("users", &query).await?;
182    ///
183    /// for item in results.items {
184    ///     println!("Found: {}", item.object_id);
185    /// }
186    /// # Ok(())
187    /// # }
188    /// ```
189    pub async fn search(
190        &self,
191        index_name: &str,
192        query: &Query,
193    ) -> Result<SearchResult, StorageError> {
194        self.search_with_options(index_name, query, SearchTier::default(), 100).await
195    }
196
197    /// Search with explicit tier and limit options.
198    pub async fn search_with_options(
199        &self,
200        index_name: &str,
201        query: &Query,
202        tier: SearchTier,
203        limit: usize,
204    ) -> Result<SearchResult, StorageError> {
205        let start = Instant::now();
206        
207        // Get index info
208        let prefix = if let Some(ref search_state) = self.search_state {
209            search_state.read()
210                .index_manager
211                .get(index_name)
212                .map(|idx| idx.prefix.clone())
213        } else {
214            None
215        };
216
217        let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
218
219        // Check cache first (if we have merkle root)
220        if let Some(ref search_state) = self.search_state {
221            if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
222                let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
223                if let Some(keys) = cached_keys {
224                    debug!(index = %index_name, count = keys.len(), "Search cache hit");
225                    metrics::record_search_cache(true);
226                    metrics::record_search_latency("cache", start.elapsed());
227                    metrics::record_search_results(keys.len());
228                    let items = self.fetch_items_by_keys(&keys).await?;
229                    return Ok(SearchResult {
230                        items,
231                        source: SearchSource::Cache,
232                        cached: true,
233                    });
234                }
235            }
236        }
237
238        // Try RediSearch
239        let redis_start = Instant::now();
240        let redis_results = self.search_redis(index_name, query, limit).await;
241
242        match redis_results {
243            Ok(items) if !items.is_empty() => {
244                debug!(index = %index_name, count = items.len(), "RediSearch results");
245                metrics::record_search_query("redis", "success");
246                metrics::record_search_latency("redis", redis_start.elapsed());
247                metrics::record_search_results(items.len());
248                Ok(SearchResult {
249                    items,
250                    source: SearchSource::Redis,
251                    cached: false,
252                })
253            }
254            Ok(_) | Err(_) => {
255                // Record Redis attempt (empty or error)
256                if redis_results.is_err() {
257                    metrics::record_search_query("redis", "error");
258                } else {
259                    metrics::record_search_query("redis", "empty");
260                }
261                
262                // Empty or error - try SQL fallback for durable tier
263                if tier == SearchTier::RedisWithSqlFallback {
264                    let sql_start = Instant::now();
265                    let sql_results = self.search_sql(query, limit).await?;
266                    let is_empty = sql_results.is_empty();
267                    
268                    metrics::record_search_query("sql", "success");
269                    metrics::record_search_latency("sql", sql_start.elapsed());
270                    metrics::record_search_results(sql_results.len());
271
272                    // Cache results if we have merkle
273                    if !is_empty {
274                        if let Some(ref search_state) = self.search_state {
275                            if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
276                                let keys: Vec<String> = sql_results.iter()
277                                    .map(|item| item.object_id.clone())
278                                    .collect();
279                                search_state.write().cache.insert(&prefix, query, merkle_root, keys);
280                            }
281                        }
282                    }
283
284                    Ok(SearchResult {
285                        items: sql_results,
286                        source: if is_empty { SearchSource::Empty } else { SearchSource::Sql },
287                        cached: false,
288                    })
289                } else {
290                    // RedisOnly tier - return empty
291                    metrics::record_search_results(0);
292                    Ok(SearchResult {
293                        items: vec![],
294                        source: SearchSource::Empty,
295                        cached: false,
296                    })
297                }
298            }
299        }
300    }
301
302    /// Search using raw RediSearch query string (Redis-only, no SQL fallback).
303    ///
304    /// Use this when you need the full power of RediSearch syntax
305    /// without the Query AST. This is an **advanced API** with caveats:
306    ///
307    /// - **No SQL fallback**: If Redis is unavailable, this will fail
308    /// - **No search cache**: Results are not cached via the merkle system
309    /// - **Manual paths**: You must use `$.payload.{field}` paths in your query
310    /// - **No translation**: The query string is passed directly to FT.SEARCH
311    ///
312    /// Prefer `search()` or `search_with_options()` for most use cases.
313    ///
314    /// # Example
315    /// ```ignore
316    /// // Raw RediSearch query with explicit payload paths
317    /// let results = engine.search_raw(
318    ///     "users",
319    ///     "@name:(Alice Smith) @age:[25 35]",
320    ///     100
321    /// ).await?;
322    /// ```
323    pub async fn search_raw(
324        &self,
325        index_name: &str,
326        query_str: &str,
327        limit: usize,
328    ) -> Result<Vec<SyncItem>, StorageError> {
329        let l2 = self.l2_store.as_ref().ok_or_else(|| {
330            StorageError::Connection("Redis not available for raw search".into())
331        })?;
332
333        let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
334        let index = format!("{}idx:{}", prefix, index_name);
335        
336        metrics::record_search_query("redis_raw", "attempt");
337        let start = std::time::Instant::now();
338        
339        let keys = l2.ft_search(&index, query_str, limit).await?;
340        let items = self.fetch_items_by_keys(&keys).await?;
341        
342        metrics::record_search_query("redis_raw", "success");
343        metrics::record_search_latency("redis_raw", start.elapsed());
344        metrics::record_search_results(items.len());
345        
346        Ok(items)
347    }
348
349    /// Direct SQL search (bypasses Redis, SQL-only).
350    ///
351    /// Queries the SQL archive directly using JSON_EXTRACT.
352    /// This is an **advanced API** with specific use cases:
353    ///
354    /// - **No Redis**: Bypasses L2 cache entirely
355    /// - **No caching**: Results are not cached via the merkle system
356    /// - **Ground truth**: Queries the durable SQL archive
357    ///
358    /// Useful for:
359    /// - Analytics queries that need complete data
360    /// - When Redis is unavailable or not trusted
361    /// - Bulk operations on archived data
362    ///
363    /// Prefer `search()` or `search_with_options()` for most use cases.
364    pub async fn search_sql(
365        &self,
366        query: &Query,
367        limit: usize,
368    ) -> Result<Vec<SyncItem>, StorageError> {
369        let l3 = self.l3_store.as_ref().ok_or_else(|| {
370            StorageError::Connection("SQL not available".into())
371        })?;
372
373        metrics::record_search_query("sql_direct", "attempt");
374        let start = std::time::Instant::now();
375
376        let sql_query = SqlTranslator::translate(query, "data");
377        debug!(clause = %sql_query.clause, "SQL search");
378
379        let results = l3.search(&sql_query.clause, &sql_query.params, limit).await?;
380        
381        metrics::record_search_query("sql_direct", "success");
382        metrics::record_search_latency("sql_direct", start.elapsed());
383        metrics::record_search_results(results.len());
384        
385        Ok(results)
386    }
387
388    /// Get search cache statistics.
389    pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
390        self.search_state.as_ref().map(|s| s.read().cache.stats())
391    }
392
393    // ═══════════════════════════════════════════════════════════════════════════
394    // Internal helpers
395    // ═══════════════════════════════════════════════════════════════════════════
396
397    /// Fetch items by keys, filtering out None results
398    async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
399        if keys.is_empty() {
400            return Ok(vec![]);
401        }
402        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
403        let results = self.get_many(&key_refs).await;
404        Ok(results.into_iter().flatten().collect())
405    }
406
407    async fn search_redis(
408        &self,
409        index_name: &str,
410        query: &Query,
411        limit: usize,
412    ) -> Result<Vec<SyncItem>, StorageError> {
413        let l2 = self.l2_store.as_ref().ok_or_else(|| {
414            StorageError::Connection("Redis not available".into())
415        })?;
416
417        let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
418        let index = format!("{}idx:{}", prefix, index_name);
419        let query_str = RediSearchTranslator::translate(query);
420        debug!(index = %index, query = %query_str, "FT.SEARCH");
421
422        let keys = l2.ft_search(&index, &query_str, limit).await?;
423        self.fetch_items_by_keys(&keys).await
424    }
425
426    async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
427        // Extract the path segment from prefix (e.g., "crdt:users:" -> "crdt:users")
428        let path = prefix.trim_end_matches(':');
429
430        if let Some(ref redis_merkle) = self.redis_merkle {
431            if let Ok(Some(node)) = redis_merkle.get_node(path).await {
432                return Some(node.hash.to_vec());
433            }
434        }
435
436        None
437    }
438}