sync_engine/coordinator/search_api.rs
1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Search API for SyncEngine
5//!
6//! Provides full-text search capabilities via RediSearch with SQL fallback.
7//!
8//! # Architecture
9//!
10//! ```text
11//! search(index, query)
12//! │
13//! ├─→ Check SearchCache (merkle-validated)
14//! │ │
15//! │ └─→ Hit? Return cached keys
16//! │
17//! ├─→ FT.SEARCH Redis (fast)
18//! │ │
19//! │ └─→ Results? Return
20//! │
21//! └─→ Durable tier? SQL fallback
22//! │
23//! └─→ Cache results with merkle root
24//! ```
25
26use std::time::Instant;
27use tracing::{debug, info};
28
29use crate::metrics;
30use crate::search::{
31 IndexManager, SearchIndex, SearchCache, SearchCacheStats,
32 Query, RediSearchTranslator, SqlTranslator,
33};
34use crate::sync_item::SyncItem;
35use crate::storage::traits::StorageError;
36
37use super::SyncEngine;
38
/// Selects which storage tiers a search may consult.
///
/// The default is [`SearchTier::RedisWithSqlFallback`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SearchTier {
    /// Query RediSearch only; never fall back to SQL.
    /// Intended for ephemeral data under the `view:` prefix.
    RedisOnly,
    /// Query RediSearch first and fall back to SQL.
    /// Intended for durable data under the `crdt:` prefix.
    #[default]
    RedisWithSqlFallback,
}
48
49/// Search result with metadata
50#[derive(Debug, Clone)]
51pub struct SearchResult {
52 /// Matching items
53 pub items: Vec<SyncItem>,
54 /// Source of results
55 pub source: SearchSource,
56 /// Whether results came from cache
57 pub cached: bool,
58}
59
/// Identifies the backend that produced a [`SearchResult`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SearchSource {
    /// Items came from a RediSearch `FT.SEARCH` call.
    Redis,
    /// Items came from a SQL query.
    Sql,
    /// Items were resolved from keys held in the search cache.
    Cache,
    /// Nothing matched, so there is no producing backend.
    Empty,
}
72
73/// Search-related state for SyncEngine
74#[derive(Default)]
75pub struct SearchState {
76 /// Index manager
77 pub index_manager: IndexManager,
78 /// Search result cache
79 pub cache: SearchCache,
80}
81
82impl SyncEngine {
83 // ═══════════════════════════════════════════════════════════════════════════
84 // Search API
85 // ═══════════════════════════════════════════════════════════════════════════
86
87 /// Register a search index.
88 ///
89 /// Creates the index in RediSearch using FT.CREATE. The index will
90 /// automatically index all JSON documents with matching key prefix.
91 ///
92 /// # Example
93 ///
94 /// ```rust,no_run
95 /// # use sync_engine::{SyncEngine, search::SearchIndex};
96 /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
97 /// // Define index schema
98 /// let index = SearchIndex::new("users", "crdt:users:")
99 /// .text_sortable("name")
100 /// .text("email")
101 /// .numeric_sortable("age")
102 /// .tag("roles");
103 ///
104 /// // Create in RediSearch
105 /// engine.create_search_index(index).await?;
106 /// # Ok(())
107 /// # }
108 /// ```
109 pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
110 let l2 = self.l2_store.as_ref().ok_or_else(|| {
111 StorageError::Connection("Redis not available for search index".into())
112 })?;
113
114 // Build FT.CREATE command with global redis prefix
115 let redis_prefix = self.config.read().redis_prefix.clone();
116 let args = index.to_ft_create_args_with_prefix(redis_prefix.as_deref());
117 debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
118
119 // Execute FT.CREATE via raw Redis command
120 match l2.ft_create(&args).await {
121 Ok(()) => {
122 metrics::record_search_index_operation("create", true);
123
124 // Register in index manager
125 if let Some(ref search_state) = self.search_state {
126 search_state.write().index_manager.register(index);
127 }
128
129 info!(index = %args[0], "Search index created");
130 Ok(())
131 }
132 Err(e) => {
133 metrics::record_search_index_operation("create", false);
134 Err(e)
135 }
136 }
137 }
138
139 /// Drop a search index.
140 ///
141 /// Removes the index from RediSearch. Does not delete the indexed documents.
142 pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
143 let l2 = self.l2_store.as_ref().ok_or_else(|| {
144 StorageError::Connection("Redis not available".into())
145 })?;
146
147 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
148 let index_name = format!("{}idx:{}", prefix, name);
149
150 match l2.ft_dropindex(&index_name).await {
151 Ok(()) => {
152 metrics::record_search_index_operation("drop", true);
153 info!(index = %index_name, "Search index dropped");
154 Ok(())
155 }
156 Err(e) => {
157 metrics::record_search_index_operation("drop", false);
158 Err(e)
159 }
160 }
161 }
162
163 /// Search for items using RediSearch query syntax.
164 ///
165 /// Searches the specified index using FT.SEARCH. For durable data (crdt: prefix),
166 /// falls back to SQL if Redis returns no results.
167 ///
168 /// # Arguments
169 ///
170 /// * `index_name` - Name of the search index (without "idx:" prefix)
171 /// * `query` - Query AST built with `Query::` constructors
172 ///
173 /// # Example
174 ///
175 /// ```rust,no_run
176 /// # use sync_engine::{SyncEngine, search::Query};
177 /// # async fn example(engine: &SyncEngine) -> Result<(), Box<dyn std::error::Error>> {
178 /// // Simple field query
179 /// let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;
180 ///
181 /// // Complex query with AND/OR
182 /// let query = Query::field_eq("status", "active")
183 /// .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
184 /// let results = engine.search("users", &query).await?;
185 ///
186 /// for item in results.items {
187 /// println!("Found: {}", item.object_id);
188 /// }
189 /// # Ok(())
190 /// # }
191 /// ```
192 pub async fn search(
193 &self,
194 index_name: &str,
195 query: &Query,
196 ) -> Result<SearchResult, StorageError> {
197 self.search_with_options(index_name, query, SearchTier::default(), 100).await
198 }
199
200 /// Search with explicit tier and limit options.
201 pub async fn search_with_options(
202 &self,
203 index_name: &str,
204 query: &Query,
205 tier: SearchTier,
206 limit: usize,
207 ) -> Result<SearchResult, StorageError> {
208 let start = Instant::now();
209
210 // Get index info
211 let prefix = if let Some(ref search_state) = self.search_state {
212 search_state.read()
213 .index_manager
214 .get(index_name)
215 .map(|idx| idx.prefix.clone())
216 } else {
217 None
218 };
219
220 let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
221
222 // Check cache first (if we have merkle root)
223 if let Some(ref search_state) = self.search_state {
224 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
225 let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
226 if let Some(keys) = cached_keys {
227 debug!(index = %index_name, count = keys.len(), "Search cache hit");
228 metrics::record_search_cache(true);
229 metrics::record_search_latency("cache", start.elapsed());
230 metrics::record_search_results(keys.len());
231 let items = self.fetch_items_by_keys(&keys).await?;
232 return Ok(SearchResult {
233 items,
234 source: SearchSource::Cache,
235 cached: true,
236 });
237 }
238 }
239 }
240
241 // Try RediSearch
242 let redis_start = Instant::now();
243 let redis_results = self.search_redis(index_name, query, limit).await;
244
245 match redis_results {
246 Ok(items) if !items.is_empty() => {
247 debug!(index = %index_name, count = items.len(), "RediSearch results");
248 metrics::record_search_query("redis", "success");
249 metrics::record_search_latency("redis", redis_start.elapsed());
250 metrics::record_search_results(items.len());
251 Ok(SearchResult {
252 items,
253 source: SearchSource::Redis,
254 cached: false,
255 })
256 }
257 Ok(_) | Err(_) => {
258 // Record Redis attempt (empty or error)
259 if redis_results.is_err() {
260 metrics::record_search_query("redis", "error");
261 } else {
262 metrics::record_search_query("redis", "empty");
263 }
264
265 // Empty or error - try SQL fallback for durable tier
266 if tier == SearchTier::RedisWithSqlFallback {
267 let sql_start = Instant::now();
268 let sql_results = self.search_sql(query, limit).await?;
269 let is_empty = sql_results.is_empty();
270
271 metrics::record_search_query("sql", "success");
272 metrics::record_search_latency("sql", sql_start.elapsed());
273 metrics::record_search_results(sql_results.len());
274
275 // Cache results if we have merkle
276 if !is_empty {
277 if let Some(ref search_state) = self.search_state {
278 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
279 let keys: Vec<String> = sql_results.iter()
280 .map(|item| item.object_id.clone())
281 .collect();
282 search_state.write().cache.insert(&prefix, query, merkle_root, keys);
283 }
284 }
285 }
286
287 Ok(SearchResult {
288 items: sql_results,
289 source: if is_empty { SearchSource::Empty } else { SearchSource::Sql },
290 cached: false,
291 })
292 } else {
293 // RedisOnly tier - return empty
294 metrics::record_search_results(0);
295 Ok(SearchResult {
296 items: vec![],
297 source: SearchSource::Empty,
298 cached: false,
299 })
300 }
301 }
302 }
303 }
304
305 /// Search using raw RediSearch query string (Redis-only, no SQL fallback).
306 ///
307 /// Use this when you need the full power of RediSearch syntax
308 /// without the Query AST. This is an **advanced API** with caveats:
309 ///
310 /// - **No SQL fallback**: If Redis is unavailable, this will fail
311 /// - **No search cache**: Results are not cached via the merkle system
312 /// - **Manual paths**: You must use `$.payload.{field}` paths in your query
313 /// - **No translation**: The query string is passed directly to FT.SEARCH
314 ///
315 /// Prefer `search()` or `search_with_options()` for most use cases.
316 ///
317 /// # Example
318 /// ```ignore
319 /// // Raw RediSearch query with explicit payload paths
320 /// let results = engine.search_raw(
321 /// "users",
322 /// "@name:(Alice Smith) @age:[25 35]",
323 /// 100
324 /// ).await?;
325 /// ```
326 pub async fn search_raw(
327 &self,
328 index_name: &str,
329 query_str: &str,
330 limit: usize,
331 ) -> Result<Vec<SyncItem>, StorageError> {
332 let l2 = self.l2_store.as_ref().ok_or_else(|| {
333 StorageError::Connection("Redis not available for raw search".into())
334 })?;
335
336 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
337 let index = format!("{}idx:{}", prefix, index_name);
338
339 metrics::record_search_query("redis_raw", "attempt");
340 let start = std::time::Instant::now();
341
342 let keys = l2.ft_search(&index, query_str, limit).await?;
343 let items = self.fetch_items_by_keys(&keys).await?;
344
345 metrics::record_search_query("redis_raw", "success");
346 metrics::record_search_latency("redis_raw", start.elapsed());
347 metrics::record_search_results(items.len());
348
349 Ok(items)
350 }
351
352 /// Direct SQL search (bypasses Redis, SQL-only).
353 ///
354 /// Queries the SQL archive directly using JSON_EXTRACT.
355 /// This is an **advanced API** with specific use cases:
356 ///
357 /// - **No Redis**: Bypasses L2 cache entirely
358 /// - **No caching**: Results are not cached via the merkle system
359 /// - **Ground truth**: Queries the durable SQL archive
360 ///
361 /// Useful for:
362 /// - Analytics queries that need complete data
363 /// - When Redis is unavailable or not trusted
364 /// - Bulk operations on archived data
365 ///
366 /// Prefer `search()` or `search_with_options()` for most use cases.
367 pub async fn search_sql(
368 &self,
369 query: &Query,
370 limit: usize,
371 ) -> Result<Vec<SyncItem>, StorageError> {
372 let l3 = self.l3_store.as_ref().ok_or_else(|| {
373 StorageError::Connection("SQL not available".into())
374 })?;
375
376 metrics::record_search_query("sql_direct", "attempt");
377 let start = std::time::Instant::now();
378
379 let sql_query = SqlTranslator::translate(query, "data");
380 debug!(clause = %sql_query.clause, "SQL search");
381
382 let results = l3.search(&sql_query.clause, &sql_query.params, limit).await?;
383
384 metrics::record_search_query("sql_direct", "success");
385 metrics::record_search_latency("sql_direct", start.elapsed());
386 metrics::record_search_results(results.len());
387
388 Ok(results)
389 }
390
391 /// Get search cache statistics.
392 pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
393 self.search_state.as_ref().map(|s| s.read().cache.stats())
394 }
395
396 // ═══════════════════════════════════════════════════════════════════════════
397 // Internal helpers
398 // ═══════════════════════════════════════════════════════════════════════════
399
400 /// Fetch items by keys, filtering out None results
401 async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
402 if keys.is_empty() {
403 return Ok(vec![]);
404 }
405 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
406 let results = self.get_many(&key_refs).await;
407 Ok(results.into_iter().flatten().collect())
408 }
409
410 async fn search_redis(
411 &self,
412 index_name: &str,
413 query: &Query,
414 limit: usize,
415 ) -> Result<Vec<SyncItem>, StorageError> {
416 let l2 = self.l2_store.as_ref().ok_or_else(|| {
417 StorageError::Connection("Redis not available".into())
418 })?;
419
420 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
421 let index = format!("{}idx:{}", prefix, index_name);
422
423 // Translate query with potential vector parameters
424 let translated = RediSearchTranslator::translate_with_params(query);
425 debug!(index = %index, query = %translated.query, has_params = %translated.has_params(), "FT.SEARCH");
426
427 let keys = if translated.has_params() {
428 // Vector search with binary parameters
429 l2.ft_search_with_params(&index, &translated.query, &translated.params, limit).await?
430 } else {
431 // Regular search
432 l2.ft_search(&index, &translated.query, limit).await?
433 };
434
435 self.fetch_items_by_keys(&keys).await
436 }
437
438 async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
439 // Extract the path segment from prefix (e.g., "crdt:users:" -> "crdt:users")
440 let path = prefix.trim_end_matches(':');
441
442 if let Some(ref redis_merkle) = self.redis_merkle {
443 if let Ok(Some(node)) = redis_merkle.get_node(path).await {
444 return Some(node.hash.to_vec());
445 }
446 }
447
448 None
449 }
450}