Skip to main content

drasi_bootstrap_http/
config.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configuration types for the HTTP bootstrap provider.
16
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19
20/// Top-level configuration for the HTTP bootstrap provider.
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
22#[serde(rename_all = "camelCase")]
23pub struct HttpBootstrapConfig {
24    /// List of endpoint configurations to fetch data from.
25    pub endpoints: Vec<EndpointConfig>,
26
27    /// HTTP request timeout in seconds (default: 30).
28    #[serde(default = "default_timeout_seconds")]
29    pub timeout_seconds: u64,
30
31    /// Maximum number of retries on failure (default: 3).
32    #[serde(default = "default_max_retries")]
33    pub max_retries: u32,
34
35    /// Delay between retries in milliseconds (default: 1000).
36    #[serde(default = "default_retry_delay_ms")]
37    pub retry_delay_ms: u64,
38
39    /// Maximum number of pages to fetch per endpoint (default: 10,000).
40    /// Set a higher value for very large initial loads.
41    #[serde(default, skip_serializing_if = "Option::is_none")]
42    pub max_pages: Option<u64>,
43}
44
/// Serde default for `timeoutSeconds`: thirty seconds per HTTP request.
fn default_timeout_seconds() -> u64 {
    const THIRTY_SECONDS: u64 = 30;
    THIRTY_SECONDS
}
48
/// Serde default for `maxRetries`: a failed request is retried up to three times.
fn default_max_retries() -> u32 {
    const THREE_RETRIES: u32 = 3;
    THREE_RETRIES
}
52
/// Serde default for `retryDelayMs`: wait one second between retries.
fn default_retry_delay_ms() -> u64 {
    const ONE_SECOND_IN_MS: u64 = 1_000;
    ONE_SECOND_IN_MS
}
56
57/// Configuration for a single HTTP endpoint to bootstrap from.
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
59#[serde(rename_all = "camelCase")]
60pub struct EndpointConfig {
61    /// The URL to fetch data from.
62    pub url: String,
63
64    /// HTTP method (default: GET).
65    #[serde(default = "default_method")]
66    pub method: HttpMethod,
67
68    /// Additional HTTP headers to include in requests.
69    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
70    pub headers: HashMap<String, String>,
71
72    /// Optional request body (for POST/PUT methods).
73    #[serde(default, skip_serializing_if = "Option::is_none")]
74    pub body: Option<serde_json::Value>,
75
76    /// Authentication configuration.
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub auth: Option<AuthConfig>,
79
80    /// Pagination configuration.
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub pagination: Option<PaginationConfig>,
83
84    /// Response parsing and element mapping configuration.
85    pub response: ResponseConfig,
86}
87
88fn default_method() -> HttpMethod {
89    HttpMethod::Get
90}
91
92/// HTTP methods supported for bootstrap requests.
93#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
94#[serde(rename_all = "UPPERCASE")]
95pub enum HttpMethod {
96    Get,
97    Post,
98    Put,
99}
100
101/// Authentication configuration.
102#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
103#[serde(tag = "type", rename_all = "kebab-case")]
104pub enum AuthConfig {
105    /// Bearer token authentication.
106    Bearer {
107        /// Environment variable containing the token.
108        token_env: String,
109    },
110    /// API key authentication (in header or query parameter).
111    ApiKey {
112        /// Where to send the API key.
113        location: ApiKeyLocation,
114        /// Header name or query parameter name.
115        name: String,
116        /// Environment variable containing the API key value.
117        value_env: String,
118    },
119    /// HTTP Basic authentication.
120    Basic {
121        /// Environment variable containing the username.
122        username_env: String,
123        /// Environment variable containing the password (optional, can be empty).
124        #[serde(default, skip_serializing_if = "Option::is_none")]
125        password_env: Option<String>,
126    },
127    /// OAuth2 Client Credentials flow.
128    #[serde(rename = "oauth2-client-credentials")]
129    OAuth2ClientCredentials {
130        /// Token endpoint URL.
131        token_url: String,
132        /// Environment variable containing the client ID.
133        client_id_env: String,
134        /// Environment variable containing the client secret.
135        client_secret_env: String,
136        /// Optional scopes to request.
137        #[serde(default, skip_serializing_if = "Vec::is_empty")]
138        scopes: Vec<String>,
139    },
140}
141
142/// Where to place an API key.
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
144#[serde(rename_all = "kebab-case")]
145pub enum ApiKeyLocation {
146    Header,
147    Query,
148}
149
150/// Pagination configuration.
151#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
152#[serde(tag = "type", rename_all = "kebab-case")]
153pub enum PaginationConfig {
154    /// Offset/limit pagination.
155    OffsetLimit {
156        /// Query parameter name for offset (default: "offset").
157        #[serde(default = "default_offset_param")]
158        offset_param: String,
159        /// Query parameter name for limit/page size (default: "limit").
160        #[serde(default = "default_limit_param")]
161        limit_param: String,
162        /// Number of items per page.
163        page_size: u64,
164        /// JSONPath to extract total count from response (optional).
165        #[serde(default, skip_serializing_if = "Option::is_none")]
166        total_path: Option<String>,
167    },
168    /// Page number pagination.
169    PageNumber {
170        /// Query parameter name for page number (default: "page").
171        #[serde(default = "default_page_param")]
172        page_param: String,
173        /// Query parameter name for page size (default: "per_page").
174        #[serde(default = "default_per_page_param")]
175        page_size_param: String,
176        /// Number of items per page.
177        page_size: u64,
178        /// JSONPath to extract total pages from response (optional).
179        #[serde(default, skip_serializing_if = "Option::is_none")]
180        total_pages_path: Option<String>,
181    },
182    /// Cursor-based pagination (e.g., Stripe's `starting_after`).
183    Cursor {
184        /// Query parameter name to send the cursor value.
185        cursor_param: String,
186        /// JSONPath to extract the next cursor value from the response.
187        cursor_path: String,
188        /// JSONPath to a boolean `has_more` field (optional).
189        #[serde(default, skip_serializing_if = "Option::is_none")]
190        has_more_path: Option<String>,
191        /// Query parameter name for page size (optional).
192        #[serde(default, skip_serializing_if = "Option::is_none")]
193        page_size_param: Option<String>,
194        /// Number of items per page (optional).
195        #[serde(default, skip_serializing_if = "Option::is_none")]
196        page_size: Option<u64>,
197    },
198    /// Link header pagination (RFC 5988).
199    LinkHeader {
200        /// Query parameter name for page size (optional).
201        #[serde(default, skip_serializing_if = "Option::is_none")]
202        page_size_param: Option<String>,
203        /// Number of items per page (optional).
204        #[serde(default, skip_serializing_if = "Option::is_none")]
205        page_size: Option<u64>,
206    },
207    /// Next URL from response body (e.g., Salesforce `nextRecordsUrl`).
208    NextUrl {
209        /// JSONPath to extract the next URL from the response body.
210        next_url_path: String,
211        /// Base URL to prepend if the extracted URL is relative.
212        #[serde(default, skip_serializing_if = "Option::is_none")]
213        base_url: Option<String>,
214    },
215}
216
217impl HttpBootstrapConfig {
218    /// Validate the configuration and return an error if invalid.
219    pub fn validate(&self) -> anyhow::Result<()> {
220        if self.endpoints.is_empty() {
221            return Err(anyhow::anyhow!(
222                "Validation error: at least one endpoint must be configured"
223            ));
224        }
225        if self.timeout_seconds == 0 {
226            return Err(anyhow::anyhow!(
227                "Validation error: timeoutSeconds must be greater than 0"
228            ));
229        }
230        for (i, endpoint) in self.endpoints.iter().enumerate() {
231            endpoint.validate(i)?;
232        }
233        Ok(())
234    }
235}
236
237impl EndpointConfig {
238    fn validate(&self, index: usize) -> anyhow::Result<()> {
239        if self.url.is_empty() {
240            return Err(anyhow::anyhow!(
241                "Validation error: endpoint[{index}].url cannot be empty"
242            ));
243        }
244        if !self.url.starts_with("http://") && !self.url.starts_with("https://") {
245            return Err(anyhow::anyhow!(
246                "Validation error: endpoint[{index}].url must start with http:// or https://"
247            ));
248        }
249        if self.response.mappings.is_empty() {
250            return Err(anyhow::anyhow!(
251                "Validation error: endpoint[{index}].response.mappings must have at least one mapping"
252            ));
253        }
254        if let Some(ref pagination) = self.pagination {
255            pagination.validate(index)?;
256        }
257        for (j, mapping) in self.response.mappings.iter().enumerate() {
258            mapping.validate(index, j)?;
259        }
260        Ok(())
261    }
262}
263
264impl PaginationConfig {
265    fn validate(&self, endpoint_index: usize) -> anyhow::Result<()> {
266        match self {
267            PaginationConfig::OffsetLimit { page_size, .. } => {
268                if *page_size == 0 {
269                    return Err(anyhow::anyhow!(
270                        "Validation error: endpoint[{endpoint_index}].pagination.page_size must be greater than 0"
271                    ));
272                }
273            }
274            PaginationConfig::PageNumber { page_size, .. } => {
275                if *page_size == 0 {
276                    return Err(anyhow::anyhow!(
277                        "Validation error: endpoint[{endpoint_index}].pagination.page_size must be greater than 0"
278                    ));
279                }
280            }
281            PaginationConfig::Cursor {
282                cursor_param,
283                cursor_path,
284                ..
285            } => {
286                if cursor_param.is_empty() {
287                    return Err(anyhow::anyhow!(
288                        "Validation error: endpoint[{endpoint_index}].pagination.cursor_param cannot be empty"
289                    ));
290                }
291                if cursor_path.is_empty() {
292                    return Err(anyhow::anyhow!(
293                        "Validation error: endpoint[{endpoint_index}].pagination.cursor_path cannot be empty"
294                    ));
295                }
296            }
297            PaginationConfig::NextUrl { next_url_path, .. } => {
298                if next_url_path.is_empty() {
299                    return Err(anyhow::anyhow!(
300                        "Validation error: endpoint[{endpoint_index}].pagination.next_url_path cannot be empty"
301                    ));
302                }
303            }
304            PaginationConfig::LinkHeader { .. } => {}
305        }
306        Ok(())
307    }
308}
309
310impl ElementMappingConfig {
311    fn validate(&self, endpoint_index: usize, mapping_index: usize) -> anyhow::Result<()> {
312        if self.template.id.is_empty() {
313            return Err(anyhow::anyhow!(
314                "Validation error: endpoint[{endpoint_index}].mappings[{mapping_index}].template.id cannot be empty"
315            ));
316        }
317        if self.template.labels.is_empty() {
318            return Err(anyhow::anyhow!(
319                "Validation error: endpoint[{endpoint_index}].mappings[{mapping_index}].template.labels must have at least one label"
320            ));
321        }
322        if self.element_type == ElementType::Relation && self.operation != OperationType::Delete {
323            if self.template.from.is_none() {
324                return Err(anyhow::anyhow!(
325                    "Validation error: endpoint[{endpoint_index}].mappings[{mapping_index}].template.from is required for relation mappings"
326                ));
327            }
328            if self.template.to.is_none() {
329                return Err(anyhow::anyhow!(
330                    "Validation error: endpoint[{endpoint_index}].mappings[{mapping_index}].template.to is required for relation mappings"
331                ));
332            }
333        }
334        Ok(())
335    }
336}
337
/// Serde default for `offset_param` in `offset-limit` pagination.
fn default_offset_param() -> String {
    String::from("offset")
}
341
/// Serde default for `limit_param` in `offset-limit` pagination.
fn default_limit_param() -> String {
    String::from("limit")
}
345
/// Serde default for `page_param` in `page-number` pagination.
fn default_page_param() -> String {
    String::from("page")
}
349
/// Serde default for `page_size_param` in `page-number` pagination.
fn default_per_page_param() -> String {
    String::from("per_page")
}
353
354/// Response parsing configuration.
355#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
356#[serde(rename_all = "camelCase")]
357pub struct ResponseConfig {
358    /// JSONPath expression to locate the array of items in the response.
359    /// Use "$" if the response is a top-level array.
360    #[serde(default = "default_items_path")]
361    pub items_path: String,
362
363    /// Content type override (auto-detected from Content-Type header if not set).
364    #[serde(default, skip_serializing_if = "Option::is_none")]
365    pub content_type: Option<ContentTypeOverride>,
366
367    /// Element mapping configurations.
368    pub mappings: Vec<ElementMappingConfig>,
369}
370
/// Serde default for `itemsPath`: the JSONPath root, i.e. the response is the array.
fn default_items_path() -> String {
    String::from("$")
}
374
375/// Override for response content type.
376#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
377#[serde(rename_all = "lowercase")]
378pub enum ContentTypeOverride {
379    Json,
380    Xml,
381    Yaml,
382}
383
384/// Mapping configuration from response items to Drasi graph elements.
385#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
386#[serde(rename_all = "camelCase")]
387pub struct ElementMappingConfig {
388    /// Type of element to create.
389    pub element_type: ElementType,
390
391    /// Operation type for the source change (default: update for idempotent bootstrap).
392    #[serde(default)]
393    pub operation: OperationType,
394
395    /// Template for element creation.
396    pub template: ElementTemplate,
397}
398
399/// Operation type for source changes.
400/// Matches the HTTP webhook source naming convention.
401#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
402#[serde(rename_all = "lowercase")]
403pub enum OperationType {
404    Insert,
405    #[default]
406    Update,
407    Delete,
408}
409
410/// Element type.
411#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
412#[serde(rename_all = "lowercase")]
413pub enum ElementType {
414    Node,
415    Relation,
416}
417
418/// Template for element creation using Handlebars expressions.
419#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
420#[serde(rename_all = "camelCase")]
421pub struct ElementTemplate {
422    /// Handlebars template for element ID.
423    pub id: String,
424
425    /// Handlebars templates for element labels.
426    pub labels: Vec<String>,
427
428    /// Properties mapping (each value is a Handlebars template or literal).
429    #[serde(default, skip_serializing_if = "Option::is_none")]
430    pub properties: Option<serde_json::Value>,
431
432    /// Template for relation source node ID (relations only).
433    #[serde(default, skip_serializing_if = "Option::is_none")]
434    pub from: Option<String>,
435
436    /// Template for relation target node ID (relations only).
437    #[serde(default, skip_serializing_if = "Option::is_none")]
438    pub to: Option<String>,
439}
440
#[cfg(test)]
mod tests {
    // Tests for (de)serialization and validation of the bootstrap configuration.
    //
    // Wire-format note: structs use camelCase keys, but the fields inside the
    // internally tagged `auth` and `pagination` enums keep their snake_case names.
    use super::*;

