Skip to main content

daimon_plugin_opensearch/
store.rs

1//! [`OpenSearchVectorStore`] — an OpenSearch k-NN backed [`VectorStore`] implementation.
2
3use std::collections::HashMap;
4
5use daimon_core::vector_store::VectorStore;
6use daimon_core::{DaimonError, Document, Result, ScoredDocument};
7use opensearch::OpenSearch;
8use serde_json::json;
9
10use crate::SpaceType;
11
/// A [`VectorStore`] backed by OpenSearch with the k-NN plugin.
///
/// Use [`OpenSearchVectorStoreBuilder`](crate::OpenSearchVectorStoreBuilder) to construct.
pub struct OpenSearchVectorStore {
    /// Client used to send every index, search, delete and count request.
    pub(crate) client: OpenSearch,
    /// Name of the index that all requests issued by this store target.
    pub(crate) index: String,
    /// Required embedding length; `upsert` and `query` reject vectors of any other length.
    pub(crate) dimensions: usize,
    /// Space type selected for this store; consulted when mapping hit scores in `query`.
    pub(crate) space_type: SpaceType,
}
21
22impl OpenSearchVectorStore {
23    /// Returns a reference to the underlying OpenSearch client.
24    pub fn client(&self) -> &OpenSearch {
25        &self.client
26    }
27
28    /// Returns the index name used by this store.
29    pub fn index(&self) -> &str {
30        &self.index
31    }
32
33    /// Returns the configured vector dimensions.
34    pub fn dimensions(&self) -> usize {
35        self.dimensions
36    }
37
38    fn map_os_error(resp: opensearch::Error) -> DaimonError {
39        DaimonError::Other(format!("opensearch error: {resp}"))
40    }
41}
42
43impl VectorStore for OpenSearchVectorStore {
44    async fn upsert(&self, id: &str, embedding: Vec<f32>, document: Document) -> Result<()> {
45        if embedding.len() != self.dimensions {
46            return Err(DaimonError::Other(format!(
47                "embedding dimension mismatch: expected {}, got {}",
48                self.dimensions,
49                embedding.len()
50            )));
51        }
52
53        let body = json!({
54            "embedding": embedding,
55            "content": document.content,
56            "metadata": document.metadata,
57        });
58
59        let response = self
60            .client
61            .index(opensearch::IndexParts::IndexId(&self.index, id))
62            .body(body)
63            .send()
64            .await
65            .map_err(Self::map_os_error)?;
66
67        let status = response.status_code();
68        if !status.is_success() {
69            let text = response
70                .text()
71                .await
72                .unwrap_or_else(|_| "unknown error".into());
73            return Err(DaimonError::Other(format!(
74                "opensearch upsert failed ({status}): {text}"
75            )));
76        }
77
78        Ok(())
79    }
80
81    async fn query(&self, embedding: Vec<f32>, top_k: usize) -> Result<Vec<ScoredDocument>> {
82        if embedding.len() != self.dimensions {
83            return Err(DaimonError::Other(format!(
84                "embedding dimension mismatch: expected {}, got {}",
85                self.dimensions,
86                embedding.len()
87            )));
88        }
89
90        let body = json!({
91            "size": top_k,
92            "query": {
93                "knn": {
94                    "embedding": {
95                        "vector": embedding,
96                        "k": top_k
97                    }
98                }
99            },
100            "_source": ["content", "metadata"]
101        });
102
103        let response = self
104            .client
105            .search(opensearch::SearchParts::Index(&[&self.index]))
106            .body(body)
107            .send()
108            .await
109            .map_err(Self::map_os_error)?;
110
111        let status = response.status_code();
112        if !status.is_success() {
113            let text = response
114                .text()
115                .await
116                .unwrap_or_else(|_| "unknown error".into());
117            return Err(DaimonError::Other(format!(
118                "opensearch query failed ({status}): {text}"
119            )));
120        }
121
122        let body: serde_json::Value = response
123            .json()
124            .await
125            .map_err(|e| DaimonError::Other(format!("opensearch response parse error: {e}")))?;
126
127        let hits = body["hits"]["hits"]
128            .as_array()
129            .unwrap_or(&Vec::new())
130            .clone();
131
132        let mut results = Vec::with_capacity(hits.len());
133        for hit in &hits {
134            let content = hit["_source"]["content"]
135                .as_str()
136                .unwrap_or_default()
137                .to_string();
138
139            let metadata: HashMap<String, serde_json::Value> = hit["_source"]
140                .get("metadata")
141                .and_then(|m| serde_json::from_value(m.clone()).ok())
142                .unwrap_or_default();
143
144            let raw_score = hit["_score"].as_f64().unwrap_or(0.0);
145
146            // OpenSearch k-NN scores vary by space type:
147            // - cosinesimil: 1 / (1 + cosine_distance), range (0, 1]
148            // - l2: 1 / (1 + l2_distance), range (0, 1]
149            // - innerproduct: already a similarity score
150            // We normalize to a 0..1 range for consistency.
151            let score = match self.space_type {
152                SpaceType::CosineSimilarity | SpaceType::L2 => raw_score,
153                SpaceType::InnerProduct => raw_score,
154            };
155
156            let doc = Document {
157                content,
158                metadata,
159                score: Some(score),
160            };
161            results.push(ScoredDocument::new(doc, score));
162        }
163
164        Ok(results)
165    }
166
167    async fn delete(&self, id: &str) -> Result<bool> {
168        let response = self
169            .client
170            .delete(opensearch::DeleteParts::IndexId(&self.index, id))
171            .send()
172            .await
173            .map_err(Self::map_os_error)?;
174
175        let status = response.status_code();
176        if status == opensearch::http::StatusCode::NOT_FOUND {
177            return Ok(false);
178        }
179        if !status.is_success() {
180            let text = response
181                .text()
182                .await
183                .unwrap_or_else(|_| "unknown error".into());
184            return Err(DaimonError::Other(format!(
185                "opensearch delete failed ({status}): {text}"
186            )));
187        }
188
189        Ok(true)
190    }
191
192    async fn count(&self) -> Result<usize> {
193        let response = self
194            .client
195            .count(opensearch::CountParts::Index(&[&self.index]))
196            .send()
197            .await
198            .map_err(Self::map_os_error)?;
199
200        let status = response.status_code();
201        if !status.is_success() {
202            let text = response
203                .text()
204                .await
205                .unwrap_or_else(|_| "unknown error".into());
206            return Err(DaimonError::Other(format!(
207                "opensearch count failed ({status}): {text}"
208            )));
209        }
210
211        let body: serde_json::Value = response
212            .json()
213            .await
214            .map_err(|e| DaimonError::Other(format!("opensearch response parse error: {e}")))?;
215
216        let count = body["count"].as_u64().unwrap_or(0) as usize;
217        Ok(count)
218    }
219}