Skip to main content

drasi_bootstrap_http/
descriptor.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Plugin descriptor for the HTTP bootstrap provider.
16
17use std::collections::HashMap;
18
19use drasi_lib::bootstrap::BootstrapProvider;
20use drasi_plugin_sdk::prelude::*;
21use utoipa::OpenApi;
22
23use crate::config::{
24    ApiKeyLocation, AuthConfig, ContentTypeOverride, ElementMappingConfig, ElementTemplate,
25    ElementType, EndpointConfig, HttpBootstrapConfig, HttpMethod, OperationType, PaginationConfig,
26    ResponseConfig,
27};
28use crate::provider::HttpBootstrapProvider;
29
30// ── DTO types ────────────────────────────────────────────────────────────────
31
32/// Top-level configuration DTO for the HTTP bootstrap provider.
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
34#[schema(as = bootstrap::http::HttpBootstrapConfig)]
35#[serde(rename_all = "camelCase", deny_unknown_fields)]
36pub struct HttpBootstrapConfigDto {
37    /// Endpoint configurations.
38    #[schema(value_type = Vec<bootstrap::http::EndpointConfig>)]
39    pub endpoints: Vec<EndpointConfigDto>,
40
41    /// Timeout in seconds.
42    #[serde(default = "default_timeout")]
43    pub timeout_seconds: ConfigValue<u64>,
44
45    /// Maximum retries.
46    #[serde(default = "default_retries")]
47    pub max_retries: ConfigValue<u32>,
48
49    /// Retry delay in milliseconds.
50    #[serde(default = "default_retry_delay")]
51    pub retry_delay_ms: ConfigValue<u64>,
52
53    /// Maximum number of pages to fetch per endpoint (default: 10,000).
54    #[serde(default, skip_serializing_if = "Option::is_none")]
55    pub max_pages: Option<ConfigValue<u64>>,
56}
57
58fn default_timeout() -> ConfigValue<u64> {
59    ConfigValue::Static(30)
60}
61
62fn default_retries() -> ConfigValue<u32> {
63    ConfigValue::Static(3)
64}
65
66fn default_retry_delay() -> ConfigValue<u64> {
67    ConfigValue::Static(1000)
68}
69
70/// Configuration DTO for a single HTTP endpoint.
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
72#[schema(as = bootstrap::http::EndpointConfig)]
73#[serde(rename_all = "camelCase")]
74pub struct EndpointConfigDto {
75    /// The URL to fetch data from.
76    pub url: ConfigValue<String>,
77
78    /// HTTP method (default: GET).
79    #[serde(default = "default_method")]
80    #[schema(value_type = bootstrap::http::HttpMethod)]
81    pub method: HttpMethodDto,
82
83    /// Additional HTTP headers to include in requests.
84    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
85    pub headers: HashMap<String, ConfigValue<String>>,
86
87    /// Optional request body (for POST/PUT methods).
88    #[serde(default, skip_serializing_if = "Option::is_none")]
89    pub body: Option<serde_json::Value>,
90
91    /// Authentication configuration.
92    #[serde(default, skip_serializing_if = "Option::is_none")]
93    #[schema(value_type = Option<bootstrap::http::AuthConfig>)]
94    pub auth: Option<AuthConfigDto>,
95
96    /// Pagination configuration.
97    #[serde(default, skip_serializing_if = "Option::is_none")]
98    #[schema(value_type = Option<bootstrap::http::PaginationConfig>)]
99    pub pagination: Option<PaginationConfigDto>,
100
101    /// Response parsing and element mapping configuration.
102    #[schema(value_type = bootstrap::http::ResponseConfig)]
103    pub response: ResponseConfigDto,
104}
105
106fn default_method() -> HttpMethodDto {
107    HttpMethodDto::Get
108}
109
110/// HTTP methods supported for bootstrap requests.
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
112#[schema(as = bootstrap::http::HttpMethod)]
113#[serde(rename_all = "UPPERCASE")]
114pub enum HttpMethodDto {
115    Get,
116    Post,
117    Put,
118}
119
120/// Authentication configuration DTO.
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
122#[schema(as = bootstrap::http::AuthConfig)]
123#[serde(tag = "type", rename_all = "kebab-case")]
124pub enum AuthConfigDto {
125    /// Bearer token authentication.
126    Bearer {
127        /// Environment variable containing the token.
128        token_env: ConfigValue<String>,
129    },
130    /// API key authentication (in header or query parameter).
131    ApiKey {
132        /// Where to send the API key.
133        #[schema(value_type = bootstrap::http::ApiKeyLocation)]
134        location: ApiKeyLocationDto,
135        /// Header name or query parameter name.
136        name: ConfigValue<String>,
137        /// Environment variable containing the API key value.
138        value_env: ConfigValue<String>,
139    },
140    /// HTTP Basic authentication.
141    Basic {
142        /// Environment variable containing the username.
143        username_env: ConfigValue<String>,
144        /// Environment variable containing the password.
145        #[serde(default, skip_serializing_if = "Option::is_none")]
146        password_env: Option<ConfigValue<String>>,
147    },
148    /// OAuth2 Client Credentials flow.
149    #[serde(rename = "oauth2-client-credentials")]
150    OAuth2ClientCredentials {
151        /// Token endpoint URL.
152        token_url: ConfigValue<String>,
153        /// Environment variable containing the client ID.
154        client_id_env: ConfigValue<String>,
155        /// Environment variable containing the client secret.
156        client_secret_env: ConfigValue<String>,
157        /// Optional scopes to request.
158        #[serde(default, skip_serializing_if = "Vec::is_empty")]
159        scopes: Vec<ConfigValue<String>>,
160    },
161}
162
163/// Where to place an API key.
164#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
165#[schema(as = bootstrap::http::ApiKeyLocation)]
166#[serde(rename_all = "kebab-case")]
167pub enum ApiKeyLocationDto {
168    Header,
169    Query,
170}
171
172/// Pagination configuration DTO.
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
174#[schema(as = bootstrap::http::PaginationConfig)]
175#[serde(tag = "type", rename_all = "kebab-case")]
176pub enum PaginationConfigDto {
177    /// Offset/limit pagination.
178    OffsetLimit {
179        #[serde(default = "default_offset_param")]
180        offset_param: ConfigValue<String>,
181        #[serde(default = "default_limit_param")]
182        limit_param: ConfigValue<String>,
183        page_size: ConfigValue<u64>,
184        #[serde(default, skip_serializing_if = "Option::is_none")]
185        total_path: Option<ConfigValue<String>>,
186    },
187    /// Page number pagination.
188    PageNumber {
189        #[serde(default = "default_page_param")]
190        page_param: ConfigValue<String>,
191        #[serde(default = "default_per_page_param")]
192        page_size_param: ConfigValue<String>,
193        page_size: ConfigValue<u64>,
194        #[serde(default, skip_serializing_if = "Option::is_none")]
195        total_pages_path: Option<ConfigValue<String>>,
196    },
197    /// Cursor-based pagination.
198    Cursor {
199        cursor_param: ConfigValue<String>,
200        cursor_path: ConfigValue<String>,
201        #[serde(default, skip_serializing_if = "Option::is_none")]
202        has_more_path: Option<ConfigValue<String>>,
203        #[serde(default, skip_serializing_if = "Option::is_none")]
204        page_size_param: Option<ConfigValue<String>>,
205        #[serde(default, skip_serializing_if = "Option::is_none")]
206        page_size: Option<ConfigValue<u64>>,
207    },
208    /// Link header pagination (RFC 5988).
209    LinkHeader {
210        #[serde(default, skip_serializing_if = "Option::is_none")]
211        page_size_param: Option<ConfigValue<String>>,
212        #[serde(default, skip_serializing_if = "Option::is_none")]
213        page_size: Option<ConfigValue<u64>>,
214    },
215    /// Next URL from response body.
216    NextUrl {
217        next_url_path: ConfigValue<String>,
218        #[serde(default, skip_serializing_if = "Option::is_none")]
219        base_url: Option<ConfigValue<String>>,
220    },
221}
222
223fn default_offset_param() -> ConfigValue<String> {
224    ConfigValue::Static("offset".to_string())
225}
226
227fn default_limit_param() -> ConfigValue<String> {
228    ConfigValue::Static("limit".to_string())
229}
230
231fn default_page_param() -> ConfigValue<String> {
232    ConfigValue::Static("page".to_string())
233}
234
235fn default_per_page_param() -> ConfigValue<String> {
236    ConfigValue::Static("per_page".to_string())
237}
238
239/// Response parsing configuration DTO.
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
241#[schema(as = bootstrap::http::ResponseConfig)]
242#[serde(rename_all = "camelCase")]
243pub struct ResponseConfigDto {
244    /// JSONPath expression to locate the array of items in the response.
245    #[serde(default = "default_items_path")]
246    pub items_path: ConfigValue<String>,
247
248    /// Content type override.
249    #[serde(default, skip_serializing_if = "Option::is_none")]
250    #[schema(value_type = Option<bootstrap::http::ContentTypeOverride>)]
251    pub content_type: Option<ContentTypeOverrideDto>,
252
253    /// Element mapping configurations.
254    #[schema(value_type = Vec<bootstrap::http::ElementMappingConfig>)]
255    pub mappings: Vec<ElementMappingConfigDto>,
256}
257
258fn default_items_path() -> ConfigValue<String> {
259    ConfigValue::Static("$".to_string())
260}
261
262/// Content type override DTO.
263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
264#[schema(as = bootstrap::http::ContentTypeOverride)]
265#[serde(rename_all = "lowercase")]
266pub enum ContentTypeOverrideDto {
267    Json,
268    Xml,
269    Yaml,
270}
271
272/// Element mapping configuration DTO.
273#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
274#[schema(as = bootstrap::http::ElementMappingConfig)]
275#[serde(rename_all = "camelCase")]
276pub struct ElementMappingConfigDto {
277    /// Type of element to create.
278    #[schema(value_type = bootstrap::http::ElementType)]
279    pub element_type: ElementTypeDto,
280
281    /// Operation type for the source change (default: update for idempotent bootstrap).
282    #[serde(default)]
283    #[schema(value_type = bootstrap::http::OperationType)]
284    pub operation: OperationTypeDto,
285
286    /// Template for element creation.
287    #[schema(value_type = bootstrap::http::ElementTemplate)]
288    pub template: ElementTemplateDto,
289}
290
291/// Operation type DTO.
292#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default, utoipa::ToSchema)]
293#[schema(as = bootstrap::http::OperationType)]
294#[serde(rename_all = "lowercase")]
295pub enum OperationTypeDto {
296    Insert,
297    #[default]
298    Update,
299    Delete,
300}
301
302/// Element type DTO.
303#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
304#[schema(as = bootstrap::http::ElementType)]
305#[serde(rename_all = "lowercase")]
306pub enum ElementTypeDto {
307    Node,
308    Relation,
309}
310
311/// Element template DTO using Handlebars expressions.
312#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
313#[schema(as = bootstrap::http::ElementTemplate)]
314#[serde(rename_all = "camelCase")]
315pub struct ElementTemplateDto {
316    /// Handlebars template for element ID.
317    pub id: ConfigValue<String>,
318
319    /// Handlebars templates for element labels.
320    pub labels: Vec<ConfigValue<String>>,
321
322    /// Properties mapping (each value is a Handlebars template or literal).
323    #[serde(default, skip_serializing_if = "Option::is_none")]
324    pub properties: Option<serde_json::Value>,
325
326    /// Template for relation source node ID (relations only).
327    #[serde(default, skip_serializing_if = "Option::is_none")]
328    pub from: Option<ConfigValue<String>>,
329
330    /// Template for relation target node ID (relations only).
331    #[serde(default, skip_serializing_if = "Option::is_none")]
332    pub to: Option<ConfigValue<String>>,
333}
334
335// ── Mapping functions ────────────────────────────────────────────────────────
336
337fn map_http_method(dto: &HttpMethodDto) -> HttpMethod {
338    match dto {
339        HttpMethodDto::Get => HttpMethod::Get,
340        HttpMethodDto::Post => HttpMethod::Post,
341        HttpMethodDto::Put => HttpMethod::Put,
342    }
343}
344
345fn map_api_key_location(dto: &ApiKeyLocationDto) -> ApiKeyLocation {
346    match dto {
347        ApiKeyLocationDto::Header => ApiKeyLocation::Header,
348        ApiKeyLocationDto::Query => ApiKeyLocation::Query,
349    }
350}
351
352fn map_element_type(dto: &ElementTypeDto) -> ElementType {
353    match dto {
354        ElementTypeDto::Node => ElementType::Node,
355        ElementTypeDto::Relation => ElementType::Relation,
356    }
357}
358
359fn map_operation_type(dto: &OperationTypeDto) -> OperationType {
360    match dto {
361        OperationTypeDto::Insert => OperationType::Insert,
362        OperationTypeDto::Update => OperationType::Update,
363        OperationTypeDto::Delete => OperationType::Delete,
364    }
365}
366
367fn map_content_type_override(dto: &ContentTypeOverrideDto) -> ContentTypeOverride {
368    match dto {
369        ContentTypeOverrideDto::Json => ContentTypeOverride::Json,
370        ContentTypeOverrideDto::Xml => ContentTypeOverride::Xml,
371        ContentTypeOverrideDto::Yaml => ContentTypeOverride::Yaml,
372    }
373}
374
375async fn map_auth_config(
376    dto: &AuthConfigDto,
377    resolver: &DtoMapper,
378) -> Result<AuthConfig, MappingError> {
379    match dto {
380        AuthConfigDto::Bearer { token_env } => Ok(AuthConfig::Bearer {
381            token_env: resolver.resolve_string(token_env).await?,
382        }),
383        AuthConfigDto::ApiKey {
384            location,
385            name,
386            value_env,
387        } => Ok(AuthConfig::ApiKey {
388            location: map_api_key_location(location),
389            name: resolver.resolve_string(name).await?,
390            value_env: resolver.resolve_string(value_env).await?,
391        }),
392        AuthConfigDto::Basic {
393            username_env,
394            password_env,
395        } => Ok(AuthConfig::Basic {
396            username_env: resolver.resolve_string(username_env).await?,
397            password_env: resolver.resolve_optional_string(password_env).await?,
398        }),
399        AuthConfigDto::OAuth2ClientCredentials {
400            token_url,
401            client_id_env,
402            client_secret_env,
403            scopes,
404        } => Ok(AuthConfig::OAuth2ClientCredentials {
405            token_url: resolver.resolve_string(token_url).await?,
406            client_id_env: resolver.resolve_string(client_id_env).await?,
407            client_secret_env: resolver.resolve_string(client_secret_env).await?,
408            scopes: resolver.resolve_string_vec(scopes).await?,
409        }),
410    }
411}
412
413async fn map_pagination_config(
414    dto: &PaginationConfigDto,
415    resolver: &DtoMapper,
416) -> Result<PaginationConfig, MappingError> {
417    match dto {
418        PaginationConfigDto::OffsetLimit {
419            offset_param,
420            limit_param,
421            page_size,
422            total_path,
423        } => Ok(PaginationConfig::OffsetLimit {
424            offset_param: resolver.resolve_string(offset_param).await?,
425            limit_param: resolver.resolve_string(limit_param).await?,
426            page_size: resolver.resolve_typed(page_size).await?,
427            total_path: resolver.resolve_optional_string(total_path).await?,
428        }),
429        PaginationConfigDto::PageNumber {
430            page_param,
431            page_size_param,
432            page_size,
433            total_pages_path,
434        } => Ok(PaginationConfig::PageNumber {
435            page_param: resolver.resolve_string(page_param).await?,
436            page_size_param: resolver.resolve_string(page_size_param).await?,
437            page_size: resolver.resolve_typed(page_size).await?,
438            total_pages_path: resolver.resolve_optional_string(total_pages_path).await?,
439        }),
440        PaginationConfigDto::Cursor {
441            cursor_param,
442            cursor_path,
443            has_more_path,
444            page_size_param,
445            page_size,
446        } => Ok(PaginationConfig::Cursor {
447            cursor_param: resolver.resolve_string(cursor_param).await?,
448            cursor_path: resolver.resolve_string(cursor_path).await?,
449            has_more_path: resolver.resolve_optional_string(has_more_path).await?,
450            page_size_param: resolver.resolve_optional_string(page_size_param).await?,
451            page_size: resolver.resolve_optional(page_size).await?,
452        }),
453        PaginationConfigDto::LinkHeader {
454            page_size_param,
455            page_size,
456        } => Ok(PaginationConfig::LinkHeader {
457            page_size_param: resolver.resolve_optional_string(page_size_param).await?,
458            page_size: resolver.resolve_optional(page_size).await?,
459        }),
460        PaginationConfigDto::NextUrl {
461            next_url_path,
462            base_url,
463        } => Ok(PaginationConfig::NextUrl {
464            next_url_path: resolver.resolve_string(next_url_path).await?,
465            base_url: resolver.resolve_optional_string(base_url).await?,
466        }),
467    }
468}
469
470async fn map_element_template(
471    dto: &ElementTemplateDto,
472    resolver: &DtoMapper,
473) -> Result<ElementTemplate, MappingError> {
474    Ok(ElementTemplate {
475        id: resolver.resolve_string(&dto.id).await?,
476        labels: resolver.resolve_string_vec(&dto.labels).await?,
477        properties: dto.properties.clone(),
478        from: resolver.resolve_optional_string(&dto.from).await?,
479        to: resolver.resolve_optional_string(&dto.to).await?,
480    })
481}
482
483async fn map_element_mapping(
484    dto: &ElementMappingConfigDto,
485    resolver: &DtoMapper,
486) -> Result<ElementMappingConfig, MappingError> {
487    Ok(ElementMappingConfig {
488        element_type: map_element_type(&dto.element_type),
489        operation: map_operation_type(&dto.operation),
490        template: map_element_template(&dto.template, resolver).await?,
491    })
492}
493
494async fn map_response_config(
495    dto: &ResponseConfigDto,
496    resolver: &DtoMapper,
497) -> Result<ResponseConfig, MappingError> {
498    let mut mappings = Vec::with_capacity(dto.mappings.len());
499    for mapping in &dto.mappings {
500        mappings.push(map_element_mapping(mapping, resolver).await?);
501    }
502
503    Ok(ResponseConfig {
504        items_path: resolver.resolve_string(&dto.items_path).await?,
505        content_type: dto.content_type.as_ref().map(map_content_type_override),
506        mappings,
507    })
508}
509
510async fn map_endpoint_config(
511    dto: &EndpointConfigDto,
512    resolver: &DtoMapper,
513) -> Result<EndpointConfig, MappingError> {
514    let mut headers = HashMap::with_capacity(dto.headers.len());
515    for (key, value) in &dto.headers {
516        headers.insert(key.clone(), resolver.resolve_string(value).await?);
517    }
518
519    let auth = match &dto.auth {
520        Some(auth) => Some(map_auth_config(auth, resolver).await?),
521        None => None,
522    };
523
524    let pagination = match &dto.pagination {
525        Some(pagination) => Some(map_pagination_config(pagination, resolver).await?),
526        None => None,
527    };
528
529    Ok(EndpointConfig {
530        url: resolver.resolve_string(&dto.url).await?,
531        method: map_http_method(&dto.method),
532        headers,
533        body: dto.body.clone(),
534        auth,
535        pagination,
536        response: map_response_config(&dto.response, resolver).await?,
537    })
538}
539
540async fn map_config(
541    dto: &HttpBootstrapConfigDto,
542    resolver: &DtoMapper,
543) -> Result<HttpBootstrapConfig, MappingError> {
544    let mut endpoints = Vec::with_capacity(dto.endpoints.len());
545    for endpoint in &dto.endpoints {
546        endpoints.push(map_endpoint_config(endpoint, resolver).await?);
547    }
548
549    let max_pages = match &dto.max_pages {
550        Some(value) => Some(resolver.resolve_typed(value).await?),
551        None => None,
552    };
553
554    Ok(HttpBootstrapConfig {
555        endpoints,
556        timeout_seconds: resolver.resolve_typed(&dto.timeout_seconds).await?,
557        max_retries: resolver.resolve_typed(&dto.max_retries).await?,
558        retry_delay_ms: resolver.resolve_typed(&dto.retry_delay_ms).await?,
559        max_pages,
560    })
561}
562
563// ── OpenAPI schema registration ─────────────────────────────────────────────
564
565#[derive(OpenApi)]
566#[openapi(components(schemas(
567    HttpBootstrapConfigDto,
568    EndpointConfigDto,
569    HttpMethodDto,
570    AuthConfigDto,
571    ApiKeyLocationDto,
572    PaginationConfigDto,
573    ResponseConfigDto,
574    ContentTypeOverrideDto,
575    ElementMappingConfigDto,
576    ElementTypeDto,
577    ElementTemplateDto,
578)))]
579struct HttpBootstrapSchemas;
580
581// ── Descriptor ──────────────────────────────────────────────────────────────
582
583/// Plugin descriptor for the HTTP bootstrap provider.
584pub struct HttpBootstrapDescriptor;
585
586#[async_trait]
587impl BootstrapPluginDescriptor for HttpBootstrapDescriptor {
588    fn kind(&self) -> &str {
589        "http"
590    }
591
592    fn config_version(&self) -> &str {
593        "1.0.0"
594    }
595
596    fn config_schema_name(&self) -> &str {
597        "bootstrap.http.HttpBootstrapConfig"
598    }
599
600    fn config_schema_json(&self) -> String {
601        let api = HttpBootstrapSchemas::openapi();
602        serde_json::to_string(
603            &api.components
604                .as_ref()
605                .expect("OpenAPI components missing")
606                .schemas,
607        )
608        .expect("Failed to serialize config schema")
609    }
610
611    async fn create_bootstrap_provider(
612        &self,
613        config_json: &serde_json::Value,
614        _source_config_json: &serde_json::Value,
615    ) -> anyhow::Result<Box<dyn BootstrapProvider>> {
616        let dto: HttpBootstrapConfigDto = serde_json::from_value(config_json.clone())
617            .map_err(|e| anyhow::anyhow!("Failed to parse HTTP bootstrap config: {e}"))?;
618
619        let mapper = DtoMapper::new();
620        let config = map_config(&dto, &mapper)
621            .await
622            .map_err(|e| anyhow::anyhow!("Failed to resolve HTTP bootstrap config: {e}"))?;
623
624        config.validate()?;
625
626        let provider = HttpBootstrapProvider::new(config)?;
627        Ok(Box::new(provider))
628    }
629}