sync_engine/coordinator/search_api.rs
1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Search API for SyncEngine
5//!
6//! Provides full-text search capabilities via RediSearch with SQL fallback.
7//!
8//! # Architecture
9//!
10//! ```text
11//! search(index, query)
12//! │
13//! ├─→ Check SearchCache (merkle-validated)
14//! │ │
15//! │ └─→ Hit? Return cached keys
16//! │
17//! ├─→ FT.SEARCH Redis (fast)
18//! │ │
19//! │ └─→ Results? Return
20//! │
21//! └─→ Durable tier? SQL fallback
22//! │
23//! └─→ Cache results with merkle root
24//! ```
25
26use std::time::Instant;
27use tracing::{debug, info};
28
29use crate::metrics;
30use crate::search::{
31 IndexManager, SearchIndex, SearchCache, SearchCacheStats,
32 Query, RediSearchTranslator, SqlTranslator, SqlParam,
33};
34use crate::sync_item::SyncItem;
35use crate::storage::traits::StorageError;
36
37use super::SyncEngine;
38
/// Strategy selecting which storage tiers a search is allowed to consult.
///
/// The default is [`SearchTier::RedisWithSqlFallback`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SearchTier {
    /// Query Redis only; never fall back to SQL.
    /// Intended for ephemeral data stored under the `view:` prefix.
    RedisOnly,
    /// Query Redis first and fall back to SQL.
    /// Intended for durable data stored under the `crdt:` prefix.
    #[default]
    RedisWithSqlFallback,
}
48
49/// Search result with metadata
50#[derive(Debug, Clone)]
51pub struct SearchResult {
52 /// Matching items
53 pub items: Vec<SyncItem>,
54 /// Source of results
55 pub source: SearchSource,
56 /// Whether results came from cache
57 pub cached: bool,
58}
59
/// Identifies the tier that produced a [`SearchResult`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SearchSource {
    /// Served by the hot tier (RediSearch/Dragonfly).
    Hot,
    /// Served by the cold tier (SQL/MySQL).
    Cold,
    /// Served from the `SearchCache`.
    Cache,
    /// Nothing matched in any tier that was consulted.
    Empty,
}
72
73/// Search-related state for SyncEngine
74#[derive(Default)]
75pub struct SearchState {
76 /// Index manager
77 pub index_manager: IndexManager,
78 /// Search result cache
79 pub cache: SearchCache,
80}
81
82impl SyncEngine {
83 // ═══════════════════════════════════════════════════════════════════════════
84 // Search API
85 // ═══════════════════════════════════════════════════════════════════════════
86
87 /// Register a search index.
88 ///
89 /// Creates the index in RediSearch using FT.CREATE. The index will
90 /// automatically index all JSON documents with matching key prefix.
91 ///
92 /// # Example
93 ///
94 /// ```rust,no_run
95 /// # use sync_engine::{SyncEngine, search::SearchIndex};
96 /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
97 /// // Define index schema
98 /// let index = SearchIndex::new("users", "crdt:users:")
99 /// .text_sortable("name")
100 /// .text("email")
101 /// .numeric_sortable("age")
102 /// .tag("roles");
103 ///
104 /// // Create in RediSearch
105 /// engine.create_search_index(index).await?;
106 /// # Ok(())
107 /// # }
108 /// ```
109 pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
110 let l2 = self.l2_store.as_ref().ok_or_else(|| {
111 StorageError::Connection("Redis not available for search index".into())
112 })?;
113
114 // Build FT.CREATE command with global redis prefix
115 let redis_prefix = self.config.read().redis_prefix.clone();
116 let args = index.to_ft_create_args_with_prefix(redis_prefix.as_deref());
117 debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
118
119 // Execute FT.CREATE via raw Redis command
120 match l2.ft_create(&args).await {
121 Ok(()) => {
122 metrics::record_search_index_operation("create", true);
123
124 // Register in index manager
125 if let Some(ref search_state) = self.search_state {
126 search_state.write().index_manager.register(index);
127 }
128
129 info!(index = %args[0], "Search index created");
130 Ok(())
131 }
132 Err(e) => {
133 metrics::record_search_index_operation("create", false);
134 Err(e)
135 }
136 }
137 }
138
139 /// Drop a search index.
140 ///
141 /// Removes the index from RediSearch. Does not delete the indexed documents.
142 pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
143 let l2 = self.l2_store.as_ref().ok_or_else(|| {
144 StorageError::Connection("Redis not available".into())
145 })?;
146
147 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
148 let index_name = format!("{}idx:{}", prefix, name);
149
150 match l2.ft_dropindex(&index_name).await {
151 Ok(()) => {
152 metrics::record_search_index_operation("drop", true);
153 info!(index = %index_name, "Search index dropped");
154 Ok(())
155 }
156 Err(e) => {
157 metrics::record_search_index_operation("drop", false);
158 Err(e)
159 }
160 }
161 }
162
163 /// Get the view prefix for a search index.
164 ///
165 /// Returns the prefix used for view: keys in SQL (e.g., "view:test:user:").
166 /// This is needed for SQL search filtering to only search materialized views.
167 pub fn get_index_view_prefix(&self, index_name: &str) -> String {
168 let crdt_prefix = if let Some(ref search_state) = self.search_state {
169 search_state.read()
170 .index_manager
171 .get(index_name)
172 .map(|idx| idx.prefix.clone())
173 } else {
174 None
175 };
176
177 let crdt_prefix = crdt_prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
178 crdt_prefix.replace("crdt:", "view:")
179 }
180
181 /// Search for items using RediSearch query syntax.
182 ///
183 /// Searches the specified index using FT.SEARCH. For durable data (crdt: prefix),
184 /// falls back to SQL if Redis returns no results.
185 ///
186 /// # Arguments
187 ///
188 /// * `index_name` - Name of the search index (without "idx:" prefix)
189 /// * `query` - Query AST built with `Query::` constructors
190 ///
191 /// # Example
192 ///
193 /// ```rust,no_run
194 /// # use sync_engine::{SyncEngine, search::Query};
195 /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
196 /// // Simple field query
197 /// let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;
198 ///
199 ///
200 /// // Complex query with AND/OR
201 /// let query = Query::field_eq("status", "active")
202 /// .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
203 /// let results = engine.search("users", &query).await?;
204 ///
205 /// for item in results.items {
206 /// println!("Found: {}", item.object_id);
207 /// }
208 /// # Ok(())
209 /// # }
210 /// ```
211 pub async fn search(
212 &self,
213 index_name: &str,
214 query: &Query,
215 ) -> Result<SearchResult, StorageError> {
216 self.search_with_options(index_name, query, SearchTier::default(), 100).await
217 }
218
219 /// Search with explicit tier and limit options.
220 pub async fn search_with_options(
221 &self,
222 index_name: &str,
223 query: &Query,
224 tier: SearchTier,
225 limit: usize,
226 ) -> Result<SearchResult, StorageError> {
227 let start = Instant::now();
228
229 // Get index info
230 let prefix = if let Some(ref search_state) = self.search_state {
231 search_state.read()
232 .index_manager
233 .get(index_name)
234 .map(|idx| idx.prefix.clone())
235 } else {
236 None
237 };
238
239 let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
240
241 // Check cache first (if we have merkle root)
242 if let Some(ref search_state) = self.search_state {
243 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
244 let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
245 if let Some(keys) = cached_keys {
246 debug!(index = %index_name, count = keys.len(), "Search cache hit");
247 metrics::record_search_cache(true);
248 metrics::record_search_latency("cache", start.elapsed());
249 metrics::record_search_results(keys.len());
250 let items = self.fetch_items_by_keys(&keys).await?;
251 return Ok(SearchResult {
252 items,
253 source: SearchSource::Cache,
254 cached: true,
255 });
256 }
257 }
258 }
259
260 // Try RediSearch
261 let redis_start = Instant::now();
262 let redis_results = self.search_redis(index_name, query, limit).await;
263
264 match redis_results {
265 Ok(items) if !items.is_empty() => {
266 debug!(index = %index_name, count = items.len(), "RediSearch results");
267 metrics::record_search_query("redis", "success");
268 metrics::record_search_latency("redis", redis_start.elapsed());
269 metrics::record_search_results(items.len());
270 Ok(SearchResult {
271 items,
272 source: SearchSource::Hot,
273 cached: false,
274 })
275 }
276 Ok(_) | Err(_) => {
277 // Record Redis attempt (empty or error)
278 if redis_results.is_err() {
279 metrics::record_search_query("redis", "error");
280 } else {
281 metrics::record_search_query("redis", "empty");
282 }
283
284 // Empty or error - try SQL fallback for durable tier
285 if tier == SearchTier::RedisWithSqlFallback {
286 let sql_start = Instant::now();
287 // Use view: prefix for SQL search (views have the materialized data)
288 let view_prefix = prefix.replace("crdt:", "view:");
289 let sql_results = self.search_sql_with_prefix(query, &view_prefix, limit).await?;
290 let is_empty = sql_results.is_empty();
291
292 metrics::record_search_query("sql", "success");
293 metrics::record_search_latency("sql", sql_start.elapsed());
294 metrics::record_search_results(sql_results.len());
295
296 // Cache results if we have merkle
297 if !is_empty {
298 if let Some(ref search_state) = self.search_state {
299 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
300 let keys: Vec<String> = sql_results.iter()
301 .map(|item| item.object_id.clone())
302 .collect();
303 search_state.write().cache.insert(&prefix, query, merkle_root, keys);
304 }
305 }
306 }
307
308 Ok(SearchResult {
309 items: sql_results,
310 source: if is_empty { SearchSource::Empty } else { SearchSource::Cold },
311 cached: false,
312 })
313 } else {
314 // RedisOnly tier - return empty
315 metrics::record_search_results(0);
316 Ok(SearchResult {
317 items: vec![],
318 source: SearchSource::Empty,
319 cached: false,
320 })
321 }
322 }
323 }
324 }
325
326 /// Search using raw RediSearch query string (Redis-only, no SQL fallback).
327 ///
328 /// Use this when you need the full power of RediSearch syntax
329 /// without the Query AST. This is an **advanced API** with caveats:
330 ///
331 /// - **No SQL fallback**: If Redis is unavailable, this will fail
332 /// - **No search cache**: Results are not cached via the merkle system
333 /// - **Manual paths**: You must use `$.payload.{field}` paths in your query
334 /// - **No translation**: The query string is passed directly to FT.SEARCH
335 ///
336 /// Prefer `search()` or `search_with_options()` for most use cases.
337 ///
338 /// # Example
339 /// ```ignore
340 /// // Raw RediSearch query with explicit payload paths
341 /// let results = engine.search_raw(
342 /// "users",
343 /// "@name:(Alice Smith) @age:[25 35]",
344 /// 100
345 /// ).await?;
346 /// ```
347 pub async fn search_raw(
348 &self,
349 index_name: &str,
350 query_str: &str,
351 limit: usize,
352 ) -> Result<Vec<SyncItem>, StorageError> {
353 let l2 = self.l2_store.as_ref().ok_or_else(|| {
354 StorageError::Connection("Redis not available for raw search".into())
355 })?;
356
357 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
358 let index = format!("{}idx:{}", prefix, index_name);
359
360 metrics::record_search_query("redis_raw", "attempt");
361 let start = std::time::Instant::now();
362
363 let keys = l2.ft_search(&index, query_str, limit).await?;
364 let items = self.fetch_items_by_keys(&keys).await?;
365
366 metrics::record_search_query("redis_raw", "success");
367 metrics::record_search_latency("redis_raw", start.elapsed());
368 metrics::record_search_results(items.len());
369
370 Ok(items)
371 }
372
373 /// Direct SQL search (bypasses Redis, SQL-only).
374 ///
375 /// Queries the SQL archive directly using JSON_EXTRACT.
376 /// This is an **advanced API** with specific use cases:
377 ///
378 /// - **No Redis**: Bypasses L2 cache entirely
379 /// - **No caching**: Results are not cached via the merkle system
380 /// - **Ground truth**: Queries the durable SQL archive
381 ///
382 /// Useful for:
383 /// - Analytics queries that need complete data
384 /// - When Redis is unavailable or not trusted
385 /// - Bulk operations on archived data
386 ///
387 /// Prefer `search()` or `search_with_options()` for most use cases.
388 pub async fn search_sql(
389 &self,
390 query: &Query,
391 limit: usize,
392 ) -> Result<Vec<SyncItem>, StorageError> {
393 self.search_sql_with_prefix(query, "", limit).await
394 }
395
396 /// Search SQL with a key prefix filter.
397 ///
398 /// Only items whose `id` starts with the given prefix will be searched.
399 /// Uses the schema registry to route to the correct table for partitioned schemas.
400 pub async fn search_sql_with_prefix(
401 &self,
402 query: &Query,
403 key_prefix: &str,
404 limit: usize,
405 ) -> Result<Vec<SyncItem>, StorageError> {
406 let sql_store = self.sql_store.as_ref().ok_or_else(|| {
407 StorageError::Connection("SQL not available".into())
408 })?;
409
410 metrics::record_search_query("sql_direct", "attempt");
411 let start = std::time::Instant::now();
412
413 let sql_query = SqlTranslator::translate(query, "payload");
414
415 // Determine which table to search based on prefix
416 // For "view:users:" prefix, look up in schema registry
417 let table = self.schema_registry.table_for_key(key_prefix);
418
419 // Combine prefix filter with query clause
420 let (full_clause, full_params) = if key_prefix.is_empty() {
421 (sql_query.clause.clone(), sql_query.params.clone())
422 } else {
423 // Add prefix filter: id LIKE 'prefix%' AND (original query)
424 let mut params = vec![SqlParam::Text(format!("{}%", key_prefix))];
425 params.extend(sql_query.params.clone());
426 (format!("id LIKE ? AND ({})", sql_query.clause), params)
427 };
428
429 debug!(clause = %full_clause, prefix = %key_prefix, table = %table, "SQL search");
430
431 let results = sql_store.search_in_table(table, &full_clause, &full_params, limit).await?;
432
433 metrics::record_search_query("sql_direct", "success");
434 metrics::record_search_latency("sql_direct", start.elapsed());
435 metrics::record_search_results(results.len());
436
437 Ok(results)
438 }
439
440 /// Count items matching a query in SQL (fast COUNT(*) without fetching data).
441 ///
442 /// Use this for exhaustiveness checks: compare Redis result count with SQL total.
443 /// This is much faster than fetching all results just to count them.
444 ///
445 /// # Example
446 /// ```ignore
447 /// let redis_results = engine.search("users", &query).await?;
448 /// let sql_count = engine.search_count_sql(&query).await?;
449 /// let exhaustive = redis_results.items.len() as u64 == sql_count;
450 /// ```
451 pub async fn search_count_sql(&self, query: &Query) -> Result<u64, StorageError> {
452 self.search_count_sql_with_prefix(query, "").await
453 }
454
455 /// Count items matching a query in SQL with a key prefix filter.
456 /// Uses the schema registry to route to the correct table for partitioned schemas.
457 pub async fn search_count_sql_with_prefix(&self, query: &Query, key_prefix: &str) -> Result<u64, StorageError> {
458 let sql_store = self.sql_store.as_ref().ok_or_else(|| {
459 StorageError::Connection("SQL not available".into())
460 })?;
461
462 let sql_query = SqlTranslator::translate(query, "payload");
463
464 // Determine which table to search based on prefix
465 let table = self.schema_registry.table_for_key(key_prefix);
466
467 // Combine prefix filter with query clause
468 let (full_clause, full_params) = if key_prefix.is_empty() {
469 (sql_query.clause.clone(), sql_query.params.clone())
470 } else {
471 let mut params = vec![SqlParam::Text(format!("{}%", key_prefix))];
472 params.extend(sql_query.params.clone());
473 (format!("id LIKE ? AND ({})", sql_query.clause), params)
474 };
475
476 debug!(clause = %full_clause, prefix = %key_prefix, table = %table, "SQL count");
477
478 sql_store.count_where_in_table(table, &full_clause, &full_params).await
479 }
480
481 /// Get search cache statistics.
482 pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
483 self.search_state.as_ref().map(|s| s.read().cache.stats())
484 }
485
486 // ═══════════════════════════════════════════════════════════════════════════
487 // Internal helpers
488 // ═══════════════════════════════════════════════════════════════════════════
489
490 /// Fetch items by keys, filtering out None results
491 async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
492 if keys.is_empty() {
493 return Ok(vec![]);
494 }
495 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
496 let results = self.get_many(&key_refs).await;
497 Ok(results.into_iter().flatten().collect())
498 }
499
500 async fn search_redis(
501 &self,
502 index_name: &str,
503 query: &Query,
504 limit: usize,
505 ) -> Result<Vec<SyncItem>, StorageError> {
506 let l2 = self.l2_store.as_ref().ok_or_else(|| {
507 StorageError::Connection("Redis not available".into())
508 })?;
509
510 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
511 let index = format!("{}idx:{}", prefix, index_name);
512
513 // Translate query with potential vector parameters
514 let translated = RediSearchTranslator::translate_with_params(query);
515 debug!(index = %index, query = %translated.query, has_params = %translated.has_params(), "FT.SEARCH");
516
517 let keys = if translated.has_params() {
518 // Vector search with binary parameters
519 l2.ft_search_with_params(&index, &translated.query, &translated.params, limit).await?
520 } else {
521 // Regular search
522 l2.ft_search(&index, &translated.query, limit).await?
523 };
524
525 self.fetch_items_by_keys(&keys).await
526 }
527
528 async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
529 // Extract the path segment from prefix (e.g., "crdt:users:" -> "crdt:users")
530 let path = prefix.trim_end_matches(':');
531
532 if let Some(ref merkle_cache) = self.merkle_cache {
533 if let Ok(Some(node)) = merkle_cache.get_node(path).await {
534 return Some(node.hash.to_vec());
535 }
536 }
537
538 None
539 }
540}