sync_engine/coordinator/
search_api.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Search API for SyncEngine
5//!
6//! Provides full-text search capabilities via RediSearch with SQL fallback.
7//!
8//! # Architecture
9//!
10//! ```text
11//! search(index, query)
12//!       │
13//!       ├─→ Check SearchCache (merkle-validated)
14//!       │        │
15//!       │        └─→ Hit? Return cached keys
16//!       │
17//!       ├─→ FT.SEARCH Redis (fast)
18//!       │        │
19//!       │        └─→ Results? Return
20//!       │
21//!       └─→ Durable tier? SQL fallback
22//!                │
23//!                └─→ Cache results with merkle root
24//! ```
25
26use std::time::Instant;
27use tracing::{debug, info};
28
29use crate::metrics;
30use crate::search::{
31    IndexManager, SearchIndex, SearchCache, SearchCacheStats,
32    Query, RediSearchTranslator, SqlTranslator,
33};
34use crate::sync_item::SyncItem;
35use crate::storage::traits::StorageError;
36
37use super::SyncEngine;
38
/// Strategy selecting which storage tiers a search is allowed to consult.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SearchTier {
    /// Query Redis only and never fall back to SQL
    /// (intended for ephemeral data under the `view:` prefix).
    RedisOnly,
    /// Query Redis first, then fall back to SQL
    /// (intended for durable data under the `crdt:` prefix).
    ///
    /// This is the default tier.
    #[default]
    RedisWithSqlFallback,
}
48
49/// Search result with metadata
50#[derive(Debug, Clone)]
51pub struct SearchResult {
52    /// Matching items
53    pub items: Vec<SyncItem>,
54    /// Source of results
55    pub source: SearchSource,
56    /// Whether results came from cache
57    pub cached: bool,
58}
59
/// Origin of the items carried by a [`SearchResult`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SearchSource {
    /// Produced by a RediSearch `FT.SEARCH` call.
    Redis,
    /// Produced by a SQL query.
    Sql,
    /// Resolved from the SearchCache.
    Cache,
    /// Nothing matched.
    Empty,
}
72
73/// Search-related state for SyncEngine
74#[derive(Default)]
75pub struct SearchState {
76    /// Index manager
77    pub index_manager: IndexManager,
78    /// Search result cache
79    pub cache: SearchCache,
80}
81
82impl SyncEngine {
83    // ═══════════════════════════════════════════════════════════════════════════
84    // Search API
85    // ═══════════════════════════════════════════════════════════════════════════
86
87    /// Register a search index.
88    ///
89    /// Creates the index in RediSearch using FT.CREATE. The index will
90    /// automatically index all JSON documents with matching key prefix.
91    ///
92    /// # Example
93    ///
94    /// ```rust,no_run
95    /// # use sync_engine::{SyncEngine, search::SearchIndex};
96    /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
97    /// // Define index schema
98    /// let index = SearchIndex::new("users", "crdt:users:")
99    ///     .text_sortable("name")
100    ///     .text("email")
101    ///     .numeric_sortable("age")
102    ///     .tag("roles");
103    ///
104    /// // Create in RediSearch
105    /// engine.create_search_index(index).await?;
106    /// # Ok(())
107    /// # }
108    /// ```
109    pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
110        let l2 = self.l2_store.as_ref().ok_or_else(|| {
111            StorageError::Connection("Redis not available for search index".into())
112        })?;
113
114        // Build FT.CREATE command with global redis prefix
115        let redis_prefix = self.config.read().redis_prefix.clone();
116        let args = index.to_ft_create_args_with_prefix(redis_prefix.as_deref());
117        debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
118
119        // Execute FT.CREATE via raw Redis command
120        match l2.ft_create(&args).await {
121            Ok(()) => {
122                metrics::record_search_index_operation("create", true);
123                
124                // Register in index manager
125                if let Some(ref search_state) = self.search_state {
126                    search_state.write().index_manager.register(index);
127                }
128
129                info!(index = %args[0], "Search index created");
130                Ok(())
131            }
132            Err(e) => {
133                metrics::record_search_index_operation("create", false);
134                Err(e)
135            }
136        }
137    }
138
139    /// Drop a search index.
140    ///
141    /// Removes the index from RediSearch. Does not delete the indexed documents.
142    pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
143        let l2 = self.l2_store.as_ref().ok_or_else(|| {
144            StorageError::Connection("Redis not available".into())
145        })?;
146
147        let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
148        let index_name = format!("{}idx:{}", prefix, name);
149        
150        match l2.ft_dropindex(&index_name).await {
151            Ok(()) => {
152                metrics::record_search_index_operation("drop", true);
153                info!(index = %index_name, "Search index dropped");
154                Ok(())
155            }
156            Err(e) => {
157                metrics::record_search_index_operation("drop", false);
158                Err(e)
159            }
160        }
161    }
162
163    /// Search for items using RediSearch query syntax.
164    ///
165    /// Searches the specified index using FT.SEARCH. For durable data (crdt: prefix),
166    /// falls back to SQL if Redis returns no results.
167    ///
168    /// # Arguments
169    ///
170    /// * `index_name` - Name of the search index (without "idx:" prefix)
171    /// * `query` - Query AST built with `Query::` constructors
172    ///
173    /// # Example
174    ///
175    /// ```rust,no_run
176    /// # use sync_engine::{SyncEngine, search::Query};
177    /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
178    /// // Simple field query
179    /// let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;
180    ///
181    /// // Complex query with AND/OR
182    /// let query = Query::field_eq("status", "active")
183    ///     .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
184    /// let results = engine.search("users", &query).await?;
185    ///
186    /// for item in results.items {
187    ///     println!("Found: {}", item.object_id);
188    /// }
189    /// # Ok(())
190    /// # }
191    /// ```
192    pub async fn search(
193        &self,
194        index_name: &str,
195        query: &Query,
196    ) -> Result<SearchResult, StorageError> {
197        self.search_with_options(index_name, query, SearchTier::default(), 100).await
198    }
199
200    /// Search with explicit tier and limit options.
201    pub async fn search_with_options(
202        &self,
203        index_name: &str,
204        query: &Query,
205        tier: SearchTier,
206        limit: usize,
207    ) -> Result<SearchResult, StorageError> {
208        let start = Instant::now();
209        
210        // Get index info
211        let prefix = if let Some(ref search_state) = self.search_state {
212            search_state.read()
213                .index_manager
214                .get(index_name)
215                .map(|idx| idx.prefix.clone())
216        } else {
217            None
218        };
219
220        let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
221
222        // Check cache first (if we have merkle root)
223        if let Some(ref search_state) = self.search_state {
224            if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
225                let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
226                if let Some(keys) = cached_keys {
227                    debug!(index = %index_name, count = keys.len(), "Search cache hit");
228                    metrics::record_search_cache(true);
229                    metrics::record_search_latency("cache", start.elapsed());
230                    metrics::record_search_results(keys.len());
231                    let items = self.fetch_items_by_keys(&keys).await?;
232                    return Ok(SearchResult {
233                        items,
234                        source: SearchSource::Cache,
235                        cached: true,
236                    });
237                }
238            }
239        }
240
241        // Try RediSearch
242        let redis_start = Instant::now();
243        let redis_results = self.search_redis(index_name, query, limit).await;
244
245        match redis_results {
246            Ok(items) if !items.is_empty() => {
247                debug!(index = %index_name, count = items.len(), "RediSearch results");
248                metrics::record_search_query("redis", "success");
249                metrics::record_search_latency("redis", redis_start.elapsed());
250                metrics::record_search_results(items.len());
251                Ok(SearchResult {
252                    items,
253                    source: SearchSource::Redis,
254                    cached: false,
255                })
256            }
257            Ok(_) | Err(_) => {
258                // Record Redis attempt (empty or error)
259                if redis_results.is_err() {
260                    metrics::record_search_query("redis", "error");
261                } else {
262                    metrics::record_search_query("redis", "empty");
263                }
264                
265                // Empty or error - try SQL fallback for durable tier
266                if tier == SearchTier::RedisWithSqlFallback {
267                    let sql_start = Instant::now();
268                    let sql_results = self.search_sql(query, limit).await?;
269                    let is_empty = sql_results.is_empty();
270                    
271                    metrics::record_search_query("sql", "success");
272                    metrics::record_search_latency("sql", sql_start.elapsed());
273                    metrics::record_search_results(sql_results.len());
274
275                    // Cache results if we have merkle
276                    if !is_empty {
277                        if let Some(ref search_state) = self.search_state {
278                            if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
279                                let keys: Vec<String> = sql_results.iter()
280                                    .map(|item| item.object_id.clone())
281                                    .collect();
282                                search_state.write().cache.insert(&prefix, query, merkle_root, keys);
283                            }
284                        }
285                    }
286
287                    Ok(SearchResult {
288                        items: sql_results,
289                        source: if is_empty { SearchSource::Empty } else { SearchSource::Sql },
290                        cached: false,
291                    })
292                } else {
293                    // RedisOnly tier - return empty
294                    metrics::record_search_results(0);
295                    Ok(SearchResult {
296                        items: vec![],
297                        source: SearchSource::Empty,
298                        cached: false,
299                    })
300                }
301            }
302        }
303    }
304
305    /// Search using raw RediSearch query string (Redis-only, no SQL fallback).
306    ///
307    /// Use this when you need the full power of RediSearch syntax
308    /// without the Query AST. This is an **advanced API** with caveats:
309    ///
310    /// - **No SQL fallback**: If Redis is unavailable, this will fail
311    /// - **No search cache**: Results are not cached via the merkle system
312    /// - **Manual paths**: You must use `$.payload.{field}` paths in your query
313    /// - **No translation**: The query string is passed directly to FT.SEARCH
314    ///
315    /// Prefer `search()` or `search_with_options()` for most use cases.
316    ///
317    /// # Example
318    /// ```ignore
319    /// // Raw RediSearch query with explicit payload paths
320    /// let results = engine.search_raw(
321    ///     "users",
322    ///     "@name:(Alice Smith) @age:[25 35]",
323    ///     100
324    /// ).await?;
325    /// ```
326    pub async fn search_raw(
327        &self,
328        index_name: &str,
329        query_str: &str,
330        limit: usize,
331    ) -> Result<Vec<SyncItem>, StorageError> {
332        let l2 = self.l2_store.as_ref().ok_or_else(|| {
333            StorageError::Connection("Redis not available for raw search".into())
334        })?;
335
336        let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
337        let index = format!("{}idx:{}", prefix, index_name);
338        
339        metrics::record_search_query("redis_raw", "attempt");
340        let start = std::time::Instant::now();
341        
342        let keys = l2.ft_search(&index, query_str, limit).await?;
343        let items = self.fetch_items_by_keys(&keys).await?;
344        
345        metrics::record_search_query("redis_raw", "success");
346        metrics::record_search_latency("redis_raw", start.elapsed());
347        metrics::record_search_results(items.len());
348        
349        Ok(items)
350    }
351
352    /// Direct SQL search (bypasses Redis, SQL-only).
353    ///
354    /// Queries the SQL archive directly using JSON_EXTRACT.
355    /// This is an **advanced API** with specific use cases:
356    ///
357    /// - **No Redis**: Bypasses L2 cache entirely
358    /// - **No caching**: Results are not cached via the merkle system
359    /// - **Ground truth**: Queries the durable SQL archive
360    ///
361    /// Useful for:
362    /// - Analytics queries that need complete data
363    /// - When Redis is unavailable or not trusted
364    /// - Bulk operations on archived data
365    ///
366    /// Prefer `search()` or `search_with_options()` for most use cases.
367    pub async fn search_sql(
368        &self,
369        query: &Query,
370        limit: usize,
371    ) -> Result<Vec<SyncItem>, StorageError> {
372        let l3 = self.l3_store.as_ref().ok_or_else(|| {
373            StorageError::Connection("SQL not available".into())
374        })?;
375
376        metrics::record_search_query("sql_direct", "attempt");
377        let start = std::time::Instant::now();
378
379        let sql_query = SqlTranslator::translate(query, "data");
380        debug!(clause = %sql_query.clause, "SQL search");
381
382        let results = l3.search(&sql_query.clause, &sql_query.params, limit).await?;
383        
384        metrics::record_search_query("sql_direct", "success");
385        metrics::record_search_latency("sql_direct", start.elapsed());
386        metrics::record_search_results(results.len());
387        
388        Ok(results)
389    }
390
391    /// Get search cache statistics.
392    pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
393        self.search_state.as_ref().map(|s| s.read().cache.stats())
394    }
395
396    // ═══════════════════════════════════════════════════════════════════════════
397    // Internal helpers
398    // ═══════════════════════════════════════════════════════════════════════════
399
400    /// Fetch items by keys, filtering out None results
401    async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
402        if keys.is_empty() {
403            return Ok(vec![]);
404        }
405        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
406        let results = self.get_many(&key_refs).await;
407        Ok(results.into_iter().flatten().collect())
408    }
409
410    async fn search_redis(
411        &self,
412        index_name: &str,
413        query: &Query,
414        limit: usize,
415    ) -> Result<Vec<SyncItem>, StorageError> {
416        let l2 = self.l2_store.as_ref().ok_or_else(|| {
417            StorageError::Connection("Redis not available".into())
418        })?;
419
420        let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
421        let index = format!("{}idx:{}", prefix, index_name);
422        
423        // Translate query with potential vector parameters
424        let translated = RediSearchTranslator::translate_with_params(query);
425        debug!(index = %index, query = %translated.query, has_params = %translated.has_params(), "FT.SEARCH");
426
427        let keys = if translated.has_params() {
428            // Vector search with binary parameters
429            l2.ft_search_with_params(&index, &translated.query, &translated.params, limit).await?
430        } else {
431            // Regular search
432            l2.ft_search(&index, &translated.query, limit).await?
433        };
434        
435        self.fetch_items_by_keys(&keys).await
436    }
437
438    async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
439        // Extract the path segment from prefix (e.g., "crdt:users:" -> "crdt:users")
440        let path = prefix.trim_end_matches(':');
441
442        if let Some(ref redis_merkle) = self.redis_merkle {
443            if let Ok(Some(node)) = redis_merkle.get_node(path).await {
444                return Some(node.hash.to_vec());
445            }
446        }
447
448        None
449    }
450}