Skip to main content

drasi_source_http/
descriptor.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP source plugin descriptor and configuration DTOs.
16
17use crate::config::{
18    AuthConfig, BearerConfig, CorsConfig, EffectiveFromConfig, ElementTemplate, ElementType,
19    ErrorBehavior, HttpMethod, MappingCondition, OperationType, SignatureAlgorithm,
20    SignatureConfig, SignatureEncoding, TimestampFormat, WebhookConfig, WebhookMapping,
21    WebhookRoute,
22};
23use crate::{HttpSourceBuilder, HttpSourceConfig};
24use drasi_plugin_sdk::prelude::*;
25use std::collections::HashMap;
26use utoipa::OpenApi;
27
/// HTTP source configuration DTO.
// Wire format: keys are camelCase and unknown keys are rejected
// (`deny_unknown_fields`). `host` and `port` are required, `timeoutMs` falls
// back to `default_http_timeout_ms()` when absent, and every other field is
// optional and is omitted from serialized output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::HttpSourceConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct HttpSourceConfigDto {
    // Required listener settings.
    pub host: ConfigValue<String>,
    pub port: ConfigValue<u16>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub endpoint: Option<ConfigValue<String>>,
    // Defaults to a static 10000 when the key is missing.
    #[serde(default = "default_http_timeout_ms")]
    pub timeout_ms: ConfigValue<u64>,
    // Optional `adaptive*` tuning knobs; each one is passed through to
    // `HttpSourceConfig` unchanged apart from config-value resolution.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_max_batch_size: Option<ConfigValue<usize>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_min_batch_size: Option<ConfigValue<usize>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_max_wait_ms: Option<ConfigValue<u64>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_min_wait_ms: Option<ConfigValue<u64>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_window_secs: Option<ConfigValue<u64>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_enabled: Option<ConfigValue<bool>>,
    // Optional webhook section; the schema refers to it by its aliased name
    // rather than by the Rust `*Dto` type name.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::WebhookConfig>)]
    pub webhooks: Option<WebhookConfigDto>,
}
55
56fn default_http_timeout_ms() -> ConfigValue<u64> {
57    ConfigValue::Static(10000)
58}
59
// Webhook section of the HTTP source configuration (camelCase keys).
// `routes` is the only required key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::WebhookConfig)]
#[serde(rename_all = "camelCase")]
pub struct WebhookConfigDto {
    // Falls back to `ErrorBehaviorDto::default()` when the key is absent.
    #[serde(default)]
    #[schema(value_type = source::http::ErrorBehavior)]
    pub error_behavior: ErrorBehaviorDto,
    // Optional CORS settings; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::CorsConfig>)]
    pub cors: Option<CorsConfigDto>,
    #[schema(value_type = Vec<source::http::WebhookRoute>)]
    pub routes: Vec<WebhookRouteDto>,
}
73
// CORS settings DTO (camelCase keys). Every key is optional on input; the
// `default_cors_*` functions in this file supply the fallbacks.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::CorsConfig)]
#[serde(rename_all = "camelCase")]
pub struct CorsConfigDto {
    // Defaults to `true`.
    #[serde(default = "default_cors_enabled")]
    pub enabled: bool,
    // Defaults to the single entry "*".
    #[serde(default = "default_cors_origins")]
    pub allow_origins: Vec<ConfigValue<String>>,
    // Defaults to GET, POST, PUT, PATCH, DELETE, OPTIONS.
    #[serde(default = "default_cors_methods")]
    pub allow_methods: Vec<ConfigValue<String>>,
    // Defaults to Content-Type, Authorization, X-Requested-With.
    #[serde(default = "default_cors_headers")]
    pub allow_headers: Vec<ConfigValue<String>>,
    // Defaults to an empty list, which is also omitted when serializing.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub expose_headers: Vec<ConfigValue<String>>,
    // Defaults to `false` (`bool::default()`).
    #[serde(default)]
    pub allow_credentials: bool,
    // Defaults to 3600. NOTE(review): presumably seconds, as in the
    // Access-Control-Max-Age header - confirm against the consumer.
    #[serde(default = "default_cors_max_age")]
    pub max_age: u64,
}
93
/// Serde default for `CorsConfigDto::enabled`: always `true`.
fn default_cors_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
97
98fn default_cors_origins() -> Vec<ConfigValue<String>> {
99    vec![ConfigValue::Static("*".to_string())]
100}
101
102fn default_cors_methods() -> Vec<ConfigValue<String>> {
103    vec![
104        ConfigValue::Static("GET".to_string()),
105        ConfigValue::Static("POST".to_string()),
106        ConfigValue::Static("PUT".to_string()),
107        ConfigValue::Static("PATCH".to_string()),
108        ConfigValue::Static("DELETE".to_string()),
109        ConfigValue::Static("OPTIONS".to_string()),
110    ]
111}
112
113fn default_cors_headers() -> Vec<ConfigValue<String>> {
114    vec![
115        ConfigValue::Static("Content-Type".to_string()),
116        ConfigValue::Static("Authorization".to_string()),
117        ConfigValue::Static("X-Requested-With".to_string()),
118    ]
119}
120
/// Serde default for `CorsConfigDto::max_age`: 3600.
fn default_cors_max_age() -> u64 {
    const DEFAULT_MAX_AGE: u64 = 3600;
    DEFAULT_MAX_AGE
}
124
// Error-handling mode DTO. Serialized in snake_case: "accept_and_log",
// "accept_and_skip", "reject". `AcceptAndLog` is the `Default` value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default, utoipa::ToSchema)]
#[schema(as = source::http::ErrorBehavior)]
#[serde(rename_all = "snake_case")]
pub enum ErrorBehaviorDto {
    #[default]
    AcceptAndLog,
    AcceptAndSkip,
    Reject,
}
134
// One webhook route (camelCase keys). `path` and `mappings` are required.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::WebhookRoute)]
#[serde(rename_all = "camelCase")]
pub struct WebhookRouteDto {
    pub path: ConfigValue<String>,
    // Defaults to `[POST]` (see `default_methods`) when the key is absent.
    #[serde(default = "default_methods")]
    #[schema(value_type = Vec<source::http::HttpMethod>)]
    pub methods: Vec<HttpMethodDto>,
    // Optional authentication settings; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::AuthConfig>)]
    pub auth: Option<AuthConfigDto>,
    // Optional per-route error behavior.
    // NOTE(review): presumably overrides `WebhookConfigDto::error_behavior`
    // for this route - confirm in the consumer of `WebhookRoute`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::ErrorBehavior>)]
    pub error_behavior: Option<ErrorBehaviorDto>,
    #[schema(value_type = Vec<source::http::WebhookMapping>)]
    pub mappings: Vec<WebhookMappingDto>,
}
152
153fn default_methods() -> Vec<HttpMethodDto> {
154    vec![HttpMethodDto::Post]
155}
156
// HTTP method DTO. Serialized in upper case: "GET", "POST", "PUT", "PATCH",
// "DELETE".
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, utoipa::ToSchema)]
#[schema(as = source::http::HttpMethod)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethodDto {
    Get,
    Post,
    Put,
    Patch,
    Delete,
}
167
// Route authentication DTO (camelCase keys). Both members are optional and
// independent at this level; either, both, or neither may be present.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::AuthConfig)]
#[serde(rename_all = "camelCase")]
pub struct AuthConfigDto {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::SignatureConfig>)]
    pub signature: Option<SignatureConfigDto>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::BearerConfig>)]
    pub bearer: Option<BearerConfigDto>,
}
179
// Signature verification DTO (camelCase keys).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::SignatureConfig)]
#[serde(rename_all = "camelCase")]
pub struct SignatureConfigDto {
    // Appears under the key "type" on the wire, not "algorithm".
    #[serde(rename = "type")]
    #[schema(value_type = source::http::SignatureAlgorithm)]
    pub algorithm: SignatureAlgorithmDto,
    // NOTE(review): presumably the name of the environment variable holding
    // the secret rather than the secret itself - confirm.
    pub secret_env: ConfigValue<String>,
    pub header: ConfigValue<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub prefix: Option<ConfigValue<String>>,
    // Falls back to `SignatureEncodingDto::default()` (Hex) when absent.
    #[serde(default)]
    #[schema(value_type = source::http::SignatureEncoding)]
    pub encoding: SignatureEncodingDto,
}
195
// Signature algorithm DTO. Serialized in kebab-case: "hmac-sha1",
// "hmac-sha256".
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::SignatureAlgorithm)]
#[serde(rename_all = "kebab-case")]
pub enum SignatureAlgorithmDto {
    HmacSha1,
    HmacSha256,
}
203
// Signature encoding DTO. Serialized in lower case: "hex", "base64".
// `Hex` is the `Default` value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default, utoipa::ToSchema)]
#[schema(as = source::http::SignatureEncoding)]
#[serde(rename_all = "lowercase")]
pub enum SignatureEncodingDto {
    #[default]
    Hex,
    Base64,
}
212
// Bearer-token authentication DTO; its single key is "tokenEnv" on the wire.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::BearerConfig)]
#[serde(rename_all = "camelCase")]
pub struct BearerConfigDto {
    // NOTE(review): presumably the name of the environment variable holding
    // the token rather than the token itself - confirm.
    pub token_env: ConfigValue<String>,
}
219
// One payload-to-element mapping of a webhook route (camelCase keys).
// `elementType` and `template` are required; everything else is optional and
// omitted from output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::WebhookMapping)]
#[serde(rename_all = "camelCase")]
pub struct WebhookMappingDto {
    // Optional condition attached to this mapping.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::MappingCondition>)]
    pub when: Option<MappingConditionDto>,
    // Three optional ways to express the operation. This DTO does not enforce
    // that exactly one is given.
    // NOTE(review): presumably `operation` is a fixed value while
    // `operation_from` + `operation_map` derive it from the request - confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::OperationType>)]
    pub operation: Option<OperationTypeDto>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operation_from: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<HashMap<String, source::http::OperationType>>)]
    pub operation_map: Option<HashMap<String, OperationTypeDto>>,
    #[schema(value_type = source::http::ElementType)]
    pub element_type: ElementTypeDto,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::EffectiveFromConfig>)]
    pub effective_from: Option<EffectiveFromConfigDto>,
    #[schema(value_type = source::http::ElementTemplate)]
    pub template: ElementTemplateDto,
}
243
// Mapping condition DTO (camelCase keys). All five members are optional and
// no combination is validated here.
// NOTE(review): presumably `header`/`field` select the value to test and
// `equals`/`contains`/`regex` are the predicates - confirm in the evaluator.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::MappingCondition)]
#[serde(rename_all = "camelCase")]
pub struct MappingConditionDto {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub header: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub field: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub equals: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub contains: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub regex: Option<ConfigValue<String>>,
}
259
// Change operation DTO. Serialized in lower case: "insert", "update",
// "delete".
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::OperationType)]
#[serde(rename_all = "lowercase")]
pub enum OperationTypeDto {
    Insert,
    Update,
    Delete,
}
268
// Element kind DTO. Serialized in lower case: "node", "relation".
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::ElementType)]
#[serde(rename_all = "lowercase")]
pub enum ElementTypeDto {
    Node,
    Relation,
}
276
// "Effective from" DTO. Untagged: there is no discriminator key, so serde
// tries the variants in declaration order and keeps the first that fits -
// `Simple` (a bare config value) before `Explicit` (an object with `value`
// and `format`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::EffectiveFromConfig)]
#[serde(untagged)]
pub enum EffectiveFromConfigDto {
    Simple(ConfigValue<String>),
    Explicit {
        value: ConfigValue<String>,
        format: TimestampFormatDto,
    },
}
287
// Timestamp format DTO. Serialized in snake_case: "iso8601", "unix_seconds",
// "unix_millis", "unix_nanos".
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::TimestampFormat)]
#[serde(rename_all = "snake_case")]
pub enum TimestampFormatDto {
    Iso8601,
    UnixSeconds,
    UnixMillis,
    UnixNanos,
}
297
// Element template DTO (camelCase keys). `id` and `labels` are required.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::ElementTemplate)]
#[serde(rename_all = "camelCase")]
pub struct ElementTemplateDto {
    pub id: ConfigValue<String>,
    pub labels: Vec<ConfigValue<String>>,
    // Arbitrary JSON; `map_element_template` copies it through verbatim
    // without config-value resolution.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub properties: Option<serde_json::Value>,
    // NOTE(review): presumably the endpoints of a relation element, unused
    // for nodes - nothing in this file enforces that; confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub from: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub to: Option<ConfigValue<String>>,
}
311
312// --- Mapping functions ---
313
314fn map_error_behavior(dto: &ErrorBehaviorDto) -> ErrorBehavior {
315    match dto {
316        ErrorBehaviorDto::AcceptAndLog => ErrorBehavior::AcceptAndLog,
317        ErrorBehaviorDto::AcceptAndSkip => ErrorBehavior::AcceptAndSkip,
318        ErrorBehaviorDto::Reject => ErrorBehavior::Reject,
319    }
320}
321
322fn map_http_method(dto: &HttpMethodDto) -> HttpMethod {
323    match dto {
324        HttpMethodDto::Get => HttpMethod::Get,
325        HttpMethodDto::Post => HttpMethod::Post,
326        HttpMethodDto::Put => HttpMethod::Put,
327        HttpMethodDto::Patch => HttpMethod::Patch,
328        HttpMethodDto::Delete => HttpMethod::Delete,
329    }
330}
331
332fn map_signature_algorithm(dto: &SignatureAlgorithmDto) -> SignatureAlgorithm {
333    match dto {
334        SignatureAlgorithmDto::HmacSha1 => SignatureAlgorithm::HmacSha1,
335        SignatureAlgorithmDto::HmacSha256 => SignatureAlgorithm::HmacSha256,
336    }
337}
338
339fn map_signature_encoding(dto: &SignatureEncodingDto) -> SignatureEncoding {
340    match dto {
341        SignatureEncodingDto::Hex => SignatureEncoding::Hex,
342        SignatureEncodingDto::Base64 => SignatureEncoding::Base64,
343    }
344}
345
346fn map_operation_type(dto: &OperationTypeDto) -> OperationType {
347    match dto {
348        OperationTypeDto::Insert => OperationType::Insert,
349        OperationTypeDto::Update => OperationType::Update,
350        OperationTypeDto::Delete => OperationType::Delete,
351    }
352}
353
354fn map_element_type(dto: &ElementTypeDto) -> ElementType {
355    match dto {
356        ElementTypeDto::Node => ElementType::Node,
357        ElementTypeDto::Relation => ElementType::Relation,
358    }
359}
360
361fn map_timestamp_format(dto: &TimestampFormatDto) -> TimestampFormat {
362    match dto {
363        TimestampFormatDto::Iso8601 => TimestampFormat::Iso8601,
364        TimestampFormatDto::UnixSeconds => TimestampFormat::UnixSeconds,
365        TimestampFormatDto::UnixMillis => TimestampFormat::UnixMillis,
366        TimestampFormatDto::UnixNanos => TimestampFormat::UnixNanos,
367    }
368}
369
370fn map_webhook_config(
371    dto: &WebhookConfigDto,
372    resolver: &DtoMapper,
373) -> Result<WebhookConfig, MappingError> {
374    Ok(WebhookConfig {
375        error_behavior: map_error_behavior(&dto.error_behavior),
376        cors: dto
377            .cors
378            .as_ref()
379            .map(|c| map_cors_config(c, resolver))
380            .transpose()?,
381        routes: dto
382            .routes
383            .iter()
384            .map(|r| map_webhook_route(r, resolver))
385            .collect::<Result<Vec<_>, _>>()?,
386    })
387}
388
389fn map_cors_config(dto: &CorsConfigDto, resolver: &DtoMapper) -> Result<CorsConfig, MappingError> {
390    Ok(CorsConfig {
391        enabled: dto.enabled,
392        allow_origins: resolver.resolve_string_vec(&dto.allow_origins)?,
393        allow_methods: resolver.resolve_string_vec(&dto.allow_methods)?,
394        allow_headers: resolver.resolve_string_vec(&dto.allow_headers)?,
395        expose_headers: resolver.resolve_string_vec(&dto.expose_headers)?,
396        allow_credentials: dto.allow_credentials,
397        max_age: dto.max_age,
398    })
399}
400
401fn map_webhook_route(
402    dto: &WebhookRouteDto,
403    resolver: &DtoMapper,
404) -> Result<WebhookRoute, MappingError> {
405    Ok(WebhookRoute {
406        path: resolver.resolve_string(&dto.path)?,
407        methods: dto.methods.iter().map(map_http_method).collect(),
408        auth: dto
409            .auth
410            .as_ref()
411            .map(|a| map_auth_config(a, resolver))
412            .transpose()?,
413        error_behavior: dto.error_behavior.as_ref().map(map_error_behavior),
414        mappings: dto
415            .mappings
416            .iter()
417            .map(|m| map_webhook_mapping(m, resolver))
418            .collect::<Result<Vec<_>, _>>()?,
419    })
420}
421
422fn map_auth_config(dto: &AuthConfigDto, resolver: &DtoMapper) -> Result<AuthConfig, MappingError> {
423    Ok(AuthConfig {
424        signature: dto
425            .signature
426            .as_ref()
427            .map(|s| map_signature_config(s, resolver))
428            .transpose()?,
429        bearer: dto
430            .bearer
431            .as_ref()
432            .map(|b| map_bearer_config(b, resolver))
433            .transpose()?,
434    })
435}
436
437fn map_signature_config(
438    dto: &SignatureConfigDto,
439    resolver: &DtoMapper,
440) -> Result<SignatureConfig, MappingError> {
441    Ok(SignatureConfig {
442        algorithm: map_signature_algorithm(&dto.algorithm),
443        secret_env: resolver.resolve_string(&dto.secret_env)?,
444        header: resolver.resolve_string(&dto.header)?,
445        prefix: resolver.resolve_optional_string(&dto.prefix)?,
446        encoding: map_signature_encoding(&dto.encoding),
447    })
448}
449
450fn map_bearer_config(
451    dto: &BearerConfigDto,
452    resolver: &DtoMapper,
453) -> Result<BearerConfig, MappingError> {
454    Ok(BearerConfig {
455        token_env: resolver.resolve_string(&dto.token_env)?,
456    })
457}
458
459fn map_webhook_mapping(
460    dto: &WebhookMappingDto,
461    resolver: &DtoMapper,
462) -> Result<WebhookMapping, MappingError> {
463    Ok(WebhookMapping {
464        when: dto
465            .when
466            .as_ref()
467            .map(|c| map_mapping_condition(c, resolver))
468            .transpose()?,
469        operation: dto.operation.as_ref().map(map_operation_type),
470        operation_from: resolver.resolve_optional_string(&dto.operation_from)?,
471        operation_map: dto.operation_map.as_ref().map(|m| {
472            m.iter()
473                .map(|(k, v)| (k.clone(), map_operation_type(v)))
474                .collect()
475        }),
476        element_type: map_element_type(&dto.element_type),
477        effective_from: dto
478            .effective_from
479            .as_ref()
480            .map(|e| map_effective_from(e, resolver))
481            .transpose()?,
482        template: map_element_template(&dto.template, resolver)?,
483    })
484}
485
486fn map_mapping_condition(
487    dto: &MappingConditionDto,
488    resolver: &DtoMapper,
489) -> Result<MappingCondition, MappingError> {
490    Ok(MappingCondition {
491        header: resolver.resolve_optional_string(&dto.header)?,
492        field: resolver.resolve_optional_string(&dto.field)?,
493        equals: resolver.resolve_optional_string(&dto.equals)?,
494        contains: resolver.resolve_optional_string(&dto.contains)?,
495        regex: resolver.resolve_optional_string(&dto.regex)?,
496    })
497}
498
499fn map_effective_from(
500    dto: &EffectiveFromConfigDto,
501    resolver: &DtoMapper,
502) -> Result<EffectiveFromConfig, MappingError> {
503    match dto {
504        EffectiveFromConfigDto::Simple(v) => {
505            Ok(EffectiveFromConfig::Simple(resolver.resolve_string(v)?))
506        }
507        EffectiveFromConfigDto::Explicit { value, format } => Ok(EffectiveFromConfig::Explicit {
508            value: resolver.resolve_string(value)?,
509            format: map_timestamp_format(format),
510        }),
511    }
512}
513
514fn map_element_template(
515    dto: &ElementTemplateDto,
516    resolver: &DtoMapper,
517) -> Result<ElementTemplate, MappingError> {
518    Ok(ElementTemplate {
519        id: resolver.resolve_string(&dto.id)?,
520        labels: resolver.resolve_string_vec(&dto.labels)?,
521        properties: dto.properties.clone(),
522        from: resolver.resolve_optional_string(&dto.from)?,
523        to: resolver.resolve_optional_string(&dto.to)?,
524    })
525}
526
// Private aggregator that registers every DTO in this file as an OpenAPI
// component schema. `HttpSourceDescriptor::config_schema_json` calls
// `HttpSourceSchemas::openapi()` and serializes the resulting schema map.
// A DTO added to this file must also be listed here to appear in that output.
#[derive(OpenApi)]
#[openapi(components(schemas(
    HttpSourceConfigDto,
    WebhookConfigDto,
    CorsConfigDto,
    ErrorBehaviorDto,
    WebhookRouteDto,
    HttpMethodDto,
    AuthConfigDto,
    SignatureConfigDto,
    SignatureAlgorithmDto,
    SignatureEncodingDto,
    BearerConfigDto,
    WebhookMappingDto,
    MappingConditionDto,
    OperationTypeDto,
    ElementTypeDto,
    EffectiveFromConfigDto,
    TimestampFormatDto,
    ElementTemplateDto,
)))]
struct HttpSourceSchemas;
549
/// Descriptor for the HTTP source plugin.
///
/// A field-less unit struct. Its [`SourcePluginDescriptor`] implementation
/// reports the plugin kind `"http"`, exposes the configuration schema as
/// JSON, and builds HTTP sources from a JSON configuration value.
pub struct HttpSourceDescriptor;
552
553#[async_trait]
554impl SourcePluginDescriptor for HttpSourceDescriptor {
555    fn kind(&self) -> &str {
556        "http"
557    }
558
559    fn config_version(&self) -> &str {
560        "1.0.0"
561    }
562
563    fn config_schema_name(&self) -> &str {
564        "source.http.HttpSourceConfig"
565    }
566
567    fn config_schema_json(&self) -> String {
568        let api = HttpSourceSchemas::openapi();
569        serde_json::to_string(
570            &api.components
571                .as_ref()
572                .expect("OpenAPI components missing")
573                .schemas,
574        )
575        .expect("Failed to serialize config schema")
576    }
577
578    async fn create_source(
579        &self,
580        id: &str,
581        config_json: &serde_json::Value,
582        auto_start: bool,
583    ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
584        let dto: HttpSourceConfigDto = serde_json::from_value(config_json.clone())?;
585        let mapper = DtoMapper::new();
586
587        let config = HttpSourceConfig {
588            host: mapper.resolve_string(&dto.host)?,
589            port: mapper.resolve_typed(&dto.port)?,
590            endpoint: mapper.resolve_optional(&dto.endpoint)?,
591            timeout_ms: mapper.resolve_typed(&dto.timeout_ms)?,
592            adaptive_max_batch_size: mapper.resolve_optional(&dto.adaptive_max_batch_size)?,
593            adaptive_min_batch_size: mapper.resolve_optional(&dto.adaptive_min_batch_size)?,
594            adaptive_max_wait_ms: mapper.resolve_optional(&dto.adaptive_max_wait_ms)?,
595            adaptive_min_wait_ms: mapper.resolve_optional(&dto.adaptive_min_wait_ms)?,
596            adaptive_window_secs: mapper.resolve_optional(&dto.adaptive_window_secs)?,
597            adaptive_enabled: mapper.resolve_optional(&dto.adaptive_enabled)?,
598            webhooks: dto
599                .webhooks
600                .as_ref()
601                .map(|w| map_webhook_config(w, &mapper))
602                .transpose()?,
603        };
604
605        let source = HttpSourceBuilder::new(id)
606            .with_config(config)
607            .with_auto_start(auto_start)
608            .build()?;
609
610        Ok(Box::new(source))
611    }
612}