sync_engine/coordinator/search_api.rs
1//! Search API for SyncEngine
2//!
3//! Provides full-text search capabilities via RediSearch with SQL fallback.
4//!
5//! # Architecture
6//!
7//! ```text
//! search(index, query)
//!     │
//!     ├─→ Check SearchCache (merkle-validated)
//!     │       │
//!     │       └─→ Hit? Return cached keys
//!     │
//!     ├─→ FT.SEARCH Redis (fast)
//!     │       │
//!     │       └─→ Results? Return
//!     │
//!     └─→ Durable tier? SQL fallback
//!             │
//!             └─→ Cache results with merkle root
21//! ```
22
23use std::time::Instant;
24use tracing::{debug, info};
25
26use crate::metrics;
27use crate::search::{
28 IndexManager, SearchIndex, SearchCache, SearchCacheStats,
29 Query, RediSearchTranslator, SqlTranslator,
30};
31use crate::sync_item::SyncItem;
32use crate::storage::traits::StorageError;
33
34use super::SyncEngine;
35
/// Strategy that decides which storage tiers a search is allowed to consult.
///
/// The default is [`SearchTier::RedisWithSqlFallback`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SearchTier {
    /// Query RediSearch only and never fall back to SQL
    /// (intended for ephemeral `view:`-prefixed data).
    RedisOnly,
    /// Query RediSearch first and fall back to SQL afterwards
    /// (intended for durable `crdt:`-prefixed data).
    #[default]
    RedisWithSqlFallback,
}
45
46/// Search result with metadata
47#[derive(Debug, Clone)]
48pub struct SearchResult {
49 /// Matching items
50 pub items: Vec<SyncItem>,
51 /// Source of results
52 pub source: SearchSource,
53 /// Whether results came from cache
54 pub cached: bool,
55}
56
/// Identifies which tier produced a [`SearchResult`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SearchSource {
    /// Items were found by RediSearch (`FT.SEARCH`).
    Redis,
    /// Items were found by the SQL query.
    Sql,
    /// Item keys were taken from the search cache.
    Cache,
    /// Nothing matched in any tier that was consulted.
    Empty,
}
69
70/// Search-related state for SyncEngine
71#[derive(Default)]
72pub struct SearchState {
73 /// Index manager
74 pub index_manager: IndexManager,
75 /// Search result cache
76 pub cache: SearchCache,
77}
78
79impl SyncEngine {
80 // ═══════════════════════════════════════════════════════════════════════════
81 // Search API
82 // ═══════════════════════════════════════════════════════════════════════════
83
84 /// Register a search index.
85 ///
86 /// Creates the index in RediSearch using FT.CREATE. The index will
87 /// automatically index all JSON documents with matching key prefix.
88 ///
89 /// # Example
90 ///
91 /// ```rust,no_run
92 /// # use sync_engine::{SyncEngine, search::SearchIndex};
93 /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
94 /// // Define index schema
95 /// let index = SearchIndex::new("users", "crdt:users:")
96 /// .text_sortable("name")
97 /// .text("email")
98 /// .numeric_sortable("age")
99 /// .tag("roles");
100 ///
101 /// // Create in RediSearch
102 /// engine.create_search_index(index).await?;
103 /// # Ok(())
104 /// # }
105 /// ```
106 pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
107 let l2 = self.l2_store.as_ref().ok_or_else(|| {
108 StorageError::Connection("Redis not available for search index".into())
109 })?;
110
111 // Build FT.CREATE command with global redis prefix
112 let redis_prefix = self.config.read().redis_prefix.clone();
113 let args = index.to_ft_create_args_with_prefix(redis_prefix.as_deref());
114 debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
115
116 // Execute FT.CREATE via raw Redis command
117 match l2.ft_create(&args).await {
118 Ok(()) => {
119 metrics::record_search_index_operation("create", true);
120
121 // Register in index manager
122 if let Some(ref search_state) = self.search_state {
123 search_state.write().index_manager.register(index);
124 }
125
126 info!(index = %args[0], "Search index created");
127 Ok(())
128 }
129 Err(e) => {
130 metrics::record_search_index_operation("create", false);
131 Err(e)
132 }
133 }
134 }
135
136 /// Drop a search index.
137 ///
138 /// Removes the index from RediSearch. Does not delete the indexed documents.
139 pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
140 let l2 = self.l2_store.as_ref().ok_or_else(|| {
141 StorageError::Connection("Redis not available".into())
142 })?;
143
144 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
145 let index_name = format!("{}idx:{}", prefix, name);
146
147 match l2.ft_dropindex(&index_name).await {
148 Ok(()) => {
149 metrics::record_search_index_operation("drop", true);
150 info!(index = %index_name, "Search index dropped");
151 Ok(())
152 }
153 Err(e) => {
154 metrics::record_search_index_operation("drop", false);
155 Err(e)
156 }
157 }
158 }
159
160 /// Search for items using RediSearch query syntax.
161 ///
162 /// Searches the specified index using FT.SEARCH. For durable data (crdt: prefix),
163 /// falls back to SQL if Redis returns no results.
164 ///
165 /// # Arguments
166 ///
167 /// * `index_name` - Name of the search index (without "idx:" prefix)
168 /// * `query` - Query AST built with `Query::` constructors
169 ///
170 /// # Example
171 ///
172 /// ```rust,no_run
173 /// # use sync_engine::{SyncEngine, search::Query};
174 /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
175 /// // Simple field query
176 /// let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;
177 ///
178 /// // Complex query with AND/OR
179 /// let query = Query::field_eq("status", "active")
180 /// .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
181 /// let results = engine.search("users", &query).await?;
182 ///
183 /// for item in results.items {
184 /// println!("Found: {}", item.object_id);
185 /// }
186 /// # Ok(())
187 /// # }
188 /// ```
189 pub async fn search(
190 &self,
191 index_name: &str,
192 query: &Query,
193 ) -> Result<SearchResult, StorageError> {
194 self.search_with_options(index_name, query, SearchTier::default(), 100).await
195 }
196
197 /// Search with explicit tier and limit options.
198 pub async fn search_with_options(
199 &self,
200 index_name: &str,
201 query: &Query,
202 tier: SearchTier,
203 limit: usize,
204 ) -> Result<SearchResult, StorageError> {
205 let start = Instant::now();
206
207 // Get index info
208 let prefix = if let Some(ref search_state) = self.search_state {
209 search_state.read()
210 .index_manager
211 .get(index_name)
212 .map(|idx| idx.prefix.clone())
213 } else {
214 None
215 };
216
217 let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
218
219 // Check cache first (if we have merkle root)
220 if let Some(ref search_state) = self.search_state {
221 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
222 let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
223 if let Some(keys) = cached_keys {
224 debug!(index = %index_name, count = keys.len(), "Search cache hit");
225 metrics::record_search_cache(true);
226 metrics::record_search_latency("cache", start.elapsed());
227 metrics::record_search_results(keys.len());
228 let items = self.fetch_items_by_keys(&keys).await?;
229 return Ok(SearchResult {
230 items,
231 source: SearchSource::Cache,
232 cached: true,
233 });
234 }
235 }
236 }
237
238 // Try RediSearch
239 let redis_start = Instant::now();
240 let redis_results = self.search_redis(index_name, query, limit).await;
241
242 match redis_results {
243 Ok(items) if !items.is_empty() => {
244 debug!(index = %index_name, count = items.len(), "RediSearch results");
245 metrics::record_search_query("redis", "success");
246 metrics::record_search_latency("redis", redis_start.elapsed());
247 metrics::record_search_results(items.len());
248 Ok(SearchResult {
249 items,
250 source: SearchSource::Redis,
251 cached: false,
252 })
253 }
254 Ok(_) | Err(_) => {
255 // Record Redis attempt (empty or error)
256 if redis_results.is_err() {
257 metrics::record_search_query("redis", "error");
258 } else {
259 metrics::record_search_query("redis", "empty");
260 }
261
262 // Empty or error - try SQL fallback for durable tier
263 if tier == SearchTier::RedisWithSqlFallback {
264 let sql_start = Instant::now();
265 let sql_results = self.search_sql(query, limit).await?;
266 let is_empty = sql_results.is_empty();
267
268 metrics::record_search_query("sql", "success");
269 metrics::record_search_latency("sql", sql_start.elapsed());
270 metrics::record_search_results(sql_results.len());
271
272 // Cache results if we have merkle
273 if !is_empty {
274 if let Some(ref search_state) = self.search_state {
275 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
276 let keys: Vec<String> = sql_results.iter()
277 .map(|item| item.object_id.clone())
278 .collect();
279 search_state.write().cache.insert(&prefix, query, merkle_root, keys);
280 }
281 }
282 }
283
284 Ok(SearchResult {
285 items: sql_results,
286 source: if is_empty { SearchSource::Empty } else { SearchSource::Sql },
287 cached: false,
288 })
289 } else {
290 // RedisOnly tier - return empty
291 metrics::record_search_results(0);
292 Ok(SearchResult {
293 items: vec![],
294 source: SearchSource::Empty,
295 cached: false,
296 })
297 }
298 }
299 }
300 }
301
302 /// Search using raw RediSearch query string (Redis-only, no SQL fallback).
303 ///
304 /// Use this when you need the full power of RediSearch syntax
305 /// without the Query AST. This is an **advanced API** with caveats:
306 ///
307 /// - **No SQL fallback**: If Redis is unavailable, this will fail
308 /// - **No search cache**: Results are not cached via the merkle system
309 /// - **Manual paths**: You must use `$.payload.{field}` paths in your query
310 /// - **No translation**: The query string is passed directly to FT.SEARCH
311 ///
312 /// Prefer `search()` or `search_with_options()` for most use cases.
313 ///
314 /// # Example
315 /// ```ignore
316 /// // Raw RediSearch query with explicit payload paths
317 /// let results = engine.search_raw(
318 /// "users",
319 /// "@name:(Alice Smith) @age:[25 35]",
320 /// 100
321 /// ).await?;
322 /// ```
323 pub async fn search_raw(
324 &self,
325 index_name: &str,
326 query_str: &str,
327 limit: usize,
328 ) -> Result<Vec<SyncItem>, StorageError> {
329 let l2 = self.l2_store.as_ref().ok_or_else(|| {
330 StorageError::Connection("Redis not available for raw search".into())
331 })?;
332
333 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
334 let index = format!("{}idx:{}", prefix, index_name);
335
336 metrics::record_search_query("redis_raw", "attempt");
337 let start = std::time::Instant::now();
338
339 let keys = l2.ft_search(&index, query_str, limit).await?;
340 let items = self.fetch_items_by_keys(&keys).await?;
341
342 metrics::record_search_query("redis_raw", "success");
343 metrics::record_search_latency("redis_raw", start.elapsed());
344 metrics::record_search_results(items.len());
345
346 Ok(items)
347 }
348
349 /// Direct SQL search (bypasses Redis, SQL-only).
350 ///
351 /// Queries the SQL archive directly using JSON_EXTRACT.
352 /// This is an **advanced API** with specific use cases:
353 ///
354 /// - **No Redis**: Bypasses L2 cache entirely
355 /// - **No caching**: Results are not cached via the merkle system
356 /// - **Ground truth**: Queries the durable SQL archive
357 ///
358 /// Useful for:
359 /// - Analytics queries that need complete data
360 /// - When Redis is unavailable or not trusted
361 /// - Bulk operations on archived data
362 ///
363 /// Prefer `search()` or `search_with_options()` for most use cases.
364 pub async fn search_sql(
365 &self,
366 query: &Query,
367 limit: usize,
368 ) -> Result<Vec<SyncItem>, StorageError> {
369 let l3 = self.l3_store.as_ref().ok_or_else(|| {
370 StorageError::Connection("SQL not available".into())
371 })?;
372
373 metrics::record_search_query("sql_direct", "attempt");
374 let start = std::time::Instant::now();
375
376 let sql_query = SqlTranslator::translate(query, "data");
377 debug!(clause = %sql_query.clause, "SQL search");
378
379 let results = l3.search(&sql_query.clause, &sql_query.params, limit).await?;
380
381 metrics::record_search_query("sql_direct", "success");
382 metrics::record_search_latency("sql_direct", start.elapsed());
383 metrics::record_search_results(results.len());
384
385 Ok(results)
386 }
387
388 /// Get search cache statistics.
389 pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
390 self.search_state.as_ref().map(|s| s.read().cache.stats())
391 }
392
393 // ═══════════════════════════════════════════════════════════════════════════
394 // Internal helpers
395 // ═══════════════════════════════════════════════════════════════════════════
396
397 /// Fetch items by keys, filtering out None results
398 async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
399 if keys.is_empty() {
400 return Ok(vec![]);
401 }
402 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
403 let results = self.get_many(&key_refs).await;
404 Ok(results.into_iter().flatten().collect())
405 }
406
407 async fn search_redis(
408 &self,
409 index_name: &str,
410 query: &Query,
411 limit: usize,
412 ) -> Result<Vec<SyncItem>, StorageError> {
413 let l2 = self.l2_store.as_ref().ok_or_else(|| {
414 StorageError::Connection("Redis not available".into())
415 })?;
416
417 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
418 let index = format!("{}idx:{}", prefix, index_name);
419 let query_str = RediSearchTranslator::translate(query);
420 debug!(index = %index, query = %query_str, "FT.SEARCH");
421
422 let keys = l2.ft_search(&index, &query_str, limit).await?;
423 self.fetch_items_by_keys(&keys).await
424 }
425
426 async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
427 // Extract the path segment from prefix (e.g., "crdt:users:" -> "crdt:users")
428 let path = prefix.trim_end_matches(':');
429
430 if let Some(ref redis_merkle) = self.redis_merkle {
431 if let Ok(Some(node)) = redis_merkle.get_node(path).await {
432 return Some(node.hash.to_vec());
433 }
434 }
435
436 None
437 }
438}