Skip to main content

drasi_lib/bootstrap/
mod.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bootstrap provider architecture for Drasi
16//!
17//! This module provides a pluggable bootstrap system that separates bootstrap
18//! concerns from source streaming logic. Bootstrap providers can be reused
19//! across different source types while maintaining access to their parent
20//! source configuration.
21//!
22//! # Included Providers
23//!
24//! - [`ComponentGraphBootstrapProvider`]: Bootstraps from the [`ComponentGraph`] snapshot
25//!   for the built-in component graph source.
26
27pub mod component_graph;
28pub use component_graph::ComponentGraphBootstrapProvider;
29
30use anyhow::Result;
31use async_trait::async_trait;
32use serde::{Deserialize, Serialize};
33use std::collections::HashMap;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::Arc;
36
37/// Request for bootstrap data from a query
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BootstrapRequest {
40    pub query_id: String,
41    pub node_labels: Vec<String>,
42    pub relation_labels: Vec<String>,
43    pub request_id: String,
44}
45
46/// Context passed to bootstrap providers
47/// Bootstrap happens through dedicated channels created in source.subscribe().
48///
49/// # Plugin Architecture
50///
51/// BootstrapContext no longer contains source configuration. Bootstrap providers
52/// are created by source plugins using their own typed configurations. This context
53/// provides only the minimal information needed during bootstrap execution:
54/// - Source identification
55/// - Sequence numbering for events
56/// - Optional properties for providers that need runtime data
57#[derive(Clone)]
58pub struct BootstrapContext {
59    /// Unique server ID for logging and tracing
60    pub server_id: String,
61    /// Source ID for labeling bootstrap events
62    pub source_id: String,
63    /// Sequence counter for bootstrap events
64    pub sequence_counter: Arc<AtomicU64>,
65    /// Optional properties that can be set by plugins if needed
66    properties: Arc<HashMap<String, serde_json::Value>>,
67}
68
69impl BootstrapContext {
70    /// Create a minimal bootstrap context with just server and source IDs
71    ///
72    /// This is the preferred constructor. Bootstrap providers should have their
73    /// own configuration - they don't need access to source config.
74    pub fn new_minimal(server_id: String, source_id: String) -> Self {
75        Self {
76            server_id,
77            source_id,
78            sequence_counter: Arc::new(AtomicU64::new(0)),
79            properties: Arc::new(HashMap::new()),
80        }
81    }
82
83    /// Create a bootstrap context with properties
84    ///
85    /// Use this if your bootstrap provider needs access to some properties
86    /// at runtime. The properties should be extracted from your plugin's
87    /// typed configuration.
88    pub fn with_properties(
89        server_id: String,
90        source_id: String,
91        properties: HashMap<String, serde_json::Value>,
92    ) -> Self {
93        Self {
94            server_id,
95            source_id,
96            sequence_counter: Arc::new(AtomicU64::new(0)),
97            properties: Arc::new(properties),
98        }
99    }
100
101    /// Get the next sequence number for bootstrap events
102    pub fn next_sequence(&self) -> u64 {
103        self.sequence_counter.fetch_add(1, Ordering::SeqCst)
104    }
105
106    /// Get a property from the context
107    pub fn get_property(&self, key: &str) -> Option<serde_json::Value> {
108        self.properties.get(key).cloned()
109    }
110
111    /// Get a typed property from the context
112    pub fn get_typed_property<T>(&self, key: &str) -> Result<Option<T>>
113    where
114        T: for<'de> Deserialize<'de>,
115    {
116        match self.get_property(key) {
117            Some(value) => Ok(Some(serde_json::from_value(value.clone())?)),
118            None => Ok(None),
119        }
120    }
121}
122
123use crate::channels::BootstrapEventSender;
124
125/// Trait for bootstrap providers that handle initial data delivery
126/// for newly subscribed queries
127#[async_trait]
128pub trait BootstrapProvider: Send + Sync {
129    /// Perform bootstrap operation for the given request
130    /// Sends bootstrap events to the provided channel
131    /// Returns the number of elements sent
132    ///
133    /// # Arguments
134    /// * `request` - Bootstrap request with query ID and labels
135    /// * `context` - Bootstrap context with source information
136    /// * `event_tx` - Channel to send bootstrap events
137    /// * `settings` - Optional subscription settings with additional query context
138    async fn bootstrap(
139        &self,
140        request: BootstrapRequest,
141        context: &BootstrapContext,
142        event_tx: BootstrapEventSender,
143        settings: Option<&crate::config::SourceSubscriptionSettings>,
144    ) -> Result<usize>;
145}
146
147/// Blanket implementation of BootstrapProvider for boxed trait objects.
148/// This allows `Box<dyn BootstrapProvider>` to be used where BootstrapProvider is expected.
149#[async_trait]
150impl BootstrapProvider for Box<dyn BootstrapProvider> {
151    async fn bootstrap(
152        &self,
153        request: BootstrapRequest,
154        context: &BootstrapContext,
155        event_tx: BootstrapEventSender,
156        settings: Option<&crate::config::SourceSubscriptionSettings>,
157    ) -> Result<usize> {
158        (**self)
159            .bootstrap(request, context, event_tx, settings)
160            .await
161    }
162}
163
164// Typed configuration structs for each bootstrap provider type
165
166/// PostgreSQL bootstrap provider configuration
167///
168/// This provider bootstraps initial data from a PostgreSQL database using
169/// the COPY protocol for efficient data transfer. The provider extracts
170/// connection details from the parent source configuration, so no additional
171/// configuration is needed here.
172///
173/// # Example
174/// ```yaml
175/// bootstrap_provider:
176///   type: postgres
177/// ```
178#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
179pub struct PostgresBootstrapConfig {
180    // No additional config needed - uses parent source config
181    // Include this struct for consistency and future extensibility
182}
183
184/// Application bootstrap provider configuration
185///
186/// This provider bootstraps data from in-memory storage maintained by
187/// application sources. It replays stored insert events to provide initial
188/// data. No additional configuration is required as it uses shared state
189/// from the application source.
190///
191/// # Example
192/// ```yaml
193/// bootstrap_provider:
194///   type: application
195/// ```
196#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
197pub struct ApplicationBootstrapConfig {
198    // No config needed - uses shared state
199    // Include for consistency and future extensibility
200}
201
202/// Script file bootstrap provider configuration
203///
204/// This provider reads bootstrap data from JSONL (JSON Lines) files containing
205/// structured data. Files are processed in the order specified. Each file should
206/// contain one JSON record per line with record types: Header (required first),
207/// Node, Relation, Comment (filtered), Label (checkpoint), and Finish (optional end).
208///
209/// # Example
210/// ```yaml
211/// bootstrap_provider:
212///   type: scriptfile
213///   file_paths:
214///     - "/data/initial_nodes.jsonl"
215///     - "/data/initial_relations.jsonl"
216/// ```
217#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
218pub struct ScriptFileBootstrapConfig {
219    /// List of JSONL files to read (in order)
220    pub file_paths: Vec<String>,
221}
222
223/// Platform bootstrap provider configuration
224///
225/// This provider bootstraps data from a Query API service running in a remote
226/// Drasi environment via HTTP streaming. It's designed for cross-Drasi integration
227/// scenarios where one Drasi instance needs initial data from another.
228///
229/// # Example
230/// ```yaml
231/// bootstrap_provider:
232///   type: platform
233///   query_api_url: "http://remote-drasi:8080" # DevSkim: ignore DS137138
234///   timeout_seconds: 600
235/// ```
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
237pub struct PlatformBootstrapConfig {
238    /// URL of the Query API service (e.g., "http://my-source-query-api:8080") // DevSkim: ignore DS137138
239    /// If not specified, falls back to `query_api_url` property from source config
240    #[serde(skip_serializing_if = "Option::is_none")]
241    pub query_api_url: Option<String>,
242
243    /// Timeout for HTTP requests in seconds (default: 300)
244    #[serde(default = "default_platform_timeout")]
245    pub timeout_seconds: u64,
246}
247
248fn default_platform_timeout() -> u64 {
249    300
250}
251
252impl Default for PlatformBootstrapConfig {
253    fn default() -> Self {
254        Self {
255            query_api_url: None,
256            timeout_seconds: default_platform_timeout(),
257        }
258    }
259}
260
261/// Configuration for different types of bootstrap providers
262#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
263#[serde(tag = "type", rename_all = "lowercase")]
264pub enum BootstrapProviderConfig {
265    /// PostgreSQL bootstrap provider
266    Postgres(PostgresBootstrapConfig),
267    /// Application-based bootstrap provider
268    Application(ApplicationBootstrapConfig),
269    /// Script file bootstrap provider
270    ScriptFile(ScriptFileBootstrapConfig),
271    /// Platform bootstrap provider for remote Drasi sources
272    /// Bootstraps data from a Query API service running in a remote Drasi environment
273    Platform(PlatformBootstrapConfig),
274    /// No-op bootstrap provider (returns no data)
275    Noop,
276}
277
278/// Factory for creating bootstrap providers from configuration
279///
280/// This factory is designed to work with plugin-based providers loaded from
281/// separate crates. The default behavior returns errors for non-noop providers
282/// to encourage use of the dedicated plugin crates.
283pub struct BootstrapProviderFactory;
284
285impl BootstrapProviderFactory {
286    /// Create a bootstrap provider from configuration
287    ///
288    /// Currently only supports the noop provider. Other providers are implemented
289    /// in dedicated plugin crates and should be instantiated using the builder pattern:
290    ///
291    /// - PostgreSQL: Use `drasi_bootstrap_postgres::PostgresBootstrapProvider::builder().with_host(...).build()`
292    /// - Platform: Use `drasi_bootstrap_platform::PlatformBootstrapProvider::builder().with_query_api_url(...).build()`
293    /// - ScriptFile: Use `drasi_bootstrap_scriptfile::ScriptFileBootstrapProvider::builder().with_file(...).build()`
294    /// - Application: Use `drasi_bootstrap_application::ApplicationBootstrapProvider::builder().build()`
295    pub fn create_provider(config: &BootstrapProviderConfig) -> Result<Box<dyn BootstrapProvider>> {
296        match config {
297            BootstrapProviderConfig::Postgres(_) => {
298                Err(anyhow::anyhow!(
299                    "PostgreSQL bootstrap provider is available in the drasi-bootstrap-postgres crate. \
300                     Use PostgresBootstrapProvider::builder().with_host(...).build() to create it."
301                ))
302            }
303            BootstrapProviderConfig::Application(_) => {
304                Err(anyhow::anyhow!(
305                    "Application bootstrap provider is available in the drasi-bootstrap-application crate. \
306                     Use ApplicationBootstrapProvider::builder().build() to create it."
307                ))
308            }
309            BootstrapProviderConfig::ScriptFile(config) => {
310                Err(anyhow::anyhow!(
311                    "ScriptFile bootstrap provider is available in the drasi-bootstrap-scriptfile crate. \
312                     Use ScriptFileBootstrapProvider::builder().with_file(...).build() to create it. \
313                     File paths: {:?}",
314                    config.file_paths
315                ))
316            }
317            BootstrapProviderConfig::Platform(config) => {
318                Err(anyhow::anyhow!(
319                    "Platform bootstrap provider is available in the drasi-bootstrap-platform crate. \
320                     Use PlatformBootstrapProvider::builder().with_query_api_url(...).build() to create it. \
321                     Config: {config:?}"
322                ))
323            }
324            BootstrapProviderConfig::Noop => {
325                Err(anyhow::anyhow!(
326                    "No-op bootstrap provider is available in the drasi-bootstrap-noop crate. \
327                     Use NoOpBootstrapProvider::builder().build() or NoOpBootstrapProvider::new() to create it."
328                ))
329            }
330        }
331    }
332}
333
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_platform_bootstrap_config_defaults() {
        let config = PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            ..Default::default()
        };
        assert_eq!(config.timeout_seconds, 300);
        assert_eq!(config.query_api_url, Some("http://test:8080".to_string())); // DevSkim: ignore DS137138
    }

    #[test]
    fn test_postgres_bootstrap_config_defaults() {
        let config = PostgresBootstrapConfig::default();
        // Should be empty struct for now
        assert_eq!(config, PostgresBootstrapConfig {});
    }

    #[test]
    fn test_application_bootstrap_config_defaults() {
        let config = ApplicationBootstrapConfig::default();
        // Should be empty struct for now
        assert_eq!(config, ApplicationBootstrapConfig {});
    }

    #[test]
    fn test_platform_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 600,
        });

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"platform\""));
        assert!(json.contains("\"query_api_url\":\"http://test:8080\"")); // DevSkim: ignore DS137138
        assert!(json.contains("\"timeout_seconds\":600"));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        match deserialized {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.query_api_url, Some("http://test:8080".to_string())); // DevSkim: ignore DS137138
                assert_eq!(cfg.timeout_seconds, 600);
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_scriptfile_bootstrap_config() {
        let config = BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
            file_paths: vec![
                "/path/to/file1.jsonl".to_string(),
                "/path/to/file2.jsonl".to_string(),
            ],
        });

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"scriptfile\""));
        assert!(json.contains("\"file_paths\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        match deserialized {
            BootstrapProviderConfig::ScriptFile(cfg) => {
                assert_eq!(cfg.file_paths.len(), 2);
                assert_eq!(cfg.file_paths[0], "/path/to/file1.jsonl");
                assert_eq!(cfg.file_paths[1], "/path/to/file2.jsonl");
            }
            _ => panic!("Expected ScriptFile variant"),
        }
    }

    #[test]
    fn test_noop_bootstrap_config() {
        let config = BootstrapProviderConfig::Noop;

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"noop\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(deserialized, BootstrapProviderConfig::Noop));
    }

    #[test]
    fn test_postgres_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default());

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"postgres\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(deserialized, BootstrapProviderConfig::Postgres(_)));
    }

    #[test]
    fn test_application_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default());

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"application\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            deserialized,
            BootstrapProviderConfig::Application(_)
        ));
    }

    #[test]
    fn test_yaml_deserialization_platform() {
        let yaml = r#"
type: platform
query_api_url: "http://remote:8080" # DevSkim: ignore DS137138
timeout_seconds: 300
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.query_api_url, Some("http://remote:8080".to_string())); // DevSkim: ignore DS137138
                assert_eq!(cfg.timeout_seconds, 300);
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_yaml_deserialization_scriptfile() {
        let yaml = r#"
type: scriptfile
file_paths:
  - "/data/file1.jsonl"
  - "/data/file2.jsonl"
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::ScriptFile(cfg) => {
                assert_eq!(cfg.file_paths.len(), 2);
                assert_eq!(cfg.file_paths[0], "/data/file1.jsonl");
            }
            _ => panic!("Expected ScriptFile variant"),
        }
    }

    #[test]
    fn test_platform_config_with_defaults() {
        let yaml = r#"
type: platform
query_api_url: "http://test:8080" # DevSkim: ignore DS137138
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.timeout_seconds, 300); // Should use default
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_bootstrap_config_equality() {
        let config1 = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 300,
        });

        let config2 = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 300,
        });

        assert_eq!(config1, config2);
    }

    #[test]
    fn test_backward_compatibility_yaml() {
        // This YAML format should still work
        let yaml = r#"
type: postgres
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(config, BootstrapProviderConfig::Postgres(_)));
    }

    #[test]
    fn test_context_sequence_is_shared_between_clones() {
        let ctx = BootstrapContext::new_minimal("server-1".to_string(), "source-1".to_string());
        assert_eq!(ctx.next_sequence(), 0);

        // Clones share the Arc'd counter, so numbering continues across them.
        let cloned = ctx.clone();
        assert_eq!(cloned.next_sequence(), 1);
        assert_eq!(ctx.next_sequence(), 2);
    }

    #[test]
    fn test_context_minimal_has_no_properties() {
        let ctx = BootstrapContext::new_minimal("server-1".to_string(), "source-1".to_string());
        assert_eq!(ctx.server_id, "server-1");
        assert_eq!(ctx.source_id, "source-1");
        assert_eq!(ctx.get_property("anything"), None);
        assert!(ctx.get_typed_property::<u64>("anything").unwrap().is_none());
    }

    #[test]
    fn test_context_typed_properties() {
        let props = std::collections::HashMap::from([
            ("batch_size".to_string(), serde_json::json!(500)),
            ("label".to_string(), serde_json::json!("Person")),
        ]);
        let ctx =
            BootstrapContext::with_properties("server-1".to_string(), "source-1".to_string(), props);

        assert_eq!(ctx.get_property("batch_size"), Some(serde_json::json!(500)));
        assert_eq!(ctx.get_typed_property::<u64>("batch_size").unwrap(), Some(500));
        assert_eq!(
            ctx.get_typed_property::<String>("label").unwrap(),
            Some("Person".to_string())
        );
        assert_eq!(ctx.get_typed_property::<u64>("missing").unwrap(), None);
        // A value of the wrong shape is an error, not `None`.
        assert!(ctx.get_typed_property::<u64>("label").is_err());
    }

    #[test]
    fn test_factory_rejects_every_config() {
        let configs = [
            (
                BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default()),
                "drasi-bootstrap-postgres",
            ),
            (
                BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default()),
                "drasi-bootstrap-application",
            ),
            (
                BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
                    file_paths: vec!["/data/a.jsonl".to_string()],
                }),
                "drasi-bootstrap-scriptfile",
            ),
            (
                BootstrapProviderConfig::Platform(PlatformBootstrapConfig::default()),
                "drasi-bootstrap-platform",
            ),
            (BootstrapProviderConfig::Noop, "drasi-bootstrap-noop"),
        ];

        for (config, crate_name) in configs {
            // `Box<dyn BootstrapProvider>` is not `Debug`, so match instead of `unwrap_err`.
            match BootstrapProviderFactory::create_provider(&config) {
                Ok(_) => panic!("expected an error for {config:?}"),
                Err(err) => assert!(
                    err.to_string().contains(crate_name),
                    "error for {config:?} should mention {crate_name}: {err}"
                ),
            }
        }
    }
}