1use std::time::Instant;
24use tracing::{debug, info};
25
26use crate::metrics;
27use crate::search::{
28 IndexManager, SearchIndex, SearchCache, SearchCacheStats,
29 Query, RediSearchTranslator, SqlTranslator,
30};
31use crate::sync_item::SyncItem;
32use crate::storage::traits::StorageError;
33
34use super::SyncEngine;
35
/// Selects how far `SyncEngine::search_with_options` goes when RediSearch
/// yields no usable results.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SearchTier {
    /// Query RediSearch only. An empty or failed Redis search produces an
    /// empty result instead of consulting SQL.
    RedisOnly,
    /// Query RediSearch first and fall back to the SQL store when Redis
    /// returns nothing or fails. This is the default tier.
    #[default]
    RedisWithSqlFallback,
}
45
/// Outcome of a search: the matching items plus where they came from.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// Items that matched the query (empty when nothing was found).
    pub items: Vec<SyncItem>,
    /// Which backend produced `items`.
    pub source: SearchSource,
    /// `true` only when the result keys were served from the search cache.
    pub cached: bool,
}
56
/// Identifies the backend that produced a [`SearchResult`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SearchSource {
    /// Results came from a RediSearch query.
    Redis,
    /// Results came from the SQL fallback query.
    Sql,
    /// Result keys came from the search cache; the items were then fetched by key.
    Cache,
    /// No backend produced any results.
    Empty,
}
69
/// Search-related state held by the engine: known indexes and cached results.
pub struct SearchState {
    /// Registry of search indexes; consulted to resolve an index name to its key prefix.
    pub index_manager: IndexManager,
    /// Cache of result keys, looked up by key prefix, query and Merkle root.
    pub cache: SearchCache,
}
77
78impl Default for SearchState {
79 fn default() -> Self {
80 Self {
81 index_manager: IndexManager::new(),
82 cache: SearchCache::default(),
83 }
84 }
85}
86
87impl SyncEngine {
88 pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
115 let l2 = self.l2_store.as_ref().ok_or_else(|| {
116 StorageError::Connection("Redis not available for search index".into())
117 })?;
118
119 let redis_prefix = self.config.redis_prefix.as_deref();
121 let args = index.to_ft_create_args_with_prefix(redis_prefix);
122 debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
123
124 match l2.ft_create(&args).await {
126 Ok(()) => {
127 metrics::record_search_index_operation("create", true);
128
129 if let Some(ref search_state) = self.search_state {
131 search_state.write().index_manager.register(index);
132 }
133
134 info!(index = %args[0], "Search index created");
135 Ok(())
136 }
137 Err(e) => {
138 metrics::record_search_index_operation("create", false);
139 Err(e)
140 }
141 }
142 }
143
144 pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
148 let l2 = self.l2_store.as_ref().ok_or_else(|| {
149 StorageError::Connection("Redis not available".into())
150 })?;
151
152 let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
153 let index_name = format!("{}idx:{}", prefix, name);
154
155 match l2.ft_dropindex(&index_name).await {
156 Ok(()) => {
157 metrics::record_search_index_operation("drop", true);
158 info!(index = %index_name, "Search index dropped");
159 Ok(())
160 }
161 Err(e) => {
162 metrics::record_search_index_operation("drop", false);
163 Err(e)
164 }
165 }
166 }
167
168 pub async fn search(
198 &self,
199 index_name: &str,
200 query: &Query,
201 ) -> Result<SearchResult, StorageError> {
202 self.search_with_options(index_name, query, SearchTier::default(), 100).await
203 }
204
205 pub async fn search_with_options(
207 &self,
208 index_name: &str,
209 query: &Query,
210 tier: SearchTier,
211 limit: usize,
212 ) -> Result<SearchResult, StorageError> {
213 let start = Instant::now();
214
215 let prefix = if let Some(ref search_state) = self.search_state {
217 search_state.read()
218 .index_manager
219 .get(index_name)
220 .map(|idx| idx.prefix.clone())
221 } else {
222 None
223 };
224
225 let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
226
227 if let Some(ref search_state) = self.search_state {
229 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
230 let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
231 if let Some(keys) = cached_keys {
232 debug!(index = %index_name, count = keys.len(), "Search cache hit");
233 metrics::record_search_cache(true);
234 metrics::record_search_latency("cache", start.elapsed());
235 metrics::record_search_results(keys.len());
236 let items = self.fetch_items_by_keys(&keys).await?;
237 return Ok(SearchResult {
238 items,
239 source: SearchSource::Cache,
240 cached: true,
241 });
242 }
243 }
244 }
245
246 let redis_start = Instant::now();
248 let redis_results = self.search_redis(index_name, query, limit).await;
249
250 match redis_results {
251 Ok(items) if !items.is_empty() => {
252 debug!(index = %index_name, count = items.len(), "RediSearch results");
253 metrics::record_search_query("redis", "success");
254 metrics::record_search_latency("redis", redis_start.elapsed());
255 metrics::record_search_results(items.len());
256 Ok(SearchResult {
257 items,
258 source: SearchSource::Redis,
259 cached: false,
260 })
261 }
262 Ok(_) | Err(_) => {
263 if redis_results.is_err() {
265 metrics::record_search_query("redis", "error");
266 } else {
267 metrics::record_search_query("redis", "empty");
268 }
269
270 if tier == SearchTier::RedisWithSqlFallback {
272 let sql_start = Instant::now();
273 let sql_results = self.search_sql(query, limit).await?;
274 let is_empty = sql_results.is_empty();
275
276 metrics::record_search_query("sql", "success");
277 metrics::record_search_latency("sql", sql_start.elapsed());
278 metrics::record_search_results(sql_results.len());
279
280 if !is_empty {
282 if let Some(ref search_state) = self.search_state {
283 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
284 let keys: Vec<String> = sql_results.iter()
285 .map(|item| item.object_id.clone())
286 .collect();
287 search_state.write().cache.insert(&prefix, query, merkle_root, keys);
288 }
289 }
290 }
291
292 Ok(SearchResult {
293 items: sql_results,
294 source: if is_empty { SearchSource::Empty } else { SearchSource::Sql },
295 cached: false,
296 })
297 } else {
298 metrics::record_search_results(0);
300 Ok(SearchResult {
301 items: vec![],
302 source: SearchSource::Empty,
303 cached: false,
304 })
305 }
306 }
307 }
308 }
309
310 pub async fn search_raw(
332 &self,
333 index_name: &str,
334 query_str: &str,
335 limit: usize,
336 ) -> Result<Vec<SyncItem>, StorageError> {
337 let l2 = self.l2_store.as_ref().ok_or_else(|| {
338 StorageError::Connection("Redis not available for raw search".into())
339 })?;
340
341 let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
342 let index = format!("{}idx:{}", prefix, index_name);
343
344 metrics::record_search_query("redis_raw", "attempt");
345 let start = std::time::Instant::now();
346
347 let keys = l2.ft_search(&index, query_str, limit).await?;
348 let items = self.fetch_items_by_keys(&keys).await?;
349
350 metrics::record_search_query("redis_raw", "success");
351 metrics::record_search_latency("redis_raw", start.elapsed());
352 metrics::record_search_results(items.len());
353
354 Ok(items)
355 }
356
357 pub async fn search_sql(
373 &self,
374 query: &Query,
375 limit: usize,
376 ) -> Result<Vec<SyncItem>, StorageError> {
377 let l3 = self.l3_store.as_ref().ok_or_else(|| {
378 StorageError::Connection("SQL not available".into())
379 })?;
380
381 metrics::record_search_query("sql_direct", "attempt");
382 let start = std::time::Instant::now();
383
384 let sql_query = SqlTranslator::translate(query, "data");
385 debug!(clause = %sql_query.clause, "SQL search");
386
387 let results = l3.search(&sql_query.clause, &sql_query.params, limit).await?;
388
389 metrics::record_search_query("sql_direct", "success");
390 metrics::record_search_latency("sql_direct", start.elapsed());
391 metrics::record_search_results(results.len());
392
393 Ok(results)
394 }
395
396 pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
398 self.search_state.as_ref().map(|s| s.read().cache.stats())
399 }
400
401 async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
407 if keys.is_empty() {
408 return Ok(vec![]);
409 }
410 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
411 let results = self.get_many(&key_refs).await;
412 Ok(results.into_iter().flatten().collect())
413 }
414
415 async fn search_redis(
416 &self,
417 index_name: &str,
418 query: &Query,
419 limit: usize,
420 ) -> Result<Vec<SyncItem>, StorageError> {
421 let l2 = self.l2_store.as_ref().ok_or_else(|| {
422 StorageError::Connection("Redis not available".into())
423 })?;
424
425 let prefix = self.config.redis_prefix.as_deref().unwrap_or("");
426 let index = format!("{}idx:{}", prefix, index_name);
427 let query_str = RediSearchTranslator::translate(query);
428 debug!(index = %index, query = %query_str, "FT.SEARCH");
429
430 let keys = l2.ft_search(&index, &query_str, limit).await?;
431 self.fetch_items_by_keys(&keys).await
432 }
433
434 async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
435 let path = prefix.trim_end_matches(':');
437
438 if let Some(ref redis_merkle) = self.redis_merkle {
439 if let Ok(Some(node)) = redis_merkle.get_node(path).await {
440 return Some(node.hash.to_vec());
441 }
442 }
443
444 None
445 }
446}