Skip to main content

drasi_lib/lib_core_ops/
query_ops.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Query management operations for DrasiLib
16//!
17//! This module provides all query-related operations including creating, removing,
18//! starting, and stopping queries.
19
20use anyhow::Result as AnyhowResult;
21
22use crate::channels::ComponentStatus;
23use crate::component_ops::map_state_error;
24use crate::config::{QueryConfig, QueryRuntime};
25use crate::error::{DrasiError, Result};
26use crate::lib_core::DrasiLib;
27
28impl DrasiLib {
29    /// Create a query in a running server
30    ///
31    /// # Example
32    /// ```no_run
33    /// # use drasi_lib::{DrasiLib, Query};
34    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
35    /// core.add_query(
36    ///     Query::cypher("new-query")
37    ///         .query("MATCH (n) RETURN n")
38    ///         .from_source("source1")
39    ///         .auto_start(true)
40    ///         .build()
41    /// ).await?;
42    /// # Ok(())
43    /// # }
44    /// ```
45    pub async fn add_query(&self, query: QueryConfig) -> Result<()> {
46        self.state_guard.require_initialized().await?;
47
48        self.add_query_with_options(query, true)
49            .await
50            .map_err(|e| DrasiError::provisioning(format!("Failed to add query: {e}")))?;
51
52        Ok(())
53    }
54
55    /// Remove a query from a running server
56    ///
57    /// If the query is running, it will be stopped first before removal.
58    ///
59    /// # Example
60    /// ```no_run
61    /// # use drasi_lib::DrasiLib;
62    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
63    /// core.remove_query("old-query").await?;
64    /// # Ok(())
65    /// # }
66    /// ```
67    pub async fn remove_query(&self, id: &str) -> Result<()> {
68        self.state_guard.require_initialized().await?;
69
70        // Stop if running
71        let status = self
72            .query_manager
73            .get_query_status(id.to_string())
74            .await
75            .map_err(|_| DrasiError::component_not_found("query", id))?;
76
77        if matches!(status, ComponentStatus::Running) {
78            self.query_manager
79                .stop_query(id.to_string())
80                .await
81                .map_err(|e| DrasiError::provisioning(format!("Failed to stop query: {e}")))?;
82        }
83
84        // Delete the query
85        self.query_manager
86            .delete_query(id.to_string())
87            .await
88            .map_err(|e| DrasiError::provisioning(format!("Failed to delete query: {e}")))?;
89
90        Ok(())
91    }
92
93    /// Start a stopped query
94    ///
95    /// This will create the necessary subscriptions to source data streams.
96    ///
97    /// # Example
98    /// ```no_run
99    /// # use drasi_lib::DrasiLib;
100    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
101    /// core.start_query("my-query").await?;
102    /// # Ok(())
103    /// # }
104    /// ```
105    pub async fn start_query(&self, id: &str) -> Result<()> {
106        self.state_guard.require_initialized().await?;
107
108        // Verify query exists
109        let _config = self
110            .query_manager
111            .get_query_config(id)
112            .await
113            .ok_or_else(|| DrasiError::component_not_found("query", id))?;
114
115        // Query will subscribe directly to sources when started
116        map_state_error(
117            self.query_manager.start_query(id.to_string()).await,
118            "query",
119            id,
120        )
121    }
122
123    /// Stop a running query
124    ///
125    /// # Example
126    /// ```no_run
127    /// # use drasi_lib::DrasiLib;
128    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
129    /// core.stop_query("my-query").await?;
130    /// # Ok(())
131    /// # }
132    /// ```
133    pub async fn stop_query(&self, id: &str) -> Result<()> {
134        self.state_guard.require_initialized().await?;
135
136        // Stop the query (it unsubscribes from sources automatically)
137        map_state_error(
138            self.query_manager.stop_query(id.to_string()).await,
139            "query",
140            id,
141        )?;
142
143        Ok(())
144    }
145
146    /// List all queries with their current status
147    ///
148    /// # Example
149    /// ```no_run
150    /// # use drasi_lib::DrasiLib;
151    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
152    /// let queries = core.list_queries().await?;
153    /// for (id, status) in queries {
154    ///     println!("Query {}: {:?}", id, status);
155    /// }
156    /// # Ok(())
157    /// # }
158    /// ```
159    pub async fn list_queries(&self) -> Result<Vec<(String, ComponentStatus)>> {
160        self.inspection.list_queries().await
161    }
162
163    /// Get detailed information about a specific query
164    ///
165    /// # Example
166    /// ```no_run
167    /// # use drasi_lib::DrasiLib;
168    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
169    /// let query_info = core.get_query_info("my-query").await?;
170    /// println!("Query: {}", query_info.query);
171    /// println!("Status: {:?}", query_info.status);
172    /// println!("Source subscriptions: {:?}", query_info.source_subscriptions);
173    /// # Ok(())
174    /// # }
175    /// ```
176    pub async fn get_query_info(&self, id: &str) -> Result<QueryRuntime> {
177        self.inspection.get_query_info(id).await
178    }
179
180    /// Get the current status of a specific query
181    ///
182    /// # Example
183    /// ```no_run
184    /// # use drasi_lib::DrasiLib;
185    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
186    /// let status = core.get_query_status("my-query").await?;
187    /// println!("Query status: {:?}", status);
188    /// # Ok(())
189    /// # }
190    /// ```
191    pub async fn get_query_status(&self, id: &str) -> Result<ComponentStatus> {
192        self.inspection.get_query_status(id).await
193    }
194
195    /// Get the current result set for a running query
196    ///
197    /// # Example
198    /// ```no_run
199    /// # use drasi_lib::DrasiLib;
200    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
201    /// let results = core.get_query_results("my-query").await?;
202    /// println!("Current results: {} items", results.len());
203    /// # Ok(())
204    /// # }
205    /// ```
206    pub async fn get_query_results(&self, id: &str) -> Result<Vec<serde_json::Value>> {
207        self.inspection.get_query_results(id).await
208    }
209
210    /// Get the full configuration for a specific query
211    ///
212    /// This returns the complete query configuration including all fields like auto_start and joins,
213    /// unlike `get_query_info()` which only returns runtime information.
214    ///
215    /// # Example
216    /// ```no_run
217    /// # use drasi_lib::DrasiLib;
218    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
219    /// let config = core.get_query_config("my-query").await?;
220    /// println!("Auto-start: {}", config.auto_start);
221    /// # Ok(())
222    /// # }
223    /// ```
224    pub async fn get_query_config(&self, id: &str) -> Result<QueryConfig> {
225        self.inspection.get_query_config(id).await
226    }
227
228    /// Internal helper for creating queries with auto-start control
229    pub(crate) async fn add_query_with_options(
230        &self,
231        config: QueryConfig,
232        allow_auto_start: bool,
233    ) -> AnyhowResult<()> {
234        let query_id = config.id.clone();
235        let should_auto_start = config.auto_start;
236
237        // Add the query (without saving during initialization)
238        self.query_manager.add_query_without_save(config).await?;
239
240        // Start if auto-start is enabled and allowed
241        if should_auto_start && allow_auto_start {
242            self.query_manager.start_query(query_id).await?;
243        }
244
245        Ok(())
246    }
247}