1use async_trait::async_trait;
22use serde_json::{json, Value};
23use std::collections::HashMap;
24use synaptic_core::{Document, Embeddings, SynapticError, VectorStore};
25
/// Connection and index settings for an OpenSearch-backed vector store.
///
/// `Debug` is implemented by hand rather than derived so that the configured
/// password never ends up in formatted output (logs, `dbg!`, panic messages).
#[derive(Clone)]
pub struct OpenSearchConfig {
    /// Base URL of the OpenSearch endpoint, e.g. `http://localhost:9200`.
    /// A trailing `/` is tolerated; it is trimmed when request URLs are built.
    pub endpoint: String,
    /// Name of the index that documents are written to and searched in.
    pub index: String,
    /// Dimension of the embedding vectors; used as the `dimension` of the
    /// `knn_vector` field when the index is created.
    pub dim: usize,
    /// User name for HTTP basic authentication. Credentials are only sent
    /// when both `username` and `password` are set.
    pub username: Option<String>,
    /// Password for HTTP basic authentication. Credentials are only sent
    /// when both `username` and `password` are set.
    pub password: Option<String>,
}

impl std::fmt::Debug for OpenSearchConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the `Some`/`None` shape visible, but never the secret itself.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("OpenSearchConfig")
            .field("endpoint", &self.endpoint)
            .field("index", &self.index)
            .field("dim", &self.dim)
            .field("username", &self.username)
            .field("password", &password)
            .finish()
    }
}
40
41impl OpenSearchConfig {
42 pub fn new(endpoint: impl Into<String>, index: impl Into<String>, dim: usize) -> Self {
44 Self {
45 endpoint: endpoint.into(),
46 index: index.into(),
47 dim,
48 username: None,
49 password: None,
50 }
51 }
52
53 pub fn with_credentials(
55 mut self,
56 username: impl Into<String>,
57 password: impl Into<String>,
58 ) -> Self {
59 self.username = Some(username.into());
60 self.password = Some(password.into());
61 self
62 }
63}
64
65pub struct OpenSearchVectorStore {
71 config: OpenSearchConfig,
72 client: reqwest::Client,
73}
74
75impl OpenSearchVectorStore {
76 pub fn new(config: OpenSearchConfig) -> Self {
78 Self {
79 config,
80 client: reqwest::Client::new(),
81 }
82 }
83
84 pub fn config(&self) -> &OpenSearchConfig {
86 &self.config
87 }
88
89 pub async fn initialize(&self) -> Result<(), SynapticError> {
93 let head_url = format!(
95 "{}/{}",
96 self.config.endpoint.trim_end_matches('/'),
97 self.config.index
98 );
99 let mut head_req = self.client.head(&head_url);
100 if let (Some(ref u), Some(ref p)) = (&self.config.username, &self.config.password) {
101 head_req = head_req.basic_auth(u, Some(p));
102 }
103 let head_resp = head_req.send().await.map_err(|e| {
104 SynapticError::VectorStore(format!("OpenSearch HEAD request failed: {e}"))
105 })?;
106 if head_resp.status().is_success() {
107 return Ok(());
109 }
110
111 let mapping = json!({
112 "settings": {
113 "index": { "knn": true }
114 },
115 "mappings": {
116 "properties": {
117 "doc_id": { "type": "keyword" },
118 "content": { "type": "text" },
119 "metadata": { "type": "object", "enabled": false },
120 "embedding": {
121 "type": "knn_vector",
122 "dimension": self.config.dim,
123 "method": {
124 "name": "hnsw",
125 "space_type": "cosinesimil",
126 "engine": "nmslib"
127 }
128 }
129 }
130 }
131 });
132
133 let put_url = format!(
134 "{}/{}",
135 self.config.endpoint.trim_end_matches('/'),
136 self.config.index
137 );
138 let mut put_req = self
139 .client
140 .put(&put_url)
141 .header("Content-Type", "application/json")
142 .json(&mapping);
143 if let (Some(ref u), Some(ref p)) = (&self.config.username, &self.config.password) {
144 put_req = put_req.basic_auth(u, Some(p));
145 }
146 let put_resp = put_req
147 .send()
148 .await
149 .map_err(|e| SynapticError::VectorStore(format!("OpenSearch PUT index failed: {e}")))?;
150
151 let status = put_resp.status().as_u16();
152 if status >= 400 {
153 let body: Value = put_resp.json().await.unwrap_or_default();
154 let err_type = body["error"]["type"].as_str().unwrap_or("");
156 if !err_type.contains("already_exists") {
157 return Err(SynapticError::VectorStore(format!(
158 "OpenSearch create index error (HTTP {status}): {body}"
159 )));
160 }
161 }
162 Ok(())
163 }
164
165 fn apply_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
167 if let (Some(ref u), Some(ref p)) = (&self.config.username, &self.config.password) {
168 builder.basic_auth(u, Some(p))
169 } else {
170 builder
171 }
172 }
173
174 async fn search_by_vector_with_score(
176 &self,
177 embedding: &[f32],
178 k: usize,
179 ) -> Result<Vec<(Document, f32)>, SynapticError> {
180 let body = json!({
181 "size": k,
182 "query": {
183 "knn": {
184 "embedding": {
185 "vector": embedding,
186 "k": k,
187 }
188 }
189 },
190 "_source": ["doc_id", "content", "metadata"],
191 });
192
193 let search_url = format!(
194 "{}/{}/_search",
195 self.config.endpoint.trim_end_matches('/'),
196 self.config.index
197 );
198 let req = self
199 .apply_auth(self.client.post(&search_url))
200 .header("Content-Type", "application/json")
201 .json(&body);
202
203 let resp = req.send().await.map_err(|e| {
204 SynapticError::VectorStore(format!("OpenSearch search request failed: {e}"))
205 })?;
206
207 let status = resp.status().as_u16();
208 let json: Value = resp.json().await.map_err(|e| {
209 SynapticError::VectorStore(format!("OpenSearch search response parse error: {e}"))
210 })?;
211
212 if status >= 400 {
213 return Err(SynapticError::VectorStore(format!(
214 "OpenSearch search error (HTTP {status}): {json}"
215 )));
216 }
217
218 let hits = json["hits"]["hits"].as_array().cloned().unwrap_or_default();
219
220 let docs = hits
221 .iter()
222 .filter_map(|h| {
223 let src = h["_source"].as_object()?;
224 let id = src["doc_id"].as_str()?.to_string();
225 let content = src["content"].as_str()?.to_string();
226 let metadata: HashMap<String, Value> = src["metadata"]
227 .as_object()
228 .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
229 .unwrap_or_default();
230 let score = h["_score"].as_f64().unwrap_or(0.0) as f32;
231 Some((Document::with_metadata(id, content, metadata), score))
232 })
233 .collect();
234
235 Ok(docs)
236 }
237}
238
239#[async_trait]
240impl VectorStore for OpenSearchVectorStore {
241 async fn add_documents(
242 &self,
243 docs: Vec<Document>,
244 embeddings: &dyn Embeddings,
245 ) -> Result<Vec<String>, SynapticError> {
246 if docs.is_empty() {
247 return Ok(vec![]);
248 }
249
250 let texts: Vec<&str> = docs.iter().map(|d| d.content.as_str()).collect();
251 let vectors = embeddings.embed_documents(&texts).await?;
252
253 let mut bulk_body = String::new();
255 for (doc, vec) in docs.iter().zip(vectors.iter()) {
256 let action = json!({
257 "index": {
258 "_index": self.config.index,
259 "_id": doc.id,
260 }
261 });
262 let data = json!({
263 "doc_id": doc.id,
264 "content": doc.content,
265 "metadata": doc.metadata,
266 "embedding": vec,
267 });
268 bulk_body.push_str(&action.to_string());
269 bulk_body.push('\n');
270 bulk_body.push_str(&data.to_string());
271 bulk_body.push('\n');
272 }
273
274 let bulk_url = format!(
275 "{}/{}/_bulk",
276 self.config.endpoint.trim_end_matches('/'),
277 self.config.index
278 );
279 let req = self
280 .apply_auth(self.client.post(&bulk_url))
281 .header("Content-Type", "application/x-ndjson")
282 .body(bulk_body);
283
284 let resp = req.send().await.map_err(|e| {
285 SynapticError::VectorStore(format!("OpenSearch bulk request failed: {e}"))
286 })?;
287
288 let status = resp.status().as_u16();
289 if status >= 400 {
290 let text = resp.text().await.unwrap_or_default();
291 return Err(SynapticError::VectorStore(format!(
292 "OpenSearch bulk error (HTTP {status}): {text}"
293 )));
294 }
295
296 Ok(docs.into_iter().map(|d| d.id).collect())
297 }
298
299 async fn similarity_search(
300 &self,
301 query: &str,
302 k: usize,
303 embeddings: &dyn Embeddings,
304 ) -> Result<Vec<Document>, SynapticError> {
305 let results = self
306 .similarity_search_with_score(query, k, embeddings)
307 .await?;
308 Ok(results.into_iter().map(|(doc, _)| doc).collect())
309 }
310
311 async fn similarity_search_with_score(
312 &self,
313 query: &str,
314 k: usize,
315 embeddings: &dyn Embeddings,
316 ) -> Result<Vec<(Document, f32)>, SynapticError> {
317 let qvec = embeddings.embed_query(query).await?;
318 self.search_by_vector_with_score(&qvec, k).await
319 }
320
321 async fn similarity_search_by_vector(
322 &self,
323 embedding: &[f32],
324 k: usize,
325 ) -> Result<Vec<Document>, SynapticError> {
326 let results = self.search_by_vector_with_score(embedding, k).await?;
327 Ok(results.into_iter().map(|(doc, _)| doc).collect())
328 }
329
330 async fn delete(&self, ids: &[&str]) -> Result<(), SynapticError> {
331 if ids.is_empty() {
332 return Ok(());
333 }
334
335 let mut bulk_body = String::new();
336 for id in ids {
337 let action = json!({
338 "delete": {
339 "_index": self.config.index,
340 "_id": id,
341 }
342 });
343 bulk_body.push_str(&action.to_string());
344 bulk_body.push('\n');
345 }
346
347 let bulk_url = format!("{}/_bulk", self.config.endpoint.trim_end_matches('/'));
348 let req = self
349 .apply_auth(self.client.post(&bulk_url))
350 .header("Content-Type", "application/x-ndjson")
351 .body(bulk_body);
352
353 let resp = req.send().await.map_err(|e| {
354 SynapticError::VectorStore(format!("OpenSearch delete request failed: {e}"))
355 })?;
356
357 let status = resp.status().as_u16();
358 if status >= 400 {
359 let text = resp.text().await.unwrap_or_default();
360 return Err(SynapticError::VectorStore(format!(
361 "OpenSearch delete error (HTTP {status}): {text}"
362 )));
363 }
364
365 Ok(())
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint shared by all tests; no request is ever sent to it.
    const ENDPOINT: &str = "http://localhost:9200";

    /// `new` stores its arguments and leaves the credentials unset.
    #[test]
    fn config_new_sets_defaults() {
        let cfg = OpenSearchConfig::new(ENDPOINT, "test_index", 1536);

        assert_eq!(cfg.endpoint, "http://localhost:9200");
        assert_eq!(cfg.index, "test_index");
        assert_eq!(cfg.dim, 1536);
        assert_eq!(cfg.username, None);
        assert_eq!(cfg.password, None);
    }

    /// `with_credentials` sets both the user name and the password.
    #[test]
    fn config_with_credentials() {
        let cfg = OpenSearchConfig::new(ENDPOINT, "test", 768).with_credentials("admin", "password");

        assert_eq!(cfg.username.as_deref(), Some("admin"));
        assert_eq!(cfg.password.as_deref(), Some("password"));
    }

    /// A store exposes the configuration it was constructed with.
    #[test]
    fn store_new_creates_instance() {
        let store = OpenSearchVectorStore::new(OpenSearchConfig::new(ENDPOINT, "idx", 512));
        let cfg = store.config();

        assert_eq!(cfg.index, "idx");
        assert_eq!(cfg.dim, 512);
    }
}