pub mod component_graph;
pub use component_graph::ComponentGraphBootstrapProvider;
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Describes a single bootstrap request handed to a [`BootstrapProvider`].
///
/// The type derives `Serialize` and `Deserialize`, so it can be converted to and
/// from any serde-supported format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BootstrapRequest {
    /// Identifier of the query the request is issued for.
    pub query_id: String,
    /// Node labels named by the request.
    // NOTE(review): presumably the labels whose nodes should be bootstrapped; confirm with providers.
    pub node_labels: Vec<String>,
    /// Relation labels named by the request.
    // NOTE(review): presumably the labels whose relations should be bootstrapped; confirm with providers.
    pub relation_labels: Vec<String>,
    /// Identifier of this individual request.
    pub request_id: String,
}
/// Shared state passed by reference to [`BootstrapProvider::bootstrap`].
///
/// The sequence counter and the property map are both held behind `Arc`, so
/// cloning a context is cheap and every clone observes the same counter and the
/// same (immutable) properties.
#[derive(Clone)]
pub struct BootstrapContext {
    /// Identifier of the server this context belongs to.
    pub server_id: String,
    /// Identifier of the source this context belongs to.
    pub source_id: String,
    /// Atomic counter advanced by `next_sequence`; shared between clones.
    pub sequence_counter: Arc<AtomicU64>,
    /// JSON properties keyed by name; read through the accessor methods only.
    properties: Arc<HashMap<String, serde_json::Value>>,
}
impl BootstrapContext {
    /// Creates a context with an empty property map and a fresh sequence
    /// counter starting at 0.
    pub fn new_minimal(server_id: String, source_id: String) -> Self {
        Self::with_properties(server_id, source_id, HashMap::new())
    }

    /// Creates a context that carries `properties` and a fresh sequence counter
    /// starting at 0.
    pub fn with_properties(
        server_id: String,
        source_id: String,
        properties: HashMap<String, serde_json::Value>,
    ) -> Self {
        Self {
            server_id,
            source_id,
            sequence_counter: Arc::new(AtomicU64::new(0)),
            properties: Arc::new(properties),
        }
    }

    /// Atomically increments the sequence counter and returns the value it held
    /// *before* the increment, so the first call on a fresh context returns 0.
    ///
    /// Clones of a context share the counter, so values are unique across all
    /// clones. `fetch_add` wraps around on `u64` overflow.
    pub fn next_sequence(&self) -> u64 {
        self.sequence_counter.fetch_add(1, Ordering::SeqCst)
    }

    /// Returns an owned copy of the property stored under `key`, or `None` when
    /// no such property exists.
    pub fn get_property(&self, key: &str) -> Option<serde_json::Value> {
        self.properties.get(key).cloned()
    }

