1use std::time::Instant;
27use tracing::{debug, info};
28
29use crate::metrics;
30use crate::search::{
31 IndexManager, SearchIndex, SearchCache, SearchCacheStats,
32 Query, RediSearchTranslator, SqlTranslator, SqlParam,
33};
34use crate::sync_item::SyncItem;
35use crate::storage::traits::StorageError;
36
37use super::SyncEngine;
38
/// Selects which storage tiers a search may consult.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SearchTier {
    /// Query RediSearch only; an empty or failed Redis query yields an
    /// empty result instead of consulting SQL.
    RedisOnly,
    /// Query RediSearch first and fall back to the SQL store when Redis
    /// returns nothing or fails. This is the default tier.
    #[default]
    RedisWithSqlFallback,
}
48
49#[derive(Debug, Clone)]
51pub struct SearchResult {
52 pub items: Vec<SyncItem>,
54 pub source: SearchSource,
56 pub cached: bool,
58}
59
/// Identifies the tier that produced a [`SearchResult`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SearchSource {
    /// Results came from a RediSearch query.
    Redis,
    /// Results came from the SQL store.
    Sql,
    /// Result keys came from the search cache.
    Cache,
    /// No tier produced any results.
    Empty,
}
72
73#[derive(Default)]
75pub struct SearchState {
76 pub index_manager: IndexManager,
78 pub cache: SearchCache,
80}
81
82impl SyncEngine {
83 pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError> {
110 let l2 = self.l2_store.as_ref().ok_or_else(|| {
111 StorageError::Connection("Redis not available for search index".into())
112 })?;
113
114 let redis_prefix = self.config.read().redis_prefix.clone();
116 let args = index.to_ft_create_args_with_prefix(redis_prefix.as_deref());
117 debug!(index = %index.name, prefix = %index.prefix, redis_prefix = ?redis_prefix, "Creating search index");
118
119 match l2.ft_create(&args).await {
121 Ok(()) => {
122 metrics::record_search_index_operation("create", true);
123
124 if let Some(ref search_state) = self.search_state {
126 search_state.write().index_manager.register(index);
127 }
128
129 info!(index = %args[0], "Search index created");
130 Ok(())
131 }
132 Err(e) => {
133 metrics::record_search_index_operation("create", false);
134 Err(e)
135 }
136 }
137 }
138
139 pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError> {
143 let l2 = self.l2_store.as_ref().ok_or_else(|| {
144 StorageError::Connection("Redis not available".into())
145 })?;
146
147 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
148 let index_name = format!("{}idx:{}", prefix, name);
149
150 match l2.ft_dropindex(&index_name).await {
151 Ok(()) => {
152 metrics::record_search_index_operation("drop", true);
153 info!(index = %index_name, "Search index dropped");
154 Ok(())
155 }
156 Err(e) => {
157 metrics::record_search_index_operation("drop", false);
158 Err(e)
159 }
160 }
161 }
162
163 pub fn get_index_view_prefix(&self, index_name: &str) -> String {
168 let crdt_prefix = if let Some(ref search_state) = self.search_state {
169 search_state.read()
170 .index_manager
171 .get(index_name)
172 .map(|idx| idx.prefix.clone())
173 } else {
174 None
175 };
176
177 let crdt_prefix = crdt_prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
178 crdt_prefix.replace("crdt:", "view:")
179 }
180
181 pub async fn search(
212 &self,
213 index_name: &str,
214 query: &Query,
215 ) -> Result<SearchResult, StorageError> {
216 self.search_with_options(index_name, query, SearchTier::default(), 100).await
217 }
218
219 pub async fn search_with_options(
221 &self,
222 index_name: &str,
223 query: &Query,
224 tier: SearchTier,
225 limit: usize,
226 ) -> Result<SearchResult, StorageError> {
227 let start = Instant::now();
228
229 let prefix = if let Some(ref search_state) = self.search_state {
231 search_state.read()
232 .index_manager
233 .get(index_name)
234 .map(|idx| idx.prefix.clone())
235 } else {
236 None
237 };
238
239 let prefix = prefix.unwrap_or_else(|| format!("crdt:{}:", index_name));
240
241 if let Some(ref search_state) = self.search_state {
243 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
244 let cached_keys = search_state.read().cache.get(&prefix, query, &merkle_root);
245 if let Some(keys) = cached_keys {
246 debug!(index = %index_name, count = keys.len(), "Search cache hit");
247 metrics::record_search_cache(true);
248 metrics::record_search_latency("cache", start.elapsed());
249 metrics::record_search_results(keys.len());
250 let items = self.fetch_items_by_keys(&keys).await?;
251 return Ok(SearchResult {
252 items,
253 source: SearchSource::Cache,
254 cached: true,
255 });
256 }
257 }
258 }
259
260 let redis_start = Instant::now();
262 let redis_results = self.search_redis(index_name, query, limit).await;
263
264 match redis_results {
265 Ok(items) if !items.is_empty() => {
266 debug!(index = %index_name, count = items.len(), "RediSearch results");
267 metrics::record_search_query("redis", "success");
268 metrics::record_search_latency("redis", redis_start.elapsed());
269 metrics::record_search_results(items.len());
270 Ok(SearchResult {
271 items,
272 source: SearchSource::Redis,
273 cached: false,
274 })
275 }
276 Ok(_) | Err(_) => {
277 if redis_results.is_err() {
279 metrics::record_search_query("redis", "error");
280 } else {
281 metrics::record_search_query("redis", "empty");
282 }
283
284 if tier == SearchTier::RedisWithSqlFallback {
286 let sql_start = Instant::now();
287 let view_prefix = prefix.replace("crdt:", "view:");
289 let sql_results = self.search_sql_with_prefix(query, &view_prefix, limit).await?;
290 let is_empty = sql_results.is_empty();
291
292 metrics::record_search_query("sql", "success");
293 metrics::record_search_latency("sql", sql_start.elapsed());
294 metrics::record_search_results(sql_results.len());
295
296 if !is_empty {
298 if let Some(ref search_state) = self.search_state {
299 if let Some(merkle_root) = self.get_merkle_root_for_prefix(&prefix).await {
300 let keys: Vec<String> = sql_results.iter()
301 .map(|item| item.object_id.clone())
302 .collect();
303 search_state.write().cache.insert(&prefix, query, merkle_root, keys);
304 }
305 }
306 }
307
308 Ok(SearchResult {
309 items: sql_results,
310 source: if is_empty { SearchSource::Empty } else { SearchSource::Sql },
311 cached: false,
312 })
313 } else {
314 metrics::record_search_results(0);
316 Ok(SearchResult {
317 items: vec![],
318 source: SearchSource::Empty,
319 cached: false,
320 })
321 }
322 }
323 }
324 }
325
326 pub async fn search_raw(
348 &self,
349 index_name: &str,
350 query_str: &str,
351 limit: usize,
352 ) -> Result<Vec<SyncItem>, StorageError> {
353 let l2 = self.l2_store.as_ref().ok_or_else(|| {
354 StorageError::Connection("Redis not available for raw search".into())
355 })?;
356
357 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
358 let index = format!("{}idx:{}", prefix, index_name);
359
360 metrics::record_search_query("redis_raw", "attempt");
361 let start = std::time::Instant::now();
362
363 let keys = l2.ft_search(&index, query_str, limit).await?;
364 let items = self.fetch_items_by_keys(&keys).await?;
365
366 metrics::record_search_query("redis_raw", "success");
367 metrics::record_search_latency("redis_raw", start.elapsed());
368 metrics::record_search_results(items.len());
369
370 Ok(items)
371 }
372
373 pub async fn search_sql(
389 &self,
390 query: &Query,
391 limit: usize,
392 ) -> Result<Vec<SyncItem>, StorageError> {
393 self.search_sql_with_prefix(query, "", limit).await
394 }
395
396 pub async fn search_sql_with_prefix(
401 &self,
402 query: &Query,
403 key_prefix: &str,
404 limit: usize,
405 ) -> Result<Vec<SyncItem>, StorageError> {
406 let l3 = self.l3_store.as_ref().ok_or_else(|| {
407 StorageError::Connection("SQL not available".into())
408 })?;
409
410 metrics::record_search_query("sql_direct", "attempt");
411 let start = std::time::Instant::now();
412
413 let sql_query = SqlTranslator::translate(query, "payload");
414
415 let (full_clause, full_params) = if key_prefix.is_empty() {
417 (sql_query.clause.clone(), sql_query.params.clone())
418 } else {
419 let mut params = vec![SqlParam::Text(format!("{}%", key_prefix))];
421 params.extend(sql_query.params.clone());
422 (format!("id LIKE ? AND ({})", sql_query.clause), params)
423 };
424
425 debug!(clause = %full_clause, prefix = %key_prefix, "SQL search");
426
427 let results = l3.search(&full_clause, &full_params, limit).await?;
428
429 metrics::record_search_query("sql_direct", "success");
430 metrics::record_search_latency("sql_direct", start.elapsed());
431 metrics::record_search_results(results.len());
432
433 Ok(results)
434 }
435
436 pub async fn search_count_sql(&self, query: &Query) -> Result<u64, StorageError> {
448 self.search_count_sql_with_prefix(query, "").await
449 }
450
451 pub async fn search_count_sql_with_prefix(&self, query: &Query, key_prefix: &str) -> Result<u64, StorageError> {
453 let l3 = self.l3_store.as_ref().ok_or_else(|| {
454 StorageError::Connection("SQL not available".into())
455 })?;
456
457 let sql_query = SqlTranslator::translate(query, "payload");
458
459 let (full_clause, full_params) = if key_prefix.is_empty() {
461 (sql_query.clause.clone(), sql_query.params.clone())
462 } else {
463 let mut params = vec![SqlParam::Text(format!("{}%", key_prefix))];
464 params.extend(sql_query.params.clone());
465 (format!("id LIKE ? AND ({})", sql_query.clause), params)
466 };
467
468 debug!(clause = %full_clause, prefix = %key_prefix, "SQL count");
469
470 l3.count_where(&full_clause, &full_params).await
471 }
472
473 pub fn search_cache_stats(&self) -> Option<SearchCacheStats> {
475 self.search_state.as_ref().map(|s| s.read().cache.stats())
476 }
477
478 async fn fetch_items_by_keys(&self, keys: &[String]) -> Result<Vec<SyncItem>, StorageError> {
484 if keys.is_empty() {
485 return Ok(vec![]);
486 }
487 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
488 let results = self.get_many(&key_refs).await;
489 Ok(results.into_iter().flatten().collect())
490 }
491
492 async fn search_redis(
493 &self,
494 index_name: &str,
495 query: &Query,
496 limit: usize,
497 ) -> Result<Vec<SyncItem>, StorageError> {
498 let l2 = self.l2_store.as_ref().ok_or_else(|| {
499 StorageError::Connection("Redis not available".into())
500 })?;
501
502 let prefix = self.config.read().redis_prefix.clone().unwrap_or_default();
503 let index = format!("{}idx:{}", prefix, index_name);
504
505 let translated = RediSearchTranslator::translate_with_params(query);
507 debug!(index = %index, query = %translated.query, has_params = %translated.has_params(), "FT.SEARCH");
508
509 let keys = if translated.has_params() {
510 l2.ft_search_with_params(&index, &translated.query, &translated.params, limit).await?
512 } else {
513 l2.ft_search(&index, &translated.query, limit).await?
515 };
516
517 self.fetch_items_by_keys(&keys).await
518 }
519
520 async fn get_merkle_root_for_prefix(&self, prefix: &str) -> Option<Vec<u8>> {
521 let path = prefix.trim_end_matches(':');
523
524 if let Some(ref redis_merkle) = self.redis_merkle {
525 if let Ok(Some(node)) = redis_merkle.get_node(path).await {
526 return Some(node.hash.to_vec());
527 }
528 }
529
530 None
531 }
532}