daimon_plugin_opensearch/
store.rs1use std::collections::HashMap;
4
5use daimon_core::vector_store::VectorStore;
6use daimon_core::{DaimonError, Document, Result, ScoredDocument};
7use opensearch::OpenSearch;
8use serde_json::json;
9
10use crate::SpaceType;
11
12pub struct OpenSearchVectorStore {
16 pub(crate) client: OpenSearch,
17 pub(crate) index: String,
18 pub(crate) dimensions: usize,
19 pub(crate) space_type: SpaceType,
20}
21
22impl OpenSearchVectorStore {
23 pub fn client(&self) -> &OpenSearch {
25 &self.client
26 }
27
28 pub fn index(&self) -> &str {
30 &self.index
31 }
32
33 pub fn dimensions(&self) -> usize {
35 self.dimensions
36 }
37
38 fn map_os_error(resp: opensearch::Error) -> DaimonError {
39 DaimonError::Other(format!("opensearch error: {resp}"))
40 }
41}
42
43impl VectorStore for OpenSearchVectorStore {
44 async fn upsert(&self, id: &str, embedding: Vec<f32>, document: Document) -> Result<()> {
45 if embedding.len() != self.dimensions {
46 return Err(DaimonError::Other(format!(
47 "embedding dimension mismatch: expected {}, got {}",
48 self.dimensions,
49 embedding.len()
50 )));
51 }
52
53 let body = json!({
54 "embedding": embedding,
55 "content": document.content,
56 "metadata": document.metadata,
57 });
58
59 let response = self
60 .client
61 .index(opensearch::IndexParts::IndexId(&self.index, id))
62 .body(body)
63 .send()
64 .await
65 .map_err(Self::map_os_error)?;
66
67 let status = response.status_code();
68 if !status.is_success() {
69 let text = response
70 .text()
71 .await
72 .unwrap_or_else(|_| "unknown error".into());
73 return Err(DaimonError::Other(format!(
74 "opensearch upsert failed ({status}): {text}"
75 )));
76 }
77
78 Ok(())
79 }
80
81 async fn query(&self, embedding: Vec<f32>, top_k: usize) -> Result<Vec<ScoredDocument>> {
82 if embedding.len() != self.dimensions {
83 return Err(DaimonError::Other(format!(
84 "embedding dimension mismatch: expected {}, got {}",
85 self.dimensions,
86 embedding.len()
87 )));
88 }
89
90 let body = json!({
91 "size": top_k,
92 "query": {
93 "knn": {
94 "embedding": {
95 "vector": embedding,
96 "k": top_k
97 }
98 }
99 },
100 "_source": ["content", "metadata"]
101 });
102
103 let response = self
104 .client
105 .search(opensearch::SearchParts::Index(&[&self.index]))
106 .body(body)
107 .send()
108 .await
109 .map_err(Self::map_os_error)?;
110
111 let status = response.status_code();
112 if !status.is_success() {
113 let text = response
114 .text()
115 .await
116 .unwrap_or_else(|_| "unknown error".into());
117 return Err(DaimonError::Other(format!(
118 "opensearch query failed ({status}): {text}"
119 )));
120 }
121
122 let body: serde_json::Value = response
123 .json()
124 .await
125 .map_err(|e| DaimonError::Other(format!("opensearch response parse error: {e}")))?;
126
127 let hits = body["hits"]["hits"]
128 .as_array()
129 .unwrap_or(&Vec::new())
130 .clone();
131
132 let mut results = Vec::with_capacity(hits.len());
133 for hit in &hits {
134 let content = hit["_source"]["content"]
135 .as_str()
136 .unwrap_or_default()
137 .to_string();
138
139 let metadata: HashMap<String, serde_json::Value> = hit["_source"]
140 .get("metadata")
141 .and_then(|m| serde_json::from_value(m.clone()).ok())
142 .unwrap_or_default();
143
144 let raw_score = hit["_score"].as_f64().unwrap_or(0.0);
145
146 let score = match self.space_type {
152 SpaceType::CosineSimilarity | SpaceType::L2 => raw_score,
153 SpaceType::InnerProduct => raw_score,
154 };
155
156 let doc = Document {
157 content,
158 metadata,
159 score: Some(score),
160 };
161 results.push(ScoredDocument::new(doc, score));
162 }
163
164 Ok(results)
165 }
166
167 async fn delete(&self, id: &str) -> Result<bool> {
168 let response = self
169 .client
170 .delete(opensearch::DeleteParts::IndexId(&self.index, id))
171 .send()
172 .await
173 .map_err(Self::map_os_error)?;
174
175 let status = response.status_code();
176 if status == opensearch::http::StatusCode::NOT_FOUND {
177 return Ok(false);
178 }
179 if !status.is_success() {
180 let text = response
181 .text()
182 .await
183 .unwrap_or_else(|_| "unknown error".into());
184 return Err(DaimonError::Other(format!(
185 "opensearch delete failed ({status}): {text}"
186 )));
187 }
188
189 Ok(true)
190 }
191
192 async fn count(&self) -> Result<usize> {
193 let response = self
194 .client
195 .count(opensearch::CountParts::Index(&[&self.index]))
196 .send()
197 .await
198 .map_err(Self::map_os_error)?;
199
200 let status = response.status_code();
201 if !status.is_success() {
202 let text = response
203 .text()
204 .await
205 .unwrap_or_else(|_| "unknown error".into());
206 return Err(DaimonError::Other(format!(
207 "opensearch count failed ({status}): {text}"
208 )));
209 }
210
211 let body: serde_json::Value = response
212 .json()
213 .await
214 .map_err(|e| DaimonError::Other(format!("opensearch response parse error: {e}")))?;
215
216 let count = body["count"].as_u64().unwrap_or(0) as usize;
217 Ok(count)
218 }
219}