Skip to main content

wfe_opensearch/
lib.rs

1//! wfe-opensearch — OpenSearch index provider for workflow and log search in WFE.
2use async_trait::async_trait;
3use opensearch::http::transport::Transport;
4use opensearch::{IndexParts, OpenSearch, SearchParts};
5use serde::{Deserialize, Serialize};
6use serde_json::json;
7use tracing::debug;
8use wfe_core::models::{WorkflowInstance, WorkflowStatus};
9use wfe_core::traits::search::{Page, SearchFilter, SearchIndex, WorkflowSearchResult};
10
11/// Document structure stored in OpenSearch.
12#[derive(Debug, Serialize, Deserialize)]
13struct WorkflowDocument {
14    id: String,
15    workflow_definition_id: String,
16    version: u32,
17    status: String,
18    reference: Option<String>,
19    description: Option<String>,
20    data: serde_json::Value,
21    create_time: String,
22    complete_time: Option<String>,
23}
24
25impl From<&WorkflowInstance> for WorkflowDocument {
26    fn from(instance: &WorkflowInstance) -> Self {
27        Self {
28            id: instance.id.clone(),
29            workflow_definition_id: instance.workflow_definition_id.clone(),
30            version: instance.version,
31            status: serde_json::to_value(instance.status)
32                .ok()
33                .and_then(|v| v.as_str().map(String::from))
34                .unwrap_or_default(),
35            reference: instance.reference.clone(),
36            description: instance.description.clone(),
37            data: instance.data.clone(),
38            create_time: instance.create_time.to_rfc3339(),
39            complete_time: instance.complete_time.map(|t| t.to_rfc3339()),
40        }
41    }
42}
43
44/// OpenSearch-backed search index for workflow instances.
45pub struct OpenSearchIndex {
46    client: OpenSearch,
47    index_name: String,
48}
49
50impl OpenSearchIndex {
51    /// Create a new OpenSearch index provider.
52    ///
53    /// # Arguments
54    /// * `url` - OpenSearch server URL (e.g. `http://localhost:9200`)
55    /// * `index_name` - Name of the index to use
56    pub fn new(url: &str, index_name: &str) -> wfe_core::Result<Self> {
57        let transport = Transport::single_node(url)
58            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
59        let client = OpenSearch::new(transport);
60        Ok(Self {
61            client,
62            index_name: index_name.to_string(),
63        })
64    }
65
66    /// Get a reference to the underlying OpenSearch client (useful for tests).
67    pub fn client(&self) -> &OpenSearch {
68        &self.client
69    }
70
71    /// Get the index name.
72    pub fn index_name(&self) -> &str {
73        &self.index_name
74    }
75}
76
77#[async_trait]
78impl SearchIndex for OpenSearchIndex {
79    async fn start(&self) -> wfe_core::Result<()> {
80        let exists = self
81            .client
82            .indices()
83            .exists(opensearch::indices::IndicesExistsParts::Index(&[
84                &self.index_name
85            ]))
86            .send()
87            .await
88            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
89
90        if exists.status_code().is_success() {
91            debug!(index = %self.index_name, "Index already exists");
92            return Ok(());
93        }
94
95        let body = json!({
96            "mappings": {
97                "properties": {
98                    "id": { "type": "keyword" },
99                    "workflow_definition_id": { "type": "keyword" },
100                    "version": { "type": "integer" },
101                    "status": { "type": "keyword" },
102                    "reference": { "type": "keyword" },
103                    "description": { "type": "text" },
104                    "data": { "type": "object", "enabled": false },
105                    "create_time": { "type": "date" },
106                    "complete_time": { "type": "date" }
107                }
108            }
109        });
110
111        let response = self
112            .client
113            .indices()
114            .create(opensearch::indices::IndicesCreateParts::Index(
115                &self.index_name,
116            ))
117            .body(body)
118            .send()
119            .await
120            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
121
122        if !response.status_code().is_success() {
123            let text = response
124                .text()
125                .await
126                .unwrap_or_else(|_| "unknown error".to_string());
127            return Err(wfe_core::WfeError::Persistence(format!(
128                "Failed to create index: {text}"
129            )));
130        }
131
132        debug!(index = %self.index_name, "Index created");
133        Ok(())
134    }
135
136    async fn stop(&self) -> wfe_core::Result<()> {
137        Ok(())
138    }
139
140    async fn index_workflow(&self, instance: &WorkflowInstance) -> wfe_core::Result<()> {
141        let doc = WorkflowDocument::from(instance);
142
143        let response = self
144            .client
145            .index(IndexParts::IndexId(&self.index_name, &doc.id))
146            .body(serde_json::to_value(&doc)?)
147            .send()
148            .await
149            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
150
151        if !response.status_code().is_success() {
152            let text = response
153                .text()
154                .await
155                .unwrap_or_else(|_| "unknown error".to_string());
156            return Err(wfe_core::WfeError::Persistence(format!(
157                "Failed to index workflow: {text}"
158            )));
159        }
160
161        debug!(id = %instance.id, index = %self.index_name, "Workflow indexed");
162        Ok(())
163    }
164
165    async fn search(
166        &self,
167        terms: &str,
168        skip: u64,
169        take: u64,
170        filters: &[SearchFilter],
171    ) -> wfe_core::Result<Page<WorkflowSearchResult>> {
172        let mut must_clauses: Vec<serde_json::Value> = Vec::new();
173        let mut filter_clauses: Vec<serde_json::Value> = Vec::new();
174
175        if !terms.is_empty() {
176            must_clauses.push(json!({
177                "multi_match": {
178                    "query": terms,
179                    "fields": ["description", "reference", "workflow_definition_id"]
180                }
181            }));
182        }
183
184        for filter in filters {
185            match filter {
186                SearchFilter::Status(status) => {
187                    let status_str = serde_json::to_value(status)
188                        .ok()
189                        .and_then(|v| v.as_str().map(String::from))
190                        .unwrap_or_default();
191                    filter_clauses.push(json!({
192                        "term": { "status": status_str }
193                    }));
194                }
195                SearchFilter::DateRange {
196                    field,
197                    before,
198                    after,
199                } => {
200                    let mut range = serde_json::Map::new();
201                    if let Some(before) = before {
202                        range.insert("lt".to_string(), json!(before.to_rfc3339()));
203                    }
204                    if let Some(after) = after {
205                        range.insert("gte".to_string(), json!(after.to_rfc3339()));
206                    }
207                    if !range.is_empty() {
208                        filter_clauses.push(json!({
209                            "range": { field.clone(): range }
210                        }));
211                    }
212                }
213                SearchFilter::Reference(reference) => {
214                    filter_clauses.push(json!({
215                        "term": { "reference": reference }
216                    }));
217                }
218            }
219        }
220
221        let query = if must_clauses.is_empty() && filter_clauses.is_empty() {
222            json!({ "match_all": {} })
223        } else {
224            let mut bool_query = serde_json::Map::new();
225            if !must_clauses.is_empty() {
226                bool_query.insert("must".to_string(), json!(must_clauses));
227            }
228            if !filter_clauses.is_empty() {
229                bool_query.insert("filter".to_string(), json!(filter_clauses));
230            }
231            json!({ "bool": bool_query })
232        };
233
234        let body = json!({
235            "query": query,
236            "from": skip,
237            "size": take,
238        });
239
240        let response = self
241            .client
242            .search(SearchParts::Index(&[&self.index_name]))
243            .body(body)
244            .send()
245            .await
246            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
247
248        if !response.status_code().is_success() {
249            let text = response
250                .text()
251                .await
252                .unwrap_or_else(|_| "unknown error".to_string());
253            return Err(wfe_core::WfeError::Persistence(format!(
254                "Search failed: {text}"
255            )));
256        }
257
258        let response_body: serde_json::Value = response
259            .json()
260            .await
261            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
262
263        let total = response_body["hits"]["total"]["value"]
264            .as_u64()
265            .unwrap_or(0);
266
267        let hits = response_body["hits"]["hits"]
268            .as_array()
269            .cloned()
270            .unwrap_or_default();
271
272        let mut results = Vec::with_capacity(hits.len());
273        for hit in &hits {
274            let source = &hit["_source"];
275            let status_str = source["status"].as_str().unwrap_or("Runnable");
276            let status: WorkflowStatus =
277                serde_json::from_value(json!(status_str)).unwrap_or_default();
278
279            results.push(WorkflowSearchResult {
280                id: source["id"].as_str().unwrap_or_default().to_string(),
281                workflow_definition_id: source["workflow_definition_id"]
282                    .as_str()
283                    .unwrap_or_default()
284                    .to_string(),
285                version: source["version"].as_u64().unwrap_or(0) as u32,
286                status,
287                reference: source["reference"].as_str().map(String::from),
288                description: source["description"].as_str().map(String::from),
289            });
290        }
291
292        Ok(Page {
293            data: results,
294            total,
295        })
296    }
297}