1use crate::config::{
18 AuthConfig, BearerConfig, CorsConfig, EffectiveFromConfig, ElementTemplate, ElementType,
19 ErrorBehavior, HttpMethod, MappingCondition, OperationType, SignatureAlgorithm,
20 SignatureConfig, SignatureEncoding, TimestampFormat, WebhookConfig, WebhookMapping,
21 WebhookRoute,
22};
23use crate::{HttpSourceBuilder, HttpSourceConfig};
24use drasi_plugin_sdk::prelude::*;
25use std::collections::HashMap;
26use utoipa::OpenApi;
27
// Wire-format (DTO) configuration for the HTTP source.
//
// Keys are camelCase on the wire and unknown keys are rejected
// (`deny_unknown_fields`). The OpenAPI schema for this type is published as
// `source::http::HttpSourceConfig`. Fields typed `ConfigValue<_>` are resolved
// to concrete values by `HttpSourceDescriptor::create_source`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::HttpSourceConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct HttpSourceConfigDto {
    // Required.
    pub host: ConfigValue<String>,
    // Required.
    pub port: ConfigValue<u16>,
    // Optional; left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub endpoint: Option<ConfigValue<String>>,
    // Falls back to `default_http_timeout_ms()` when the key is absent.
    #[serde(default = "default_http_timeout_ms")]
    pub timeout_ms: ConfigValue<u64>,
    // The `adaptive_*` settings below are all optional and are handed to
    // `HttpSourceConfig` once resolved; their meaning is defined by the
    // consumer of that config, which is not visible in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_max_batch_size: Option<ConfigValue<usize>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_min_batch_size: Option<ConfigValue<usize>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_max_wait_ms: Option<ConfigValue<u64>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_min_wait_ms: Option<ConfigValue<u64>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_window_secs: Option<ConfigValue<u64>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub adaptive_enabled: Option<ConfigValue<bool>>,
    // Optional webhook section; documented in the schema as
    // `source::http::WebhookConfig`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::WebhookConfig>)]
    pub webhooks: Option<WebhookConfigDto>,
}
55
56fn default_http_timeout_ms() -> ConfigValue<u64> {
57 ConfigValue::Static(10000)
58}
59
// Wire-format (DTO) webhook section of the HTTP source configuration.
//
// Keys are camelCase; the schema is published as `source::http::WebhookConfig`.
// Converted to `WebhookConfig` by `map_webhook_config`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::WebhookConfig)]
#[serde(rename_all = "camelCase")]
pub struct WebhookConfigDto {
    // Uses `ErrorBehaviorDto::default()` when the key is absent.
    #[serde(default)]
    #[schema(value_type = source::http::ErrorBehavior)]
    pub error_behavior: ErrorBehaviorDto,
    // Optional CORS settings; left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::CorsConfig>)]
    pub cors: Option<CorsConfigDto>,
    // Required list of routes (no serde default).
    #[schema(value_type = Vec<source::http::WebhookRoute>)]
    pub routes: Vec<WebhookRouteDto>,
}
73
// Wire-format (DTO) CORS settings; schema name `source::http::CorsConfig`.
//
// Every field has a serde default, so an empty object deserializes
// successfully. Converted to `CorsConfig` by `map_cors_config`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::CorsConfig)]
#[serde(rename_all = "camelCase")]
pub struct CorsConfigDto {
    // Defaults to `default_cors_enabled()`.
    #[serde(default = "default_cors_enabled")]
    pub enabled: bool,
    // Defaults to `default_cors_origins()`.
    #[serde(default = "default_cors_origins")]
    pub allow_origins: Vec<ConfigValue<String>>,
    // Defaults to `default_cors_methods()`.
    #[serde(default = "default_cors_methods")]
    pub allow_methods: Vec<ConfigValue<String>>,
    // Defaults to `default_cors_headers()`.
    #[serde(default = "default_cors_headers")]
    pub allow_headers: Vec<ConfigValue<String>>,
    // Defaults to an empty list, which is also skipped when serializing.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub expose_headers: Vec<ConfigValue<String>>,
    // Defaults to `false` (`bool::default()`).
    #[serde(default)]
    pub allow_credentials: bool,
    // Defaults to `default_cors_max_age()`.
    #[serde(default = "default_cors_max_age")]
    pub max_age: u64,
}
93
/// Serde default for `CorsConfigDto::enabled`: `true`.
fn default_cors_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
97
98fn default_cors_origins() -> Vec<ConfigValue<String>> {
99 vec![ConfigValue::Static("*".to_string())]
100}
101
102fn default_cors_methods() -> Vec<ConfigValue<String>> {
103 vec![
104 ConfigValue::Static("GET".to_string()),
105 ConfigValue::Static("POST".to_string()),
106 ConfigValue::Static("PUT".to_string()),
107 ConfigValue::Static("PATCH".to_string()),
108 ConfigValue::Static("DELETE".to_string()),
109 ConfigValue::Static("OPTIONS".to_string()),
110 ]
111}
112
113fn default_cors_headers() -> Vec<ConfigValue<String>> {
114 vec![
115 ConfigValue::Static("Content-Type".to_string()),
116 ConfigValue::Static("Authorization".to_string()),
117 ConfigValue::Static("X-Requested-With".to_string()),
118 ]
119}
120
/// Serde default for `CorsConfigDto::max_age`: 3600.
fn default_cors_max_age() -> u64 {
    const DEFAULT_MAX_AGE: u64 = 3_600;
    DEFAULT_MAX_AGE
}
124
// Wire-format (DTO) error-behavior selector; schema name
// `source::http::ErrorBehavior`.
//
// Variants are snake_case on the wire: `accept_and_log`, `accept_and_skip`,
// `reject`. Converted to `ErrorBehavior` by `map_error_behavior`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default, utoipa::ToSchema)]
#[schema(as = source::http::ErrorBehavior)]
#[serde(rename_all = "snake_case")]
pub enum ErrorBehaviorDto {
    // Value produced by `Default::default()`.
    #[default]
    AcceptAndLog,
    AcceptAndSkip,
    Reject,
}
134
// Wire-format (DTO) description of one webhook route; schema name
// `source::http::WebhookRoute`. Keys are camelCase.
// Converted to `WebhookRoute` by `map_webhook_route`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::WebhookRoute)]
#[serde(rename_all = "camelCase")]
pub struct WebhookRouteDto {
    // Required.
    pub path: ConfigValue<String>,
    // Defaults to `default_methods()` when the key is absent.
    #[serde(default = "default_methods")]
    #[schema(value_type = Vec<source::http::HttpMethod>)]
    pub methods: Vec<HttpMethodDto>,
    // Optional; left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::AuthConfig>)]
    pub auth: Option<AuthConfigDto>,
    // Optional per-route value; left out of serialized output when `None`.
    // NOTE(review): presumably overrides `WebhookConfigDto::error_behavior`
    // for this route - confirm against the consumer of `WebhookRoute`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::ErrorBehavior>)]
    pub error_behavior: Option<ErrorBehaviorDto>,
    // Required list of mappings (no serde default).
    #[schema(value_type = Vec<source::http::WebhookMapping>)]
    pub mappings: Vec<WebhookMappingDto>,
}
152
153fn default_methods() -> Vec<HttpMethodDto> {
154 vec![HttpMethodDto::Post]
155}
156
// Wire-format (DTO) HTTP method; schema name `source::http::HttpMethod`.
//
// Variants are UPPERCASE on the wire: `GET`, `POST`, `PUT`, `PATCH`, `DELETE`.
// Converted to `HttpMethod` by `map_http_method`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, utoipa::ToSchema)]
#[schema(as = source::http::HttpMethod)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethodDto {
    Get,
    Post,
    Put,
    Patch,
    Delete,
}
167
// Wire-format (DTO) authentication settings for a route; schema name
// `source::http::AuthConfig`. Keys are camelCase.
// Both members are optional and independent of each other at this layer.
// Converted to `AuthConfig` by `map_auth_config`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::AuthConfig)]
#[serde(rename_all = "camelCase")]
pub struct AuthConfigDto {
    // Optional; left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::SignatureConfig>)]
    pub signature: Option<SignatureConfigDto>,
    // Optional; left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::BearerConfig>)]
    pub bearer: Option<BearerConfigDto>,
}
179
// Wire-format (DTO) signature settings; schema name
// `source::http::SignatureConfig`. Keys are camelCase.
// Converted to `SignatureConfig` by `map_signature_config`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::SignatureConfig)]
#[serde(rename_all = "camelCase")]
pub struct SignatureConfigDto {
    // Required; appears on the wire under the key `type`.
    #[serde(rename = "type")]
    #[schema(value_type = source::http::SignatureAlgorithm)]
    pub algorithm: SignatureAlgorithmDto,
    // Required.
    // NOTE(review): the name suggests an environment-variable name that holds
    // the secret rather than the secret itself - confirm against the consumer.
    pub secret_env: ConfigValue<String>,
    // Required.
    pub header: ConfigValue<String>,
    // Optional; left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub prefix: Option<ConfigValue<String>>,
    // Uses `SignatureEncodingDto::default()` when the key is absent.
    #[serde(default)]
    #[schema(value_type = source::http::SignatureEncoding)]
    pub encoding: SignatureEncodingDto,
}
195
// Wire-format (DTO) signature algorithm; schema name
// `source::http::SignatureAlgorithm`.
//
// Variants are kebab-case on the wire: `hmac-sha1`, `hmac-sha256`.
// Converted to `SignatureAlgorithm` by `map_signature_algorithm`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::SignatureAlgorithm)]
#[serde(rename_all = "kebab-case")]
pub enum SignatureAlgorithmDto {
    HmacSha1,
    HmacSha256,
}
203
// Wire-format (DTO) signature encoding; schema name
// `source::http::SignatureEncoding`.
//
// Variants are lowercase on the wire: `hex`, `base64`.
// Converted to `SignatureEncoding` by `map_signature_encoding`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default, utoipa::ToSchema)]
#[schema(as = source::http::SignatureEncoding)]
#[serde(rename_all = "lowercase")]
pub enum SignatureEncodingDto {
    // Value produced by `Default::default()`.
    #[default]
    Hex,
    Base64,
}
212
// Wire-format (DTO) bearer-token settings; schema name
// `source::http::BearerConfig`. Keys are camelCase.
// Converted to `BearerConfig` by `map_bearer_config`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::BearerConfig)]
#[serde(rename_all = "camelCase")]
pub struct BearerConfigDto {
    // Required.
    // NOTE(review): the name suggests an environment-variable name that holds
    // the token rather than the token itself - confirm against the consumer.
    pub token_env: ConfigValue<String>,
}
219
// Wire-format (DTO) mapping rule for a webhook route; schema name
// `source::http::WebhookMapping`. Keys are camelCase.
// Converted to `WebhookMapping` by `map_webhook_mapping`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::WebhookMapping)]
#[serde(rename_all = "camelCase")]
pub struct WebhookMappingDto {
    // Optional condition; left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::MappingCondition>)]
    pub when: Option<MappingConditionDto>,
    // `operation`, `operation_from` and `operation_map` are all optional at
    // this layer; nothing in this file validates which combination is set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::OperationType>)]
    pub operation: Option<OperationTypeDto>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operation_from: Option<ConfigValue<String>>,
    // Keys are plain strings (not `ConfigValue`) and are copied as-is by
    // `map_webhook_mapping`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<HashMap<String, source::http::OperationType>>)]
    pub operation_map: Option<HashMap<String, OperationTypeDto>>,
    // Required.
    #[schema(value_type = source::http::ElementType)]
    pub element_type: ElementTypeDto,
    // Optional; left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schema(value_type = Option<source::http::EffectiveFromConfig>)]
    pub effective_from: Option<EffectiveFromConfigDto>,
    // Required.
    #[schema(value_type = source::http::ElementTemplate)]
    pub template: ElementTemplateDto,
}
243
// Wire-format (DTO) condition attached to a mapping; schema name
// `source::http::MappingCondition`. Keys are camelCase.
//
// Every member is optional and skipped when `None`; how the members combine is
// decided by the consumer of `MappingCondition`, not by this file.
// Converted to `MappingCondition` by `map_mapping_condition`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::MappingCondition)]
#[serde(rename_all = "camelCase")]
pub struct MappingConditionDto {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub header: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub field: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub equals: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub contains: Option<ConfigValue<String>>,
    // Carried as a string here; no pattern compilation happens in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub regex: Option<ConfigValue<String>>,
}
259
// Wire-format (DTO) operation type; schema name `source::http::OperationType`.
//
// Variants are lowercase on the wire: `insert`, `update`, `delete`.
// Converted to `OperationType` by `map_operation_type`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::OperationType)]
#[serde(rename_all = "lowercase")]
pub enum OperationTypeDto {
    Insert,
    Update,
    Delete,
}
268
// Wire-format (DTO) element type; schema name `source::http::ElementType`.
//
// Variants are lowercase on the wire: `node`, `relation`.
// Converted to `ElementType` by `map_element_type`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::ElementType)]
#[serde(rename_all = "lowercase")]
pub enum ElementTypeDto {
    Node,
    Relation,
}
276
// Wire-format (DTO) "effective from" setting; schema name
// `source::http::EffectiveFromConfig`.
//
// The enum is `untagged`, so no variant name appears on the wire: serde tries
// the variants in declaration order and keeps the first that deserializes.
// Converted to `EffectiveFromConfig` by `map_effective_from`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::EffectiveFromConfig)]
#[serde(untagged)]
pub enum EffectiveFromConfigDto {
    // A bare `ConfigValue<String>` with no explicit format.
    Simple(ConfigValue<String>),
    // An object carrying both `value` and `format`; both keys are required.
    Explicit {
        value: ConfigValue<String>,
        format: TimestampFormatDto,
    },
}
287
// Wire-format (DTO) timestamp format; schema name
// `source::http::TimestampFormat`.
//
// Variants are snake_case on the wire: `iso8601`, `unix_seconds`,
// `unix_millis`, `unix_nanos`.
// Converted to `TimestampFormat` by `map_timestamp_format`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::TimestampFormat)]
#[serde(rename_all = "snake_case")]
pub enum TimestampFormatDto {
    Iso8601,
    UnixSeconds,
    UnixMillis,
    UnixNanos,
}
297
// Wire-format (DTO) element template; schema name
// `source::http::ElementTemplate`. Keys are camelCase.
// Converted to `ElementTemplate` by `map_element_template`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::http::ElementTemplate)]
#[serde(rename_all = "camelCase")]
pub struct ElementTemplateDto {
    // Required.
    pub id: ConfigValue<String>,
    // Required list (no serde default).
    pub labels: Vec<ConfigValue<String>>,
    // Optional free-form JSON; cloned verbatim by `map_element_template`
    // without going through the `DtoMapper`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub properties: Option<serde_json::Value>,
    // Optional; left out of serialized output when `None`.
    // NOTE(review): `from`/`to` presumably apply to relation elements only -
    // nothing in this file enforces that; confirm against the consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub from: Option<ConfigValue<String>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub to: Option<ConfigValue<String>>,
}
311
312fn map_error_behavior(dto: &ErrorBehaviorDto) -> ErrorBehavior {
315 match dto {
316 ErrorBehaviorDto::AcceptAndLog => ErrorBehavior::AcceptAndLog,
317 ErrorBehaviorDto::AcceptAndSkip => ErrorBehavior::AcceptAndSkip,
318 ErrorBehaviorDto::Reject => ErrorBehavior::Reject,
319 }
320}
321
322fn map_http_method(dto: &HttpMethodDto) -> HttpMethod {
323 match dto {
324 HttpMethodDto::Get => HttpMethod::Get,
325 HttpMethodDto::Post => HttpMethod::Post,
326 HttpMethodDto::Put => HttpMethod::Put,
327 HttpMethodDto::Patch => HttpMethod::Patch,
328 HttpMethodDto::Delete => HttpMethod::Delete,
329 }
330}
331
332fn map_signature_algorithm(dto: &SignatureAlgorithmDto) -> SignatureAlgorithm {
333 match dto {
334 SignatureAlgorithmDto::HmacSha1 => SignatureAlgorithm::HmacSha1,
335 SignatureAlgorithmDto::HmacSha256 => SignatureAlgorithm::HmacSha256,
336 }
337}
338
339fn map_signature_encoding(dto: &SignatureEncodingDto) -> SignatureEncoding {
340 match dto {
341 SignatureEncodingDto::Hex => SignatureEncoding::Hex,
342 SignatureEncodingDto::Base64 => SignatureEncoding::Base64,
343 }
344}
345
346fn map_operation_type(dto: &OperationTypeDto) -> OperationType {
347 match dto {
348 OperationTypeDto::Insert => OperationType::Insert,
349 OperationTypeDto::Update => OperationType::Update,
350 OperationTypeDto::Delete => OperationType::Delete,
351 }
352}
353
354fn map_element_type(dto: &ElementTypeDto) -> ElementType {
355 match dto {
356 ElementTypeDto::Node => ElementType::Node,
357 ElementTypeDto::Relation => ElementType::Relation,
358 }
359}
360
361fn map_timestamp_format(dto: &TimestampFormatDto) -> TimestampFormat {
362 match dto {
363 TimestampFormatDto::Iso8601 => TimestampFormat::Iso8601,
364 TimestampFormatDto::UnixSeconds => TimestampFormat::UnixSeconds,
365 TimestampFormatDto::UnixMillis => TimestampFormat::UnixMillis,
366 TimestampFormatDto::UnixNanos => TimestampFormat::UnixNanos,
367 }
368}
369
370fn map_webhook_config(
371 dto: &WebhookConfigDto,
372 resolver: &DtoMapper,
373) -> Result<WebhookConfig, MappingError> {
374 Ok(WebhookConfig {
375 error_behavior: map_error_behavior(&dto.error_behavior),
376 cors: dto
377 .cors
378 .as_ref()
379 .map(|c| map_cors_config(c, resolver))
380 .transpose()?,
381 routes: dto
382 .routes
383 .iter()
384 .map(|r| map_webhook_route(r, resolver))
385 .collect::<Result<Vec<_>, _>>()?,
386 })
387}
388
389fn map_cors_config(dto: &CorsConfigDto, resolver: &DtoMapper) -> Result<CorsConfig, MappingError> {
390 Ok(CorsConfig {
391 enabled: dto.enabled,
392 allow_origins: resolver.resolve_string_vec(&dto.allow_origins)?,
393 allow_methods: resolver.resolve_string_vec(&dto.allow_methods)?,
394 allow_headers: resolver.resolve_string_vec(&dto.allow_headers)?,
395 expose_headers: resolver.resolve_string_vec(&dto.expose_headers)?,
396 allow_credentials: dto.allow_credentials,
397 max_age: dto.max_age,
398 })
399}
400
401fn map_webhook_route(
402 dto: &WebhookRouteDto,
403 resolver: &DtoMapper,
404) -> Result<WebhookRoute, MappingError> {
405 Ok(WebhookRoute {
406 path: resolver.resolve_string(&dto.path)?,
407 methods: dto.methods.iter().map(map_http_method).collect(),
408 auth: dto
409 .auth
410 .as_ref()
411 .map(|a| map_auth_config(a, resolver))
412 .transpose()?,
413 error_behavior: dto.error_behavior.as_ref().map(map_error_behavior),
414 mappings: dto
415 .mappings
416 .iter()
417 .map(|m| map_webhook_mapping(m, resolver))
418 .collect::<Result<Vec<_>, _>>()?,
419 })
420}
421
422fn map_auth_config(dto: &AuthConfigDto, resolver: &DtoMapper) -> Result<AuthConfig, MappingError> {
423 Ok(AuthConfig {
424 signature: dto
425 .signature
426 .as_ref()
427 .map(|s| map_signature_config(s, resolver))
428 .transpose()?,
429 bearer: dto
430 .bearer
431 .as_ref()
432 .map(|b| map_bearer_config(b, resolver))
433 .transpose()?,
434 })
435}
436
437fn map_signature_config(
438 dto: &SignatureConfigDto,
439 resolver: &DtoMapper,
440) -> Result<SignatureConfig, MappingError> {
441 Ok(SignatureConfig {
442 algorithm: map_signature_algorithm(&dto.algorithm),
443 secret_env: resolver.resolve_string(&dto.secret_env)?,
444 header: resolver.resolve_string(&dto.header)?,
445 prefix: resolver.resolve_optional_string(&dto.prefix)?,
446 encoding: map_signature_encoding(&dto.encoding),
447 })
448}
449
450fn map_bearer_config(
451 dto: &BearerConfigDto,
452 resolver: &DtoMapper,
453) -> Result<BearerConfig, MappingError> {
454 Ok(BearerConfig {
455 token_env: resolver.resolve_string(&dto.token_env)?,
456 })
457}
458
459fn map_webhook_mapping(
460 dto: &WebhookMappingDto,
461 resolver: &DtoMapper,
462) -> Result<WebhookMapping, MappingError> {
463 Ok(WebhookMapping {
464 when: dto
465 .when
466 .as_ref()
467 .map(|c| map_mapping_condition(c, resolver))
468 .transpose()?,
469 operation: dto.operation.as_ref().map(map_operation_type),
470 operation_from: resolver.resolve_optional_string(&dto.operation_from)?,
471 operation_map: dto.operation_map.as_ref().map(|m| {
472 m.iter()
473 .map(|(k, v)| (k.clone(), map_operation_type(v)))
474 .collect()
475 }),
476 element_type: map_element_type(&dto.element_type),
477 effective_from: dto
478 .effective_from
479 .as_ref()
480 .map(|e| map_effective_from(e, resolver))
481 .transpose()?,
482 template: map_element_template(&dto.template, resolver)?,
483 })
484}
485
486fn map_mapping_condition(
487 dto: &MappingConditionDto,
488 resolver: &DtoMapper,
489) -> Result<MappingCondition, MappingError> {
490 Ok(MappingCondition {
491 header: resolver.resolve_optional_string(&dto.header)?,
492 field: resolver.resolve_optional_string(&dto.field)?,
493 equals: resolver.resolve_optional_string(&dto.equals)?,
494 contains: resolver.resolve_optional_string(&dto.contains)?,
495 regex: resolver.resolve_optional_string(&dto.regex)?,
496 })
497}
498
499fn map_effective_from(
500 dto: &EffectiveFromConfigDto,
501 resolver: &DtoMapper,
502) -> Result<EffectiveFromConfig, MappingError> {
503 match dto {
504 EffectiveFromConfigDto::Simple(v) => {
505 Ok(EffectiveFromConfig::Simple(resolver.resolve_string(v)?))
506 }
507 EffectiveFromConfigDto::Explicit { value, format } => Ok(EffectiveFromConfig::Explicit {
508 value: resolver.resolve_string(value)?,
509 format: map_timestamp_format(format),
510 }),
511 }
512}
513
514fn map_element_template(
515 dto: &ElementTemplateDto,
516 resolver: &DtoMapper,
517) -> Result<ElementTemplate, MappingError> {
518 Ok(ElementTemplate {
519 id: resolver.resolve_string(&dto.id)?,
520 labels: resolver.resolve_string_vec(&dto.labels)?,
521 properties: dto.properties.clone(),
522 from: resolver.resolve_optional_string(&dto.from)?,
523 to: resolver.resolve_optional_string(&dto.to)?,
524 })
525}
526
// Private marker type whose derived `OpenApi` impl registers every DTO schema
// declared in this file. `HttpSourceDescriptor::config_schema_json` calls
// `HttpSourceSchemas::openapi()` and serializes the resulting component
// schemas.
#[derive(OpenApi)]
#[openapi(components(schemas(
    HttpSourceConfigDto,
    WebhookConfigDto,
    CorsConfigDto,
    ErrorBehaviorDto,
    WebhookRouteDto,
    HttpMethodDto,
    AuthConfigDto,
    SignatureConfigDto,
    SignatureAlgorithmDto,
    SignatureEncodingDto,
    BearerConfigDto,
    WebhookMappingDto,
    MappingConditionDto,
    OperationTypeDto,
    ElementTypeDto,
    EffectiveFromConfigDto,
    TimestampFormatDto,
    ElementTemplateDto,
)))]
struct HttpSourceSchemas;
549
/// Plugin descriptor for the HTTP source.
///
/// A field-less unit type; its behavior lives in its `SourcePluginDescriptor`
/// implementation, which reports the kind `"http"` and builds sources from
/// JSON configuration.
pub struct HttpSourceDescriptor;
552
553#[async_trait]
554impl SourcePluginDescriptor for HttpSourceDescriptor {
555 fn kind(&self) -> &str {
556 "http"
557 }
558
559 fn config_version(&self) -> &str {
560 "1.0.0"
561 }
562
563 fn config_schema_name(&self) -> &str {
564 "source.http.HttpSourceConfig"
565 }
566
567 fn config_schema_json(&self) -> String {
568 let api = HttpSourceSchemas::openapi();
569 serde_json::to_string(
570 &api.components
571 .as_ref()
572 .expect("OpenAPI components missing")
573 .schemas,
574 )
575 .expect("Failed to serialize config schema")
576 }
577
578 async fn create_source(
579 &self,
580 id: &str,
581 config_json: &serde_json::Value,
582 auto_start: bool,
583 ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
584 let dto: HttpSourceConfigDto = serde_json::from_value(config_json.clone())?;
585 let mapper = DtoMapper::new();
586
587 let config = HttpSourceConfig {
588 host: mapper.resolve_string(&dto.host)?,
589 port: mapper.resolve_typed(&dto.port)?,
590 endpoint: mapper.resolve_optional(&dto.endpoint)?,
591 timeout_ms: mapper.resolve_typed(&dto.timeout_ms)?,
592 adaptive_max_batch_size: mapper.resolve_optional(&dto.adaptive_max_batch_size)?,
593 adaptive_min_batch_size: mapper.resolve_optional(&dto.adaptive_min_batch_size)?,
594 adaptive_max_wait_ms: mapper.resolve_optional(&dto.adaptive_max_wait_ms)?,
595 adaptive_min_wait_ms: mapper.resolve_optional(&dto.adaptive_min_wait_ms)?,
596 adaptive_window_secs: mapper.resolve_optional(&dto.adaptive_window_secs)?,
597 adaptive_enabled: mapper.resolve_optional(&dto.adaptive_enabled)?,
598 webhooks: dto
599 .webhooks
600 .as_ref()
601 .map(|w| map_webhook_config(w, &mapper))
602 .transpose()?,
603 };
604
605 let source = HttpSourceBuilder::new(id)
606 .with_config(config)
607 .with_auto_start(auto_start)
608 .build()?;
609
610 Ok(Box::new(source))
611 }
612}