Skip to main content

drasi_lib/bootstrap/
mod.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Bootstrap provider architecture for Drasi
//!
//! This module provides a pluggable bootstrap system that separates bootstrap
//! concerns from source streaming logic. Bootstrap providers can be reused
//! across different source types while maintaining access to their parent
//! source configuration.
21
22use anyhow::Result;
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::Arc;
28
29/// Request for bootstrap data from a query
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct BootstrapRequest {
32    pub query_id: String,
33    pub node_labels: Vec<String>,
34    pub relation_labels: Vec<String>,
35    pub request_id: String,
36}
37
38/// Context passed to bootstrap providers
39/// Bootstrap happens through dedicated channels created in source.subscribe().
40///
41/// # Plugin Architecture
42///
43/// BootstrapContext no longer contains source configuration. Bootstrap providers
44/// are created by source plugins using their own typed configurations. This context
45/// provides only the minimal information needed during bootstrap execution:
46/// - Source identification
47/// - Sequence numbering for events
48/// - Optional properties for providers that need runtime data
49#[derive(Clone)]
50pub struct BootstrapContext {
51    /// Unique server ID for logging and tracing
52    pub server_id: String,
53    /// Source ID for labeling bootstrap events
54    pub source_id: String,
55    /// Sequence counter for bootstrap events
56    pub sequence_counter: Arc<AtomicU64>,
57    /// Optional properties that can be set by plugins if needed
58    properties: Arc<HashMap<String, serde_json::Value>>,
59}
60
61impl BootstrapContext {
62    /// Create a minimal bootstrap context with just server and source IDs
63    ///
64    /// This is the preferred constructor. Bootstrap providers should have their
65    /// own configuration - they don't need access to source config.
66    pub fn new_minimal(server_id: String, source_id: String) -> Self {
67        Self {
68            server_id,
69            source_id,
70            sequence_counter: Arc::new(AtomicU64::new(0)),
71            properties: Arc::new(HashMap::new()),
72        }
73    }
74
75    /// Create a bootstrap context with properties
76    ///
77    /// Use this if your bootstrap provider needs access to some properties
78    /// at runtime. The properties should be extracted from your plugin's
79    /// typed configuration.
80    pub fn with_properties(
81        server_id: String,
82        source_id: String,
83        properties: HashMap<String, serde_json::Value>,
84    ) -> Self {
85        Self {
86            server_id,
87            source_id,
88            sequence_counter: Arc::new(AtomicU64::new(0)),
89            properties: Arc::new(properties),
90        }
91    }
92
93    /// Get the next sequence number for bootstrap events
94    pub fn next_sequence(&self) -> u64 {
95        self.sequence_counter.fetch_add(1, Ordering::SeqCst)
96    }
97
98    /// Get a property from the context
99    pub fn get_property(&self, key: &str) -> Option<serde_json::Value> {
100        self.properties.get(key).cloned()
101    }
102
103    /// Get a typed property from the context
104    pub fn get_typed_property<T>(&self, key: &str) -> Result<Option<T>>
105    where
106        T: for<'de> Deserialize<'de>,
107    {
108        match self.get_property(key) {
109            Some(value) => Ok(Some(serde_json::from_value(value.clone())?)),
110            None => Ok(None),
111        }
112    }
113}
114
115use crate::channels::BootstrapEventSender;
116
117/// Trait for bootstrap providers that handle initial data delivery
118/// for newly subscribed queries
119#[async_trait]
120pub trait BootstrapProvider: Send + Sync {
121    /// Perform bootstrap operation for the given request
122    /// Sends bootstrap events to the provided channel
123    /// Returns the number of elements sent
124    ///
125    /// # Arguments
126    /// * `request` - Bootstrap request with query ID and labels
127    /// * `context` - Bootstrap context with source information
128    /// * `event_tx` - Channel to send bootstrap events
129    /// * `settings` - Optional subscription settings with additional query context
130    async fn bootstrap(
131        &self,
132        request: BootstrapRequest,
133        context: &BootstrapContext,
134        event_tx: BootstrapEventSender,
135        settings: Option<&crate::config::SourceSubscriptionSettings>,
136    ) -> Result<usize>;
137}
138
139/// Blanket implementation of BootstrapProvider for boxed trait objects.
140/// This allows `Box<dyn BootstrapProvider>` to be used where BootstrapProvider is expected.
141#[async_trait]
142impl BootstrapProvider for Box<dyn BootstrapProvider> {
143    async fn bootstrap(
144        &self,
145        request: BootstrapRequest,
146        context: &BootstrapContext,
147        event_tx: BootstrapEventSender,
148        settings: Option<&crate::config::SourceSubscriptionSettings>,
149    ) -> Result<usize> {
150        (**self)
151            .bootstrap(request, context, event_tx, settings)
152            .await
153    }
154}
155
// Typed configuration structs for each bootstrap provider type
157
158/// PostgreSQL bootstrap provider configuration
159///
160/// This provider bootstraps initial data from a PostgreSQL database using
161/// the COPY protocol for efficient data transfer. The provider extracts
162/// connection details from the parent source configuration, so no additional
163/// configuration is needed here.
164///
165/// # Example
166/// ```yaml
167/// bootstrap_provider:
168///   type: postgres
169/// ```
170#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
171pub struct PostgresBootstrapConfig {
172    // No additional config needed - uses parent source config
173    // Include this struct for consistency and future extensibility
174}
175
176/// Application bootstrap provider configuration
177///
178/// This provider bootstraps data from in-memory storage maintained by
179/// application sources. It replays stored insert events to provide initial
180/// data. No additional configuration is required as it uses shared state
181/// from the application source.
182///
183/// # Example
184/// ```yaml
185/// bootstrap_provider:
186///   type: application
187/// ```
188#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
189pub struct ApplicationBootstrapConfig {
190    // No config needed - uses shared state
191    // Include for consistency and future extensibility
192}
193
194/// Script file bootstrap provider configuration
195///
196/// This provider reads bootstrap data from JSONL (JSON Lines) files containing
197/// structured data. Files are processed in the order specified. Each file should
198/// contain one JSON record per line with record types: Header (required first),
199/// Node, Relation, Comment (filtered), Label (checkpoint), and Finish (optional end).
200///
201/// # Example
202/// ```yaml
203/// bootstrap_provider:
204///   type: scriptfile
205///   file_paths:
206///     - "/data/initial_nodes.jsonl"
207///     - "/data/initial_relations.jsonl"
208/// ```
209#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
210pub struct ScriptFileBootstrapConfig {
211    /// List of JSONL files to read (in order)
212    pub file_paths: Vec<String>,
213}
214
215/// Platform bootstrap provider configuration
216///
217/// This provider bootstraps data from a Query API service running in a remote
218/// Drasi environment via HTTP streaming. It's designed for cross-Drasi integration
219/// scenarios where one Drasi instance needs initial data from another.
220///
221/// # Example
222/// ```yaml
223/// bootstrap_provider:
224///   type: platform
225///   query_api_url: "http://remote-drasi:8080" # DevSkim: ignore DS137138
226///   timeout_seconds: 600
227/// ```
228#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
229pub struct PlatformBootstrapConfig {
230    /// URL of the Query API service (e.g., "http://my-source-query-api:8080") // DevSkim: ignore DS137138
231    /// If not specified, falls back to `query_api_url` property from source config
232    #[serde(skip_serializing_if = "Option::is_none")]
233    pub query_api_url: Option<String>,
234
235    /// Timeout for HTTP requests in seconds (default: 300)
236    #[serde(default = "default_platform_timeout")]
237    pub timeout_seconds: u64,
238}
239
/// Default HTTP request timeout, in seconds, for the platform bootstrap provider.
fn default_platform_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECONDS: u64 = 300;
    DEFAULT_TIMEOUT_SECONDS
}
243
244impl Default for PlatformBootstrapConfig {
245    fn default() -> Self {
246        Self {
247            query_api_url: None,
248            timeout_seconds: default_platform_timeout(),
249        }
250    }
251}
252
253/// Configuration for different types of bootstrap providers
254#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
255#[serde(tag = "type", rename_all = "lowercase")]
256pub enum BootstrapProviderConfig {
257    /// PostgreSQL bootstrap provider
258    Postgres(PostgresBootstrapConfig),
259    /// Application-based bootstrap provider
260    Application(ApplicationBootstrapConfig),
261    /// Script file bootstrap provider
262    ScriptFile(ScriptFileBootstrapConfig),
263    /// Platform bootstrap provider for remote Drasi sources
264    /// Bootstraps data from a Query API service running in a remote Drasi environment
265    Platform(PlatformBootstrapConfig),
266    /// No-op bootstrap provider (returns no data)
267    Noop,
268}
269
270/// Factory for creating bootstrap providers from configuration
271///
272/// This factory is designed to work with plugin-based providers loaded from
273/// separate crates. The default behavior returns errors for non-noop providers
274/// to encourage use of the dedicated plugin crates.
275pub struct BootstrapProviderFactory;
276
277impl BootstrapProviderFactory {
278    /// Create a bootstrap provider from configuration
279    ///
280    /// Currently only supports the noop provider. Other providers are implemented
281    /// in dedicated plugin crates and should be instantiated using the builder pattern:
282    ///
283    /// - PostgreSQL: Use `drasi_bootstrap_postgres::PostgresBootstrapProvider::builder().with_host(...).build()`
284    /// - Platform: Use `drasi_bootstrap_platform::PlatformBootstrapProvider::builder().with_query_api_url(...).build()`
285    /// - ScriptFile: Use `drasi_bootstrap_scriptfile::ScriptFileBootstrapProvider::builder().with_file(...).build()`
286    /// - Application: Use `drasi_bootstrap_application::ApplicationBootstrapProvider::builder().build()`
287    pub fn create_provider(config: &BootstrapProviderConfig) -> Result<Box<dyn BootstrapProvider>> {
288        match config {
289            BootstrapProviderConfig::Postgres(_) => {
290                Err(anyhow::anyhow!(
291                    "PostgreSQL bootstrap provider is available in the drasi-bootstrap-postgres crate. \
292                     Use PostgresBootstrapProvider::builder().with_host(...).build() to create it."
293                ))
294            }
295            BootstrapProviderConfig::Application(_) => {
296                Err(anyhow::anyhow!(
297                    "Application bootstrap provider is available in the drasi-bootstrap-application crate. \
298                     Use ApplicationBootstrapProvider::builder().build() to create it."
299                ))
300            }
301            BootstrapProviderConfig::ScriptFile(config) => {
302                Err(anyhow::anyhow!(
303                    "ScriptFile bootstrap provider is available in the drasi-bootstrap-scriptfile crate. \
304                     Use ScriptFileBootstrapProvider::builder().with_file(...).build() to create it. \
305                     File paths: {:?}",
306                    config.file_paths
307                ))
308            }
309            BootstrapProviderConfig::Platform(config) => {
310                Err(anyhow::anyhow!(
311                    "Platform bootstrap provider is available in the drasi-bootstrap-platform crate. \
312                     Use PlatformBootstrapProvider::builder().with_query_api_url(...).build() to create it. \
313                     Config: {config:?}"
314                ))
315            }
316            BootstrapProviderConfig::Noop => {
317                Err(anyhow::anyhow!(
318                    "No-op bootstrap provider is available in the drasi-bootstrap-noop crate. \
319                     Use NoOpBootstrapProvider::builder().build() or NoOpBootstrapProvider::new() to create it."
320                ))
321            }
322        }
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Struct-update syntax fills `timeout_seconds` from `Default`.
    #[test]
    fn test_platform_bootstrap_config_defaults() {
        let url = "http://test:8080".to_string(); // DevSkim: ignore DS137138
        let config = PlatformBootstrapConfig {
            query_api_url: Some(url.clone()),
            ..Default::default()
        };
        assert_eq!(config.timeout_seconds, 300);
        assert_eq!(config.query_api_url, Some(url));
    }

    /// The Postgres config is an empty struct for now.
    #[test]
    fn test_postgres_bootstrap_config_defaults() {
        assert_eq!(
            PostgresBootstrapConfig::default(),
            PostgresBootstrapConfig {}
        );
    }

    /// The application config is an empty struct for now.
    #[test]
    fn test_application_bootstrap_config_defaults() {
        assert_eq!(
            ApplicationBootstrapConfig::default(),
            ApplicationBootstrapConfig {}
        );
    }

    /// Platform config survives a JSON round trip with the expected field layout.
    #[test]
    fn test_platform_bootstrap_config_serialization() {
        let url = "http://test:8080".to_string(); // DevSkim: ignore DS137138
        let config = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some(url.clone()),
            timeout_seconds: 600,
        });

        let json = serde_json::to_string(&config).unwrap();
        let expected_fragments = [
            "\"type\":\"platform\"",
            "\"query_api_url\":\"http://test:8080\"", // DevSkim: ignore DS137138
            "\"timeout_seconds\":600",
        ];
        for fragment in expected_fragments {
            assert!(json.contains(fragment));
        }

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        let cfg = match deserialized {
            BootstrapProviderConfig::Platform(cfg) => cfg,
            _ => panic!("Expected Platform variant"),
        };
        assert_eq!(cfg.query_api_url, Some(url));
        assert_eq!(cfg.timeout_seconds, 600);
    }

    /// Script file config survives a JSON round trip with file order preserved.
    #[test]
    fn test_scriptfile_bootstrap_config() {
        let file_paths = vec![
            "/path/to/file1.jsonl".to_string(),
            "/path/to/file2.jsonl".to_string(),
        ];
        let config = BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig { file_paths });

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"scriptfile\""));
        assert!(json.contains("\"file_paths\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        let cfg = match deserialized {
            BootstrapProviderConfig::ScriptFile(cfg) => cfg,
            _ => panic!("Expected ScriptFile variant"),
        };
        assert_eq!(cfg.file_paths.len(), 2);
        assert_eq!(cfg.file_paths[0], "/path/to/file1.jsonl");
        assert_eq!(cfg.file_paths[1], "/path/to/file2.jsonl");
    }

    /// The unit variant serializes to just its type tag and round-trips.
    #[test]
    fn test_noop_bootstrap_config() {
        let json = serde_json::to_string(&BootstrapProviderConfig::Noop).unwrap();
        assert!(json.contains("\"type\":\"noop\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(deserialized, BootstrapProviderConfig::Noop));
    }

    /// Postgres config round-trips through JSON with the `postgres` tag.
    #[test]
    fn test_postgres_bootstrap_config_serialization() {
        let original = BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default());

        let json = serde_json::to_string(&original).unwrap();
        assert!(json.contains("\"type\":\"postgres\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(deserialized, BootstrapProviderConfig::Postgres(_)));
    }

    /// Application config round-trips through JSON with the `application` tag.
    #[test]
    fn test_application_bootstrap_config_serialization() {
        let original =
            BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default());

        let json = serde_json::to_string(&original).unwrap();
        assert!(json.contains("\"type\":\"application\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            deserialized,
            BootstrapProviderConfig::Application(_)
        ));
    }

    /// A fully specified platform config parses from YAML.
    #[test]
    fn test_yaml_deserialization_platform() {
        let yaml = r#"
type: platform
query_api_url: "http://remote:8080" # DevSkim: ignore DS137138
timeout_seconds: 300
"#;

        let parsed: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        let cfg = match parsed {
            BootstrapProviderConfig::Platform(cfg) => cfg,
            _ => panic!("Expected Platform variant"),
        };
        assert_eq!(cfg.query_api_url, Some("http://remote:8080".to_string())); // DevSkim: ignore DS137138
        assert_eq!(cfg.timeout_seconds, 300);
    }

    /// A script file config with two paths parses from YAML.
    #[test]
    fn test_yaml_deserialization_scriptfile() {
        let yaml = r#"
type: scriptfile
file_paths:
  - "/data/file1.jsonl"
  - "/data/file2.jsonl"
"#;

        let parsed: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        let cfg = match parsed {
            BootstrapProviderConfig::ScriptFile(cfg) => cfg,
            _ => panic!("Expected ScriptFile variant"),
        };
        assert_eq!(cfg.file_paths.len(), 2);
        assert_eq!(cfg.file_paths[0], "/data/file1.jsonl");
    }

    /// Omitting `timeout_seconds` in YAML falls back to the default.
    #[test]
    fn test_platform_config_with_defaults() {
        let yaml = r#"
type: platform
query_api_url: "http://test:8080" # DevSkim: ignore DS137138
"#;

        let parsed: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        let cfg = match parsed {
            BootstrapProviderConfig::Platform(cfg) => cfg,
            _ => panic!("Expected Platform variant"),
        };
        assert_eq!(cfg.timeout_seconds, 300); // Should use default
    }

    /// Two identically built configs compare equal.
    #[test]
    fn test_bootstrap_config_equality() {
        let build = || {
            BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
                query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
                timeout_seconds: 300,
            })
        };

        let config1 = build();
        let config2 = build();

        assert_eq!(config1, config2);
    }

    /// The bare `type: postgres` YAML form should still work.
    #[test]
    fn test_backward_compatibility_yaml() {
        let yaml = r#"
type: postgres
"#;

        let parsed: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(parsed, BootstrapProviderConfig::Postgres(_)));
    }
}