1use async_trait::async_trait;
3use opensearch::http::transport::Transport;
4use opensearch::{IndexParts, OpenSearch, SearchParts};
5use serde::{Deserialize, Serialize};
6use serde_json::json;
7use tracing::debug;
8use wfe_core::models::{WorkflowInstance, WorkflowStatus};
9use wfe_core::traits::search::{Page, SearchFilter, SearchIndex, WorkflowSearchResult};
10
11#[derive(Debug, Serialize, Deserialize)]
13struct WorkflowDocument {
14 id: String,
15 workflow_definition_id: String,
16 version: u32,
17 status: String,
18 reference: Option<String>,
19 description: Option<String>,
20 data: serde_json::Value,
21 create_time: String,
22 complete_time: Option<String>,
23}
24
25impl From<&WorkflowInstance> for WorkflowDocument {
26 fn from(instance: &WorkflowInstance) -> Self {
27 Self {
28 id: instance.id.clone(),
29 workflow_definition_id: instance.workflow_definition_id.clone(),
30 version: instance.version,
31 status: serde_json::to_value(instance.status)
32 .ok()
33 .and_then(|v| v.as_str().map(String::from))
34 .unwrap_or_default(),
35 reference: instance.reference.clone(),
36 description: instance.description.clone(),
37 data: instance.data.clone(),
38 create_time: instance.create_time.to_rfc3339(),
39 complete_time: instance.complete_time.map(|t| t.to_rfc3339()),
40 }
41 }
42}
43
44pub struct OpenSearchIndex {
46 client: OpenSearch,
47 index_name: String,
48}
49
50impl OpenSearchIndex {
51 pub fn new(url: &str, index_name: &str) -> wfe_core::Result<Self> {
57 let transport = Transport::single_node(url)
58 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
59 let client = OpenSearch::new(transport);
60 Ok(Self {
61 client,
62 index_name: index_name.to_string(),
63 })
64 }
65
66 pub fn client(&self) -> &OpenSearch {
68 &self.client
69 }
70
71 pub fn index_name(&self) -> &str {
73 &self.index_name
74 }
75}
76
77#[async_trait]
78impl SearchIndex for OpenSearchIndex {
79 async fn start(&self) -> wfe_core::Result<()> {
80 let exists = self
81 .client
82 .indices()
83 .exists(opensearch::indices::IndicesExistsParts::Index(&[
84 &self.index_name
85 ]))
86 .send()
87 .await
88 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
89
90 if exists.status_code().is_success() {
91 debug!(index = %self.index_name, "Index already exists");
92 return Ok(());
93 }
94
95 let body = json!({
96 "mappings": {
97 "properties": {
98 "id": { "type": "keyword" },
99 "workflow_definition_id": { "type": "keyword" },
100 "version": { "type": "integer" },
101 "status": { "type": "keyword" },
102 "reference": { "type": "keyword" },
103 "description": { "type": "text" },
104 "data": { "type": "object", "enabled": false },
105 "create_time": { "type": "date" },
106 "complete_time": { "type": "date" }
107 }
108 }
109 });
110
111 let response = self
112 .client
113 .indices()
114 .create(opensearch::indices::IndicesCreateParts::Index(
115 &self.index_name,
116 ))
117 .body(body)
118 .send()
119 .await
120 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
121
122 if !response.status_code().is_success() {
123 let text = response
124 .text()
125 .await
126 .unwrap_or_else(|_| "unknown error".to_string());
127 return Err(wfe_core::WfeError::Persistence(format!(
128 "Failed to create index: {text}"
129 )));
130 }
131
132 debug!(index = %self.index_name, "Index created");
133 Ok(())
134 }
135
136 async fn stop(&self) -> wfe_core::Result<()> {
137 Ok(())
138 }
139
140 async fn index_workflow(&self, instance: &WorkflowInstance) -> wfe_core::Result<()> {
141 let doc = WorkflowDocument::from(instance);
142
143 let response = self
144 .client
145 .index(IndexParts::IndexId(&self.index_name, &doc.id))
146 .body(serde_json::to_value(&doc)?)
147 .send()
148 .await
149 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
150
151 if !response.status_code().is_success() {
152 let text = response
153 .text()
154 .await
155 .unwrap_or_else(|_| "unknown error".to_string());
156 return Err(wfe_core::WfeError::Persistence(format!(
157 "Failed to index workflow: {text}"
158 )));
159 }
160
161 debug!(id = %instance.id, index = %self.index_name, "Workflow indexed");
162 Ok(())
163 }
164
165 async fn search(
166 &self,
167 terms: &str,
168 skip: u64,
169 take: u64,
170 filters: &[SearchFilter],
171 ) -> wfe_core::Result<Page<WorkflowSearchResult>> {
172 let mut must_clauses: Vec<serde_json::Value> = Vec::new();
173 let mut filter_clauses: Vec<serde_json::Value> = Vec::new();
174
175 if !terms.is_empty() {
176 must_clauses.push(json!({
177 "multi_match": {
178 "query": terms,
179 "fields": ["description", "reference", "workflow_definition_id"]
180 }
181 }));
182 }
183
184 for filter in filters {
185 match filter {
186 SearchFilter::Status(status) => {
187 let status_str = serde_json::to_value(status)
188 .ok()
189 .and_then(|v| v.as_str().map(String::from))
190 .unwrap_or_default();
191 filter_clauses.push(json!({
192 "term": { "status": status_str }
193 }));
194 }
195 SearchFilter::DateRange {
196 field,
197 before,
198 after,
199 } => {
200 let mut range = serde_json::Map::new();
201 if let Some(before) = before {
202 range.insert("lt".to_string(), json!(before.to_rfc3339()));
203 }
204 if let Some(after) = after {
205 range.insert("gte".to_string(), json!(after.to_rfc3339()));
206 }
207 if !range.is_empty() {
208 filter_clauses.push(json!({
209 "range": { field.clone(): range }
210 }));
211 }
212 }
213 SearchFilter::Reference(reference) => {
214 filter_clauses.push(json!({
215 "term": { "reference": reference }
216 }));
217 }
218 }
219 }
220
221 let query = if must_clauses.is_empty() && filter_clauses.is_empty() {
222 json!({ "match_all": {} })
223 } else {
224 let mut bool_query = serde_json::Map::new();
225 if !must_clauses.is_empty() {
226 bool_query.insert("must".to_string(), json!(must_clauses));
227 }
228 if !filter_clauses.is_empty() {
229 bool_query.insert("filter".to_string(), json!(filter_clauses));
230 }
231 json!({ "bool": bool_query })
232 };
233
234 let body = json!({
235 "query": query,
236 "from": skip,
237 "size": take,
238 });
239
240 let response = self
241 .client
242 .search(SearchParts::Index(&[&self.index_name]))
243 .body(body)
244 .send()
245 .await
246 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
247
248 if !response.status_code().is_success() {
249 let text = response
250 .text()
251 .await
252 .unwrap_or_else(|_| "unknown error".to_string());
253 return Err(wfe_core::WfeError::Persistence(format!(
254 "Search failed: {text}"
255 )));
256 }
257
258 let response_body: serde_json::Value = response
259 .json()
260 .await
261 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
262
263 let total = response_body["hits"]["total"]["value"]
264 .as_u64()
265 .unwrap_or(0);
266
267 let hits = response_body["hits"]["hits"]
268 .as_array()
269 .cloned()
270 .unwrap_or_default();
271
272 let mut results = Vec::with_capacity(hits.len());
273 for hit in &hits {
274 let source = &hit["_source"];
275 let status_str = source["status"].as_str().unwrap_or("Runnable");
276 let status: WorkflowStatus =
277 serde_json::from_value(json!(status_str)).unwrap_or_default();
278
279 results.push(WorkflowSearchResult {
280 id: source["id"].as_str().unwrap_or_default().to_string(),
281 workflow_definition_id: source["workflow_definition_id"]
282 .as_str()
283 .unwrap_or_default()
284 .to_string(),
285 version: source["version"].as_u64().unwrap_or(0) as u32,
286 status,
287 reference: source["reference"].as_str().map(String::from),
288 description: source["description"].as_str().map(String::from),
289 });
290 }
291
292 Ok(Page {
293 data: results,
294 total,
295 })
296 }
297}