drasi_lib/lib_core_ops/query_ops.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Query management operations for DrasiLib
16//!
17//! This module provides all query-related operations including creating, removing,
18//! starting, and stopping queries.
19
20use anyhow::Result as AnyhowResult;
21
22use crate::channels::ComponentStatus;
23use crate::component_ops::map_state_error;
24use crate::config::{QueryConfig, QueryRuntime};
25use crate::error::{DrasiError, Result};
26use crate::lib_core::DrasiLib;
27
28impl DrasiLib {
29 /// Create a query in a running server
30 ///
31 /// # Example
32 /// ```no_run
33 /// # use drasi_lib::{DrasiLib, Query};
34 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
35 /// core.add_query(
36 /// Query::cypher("new-query")
37 /// .query("MATCH (n) RETURN n")
38 /// .from_source("source1")
39 /// .auto_start(true)
40 /// .build()
41 /// ).await?;
42 /// # Ok(())
43 /// # }
44 /// ```
45 pub async fn add_query(&self, query: QueryConfig) -> Result<()> {
46 self.state_guard.require_initialized().await?;
47
48 self.add_query_with_options(query, true)
49 .await
50 .map_err(|e| DrasiError::provisioning(format!("Failed to add query: {e}")))?;
51
52 Ok(())
53 }
54
55 /// Remove a query from a running server
56 ///
57 /// If the query is running, it will be stopped first before removal.
58 ///
59 /// # Example
60 /// ```no_run
61 /// # use drasi_lib::DrasiLib;
62 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
63 /// core.remove_query("old-query").await?;
64 /// # Ok(())
65 /// # }
66 /// ```
67 pub async fn remove_query(&self, id: &str) -> Result<()> {
68 self.state_guard.require_initialized().await?;
69
70 // Stop if running
71 let status = self
72 .query_manager
73 .get_query_status(id.to_string())
74 .await
75 .map_err(|_| DrasiError::component_not_found("query", id))?;
76
77 if matches!(status, ComponentStatus::Running) {
78 self.query_manager
79 .stop_query(id.to_string())
80 .await
81 .map_err(|e| DrasiError::provisioning(format!("Failed to stop query: {e}")))?;
82 }
83
84 // Delete the query
85 self.query_manager
86 .delete_query(id.to_string())
87 .await
88 .map_err(|e| DrasiError::provisioning(format!("Failed to delete query: {e}")))?;
89
90 Ok(())
91 }
92
93 /// Start a stopped query
94 ///
95 /// This will create the necessary subscriptions to source data streams.
96 ///
97 /// # Example
98 /// ```no_run
99 /// # use drasi_lib::DrasiLib;
100 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
101 /// core.start_query("my-query").await?;
102 /// # Ok(())
103 /// # }
104 /// ```
105 pub async fn start_query(&self, id: &str) -> Result<()> {
106 self.state_guard.require_initialized().await?;
107
108 // Verify query exists
109 let _config = self
110 .query_manager
111 .get_query_config(id)
112 .await
113 .ok_or_else(|| DrasiError::component_not_found("query", id))?;
114
115 // Query will subscribe directly to sources when started
116 map_state_error(
117 self.query_manager.start_query(id.to_string()).await,
118 "query",
119 id,
120 )
121 }
122
123 /// Stop a running query
124 ///
125 /// # Example
126 /// ```no_run
127 /// # use drasi_lib::DrasiLib;
128 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
129 /// core.stop_query("my-query").await?;
130 /// # Ok(())
131 /// # }
132 /// ```
133 pub async fn stop_query(&self, id: &str) -> Result<()> {
134 self.state_guard.require_initialized().await?;
135
136 // Stop the query (it unsubscribes from sources automatically)
137 map_state_error(
138 self.query_manager.stop_query(id.to_string()).await,
139 "query",
140 id,
141 )?;
142
143 Ok(())
144 }
145
146 /// List all queries with their current status
147 ///
148 /// # Example
149 /// ```no_run
150 /// # use drasi_lib::DrasiLib;
151 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
152 /// let queries = core.list_queries().await?;
153 /// for (id, status) in queries {
154 /// println!("Query {}: {:?}", id, status);
155 /// }
156 /// # Ok(())
157 /// # }
158 /// ```
159 pub async fn list_queries(&self) -> Result<Vec<(String, ComponentStatus)>> {
160 self.inspection.list_queries().await
161 }
162
163 /// Get detailed information about a specific query
164 ///
165 /// # Example
166 /// ```no_run
167 /// # use drasi_lib::DrasiLib;
168 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
169 /// let query_info = core.get_query_info("my-query").await?;
170 /// println!("Query: {}", query_info.query);
171 /// println!("Status: {:?}", query_info.status);
172 /// println!("Source subscriptions: {:?}", query_info.source_subscriptions);
173 /// # Ok(())
174 /// # }
175 /// ```
176 pub async fn get_query_info(&self, id: &str) -> Result<QueryRuntime> {
177 self.inspection.get_query_info(id).await
178 }
179
180 /// Get the current status of a specific query
181 ///
182 /// # Example
183 /// ```no_run
184 /// # use drasi_lib::DrasiLib;
185 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
186 /// let status = core.get_query_status("my-query").await?;
187 /// println!("Query status: {:?}", status);
188 /// # Ok(())
189 /// # }
190 /// ```
191 pub async fn get_query_status(&self, id: &str) -> Result<ComponentStatus> {
192 self.inspection.get_query_status(id).await
193 }
194
195 /// Get the current result set for a running query
196 ///
197 /// # Example
198 /// ```no_run
199 /// # use drasi_lib::DrasiLib;
200 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
201 /// let results = core.get_query_results("my-query").await?;
202 /// println!("Current results: {} items", results.len());
203 /// # Ok(())
204 /// # }
205 /// ```
206 pub async fn get_query_results(&self, id: &str) -> Result<Vec<serde_json::Value>> {
207 self.inspection.get_query_results(id).await
208 }
209
210 /// Get the full configuration for a specific query
211 ///
212 /// This returns the complete query configuration including all fields like auto_start and joins,
213 /// unlike `get_query_info()` which only returns runtime information.
214 ///
215 /// # Example
216 /// ```no_run
217 /// # use drasi_lib::DrasiLib;
218 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
219 /// let config = core.get_query_config("my-query").await?;
220 /// println!("Auto-start: {}", config.auto_start);
221 /// # Ok(())
222 /// # }
223 /// ```
224 pub async fn get_query_config(&self, id: &str) -> Result<QueryConfig> {
225 self.inspection.get_query_config(id).await
226 }
227
228 /// Internal helper for creating queries with auto-start control
229 pub(crate) async fn add_query_with_options(
230 &self,
231 config: QueryConfig,
232 allow_auto_start: bool,
233 ) -> AnyhowResult<()> {
234 let query_id = config.id.clone();
235 let should_auto_start = config.auto_start;
236
237 // Add the query (without saving during initialization)
238 self.query_manager.add_query_without_save(config).await?;
239
240 // Start if auto-start is enabled and allowed
241 if should_auto_start && allow_auto_start {
242 self.query_manager.start_query(query_id).await?;
243 }
244
245 Ok(())
246 }
247}