Skip to main content

drasi_lib/config/
schema.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16use serde::{Deserialize, Serialize};
17use std::collections::HashSet;
18
19use crate::channels::DispatchMode;
20use crate::indexes::{StorageBackendConfig, StorageBackendRef};
21use drasi_core::models::SourceMiddlewareConfig;
22
23/// Query language for continuous queries
24///
25/// Drasi supports two query languages for continuous query processing:
26///
27/// # Query Languages
28///
29/// - **Cypher**: Default graph query language with pattern matching
30/// - **GQL**: GraphQL-style queries compiled to Cypher
31///
32/// # Default Behavior
33///
34/// If not specified, queries default to `QueryLanguage::Cypher`.
35///
36/// # Examples
37///
38/// ## Using Cypher (Default)
39///
40/// ```yaml
41/// queries:
42///   - id: active_orders
43///     query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
44///     queryLanguage: Cypher  # Optional, this is the default
45///     sources: [orders_db]
46/// ```
47///
48/// ## Using GQL
49///
50/// ```yaml
51/// queries:
52///   - id: user_data
53///     query: |
54///       {
55///         users(status: "active") {
56///           id
57///           name
58///           email
59///         }
60///       }
61///     queryLanguage: GQL
62///     sources: [users_db]
63/// ```
64///
65/// # Important Limitations
66///
67/// **Unsupported Clauses**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
68/// queries as they conflict with incremental result computation.
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
70pub enum QueryLanguage {
71    #[default]
72    Cypher,
73    GQL,
74}
75
76/// Source subscription configuration for queries
77///
78/// `SourceSubscriptionConfig` defines how a query subscribes to a specific source,
79/// including any middleware pipeline to apply to changes from that source.
80///
81/// # Fields
82///
83/// - **source_id**: ID of the source to subscribe to
84/// - **nodes**: Optional list of node labels to subscribe to from this source
85/// - **relations**: Optional list of relation labels to subscribe to from this source
86/// - **pipeline**: Optional list of middleware IDs to apply to changes from this source
87///
88/// # Examples
89///
90/// ## Simple Subscription (No Pipeline)
91///
92/// ```yaml
93/// source_subscriptions:
94///   - source_id: orders_db
95///     pipeline: []
96/// ```
97///
98/// ## Subscription with Middleware Pipeline
99///
100/// ```yaml
101/// source_subscriptions:
102///   - source_id: raw_events
103///     pipeline: [decoder, mapper, validator]
104/// ```
105///
106/// ## Subscription with Label Filtering
107///
108/// ```yaml
109/// source_subscriptions:
110///   - source_id: orders_db
111///     nodes: [Order, Customer]
112///     relations: [PLACED_BY]
113///     pipeline: []
114/// ```
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct SourceSubscriptionConfig {
117    pub source_id: String,
118    #[serde(default)]
119    pub nodes: Vec<String>,
120    #[serde(default)]
121    pub relations: Vec<String>,
122    #[serde(default)]
123    pub pipeline: Vec<String>,
124}
125
/// Per-query subscription request handed to a source
///
/// A `SourceSubscriptionSettings` value bundles what a source is told when a
/// query subscribes to it: who is asking, whether initial data is wanted, and
/// which node and relation labels the query is interested in.
///
/// Unlike the serde-backed configuration types in this module, this struct
/// only derives `Debug` and `Clone`; it is built in code rather than parsed
/// from configuration.
///
/// # Example
///
/// ```ignore
/// use drasi_lib::config::SourceSubscriptionSettings;
/// use std::collections::HashSet;
///
/// let settings = SourceSubscriptionSettings {
///     source_id: "orders_db".to_string(),
///     enable_bootstrap: true,
///     query_id: "my-query".to_string(),
///     nodes: ["Order", "Customer"].iter().map(|s| s.to_string()).collect(),
///     relations: ["PLACED_BY"].iter().map(|s| s.to_string()).collect(),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct SourceSubscriptionSettings {
    /// ID of the source being subscribed to
    pub source_id: String,
    /// Whether the subscriber requests initial (bootstrap) data
    pub enable_bootstrap: bool,
    /// ID of the subscribing query
    pub query_id: String,
    /// Node labels the query is interested in from this source
    pub nodes: HashSet<String>,
    /// Relation labels the query is interested in from this source
    pub relations: HashSet<String>,
}
162
163/// Root configuration for Drasi Server Core
164///
165/// `DrasiLibConfig` is the top-level configuration structure for DrasiLib.
166/// It defines server settings, queries, and storage backends.
167///
168/// # Plugin Architecture
169///
170/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
171/// reactions are passed as owned instances via `with_source()` and `with_reaction()`
172/// on the builder. Only queries can be configured via the builder.
173///
174/// # Configuration Structure
175///
176/// A typical configuration has these sections:
177///
178/// 1. **id**: Unique server identifier (optional, defaults to UUID)
179/// 2. **priority_queue_capacity**: Default capacity for event queues (optional)
180/// 3. **dispatch_buffer_capacity**: Default capacity for dispatch buffers (optional)
181/// 4. **storage_backends**: Storage backend definitions (optional)
182/// 5. **queries**: Continuous queries to process data
183///
184/// # Capacity Settings
185///
186/// - **priority_queue_capacity**: Default capacity for timestamp-ordered event queues in
187///   queries and reactions. Higher values support more out-of-order events but consume
188///   more memory. Default: 10000
189///
190/// - **dispatch_buffer_capacity**: Default capacity for event dispatch channels between
191///   components (sources → queries, queries → reactions). Higher values improve throughput
192///   under load but consume more memory. Default: 1000
193///
194/// Individual components can override these defaults by setting their own capacity values.
195///
196/// # Thread Safety
197///
198/// This struct is `Clone` and can be safely shared across threads.
199///
200/// # Usage
201///
202/// Use `DrasiLib::builder()` to create instances:
203///
204/// ```no_run
205/// use drasi_lib::{DrasiLib, Query};
206///
207/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
208/// let core = DrasiLib::builder()
209///     .with_id("my-server")
210///     .with_query(
211///         Query::cypher("my-query")
212///             .query("MATCH (n) RETURN n")
213///             .from_source("my-source")
214///             .build()
215///     )
216///     .build()
217///     .await?;
218/// # Ok(())
219/// # }
220/// ```
221///
222/// # Validation
223///
224/// Call [`validate()`](DrasiLibConfig::validate) to check:
225/// - Unique query IDs
226/// - Valid storage backend references
227///
228/// Note: Source and reaction validation happens at runtime when instances are added.
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct DrasiLibConfig {
231    /// Unique identifier for this DrasiLib instance (defaults to UUID)
232    #[serde(default = "default_id")]
233    pub id: String,
234    /// Default priority queue capacity for queries and reactions (default: 10000 if not specified)
235    #[serde(default, skip_serializing_if = "Option::is_none")]
236    pub priority_queue_capacity: Option<usize>,
237    /// Default dispatch buffer capacity for sources and queries (default: 1000 if not specified)
238    #[serde(default, skip_serializing_if = "Option::is_none")]
239    pub dispatch_buffer_capacity: Option<usize>,
240    /// Global storage backend definitions that can be referenced by queries
241    #[serde(default)]
242    pub storage_backends: Vec<StorageBackendConfig>,
243    /// Query configurations
244    #[serde(default)]
245    pub queries: Vec<QueryConfig>,
246}
247
248impl Default for DrasiLibConfig {
249    fn default() -> Self {
250        Self {
251            id: default_id(),
252            priority_queue_capacity: None,
253            dispatch_buffer_capacity: None,
254            storage_backends: Vec::new(),
255            queries: Vec::new(),
256        }
257    }
258}
259
260/// Configuration for a continuous query
261///
262/// `QueryConfig` defines a continuous query that processes data changes from sources
263/// and emits incremental result updates. Queries subscribe to one or more sources and
264/// maintain materialized views that update automatically as data changes.
265///
266/// # Query Languages
267///
268/// Queries can be written in either:
269/// - **Cypher**: Default graph pattern matching language
270/// - **GQL**: GraphQL-style queries (compiled to Cypher)
271///
272/// **Important**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
273/// queries as they conflict with incremental result computation.
274///
275/// # Bootstrap Processing
276///
277/// - **enableBootstrap**: Controls whether the query processes initial data (default: true)
278/// - **bootstrapBufferSize**: Event buffer size during bootstrap phase (default: 10000)
279///
280/// During bootstrap, events are buffered to maintain ordering while initial data loads.
281/// After bootstrap completes, queries switch to incremental processing mode.
282///
283/// # Synthetic Joins
284///
285/// Queries can define synthetic relationships between node types from different sources
286/// via the `joins` field. This creates virtual edges based on property equality without
287/// requiring physical relationships in the source data.
288///
289/// # Configuration Fields
290///
291/// - **id**: Unique identifier (referenced by reactions)
292/// - **query**: Query string in specified language
293/// - **queryLanguage**: Cypher or GQL (default: Cypher)
294/// - **sources**: Source IDs to subscribe to
295/// - **auto_start**: Start automatically (default: true)
296/// - **joins**: Optional synthetic join definitions
297/// - **enableBootstrap**: Process initial data (default: true)
298/// - **bootstrapBufferSize**: Buffer size during bootstrap (default: 10000)
299/// - **priority_queue_capacity**: Out-of-order event queue size (overrides global)
300/// - **dispatch_buffer_capacity**: Output buffer size (overrides global)
301/// - **dispatch_mode**: Broadcast or Channel routing
302///
303/// # Examples
304///
305/// ## Basic Cypher Query
306///
307/// ```yaml
308/// queries:
309///   - id: active_orders
310///     query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
311///     queryLanguage: Cypher  # Optional, this is default
312///     sources: [orders_db]
313///     auto_start: true
314///     enableBootstrap: true
315///     bootstrapBufferSize: 10000
316/// ```
317///
318/// ## Query with Multiple Sources
319///
320/// ```yaml
321/// queries:
322///   - id: order_customer_join
323///     query: |
324///       MATCH (o:Order)-[:BELONGS_TO]->(c:Customer)
325///       WHERE o.status = 'active'
326///       RETURN o, c
327///     sources: [orders_db, customers_db]
328/// ```
329///
330/// ## Query with Synthetic Joins
331///
332/// ```yaml
333/// queries:
334///   - id: synthetic_join_query
335///     query: |
336///       MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
337///       RETURN o.id, c.name
338///     sources: [orders_db, customers_db]
339///     joins:
340///       - id: CUSTOMER              # Relationship type in query
341///         keys:
342///           - label: Order
343///             property: customer_id
344///           - label: Customer
345///             property: id
346/// ```
347///
348/// ## High-Throughput Query
349///
350/// ```yaml
351/// queries:
352///   - id: high_volume_processing
353///     query: "MATCH (n:Event) WHERE n.timestamp > timestamp() - 60000 RETURN n"
354///     sources: [event_stream]
355///     priority_queue_capacity: 100000  # Large queue for many out-of-order events
356///     dispatch_buffer_capacity: 10000  # Large output buffer
357///     bootstrapBufferSize: 50000       # Large bootstrap buffer
358/// ```
359///
360/// ## GQL Query
361///
362/// ```yaml
363/// queries:
364///   - id: gql_users
365///     query: |
366///       {
367///         users(status: "active") {
368///           id
369///           name
370///           email
371///         }
372///       }
373///     queryLanguage: GQL
374///     sources: [users_db]
375/// ```
376#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct QueryConfig {
378    /// Unique identifier for the query
379    pub id: String,
380    /// Query string (Cypher or GQL depending on query_language)
381    pub query: String,
382    /// Query language to use (default: Cypher)
383    #[serde(default, rename = "queryLanguage")]
384    pub query_language: QueryLanguage,
385    /// Middleware configurations for this query
386    #[serde(default)]
387    pub middleware: Vec<SourceMiddlewareConfig>,
388    /// Source subscriptions with optional middleware pipelines
389    #[serde(default)]
390    pub sources: Vec<SourceSubscriptionConfig>,
391    /// Whether to automatically start this query (default: true)
392    #[serde(default = "default_auto_start")]
393    pub auto_start: bool,
394    /// Optional synthetic joins for the query
395    #[serde(skip_serializing_if = "Option::is_none")]
396    pub joins: Option<Vec<QueryJoinConfig>>,
397    /// Whether to enable bootstrap (default: true)
398    #[serde(default = "default_enable_bootstrap", rename = "enableBootstrap")]
399    pub enable_bootstrap: bool,
400    /// Maximum number of events to buffer during bootstrap (default: 10000)
401    #[serde(
402        default = "default_bootstrap_buffer_size",
403        rename = "bootstrapBufferSize"
404    )]
405    pub bootstrap_buffer_size: usize,
406    /// Priority queue capacity for this query (default: server global, or 10000 if not specified)
407    #[serde(default, skip_serializing_if = "Option::is_none")]
408    pub priority_queue_capacity: Option<usize>,
409    /// Dispatch buffer capacity for this query (default: server global, or 1000 if not specified)
410    #[serde(default, skip_serializing_if = "Option::is_none")]
411    pub dispatch_buffer_capacity: Option<usize>,
412    /// Dispatch mode for this query (default: Channel)
413    #[serde(default, skip_serializing_if = "Option::is_none")]
414    pub dispatch_mode: Option<DispatchMode>,
415    /// Storage backend for this query (default: in-memory)
416    /// Can reference a named backend or provide inline configuration
417    #[serde(skip_serializing_if = "Option::is_none")]
418    pub storage_backend: Option<StorageBackendRef>,
419}
420
421/// Synthetic join configuration for queries
422///
423/// `QueryJoinConfig` defines a virtual relationship between node types from different
424/// sources. This allows queries to join data without requiring physical relationships
425/// in the source systems.
426///
427/// # Join Semantics
428///
429/// Joins create synthetic edges by matching property values across nodes. The `id`
430/// field specifies the relationship type used in the query's MATCH pattern, and `keys`
431/// define which properties to match.
432///
433/// # Examples
434///
435/// ## Simple Join on Single Property
436///
437/// ```yaml
438/// joins:
439///   - id: CUSTOMER              # Use in query: MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
440///     keys:
441///       - label: Order
442///         property: customer_id  # Order.customer_id
443///       - label: Customer
444///         property: id           # Customer.id
445///       # Creates edge when Order.customer_id == Customer.id
446/// ```
447///
448/// ## Multi-Source Join
449///
450/// ```yaml
451/// joins:
452///   - id: ASSIGNED_TO
453///     keys:
454///       - label: Task
455///         property: assignee_id
456///       - label: User
457///         property: user_id
458/// ```
459#[derive(Debug, Clone, Serialize, Deserialize)]
460pub struct QueryJoinConfig {
461    /// Unique identifier for the join (should match relationship type in query)
462    pub id: String,
463    /// Keys defining the join relationship
464    pub keys: Vec<QueryJoinKeyConfig>,
465}
466
467/// Join key specification for synthetic joins
468///
469/// `QueryJoinKeyConfig` specifies one side of a join condition by identifying
470/// a node label and the property to use for matching.
471///
472/// # Example
473///
474/// For joining orders to customers:
475///
476/// ```yaml
477/// keys:
478///   - label: Order
479///     property: customer_id
480///   - label: Customer
481///     property: id
482/// ```
483///
484/// This creates an edge when `Order.customer_id == Customer.id`.
485#[derive(Debug, Clone, Serialize, Deserialize)]
486pub struct QueryJoinKeyConfig {
487    /// Node label to match
488    pub label: String,
489    /// Property to use for joining
490    pub property: String,
491}
492
493impl DrasiLibConfig {
494    /// Validate configuration consistency and references
495    ///
496    /// Performs comprehensive validation checks:
497    /// - Ensures all query IDs are unique
498    /// - Validates storage backend configurations
499    ///
500    /// Note: Source and reaction validation happens at runtime when instances are added,
501    /// since drasi-lib has no knowledge of specific plugin configurations.
502    ///
503    /// # Errors
504    ///
505    /// Returns error if validation fails with a description of the problem.
506    pub fn validate(&self) -> Result<()> {
507        // Validate unique query ids
508        let mut query_ids = std::collections::HashSet::new();
509        for query in &self.queries {
510            if !query_ids.insert(&query.id) {
511                return Err(anyhow::anyhow!("Duplicate query id: '{}'", query.id));
512            }
513        }
514
515        // Validate unique storage backend ids
516        let mut storage_backend_ids = std::collections::HashSet::new();
517        for backend in &self.storage_backends {
518            if !storage_backend_ids.insert(&backend.id) {
519                return Err(anyhow::anyhow!(
520                    "Duplicate storage backend id: '{}'",
521                    backend.id
522                ));
523            }
524            // Validate backend configuration
525            backend.spec.validate().map_err(|e| {
526                anyhow::anyhow!(
527                    "Storage backend '{}' has invalid configuration: {}",
528                    backend.id,
529                    e
530                )
531            })?;
532        }
533
534        // Validate storage backend references in queries
535        for query in &self.queries {
536            if let Some(backend_ref) = &query.storage_backend {
537                match backend_ref {
538                    StorageBackendRef::Named(backend_id) => {
539                        if !storage_backend_ids.contains(backend_id) {
540                            return Err(anyhow::anyhow!(
541                                "Query '{}' references unknown storage backend: '{}'",
542                                query.id,
543                                backend_id
544                            ));
545                        }
546                    }
547                    StorageBackendRef::Inline(spec) => {
548                        // Validate inline backend configuration
549                        spec.validate().map_err(|e| {
550                            anyhow::anyhow!(
551                                "Query '{}' has invalid inline storage backend configuration: {}",
552                                query.id,
553                                e
554                            )
555                        })?;
556                    }
557                }
558            }
559        }
560
561        Ok(())
562    }
563}
564
565fn default_id() -> String {
566    uuid::Uuid::new_v4().to_string()
567}
568
/// Serde default for `QueryConfig::auto_start`: queries start automatically.
fn default_auto_start() -> bool {
    const AUTO_START_BY_DEFAULT: bool = true;
    AUTO_START_BY_DEFAULT
}
572
/// Serde default for `QueryConfig::enable_bootstrap`: bootstrap is on.
fn default_enable_bootstrap() -> bool {
    const BOOTSTRAP_BY_DEFAULT: bool = true;
    BOOTSTRAP_BY_DEFAULT
}
576
/// Serde default for `QueryConfig::bootstrap_buffer_size`, in events.
fn default_bootstrap_buffer_size() -> usize {
    const DEFAULT_BOOTSTRAP_BUFFER_EVENTS: usize = 10_000;
    DEFAULT_BOOTSTRAP_BUFFER_EVENTS
}
580
581// Conversion implementations for QueryJoin types
582impl From<QueryJoinKeyConfig> for drasi_core::models::QueryJoinKey {
583    fn from(config: QueryJoinKeyConfig) -> Self {
584        drasi_core::models::QueryJoinKey {
585            label: config.label,
586            property: config.property,
587        }
588    }
589}
590
591impl From<QueryJoinConfig> for drasi_core::models::QueryJoin {
592    fn from(config: QueryJoinConfig) -> Self {
593        drasi_core::models::QueryJoin {
594            id: config.id,
595            keys: config.keys.into_iter().map(|k| k.into()).collect(),
596        }
597    }
598}