Skip to main content

synaptic_opensearch/
lib.rs

1//! OpenSearch vector store integration for Synaptic.
2//!
3//! This crate provides [`OpenSearchVectorStore`], an implementation of the
4//! [`VectorStore`](synaptic_core::VectorStore) trait backed by
5//! [OpenSearch](https://opensearch.org/) using its k-NN plugin.
6//!
7//! # Example
8//!
9//! ```rust,no_run
10//! use synaptic_opensearch::{OpenSearchConfig, OpenSearchVectorStore};
11//!
12//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
13//! let config = OpenSearchConfig::new("http://localhost:9200", "my_index", 1536)
14//!     .with_credentials("admin", "admin");
15//! let store = OpenSearchVectorStore::new(config);
16//! store.initialize().await?;
17//! # Ok(())
18//! # }
19//! ```
20
21use async_trait::async_trait;
22use serde_json::{json, Value};
23use std::collections::HashMap;
24use synaptic_core::{Document, Embeddings, SynapticError, VectorStore};
25
/// Configuration for connecting to an OpenSearch cluster.
///
/// The [`Debug`](std::fmt::Debug) output redacts `password`, so a config (or a
/// store holding one) can be logged without leaking credentials.
#[derive(Clone)]
pub struct OpenSearchConfig {
    /// OpenSearch endpoint URL (e.g., `http://localhost:9200`).
    pub endpoint: String,
    /// Index name to store documents in.
    pub index: String,
    /// Vector dimension (must match your embedding model).
    pub dim: usize,
    /// Optional username for HTTP Basic Auth.
    pub username: Option<String>,
    /// Optional password for HTTP Basic Auth.
    pub password: Option<String>,
}

impl std::fmt::Debug for OpenSearchConfig {
    /// Format every field except `password`, which is shown only as present
    /// (`Some("<redacted>")`) or absent (`None`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenSearchConfig")
            .field("endpoint", &self.endpoint)
            .field("index", &self.index)
            .field("dim", &self.dim)
            .field("username", &self.username)
            // Never print the secret itself; only whether one is configured.
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}

impl OpenSearchConfig {
    /// Create a new configuration with required fields and no credentials.
    ///
    /// * `endpoint` - base URL of the cluster.
    /// * `index` - name of the index documents are stored in.
    /// * `dim` - embedding vector dimension.
    pub fn new(endpoint: impl Into<String>, index: impl Into<String>, dim: usize) -> Self {
        Self {
            endpoint: endpoint.into(),
            index: index.into(),
            dim,
            username: None,
            password: None,
        }
    }

