Skip to main content

drasi_lib/bootstrap/
mod.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bootstrap provider architecture for Drasi
16//!
17//! This module provides a pluggable bootstrap system that separates bootstrap
18//! concerns from source streaming logic. Bootstrap providers can be reused
19//! across different source types while maintaining access to their parent
20//! source configuration.
21//!
22//! # Included Providers
23//!
24//! - [`ComponentGraphBootstrapProvider`]: Bootstraps from the [`ComponentGraph`] snapshot
25//!   for the built-in component graph source.
26
27pub mod component_graph;
28pub use component_graph::ComponentGraphBootstrapProvider;
29
30use anyhow::Result;
31use async_trait::async_trait;
32use bytes::Bytes;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37
38/// Request for bootstrap data from a query
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct BootstrapRequest {
41    pub query_id: String,
42    pub node_labels: Vec<String>,
43    pub relation_labels: Vec<String>,
44    pub request_id: String,
45}
46
47/// Context passed to bootstrap providers
48/// Bootstrap happens through dedicated channels created in source.subscribe().
49///
50/// # Plugin Architecture
51///
52/// BootstrapContext no longer contains source configuration. Bootstrap providers
53/// are created by source plugins using their own typed configurations. This context
54/// provides only the minimal information needed during bootstrap execution:
55/// - Source identification
56/// - Sequence numbering for events
57/// - Optional properties for providers that need runtime data
58#[derive(Clone)]
59pub struct BootstrapContext {
60    /// Unique server ID for logging and tracing
61    pub server_id: String,
62    /// Source ID for labeling bootstrap events
63    pub source_id: String,
64    /// Sequence counter for bootstrap events
65    pub sequence_counter: Arc<AtomicU64>,
66    /// Optional properties that can be set by plugins if needed
67    properties: Arc<HashMap<String, serde_json::Value>>,
68}
69
70impl BootstrapContext {
71    /// Create a minimal bootstrap context with just server and source IDs
72    ///
73    /// This is the preferred constructor. Bootstrap providers should have their
74    /// own configuration - they don't need access to source config.
75    pub fn new_minimal(server_id: String, source_id: String) -> Self {
76        Self {
77            server_id,
78            source_id,
79            sequence_counter: Arc::new(AtomicU64::new(0)),
80            properties: Arc::new(HashMap::new()),
81        }
82    }
83
84    /// Create a bootstrap context with properties
85    ///
86    /// Use this if your bootstrap provider needs access to some properties
87    /// at runtime. The properties should be extracted from your plugin's
88    /// typed configuration.
89    pub fn with_properties(
90        server_id: String,
91        source_id: String,
92        properties: HashMap<String, serde_json::Value>,
93    ) -> Self {
94        Self {
95            server_id,
96            source_id,
97            sequence_counter: Arc::new(AtomicU64::new(0)),
98            properties: Arc::new(properties),
99        }
100    }
101
102    /// Get the next sequence number for bootstrap events
103    pub fn next_sequence(&self) -> u64 {
104        self.sequence_counter.fetch_add(1, Ordering::SeqCst)
105    }
106
107    /// Get a property from the context
108    pub fn get_property(&self, key: &str) -> Option<serde_json::Value> {
109        self.properties.get(key).cloned()
110    }
111
112    /// Get a typed property from the context
113    pub fn get_typed_property<T>(&self, key: &str) -> Result<Option<T>>
114    where
115        T: for<'de> Deserialize<'de>,
116    {
117        match self.get_property(key) {
118            Some(value) => Ok(Some(serde_json::from_value(value.clone())?)),
119            None => Ok(None),
120        }
121    }
122}
123
124use crate::channels::BootstrapEventSender;
125
126/// Result of a bootstrap operation, carrying the event count plus handover
127/// metadata that lets the query transition cleanly from bootstrap to streaming.
128///
129/// See design doc 02 §5 — Bootstrap-to-Streaming Handover.
130///
131/// # Fields
132/// * `event_count` - Number of bootstrap events sent through the channel.
133/// * `last_sequence` - The snapshot's position in the source's sequence space
134///   (e.g., a Postgres WAL LSN), when known. `None` for providers that have no
135///   positional concept (e.g., script file, no-op).
136/// * `sequences_aligned` - Whether the bootstrap's sequence namespace matches
137///   the streaming source's sequence namespace. `true` only when the query
138///   can safely dedup buffered stream events against `last_sequence`
139///   (typically homogeneous source + bootstrapper, e.g., Postgres / Postgres).
140#[derive(Debug, Clone, Default)]
141pub struct BootstrapResult {
142    pub event_count: usize,
143    pub last_sequence: Option<u64>,
144    pub sequences_aligned: bool,
145    /// Opaque position bytes marking the snapshot boundary in the source's
146    /// native address space (e.g., a database WAL LSN). When set, the
147    /// framework persists this as the initial checkpoint so crash-recovery
148    /// after bootstrap can resume without re-bootstrapping.
149    ///
150    /// Must be at most [`SourceBase::MAX_SOURCE_POSITION_BYTES`] (64 KB).
151    pub source_position: Option<Bytes>,
152}
153
154/// Trait for bootstrap providers that handle initial data delivery
155/// for newly subscribed queries.
156#[async_trait]
157pub trait BootstrapProvider: Send + Sync {
158    /// Perform bootstrap operation for the given request.
159    /// Sends bootstrap events to the provided channel.
160    /// Returns a [`BootstrapResult`] carrying the event count and handover metadata.
161    ///
162    /// # Arguments
163    /// * `request` - Bootstrap request with query ID and labels
164    /// * `context` - Bootstrap context with source information
165    /// * `event_tx` - Channel to send bootstrap events
166    /// * `settings` - Optional subscription settings with additional query context
167    async fn bootstrap(
168        &self,
169        request: BootstrapRequest,
170        context: &BootstrapContext,
171        event_tx: BootstrapEventSender,
172        settings: Option<&crate::config::SourceSubscriptionSettings>,
173    ) -> Result<BootstrapResult>;
174}
175
176/// Blanket implementation of BootstrapProvider for boxed trait objects.
177/// This allows `Box<dyn BootstrapProvider>` to be used where BootstrapProvider is expected.
178#[async_trait]
179impl BootstrapProvider for Box<dyn BootstrapProvider> {
180    async fn bootstrap(
181        &self,
182        request: BootstrapRequest,
183        context: &BootstrapContext,
184        event_tx: BootstrapEventSender,
185        settings: Option<&crate::config::SourceSubscriptionSettings>,
186    ) -> Result<BootstrapResult> {
187        (**self)
188            .bootstrap(request, context, event_tx, settings)
189            .await
190    }
191}
192
193// Typed configuration structs for each bootstrap provider type
194
195/// PostgreSQL bootstrap provider configuration
196///
197/// This provider bootstraps initial data from a PostgreSQL database using
198/// the COPY protocol for efficient data transfer. The provider extracts
199/// connection details from the parent source configuration, so no additional
200/// configuration is needed here.
201///
202/// # Example
203/// ```yaml
204/// bootstrap_provider:
205///   type: postgres
206/// ```
207#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
208pub struct PostgresBootstrapConfig {
209    // No additional config needed - uses parent source config
210    // Include this struct for consistency and future extensibility
211}
212
213/// Application bootstrap provider configuration
214///
215/// This provider bootstraps data from in-memory storage maintained by
216/// application sources. It replays stored insert events to provide initial
217/// data. No additional configuration is required as it uses shared state
218/// from the application source.
219///
220/// # Example
221/// ```yaml
222/// bootstrap_provider:
223///   type: application
224/// ```
225#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
226pub struct ApplicationBootstrapConfig {
227    // No config needed - uses shared state
228    // Include for consistency and future extensibility
229}
230
231/// Script file bootstrap provider configuration
232///
233/// This provider reads bootstrap data from JSONL (JSON Lines) files containing
234/// structured data. Files are processed in the order specified. Each file should
235/// contain one JSON record per line with record types: Header (required first),
236/// Node, Relation, Comment (filtered), Label (checkpoint), and Finish (optional end).
237///
238/// # Example
239/// ```yaml
240/// bootstrap_provider:
241///   type: scriptfile
242///   file_paths:
243///     - "/data/initial_nodes.jsonl"
244///     - "/data/initial_relations.jsonl"
245/// ```
246#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
247pub struct ScriptFileBootstrapConfig {
248    /// List of JSONL files to read (in order)
249    pub file_paths: Vec<String>,
250}
251
252/// Platform bootstrap provider configuration
253///
254/// This provider bootstraps data from a Query API service running in a remote
255/// Drasi environment via HTTP streaming. It's designed for cross-Drasi integration
256/// scenarios where one Drasi instance needs initial data from another.
257///
258/// # Example
259/// ```yaml
260/// bootstrap_provider:
261///   type: platform
262///   query_api_url: "http://remote-drasi:8080" # DevSkim: ignore DS137138
263///   timeout_seconds: 600
264/// ```
265#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
266pub struct PlatformBootstrapConfig {
267    /// URL of the Query API service (e.g., "http://my-source-query-api:8080") // DevSkim: ignore DS137138
268    /// If not specified, falls back to `query_api_url` property from source config
269    #[serde(skip_serializing_if = "Option::is_none")]
270    pub query_api_url: Option<String>,
271
272    /// Timeout for HTTP requests in seconds (default: 300)
273    #[serde(default = "default_platform_timeout")]
274    pub timeout_seconds: u64,
275}
276
277fn default_platform_timeout() -> u64 {
278    300
279}
280
281impl Default for PlatformBootstrapConfig {
282    fn default() -> Self {
283        Self {
284            query_api_url: None,
285            timeout_seconds: default_platform_timeout(),
286        }
287    }
288}
289
290/// Configuration for different types of bootstrap providers
291#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
292#[serde(tag = "type", rename_all = "lowercase")]
293pub enum BootstrapProviderConfig {
294    /// PostgreSQL bootstrap provider
295    Postgres(PostgresBootstrapConfig),
296    /// Application-based bootstrap provider
297    Application(ApplicationBootstrapConfig),
298    /// Script file bootstrap provider
299    ScriptFile(ScriptFileBootstrapConfig),
300    /// Platform bootstrap provider for remote Drasi sources
301    /// Bootstraps data from a Query API service running in a remote Drasi environment
302    Platform(PlatformBootstrapConfig),
303    /// No-op bootstrap provider (returns no data)
304    Noop,
305}
306
307/// Factory for creating bootstrap providers from configuration
308///
309/// This factory is designed to work with plugin-based providers loaded from
310/// separate crates. The default behavior returns errors for non-noop providers
311/// to encourage use of the dedicated plugin crates.
312pub struct BootstrapProviderFactory;
313
314impl BootstrapProviderFactory {
315    /// Create a bootstrap provider from configuration
316    ///
317    /// Currently only supports the noop provider. Other providers are implemented
318    /// in dedicated plugin crates and should be instantiated using the builder pattern:
319    ///
320    /// - PostgreSQL: Use `drasi_bootstrap_postgres::PostgresBootstrapProvider::builder().with_host(...).build()`
321    /// - Platform: Use `drasi_bootstrap_platform::PlatformBootstrapProvider::builder().with_query_api_url(...).build()`
322    /// - ScriptFile: Use `drasi_bootstrap_scriptfile::ScriptFileBootstrapProvider::builder().with_file(...).build()`
323    /// - Application: Use `drasi_bootstrap_application::ApplicationBootstrapProvider::builder().build()`
324    pub fn create_provider(config: &BootstrapProviderConfig) -> Result<Box<dyn BootstrapProvider>> {
325        match config {
326            BootstrapProviderConfig::Postgres(_) => {
327                Err(anyhow::anyhow!(
328                    "PostgreSQL bootstrap provider is available in the drasi-bootstrap-postgres crate. \
329                     Use PostgresBootstrapProvider::builder().with_host(...).build() to create it."
330                ))
331            }
332            BootstrapProviderConfig::Application(_) => {
333                Err(anyhow::anyhow!(
334                    "Application bootstrap provider is available in the drasi-bootstrap-application crate. \
335                     Use ApplicationBootstrapProvider::builder().build() to create it."
336                ))
337            }
338            BootstrapProviderConfig::ScriptFile(config) => {
339                Err(anyhow::anyhow!(
340                    "ScriptFile bootstrap provider is available in the drasi-bootstrap-scriptfile crate. \
341                     Use ScriptFileBootstrapProvider::builder().with_file(...).build() to create it. \
342                     File paths: {:?}",
343                    config.file_paths
344                ))
345            }
346            BootstrapProviderConfig::Platform(config) => {
347                Err(anyhow::anyhow!(
348                    "Platform bootstrap provider is available in the drasi-bootstrap-platform crate. \
349                     Use PlatformBootstrapProvider::builder().with_query_api_url(...).build() to create it. \
350                     Config: {config:?}"
351                ))
352            }
353            BootstrapProviderConfig::Noop => {
354                Err(anyhow::anyhow!(
355                    "No-op bootstrap provider is available in the drasi-bootstrap-noop crate. \
356                     Use NoOpBootstrapProvider::builder().build() or NoOpBootstrapProvider::new() to create it."
357                ))
358            }
359        }
360    }
361}
362
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    #[test]
    fn test_platform_bootstrap_config_defaults() {
        let config = PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            ..Default::default()
        };
        assert_eq!(config.timeout_seconds, 300);
        assert_eq!(config.query_api_url, Some("http://test:8080".to_string())); // DevSkim: ignore DS137138
    }

    #[test]
    fn test_postgres_bootstrap_config_defaults() {
        let config = PostgresBootstrapConfig::default();
        // Should be empty struct for now
        assert_eq!(config, PostgresBootstrapConfig {});
    }

    #[test]
    fn test_application_bootstrap_config_defaults() {
        let config = ApplicationBootstrapConfig::default();
        // Should be empty struct for now
        assert_eq!(config, ApplicationBootstrapConfig {});
    }

    #[test]
    fn test_platform_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 600,
        });

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"platform\""));
        assert!(json.contains("\"query_api_url\":\"http://test:8080\"")); // DevSkim: ignore DS137138
        assert!(json.contains("\"timeout_seconds\":600"));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        match deserialized {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.query_api_url, Some("http://test:8080".to_string())); // DevSkim: ignore DS137138
                assert_eq!(cfg.timeout_seconds, 600);
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_scriptfile_bootstrap_config() {
        let config = BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
            file_paths: vec![
                "/path/to/file1.jsonl".to_string(),
                "/path/to/file2.jsonl".to_string(),
            ],
        });

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"scriptfile\""));
        assert!(json.contains("\"file_paths\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        match deserialized {
            BootstrapProviderConfig::ScriptFile(cfg) => {
                assert_eq!(cfg.file_paths.len(), 2);
                assert_eq!(cfg.file_paths[0], "/path/to/file1.jsonl");
                assert_eq!(cfg.file_paths[1], "/path/to/file2.jsonl");
            }
            _ => panic!("Expected ScriptFile variant"),
        }
    }

    #[test]
    fn test_noop_bootstrap_config() {
        let config = BootstrapProviderConfig::Noop;

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"noop\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(deserialized, BootstrapProviderConfig::Noop));
    }

    #[test]
    fn test_postgres_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default());

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"postgres\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(deserialized, BootstrapProviderConfig::Postgres(_)));
    }

    #[test]
    fn test_application_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default());

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"application\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            deserialized,
            BootstrapProviderConfig::Application(_)
        ));
    }

    #[test]
    fn test_yaml_deserialization_platform() {
        let yaml = r#"
type: platform
query_api_url: "http://remote:8080" # DevSkim: ignore DS137138
timeout_seconds: 300
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.query_api_url, Some("http://remote:8080".to_string())); // DevSkim: ignore DS137138
                assert_eq!(cfg.timeout_seconds, 300);
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_yaml_deserialization_scriptfile() {
        let yaml = r#"
type: scriptfile
file_paths:
  - "/data/file1.jsonl"
  - "/data/file2.jsonl"
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::ScriptFile(cfg) => {
                assert_eq!(cfg.file_paths.len(), 2);
                assert_eq!(cfg.file_paths[0], "/data/file1.jsonl");
            }
            _ => panic!("Expected ScriptFile variant"),
        }
    }

    #[test]
    fn test_platform_config_with_defaults() {
        let yaml = r#"
type: platform
query_api_url: "http://test:8080" # DevSkim: ignore DS137138
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.timeout_seconds, 300); // Should use default
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_bootstrap_config_equality() {
        let config1 = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 300,
        });

        let config2 = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 300,
        });

        assert_eq!(config1, config2);
    }

    #[test]
    fn test_backward_compatibility_yaml() {
        // This YAML format should still work
        let yaml = r#"
type: postgres
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(config, BootstrapProviderConfig::Postgres(_)));
    }

    #[test]
    fn test_bootstrap_context_sequence_is_shared_across_clones() {
        let ctx = BootstrapContext::new_minimal("server".to_string(), "source".to_string());
        let clone = ctx.clone();

        assert_eq!(ctx.server_id, "server");
        assert_eq!(ctx.source_id, "source");
        // The counter starts at 0 and lives behind an Arc shared by clones.
        assert_eq!(ctx.next_sequence(), 0);
        assert_eq!(clone.next_sequence(), 1);
        assert_eq!(ctx.next_sequence(), 2);
    }

    #[test]
    fn test_minimal_context_has_no_properties() {
        let ctx = BootstrapContext::new_minimal("server".to_string(), "source".to_string());
        assert!(ctx.get_property("anything").is_none());
        assert_eq!(ctx.get_typed_property::<String>("anything").unwrap(), None);
    }

    #[test]
    fn test_bootstrap_context_properties() {
        let mut props = HashMap::new();
        props.insert("batch_size".to_string(), serde_json::json!(100));
        props.insert("label".to_string(), serde_json::json!("Person"));
        let ctx =
            BootstrapContext::with_properties("server".to_string(), "source".to_string(), props);

        assert_eq!(ctx.get_property("label"), Some(serde_json::json!("Person")));
        assert_eq!(ctx.get_property("missing"), None);
        assert_eq!(ctx.get_typed_property::<u64>("batch_size").unwrap(), Some(100));
        assert_eq!(
            ctx.get_typed_property::<String>("label").unwrap(),
            Some("Person".to_string())
        );
        assert_eq!(ctx.get_typed_property::<u64>("missing").unwrap(), None);
        // A present property of the wrong type is an error, not `None`.
        assert!(ctx.get_typed_property::<u64>("label").is_err());
    }

    #[test]
    fn test_bootstrap_result_default() {
        let result = BootstrapResult::default();
        assert_eq!(result.event_count, 0);
        assert_eq!(result.last_sequence, None);
        assert!(!result.sequences_aligned);
        assert!(result.source_position.is_none());
    }

    #[test]
    fn test_factory_points_to_plugin_crates() {
        let cases = vec![
            (
                BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default()),
                "drasi-bootstrap-postgres",
            ),
            (
                BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default()),
                "drasi-bootstrap-application",
            ),
            (
                BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
                    file_paths: vec!["/data/a.jsonl".to_string()],
                }),
                "drasi-bootstrap-scriptfile",
            ),
            (
                BootstrapProviderConfig::Platform(PlatformBootstrapConfig::default()),
                "drasi-bootstrap-platform",
            ),
            (BootstrapProviderConfig::Noop, "drasi-bootstrap-noop"),
        ];

        for (config, crate_name) in cases {
            let err = match BootstrapProviderFactory::create_provider(&config) {
                Ok(_) => panic!("expected an error for {config:?}"),
                Err(err) => err,
            };
            let message = err.to_string();
            assert!(
                message.contains(crate_name),
                "unexpected message for {config:?}: {message}"
            );
        }
    }

    #[test]
    fn test_factory_scriptfile_error_includes_paths() {
        let config = BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
            file_paths: vec!["/data/a.jsonl".to_string()],
        });
        let message = match BootstrapProviderFactory::create_provider(&config) {
            Ok(_) => panic!("expected an error for the scriptfile config"),
            Err(err) => err.to_string(),
        };
        assert!(message.contains("/data/a.jsonl"));
    }
}