Skip to main content

wae_search/
lib.rs

1//! WAE Search - Elasticsearch/OpenSearch 搜索服务抽象层
2//!
3//! 提供统一的搜索能力抽象,支持 Elasticsearch 和 OpenSearch 后端。
4//!
5//! 深度融合 tokio 运行时,所有 API 都是异步优先设计。
6//! 微服务架构友好,支持索引操作、文档 CRUD、搜索查询等特性。
7
8#![warn(missing_docs)]
9
10use serde::{Deserialize, Serialize, de::DeserializeOwned};
11use std::time::Duration;
12use wae_types::WaeError;
13
14/// 搜索操作结果类型
15pub type SearchResult<T> = Result<T, WaeError>;
16
17/// 搜索配置
18#[derive(Debug, Clone)]
19pub struct SearchConfig {
20    /// 搜索服务地址
21    pub url: String,
22    /// 连接超时
23    pub connection_timeout: Duration,
24    /// 操作超时
25    pub operation_timeout: Duration,
26    /// 最大重试次数
27    pub max_retries: usize,
28    /// 重试间隔
29    pub retry_interval: Duration,
30    /// 默认索引名前缀
31    pub index_prefix: String,
32}
33
34impl Default for SearchConfig {
35    fn default() -> Self {
36        Self {
37            url: "http://localhost:9200".to_string(),
38            connection_timeout: Duration::from_secs(5),
39            operation_timeout: Duration::from_secs(30),
40            max_retries: 3,
41            retry_interval: Duration::from_millis(100),
42            index_prefix: String::new(),
43        }
44    }
45}
46
47/// 搜索命中结果
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct SearchHit<T> {
50    /// 文档 ID
51    pub id: String,
52    /// 文档索引
53    pub index: String,
54    /// 文档分数
55    pub score: Option<f64>,
56    /// 文档源数据
57    pub source: T,
58}
59
60/// 搜索结果
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct SearchResponse<T> {
63    /// 总命中数
64    pub total: u64,
65    /// 命中结果列表
66    pub hits: Vec<SearchHit<T>>,
67    /// 搜索耗时(毫秒)
68    pub took: u64,
69    /// 搜索是否超时
70    pub timed_out: bool,
71}
72
73/// 搜索查询参数
74#[derive(Debug, Clone)]
75pub struct SearchQuery {
76    /// 查询 DSL(JSON 格式)
77    pub query: serde_json::Value,
78    /// 起始位置
79    pub from: Option<u64>,
80    /// 返回数量
81    pub size: Option<u64>,
82    /// 排序配置
83    pub sort: Option<serde_json::Value>,
84    /// 聚合配置
85    pub aggregations: Option<serde_json::Value>,
86}
87
88/// 文档批量操作项
89#[derive(Debug, Clone)]
90pub enum BulkOperation {
91    /// 索引文档(创建或更新)
92    Index {
93        /// 文档 ID(可选)
94        id: Option<String>,
95        /// 文档源数据
96        source: serde_json::Value,
97    },
98    /// 创建文档(仅当不存在时)
99    Create {
100        /// 文档 ID
101        id: String,
102        /// 文档源数据
103        source: serde_json::Value,
104    },
105    /// 更新文档
106    Update {
107        /// 文档 ID
108        id: String,
109        /// 更新数据
110        doc: serde_json::Value,
111    },
112    /// 删除文档
113    Delete {
114        /// 文档 ID
115        id: String,
116    },
117}
118
119/// 批量操作结果
120#[derive(Debug, Clone)]
121pub struct BulkResponse {
122    /// 是否有错误
123    pub has_errors: bool,
124    /// 操作结果列表
125    pub items: Vec<BulkItemResult>,
126    /// 操作耗时(毫秒)
127    pub took: u64,
128}
129
130/// 批量操作项结果
131#[derive(Debug, Clone)]
132pub struct BulkItemResult {
133    /// 操作类型
134    pub operation: String,
135    /// 文档 ID
136    pub id: String,
137    /// 文档索引
138    pub index: String,
139    /// 是否成功
140    pub success: bool,
141    /// 错误信息(如果失败)
142    pub error: Option<String>,
143    /// 版本号
144    pub version: Option<u64>,
145}
146
147/// 搜索服务核心 trait (dyn 兼容)
148///
149/// 定义统一的搜索操作接口,支持索引管理、文档 CRUD、搜索查询。
150/// 所有方法都是异步的,适配 tokio 运行时。
151#[async_trait::async_trait]
152pub trait SearchBackend: Send + Sync {
153    /// 检查搜索服务是否可用
154    async fn ping(&self) -> SearchResult<bool>;
155
156    /// 创建索引
157    async fn create_index(
158        &self,
159        index: &str,
160        settings: Option<serde_json::Value>,
161        mappings: Option<serde_json::Value>,
162    ) -> SearchResult<bool>;
163
164    /// 删除索引
165    async fn delete_index(&self, index: &str) -> SearchResult<bool>;
166
167    /// 检查索引是否存在
168    async fn index_exists(&self, index: &str) -> SearchResult<bool>;
169
170    /// 获取索引信息
171    async fn get_index(&self, index: &str) -> SearchResult<serde_json::Value>;
172
173    /// 索引文档(创建或更新)
174    async fn index_document(&self, index: &str, id: Option<&str>, document: &serde_json::Value) -> SearchResult<String>;
175
176    /// 获取文档
177    async fn get_document(&self, index: &str, id: &str) -> SearchResult<Option<serde_json::Value>>;
178
179    /// 更新文档
180    async fn update_document(&self, index: &str, id: &str, doc: &serde_json::Value) -> SearchResult<bool>;
181
182    /// 删除文档
183    async fn delete_document(&self, index: &str, id: &str) -> SearchResult<bool>;
184
185    /// 文档是否存在
186    async fn document_exists(&self, index: &str, id: &str) -> SearchResult<bool>;
187
188    /// 批量操作
189    async fn bulk(&self, index: &str, operations: Vec<BulkOperation>) -> SearchResult<BulkResponse>;
190
191    /// 搜索文档(返回原始 JSON 响应)
192    async fn search_raw(&self, index: &str, query: SearchQuery) -> SearchResult<SearchResponse<serde_json::Value>>;
193
194    /// 按查询删除文档
195    async fn delete_by_query(&self, index: &str, query: serde_json::Value) -> SearchResult<u64>;
196
197    /// 按查询更新文档
198    async fn update_by_query(&self, index: &str, query: serde_json::Value, script: serde_json::Value) -> SearchResult<u64>;
199
200    /// 获取搜索配置
201    fn config(&self) -> &SearchConfig;
202}
203
204/// 搜索服务 (提供泛型封装)
205pub struct SearchService {
206    backend: Box<dyn SearchBackend>,
207}
208
209impl SearchService {
210    /// 从后端创建搜索服务
211    pub fn new(backend: Box<dyn SearchBackend>) -> Self {
212        Self { backend }
213    }
214
215    /// 检查搜索服务是否可用
216    pub async fn ping(&self) -> SearchResult<bool> {
217        self.backend.ping().await
218    }
219
220    /// 创建索引
221    pub async fn create_index(
222        &self,
223        index: &str,
224        settings: Option<serde_json::Value>,
225        mappings: Option<serde_json::Value>,
226    ) -> SearchResult<bool> {
227        let full_index = self.build_index_name(index);
228        self.backend.create_index(&full_index, settings, mappings).await
229    }
230
231    /// 删除索引
232    pub async fn delete_index(&self, index: &str) -> SearchResult<bool> {
233        let full_index = self.build_index_name(index);
234        self.backend.delete_index(&full_index).await
235    }
236
237    /// 检查索引是否存在
238    pub async fn index_exists(&self, index: &str) -> SearchResult<bool> {
239        let full_index = self.build_index_name(index);
240        self.backend.index_exists(&full_index).await
241    }
242
243    /// 获取索引信息
244    pub async fn get_index(&self, index: &str) -> SearchResult<serde_json::Value> {
245        let full_index = self.build_index_name(index);
246        self.backend.get_index(&full_index).await
247    }
248
249    /// 索引文档(创建或更新)
250    pub async fn index_document<T: Serialize + ?Sized>(
251        &self,
252        index: &str,
253        id: Option<&str>,
254        document: &T,
255    ) -> SearchResult<String> {
256        let full_index = self.build_index_name(index);
257        let doc = serde_json::to_value(document).map_err(|_| WaeError::serialization_failed(std::any::type_name::<T>()))?;
258        self.backend.index_document(&full_index, id, &doc).await
259    }
260
261    /// 获取文档
262    pub async fn get_document<T: DeserializeOwned>(&self, index: &str, id: &str) -> SearchResult<Option<T>> {
263        let full_index = self.build_index_name(index);
264        let result = self.backend.get_document(&full_index, id).await?;
265        match result {
266            Some(value) => {
267                let doc =
268                    serde_json::from_value(value).map_err(|_| WaeError::deserialization_failed(std::any::type_name::<T>()))?;
269                Ok(Some(doc))
270            }
271            None => Ok(None),
272        }
273    }
274
275    /// 更新文档
276    pub async fn update_document<T: Serialize + ?Sized>(&self, index: &str, id: &str, doc: &T) -> SearchResult<bool> {
277        let full_index = self.build_index_name(index);
278        let doc_value = serde_json::to_value(doc).map_err(|_| WaeError::serialization_failed(std::any::type_name::<T>()))?;
279        self.backend.update_document(&full_index, id, &doc_value).await
280    }
281
282    /// 删除文档
283    pub async fn delete_document(&self, index: &str, id: &str) -> SearchResult<bool> {
284        let full_index = self.build_index_name(index);
285        self.backend.delete_document(&full_index, id).await
286    }
287
288    /// 文档是否存在
289    pub async fn document_exists(&self, index: &str, id: &str) -> SearchResult<bool> {
290        let full_index = self.build_index_name(index);
291        self.backend.document_exists(&full_index, id).await
292    }
293
294    /// 批量操作
295    pub async fn bulk(&self, index: &str, operations: Vec<BulkOperation>) -> SearchResult<BulkResponse> {
296        let full_index = self.build_index_name(index);
297        self.backend.bulk(&full_index, operations).await
298    }
299
300    /// 搜索文档
301    pub async fn search<T: DeserializeOwned>(&self, index: &str, query: SearchQuery) -> SearchResult<SearchResponse<T>> {
302        let full_index = self.build_index_name(index);
303        let raw_response = self.backend.search_raw(&full_index, query).await?;
304
305        let mut hits = Vec::with_capacity(raw_response.hits.len());
306        for hit in raw_response.hits {
307            let source = serde_json::from_value(hit.source)
308                .map_err(|_| wae_types::WaeError::deserialization_failed(std::any::type_name::<T>()))?;
309            hits.push(SearchHit { id: hit.id, index: hit.index, score: hit.score, source });
310        }
311
312        Ok(SearchResponse { total: raw_response.total, hits, took: raw_response.took, timed_out: raw_response.timed_out })
313    }
314
315    /// 按查询删除文档
316    pub async fn delete_by_query(&self, index: &str, query: serde_json::Value) -> SearchResult<u64> {
317        let full_index = self.build_index_name(index);
318        self.backend.delete_by_query(&full_index, query).await
319    }
320
321    /// 按查询更新文档
322    pub async fn update_by_query(&self, index: &str, query: serde_json::Value, script: serde_json::Value) -> SearchResult<u64> {
323        let full_index = self.build_index_name(index);
324        self.backend.update_by_query(&full_index, query, script).await
325    }
326
327    /// 获取搜索配置
328    pub fn config(&self) -> &SearchConfig {
329        self.backend.config()
330    }
331
332    /// 构建带前缀的完整索引名
333    pub fn build_index_name(&self, index: &str) -> String {
334        let config = self.config();
335        if config.index_prefix.is_empty() { index.to_string() } else { format!("{}_{}", config.index_prefix, index) }
336    }
337}