Skip to main content

drasi_lib/config/
schema.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16use serde::{Deserialize, Serialize};
17use std::collections::HashSet;
18
19use crate::channels::DispatchMode;
20use crate::indexes::{StorageBackendConfig, StorageBackendRef};
21use crate::recovery::RecoveryPolicy;
22use drasi_core::models::SourceMiddlewareConfig;
23
24/// Query language for continuous queries
25///
26/// Drasi supports two query languages for continuous query processing:
27///
28/// # Query Languages
29///
30/// - **Cypher**: Default graph query language with pattern matching
31/// - **GQL**: GraphQL-style queries compiled to Cypher
32///
33/// # Default Behavior
34///
35/// If not specified, queries default to `QueryLanguage::Cypher`.
36///
37/// # Examples
38///
39/// ## Using Cypher (Default)
40///
41/// ```yaml
42/// queries:
43///   - id: active_orders
44///     query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
45///     queryLanguage: Cypher  # Optional, this is the default
46///     sources: [orders_db]
47/// ```
48///
49/// ## Using GQL
50///
51/// ```yaml
52/// queries:
53///   - id: user_data
54///     query: |
55///       {
56///         users(status: "active") {
57///           id
58///           name
59///           email
60///         }
61///       }
62///     queryLanguage: GQL
63///     sources: [users_db]
64/// ```
65///
66/// # Important Limitations
67///
68/// **Unsupported Clauses**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
69/// queries as they conflict with incremental result computation.
70#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
71pub enum QueryLanguage {
72    #[default]
73    Cypher,
74    GQL,
75}
76
77/// Source subscription configuration for queries
78///
79/// `SourceSubscriptionConfig` defines how a query subscribes to a specific source,
80/// including any middleware pipeline to apply to changes from that source.
81///
82/// # Fields
83///
84/// - **source_id**: ID of the source to subscribe to
85/// - **nodes**: Optional list of node labels to subscribe to from this source
86/// - **relations**: Optional list of relation labels to subscribe to from this source
87/// - **pipeline**: Optional list of middleware IDs to apply to changes from this source
88///
89/// # Examples
90///
91/// ## Simple Subscription (No Pipeline)
92///
93/// ```yaml
94/// source_subscriptions:
95///   - source_id: orders_db
96///     pipeline: []
97/// ```
98///
99/// ## Subscription with Middleware Pipeline
100///
101/// ```yaml
102/// source_subscriptions:
103///   - source_id: raw_events
104///     pipeline: [decoder, mapper, validator]
105/// ```
106///
107/// ## Subscription with Label Filtering
108///
109/// ```yaml
110/// source_subscriptions:
111///   - source_id: orders_db
112///     nodes: [Order, Customer]
113///     relations: [PLACED_BY]
114///     pipeline: []
115/// ```
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SourceSubscriptionConfig {
118    pub source_id: String,
119    #[serde(default)]
120    pub nodes: Vec<String>,
121    #[serde(default)]
122    pub relations: Vec<String>,
123    #[serde(default)]
124    pub pipeline: Vec<String>,
125}
126
/// Subscription request handed to a source on behalf of a query
///
/// A `SourceSubscriptionSettings` value bundles what a source needs to know in order
/// to handle bootstrap and subscription for one query, including the node and relation
/// labels that query is interested in.
///
/// # Fields
///
/// - **source_id**: ID of the source
/// - **enable_bootstrap**: Whether to request initial data
/// - **query_id**: ID of the subscribing query
/// - **nodes**: Set of node labels the query is interested in from this source
/// - **relations**: Set of relation labels the query is interested in from this source
/// - **resume_from**: Optional sequence position to replay events from
/// - **request_position_handle**: Whether the query asks for a shared position handle
///
/// # Example
///
/// ```ignore
/// use drasi_lib::config::SourceSubscriptionSettings;
/// use std::collections::HashSet;
///
/// let settings = SourceSubscriptionSettings {
///     source_id: "orders_db".to_string(),
///     enable_bootstrap: true,
///     query_id: "my-query".to_string(),
///     nodes: ["Order", "Customer"].iter().map(|s| s.to_string()).collect(),
///     relations: ["PLACED_BY"].iter().map(|s| s.to_string()).collect(),
///     resume_from: None,
///     request_position_handle: false,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct SourceSubscriptionSettings {
    /// ID of the source being subscribed to
    pub source_id: String,
    /// Whether the query requests initial data from the source
    pub enable_bootstrap: bool,
    /// ID of the query making the subscription
    pub query_id: String,
    /// Node labels the query is interested in from this source
    pub nodes: HashSet<String>,
    /// Relation labels the query is interested in from this source
    pub relations: HashSet<String>,
    /// If set, the subscribing query requests events replayed from this sequence position.
    /// Only meaningful when the source returns `supports_replay() == true`.
    pub resume_from: Option<u64>,
    /// If true, the query requests a shared `Arc<AtomicU64>` position handle in the
    /// `SubscriptionResponse` for reporting its durably-processed position back to the source.
    pub request_position_handle: bool,
}
171
172/// Root configuration for drasi-lib
173///
174/// `DrasiLibConfig` is the top-level configuration structure for DrasiLib.
175/// It defines server settings, queries, and storage backends.
176///
177/// # Plugin Architecture
178///
179/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
180/// reactions are passed as owned instances via `with_source()` and `with_reaction()`
181/// on the builder. Only queries can be configured via the builder.
182///
183/// # Configuration Structure
184///
185/// A typical configuration has these sections:
186///
187/// 1. **id**: Unique server identifier (optional, defaults to UUID)
188/// 2. **priority_queue_capacity**: Default capacity for event queues (optional)
189/// 3. **dispatch_buffer_capacity**: Default capacity for dispatch buffers (optional)
190/// 4. **storage_backends**: Storage backend definitions (optional)
191/// 5. **queries**: Continuous queries to process data
192///
193/// # Capacity Settings
194///
195/// - **priority_queue_capacity**: Default capacity for timestamp-ordered event queues in
196///   queries and reactions. Higher values support more out-of-order events but consume
197///   more memory. Default: 10000
198///
199/// - **dispatch_buffer_capacity**: Default capacity for event dispatch channels between
200///   components (sources → queries, queries → reactions). Higher values improve throughput
201///   under load but consume more memory. Default: 1000
202///
203/// Individual components can override these defaults by setting their own capacity values.
204///
205/// # Thread Safety
206///
207/// This struct is `Clone` and can be safely shared across threads.
208///
209/// # Usage
210///
211/// Use `DrasiLib::builder()` to create instances:
212///
213/// ```no_run
214/// use drasi_lib::{DrasiLib, Query};
215///
216/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
217/// let core = DrasiLib::builder()
218///     .with_id("my-server")
219///     .with_query(
220///         Query::cypher("my-query")
221///             .query("MATCH (n) RETURN n")
222///             .from_source("my-source")
223///             .build()
224///     )
225///     .build()
226///     .await?;
227/// # Ok(())
228/// # }
229/// ```
230///
231/// # Validation
232///
233/// Call [`validate()`](DrasiLibConfig::validate) to check:
234/// - Unique query IDs
235/// - Valid storage backend references
236///
237/// Note: Source and reaction validation happens at runtime when instances are added.
238#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct DrasiLibConfig {
240    /// Unique identifier for this DrasiLib instance (defaults to UUID)
241    #[serde(default = "default_id")]
242    pub id: String,
243    /// Default priority queue capacity for queries and reactions (default: 10000 if not specified)
244    #[serde(default, skip_serializing_if = "Option::is_none")]
245    pub priority_queue_capacity: Option<usize>,
246    /// Default dispatch buffer capacity for sources and queries (default: 1000 if not specified)
247    #[serde(default, skip_serializing_if = "Option::is_none")]
248    pub dispatch_buffer_capacity: Option<usize>,
249    /// Global storage backend definitions that can be referenced by queries
250    #[serde(default)]
251    pub storage_backends: Vec<StorageBackendConfig>,
252    /// Query configurations
253    #[serde(default)]
254    pub queries: Vec<QueryConfig>,
255}
256
257impl Default for DrasiLibConfig {
258    fn default() -> Self {
259        Self {
260            id: default_id(),
261            priority_queue_capacity: None,
262            dispatch_buffer_capacity: None,
263            storage_backends: Vec::new(),
264            queries: Vec::new(),
265        }
266    }
267}
268
269/// Configuration for a continuous query
270///
271/// `QueryConfig` defines a continuous query that processes data changes from sources
272/// and emits incremental result updates. Queries subscribe to one or more sources and
273/// maintain materialized views that update automatically as data changes.
274///
275/// # Query Languages
276///
277/// Queries can be written in either:
278/// - **Cypher**: Default graph pattern matching language
279/// - **GQL**: GraphQL-style queries (compiled to Cypher)
280///
281/// **Important**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
282/// queries as they conflict with incremental result computation.
283///
284/// # Bootstrap Processing
285///
286/// - **enableBootstrap**: Controls whether the query processes initial data (default: true)
287/// - **bootstrapBufferSize**: Event buffer size during bootstrap phase (default: 10000)
288///
289/// During bootstrap, events are buffered to maintain ordering while initial data loads.
290/// After bootstrap completes, queries switch to incremental processing mode.
291///
292/// # Synthetic Joins
293///
294/// Queries can define synthetic relationships between node types from different sources
295/// via the `joins` field. This creates virtual edges based on property equality without
296/// requiring physical relationships in the source data.
297///
298/// # Configuration Fields
299///
300/// - **id**: Unique identifier (referenced by reactions)
301/// - **query**: Query string in specified language
302/// - **queryLanguage**: Cypher or GQL (default: Cypher)
303/// - **sources**: Source IDs to subscribe to
304/// - **auto_start**: Start automatically (default: true)
305/// - **joins**: Optional synthetic join definitions
306/// - **enableBootstrap**: Process initial data (default: true)
307/// - **bootstrapBufferSize**: Buffer size during bootstrap (default: 10000)
308/// - **priority_queue_capacity**: Out-of-order event queue size (overrides global)
309/// - **dispatch_buffer_capacity**: Output buffer size (overrides global)
310/// - **dispatch_mode**: Broadcast or Channel routing
311///
312/// # Examples
313///
314/// ## Basic Cypher Query
315///
316/// ```yaml
317/// queries:
318///   - id: active_orders
319///     query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
320///     queryLanguage: Cypher  # Optional, this is default
321///     sources: [orders_db]
322///     auto_start: true
323///     enableBootstrap: true
324///     bootstrapBufferSize: 10000
325/// ```
326///
327/// ## Query with Multiple Sources
328///
329/// ```yaml
330/// queries:
331///   - id: order_customer_join
332///     query: |
333///       MATCH (o:Order)-[:BELONGS_TO]->(c:Customer)
334///       WHERE o.status = 'active'
335///       RETURN o, c
336///     sources: [orders_db, customers_db]
337/// ```
338///
339/// ## Query with Synthetic Joins
340///
341/// ```yaml
342/// queries:
343///   - id: synthetic_join_query
344///     query: |
345///       MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
346///       RETURN o.id, c.name
347///     sources: [orders_db, customers_db]
348///     joins:
349///       - id: CUSTOMER              # Relationship type in query
350///         keys:
351///           - label: Order
352///             property: customer_id
353///           - label: Customer
354///             property: id
355/// ```
356///
357/// ## High-Throughput Query
358///
359/// ```yaml
360/// queries:
361///   - id: high_volume_processing
362///     query: "MATCH (n:Event) WHERE n.timestamp > timestamp() - 60000 RETURN n"
363///     sources: [event_stream]
364///     priority_queue_capacity: 100000  # Large queue for many out-of-order events
365///     dispatch_buffer_capacity: 10000  # Large output buffer
366///     bootstrapBufferSize: 50000       # Large bootstrap buffer
367/// ```
368///
369/// ## GQL Query
370///
371/// ```yaml
372/// queries:
373///   - id: gql_users
374///     query: |
375///       {
376///         users(status: "active") {
377///           id
378///           name
379///           email
380///         }
381///       }
382///     queryLanguage: GQL
383///     sources: [users_db]
384/// ```
385#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct QueryConfig {
387    /// Unique identifier for the query
388    pub id: String,
389    /// Query string (Cypher or GQL depending on query_language)
390    pub query: String,
391    /// Query language to use (default: Cypher)
392    #[serde(default, rename = "queryLanguage")]
393    pub query_language: QueryLanguage,
394    /// Middleware configurations for this query
395    #[serde(default)]
396    pub middleware: Vec<SourceMiddlewareConfig>,
397    /// Source subscriptions with optional middleware pipelines
398    #[serde(default)]
399    pub sources: Vec<SourceSubscriptionConfig>,
400    /// Whether to automatically start this query (default: true)
401    #[serde(default = "default_auto_start")]
402    pub auto_start: bool,
403    /// Optional synthetic joins for the query
404    #[serde(skip_serializing_if = "Option::is_none")]
405    pub joins: Option<Vec<QueryJoinConfig>>,
406    /// Whether to enable bootstrap (default: true)
407    #[serde(default = "default_enable_bootstrap", rename = "enableBootstrap")]
408    pub enable_bootstrap: bool,
409    /// Maximum number of events to buffer during bootstrap (default: 10000)
410    #[serde(
411        default = "default_bootstrap_buffer_size",
412        rename = "bootstrapBufferSize"
413    )]
414    pub bootstrap_buffer_size: usize,
415    /// Priority queue capacity for this query (default: server global, or 10000 if not specified)
416    #[serde(default, skip_serializing_if = "Option::is_none")]
417    pub priority_queue_capacity: Option<usize>,
418    /// Dispatch buffer capacity for this query (default: server global, or 1000 if not specified)
419    #[serde(default, skip_serializing_if = "Option::is_none")]
420    pub dispatch_buffer_capacity: Option<usize>,
421    /// Dispatch mode for this query (default: Channel)
422    #[serde(default, skip_serializing_if = "Option::is_none")]
423    pub dispatch_mode: Option<DispatchMode>,
424    /// Storage backend for this query (default: in-memory)
425    /// Can reference a named backend or provide inline configuration
426    #[serde(skip_serializing_if = "Option::is_none")]
427    pub storage_backend: Option<StorageBackendRef>,
428    /// Recovery policy when a source cannot honor a requested resume position.
429    /// `None` inherits the global default (itself defaulting to `Strict`).
430    /// See [`RecoveryPolicy`](crate::RecoveryPolicy).
431    #[serde(
432        default,
433        skip_serializing_if = "Option::is_none",
434        rename = "recoveryPolicy"
435    )]
436    pub recovery_policy: Option<RecoveryPolicy>,
437}
438
439/// Synthetic join configuration for queries
440///
441/// `QueryJoinConfig` defines a virtual relationship between node types from different
442/// sources. This allows queries to join data without requiring physical relationships
443/// in the source systems.
444///
445/// # Join Semantics
446///
447/// Joins create synthetic edges by matching property values across nodes. The `id`
448/// field specifies the relationship type used in the query's MATCH pattern, and `keys`
449/// define which properties to match.
450///
451/// # Examples
452///
453/// ## Simple Join on Single Property
454///
455/// ```yaml
456/// joins:
457///   - id: CUSTOMER              # Use in query: MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
458///     keys:
459///       - label: Order
460///         property: customer_id  # Order.customer_id
461///       - label: Customer
462///         property: id           # Customer.id
463///       # Creates edge when Order.customer_id == Customer.id
464/// ```
465///
466/// ## Multi-Source Join
467///
468/// ```yaml
469/// joins:
470///   - id: ASSIGNED_TO
471///     keys:
472///       - label: Task
473///         property: assignee_id
474///       - label: User
475///         property: user_id
476/// ```
477#[derive(Debug, Clone, Serialize, Deserialize)]
478pub struct QueryJoinConfig {
479    /// Unique identifier for the join (should match relationship type in query)
480    pub id: String,
481    /// Keys defining the join relationship
482    pub keys: Vec<QueryJoinKeyConfig>,
483}
484
485/// Join key specification for synthetic joins
486///
487/// `QueryJoinKeyConfig` specifies one side of a join condition by identifying
488/// a node label and the property to use for matching.
489///
490/// # Example
491///
492/// For joining orders to customers:
493///
494/// ```yaml
495/// keys:
496///   - label: Order
497///     property: customer_id
498///   - label: Customer
499///     property: id
500/// ```
501///
502/// This creates an edge when `Order.customer_id == Customer.id`.
503#[derive(Debug, Clone, Serialize, Deserialize)]
504pub struct QueryJoinKeyConfig {
505    /// Node label to match
506    pub label: String,
507    /// Property to use for joining
508    pub property: String,
509}
510
511impl DrasiLibConfig {
512    /// Validate configuration consistency and references
513    ///
514    /// Performs comprehensive validation checks:
515    /// - Ensures all query IDs are unique
516    /// - Validates storage backend configurations
517    ///
518    /// Note: Source and reaction validation happens at runtime when instances are added,
519    /// since drasi-lib has no knowledge of specific plugin configurations.
520    ///
521    /// # Errors
522    ///
523    /// Returns error if validation fails with a description of the problem.
524    pub fn validate(&self) -> Result<()> {
525        // Validate unique query ids
526        let mut query_ids = std::collections::HashSet::new();
527        for query in &self.queries {
528            if !query_ids.insert(&query.id) {
529                return Err(anyhow::anyhow!("Duplicate query id: '{}'", query.id));
530            }
531        }
532
533        // Validate unique storage backend ids
534        let mut storage_backend_ids = std::collections::HashSet::new();
535        for backend in &self.storage_backends {
536            if !storage_backend_ids.insert(&backend.id) {
537                return Err(anyhow::anyhow!(
538                    "Duplicate storage backend id: '{}'",
539                    backend.id
540                ));
541            }
542            // Validate backend configuration
543            backend.spec.validate().map_err(|e| {
544                anyhow::anyhow!(
545                    "Storage backend '{}' has invalid configuration: {}",
546                    backend.id,
547                    e
548                )
549            })?;
550        }
551
552        // Validate storage backend references in queries
553        for query in &self.queries {
554            if let Some(backend_ref) = &query.storage_backend {
555                match backend_ref {
556                    StorageBackendRef::Named(backend_id) => {
557                        if !storage_backend_ids.contains(backend_id) {
558                            return Err(anyhow::anyhow!(
559                                "Query '{}' references unknown storage backend: '{}'",
560                                query.id,
561                                backend_id
562                            ));
563                        }
564                    }
565                    StorageBackendRef::Inline(spec) => {
566                        // Validate inline backend configuration
567                        spec.validate().map_err(|e| {
568                            anyhow::anyhow!(
569                                "Query '{}' has invalid inline storage backend configuration: {}",
570                                query.id,
571                                e
572                            )
573                        })?;
574                    }
575                }
576            }
577        }
578
579        Ok(())
580    }
581}
582
583fn default_id() -> String {
584    uuid::Uuid::new_v4().to_string()
585}
586
/// Serde default for `QueryConfig::auto_start`: queries start automatically
/// unless the key says otherwise.
fn default_auto_start() -> bool {
    true
}
590
/// Serde default for `QueryConfig::enable_bootstrap`: bootstrap is enabled
/// unless the `enableBootstrap` key says otherwise.
fn default_enable_bootstrap() -> bool {
    true
}
594
/// Serde default for `QueryConfig::bootstrap_buffer_size`, used when the
/// `bootstrapBufferSize` key is absent.
fn default_bootstrap_buffer_size() -> usize {
    10_000
}
598
599// Conversion implementations for QueryJoin types
600impl From<QueryJoinKeyConfig> for drasi_core::models::QueryJoinKey {
601    fn from(config: QueryJoinKeyConfig) -> Self {
602        drasi_core::models::QueryJoinKey {
603            label: config.label,
604            property: config.property,
605        }
606    }
607}
608
609impl From<QueryJoinConfig> for drasi_core::models::QueryJoin {
610    fn from(config: QueryJoinConfig) -> Self {
611        drasi_core::models::QueryJoin {
612            id: config.id,
613            keys: config.keys.into_iter().map(|k| k.into()).collect(),
614        }
615    }
616}