    #[test]
    fn test_deserialize_full_config() {
        let json = r#"{
            "endpoints": [{
                "url": "https://api.example.com/users",
                "method": "GET",
                "auth": {
                    "type": "bearer",
                    "token_env": "API_TOKEN"
                },
                "pagination": {
                    "type": "offset-limit",
                    "offset_param": "offset",
                    "limit_param": "limit",
                    "page_size": 100
                },
                "response": {
                    "itemsPath": "$.data",
                    "mappings": [{
                        "elementType": "node",
                        "template": {
                            "id": "{{item.id}}",
                            "labels": ["User"],
                            "properties": {
                                "name": "{{item.name}}"
                            }
                        }
                    }]
                }
            }],
            "timeoutSeconds": 30,
            "maxRetries": 3,
            "retryDelayMs": 1000
        }"#;

        let config: HttpBootstrapConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.endpoints.len(), 1);
        assert_eq!(config.timeout_seconds, 30);
        assert_eq!(config.endpoints[0].url, "https://api.example.com/users");

        // Backward compat: missing "operation" field should default to Update
        assert_eq!(
            config.endpoints[0].response.mappings[0].operation,
            OperationType::Update
        );
    }

    #[test]
    fn test_deserialize_cursor_pagination() {
        let json = r#"{
            "type": "cursor",
            "cursor_param": "starting_after",
            "cursor_path": "$.data[-1].id",
            "has_more_path": "$.has_more",
            "page_size_param": "limit",
            "page_size": 100
        }"#;

        let config: PaginationConfig = serde_json::from_str(json).unwrap();
        match config {
            PaginationConfig::Cursor {
                cursor_param,
                cursor_path,
                has_more_path,
                ..
            } => {
                assert_eq!(cursor_param, "starting_after");
                assert_eq!(cursor_path, "$.data[-1].id");
                assert_eq!(has_more_path, Some("$.has_more".to_string()));
            }
            _ => panic!("Expected Cursor pagination"),
        }
    }

    #[test]
    fn test_deserialize_oauth2_auth() {
        let json = r#"{
            "type": "oauth2-client-credentials",
            "token_url": "https://auth.example.com/token",
            "client_id_env": "CLIENT_ID",
            "client_secret_env": "CLIENT_SECRET",
            "scopes": ["read", "write"]
        }"#;

        let config: AuthConfig = serde_json::from_str(json).unwrap();
        match config {
            AuthConfig::OAuth2ClientCredentials {
                token_url, scopes, ..
            } => {
                assert_eq!(token_url, "https://auth.example.com/token");
                assert_eq!(scopes, vec!["read", "write"]);
            }
            _ => panic!("Expected OAuth2ClientCredentials"),
        }
    }

    #[test]
    fn test_deserialize_next_url_pagination() {
        let json = r#"{
            "type": "next-url",
            "next_url_path": "$.nextRecordsUrl",
            "base_url": "https://instance.salesforce.com"
        }"#;

        let config: PaginationConfig = serde_json::from_str(json).unwrap();
        match config {
            PaginationConfig::NextUrl {
                next_url_path,
                base_url,
            } => {
                assert_eq!(next_url_path, "$.nextRecordsUrl");
                assert_eq!(
                    base_url,
                    Some("https://instance.salesforce.com".to_string())
                );
            }
            _ => panic!("Expected NextUrl pagination"),
        }
    }

    #[test]
    fn test_deserialize_minimal_config_applies_defaults() {
        let json = r#"{
            "endpoints": [{
                "url": "https://api.example.com/items",
                "response": {
                    "mappings": [{
                        "elementType": "node",
                        "template": { "id": "{{item.id}}", "labels": ["Item"] }
                    }]
                }
            }]
        }"#;

        let config: HttpBootstrapConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.timeout_seconds, 30);
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.retry_delay_ms, 1000);
        assert_eq!(config.max_pages, None);

        let endpoint = &config.endpoints[0];
        assert_eq!(endpoint.method, HttpMethod::Get);
        assert!(endpoint.headers.is_empty());
        assert_eq!(endpoint.body, None);
        assert_eq!(endpoint.auth, None);
        assert_eq!(endpoint.pagination, None);
        assert_eq!(endpoint.response.items_path, "$");
        assert_eq!(endpoint.response.content_type, None);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_deserialize_offset_limit_param_defaults() {
        let json = r#"{ "type": "offset-limit", "page_size": 50 }"#;
        let config: PaginationConfig = serde_json::from_str(json).unwrap();
        assert_eq!(
            config,
            PaginationConfig::OffsetLimit {
                offset_param: "offset".to_string(),
                limit_param: "limit".to_string(),
                page_size: 50,
                total_path: None,
            }
        );
    }

    #[test]
    fn test_deserialize_page_number_param_defaults() {
        let json = r#"{ "type": "page-number", "page_size": 25 }"#;
        let config: PaginationConfig = serde_json::from_str(json).unwrap();
        assert_eq!(
            config,
            PaginationConfig::PageNumber {
                page_param: "page".to_string(),
                page_size_param: "per_page".to_string(),
                page_size: 25,
                total_pages_path: None,
            }
        );
    }

    #[test]
    fn test_deserialize_api_key_auth() {
        let json = r#"{
            "type": "api-key",
            "location": "query",
            "name": "api_key",
            "value_env": "API_KEY"
        }"#;
        let config: AuthConfig = serde_json::from_str(json).unwrap();
        assert_eq!(
            config,
            AuthConfig::ApiKey {
                location: ApiKeyLocation::Query,
                name: "api_key".to_string(),
                value_env: "API_KEY".to_string(),
            }
        );
    }

    #[test]
    fn test_deserialize_basic_auth_without_password() {
        let json = r#"{ "type": "basic", "username_env": "API_USER" }"#;
        let config: AuthConfig = serde_json::from_str(json).unwrap();
        assert_eq!(
            config,
            AuthConfig::Basic {
                username_env: "API_USER".to_string(),
                password_env: None,
            }
        );
    }

    /// Build a configuration that passes validation; tests break one field at a time.
    fn make_valid_config() -> HttpBootstrapConfig {
        HttpBootstrapConfig {
            endpoints: vec![EndpointConfig {
                url: "https://api.example.com/users".to_string(),
                method: HttpMethod::Get,
                headers: HashMap::new(),
                body: None,
                auth: None,
                pagination: None,
                response: ResponseConfig {
                    items_path: "$".to_string(),
                    content_type: None,
                    mappings: vec![ElementMappingConfig {
                        element_type: ElementType::Node,
                        operation: Default::default(),
                        template: ElementTemplate {
                            id: "{{item.id}}".to_string(),
                            labels: vec!["User".to_string()],
                            properties: None,
                            from: None,
                            to: None,
                        },
                    }],
                },
            }],
            timeout_seconds: 30,
            max_retries: 3,
            retry_delay_ms: 1000,
            max_pages: None,
        }
    }

    #[test]
    fn test_serialize_round_trip() {
        let config = make_valid_config();
        let json = serde_json::to_string(&config).unwrap();
        // Top-level keys are camelCase and `None` options are skipped.
        assert!(json.contains("\"timeoutSeconds\":30"));
        assert!(!json.contains("maxPages"));
        let parsed: HttpBootstrapConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, config);
    }

    #[test]
    fn test_validate_valid_config() {
        let config = make_valid_config();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_validate_no_endpoints() {
        let mut config = make_valid_config();
        config.endpoints.clear();
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("at least one endpoint"));
    }

    #[test]
    fn test_validate_empty_url() {
        let mut config = make_valid_config();
        config.endpoints[0].url = String::new();
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("url cannot be empty"));
    }

    #[test]
    fn test_validate_invalid_url_scheme() {
        let mut config = make_valid_config();
        config.endpoints[0].url = "ftp://example.com/data".to_string();
        let err = config.validate().unwrap_err();
        assert!(err
            .to_string()
            .contains("must start with http:// or https://"));
    }

    #[test]
    fn test_validate_reports_index_of_invalid_endpoint() {
        let mut config = make_valid_config();
        let mut second = config.endpoints[0].clone();
        second.url = String::new();
        config.endpoints.push(second);
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("endpoint[1].url cannot be empty"));
    }

    #[test]
    fn test_validate_no_mappings() {
        let mut config = make_valid_config();
        config.endpoints[0].response.mappings.clear();
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("at least one mapping"));
    }

    #[test]
    fn test_validate_zero_page_size() {
        let mut config = make_valid_config();
        config.endpoints[0].pagination = Some(PaginationConfig::OffsetLimit {
            offset_param: "offset".to_string(),
            limit_param: "limit".to_string(),
            page_size: 0,
            total_path: None,
        });
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("page_size must be greater than 0"));
    }

    #[test]
    fn test_validate_page_number_zero_page_size() {
        let mut config = make_valid_config();
        config.endpoints[0].pagination = Some(PaginationConfig::PageNumber {
            page_param: "page".to_string(),
            page_size_param: "per_page".to_string(),
            page_size: 0,
            total_pages_path: None,
        });
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("page_size must be greater than 0"));
    }

    #[test]
    fn test_validate_cursor_empty_param() {
        let mut config = make_valid_config();
        config.endpoints[0].pagination = Some(PaginationConfig::Cursor {
            cursor_param: String::new(),
            cursor_path: "$.next".to_string(),
            has_more_path: None,
            page_size_param: None,
            page_size: None,
        });
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("cursor_param cannot be empty"));
    }

    #[test]
    fn test_validate_cursor_empty_path() {
        let mut config = make_valid_config();
        config.endpoints[0].pagination = Some(PaginationConfig::Cursor {
            cursor_param: "starting_after".to_string(),
            cursor_path: String::new(),
            has_more_path: None,
            page_size_param: None,
            page_size: None,
        });
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("cursor_path cannot be empty"));
    }

    #[test]
    fn test_validate_next_url_empty_path() {
        let mut config = make_valid_config();
        config.endpoints[0].pagination = Some(PaginationConfig::NextUrl {
            next_url_path: String::new(),
            base_url: None,
        });
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("next_url_path cannot be empty"));
    }

    #[test]
    fn test_validate_link_header_without_page_size() {
        let mut config = make_valid_config();
        config.endpoints[0].pagination = Some(PaginationConfig::LinkHeader {
            page_size_param: None,
            page_size: None,
        });
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_validate_zero_timeout() {
        let mut config = make_valid_config();
        config.timeout_seconds = 0;
        let err = config.validate().unwrap_err();
        assert!(err
            .to_string()
            .contains("timeoutSeconds must be greater than 0"));
    }

    #[test]
    fn test_validate_relation_missing_from() {
        let mut config = make_valid_config();
        config.endpoints[0].response.mappings[0].element_type = ElementType::Relation;
        // from is None
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("from is required"));
    }

    #[test]
    fn test_validate_relation_missing_to() {
        let mut config = make_valid_config();
        config.endpoints[0].response.mappings[0].element_type = ElementType::Relation;
        config.endpoints[0].response.mappings[0].template.from = Some("{{item.from}}".to_string());
        config.endpoints[0].response.mappings[0].template.to = None;
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("to is required"));
    }

    #[test]
    fn test_validate_relation_with_from_and_to() {
        let mut config = make_valid_config();
        config.endpoints[0].response.mappings[0].element_type = ElementType::Relation;
        config.endpoints[0].response.mappings[0].template.from = Some("{{item.from}}".to_string());
        config.endpoints[0].response.mappings[0].template.to = Some("{{item.to}}".to_string());
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_validate_empty_labels() {
        let mut config = make_valid_config();
        config.endpoints[0].response.mappings[0]
            .template
            .labels
            .clear();
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("at least one label"));
    }

    #[test]
    fn test_validate_relation_delete_skips_from_to() {
        let mut config = make_valid_config();
        config.endpoints[0].response.mappings[0].element_type = ElementType::Relation;
        config.endpoints[0].response.mappings[0].operation = OperationType::Delete;
        config.endpoints[0].response.mappings[0].template.from = None;
        config.endpoints[0].response.mappings[0].template.to = None;
        // Delete relations don't need from/to
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_validate_relation_update_requires_from_to() {
        let mut config = make_valid_config();
        config.endpoints[0].response.mappings[0].element_type = ElementType::Relation;
        config.endpoints[0].response.mappings[0].operation = OperationType::Update;
        config.endpoints[0].response.mappings[0].template.from = None;
        config.endpoints[0].response.mappings[0].template.to = None;
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("from is required"));
    }

    #[test]
    fn test_validate_relation_insert_requires_from_to() {
        let mut config = make_valid_config();
        config.endpoints[0].response.mappings[0].element_type = ElementType::Relation;
        config.endpoints[0].response.mappings[0].operation = OperationType::Insert;
        config.endpoints[0].response.mappings[0].template.from = None;
        config.endpoints[0].response.mappings[0].template.to = Some("{{item.to}}".to_string());
        let err = config.validate().unwrap_err();
        assert!(err.to_string().contains("from is required"));
    }
}
702}