    /// Looks up `key` and deserializes the stored JSON value into `T`.
    ///
    /// Returns `Ok(None)` when the key is absent.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error (converted into `anyhow::Error`) when the
    /// stored value cannot be deserialized into `T`.
    pub fn get_typed_property<T>(&self, key: &str) -> Result<Option<T>>
    where
        T: for<'de> Deserialize<'de>,
    {
        match self.properties.get(key) {
            // `&serde_json::Value` implements serde's `Deserializer`, so the value
            // is decoded in place instead of being deep-cloned first.
            Some(value) => Ok(Some(T::deserialize(value)?)),
            None => Ok(None),
        }
    }
}
use crate::channels::BootstrapEventSender;
/// Summary returned by [`BootstrapProvider::bootstrap`].
///
/// `Default` yields zero events, no last sequence, and `sequences_aligned == false`.
#[derive(Debug, Clone, Default)]
pub struct BootstrapResult {
    /// Number of events counted for the bootstrap run.
    pub event_count: usize,
    /// Last sequence number recorded for the run, if any.
    pub last_sequence: Option<u64>,
    /// Flag reported by the provider about sequence alignment.
    // NOTE(review): exact meaning of "aligned" is defined by the providers; confirm there.
    pub sequences_aligned: bool,
}
/// Asynchronous source of bootstrap data.
///
/// Implementations must be `Send + Sync` so they can be shared across threads.
#[async_trait]
pub trait BootstrapProvider: Send + Sync {
    /// Runs a bootstrap for `request`.
    ///
    /// * `request` - what is being requested (query id, labels, request id).
    /// * `context` - shared identifiers, sequence counter and properties.
    /// * `event_tx` - channel sender handed to the provider.
    // NOTE(review): presumably bootstrap events are delivered through `event_tx`; confirm
    // against the implementations.
    /// * `settings` - optional subscription settings for the source.
    ///
    /// Returns a [`BootstrapResult`] summary on success, or an error.
    async fn bootstrap(
        &self,
        request: BootstrapRequest,
        context: &BootstrapContext,
        event_tx: BootstrapEventSender,
        settings: Option<&crate::config::SourceSubscriptionSettings>,
    ) -> Result<BootstrapResult>;
}
/// Lets a boxed trait object be used wherever a concrete [`BootstrapProvider`]
/// is expected by forwarding the call to the boxed provider.
#[async_trait]
impl BootstrapProvider for Box<dyn BootstrapProvider> {
    /// Forwards every argument unchanged to the provider inside the box.
    async fn bootstrap(
        &self,
        request: BootstrapRequest,
        context: &BootstrapContext,
        event_tx: BootstrapEventSender,
        settings: Option<&crate::config::SourceSubscriptionSettings>,
    ) -> Result<BootstrapResult> {
        // Dereference twice (`&Box<dyn _>` -> `Box<dyn _>` -> `dyn _`) so the call
        // dispatches to the inner provider rather than back into this impl.
        let inner = &**self;
        let outcome = inner.bootstrap(request, context, event_tx, settings).await;
        outcome
    }
}
/// Configuration for the Postgres bootstrap provider.
///
/// Currently has no fields; it (de)serializes as an empty struct.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct PostgresBootstrapConfig {}
/// Configuration for the application bootstrap provider.
///
/// Currently has no fields; it (de)serializes as an empty struct.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct ApplicationBootstrapConfig {}
/// Configuration for the script-file bootstrap provider.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScriptFileBootstrapConfig {
    /// Paths of the script files, in the order given in the configuration.
    pub file_paths: Vec<String>,
}
/// Configuration for the platform bootstrap provider.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PlatformBootstrapConfig {
    /// Optional query API URL; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub query_api_url: Option<String>,
    /// Timeout in seconds; falls back to `default_platform_timeout()` when the
    /// field is missing from the input.
    #[serde(default = "default_platform_timeout")]
    pub timeout_seconds: u64,
}
/// Default value for the platform provider's `timeout_seconds` field (300).
///
/// Referenced by name from a `#[serde(default = "...")]` attribute, so the
/// function name must stay in sync with that attribute.
fn default_platform_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECONDS: u64 = 300;
    DEFAULT_TIMEOUT_SECONDS
}
impl Default for PlatformBootstrapConfig {
fn default() -> Self {
Self {
query_api_url: None,
timeout_seconds: default_platform_timeout(),
}
}
}
/// Selects a bootstrap provider together with its provider-specific settings.
///
/// The enum is internally tagged: the serialized form carries a `type` field
/// holding the lowercased variant name (`postgres`, `application`, `scriptfile`,
/// `platform`, `noop`), with the variant's own fields alongside it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum BootstrapProviderConfig {
    /// Postgres provider settings.
    Postgres(PostgresBootstrapConfig),
    /// Application provider settings.
    Application(ApplicationBootstrapConfig),
    /// Script-file provider settings.
    ScriptFile(ScriptFileBootstrapConfig),
    /// Platform provider settings.
    Platform(PlatformBootstrapConfig),
    /// No-op provider; carries no settings.
    Noop,
}
/// Namespace for turning a [`BootstrapProviderConfig`] into a provider.
pub struct BootstrapProviderFactory;

