Skip to main content

drasi_lib/bootstrap/
mod.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bootstrap provider architecture for Drasi
16//!
17//! This module provides a pluggable bootstrap system that separates bootstrap
18//! concerns from source streaming logic. Bootstrap providers can be reused
19//! across different source types while maintaining access to their parent
20//! source configuration.
21//!
22//! # Included Providers
23//!
24//! - [`ComponentGraphBootstrapProvider`]: Bootstraps from the [`ComponentGraph`] snapshot
25//!   for the built-in component graph source.
26
27pub mod component_graph;
28pub use component_graph::ComponentGraphBootstrapProvider;
29
30use anyhow::Result;
31use async_trait::async_trait;
32use serde::{Deserialize, Serialize};
33use std::collections::HashMap;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::Arc;
36
37/// Request for bootstrap data from a query
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BootstrapRequest {
40    pub query_id: String,
41    pub node_labels: Vec<String>,
42    pub relation_labels: Vec<String>,
43    pub request_id: String,
44}
45
46/// Context passed to bootstrap providers
47/// Bootstrap happens through dedicated channels created in source.subscribe().
48///
49/// # Plugin Architecture
50///
51/// BootstrapContext no longer contains source configuration. Bootstrap providers
52/// are created by source plugins using their own typed configurations. This context
53/// provides only the minimal information needed during bootstrap execution:
54/// - Source identification
55/// - Sequence numbering for events
56/// - Optional properties for providers that need runtime data
57#[derive(Clone)]
58pub struct BootstrapContext {
59    /// Unique server ID for logging and tracing
60    pub server_id: String,
61    /// Source ID for labeling bootstrap events
62    pub source_id: String,
63    /// Sequence counter for bootstrap events
64    pub sequence_counter: Arc<AtomicU64>,
65    /// Optional properties that can be set by plugins if needed
66    properties: Arc<HashMap<String, serde_json::Value>>,
67}
68
69impl BootstrapContext {
70    /// Create a minimal bootstrap context with just server and source IDs
71    ///
72    /// This is the preferred constructor. Bootstrap providers should have their
73    /// own configuration - they don't need access to source config.
74    pub fn new_minimal(server_id: String, source_id: String) -> Self {
75        Self {
76            server_id,
77            source_id,
78            sequence_counter: Arc::new(AtomicU64::new(0)),
79            properties: Arc::new(HashMap::new()),
80        }
81    }
82
83    /// Create a bootstrap context with properties
84    ///
85    /// Use this if your bootstrap provider needs access to some properties
86    /// at runtime. The properties should be extracted from your plugin's
87    /// typed configuration.
88    pub fn with_properties(
89        server_id: String,
90        source_id: String,
91        properties: HashMap<String, serde_json::Value>,
92    ) -> Self {
93        Self {
94            server_id,
95            source_id,
96            sequence_counter: Arc::new(AtomicU64::new(0)),
97            properties: Arc::new(properties),
98        }
99    }
100
101    /// Get the next sequence number for bootstrap events
102    pub fn next_sequence(&self) -> u64 {
103        self.sequence_counter.fetch_add(1, Ordering::SeqCst)
104    }
105
106    /// Get a property from the context
107    pub fn get_property(&self, key: &str) -> Option<serde_json::Value> {
108        self.properties.get(key).cloned()
109    }
110
111    /// Get a typed property from the context
112    pub fn get_typed_property<T>(&self, key: &str) -> Result<Option<T>>
113    where
114        T: for<'de> Deserialize<'de>,
115    {
116        match self.get_property(key) {
117            Some(value) => Ok(Some(serde_json::from_value(value.clone())?)),
118            None => Ok(None),
119        }
120    }
121}
122
123use crate::channels::BootstrapEventSender;
124
/// Outcome of a bootstrap run.
///
/// Reports how many events were sent, plus the handover metadata a query needs
/// to switch cleanly from bootstrap to streaming.
///
/// See design doc 02 §5 — Bootstrap-to-Streaming Handover.
///
/// # Fields
/// * `event_count` - How many bootstrap events were pushed through the channel.
/// * `last_sequence` - Where the snapshot sits in the source's sequence space
///   (for example a Postgres WAL LSN), if known. Providers with no notion of
///   position (e.g. script file, no-op) leave this as `None`.
/// * `sequences_aligned` - Set to `true` only when the bootstrap and the
///   streaming source share one sequence namespace. Only then may the query
///   dedup buffered stream events against `last_sequence`. This typically
///   requires the same technology on both sides, e.g. Postgres / Postgres.
///
/// The derived `Default` is zero events, no sequence and `sequences_aligned`
/// set to `false`.
#[derive(Debug, Clone, Default)]
pub struct BootstrapResult {
    pub event_count: usize,
    pub last_sequence: Option<u64>,
    pub sequences_aligned: bool,
}
145
146/// Trait for bootstrap providers that handle initial data delivery
147/// for newly subscribed queries.
148#[async_trait]
149pub trait BootstrapProvider: Send + Sync {
150    /// Perform bootstrap operation for the given request.
151    /// Sends bootstrap events to the provided channel.
152    /// Returns a [`BootstrapResult`] carrying the event count and handover metadata.
153    ///
154    /// # Arguments
155    /// * `request` - Bootstrap request with query ID and labels
156    /// * `context` - Bootstrap context with source information
157    /// * `event_tx` - Channel to send bootstrap events
158    /// * `settings` - Optional subscription settings with additional query context
159    async fn bootstrap(
160        &self,
161        request: BootstrapRequest,
162        context: &BootstrapContext,
163        event_tx: BootstrapEventSender,
164        settings: Option<&crate::config::SourceSubscriptionSettings>,
165    ) -> Result<BootstrapResult>;
166}
167
168/// Blanket implementation of BootstrapProvider for boxed trait objects.
169/// This allows `Box<dyn BootstrapProvider>` to be used where BootstrapProvider is expected.
170#[async_trait]
171impl BootstrapProvider for Box<dyn BootstrapProvider> {
172    async fn bootstrap(
173        &self,
174        request: BootstrapRequest,
175        context: &BootstrapContext,
176        event_tx: BootstrapEventSender,
177        settings: Option<&crate::config::SourceSubscriptionSettings>,
178    ) -> Result<BootstrapResult> {
179        (**self)
180            .bootstrap(request, context, event_tx, settings)
181            .await
182    }
183}
184
185// Typed configuration structs for each bootstrap provider type
186
187/// PostgreSQL bootstrap provider configuration
188///
189/// This provider bootstraps initial data from a PostgreSQL database using
190/// the COPY protocol for efficient data transfer. The provider extracts
191/// connection details from the parent source configuration, so no additional
192/// configuration is needed here.
193///
194/// # Example
195/// ```yaml
196/// bootstrap_provider:
197///   type: postgres
198/// ```
199#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
200pub struct PostgresBootstrapConfig {
201    // No additional config needed - uses parent source config
202    // Include this struct for consistency and future extensibility
203}
204
205/// Application bootstrap provider configuration
206///
207/// This provider bootstraps data from in-memory storage maintained by
208/// application sources. It replays stored insert events to provide initial
209/// data. No additional configuration is required as it uses shared state
210/// from the application source.
211///
212/// # Example
213/// ```yaml
214/// bootstrap_provider:
215///   type: application
216/// ```
217#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
218pub struct ApplicationBootstrapConfig {
219    // No config needed - uses shared state
220    // Include for consistency and future extensibility
221}
222
223/// Script file bootstrap provider configuration
224///
225/// This provider reads bootstrap data from JSONL (JSON Lines) files containing
226/// structured data. Files are processed in the order specified. Each file should
227/// contain one JSON record per line with record types: Header (required first),
228/// Node, Relation, Comment (filtered), Label (checkpoint), and Finish (optional end).
229///
230/// # Example
231/// ```yaml
232/// bootstrap_provider:
233///   type: scriptfile
234///   file_paths:
235///     - "/data/initial_nodes.jsonl"
236///     - "/data/initial_relations.jsonl"
237/// ```
238#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
239pub struct ScriptFileBootstrapConfig {
240    /// List of JSONL files to read (in order)
241    pub file_paths: Vec<String>,
242}
243
244/// Platform bootstrap provider configuration
245///
246/// This provider bootstraps data from a Query API service running in a remote
247/// Drasi environment via HTTP streaming. It's designed for cross-Drasi integration
248/// scenarios where one Drasi instance needs initial data from another.
249///
250/// # Example
251/// ```yaml
252/// bootstrap_provider:
253///   type: platform
254///   query_api_url: "http://remote-drasi:8080" # DevSkim: ignore DS137138
255///   timeout_seconds: 600
256/// ```
257#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
258pub struct PlatformBootstrapConfig {
259    /// URL of the Query API service (e.g., "http://my-source-query-api:8080") // DevSkim: ignore DS137138
260    /// If not specified, falls back to `query_api_url` property from source config
261    #[serde(skip_serializing_if = "Option::is_none")]
262    pub query_api_url: Option<String>,
263
264    /// Timeout for HTTP requests in seconds (default: 300)
265    #[serde(default = "default_platform_timeout")]
266    pub timeout_seconds: u64,
267}
268
/// Default HTTP timeout for the platform bootstrap provider, in seconds.
fn default_platform_timeout() -> u64 {
    // Five minutes.
    5 * 60
}
272
273impl Default for PlatformBootstrapConfig {
274    fn default() -> Self {
275        Self {
276            query_api_url: None,
277            timeout_seconds: default_platform_timeout(),
278        }
279    }
280}
281
282/// Configuration for different types of bootstrap providers
283#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
284#[serde(tag = "type", rename_all = "lowercase")]
285pub enum BootstrapProviderConfig {
286    /// PostgreSQL bootstrap provider
287    Postgres(PostgresBootstrapConfig),
288    /// Application-based bootstrap provider
289    Application(ApplicationBootstrapConfig),
290    /// Script file bootstrap provider
291    ScriptFile(ScriptFileBootstrapConfig),
292    /// Platform bootstrap provider for remote Drasi sources
293    /// Bootstraps data from a Query API service running in a remote Drasi environment
294    Platform(PlatformBootstrapConfig),
295    /// No-op bootstrap provider (returns no data)
296    Noop,
297}
298
/// Factory for creating bootstrap providers from configuration.
///
/// Concrete providers live in separate plugin crates, for example
/// `drasi-bootstrap-postgres` or `drasi-bootstrap-noop`. This factory does not
/// construct any of them. [`BootstrapProviderFactory::create_provider`] returns
/// an error for every configuration variant, including `Noop`. Each error names
/// the plugin crate and builder to use instead.
pub struct BootstrapProviderFactory;
305
306impl BootstrapProviderFactory {
307    /// Create a bootstrap provider from configuration
308    ///
309    /// Currently only supports the noop provider. Other providers are implemented
310    /// in dedicated plugin crates and should be instantiated using the builder pattern:
311    ///
312    /// - PostgreSQL: Use `drasi_bootstrap_postgres::PostgresBootstrapProvider::builder().with_host(...).build()`
313    /// - Platform: Use `drasi_bootstrap_platform::PlatformBootstrapProvider::builder().with_query_api_url(...).build()`
314    /// - ScriptFile: Use `drasi_bootstrap_scriptfile::ScriptFileBootstrapProvider::builder().with_file(...).build()`
315    /// - Application: Use `drasi_bootstrap_application::ApplicationBootstrapProvider::builder().build()`
316    pub fn create_provider(config: &BootstrapProviderConfig) -> Result<Box<dyn BootstrapProvider>> {
317        match config {
318            BootstrapProviderConfig::Postgres(_) => {
319                Err(anyhow::anyhow!(
320                    "PostgreSQL bootstrap provider is available in the drasi-bootstrap-postgres crate. \
321                     Use PostgresBootstrapProvider::builder().with_host(...).build() to create it."
322                ))
323            }
324            BootstrapProviderConfig::Application(_) => {
325                Err(anyhow::anyhow!(
326                    "Application bootstrap provider is available in the drasi-bootstrap-application crate. \
327                     Use ApplicationBootstrapProvider::builder().build() to create it."
328                ))
329            }
330            BootstrapProviderConfig::ScriptFile(config) => {
331                Err(anyhow::anyhow!(
332                    "ScriptFile bootstrap provider is available in the drasi-bootstrap-scriptfile crate. \
333                     Use ScriptFileBootstrapProvider::builder().with_file(...).build() to create it. \
334                     File paths: {:?}",
335                    config.file_paths
336                ))
337            }
338            BootstrapProviderConfig::Platform(config) => {
339                Err(anyhow::anyhow!(
340                    "Platform bootstrap provider is available in the drasi-bootstrap-platform crate. \
341                     Use PlatformBootstrapProvider::builder().with_query_api_url(...).build() to create it. \
342                     Config: {config:?}"
343                ))
344            }
345            BootstrapProviderConfig::Noop => {
346                Err(anyhow::anyhow!(
347                    "No-op bootstrap provider is available in the drasi-bootstrap-noop crate. \
348                     Use NoOpBootstrapProvider::builder().build() or NoOpBootstrapProvider::new() to create it."
349                ))
350            }
351        }
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_platform_bootstrap_config_defaults() {
        let config = PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            ..Default::default()
        };
        assert_eq!(config.timeout_seconds, 300);
        assert_eq!(config.query_api_url, Some("http://test:8080".to_string())); // DevSkim: ignore DS137138
    }

    #[test]
    fn test_postgres_bootstrap_config_defaults() {
        let config = PostgresBootstrapConfig::default();
        // Should be empty struct for now
        assert_eq!(config, PostgresBootstrapConfig {});
    }

    #[test]
    fn test_application_bootstrap_config_defaults() {
        let config = ApplicationBootstrapConfig::default();
        // Should be empty struct for now
        assert_eq!(config, ApplicationBootstrapConfig {});
    }

    #[test]
    fn test_platform_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 600,
        });

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"platform\""));
        assert!(json.contains("\"query_api_url\":\"http://test:8080\"")); // DevSkim: ignore DS137138
        assert!(json.contains("\"timeout_seconds\":600"));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        match deserialized {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.query_api_url, Some("http://test:8080".to_string())); // DevSkim: ignore DS137138
                assert_eq!(cfg.timeout_seconds, 600);
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_scriptfile_bootstrap_config() {
        let config = BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
            file_paths: vec![
                "/path/to/file1.jsonl".to_string(),
                "/path/to/file2.jsonl".to_string(),
            ],
        });

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"scriptfile\""));
        assert!(json.contains("\"file_paths\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        match deserialized {
            BootstrapProviderConfig::ScriptFile(cfg) => {
                assert_eq!(cfg.file_paths.len(), 2);
                assert_eq!(cfg.file_paths[0], "/path/to/file1.jsonl");
                assert_eq!(cfg.file_paths[1], "/path/to/file2.jsonl");
            }
            _ => panic!("Expected ScriptFile variant"),
        }
    }

    #[test]
    fn test_noop_bootstrap_config() {
        let config = BootstrapProviderConfig::Noop;

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"noop\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(deserialized, BootstrapProviderConfig::Noop));
    }

    #[test]
    fn test_postgres_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default());

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"postgres\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(deserialized, BootstrapProviderConfig::Postgres(_)));
    }

    #[test]
    fn test_application_bootstrap_config_serialization() {
        let config = BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default());

        let json = serde_json::to_string(&config).unwrap();
        assert!(json.contains("\"type\":\"application\""));

        let deserialized: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            deserialized,
            BootstrapProviderConfig::Application(_)
        ));
    }

    #[test]
    fn test_yaml_deserialization_platform() {
        let yaml = r#"
type: platform
query_api_url: "http://remote:8080" # DevSkim: ignore DS137138
timeout_seconds: 300
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.query_api_url, Some("http://remote:8080".to_string())); // DevSkim: ignore DS137138
                assert_eq!(cfg.timeout_seconds, 300);
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_yaml_deserialization_scriptfile() {
        let yaml = r#"
type: scriptfile
file_paths:
  - "/data/file1.jsonl"
  - "/data/file2.jsonl"
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::ScriptFile(cfg) => {
                assert_eq!(cfg.file_paths.len(), 2);
                assert_eq!(cfg.file_paths[0], "/data/file1.jsonl");
            }
            _ => panic!("Expected ScriptFile variant"),
        }
    }

    #[test]
    fn test_platform_config_with_defaults() {
        let yaml = r#"
type: platform
query_api_url: "http://test:8080" # DevSkim: ignore DS137138
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        match config {
            BootstrapProviderConfig::Platform(cfg) => {
                assert_eq!(cfg.timeout_seconds, 300); // Should use default
            }
            _ => panic!("Expected Platform variant"),
        }
    }

    #[test]
    fn test_bootstrap_config_equality() {
        let config1 = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 300,
        });

        let config2 = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()), // DevSkim: ignore DS137138
            timeout_seconds: 300,
        });

        assert_eq!(config1, config2);
    }

    #[test]
    fn test_backward_compatibility_yaml() {
        // This YAML format should still work
        let yaml = r#"
type: postgres
"#;

        let config: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        assert!(matches!(config, BootstrapProviderConfig::Postgres(_)));
    }

    #[test]
    fn test_factory_points_every_config_at_its_plugin_crate() {
        // The factory never builds providers itself; each variant (Noop included)
        // must produce an error naming the crate that implements it.
        let cases = [
            (
                BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default()),
                "drasi-bootstrap-postgres",
            ),
            (
                BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default()),
                "drasi-bootstrap-application",
            ),
            (
                BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
                    file_paths: vec!["/data/a.jsonl".to_string()],
                }),
                "drasi-bootstrap-scriptfile",
            ),
            (
                BootstrapProviderConfig::Platform(PlatformBootstrapConfig::default()),
                "drasi-bootstrap-platform",
            ),
            (BootstrapProviderConfig::Noop, "drasi-bootstrap-noop"),
        ];

        for (config, crate_name) in &cases {
            let err = BootstrapProviderFactory::create_provider(config)
                .err()
                .expect("factory should not construct providers itself");
            let message = err.to_string();
            assert!(
                message.contains(*crate_name),
                "error for {config:?} should mention {crate_name}: {message}"
            );
        }
    }

    #[test]
    fn test_bootstrap_context_sequence_is_shared_across_clones() {
        let ctx = BootstrapContext::new_minimal("server".to_string(), "source".to_string());
        let cloned = ctx.clone();

        assert_eq!(ctx.next_sequence(), 0);
        assert_eq!(cloned.next_sequence(), 1);
        assert_eq!(ctx.next_sequence(), 2);
        assert_eq!(ctx.get_property("anything"), None);
    }

    #[test]
    fn test_bootstrap_context_typed_properties() {
        let mut props = std::collections::HashMap::new();
        props.insert("batch_size".to_string(), serde_json::json!(100));
        props.insert("name".to_string(), serde_json::json!("orders"));
        let ctx = BootstrapContext::with_properties(
            "server".to_string(),
            "source".to_string(),
            props,
        );

        assert_eq!(
            ctx.get_property("batch_size"),
            Some(serde_json::json!(100))
        );
        assert_eq!(
            ctx.get_typed_property::<u64>("batch_size").unwrap(),
            Some(100)
        );
        assert_eq!(
            ctx.get_typed_property::<String>("name").unwrap(),
            Some("orders".to_string())
        );
        assert_eq!(ctx.get_typed_property::<u64>("missing").unwrap(), None);
        // Present but of the wrong type must be an error, not `None`.
        assert!(ctx.get_typed_property::<u64>("name").is_err());
    }
}