1pub mod component_graph;
28pub use component_graph::ComponentGraphBootstrapProvider;
29
30use anyhow::Result;
31use async_trait::async_trait;
32use serde::{Deserialize, Serialize};
33use std::collections::HashMap;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::Arc;
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BootstrapRequest {
40 pub query_id: String,
41 pub node_labels: Vec<String>,
42 pub relation_labels: Vec<String>,
43 pub request_id: String,
44}
45
46#[derive(Clone)]
58pub struct BootstrapContext {
59 pub server_id: String,
61 pub source_id: String,
63 pub sequence_counter: Arc<AtomicU64>,
65 properties: Arc<HashMap<String, serde_json::Value>>,
67}
68
69impl BootstrapContext {
70 pub fn new_minimal(server_id: String, source_id: String) -> Self {
75 Self {
76 server_id,
77 source_id,
78 sequence_counter: Arc::new(AtomicU64::new(0)),
79 properties: Arc::new(HashMap::new()),
80 }
81 }
82
83 pub fn with_properties(
89 server_id: String,
90 source_id: String,
91 properties: HashMap<String, serde_json::Value>,
92 ) -> Self {
93 Self {
94 server_id,
95 source_id,
96 sequence_counter: Arc::new(AtomicU64::new(0)),
97 properties: Arc::new(properties),
98 }
99 }
100
101 pub fn next_sequence(&self) -> u64 {
103 self.sequence_counter.fetch_add(1, Ordering::SeqCst)
104 }
105
106 pub fn get_property(&self, key: &str) -> Option<serde_json::Value> {
108 self.properties.get(key).cloned()
109 }
110
111 pub fn get_typed_property<T>(&self, key: &str) -> Result<Option<T>>
113 where
114 T: for<'de> Deserialize<'de>,
115 {
116 match self.get_property(key) {
117 Some(value) => Ok(Some(serde_json::from_value(value.clone())?)),
118 None => Ok(None),
119 }
120 }
121}
122
123use crate::channels::BootstrapEventSender;
124
125#[async_trait]
128pub trait BootstrapProvider: Send + Sync {
129 async fn bootstrap(
139 &self,
140 request: BootstrapRequest,
141 context: &BootstrapContext,
142 event_tx: BootstrapEventSender,
143 settings: Option<&crate::config::SourceSubscriptionSettings>,
144 ) -> Result<usize>;
145}
146
147#[async_trait]
150impl BootstrapProvider for Box<dyn BootstrapProvider> {
151 async fn bootstrap(
152 &self,
153 request: BootstrapRequest,
154 context: &BootstrapContext,
155 event_tx: BootstrapEventSender,
156 settings: Option<&crate::config::SourceSubscriptionSettings>,
157 ) -> Result<usize> {
158 (**self)
159 .bootstrap(request, context, event_tx, settings)
160 .await
161 }
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
179pub struct PostgresBootstrapConfig {
180 }
183
184#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
197pub struct ApplicationBootstrapConfig {
198 }
201
202#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
218pub struct ScriptFileBootstrapConfig {
219 pub file_paths: Vec<String>,
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
237pub struct PlatformBootstrapConfig {
238 #[serde(skip_serializing_if = "Option::is_none")]
241 pub query_api_url: Option<String>,
242
243 #[serde(default = "default_platform_timeout")]
245 pub timeout_seconds: u64,
246}
247
/// Default for `PlatformBootstrapConfig::timeout_seconds`: 300.
fn default_platform_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECONDS: u64 = 300;
    DEFAULT_TIMEOUT_SECONDS
}
251
252impl Default for PlatformBootstrapConfig {
253 fn default() -> Self {
254 Self {
255 query_api_url: None,
256 timeout_seconds: default_platform_timeout(),
257 }
258 }
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
263#[serde(tag = "type", rename_all = "lowercase")]
264pub enum BootstrapProviderConfig {
265 Postgres(PostgresBootstrapConfig),
267 Application(ApplicationBootstrapConfig),
269 ScriptFile(ScriptFileBootstrapConfig),
271 Platform(PlatformBootstrapConfig),
274 Noop,
276}
277
278pub struct BootstrapProviderFactory;
284
285impl BootstrapProviderFactory {
286 pub fn create_provider(config: &BootstrapProviderConfig) -> Result<Box<dyn BootstrapProvider>> {
296 match config {
297 BootstrapProviderConfig::Postgres(_) => {
298 Err(anyhow::anyhow!(
299 "PostgreSQL bootstrap provider is available in the drasi-bootstrap-postgres crate. \
300 Use PostgresBootstrapProvider::builder().with_host(...).build() to create it."
301 ))
302 }
303 BootstrapProviderConfig::Application(_) => {
304 Err(anyhow::anyhow!(
305 "Application bootstrap provider is available in the drasi-bootstrap-application crate. \
306 Use ApplicationBootstrapProvider::builder().build() to create it."
307 ))
308 }
309 BootstrapProviderConfig::ScriptFile(config) => {
310 Err(anyhow::anyhow!(
311 "ScriptFile bootstrap provider is available in the drasi-bootstrap-scriptfile crate. \
312 Use ScriptFileBootstrapProvider::builder().with_file(...).build() to create it. \
313 File paths: {:?}",
314 config.file_paths
315 ))
316 }
317 BootstrapProviderConfig::Platform(config) => {
318 Err(anyhow::anyhow!(
319 "Platform bootstrap provider is available in the drasi-bootstrap-platform crate. \
320 Use PlatformBootstrapProvider::builder().with_query_api_url(...).build() to create it. \
321 Config: {config:?}"
322 ))
323 }
324 BootstrapProviderConfig::Noop => {
325 Err(anyhow::anyhow!(
326 "No-op bootstrap provider is available in the drasi-bootstrap-noop crate. \
327 Use NoOpBootstrapProvider::builder().build() or NoOpBootstrapProvider::new() to create it."
328 ))
329 }
330 }
331 }
332}
333
#[cfg(test)]
mod tests {
    use super::*;

    /// URL shared by the platform-config tests.
    const TEST_URL: &str = "http://test:8080"; // DevSkim: ignore DS137138

    /// Builds a `Platform` provider config from a URL and a timeout.
    fn platform_config(url: &str, timeout_seconds: u64) -> BootstrapProviderConfig {
        BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some(url.to_string()),
            timeout_seconds,
        })
    }

    /// Serializes a provider config to JSON, panicking on failure.
    fn to_json(config: &BootstrapProviderConfig) -> String {
        serde_json::to_string(config).unwrap()
    }

    /// Parses a provider config from JSON, panicking on failure.
    fn from_json(json: &str) -> BootstrapProviderConfig {
        serde_json::from_str(json).unwrap()
    }

    /// Parses a provider config from YAML, panicking on failure.
    fn from_yaml(yaml: &str) -> BootstrapProviderConfig {
        serde_yaml::from_str(yaml).unwrap()
    }

    /// Unwraps the `Platform` payload or fails the test.
    fn expect_platform(config: BootstrapProviderConfig) -> PlatformBootstrapConfig {
        match config {
            BootstrapProviderConfig::Platform(platform) => platform,
            _ => panic!("Expected Platform variant"),
        }
    }

    /// Unwraps the `ScriptFile` payload or fails the test.
    fn expect_script_file(config: BootstrapProviderConfig) -> ScriptFileBootstrapConfig {
        match config {
            BootstrapProviderConfig::ScriptFile(script_file) => script_file,
            _ => panic!("Expected ScriptFile variant"),
        }
    }

    #[test]
    fn test_platform_bootstrap_config_defaults() {
        let config = PlatformBootstrapConfig {
            query_api_url: Some(TEST_URL.to_string()),
            ..Default::default()
        };

        assert_eq!(config.timeout_seconds, 300);
        assert_eq!(config.query_api_url.as_deref(), Some(TEST_URL));
    }

    #[test]
    fn test_postgres_bootstrap_config_defaults() {
        assert_eq!(PostgresBootstrapConfig::default(), PostgresBootstrapConfig {});
    }

    #[test]
    fn test_application_bootstrap_config_defaults() {
        assert_eq!(
            ApplicationBootstrapConfig::default(),
            ApplicationBootstrapConfig {}
        );
    }

    #[test]
    fn test_platform_bootstrap_config_serialization() {
        let json = to_json(&platform_config(TEST_URL, 600));
        assert!(json.contains("\"type\":\"platform\""));
        assert!(json.contains("\"query_api_url\":\"http://test:8080\"")); // DevSkim: ignore DS137138
        assert!(json.contains("\"timeout_seconds\":600"));

        let platform = expect_platform(from_json(&json));
        assert_eq!(platform.query_api_url.as_deref(), Some(TEST_URL));
        assert_eq!(platform.timeout_seconds, 600);
    }

    #[test]
    fn test_scriptfile_bootstrap_config() {
        let paths = vec![
            "/path/to/file1.jsonl".to_string(),
            "/path/to/file2.jsonl".to_string(),
        ];
        let config =
            BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig { file_paths: paths });

        let json = to_json(&config);
        assert!(json.contains("\"type\":\"scriptfile\""));
        assert!(json.contains("\"file_paths\""));

        let script_file = expect_script_file(from_json(&json));
        assert_eq!(script_file.file_paths.len(), 2);
        assert_eq!(script_file.file_paths[0], "/path/to/file1.jsonl");
        assert_eq!(script_file.file_paths[1], "/path/to/file2.jsonl");
    }

    #[test]
    fn test_noop_bootstrap_config() {
        let json = to_json(&BootstrapProviderConfig::Noop);
        assert!(json.contains("\"type\":\"noop\""));

        assert!(matches!(from_json(&json), BootstrapProviderConfig::Noop));
    }

    #[test]
    fn test_postgres_bootstrap_config_serialization() {
        let json = to_json(&BootstrapProviderConfig::Postgres(
            PostgresBootstrapConfig::default(),
        ));
        assert!(json.contains("\"type\":\"postgres\""));

        assert!(matches!(
            from_json(&json),
            BootstrapProviderConfig::Postgres(_)
        ));
    }

    #[test]
    fn test_application_bootstrap_config_serialization() {
        let json = to_json(&BootstrapProviderConfig::Application(
            ApplicationBootstrapConfig::default(),
        ));
        assert!(json.contains("\"type\":\"application\""));

        assert!(matches!(
            from_json(&json),
            BootstrapProviderConfig::Application(_)
        ));
    }

    #[test]
    fn test_yaml_deserialization_platform() {
        let yaml = r#"
type: platform
query_api_url: "http://remote:8080" # DevSkim: ignore DS137138
timeout_seconds: 300
"#;

        let platform = expect_platform(from_yaml(yaml));
        assert_eq!(
            platform.query_api_url.as_deref(),
            Some("http://remote:8080") // DevSkim: ignore DS137138
        );
        assert_eq!(platform.timeout_seconds, 300);
    }

    #[test]
    fn test_yaml_deserialization_scriptfile() {
        let yaml = r#"
type: scriptfile
file_paths:
 - "/data/file1.jsonl"
 - "/data/file2.jsonl"
"#;

        let script_file = expect_script_file(from_yaml(yaml));
        assert_eq!(script_file.file_paths.len(), 2);
        assert_eq!(script_file.file_paths[0], "/data/file1.jsonl");
    }

    #[test]
    fn test_platform_config_with_defaults() {
        let yaml = r#"
type: platform
query_api_url: "http://test:8080" # DevSkim: ignore DS137138
"#;

        // `timeout_seconds` is absent from the input, so the serde default applies.
        let platform = expect_platform(from_yaml(yaml));
        assert_eq!(platform.timeout_seconds, 300);
    }

    #[test]
    fn test_bootstrap_config_equality() {
        let first = platform_config(TEST_URL, 300);
        let second = platform_config(TEST_URL, 300);

        assert_eq!(first, second);
    }

    #[test]
    fn test_backward_compatibility_yaml() {
        let yaml = r#"
type: postgres
"#;

        assert!(matches!(
            from_yaml(yaml),
            BootstrapProviderConfig::Postgres(_)
        ));
    }
}