drasi_lib/config/runtime.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use super::schema::QueryConfig;
20use crate::channels::ComponentStatus;
21use crate::identity::IdentityProvider;
22use crate::indexes::IndexBackendPlugin;
23use crate::indexes::IndexFactory;
24use crate::state_store::{MemoryStateStoreProvider, StateStoreProvider};
25
26/// Runtime representation of a source with execution status
27///
28/// `SourceRuntime` combines configuration with runtime state information like
29/// current execution status and error messages. It's used for monitoring and
30/// managing source lifecycle.
31///
32/// # Status Values
33///
34/// - `ComponentStatus::Stopped`: Source is configured but not running
35/// - `ComponentStatus::Starting`: Source is initializing
36/// - `ComponentStatus::Running`: Source is actively ingesting data
37/// - `ComponentStatus::Error`: Source encountered an error (see `error_message`)
38///
39/// # Thread Safety
40///
41/// This struct is `Clone` and `Serialize` for sharing across threads and APIs.
42///
43/// # Examples
44///
45/// ```no_run
46/// use drasi_lib::{DrasiLib, ComponentStatus};
47///
48/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
49/// let core = DrasiLib::builder().with_id("my-server").build().await?;
50/// core.start().await?;
51///
52/// // Get runtime information for a source
53/// let source_info = core.get_source_info("orders_db").await?;
54/// println!("Source {} is {:?}", source_info.id, source_info.status);
55///
56/// if let Some(error) = source_info.error_message {
57/// eprintln!("Source error: {}", error);
58/// }
59/// # Ok(())
60/// # }
61/// ```
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct SourceRuntime {
64 /// Unique identifier for the source
65 pub id: String,
66 /// Type of source (e.g., "postgres", "http", "mock", "platform")
67 pub source_type: String,
68 /// Current status of the source
69 pub status: ComponentStatus,
70 /// Error message if status is Error
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub error_message: Option<String>,
73 /// Source-specific configuration properties
74 pub properties: HashMap<String, serde_json::Value>,
75}
76
77/// Runtime representation of a query with execution status
78///
79/// `QueryRuntime` combines query configuration with runtime state information.
80/// Used for monitoring query execution, tracking which sources it subscribes to,
81/// and inspecting any runtime errors.
82///
83/// # Status Values
84///
85/// - `ComponentStatus::Stopped`: Query is configured but not processing
86/// - `ComponentStatus::Starting`: Query is initializing (bootstrap phase)
87/// - `ComponentStatus::Running`: Query is actively processing events
88/// - `ComponentStatus::Error`: Query encountered an error (see `error_message`)
89///
90/// # Examples
91///
92/// ```no_run
93/// use drasi_lib::{DrasiLib, ComponentStatus};
94///
95/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
96/// let core = DrasiLib::builder().with_id("my-server").build().await?;
97/// core.start().await?;
98///
99/// // Get runtime information for a query
100/// let query_info = core.get_query_info("active_orders").await?;
101/// println!("Query: {}", query_info.query);
102/// println!("Status: {:?}", query_info.status);
103/// println!("Source subscriptions: {:?}", query_info.source_subscriptions);
104///
105/// if let Some(joins) = query_info.joins {
106/// println!("Synthetic joins configured: {}", joins.len());
107/// }
108/// # Ok(())
109/// # }
110/// ```
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct QueryRuntime {
113 /// Unique identifier for the query
114 pub id: String,
115 /// Cypher or GQL query string
116 pub query: String,
117 /// Current status of the query
118 pub status: ComponentStatus,
119 /// Error message if status is Error
120 #[serde(skip_serializing_if = "Option::is_none")]
121 pub error_message: Option<String>,
122 /// Source subscriptions with middleware pipelines
123 pub source_subscriptions: Vec<super::schema::SourceSubscriptionConfig>,
124 /// Optional synthetic joins for the query
125 #[serde(skip_serializing_if = "Option::is_none")]
126 pub joins: Option<Vec<super::schema::QueryJoinConfig>>,
127}
128
129/// Runtime representation of a reaction with execution status
130///
131/// `ReactionRuntime` combines reaction configuration with runtime state information.
132/// Used for monitoring reaction execution, tracking which queries it subscribes to,
133/// and inspecting delivery status.
134///
135/// # Status Values
136///
137/// - `ComponentStatus::Stopped`: Reaction is configured but not running
138/// - `ComponentStatus::Starting`: Reaction is initializing connections
139/// - `ComponentStatus::Running`: Reaction is actively delivering results
140/// - `ComponentStatus::Error`: Reaction encountered an error (see `error_message`)
141///
142/// # Examples
143///
144/// ```no_run
145/// use drasi_lib::{DrasiLib, ComponentStatus};
146///
147/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
148/// let core = DrasiLib::builder().with_id("my-server").build().await?;
149/// core.start().await?;
150///
151/// // Get runtime information for a reaction
152/// let reaction_info = core.get_reaction_info("order_webhook").await?;
153/// println!("Reaction {} ({}) is {:?}",
154/// reaction_info.id,
155/// reaction_info.reaction_type,
156/// reaction_info.status
157/// );
158/// println!("Subscribed to queries: {:?}", reaction_info.queries);
159///
160/// if let Some(error) = reaction_info.error_message {
161/// eprintln!("Reaction error: {}", error);
162/// }
163/// # Ok(())
164/// # }
165/// ```
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct ReactionRuntime {
168 /// Unique identifier for the reaction
169 pub id: String,
170 /// Type of reaction (e.g., "log", "http", "grpc", "sse", "platform")
171 pub reaction_type: String,
172 /// Current status of the reaction
173 pub status: ComponentStatus,
174 /// Error message if status is Error
175 #[serde(skip_serializing_if = "Option::is_none")]
176 pub error_message: Option<String>,
177 /// IDs of queries this reaction subscribes to
178 pub queries: Vec<String>,
179 /// Reaction-specific configuration properties
180 pub properties: HashMap<String, serde_json::Value>,
181}
182
183impl From<QueryConfig> for QueryRuntime {
184 fn from(config: QueryConfig) -> Self {
185 Self {
186 id: config.id,
187 query: config.query,
188 status: ComponentStatus::Stopped,
189 error_message: None,
190 source_subscriptions: config.sources,
191 joins: config.joins,
192 }
193 }
194}
195
196/// Runtime configuration with applied defaults
197///
198/// `RuntimeConfig` represents a fully-resolved configuration with all global defaults
199/// applied to individual components. It's created from [`DrasiLibConfig`](super::schema::DrasiLibConfig)
200/// and used internally by [`DrasiLib`](crate::DrasiLib) for execution.
201///
202/// # Plugin Architecture
203///
204/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
205/// reactions are passed as owned instances via `add_source()` and `add_reaction()`.
206/// Only queries are stored in RuntimeConfig.
207///
208/// # Default Application
209///
210/// When converting from `DrasiLibConfig` to `RuntimeConfig`, global capacity
211/// settings are applied to queries that don't specify their own values:
212///
213/// - **priority_queue_capacity**: Applied to queries (default: 10000)
214/// - **dispatch_buffer_capacity**: Applied to queries (default: 1000)
215///
216/// # Examples
217///
218/// ```yaml
219/// id: my-server
220/// priority_queue_capacity: 50000 # Global default
221///
222/// queries:
223/// - id: q1
224/// query: "MATCH (n) RETURN n"
225/// source_subscriptions:
226/// - source_id: s1
227/// # priority_queue_capacity will be 50000 (inherited)
228///
229/// - id: q2
230/// query: "MATCH (m) RETURN m"
231/// source_subscriptions:
232/// - source_id: s1
233/// priority_queue_capacity: 100000 # Override global
234/// ```
235#[derive(Clone)]
236pub struct RuntimeConfig {
237 /// Unique identifier for this DrasiLib instance
238 pub id: String,
239 /// Index factory for creating storage backend indexes for queries
240 pub index_factory: Arc<IndexFactory>,
241 /// State store provider for plugin state persistence
242 pub state_store_provider: Arc<dyn StateStoreProvider>,
243 /// Optional identity provider for credential injection into sources/reactions
244 pub identity_provider: Option<Arc<dyn IdentityProvider>>,
245 /// Optional secret store provider for resolving `ConfigValue::Secret` references
246 pub secret_store_provider: Option<Arc<dyn crate::secret_store::SecretStoreProvider>>,
247 /// Query configurations (sources/reactions are now instance-only)
248 pub queries: Vec<QueryConfig>,
249 /// Original global priority queue capacity (before applying to queries)
250 pub global_priority_queue_capacity: Option<usize>,
251 /// Original global dispatch buffer capacity (before applying to queries)
252 pub global_dispatch_buffer_capacity: Option<usize>,
253 /// Original storage backend configurations
254 pub storage_backends: Vec<crate::indexes::StorageBackendConfig>,
255 /// Global default recovery policy for all queries.
256 /// Per-query `QueryConfig::recovery_policy` overrides this.
257 /// If neither is set, defaults to `Strict`.
258 pub default_recovery_policy: Option<crate::recovery::RecoveryPolicy>,
259}
260
261impl std::fmt::Debug for RuntimeConfig {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 f.debug_struct("RuntimeConfig")
264 .field("id", &self.id)
265 .field("index_factory", &self.index_factory)
266 .field("state_store_provider", &"<dyn StateStoreProvider>")
267 .field(
268 "identity_provider",
269 &self
270 .identity_provider
271 .as_ref()
272 .map(|_| "<dyn IdentityProvider>"),
273 )
274 .field(
275 "secret_store_provider",
276 &self
277 .secret_store_provider
278 .as_ref()
279 .map(|_| "<dyn SecretStoreProvider>"),
280 )
281 .field("queries", &self.queries)
282 .field(
283 "global_priority_queue_capacity",
284 &self.global_priority_queue_capacity,
285 )
286 .field(
287 "global_dispatch_buffer_capacity",
288 &self.global_dispatch_buffer_capacity,
289 )
290 .field("storage_backends", &self.storage_backends)
291 .field("default_recovery_policy", &self.default_recovery_policy)
292 .finish()
293 }
294}
295
296impl RuntimeConfig {
297 /// Create a new RuntimeConfig with optional index backend, state store, and identity providers.
298 ///
299 /// When an index provider is supplied, RocksDB and Redis/Garnet storage backends
300 /// will delegate to the provider for index creation. Without a provider, only
301 /// in-memory storage backends can be used.
302 ///
303 /// When a state store provider is supplied, it will be used for plugin state
304 /// persistence. Without a provider, the default in-memory state store is used.
305 ///
306 /// When an identity provider is supplied, it will be injected into source and
307 /// reaction runtime contexts for credential acquisition.
308 ///
309 /// # Arguments
310 ///
311 /// * `config` - The DrasiLib configuration
312 /// * `index_provider` - Optional index backend plugin for persistent storage
313 /// * `state_store_provider` - Optional state store provider for plugin state
314 /// * `identity_provider` - Optional identity provider for credential injection
315 /// * `secret_store_provider` - Optional secret store provider for resolving secrets
316 pub fn new(
317 config: super::schema::DrasiLibConfig,
318 index_provider: Option<Arc<dyn IndexBackendPlugin>>,
319 state_store_provider: Option<Arc<dyn StateStoreProvider>>,
320 identity_provider: Option<Arc<dyn IdentityProvider>>,
321 secret_store_provider: Option<Arc<dyn crate::secret_store::SecretStoreProvider>>,
322 default_recovery_policy: Option<crate::recovery::RecoveryPolicy>,
323 ) -> Self {
324 // Preserve original global defaults for config snapshot round-tripping
325 let global_priority_queue_capacity = config.priority_queue_capacity;
326 let global_dispatch_buffer_capacity = config.dispatch_buffer_capacity;
327
328 // Get the global defaults (or hardcoded fallbacks)
329 let global_priority_queue = global_priority_queue_capacity.unwrap_or(10000);
330 let global_dispatch_capacity = global_dispatch_buffer_capacity.unwrap_or(1000);
331
332 // Preserve original storage backends for config snapshot
333 let storage_backends = config.storage_backends.clone();
334
335 // Create IndexFactory from storage backend configurations with optional plugin
336 let index_factory = Arc::new(IndexFactory::new(config.storage_backends, index_provider));
337
338 // Use provided state store or default to in-memory
339 let state_store_provider: Arc<dyn StateStoreProvider> =
340 state_store_provider.unwrap_or_else(|| Arc::new(MemoryStateStoreProvider::new()));
341
342 // Apply global defaults to queries
343 let queries = config
344 .queries
345 .into_iter()
346 .map(|mut q| {
347 if q.priority_queue_capacity.is_none() {
348 q.priority_queue_capacity = Some(global_priority_queue);
349 }
350 if q.dispatch_buffer_capacity.is_none() {
351 q.dispatch_buffer_capacity = Some(global_dispatch_capacity);
352 }
353 q
354 })
355 .collect();
356
357 Self {
358 id: config.id,
359 index_factory,
360 state_store_provider,
361 identity_provider,
362 secret_store_provider,
363 queries,
364 global_priority_queue_capacity,
365 global_dispatch_buffer_capacity,
366 storage_backends,
367 default_recovery_policy,
368 }
369 }
370}
371
372impl From<super::schema::DrasiLibConfig> for RuntimeConfig {
373 fn from(config: super::schema::DrasiLibConfig) -> Self {
374 Self::new(config, None, None, None, None, None)
375 }
376}