impl BootstrapProviderFactory {
    /// Always returns an error for every configuration variant.
    ///
    /// None of the providers is constructed here; each arm produces an error
    /// message naming the crate and builder call that creates the provider.
    ///
    /// # Errors
    ///
    /// Returns `Err` for all variants of `config`.
    pub fn create_provider(
        config: &BootstrapProviderConfig,
    ) -> Result<Box<dyn BootstrapProvider>> {
        // Build the variant-specific error first, then return it from one place.
        let error = match config {
            BootstrapProviderConfig::Postgres(_) => anyhow::anyhow!(
                "PostgreSQL bootstrap provider is available in the drasi-bootstrap-postgres crate. \
                 Use PostgresBootstrapProvider::builder().with_host(...).build() to create it."
            ),
            BootstrapProviderConfig::Application(_) => anyhow::anyhow!(
                "Application bootstrap provider is available in the drasi-bootstrap-application crate. \
                 Use ApplicationBootstrapProvider::builder().build() to create it."
            ),
            BootstrapProviderConfig::ScriptFile(config) => anyhow::anyhow!(
                "ScriptFile bootstrap provider is available in the drasi-bootstrap-scriptfile crate. \
                 Use ScriptFileBootstrapProvider::builder().with_file(...).build() to create it. \
                 File paths: {:?}",
                config.file_paths
            ),
            BootstrapProviderConfig::Platform(config) => anyhow::anyhow!(
                "Platform bootstrap provider is available in the drasi-bootstrap-platform crate. \
                 Use PlatformBootstrapProvider::builder().with_query_api_url(...).build() to create it. \
                 Config: {config:?}"
            ),
            BootstrapProviderConfig::Noop => anyhow::anyhow!(
                "No-op bootstrap provider is available in the drasi-bootstrap-noop crate. \
                 Use NoOpBootstrapProvider::builder().build() or NoOpBootstrapProvider::new() to create it."
            ),
        };
        Err(error)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `config` to JSON and parses that JSON back, returning both.
    fn json_round_trip(config: &BootstrapProviderConfig) -> (String, BootstrapProviderConfig) {
        let encoded = serde_json::to_string(config).unwrap();
        let decoded = serde_json::from_str(&encoded).unwrap();
        (encoded, decoded)
    }

    /// Parses a YAML document into a provider config, panicking on invalid input.
    fn parse_yaml(document: &str) -> BootstrapProviderConfig {
        serde_yaml::from_str(document).unwrap()
    }

    /// Fields left out of a struct-update expression come from `Default`.
    #[test]
    fn test_platform_bootstrap_config_defaults() {
        let platform = PlatformBootstrapConfig {
            query_api_url: Some(String::from("http://test:8080")),
            ..Default::default()
        };

        assert_eq!(platform.timeout_seconds, 300);
        assert_eq!(platform.query_api_url, Some(String::from("http://test:8080")));
    }

    /// The default Postgres config equals the empty struct literal.
    #[test]
    fn test_postgres_bootstrap_config_defaults() {
        assert_eq!(PostgresBootstrapConfig::default(), PostgresBootstrapConfig {});
    }

    /// The default application config equals the empty struct literal.
    #[test]
    fn test_application_bootstrap_config_defaults() {
        assert_eq!(
            ApplicationBootstrapConfig::default(),
            ApplicationBootstrapConfig {}
        );
    }

    /// A platform config survives a JSON round trip and is tagged `platform`.
    #[test]
    fn test_platform_bootstrap_config_serialization() {
        let original = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some(String::from("http://test:8080")),
            timeout_seconds: 600,
        });
        let (json, decoded) = json_round_trip(&original);

        assert!(json.contains("\"type\":\"platform\""));
        assert!(json.contains("\"query_api_url\":\"http://test:8080\""));
        assert!(json.contains("\"timeout_seconds\":600"));

