1#![warn(missing_docs)]
9
10use serde::{Deserialize, Serialize, de::DeserializeOwned};
11use std::time::Duration;
12use wae_types::WaeError;
13
14pub type SearchResult<T> = Result<T, WaeError>;
16
17#[derive(Debug, Clone)]
19pub struct SearchConfig {
20 pub url: String,
22 pub connection_timeout: Duration,
24 pub operation_timeout: Duration,
26 pub max_retries: usize,
28 pub retry_interval: Duration,
30 pub index_prefix: String,
32}
33
34impl Default for SearchConfig {
35 fn default() -> Self {
36 Self {
37 url: "http://localhost:9200".to_string(),
38 connection_timeout: Duration::from_secs(5),
39 operation_timeout: Duration::from_secs(30),
40 max_retries: 3,
41 retry_interval: Duration::from_millis(100),
42 index_prefix: String::new(),
43 }
44 }
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct SearchHit<T> {
50 pub id: String,
52 pub index: String,
54 pub score: Option<f64>,
56 pub source: T,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct SearchResponse<T> {
63 pub total: u64,
65 pub hits: Vec<SearchHit<T>>,
67 pub took: u64,
69 pub timed_out: bool,
71}
72
73#[derive(Debug, Clone)]
75pub struct SearchQuery {
76 pub query: serde_json::Value,
78 pub from: Option<u64>,
80 pub size: Option<u64>,
82 pub sort: Option<serde_json::Value>,
84 pub aggregations: Option<serde_json::Value>,
86}
87
88#[derive(Debug, Clone)]
90pub enum BulkOperation {
91 Index {
93 id: Option<String>,
95 source: serde_json::Value,
97 },
98 Create {
100 id: String,
102 source: serde_json::Value,
104 },
105 Update {
107 id: String,
109 doc: serde_json::Value,
111 },
112 Delete {
114 id: String,
116 },
117}
118
119#[derive(Debug, Clone)]
121pub struct BulkResponse {
122 pub has_errors: bool,
124 pub items: Vec<BulkItemResult>,
126 pub took: u64,
128}
129
130#[derive(Debug, Clone)]
132pub struct BulkItemResult {
133 pub operation: String,
135 pub id: String,
137 pub index: String,
139 pub success: bool,
141 pub error: Option<String>,
143 pub version: Option<u64>,
145}
146
147#[async_trait::async_trait]
152pub trait SearchBackend: Send + Sync {
153 async fn ping(&self) -> SearchResult<bool>;
155
156 async fn create_index(
158 &self,
159 index: &str,
160 settings: Option<serde_json::Value>,
161 mappings: Option<serde_json::Value>,
162 ) -> SearchResult<bool>;
163
164 async fn delete_index(&self, index: &str) -> SearchResult<bool>;
166
167 async fn index_exists(&self, index: &str) -> SearchResult<bool>;
169
170 async fn get_index(&self, index: &str) -> SearchResult<serde_json::Value>;
172
173 async fn index_document(&self, index: &str, id: Option<&str>, document: &serde_json::Value) -> SearchResult<String>;
175
176 async fn get_document(&self, index: &str, id: &str) -> SearchResult<Option<serde_json::Value>>;
178
179 async fn update_document(&self, index: &str, id: &str, doc: &serde_json::Value) -> SearchResult<bool>;
181
182 async fn delete_document(&self, index: &str, id: &str) -> SearchResult<bool>;
184
185 async fn document_exists(&self, index: &str, id: &str) -> SearchResult<bool>;
187
188 async fn bulk(&self, index: &str, operations: Vec<BulkOperation>) -> SearchResult<BulkResponse>;
190
191 async fn search_raw(&self, index: &str, query: SearchQuery) -> SearchResult<SearchResponse<serde_json::Value>>;
193
194 async fn delete_by_query(&self, index: &str, query: serde_json::Value) -> SearchResult<u64>;
196
197 async fn update_by_query(&self, index: &str, query: serde_json::Value, script: serde_json::Value) -> SearchResult<u64>;
199
200 fn config(&self) -> &SearchConfig;
202}
203
204pub struct SearchService {
206 backend: Box<dyn SearchBackend>,
207}
208
209impl SearchService {
210 pub fn new(backend: Box<dyn SearchBackend>) -> Self {
212 Self { backend }
213 }
214
215 pub async fn ping(&self) -> SearchResult<bool> {
217 self.backend.ping().await
218 }
219
220 pub async fn create_index(
222 &self,
223 index: &str,
224 settings: Option<serde_json::Value>,
225 mappings: Option<serde_json::Value>,
226 ) -> SearchResult<bool> {
227 let full_index = self.build_index_name(index);
228 self.backend.create_index(&full_index, settings, mappings).await
229 }
230
231 pub async fn delete_index(&self, index: &str) -> SearchResult<bool> {
233 let full_index = self.build_index_name(index);
234 self.backend.delete_index(&full_index).await
235 }
236
237 pub async fn index_exists(&self, index: &str) -> SearchResult<bool> {
239 let full_index = self.build_index_name(index);
240 self.backend.index_exists(&full_index).await
241 }
242
243 pub async fn get_index(&self, index: &str) -> SearchResult<serde_json::Value> {
245 let full_index = self.build_index_name(index);
246 self.backend.get_index(&full_index).await
247 }
248
249 pub async fn index_document<T: Serialize + ?Sized>(
251 &self,
252 index: &str,
253 id: Option<&str>,
254 document: &T,
255 ) -> SearchResult<String> {
256 let full_index = self.build_index_name(index);
257 let doc = serde_json::to_value(document).map_err(|_| WaeError::serialization_failed(std::any::type_name::<T>()))?;
258 self.backend.index_document(&full_index, id, &doc).await
259 }
260
261 pub async fn get_document<T: DeserializeOwned>(&self, index: &str, id: &str) -> SearchResult<Option<T>> {
263 let full_index = self.build_index_name(index);
264 let result = self.backend.get_document(&full_index, id).await?;
265 match result {
266 Some(value) => {
267 let doc =
268 serde_json::from_value(value).map_err(|_| WaeError::deserialization_failed(std::any::type_name::<T>()))?;
269 Ok(Some(doc))
270 }
271 None => Ok(None),
272 }
273 }
274
275 pub async fn update_document<T: Serialize + ?Sized>(&self, index: &str, id: &str, doc: &T) -> SearchResult<bool> {
277 let full_index = self.build_index_name(index);
278 let doc_value = serde_json::to_value(doc).map_err(|_| WaeError::serialization_failed(std::any::type_name::<T>()))?;
279 self.backend.update_document(&full_index, id, &doc_value).await
280 }
281
282 pub async fn delete_document(&self, index: &str, id: &str) -> SearchResult<bool> {
284 let full_index = self.build_index_name(index);
285 self.backend.delete_document(&full_index, id).await
286 }
287
288 pub async fn document_exists(&self, index: &str, id: &str) -> SearchResult<bool> {
290 let full_index = self.build_index_name(index);
291 self.backend.document_exists(&full_index, id).await
292 }
293
294 pub async fn bulk(&self, index: &str, operations: Vec<BulkOperation>) -> SearchResult<BulkResponse> {
296 let full_index = self.build_index_name(index);
297 self.backend.bulk(&full_index, operations).await
298 }
299
300 pub async fn search<T: DeserializeOwned>(&self, index: &str, query: SearchQuery) -> SearchResult<SearchResponse<T>> {
302 let full_index = self.build_index_name(index);
303 let raw_response = self.backend.search_raw(&full_index, query).await?;
304
305 let mut hits = Vec::with_capacity(raw_response.hits.len());
306 for hit in raw_response.hits {
307 let source = serde_json::from_value(hit.source)
308 .map_err(|_| wae_types::WaeError::deserialization_failed(std::any::type_name::<T>()))?;
309 hits.push(SearchHit { id: hit.id, index: hit.index, score: hit.score, source });
310 }
311
312 Ok(SearchResponse { total: raw_response.total, hits, took: raw_response.took, timed_out: raw_response.timed_out })
313 }
314
315 pub async fn delete_by_query(&self, index: &str, query: serde_json::Value) -> SearchResult<u64> {
317 let full_index = self.build_index_name(index);
318 self.backend.delete_by_query(&full_index, query).await
319 }
320
321 pub async fn update_by_query(&self, index: &str, query: serde_json::Value, script: serde_json::Value) -> SearchResult<u64> {
323 let full_index = self.build_index_name(index);
324 self.backend.update_by_query(&full_index, query, script).await
325 }
326
327 pub fn config(&self) -> &SearchConfig {
329 self.backend.config()
330 }
331
332 pub fn build_index_name(&self, index: &str) -> String {
334 let config = self.config();
335 if config.index_prefix.is_empty() { index.to_string() } else { format!("{}_{}", config.index_prefix, index) }
336 }
337}