drasi_lib/config/runtime.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use super::schema::QueryConfig;
20use crate::channels::ComponentStatus;
21use crate::indexes::IndexBackendPlugin;
22use crate::indexes::IndexFactory;
23use crate::state_store::{MemoryStateStoreProvider, StateStoreProvider};
24
25/// Runtime representation of a source with execution status
26///
27/// `SourceRuntime` combines configuration with runtime state information like
28/// current execution status and error messages. It's used for monitoring and
29/// managing source lifecycle.
30///
31/// # Status Values
32///
33/// - `ComponentStatus::Stopped`: Source is configured but not running
34/// - `ComponentStatus::Starting`: Source is initializing
35/// - `ComponentStatus::Running`: Source is actively ingesting data
36/// - `ComponentStatus::Error`: Source encountered an error (see `error_message`)
37///
38/// # Thread Safety
39///
40/// This struct is `Clone` and `Serialize` for sharing across threads and APIs.
41///
42/// # Examples
43///
44/// ```no_run
45/// use drasi_lib::{DrasiLib, ComponentStatus};
46///
47/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
48/// let core = DrasiLib::builder().with_id("my-server").build().await?;
49/// core.start().await?;
50///
51/// // Get runtime information for a source
52/// let source_info = core.get_source_info("orders_db").await?;
53/// println!("Source {} is {:?}", source_info.id, source_info.status);
54///
55/// if let Some(error) = source_info.error_message {
56/// eprintln!("Source error: {}", error);
57/// }
58/// # Ok(())
59/// # }
60/// ```
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct SourceRuntime {
63 /// Unique identifier for the source
64 pub id: String,
65 /// Type of source (e.g., "postgres", "http", "mock", "platform")
66 pub source_type: String,
67 /// Current status of the source
68 pub status: ComponentStatus,
69 /// Error message if status is Error
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub error_message: Option<String>,
72 /// Source-specific configuration properties
73 pub properties: HashMap<String, serde_json::Value>,
74}
75
76/// Runtime representation of a query with execution status
77///
78/// `QueryRuntime` combines query configuration with runtime state information.
79/// Used for monitoring query execution, tracking which sources it subscribes to,
80/// and inspecting any runtime errors.
81///
82/// # Status Values
83///
84/// - `ComponentStatus::Stopped`: Query is configured but not processing
85/// - `ComponentStatus::Starting`: Query is initializing (bootstrap phase)
86/// - `ComponentStatus::Running`: Query is actively processing events
87/// - `ComponentStatus::Error`: Query encountered an error (see `error_message`)
88///
89/// # Examples
90///
91/// ```no_run
92/// use drasi_lib::{DrasiLib, ComponentStatus};
93///
94/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
95/// let core = DrasiLib::builder().with_id("my-server").build().await?;
96/// core.start().await?;
97///
98/// // Get runtime information for a query
99/// let query_info = core.get_query_info("active_orders").await?;
100/// println!("Query: {}", query_info.query);
101/// println!("Status: {:?}", query_info.status);
102/// println!("Source subscriptions: {:?}", query_info.source_subscriptions);
103///
104/// if let Some(joins) = query_info.joins {
105/// println!("Synthetic joins configured: {}", joins.len());
106/// }
107/// # Ok(())
108/// # }
109/// ```
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct QueryRuntime {
112 /// Unique identifier for the query
113 pub id: String,
114 /// Cypher or GQL query string
115 pub query: String,
116 /// Current status of the query
117 pub status: ComponentStatus,
118 /// Error message if status is Error
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub error_message: Option<String>,
121 /// Source subscriptions with middleware pipelines
122 pub source_subscriptions: Vec<super::schema::SourceSubscriptionConfig>,
123 /// Optional synthetic joins for the query
124 #[serde(skip_serializing_if = "Option::is_none")]
125 pub joins: Option<Vec<super::schema::QueryJoinConfig>>,
126}
127
128/// Runtime representation of a reaction with execution status
129///
130/// `ReactionRuntime` combines reaction configuration with runtime state information.
131/// Used for monitoring reaction execution, tracking which queries it subscribes to,
132/// and inspecting delivery status.
133///
134/// # Status Values
135///
136/// - `ComponentStatus::Stopped`: Reaction is configured but not running
137/// - `ComponentStatus::Starting`: Reaction is initializing connections
138/// - `ComponentStatus::Running`: Reaction is actively delivering results
139/// - `ComponentStatus::Error`: Reaction encountered an error (see `error_message`)
140///
141/// # Examples
142///
143/// ```no_run
144/// use drasi_lib::{DrasiLib, ComponentStatus};
145///
146/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
147/// let core = DrasiLib::builder().with_id("my-server").build().await?;
148/// core.start().await?;
149///
150/// // Get runtime information for a reaction
151/// let reaction_info = core.get_reaction_info("order_webhook").await?;
152/// println!("Reaction {} ({}) is {:?}",
153/// reaction_info.id,
154/// reaction_info.reaction_type,
155/// reaction_info.status
156/// );
157/// println!("Subscribed to queries: {:?}", reaction_info.queries);
158///
159/// if let Some(error) = reaction_info.error_message {
160/// eprintln!("Reaction error: {}", error);
161/// }
162/// # Ok(())
163/// # }
164/// ```
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct ReactionRuntime {
167 /// Unique identifier for the reaction
168 pub id: String,
169 /// Type of reaction (e.g., "log", "http", "grpc", "sse", "platform")
170 pub reaction_type: String,
171 /// Current status of the reaction
172 pub status: ComponentStatus,
173 /// Error message if status is Error
174 #[serde(skip_serializing_if = "Option::is_none")]
175 pub error_message: Option<String>,
176 /// IDs of queries this reaction subscribes to
177 pub queries: Vec<String>,
178 /// Reaction-specific configuration properties
179 pub properties: HashMap<String, serde_json::Value>,
180}
181
182impl From<QueryConfig> for QueryRuntime {
183 fn from(config: QueryConfig) -> Self {
184 Self {
185 id: config.id,
186 query: config.query,
187 status: ComponentStatus::Stopped,
188 error_message: None,
189 source_subscriptions: config.sources,
190 joins: config.joins,
191 }
192 }
193}
194
195/// Runtime configuration with applied defaults
196///
197/// `RuntimeConfig` represents a fully-resolved configuration with all global defaults
198/// applied to individual components. It's created from [`DrasiLibConfig`](super::schema::DrasiLibConfig)
199/// and used internally by [`DrasiLib`](crate::DrasiLib) for execution.
200///
201/// # Plugin Architecture
202///
203/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
204/// reactions are passed as owned instances via `add_source()` and `add_reaction()`.
205/// Only queries are stored in RuntimeConfig.
206///
207/// # Default Application
208///
209/// When converting from `DrasiLibConfig` to `RuntimeConfig`, global capacity
210/// settings are applied to queries that don't specify their own values:
211///
212/// - **priority_queue_capacity**: Applied to queries (default: 10000)
213/// - **dispatch_buffer_capacity**: Applied to queries (default: 1000)
214///
215/// # Examples
216///
217/// ```yaml
218/// id: my-server
219/// priority_queue_capacity: 50000 # Global default
220///
221/// queries:
222/// - id: q1
223/// query: "MATCH (n) RETURN n"
224/// source_subscriptions:
225/// - source_id: s1
226/// # priority_queue_capacity will be 50000 (inherited)
227///
228/// - id: q2
229/// query: "MATCH (m) RETURN m"
230/// source_subscriptions:
231/// - source_id: s1
232/// priority_queue_capacity: 100000 # Override global
233/// ```
234#[derive(Clone)]
235pub struct RuntimeConfig {
236 /// Unique identifier for this DrasiLib instance
237 pub id: String,
238 /// Index factory for creating storage backend indexes for queries
239 pub index_factory: Arc<IndexFactory>,
240 /// State store provider for plugin state persistence
241 pub state_store_provider: Arc<dyn StateStoreProvider>,
242 /// Query configurations (sources/reactions are now instance-only)
243 pub queries: Vec<QueryConfig>,
244 /// Original global priority queue capacity (before applying to queries)
245 pub global_priority_queue_capacity: Option<usize>,
246 /// Original global dispatch buffer capacity (before applying to queries)
247 pub global_dispatch_buffer_capacity: Option<usize>,
248 /// Original storage backend configurations
249 pub storage_backends: Vec<crate::indexes::StorageBackendConfig>,
250}
251
252impl std::fmt::Debug for RuntimeConfig {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 f.debug_struct("RuntimeConfig")
255 .field("id", &self.id)
256 .field("index_factory", &self.index_factory)
257 .field("state_store_provider", &"<dyn StateStoreProvider>")
258 .field("queries", &self.queries)
259 .field(
260 "global_priority_queue_capacity",
261 &self.global_priority_queue_capacity,
262 )
263 .field(
264 "global_dispatch_buffer_capacity",
265 &self.global_dispatch_buffer_capacity,
266 )
267 .field("storage_backends", &self.storage_backends)
268 .finish()
269 }
270}
271
272impl RuntimeConfig {
273 /// Create a new RuntimeConfig with optional index backend and state store providers.
274 ///
275 /// When an index provider is supplied, RocksDB and Redis/Garnet storage backends
276 /// will delegate to the provider for index creation. Without a provider, only
277 /// in-memory storage backends can be used.
278 ///
279 /// When a state store provider is supplied, it will be used for plugin state
280 /// persistence. Without a provider, the default in-memory state store is used.
281 ///
282 /// # Arguments
283 ///
284 /// * `config` - The DrasiLib configuration
285 /// * `index_provider` - Optional index backend plugin for persistent storage
286 /// * `state_store_provider` - Optional state store provider for plugin state
287 pub fn new(
288 config: super::schema::DrasiLibConfig,
289 index_provider: Option<Arc<dyn IndexBackendPlugin>>,
290 state_store_provider: Option<Arc<dyn StateStoreProvider>>,
291 ) -> Self {
292 // Preserve original global defaults for config snapshot round-tripping
293 let global_priority_queue_capacity = config.priority_queue_capacity;
294 let global_dispatch_buffer_capacity = config.dispatch_buffer_capacity;
295
296 // Get the global defaults (or hardcoded fallbacks)
297 let global_priority_queue = global_priority_queue_capacity.unwrap_or(10000);
298 let global_dispatch_capacity = global_dispatch_buffer_capacity.unwrap_or(1000);
299
300 // Preserve original storage backends for config snapshot
301 let storage_backends = config.storage_backends.clone();
302
303 // Create IndexFactory from storage backend configurations with optional plugin
304 let index_factory = Arc::new(IndexFactory::new(config.storage_backends, index_provider));
305
306 // Use provided state store or default to in-memory
307 let state_store_provider: Arc<dyn StateStoreProvider> =
308 state_store_provider.unwrap_or_else(|| Arc::new(MemoryStateStoreProvider::new()));
309
310 // Apply global defaults to queries
311 let queries = config
312 .queries
313 .into_iter()
314 .map(|mut q| {
315 if q.priority_queue_capacity.is_none() {
316 q.priority_queue_capacity = Some(global_priority_queue);
317 }
318 if q.dispatch_buffer_capacity.is_none() {
319 q.dispatch_buffer_capacity = Some(global_dispatch_capacity);
320 }
321 q
322 })
323 .collect();
324
325 Self {
326 id: config.id,
327 index_factory,
328 state_store_provider,
329 queries,
330 global_priority_queue_capacity,
331 global_dispatch_buffer_capacity,
332 storage_backends,
333 }
334 }
335}
336
337impl From<super::schema::DrasiLibConfig> for RuntimeConfig {
338 fn from(config: super::schema::DrasiLibConfig) -> Self {
339 // Default to no index provider and no state store provider
340 Self::new(config, None, None)
341 }
342}