        if let BootstrapProviderConfig::Platform(platform) = decoded {
            assert_eq!(platform.query_api_url, Some(String::from("http://test:8080")));
            assert_eq!(platform.timeout_seconds, 600);
        } else {
            panic!("Expected Platform variant");
        }
    }

    /// A script-file config survives a JSON round trip and is tagged `scriptfile`.
    #[test]
    fn test_scriptfile_bootstrap_config() {
        let original = BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
            file_paths: vec![
                String::from("/path/to/file1.jsonl"),
                String::from("/path/to/file2.jsonl"),
            ],
        });
        let (json, decoded) = json_round_trip(&original);

        assert!(json.contains("\"type\":\"scriptfile\""));
        assert!(json.contains("\"file_paths\""));

        if let BootstrapProviderConfig::ScriptFile(script) = decoded {
            assert_eq!(script.file_paths.len(), 2);
            assert_eq!(script.file_paths[0], "/path/to/file1.jsonl");
            assert_eq!(script.file_paths[1], "/path/to/file2.jsonl");
        } else {
            panic!("Expected ScriptFile variant");
        }
    }

    /// The unit variant round-trips through JSON with only a `type` tag.
    #[test]
    fn test_noop_bootstrap_config() {
        let (json, decoded) = json_round_trip(&BootstrapProviderConfig::Noop);

        assert!(json.contains("\"type\":\"noop\""));
        assert!(matches!(decoded, BootstrapProviderConfig::Noop));
    }

    /// A Postgres config round-trips through JSON and is tagged `postgres`.
    #[test]
    fn test_postgres_bootstrap_config_serialization() {
        let original = BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default());
        let (json, decoded) = json_round_trip(&original);

        assert!(json.contains("\"type\":\"postgres\""));
        assert!(matches!(decoded, BootstrapProviderConfig::Postgres(_)));
    }

    /// An application config round-trips through JSON and is tagged `application`.
    #[test]
    fn test_application_bootstrap_config_serialization() {
        let original =
            BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default());
        let (json, decoded) = json_round_trip(&original);

        assert!(json.contains("\"type\":\"application\""));
        assert!(matches!(decoded, BootstrapProviderConfig::Application(_)));
    }

    /// A fully specified platform config parses from YAML.
    #[test]
    fn test_yaml_deserialization_platform() {
        // Raw-string contents stay at column 0 so the YAML text is unchanged.
        let yaml = r#"
type: platform
query_api_url: "http://remote:8080" # DevSkim: ignore DS137138
timeout_seconds: 300
"#;

        if let BootstrapProviderConfig::Platform(platform) = parse_yaml(yaml) {
            assert_eq!(platform.query_api_url, Some(String::from("http://remote:8080")));
            assert_eq!(platform.timeout_seconds, 300);
        } else {
            panic!("Expected Platform variant");
        }
    }

    /// A script-file config with two paths parses from YAML.
    #[test]
    fn test_yaml_deserialization_scriptfile() {
        let yaml = r#"
type: scriptfile
file_paths:
- "/data/file1.jsonl"
- "/data/file2.jsonl"
"#;

        if let BootstrapProviderConfig::ScriptFile(script) = parse_yaml(yaml) {
            assert_eq!(script.file_paths.len(), 2);
            assert_eq!(script.file_paths[0], "/data/file1.jsonl");
        } else {
            panic!("Expected ScriptFile variant");
        }
    }

    /// Omitting `timeout_seconds` in YAML falls back to the serde default.
    #[test]
    fn test_platform_config_with_defaults() {
        let yaml = r#"
type: platform
query_api_url: "http://test:8080" # DevSkim: ignore DS137138
"#;

        if let BootstrapProviderConfig::Platform(platform) = parse_yaml(yaml) {
            assert_eq!(platform.timeout_seconds, 300);
        } else {
            panic!("Expected Platform variant");
        }
    }

    /// Two independently built but identical configs compare equal.
    #[test]
    fn test_bootstrap_config_equality() {
        let build = || {
            BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
                query_api_url: Some(String::from("http://test:8080")),
                timeout_seconds: 300,
            })
        };

        let first = build();
        let second = build();
        assert_eq!(first, second);
    }

    /// A YAML document containing only the `type` tag still selects Postgres.
    #[test]
    fn test_backward_compatibility_yaml() {
        let yaml = r#"
type: postgres
"#;

        let parsed = parse_yaml(yaml);
        assert!(matches!(parsed, BootstrapProviderConfig::Postgres(_)));
    }
}