    /// Set HTTP Basic Auth credentials, returning the updated configuration.
    #[must_use]
    pub fn with_credentials(
        mut self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        self.username = Some(username.into());
        self.password = Some(password.into());
        self
    }
}
64
65/// A [`VectorStore`] implementation backed by [OpenSearch](https://opensearch.org/).
66///
67/// Uses OpenSearch's k-NN plugin with HNSW indexing for approximate nearest
68/// neighbor search. Call [`initialize`](OpenSearchVectorStore::initialize)
69/// to create the index with correct mappings before inserting documents.
70pub struct OpenSearchVectorStore {
71    config: OpenSearchConfig,
72    client: reqwest::Client,
73}
74
75impl OpenSearchVectorStore {
76    /// Create a new store with the given configuration.
77    pub fn new(config: OpenSearchConfig) -> Self {
78        Self {
79            config,
80            client: reqwest::Client::new(),
81        }
82    }
83
84    /// Return a reference to the configuration.
85    pub fn config(&self) -> &OpenSearchConfig {
86        &self.config
87    }
88
89    /// Create the OpenSearch index with k-NN mappings if it does not exist.
90    ///
91    /// This is idempotent — calling it when the index already exists is safe.
92    pub async fn initialize(&self) -> Result<(), SynapticError> {
93        // Check if index exists first.
94        let head_url = format!(
95            "{}/{}",
96            self.config.endpoint.trim_end_matches('/'),
97            self.config.index
98        );
99        let mut head_req = self.client.head(&head_url);
100        if let (Some(ref u), Some(ref p)) = (&self.config.username, &self.config.password) {
101            head_req = head_req.basic_auth(u, Some(p));
102        }
103        let head_resp = head_req.send().await.map_err(|e| {
104            SynapticError::VectorStore(format!("OpenSearch HEAD request failed: {e}"))
105        })?;
106        if head_resp.status().is_success() {
107            // Index already exists.
108            return Ok(());
109        }
110
111        let mapping = json!({
112            "settings": {
113                "index": { "knn": true }
114            },
115            "mappings": {
116                "properties": {
117                    "doc_id": { "type": "keyword" },
118                    "content": { "type": "text" },
119                    "metadata": { "type": "object", "enabled": false },
120                    "embedding": {
121                        "type": "knn_vector",
122                        "dimension": self.config.dim,
123                        "method": {
124                            "name": "hnsw",
125                            "space_type": "cosinesimil",
126                            "engine": "nmslib"
127                        }
128                    }
129                }
130            }
131        });
132
133        let put_url = format!(
134            "{}/{}",
135            self.config.endpoint.trim_end_matches('/'),
136            self.config.index
137        );
138        let mut put_req = self
139            .client
140            .put(&put_url)
141            .header("Content-Type", "application/json")
142            .json(&mapping);
143        if let (Some(ref u), Some(ref p)) = (&self.config.username, &self.config.password) {
144            put_req = put_req.basic_auth(u, Some(p));
145        }
146        let put_resp = put_req
147            .send()
148            .await
149            .map_err(|e| SynapticError::VectorStore(format!("OpenSearch PUT index failed: {e}")))?;
150
151        let status = put_resp.status().as_u16();
152        if status >= 400 {
153            let body: Value = put_resp.json().await.unwrap_or_default();
154            // 400 with "already_exists" is fine (race condition).
155            let err_type = body["error"]["type"].as_str().unwrap_or("");
156            if !err_type.contains("already_exists") {
157                return Err(SynapticError::VectorStore(format!(
158                    "OpenSearch create index error (HTTP {status}): {body}"
159                )));
160            }
161        }
162        Ok(())
163    }
164
165    /// Apply basic auth to a request builder if credentials are configured.
166    fn apply_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
167        if let (Some(ref u), Some(ref p)) = (&self.config.username, &self.config.password) {
168            builder.basic_auth(u, Some(p))
169        } else {
170            builder
171        }
172    }
173
174    /// Search by raw vector and return documents with similarity scores.
175    async fn search_by_vector_with_score(
176        &self,
177        embedding: &[f32],
178        k: usize,
179    ) -> Result<Vec<(Document, f32)>, SynapticError> {
180        let body = json!({
181            "size": k,
182            "query": {
183                "knn": {
184                    "embedding": {
185                        "vector": embedding,
186                        "k": k,
187                    }
188                }
189            },
190            "_source": ["doc_id", "content", "metadata"],
191        });
192
193        let search_url = format!(
194            "{}/{}/_search",
195            self.config.endpoint.trim_end_matches('/'),
196            self.config.index
197        );
198        let req = self
199            .apply_auth(self.client.post(&search_url))
200            .header("Content-Type", "application/json")
201            .json(&body);
202
203        let resp = req.send().await.map_err(|e| {
204            SynapticError::VectorStore(format!("OpenSearch search request failed: {e}"))
205        })?;
206
207        let status = resp.status().as_u16();
208        let json: Value = resp.json().await.map_err(|e| {
209            SynapticError::VectorStore(format!("OpenSearch search response parse error: {e}"))
210        })?;
211
212        if status >= 400 {
213            return Err(SynapticError::VectorStore(format!(
214                "OpenSearch search error (HTTP {status}): {json}"
215            )));
216        }
217
218        let hits = json["hits"]["hits"].as_array().cloned().unwrap_or_default();
219
220        let docs = hits
221            .iter()
222            .filter_map(|h| {
223                let src = h["_source"].as_object()?;
224                let id = src["doc_id"].as_str()?.to_string();
225                let content = src["content"].as_str()?.to_string();
226                let metadata: HashMap<String, Value> = src["metadata"]
227                    .as_object()
228                    .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
229                    .unwrap_or_default();
230                let score = h["_score"].as_f64().unwrap_or(0.0) as f32;
231                Some((Document::with_metadata(id, content, metadata), score))
232            })
233            .collect();
234
235        Ok(docs)
236    }
237}
238
239#[async_trait]
240impl VectorStore for OpenSearchVectorStore {
241    async fn add_documents(
242        &self,
243        docs: Vec<Document>,
244        embeddings: &dyn Embeddings,
245    ) -> Result<Vec<String>, SynapticError> {
246        if docs.is_empty() {
247            return Ok(vec![]);
248        }
249
250        let texts: Vec<&str> = docs.iter().map(|d| d.content.as_str()).collect();
251        let vectors = embeddings.embed_documents(&texts).await?;
252
253        // Build ndjson bulk request body.
254        let mut bulk_body = String::new();
255        for (doc, vec) in docs.iter().zip(vectors.iter()) {
256            let action = json!({
257                "index": {
258                    "_index": self.config.index,
259                    "_id": doc.id,
260                }
261            });
262            let data = json!({
263                "doc_id": doc.id,
264                "content": doc.content,
265                "metadata": doc.metadata,
266                "embedding": vec,
267            });
268            bulk_body.push_str(&action.to_string());
269            bulk_body.push('\n');
270            bulk_body.push_str(&data.to_string());
271            bulk_body.push('\n');
272        }
273
274        let bulk_url = format!(
275            "{}/{}/_bulk",
276            self.config.endpoint.trim_end_matches('/'),
277            self.config.index
278        );
279        let req = self
280            .apply_auth(self.client.post(&bulk_url))
281            .header("Content-Type", "application/x-ndjson")
282            .body(bulk_body);
283
284        let resp = req.send().await.map_err(|e| {
285            SynapticError::VectorStore(format!("OpenSearch bulk request failed: {e}"))
286        })?;
287
288        let status = resp.status().as_u16();
289        if status >= 400 {
290            let text = resp.text().await.unwrap_or_default();
291            return Err(SynapticError::VectorStore(format!(
292                "OpenSearch bulk error (HTTP {status}): {text}"
293            )));
294        }
295
296        Ok(docs.into_iter().map(|d| d.id).collect())
297    }
298
299    async fn similarity_search(
300        &self,
301        query: &str,
302        k: usize,
303        embeddings: &dyn Embeddings,
304    ) -> Result<Vec<Document>, SynapticError> {
305        let results = self
306            .similarity_search_with_score(query, k, embeddings)
307            .await?;
308        Ok(results.into_iter().map(|(doc, _)| doc).collect())
309    }
310
311    async fn similarity_search_with_score(
312        &self,
313        query: &str,
314        k: usize,
315        embeddings: &dyn Embeddings,
316    ) -> Result<Vec<(Document, f32)>, SynapticError> {
317        let qvec = embeddings.embed_query(query).await?;
318        self.search_by_vector_with_score(&qvec, k).await
319    }
320
321    async fn similarity_search_by_vector(
322        &self,
323        embedding: &[f32],
324        k: usize,
325    ) -> Result<Vec<Document>, SynapticError> {
326        let results = self.search_by_vector_with_score(embedding, k).await?;
327        Ok(results.into_iter().map(|(doc, _)| doc).collect())
328    }
329
330    async fn delete(&self, ids: &[&str]) -> Result<(), SynapticError> {
331        if ids.is_empty() {
332            return Ok(());
333        }
334
335        let mut bulk_body = String::new();
336        for id in ids {
337            let action = json!({
338                "delete": {
339                    "_index": self.config.index,
340                    "_id": id,
341                }
342            });
343            bulk_body.push_str(&action.to_string());
344            bulk_body.push('\n');
345        }
346
347        let bulk_url = format!("{}/_bulk", self.config.endpoint.trim_end_matches('/'));
348        let req = self
349            .apply_auth(self.client.post(&bulk_url))
350            .header("Content-Type", "application/x-ndjson")
351            .body(bulk_body);
352
353        let resp = req.send().await.map_err(|e| {
354            SynapticError::VectorStore(format!("OpenSearch delete request failed: {e}"))
355        })?;
356
357        let status = resp.status().as_u16();
358        if status >= 400 {
359            let text = resp.text().await.unwrap_or_default();
360            return Err(SynapticError::VectorStore(format!(
361                "OpenSearch delete error (HTTP {status}): {text}"
362            )));
363        }
364
365        Ok(())
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint shared by the configuration tests below.
    const LOCAL: &str = "http://localhost:9200";

    /// `new` stores the required fields verbatim and leaves credentials unset.
    #[test]
    fn config_new_sets_defaults() {
        let cfg = OpenSearchConfig::new(LOCAL, "test_index", 1536);
        assert_eq!(
            (cfg.endpoint.as_str(), cfg.index.as_str(), cfg.dim),
            ("http://localhost:9200", "test_index", 1536)
        );
        assert_eq!(cfg.username, None);
        assert_eq!(cfg.password, None);
    }

    /// `with_credentials` fills in both the username and the password.
    #[test]
    fn config_with_credentials() {
        let cfg = OpenSearchConfig::new(LOCAL, "test", 768).with_credentials("admin", "password");
        assert_eq!(cfg.username.as_deref(), Some("admin"));
        assert_eq!(cfg.password.as_deref(), Some("password"));
    }

    /// The store exposes the configuration it was built with.
    #[test]
    fn store_new_creates_instance() {
        let store = OpenSearchVectorStore::new(OpenSearchConfig::new(LOCAL, "idx", 512));
        let cfg = store.config();
        assert_eq!(cfg.index, "idx");
        assert_eq!(cfg.dim, 512);
    }
}