1pub mod component_graph;
28pub use component_graph::ComponentGraphBootstrapProvider;
29
30use anyhow::Result;
31use async_trait::async_trait;
32use bytes::Bytes;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct BootstrapRequest {
41 pub query_id: String,
42 pub node_labels: Vec<String>,
43 pub relation_labels: Vec<String>,
44 pub request_id: String,
45}
46
47#[derive(Clone)]
59pub struct BootstrapContext {
60 pub server_id: String,
62 pub source_id: String,
64 pub sequence_counter: Arc<AtomicU64>,
66 properties: Arc<HashMap<String, serde_json::Value>>,
68}
69
70impl BootstrapContext {
71 pub fn new_minimal(server_id: String, source_id: String) -> Self {
76 Self {
77 server_id,
78 source_id,
79 sequence_counter: Arc::new(AtomicU64::new(0)),
80 properties: Arc::new(HashMap::new()),
81 }
82 }
83
84 pub fn with_properties(
90 server_id: String,
91 source_id: String,
92 properties: HashMap<String, serde_json::Value>,
93 ) -> Self {
94 Self {
95 server_id,
96 source_id,
97 sequence_counter: Arc::new(AtomicU64::new(0)),
98 properties: Arc::new(properties),
99 }
100 }
101
102 pub fn next_sequence(&self) -> u64 {
104 self.sequence_counter.fetch_add(1, Ordering::SeqCst)
105 }
106
107 pub fn get_property(&self, key: &str) -> Option<serde_json::Value> {
109 self.properties.get(key).cloned()
110 }
111
112 pub fn get_typed_property<T>(&self, key: &str) -> Result<Option<T>>
114 where
115 T: for<'de> Deserialize<'de>,
116 {
117 match self.get_property(key) {
118 Some(value) => Ok(Some(serde_json::from_value(value.clone())?)),
119 None => Ok(None),
120 }
121 }
122}
123
124use crate::channels::BootstrapEventSender;
125
126#[derive(Debug, Clone, Default)]
141pub struct BootstrapResult {
142 pub event_count: usize,
143 pub last_sequence: Option<u64>,
144 pub sequences_aligned: bool,
145 pub source_position: Option<Bytes>,
152}
153
154#[async_trait]
157pub trait BootstrapProvider: Send + Sync {
158 async fn bootstrap(
168 &self,
169 request: BootstrapRequest,
170 context: &BootstrapContext,
171 event_tx: BootstrapEventSender,
172 settings: Option<&crate::config::SourceSubscriptionSettings>,
173 ) -> Result<BootstrapResult>;
174}
175
176#[async_trait]
179impl BootstrapProvider for Box<dyn BootstrapProvider> {
180 async fn bootstrap(
181 &self,
182 request: BootstrapRequest,
183 context: &BootstrapContext,
184 event_tx: BootstrapEventSender,
185 settings: Option<&crate::config::SourceSubscriptionSettings>,
186 ) -> Result<BootstrapResult> {
187 (**self)
188 .bootstrap(request, context, event_tx, settings)
189 .await
190 }
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
208pub struct PostgresBootstrapConfig {
209 }
212
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
226pub struct ApplicationBootstrapConfig {
227 }
230
231#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
247pub struct ScriptFileBootstrapConfig {
248 pub file_paths: Vec<String>,
250}
251
252#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
266pub struct PlatformBootstrapConfig {
267 #[serde(skip_serializing_if = "Option::is_none")]
270 pub query_api_url: Option<String>,
271
272 #[serde(default = "default_platform_timeout")]
274 pub timeout_seconds: u64,
275}
276
/// Default for `PlatformBootstrapConfig::timeout_seconds`: 300 seconds.
fn default_platform_timeout() -> u64 {
    const FIVE_MINUTES_IN_SECONDS: u64 = 5 * 60;
    FIVE_MINUTES_IN_SECONDS
}
280
281impl Default for PlatformBootstrapConfig {
282 fn default() -> Self {
283 Self {
284 query_api_url: None,
285 timeout_seconds: default_platform_timeout(),
286 }
287 }
288}
289
290#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
292#[serde(tag = "type", rename_all = "lowercase")]
293pub enum BootstrapProviderConfig {
294 Postgres(PostgresBootstrapConfig),
296 Application(ApplicationBootstrapConfig),
298 ScriptFile(ScriptFileBootstrapConfig),
300 Platform(PlatformBootstrapConfig),
303 Noop,
305}
306
307pub struct BootstrapProviderFactory;
313
314impl BootstrapProviderFactory {
315 pub fn create_provider(config: &BootstrapProviderConfig) -> Result<Box<dyn BootstrapProvider>> {
325 match config {
326 BootstrapProviderConfig::Postgres(_) => {
327 Err(anyhow::anyhow!(
328 "PostgreSQL bootstrap provider is available in the drasi-bootstrap-postgres crate. \
329 Use PostgresBootstrapProvider::builder().with_host(...).build() to create it."
330 ))
331 }
332 BootstrapProviderConfig::Application(_) => {
333 Err(anyhow::anyhow!(
334 "Application bootstrap provider is available in the drasi-bootstrap-application crate. \
335 Use ApplicationBootstrapProvider::builder().build() to create it."
336 ))
337 }
338 BootstrapProviderConfig::ScriptFile(config) => {
339 Err(anyhow::anyhow!(
340 "ScriptFile bootstrap provider is available in the drasi-bootstrap-scriptfile crate. \
341 Use ScriptFileBootstrapProvider::builder().with_file(...).build() to create it. \
342 File paths: {:?}",
343 config.file_paths
344 ))
345 }
346 BootstrapProviderConfig::Platform(config) => {
347 Err(anyhow::anyhow!(
348 "Platform bootstrap provider is available in the drasi-bootstrap-platform crate. \
349 Use PlatformBootstrapProvider::builder().with_query_api_url(...).build() to create it. \
350 Config: {config:?}"
351 ))
352 }
353 BootstrapProviderConfig::Noop => {
354 Err(anyhow::anyhow!(
355 "No-op bootstrap provider is available in the drasi-bootstrap-noop crate. \
356 Use NoOpBootstrapProvider::builder().build() or NoOpBootstrapProvider::new() to create it."
357 ))
358 }
359 }
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    // Returns the payload of a `Platform` config, failing the test otherwise.
    fn expect_platform(config: BootstrapProviderConfig) -> PlatformBootstrapConfig {
        match config {
            BootstrapProviderConfig::Platform(inner) => inner,
            _ => panic!("Expected Platform variant"),
        }
    }

    // Returns the payload of a `ScriptFile` config, failing the test otherwise.
    fn expect_scriptfile(config: BootstrapProviderConfig) -> ScriptFileBootstrapConfig {
        match config {
            BootstrapProviderConfig::ScriptFile(inner) => inner,
            _ => panic!("Expected ScriptFile variant"),
        }
    }

    #[test]
    fn test_platform_bootstrap_config_defaults() {
        let platform = PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()),
            ..Default::default()
        };

        assert_eq!(platform.timeout_seconds, 300);
        assert_eq!(platform.query_api_url.as_deref(), Some("http://test:8080"));
    }

    #[test]
    fn test_postgres_bootstrap_config_defaults() {
        assert_eq!(PostgresBootstrapConfig::default(), PostgresBootstrapConfig {});
    }

    #[test]
    fn test_application_bootstrap_config_defaults() {
        assert_eq!(
            ApplicationBootstrapConfig::default(),
            ApplicationBootstrapConfig {}
        );
    }

    #[test]
    fn test_platform_bootstrap_config_serialization() {
        let original = BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some("http://test:8080".to_string()),
            timeout_seconds: 600,
        });

        let json = serde_json::to_string(&original).unwrap();
        for fragment in [
            "\"type\":\"platform\"",
            "\"query_api_url\":\"http://test:8080\"",
            "\"timeout_seconds\":600",
        ] {
            assert!(json.contains(fragment));
        }

        let restored: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        let platform = expect_platform(restored);
        assert_eq!(platform.query_api_url, Some("http://test:8080".to_string()));
        assert_eq!(platform.timeout_seconds, 600);
    }

    #[test]
    fn test_scriptfile_bootstrap_config() {
        let original = BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
            file_paths: vec![
                "/path/to/file1.jsonl".to_string(),
                "/path/to/file2.jsonl".to_string(),
            ],
        });

        let json = serde_json::to_string(&original).unwrap();
        assert!(json.contains("\"type\":\"scriptfile\""));
        assert!(json.contains("\"file_paths\""));

        let restored: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        let script = expect_scriptfile(restored);
        assert_eq!(
            script.file_paths,
            vec!["/path/to/file1.jsonl", "/path/to/file2.jsonl"]
        );
    }

    #[test]
    fn test_noop_bootstrap_config() {
        let json = serde_json::to_string(&BootstrapProviderConfig::Noop).unwrap();
        assert!(json.contains("\"type\":\"noop\""));

        let restored: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(restored, BootstrapProviderConfig::Noop);
    }

    #[test]
    fn test_postgres_bootstrap_config_serialization() {
        let original = BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default());

        let json = serde_json::to_string(&original).unwrap();
        assert!(json.contains("\"type\":\"postgres\""));

        let restored: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        let is_postgres = matches!(restored, BootstrapProviderConfig::Postgres(_));
        assert!(is_postgres);
    }

    #[test]
    fn test_application_bootstrap_config_serialization() {
        let original =
            BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default());

        let json = serde_json::to_string(&original).unwrap();
        assert!(json.contains("\"type\":\"application\""));

        let restored: BootstrapProviderConfig = serde_json::from_str(&json).unwrap();
        let is_application = matches!(restored, BootstrapProviderConfig::Application(_));
        assert!(is_application);
    }

    #[test]
    fn test_yaml_deserialization_platform() {
        let yaml = r#"
type: platform
query_api_url: "http://remote:8080" # DevSkim: ignore DS137138
timeout_seconds: 300
"#;

        let parsed: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        let platform = expect_platform(parsed);
        assert_eq!(
            platform.query_api_url,
            Some("http://remote:8080".to_string())
        );
        assert_eq!(platform.timeout_seconds, 300);
    }

    #[test]
    fn test_yaml_deserialization_scriptfile() {
        let yaml = r#"
type: scriptfile
file_paths:
  - "/data/file1.jsonl"
  - "/data/file2.jsonl"
"#;

        let parsed: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        let script = expect_scriptfile(parsed);
        assert_eq!(script.file_paths.len(), 2);
        assert_eq!(
            script.file_paths.first().map(String::as_str),
            Some("/data/file1.jsonl")
        );
    }

    #[test]
    fn test_platform_config_with_defaults() {
        let yaml = r#"
type: platform
query_api_url: "http://test:8080" # DevSkim: ignore DS137138
"#;

        let parsed: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        // `timeout_seconds` is absent from the YAML, so the serde default applies.
        assert_eq!(expect_platform(parsed).timeout_seconds, 300);
    }

    #[test]
    fn test_bootstrap_config_equality() {
        let build = || {
            BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
                query_api_url: Some("http://test:8080".to_string()),
                timeout_seconds: 300,
            })
        };

        let first = build();
        let second = build();
        assert_eq!(first, second);
    }

    #[test]
    fn test_backward_compatibility_yaml() {
        let yaml = r#"
type: postgres
"#;

        let parsed: BootstrapProviderConfig = serde_yaml::from_str(yaml).unwrap();
        let is_postgres = matches!(parsed, BootstrapProviderConfig::Postgres(_));
        assert!(is_postgres);
    }
}