Skip to main content

drasi_lib/config/
schema.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Result;
16use bytes::Bytes;
17use serde::{Deserialize, Serialize};
18use std::collections::HashSet;
19
20use crate::channels::DispatchMode;
21use crate::indexes::{StorageBackendConfig, StorageBackendRef};
22use crate::recovery::RecoveryPolicy;
23use drasi_core::models::SourceMiddlewareConfig;
24
25/// Query language for continuous queries
26///
27/// Drasi supports two query languages for continuous query processing:
28///
29/// # Query Languages
30///
31/// - **Cypher**: Default graph query language with pattern matching
32/// - **GQL**: GraphQL-style queries compiled to Cypher
33///
34/// # Default Behavior
35///
36/// If not specified, queries default to `QueryLanguage::Cypher`.
37///
38/// # Examples
39///
40/// ## Using Cypher (Default)
41///
42/// ```yaml
43/// queries:
44///   - id: active_orders
45///     query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
46///     queryLanguage: Cypher  # Optional, this is the default
47///     sources: [orders_db]
48/// ```
49///
50/// ## Using GQL
51///
52/// ```yaml
53/// queries:
54///   - id: user_data
55///     query: |
56///       {
57///         users(status: "active") {
58///           id
59///           name
60///           email
61///         }
62///       }
63///     queryLanguage: GQL
64///     sources: [users_db]
65/// ```
66///
67/// # Important Limitations
68///
69/// **Unsupported Clauses**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
70/// queries as they conflict with incremental result computation.
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
72pub enum QueryLanguage {
73    #[default]
74    Cypher,
75    GQL,
76}
77
78/// Source subscription configuration for queries
79///
80/// `SourceSubscriptionConfig` defines how a query subscribes to a specific source,
81/// including any middleware pipeline to apply to changes from that source.
82///
83/// # Fields
84///
85/// - **source_id**: ID of the source to subscribe to
86/// - **nodes**: Optional list of node labels to subscribe to from this source
87/// - **relations**: Optional list of relation labels to subscribe to from this source
88/// - **pipeline**: Optional list of middleware IDs to apply to changes from this source
89///
90/// # Examples
91///
92/// ## Simple Subscription (No Pipeline)
93///
94/// ```yaml
95/// source_subscriptions:
96///   - source_id: orders_db
97///     pipeline: []
98/// ```
99///
100/// ## Subscription with Middleware Pipeline
101///
102/// ```yaml
103/// source_subscriptions:
104///   - source_id: raw_events
105///     pipeline: [decoder, mapper, validator]
106/// ```
107///
108/// ## Subscription with Label Filtering
109///
110/// ```yaml
111/// source_subscriptions:
112///   - source_id: orders_db
113///     nodes: [Order, Customer]
114///     relations: [PLACED_BY]
115///     pipeline: []
116/// ```
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct SourceSubscriptionConfig {
119    pub source_id: String,
120    #[serde(default)]
121    pub nodes: Vec<String>,
122    #[serde(default)]
123    pub relations: Vec<String>,
124    #[serde(default)]
125    pub pipeline: Vec<String>,
126}
127
128/// Settings passed to a source when subscribing
129///
130/// `SourceSubscriptionSettings` contains all the information a source needs to
131/// intelligently handle bootstrap and subscription for a query, including the
132/// specific node and relation labels the query is interested in.
133///
134/// # Fields
135///
136/// - **source_id**: ID of the source
137/// - **enable_bootstrap**: Whether to request initial data
138/// - **query_id**: ID of the subscribing query
139/// - **nodes**: Set of node labels the query is interested in from this source
140/// - **relations**: Set of relation labels the query is interested in from this source
141///
142/// # Example
143///
144/// ```ignore
145/// use drasi_lib::config::SourceSubscriptionSettings;
146/// use std::collections::HashSet;
147///
148/// let settings = SourceSubscriptionSettings {
149///     source_id: "orders_db".to_string(),
150///     enable_bootstrap: true,
151///     query_id: "my-query".to_string(),
152///     nodes: ["Order", "Customer"].iter().map(|s| s.to_string()).collect(),
153///     relations: ["PLACED_BY"].iter().map(|s| s.to_string()).collect(),
154///     resume_from: None,
155///     request_position_handle: false,
156///     last_sequence: None,
157/// };
158/// ```
159#[derive(Debug, Clone)]
160pub struct SourceSubscriptionSettings {
161    pub source_id: String,
162    pub enable_bootstrap: bool,
163    pub query_id: String,
164    pub nodes: HashSet<String>,
165    pub relations: HashSet<String>,
166    /// If set, the subscribing query requests events replayed from this source position.
167    /// Contains the opaque position bytes that the source interprets to seek its change stream.
168    /// Only meaningful when the source returns `supports_replay() == true`.
169    pub resume_from: Option<Bytes>,
170    /// If true, the query requests a shared `Arc<AtomicU64>` position handle in the
171    /// `SubscriptionResponse` for reporting its durably-processed position back to the source.
172    pub request_position_handle: bool,
173    /// The last sequence number processed by this query before shutdown.
174    /// Sources use this to ensure their sequence counter continues from where it left off,
175    /// maintaining monotonicity across restarts.
176    pub last_sequence: Option<u64>,
177}
178
179/// Root configuration for drasi-lib
180///
181/// `DrasiLibConfig` is the top-level configuration structure for DrasiLib.
182/// It defines server settings, queries, and storage backends.
183///
184/// # Plugin Architecture
185///
186/// **Important**: drasi-lib has ZERO awareness of which plugins exist. Sources and
187/// reactions are passed as owned instances via `with_source()` and `with_reaction()`
188/// on the builder. Only queries can be configured via the builder.
189///
190/// # Configuration Structure
191///
192/// A typical configuration has these sections:
193///
194/// 1. **id**: Unique server identifier (optional, defaults to UUID)
195/// 2. **priority_queue_capacity**: Default capacity for event queues (optional)
196/// 3. **dispatch_buffer_capacity**: Default capacity for dispatch buffers (optional)
197/// 4. **storage_backends**: Storage backend definitions (optional)
198/// 5. **queries**: Continuous queries to process data
199///
200/// # Capacity Settings
201///
202/// - **priority_queue_capacity**: Default capacity for timestamp-ordered event queues in
203///   queries and reactions. Higher values support more out-of-order events but consume
204///   more memory. Default: 10000
205///
206/// - **dispatch_buffer_capacity**: Default capacity for event dispatch channels between
207///   components (sources → queries, queries → reactions). Higher values improve throughput
208///   under load but consume more memory. Default: 1000
209///
210/// Individual components can override these defaults by setting their own capacity values.
211///
212/// # Thread Safety
213///
214/// This struct is `Clone` and can be safely shared across threads.
215///
216/// # Usage
217///
218/// Use `DrasiLib::builder()` to create instances:
219///
220/// ```no_run
221/// use drasi_lib::{DrasiLib, Query};
222///
223/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
224/// let core = DrasiLib::builder()
225///     .with_id("my-server")
226///     .with_query(
227///         Query::cypher("my-query")
228///             .query("MATCH (n) RETURN n")
229///             .from_source("my-source")
230///             .build()
231///     )
232///     .build()
233///     .await?;
234/// # Ok(())
235/// # }
236/// ```
237///
238/// # Validation
239///
240/// Call [`validate()`](DrasiLibConfig::validate) to check:
241/// - Unique query IDs
242/// - Valid storage backend references
243///
244/// Note: Source and reaction validation happens at runtime when instances are added.
245#[derive(Debug, Clone, Serialize, Deserialize)]
246pub struct DrasiLibConfig {
247    /// Unique identifier for this DrasiLib instance (defaults to UUID)
248    #[serde(default = "default_id")]
249    pub id: String,
250    /// Default priority queue capacity for queries and reactions (default: 10000 if not specified)
251    #[serde(default, skip_serializing_if = "Option::is_none")]
252    pub priority_queue_capacity: Option<usize>,
253    /// Default dispatch buffer capacity for sources and queries (default: 1000 if not specified)
254    #[serde(default, skip_serializing_if = "Option::is_none")]
255    pub dispatch_buffer_capacity: Option<usize>,
256    /// Global storage backend definitions that can be referenced by queries
257    #[serde(default)]
258    pub storage_backends: Vec<StorageBackendConfig>,
259    /// Query configurations
260    #[serde(default)]
261    pub queries: Vec<QueryConfig>,
262}
263
264impl Default for DrasiLibConfig {
265    fn default() -> Self {
266        Self {
267            id: default_id(),
268            priority_queue_capacity: None,
269            dispatch_buffer_capacity: None,
270            storage_backends: Vec::new(),
271            queries: Vec::new(),
272        }
273    }
274}
275
276/// Configuration for a continuous query
277///
278/// `QueryConfig` defines a continuous query that processes data changes from sources
279/// and emits incremental result updates. Queries subscribe to one or more sources and
280/// maintain materialized views that update automatically as data changes.
281///
282/// # Query Languages
283///
284/// Queries can be written in either:
285/// - **Cypher**: Default graph pattern matching language
286/// - **GQL**: GraphQL-style queries (compiled to Cypher)
287///
288/// **Important**: ORDER BY, TOP, and LIMIT clauses are not supported in continuous
289/// queries as they conflict with incremental result computation.
290///
291/// # Bootstrap Processing
292///
293/// - **enableBootstrap**: Controls whether the query processes initial data (default: true)
294/// - **bootstrapBufferSize**: Event buffer size during bootstrap phase (default: 10000)
295///
296/// During bootstrap, events are buffered to maintain ordering while initial data loads.
297/// After bootstrap completes, queries switch to incremental processing mode.
298///
299/// # Synthetic Joins
300///
301/// Queries can define synthetic relationships between node types from different sources
302/// via the `joins` field. This creates virtual edges based on property equality without
303/// requiring physical relationships in the source data.
304///
305/// # Configuration Fields
306///
307/// - **id**: Unique identifier (referenced by reactions)
308/// - **query**: Query string in specified language
309/// - **queryLanguage**: Cypher or GQL (default: Cypher)
310/// - **sources**: Source IDs to subscribe to
311/// - **auto_start**: Start automatically (default: true)
312/// - **joins**: Optional synthetic join definitions
313/// - **enableBootstrap**: Process initial data (default: true)
314/// - **bootstrapBufferSize**: Buffer size during bootstrap (default: 10000)
315/// - **priority_queue_capacity**: Out-of-order event queue size (overrides global)
316/// - **dispatch_buffer_capacity**: Output buffer size (overrides global)
317/// - **dispatch_mode**: Broadcast or Channel routing
318///
319/// # Examples
320///
321/// ## Basic Cypher Query
322///
323/// ```yaml
324/// queries:
325///   - id: active_orders
326///     query: "MATCH (o:Order) WHERE o.status = 'active' RETURN o"
327///     queryLanguage: Cypher  # Optional, this is default
328///     sources: [orders_db]
329///     auto_start: true
330///     enableBootstrap: true
331///     bootstrapBufferSize: 10000
332/// ```
333///
334/// ## Query with Multiple Sources
335///
336/// ```yaml
337/// queries:
338///   - id: order_customer_join
339///     query: |
340///       MATCH (o:Order)-[:BELONGS_TO]->(c:Customer)
341///       WHERE o.status = 'active'
342///       RETURN o, c
343///     sources: [orders_db, customers_db]
344/// ```
345///
346/// ## Query with Synthetic Joins
347///
348/// ```yaml
349/// queries:
350///   - id: synthetic_join_query
351///     query: |
352///       MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
353///       RETURN o.id, c.name
354///     sources: [orders_db, customers_db]
355///     joins:
356///       - id: CUSTOMER              # Relationship type in query
357///         keys:
358///           - label: Order
359///             property: customer_id
360///           - label: Customer
361///             property: id
362/// ```
363///
364/// ## High-Throughput Query
365///
366/// ```yaml
367/// queries:
368///   - id: high_volume_processing
369///     query: "MATCH (n:Event) WHERE n.timestamp > timestamp() - 60000 RETURN n"
370///     sources: [event_stream]
371///     priority_queue_capacity: 100000  # Large queue for many out-of-order events
372///     dispatch_buffer_capacity: 10000  # Large output buffer
373///     bootstrapBufferSize: 50000       # Large bootstrap buffer
374/// ```
375///
376/// ## GQL Query
377///
378/// ```yaml
379/// queries:
380///   - id: gql_users
381///     query: |
382///       {
383///         users(status: "active") {
384///           id
385///           name
386///           email
387///         }
388///       }
389///     queryLanguage: GQL
390///     sources: [users_db]
391/// ```
392#[derive(Debug, Clone, Serialize, Deserialize)]
393pub struct QueryConfig {
394    /// Unique identifier for the query
395    pub id: String,
396    /// Query string (Cypher or GQL depending on query_language)
397    pub query: String,
398    /// Query language to use (default: Cypher)
399    #[serde(default, rename = "queryLanguage")]
400    pub query_language: QueryLanguage,
401    /// Middleware configurations for this query
402    #[serde(default)]
403    pub middleware: Vec<SourceMiddlewareConfig>,
404    /// Source subscriptions with optional middleware pipelines
405    #[serde(default)]
406    pub sources: Vec<SourceSubscriptionConfig>,
407    /// Whether to automatically start this query (default: true)
408    #[serde(default = "default_auto_start")]
409    pub auto_start: bool,
410    /// Optional synthetic joins for the query
411    #[serde(skip_serializing_if = "Option::is_none")]
412    pub joins: Option<Vec<QueryJoinConfig>>,
413    /// Whether to enable bootstrap (default: true)
414    #[serde(default = "default_enable_bootstrap", rename = "enableBootstrap")]
415    pub enable_bootstrap: bool,
416    /// Maximum number of events to buffer during bootstrap (default: 10000)
417    #[serde(
418        default = "default_bootstrap_buffer_size",
419        rename = "bootstrapBufferSize"
420    )]
421    pub bootstrap_buffer_size: usize,
422    /// Priority queue capacity for this query (default: server global, or 10000 if not specified)
423    #[serde(default, skip_serializing_if = "Option::is_none")]
424    pub priority_queue_capacity: Option<usize>,
425    /// Dispatch buffer capacity for this query (default: server global, or 1000 if not specified)
426    #[serde(default, skip_serializing_if = "Option::is_none")]
427    pub dispatch_buffer_capacity: Option<usize>,
428    /// Dispatch mode for this query (default: Channel)
429    #[serde(default, skip_serializing_if = "Option::is_none")]
430    pub dispatch_mode: Option<DispatchMode>,
431    /// Storage backend for this query (default: in-memory)
432    /// Can reference a named backend or provide inline configuration
433    #[serde(skip_serializing_if = "Option::is_none")]
434    pub storage_backend: Option<StorageBackendRef>,
435    /// Recovery policy when a source cannot honor a requested resume position.
436    /// `None` inherits the global default (itself defaulting to `Strict`).
437    /// See [`RecoveryPolicy`](crate::RecoveryPolicy).
438    #[serde(
439        default,
440        skip_serializing_if = "Option::is_none",
441        rename = "recoveryPolicy"
442    )]
443    pub recovery_policy: Option<RecoveryPolicy>,
444    /// Maximum number of recent QueryResult emissions retained in the outbox ring buffer.
445    /// Reactions use this buffer for gap recovery via `fetch_outbox`.
446    /// Default: 1000. Minimum: 1 (values below 1 are clamped to 1).
447    #[serde(default = "default_outbox_capacity", rename = "outboxCapacity")]
448    pub outbox_capacity: usize,
449    /// Maximum time in seconds that `fetch_snapshot` / `fetch_outbox` will wait for
450    /// the query to finish bootstrapping before returning `FetchError::TimedOut`.
451    /// Default: 300 (5 minutes).
452    #[serde(
453        default = "default_bootstrap_timeout_secs",
454        rename = "bootstrapTimeoutSecs"
455    )]
456    pub bootstrap_timeout_secs: u64,
457}
458
459/// Synthetic join configuration for queries
460///
461/// `QueryJoinConfig` defines a virtual relationship between node types from different
462/// sources. This allows queries to join data without requiring physical relationships
463/// in the source systems.
464///
465/// # Join Semantics
466///
467/// Joins create synthetic edges by matching property values across nodes. The `id`
468/// field specifies the relationship type used in the query's MATCH pattern, and `keys`
469/// define which properties to match.
470///
471/// # Examples
472///
473/// ## Simple Join on Single Property
474///
475/// ```yaml
476/// joins:
477///   - id: CUSTOMER              # Use in query: MATCH (o:Order)-[:CUSTOMER]->(c:Customer)
478///     keys:
479///       - label: Order
480///         property: customer_id  # Order.customer_id
481///       - label: Customer
482///         property: id           # Customer.id
483///       # Creates edge when Order.customer_id == Customer.id
484/// ```
485///
486/// ## Multi-Source Join
487///
488/// ```yaml
489/// joins:
490///   - id: ASSIGNED_TO
491///     keys:
492///       - label: Task
493///         property: assignee_id
494///       - label: User
495///         property: user_id
496/// ```
497#[derive(Debug, Clone, Serialize, Deserialize)]
498pub struct QueryJoinConfig {
499    /// Unique identifier for the join (should match relationship type in query)
500    pub id: String,
501    /// Keys defining the join relationship
502    pub keys: Vec<QueryJoinKeyConfig>,
503}
504
505/// Join key specification for synthetic joins
506///
507/// `QueryJoinKeyConfig` specifies one side of a join condition by identifying
508/// a node label and the property to use for matching.
509///
510/// # Example
511///
512/// For joining orders to customers:
513///
514/// ```yaml
515/// keys:
516///   - label: Order
517///     property: customer_id
518///   - label: Customer
519///     property: id
520/// ```
521///
522/// This creates an edge when `Order.customer_id == Customer.id`.
523#[derive(Debug, Clone, Serialize, Deserialize)]
524pub struct QueryJoinKeyConfig {
525    /// Node label to match
526    pub label: String,
527    /// Property to use for joining
528    pub property: String,
529}
530
531impl DrasiLibConfig {
532    /// Validate configuration consistency and references
533    ///
534    /// Performs comprehensive validation checks:
535    /// - Ensures all query IDs are unique
536    /// - Validates storage backend configurations
537    ///
538    /// Note: Source and reaction validation happens at runtime when instances are added,
539    /// since drasi-lib has no knowledge of specific plugin configurations.
540    ///
541    /// # Errors
542    ///
543    /// Returns error if validation fails with a description of the problem.
544    pub fn validate(&self) -> Result<()> {
545        // Validate unique query ids
546        let mut query_ids = std::collections::HashSet::new();
547        for query in &self.queries {
548            if !query_ids.insert(&query.id) {
549                return Err(anyhow::anyhow!("Duplicate query id: '{}'", query.id));
550            }
551        }
552
553        // Validate unique storage backend ids
554        let mut storage_backend_ids = std::collections::HashSet::new();
555        for backend in &self.storage_backends {
556            if !storage_backend_ids.insert(&backend.id) {
557                return Err(anyhow::anyhow!(
558                    "Duplicate storage backend id: '{}'",
559                    backend.id
560                ));
561            }
562            // Validate backend configuration
563            backend.spec.validate().map_err(|e| {
564                anyhow::anyhow!(
565                    "Storage backend '{}' has invalid configuration: {}",
566                    backend.id,
567                    e
568                )
569            })?;
570        }
571
572        // Validate storage backend references in queries
573        for query in &self.queries {
574            if let Some(backend_ref) = &query.storage_backend {
575                match backend_ref {
576                    StorageBackendRef::Named(backend_id) => {
577                        if !storage_backend_ids.contains(backend_id) {
578                            return Err(anyhow::anyhow!(
579                                "Query '{}' references unknown storage backend: '{}'",
580                                query.id,
581                                backend_id
582                            ));
583                        }
584                    }
585                    StorageBackendRef::Inline(spec) => {
586                        // Validate inline backend configuration
587                        spec.validate().map_err(|e| {
588                            anyhow::anyhow!(
589                                "Query '{}' has invalid inline storage backend configuration: {}",
590                                query.id,
591                                e
592                            )
593                        })?;
594                    }
595                }
596            }
597        }
598
599        Ok(())
600    }
601}
602
603fn default_id() -> String {
604    uuid::Uuid::new_v4().to_string()
605}
606
/// Serde default for `QueryConfig::auto_start`: queries start automatically unless the
/// configuration says otherwise.
fn default_auto_start() -> bool {
    const START_AUTOMATICALLY: bool = true;
    START_AUTOMATICALLY
}
610
/// Serde default for `QueryConfig::enable_bootstrap`: initial data is loaded unless the
/// configuration opts out.
fn default_enable_bootstrap() -> bool {
    const BOOTSTRAP_ENABLED: bool = true;
    BOOTSTRAP_ENABLED
}
614
/// Serde default for `QueryConfig::bootstrap_buffer_size`: up to 10,000 events may be
/// buffered while a query bootstraps.
fn default_bootstrap_buffer_size() -> usize {
    const BUFFERED_EVENTS: usize = 10_000;
    BUFFERED_EVENTS
}
618
619fn default_outbox_capacity() -> usize {
620    crate::queries::output_state::DEFAULT_OUTBOX_CAPACITY
621}
622
/// Serde default for `QueryConfig::bootstrap_timeout_secs`: five minutes, in seconds.
fn default_bootstrap_timeout_secs() -> u64 {
    const SECS_PER_MINUTE: u64 = 60;
    5 * SECS_PER_MINUTE
}
626
627// Conversion implementations for QueryJoin types
628impl From<QueryJoinKeyConfig> for drasi_core::models::QueryJoinKey {
629    fn from(config: QueryJoinKeyConfig) -> Self {
630        drasi_core::models::QueryJoinKey {
631            label: config.label,
632            property: config.property,
633        }
634    }
635}
636
637impl From<QueryJoinConfig> for drasi_core::models::QueryJoin {
638    fn from(config: QueryJoinConfig) -> Self {
639        drasi_core::models::QueryJoin {
640            id: config.id,
641            keys: config.keys.into_iter().map(|k| k.into()).collect(),
642        }
643    }
644}