1pub mod component_graph;
28pub use component_graph::ComponentGraphBootstrapProvider;
29
30use anyhow::Result;
31use async_trait::async_trait;
32use serde::{Deserialize, Serialize};
33use std::collections::HashMap;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::Arc;
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BootstrapRequest {
40 pub query_id: String,
41 pub node_labels: Vec<String>,
42 pub relation_labels: Vec<String>,
43 pub request_id: String,
44}
45
46#[derive(Clone)]
58pub struct BootstrapContext {
59 pub server_id: String,
61 pub source_id: String,
63 pub sequence_counter: Arc<AtomicU64>,
65 properties: Arc<HashMap<String, serde_json::Value>>,
67}
68
69impl BootstrapContext {
70 pub fn new_minimal(server_id: String, source_id: String) -> Self {
75 Self {
76 server_id,
77 source_id,
78 sequence_counter: Arc::new(AtomicU64::new(0)),
79 properties: Arc::new(HashMap::new()),
80 }
81 }
82
83 pub fn with_properties(
89 server_id: String,
90 source_id: String,
91 properties: HashMap<String, serde_json::Value>,
92 ) -> Self {
93 Self {
94 server_id,
95 source_id,
96 sequence_counter: Arc::new(AtomicU64::new(0)),
97 properties: Arc::new(properties),
98 }
99 }
100
101 pub fn next_sequence(&self) -> u64 {
103 self.sequence_counter.fetch_add(1, Ordering::SeqCst)
104 }
105
106 pub fn get_property(&self, key: &str) -> Option<serde_json::Value> {
108 self.properties.get(key).cloned()
109 }
110
111 pub fn get_typed_property<T>(&self, key: &str) -> Result<Option<T>>
113 where
114 T: for<'de> Deserialize<'de>,
115 {
116 match self.get_property(key) {
117 Some(value) => Ok(Some(serde_json::from_value(value.clone())?)),
118 None => Ok(None),
119 }
120 }
121}
122
123use crate::channels::BootstrapEventSender;
124
/// Summary returned by [`BootstrapProvider::bootstrap`].
///
/// `Default` yields `event_count == 0`, `last_sequence == None` and
/// `sequences_aligned == false`.
#[derive(Debug, Clone, Default)]
pub struct BootstrapResult {
    /// Number of events — presumably the events emitted during the bootstrap run
    /// (TODO confirm with provider implementations).
    pub event_count: usize,
    /// Last sequence number, if any — presumably the sequence of the final event.
    pub last_sequence: Option<u64>,
    /// Flag reported by the provider; NOTE(review): exact meaning of "aligned" is not
    /// visible in this file — confirm against the consumers of this result.
    pub sequences_aligned: bool,
}
145
146#[async_trait]
149pub trait BootstrapProvider: Send + Sync {
150 async fn bootstrap(
160 &self,
161 request: BootstrapRequest,
162 context: &BootstrapContext,
163 event_tx: BootstrapEventSender,
164 settings: Option<&crate::config::SourceSubscriptionSettings>,
165 ) -> Result<BootstrapResult>;
166}
167
168#[async_trait]
171impl BootstrapProvider for Box<dyn BootstrapProvider> {
172 async fn bootstrap(
173 &self,
174 request: BootstrapRequest,
175 context: &BootstrapContext,
176 event_tx: BootstrapEventSender,
177 settings: Option<&crate::config::SourceSubscriptionSettings>,
178 ) -> Result<BootstrapResult> {
179 (**self)
180 .bootstrap(request, context, event_tx, settings)
181 .await
182 }
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
200pub struct PostgresBootstrapConfig {
201 }
204
205#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
218pub struct ApplicationBootstrapConfig {
219 }
222
223#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
239pub struct ScriptFileBootstrapConfig {
240 pub file_paths: Vec<String>,
242}
243
244#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
258pub struct PlatformBootstrapConfig {
259 #[serde(skip_serializing_if = "Option::is_none")]
262 pub query_api_url: Option<String>,
263
264 #[serde(default = "default_platform_timeout")]
266 pub timeout_seconds: u64,
267}
268
/// Serde default for `PlatformBootstrapConfig::timeout_seconds`: 300 seconds.
fn default_platform_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECONDS: u64 = 300;
    DEFAULT_TIMEOUT_SECONDS
}
272
273impl Default for PlatformBootstrapConfig {
274 fn default() -> Self {
275 Self {
276 query_api_url: None,
277 timeout_seconds: default_platform_timeout(),
278 }
279 }
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
284#[serde(tag = "type", rename_all = "lowercase")]
285pub enum BootstrapProviderConfig {
286 Postgres(PostgresBootstrapConfig),
288 Application(ApplicationBootstrapConfig),
290 ScriptFile(ScriptFileBootstrapConfig),
292 Platform(PlatformBootstrapConfig),
295 Noop,
297}
298
299pub struct BootstrapProviderFactory;
305
306impl BootstrapProviderFactory {
307 pub fn create_provider(config: &BootstrapProviderConfig) -> Result<Box<dyn BootstrapProvider>> {
317 match config {
318 BootstrapProviderConfig::Postgres(_) => {
319 Err(anyhow::anyhow!(
320 "PostgreSQL bootstrap provider is available in the drasi-bootstrap-postgres crate. \
321 Use PostgresBootstrapProvider::builder().with_host(...).build() to create it."
322 ))
323 }
324 BootstrapProviderConfig::Application(_) => {
325 Err(anyhow::anyhow!(
326 "Application bootstrap provider is available in the drasi-bootstrap-application crate. \
327 Use ApplicationBootstrapProvider::builder().build() to create it."
328 ))
329 }
330 BootstrapProviderConfig::ScriptFile(config) => {
331 Err(anyhow::anyhow!(
332 "ScriptFile bootstrap provider is available in the drasi-bootstrap-scriptfile crate. \
333 Use ScriptFileBootstrapProvider::builder().with_file(...).build() to create it. \
334 File paths: {:?}",
335 config.file_paths
336 ))
337 }
338 BootstrapProviderConfig::Platform(config) => {
339 Err(anyhow::anyhow!(
340 "Platform bootstrap provider is available in the drasi-bootstrap-platform crate. \
341 Use PlatformBootstrapProvider::builder().with_query_api_url(...).build() to create it. \
342 Config: {config:?}"
343 ))
344 }
345 BootstrapProviderConfig::Noop => {
346 Err(anyhow::anyhow!(
347 "No-op bootstrap provider is available in the drasi-bootstrap-noop crate. \
348 Use NoOpBootstrapProvider::builder().build() or NoOpBootstrapProvider::new() to create it."
349 ))
350 }
351 }
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Query API URL shared by the platform-config tests.
    const TEST_QUERY_API_URL: &str = "http://test:8080"; // DevSkim: ignore DS137138

    /// Serializes a provider config to JSON, panicking if serialization fails.
    fn to_json(config: &BootstrapProviderConfig) -> String {
        serde_json::to_string(config).unwrap()
    }

    /// Parses a provider config from JSON, panicking if parsing fails.
    fn from_json(json: &str) -> BootstrapProviderConfig {
        serde_json::from_str(json).unwrap()
    }

    /// Parses a provider config from YAML, panicking if parsing fails.
    fn from_yaml(yaml: &str) -> BootstrapProviderConfig {
        serde_yaml::from_str(yaml).unwrap()
    }

    /// Builds a platform config pointing at `TEST_QUERY_API_URL`.
    fn platform_config(timeout_seconds: u64) -> BootstrapProviderConfig {
        BootstrapProviderConfig::Platform(PlatformBootstrapConfig {
            query_api_url: Some(TEST_QUERY_API_URL.to_string()),
            timeout_seconds,
        })
    }

    #[test]
    fn test_platform_bootstrap_config_defaults() {
        let config = PlatformBootstrapConfig {
            query_api_url: Some(TEST_QUERY_API_URL.to_string()),
            ..Default::default()
        };

        assert_eq!(config.timeout_seconds, 300);
        assert_eq!(config.query_api_url.as_deref(), Some(TEST_QUERY_API_URL));
    }

    #[test]
    fn test_postgres_bootstrap_config_defaults() {
        assert_eq!(
            PostgresBootstrapConfig::default(),
            PostgresBootstrapConfig {}
        );
    }

    #[test]
    fn test_application_bootstrap_config_defaults() {
        assert_eq!(
            ApplicationBootstrapConfig::default(),
            ApplicationBootstrapConfig {}
        );
    }

    #[test]
    fn test_platform_bootstrap_config_serialization() {
        let json = to_json(&platform_config(600));
        assert!(json.contains("\"type\":\"platform\""));
        assert!(json.contains("\"query_api_url\":\"http://test:8080\"")); // DevSkim: ignore DS137138
        assert!(json.contains("\"timeout_seconds\":600"));

        if let BootstrapProviderConfig::Platform(cfg) = from_json(&json) {
            assert_eq!(cfg.query_api_url.as_deref(), Some(TEST_QUERY_API_URL));
            assert_eq!(cfg.timeout_seconds, 600);
        } else {
            panic!("Expected Platform variant");
        }
    }

    #[test]
    fn test_scriptfile_bootstrap_config() {
        let paths = ["/path/to/file1.jsonl", "/path/to/file2.jsonl"];
        let config = BootstrapProviderConfig::ScriptFile(ScriptFileBootstrapConfig {
            file_paths: paths.iter().map(|path| path.to_string()).collect(),
        });

        let json = to_json(&config);
        assert!(json.contains("\"type\":\"scriptfile\""));
        assert!(json.contains("\"file_paths\""));

        if let BootstrapProviderConfig::ScriptFile(cfg) = from_json(&json) {
            assert_eq!(cfg.file_paths.len(), 2);
            assert_eq!(cfg.file_paths[0], "/path/to/file1.jsonl");
            assert_eq!(cfg.file_paths[1], "/path/to/file2.jsonl");
        } else {
            panic!("Expected ScriptFile variant");
        }
    }

    #[test]
    fn test_noop_bootstrap_config() {
        let json = to_json(&BootstrapProviderConfig::Noop);
        assert!(json.contains("\"type\":\"noop\""));

        let decoded = from_json(&json);
        assert!(matches!(decoded, BootstrapProviderConfig::Noop));
    }

    #[test]
    fn test_postgres_bootstrap_config_serialization() {
        let original = BootstrapProviderConfig::Postgres(PostgresBootstrapConfig::default());

        let json = to_json(&original);
        assert!(json.contains("\"type\":\"postgres\""));

        let decoded = from_json(&json);
        assert!(matches!(decoded, BootstrapProviderConfig::Postgres(_)));
    }

    #[test]
    fn test_application_bootstrap_config_serialization() {
        let original =
            BootstrapProviderConfig::Application(ApplicationBootstrapConfig::default());

        let json = to_json(&original);
        assert!(json.contains("\"type\":\"application\""));

        let decoded = from_json(&json);
        assert!(matches!(decoded, BootstrapProviderConfig::Application(_)));
    }

    #[test]
    fn test_yaml_deserialization_platform() {
        let yaml = r#"
type: platform
query_api_url: "http://remote:8080" # DevSkim: ignore DS137138
timeout_seconds: 300
"#;

        if let BootstrapProviderConfig::Platform(cfg) = from_yaml(yaml) {
            assert_eq!(cfg.query_api_url.as_deref(), Some("http://remote:8080")); // DevSkim: ignore DS137138
            assert_eq!(cfg.timeout_seconds, 300);
        } else {
            panic!("Expected Platform variant");
        }
    }

    #[test]
    fn test_yaml_deserialization_scriptfile() {
        let yaml = r#"
type: scriptfile
file_paths:
 - "/data/file1.jsonl"
 - "/data/file2.jsonl"
"#;

        if let BootstrapProviderConfig::ScriptFile(cfg) = from_yaml(yaml) {
            assert_eq!(cfg.file_paths.len(), 2);
            assert_eq!(cfg.file_paths[0], "/data/file1.jsonl");
        } else {
            panic!("Expected ScriptFile variant");
        }
    }

    #[test]
    fn test_platform_config_with_defaults() {
        let yaml = r#"
type: platform
query_api_url: "http://test:8080" # DevSkim: ignore DS137138
"#;

        if let BootstrapProviderConfig::Platform(cfg) = from_yaml(yaml) {
            // `timeout_seconds` is absent from the YAML, so the serde default applies.
            assert_eq!(cfg.timeout_seconds, 300);
        } else {
            panic!("Expected Platform variant");
        }
    }

    #[test]
    fn test_bootstrap_config_equality() {
        let config1 = platform_config(300);
        let config2 = platform_config(300);

        assert_eq!(config1, config2);
    }

    #[test]
    fn test_backward_compatibility_yaml() {
        let yaml = r#"
type: postgres
"#;

        let config = from_yaml(yaml);
        assert!(matches!(config, BootstrapProviderConfig::Postgres(_)));
    }
}