Skip to main content

drasi_source_http/
config.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configuration for the HTTP source.
16//!
17//! The HTTP source receives data changes via HTTP endpoints.
18//! It supports two mutually exclusive modes:
19//!
20//! - **Standard Mode**: Uses the built-in `HttpSourceChange` format
21//! - **Webhook Mode**: Custom routes with configurable payload mappings
22
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25
26/// HTTP source configuration
27///
28/// This config only contains HTTP-specific settings.
29/// Bootstrap provider configuration (database, user, password, tables, etc.)
30/// should be provided via the source's generic properties map.
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
32pub struct HttpSourceConfig {
33    /// HTTP host
34    pub host: String,
35
36    /// HTTP port
37    pub port: u16,
38
39    /// Optional endpoint path
40    #[serde(default, skip_serializing_if = "Option::is_none")]
41    pub endpoint: Option<String>,
42
43    /// Request timeout in milliseconds
44    #[serde(default = "default_timeout_ms")]
45    pub timeout_ms: u64,
46
47    /// Adaptive batching: maximum batch size
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub adaptive_max_batch_size: Option<usize>,
50
51    /// Adaptive batching: minimum batch size
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    pub adaptive_min_batch_size: Option<usize>,
54
55    /// Adaptive batching: maximum wait time in milliseconds
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub adaptive_max_wait_ms: Option<u64>,
58
59    /// Adaptive batching: minimum wait time in milliseconds
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub adaptive_min_wait_ms: Option<u64>,
62
63    /// Adaptive batching: throughput window in seconds
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub adaptive_window_secs: Option<u64>,
66
67    /// Whether adaptive batching is enabled
68    #[serde(default, skip_serializing_if = "Option::is_none")]
69    pub adaptive_enabled: Option<bool>,
70
71    /// Webhook configuration (enables webhook mode when present)
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub webhooks: Option<WebhookConfig>,
74}
75
76/// Returns true if webhook mode is enabled
77impl HttpSourceConfig {
78    /// Check if webhook mode is enabled
79    pub fn is_webhook_mode(&self) -> bool {
80        self.webhooks.is_some()
81    }
82}
83
84/// Webhook configuration for custom route handling
85#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
86pub struct WebhookConfig {
87    /// Global error behavior for unmatched/failed requests
88    #[serde(default)]
89    pub error_behavior: ErrorBehavior,
90
91    /// CORS (Cross-Origin Resource Sharing) configuration
92    #[serde(default, skip_serializing_if = "Option::is_none")]
93    pub cors: Option<CorsConfig>,
94
95    /// List of webhook route configurations
96    pub routes: Vec<WebhookRoute>,
97}
98
99/// CORS configuration for webhook endpoints
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub struct CorsConfig {
102    /// Whether CORS is enabled (default: true when cors section is present)
103    #[serde(default = "default_cors_enabled")]
104    pub enabled: bool,
105
106    /// Allowed origins. Use ["*"] for any origin, or specific origins like ["https://example.com"]
107    #[serde(default = "default_cors_origins")]
108    pub allow_origins: Vec<String>,
109
110    /// Allowed HTTP methods
111    #[serde(default = "default_cors_methods")]
112    pub allow_methods: Vec<String>,
113
114    /// Allowed headers. Use ["*"] for any header.
115    #[serde(default = "default_cors_headers")]
116    pub allow_headers: Vec<String>,
117
118    /// Headers to expose to the browser
119    #[serde(default, skip_serializing_if = "Vec::is_empty")]
120    pub expose_headers: Vec<String>,
121
122    /// Whether to allow credentials (cookies, authorization headers)
123    #[serde(default)]
124    pub allow_credentials: bool,
125
126    /// Max age in seconds for preflight request caching
127    #[serde(default = "default_cors_max_age")]
128    pub max_age: u64,
129}
130
/// Serde default for `CorsConfig::enabled`: CORS is on unless disabled.
fn default_cors_enabled() -> bool {
    true
}
134
/// Serde default for `CorsConfig::allow_origins`: the single wildcard `*`.
fn default_cors_origins() -> Vec<String> {
    vec![String::from("*")]
}
138
/// Serde default for `CorsConfig::allow_methods`.
///
/// Yields, in this order: GET, POST, PUT, PATCH, DELETE, OPTIONS.
fn default_cors_methods() -> Vec<String> {
    ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"]
        .iter()
        .map(|&method| String::from(method))
        .collect()
}
149
/// Serde default for `CorsConfig::allow_headers`.
///
/// Yields, in this order: Content-Type, Authorization, X-Requested-With.
fn default_cors_headers() -> Vec<String> {
    ["Content-Type", "Authorization", "X-Requested-With"]
        .iter()
        .map(|&header| String::from(header))
        .collect()
}
157
/// Serde default for `CorsConfig::max_age`: 3600 seconds.
fn default_cors_max_age() -> u64 {
    3_600
}
161
162impl Default for CorsConfig {
163    fn default() -> Self {
164        Self {
165            enabled: true,
166            allow_origins: default_cors_origins(),
167            allow_methods: default_cors_methods(),
168            allow_headers: default_cors_headers(),
169            expose_headers: Vec::new(),
170            allow_credentials: false,
171            max_age: default_cors_max_age(),
172        }
173    }
174}
175
176/// Error handling behavior for webhook requests
177#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
178#[serde(rename_all = "snake_case")]
179pub enum ErrorBehavior {
180    /// Accept the request and log the issue (returns 200)
181    #[default]
182    AcceptAndLog,
183    /// Accept the request but silently discard (returns 200)
184    AcceptAndSkip,
185    /// Reject the request with an appropriate HTTP error
186    Reject,
187}
188
189/// Configuration for a single webhook route
190#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
191pub struct WebhookRoute {
192    /// Route path pattern (supports `:param` for path parameters)
193    /// Example: "/github/events" or "/users/:user_id/webhooks"
194    pub path: String,
195
196    /// Allowed HTTP methods for this route
197    #[serde(default = "default_methods")]
198    pub methods: Vec<HttpMethod>,
199
200    /// Authentication configuration
201    #[serde(default, skip_serializing_if = "Option::is_none")]
202    pub auth: Option<AuthConfig>,
203
204    /// Error behavior override for this route
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub error_behavior: Option<ErrorBehavior>,
207
208    /// Mappings from payload to source change events
209    pub mappings: Vec<WebhookMapping>,
210}
211
212fn default_methods() -> Vec<HttpMethod> {
213    vec![HttpMethod::Post]
214}
215
216/// HTTP methods supported for webhook routes
217#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
218#[serde(rename_all = "UPPERCASE")]
219pub enum HttpMethod {
220    Get,
221    Post,
222    Put,
223    Patch,
224    Delete,
225}
226
227/// Authentication configuration for a webhook route
228#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
229pub struct AuthConfig {
230    /// HMAC signature verification
231    #[serde(default, skip_serializing_if = "Option::is_none")]
232    pub signature: Option<SignatureConfig>,
233
234    /// Bearer token verification
235    #[serde(default, skip_serializing_if = "Option::is_none")]
236    pub bearer: Option<BearerConfig>,
237}
238
239/// HMAC signature verification configuration
240#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
241pub struct SignatureConfig {
242    /// Signature algorithm type
243    #[serde(rename = "type")]
244    pub algorithm: SignatureAlgorithm,
245
246    /// Environment variable containing the secret
247    pub secret_env: String,
248
249    /// Header containing the signature
250    pub header: String,
251
252    /// Prefix to strip from signature (e.g., "sha256=")
253    #[serde(default, skip_serializing_if = "Option::is_none")]
254    pub prefix: Option<String>,
255
256    /// Encoding of the signature (hex or base64)
257    #[serde(default)]
258    pub encoding: SignatureEncoding,
259}
260
261/// Supported HMAC signature algorithms
262#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
263#[serde(rename_all = "kebab-case")]
264pub enum SignatureAlgorithm {
265    HmacSha1,
266    HmacSha256,
267}
268
269/// Signature encoding format
270#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
271#[serde(rename_all = "lowercase")]
272pub enum SignatureEncoding {
273    #[default]
274    Hex,
275    Base64,
276}
277
278/// Bearer token verification configuration
279#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
280pub struct BearerConfig {
281    /// Environment variable containing the expected token
282    pub token_env: String,
283}
284
285/// Mapping configuration from webhook payload to source change event
286#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
287pub struct WebhookMapping {
288    /// Optional condition for when this mapping applies
289    #[serde(default, skip_serializing_if = "Option::is_none")]
290    pub when: Option<MappingCondition>,
291
292    /// Static operation type
293    #[serde(default, skip_serializing_if = "Option::is_none")]
294    pub operation: Option<OperationType>,
295
296    /// Path to extract operation from payload
297    #[serde(default, skip_serializing_if = "Option::is_none")]
298    pub operation_from: Option<String>,
299
300    /// Mapping from payload values to operation types
301    #[serde(default, skip_serializing_if = "Option::is_none")]
302    pub operation_map: Option<HashMap<String, OperationType>>,
303
304    /// Element type to create
305    pub element_type: ElementType,
306
307    /// Timestamp configuration for effective_from
308    #[serde(default, skip_serializing_if = "Option::is_none")]
309    pub effective_from: Option<EffectiveFromConfig>,
310
311    /// Template for element creation
312    pub template: ElementTemplate,
313}
314
315/// Condition for when a mapping applies
316#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
317pub struct MappingCondition {
318    /// Header to check
319    #[serde(default, skip_serializing_if = "Option::is_none")]
320    pub header: Option<String>,
321
322    /// Payload field path to check
323    #[serde(default, skip_serializing_if = "Option::is_none")]
324    pub field: Option<String>,
325
326    /// Value must equal this
327    #[serde(default, skip_serializing_if = "Option::is_none")]
328    pub equals: Option<String>,
329
330    /// Value must contain this
331    #[serde(default, skip_serializing_if = "Option::is_none")]
332    pub contains: Option<String>,
333
334    /// Value must match this regex
335    #[serde(default, skip_serializing_if = "Option::is_none")]
336    pub regex: Option<String>,
337}
338
339/// Operation type for source changes
340#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
341#[serde(rename_all = "lowercase")]
342pub enum OperationType {
343    Insert,
344    Update,
345    Delete,
346}
347
348/// Element type for source changes
349#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
350#[serde(rename_all = "lowercase")]
351pub enum ElementType {
352    Node,
353    Relation,
354}
355
356/// Configuration for effective_from timestamp
357#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
358#[serde(untagged)]
359pub enum EffectiveFromConfig {
360    /// Simple template string (auto-detect format)
361    Simple(String),
362    /// Explicit configuration with format
363    Explicit {
364        /// Template for the timestamp value
365        value: String,
366        /// Format of the timestamp
367        format: TimestampFormat,
368    },
369}
370
371/// Timestamp format for effective_from
372#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
373#[serde(rename_all = "snake_case")]
374pub enum TimestampFormat {
375    /// ISO 8601 datetime string
376    Iso8601,
377    /// Unix timestamp in seconds
378    UnixSeconds,
379    /// Unix timestamp in milliseconds
380    UnixMillis,
381    /// Unix timestamp in nanoseconds
382    UnixNanos,
383}
384
385/// Template for element creation
386#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
387pub struct ElementTemplate {
388    /// Template for element ID
389    pub id: String,
390
391    /// Templates for element labels
392    pub labels: Vec<String>,
393
394    /// Templates for element properties (can be individual templates or a single object template)
395    #[serde(default, skip_serializing_if = "Option::is_none")]
396    pub properties: Option<serde_json::Value>,
397
398    /// Template for relation source node ID (relations only)
399    #[serde(default, skip_serializing_if = "Option::is_none")]
400    pub from: Option<String>,
401
402    /// Template for relation target node ID (relations only)
403    #[serde(default, skip_serializing_if = "Option::is_none")]
404    pub to: Option<String>,
405}
406
/// Serde default for `HttpSourceConfig::timeout_ms`: 10000 milliseconds.
fn default_timeout_ms() -> u64 {
    10_000
}
410
411impl HttpSourceConfig {
412    /// Validate the configuration and return an error if invalid.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if:
417    /// - Port is 0 (invalid port)
418    /// - Timeout is 0 (would cause immediate timeouts)
419    /// - Adaptive batching min values exceed max values
420    pub fn validate(&self) -> anyhow::Result<()> {
421        if self.port == 0 {
422            return Err(anyhow::anyhow!(
423                "Validation error: port cannot be 0. \
424                 Please specify a valid port number (1-65535)"
425            ));
426        }
427
428        if self.timeout_ms == 0 {
429            return Err(anyhow::anyhow!(
430                "Validation error: timeout_ms cannot be 0. \
431                 Please specify a positive timeout value in milliseconds"
432            ));
433        }
434
435        // Validate adaptive batching settings
436        if let (Some(min), Some(max)) = (self.adaptive_min_batch_size, self.adaptive_max_batch_size)
437        {
438            if min > max {
439                return Err(anyhow::anyhow!(
440                    "Validation error: adaptive_min_batch_size ({min}) cannot be greater than \
441                     adaptive_max_batch_size ({max})"
442                ));
443            }
444        }
445
446        if let (Some(min), Some(max)) = (self.adaptive_min_wait_ms, self.adaptive_max_wait_ms) {
447            if min > max {
448                return Err(anyhow::anyhow!(
449                    "Validation error: adaptive_min_wait_ms ({min}) cannot be greater than \
450                     adaptive_max_wait_ms ({max})"
451                ));
452            }
453        }
454
455        // Validate webhook configuration if present
456        if let Some(ref webhooks) = self.webhooks {
457            webhooks.validate()?;
458        }
459
460        Ok(())
461    }
462}
463
464impl WebhookConfig {
465    /// Validate webhook configuration
466    pub fn validate(&self) -> anyhow::Result<()> {
467        if self.routes.is_empty() {
468            return Err(anyhow::anyhow!(
469                "Validation error: webhooks.routes cannot be empty"
470            ));
471        }
472
473        for (idx, route) in self.routes.iter().enumerate() {
474            route
475                .validate()
476                .map_err(|e| anyhow::anyhow!("Validation error in route[{idx}]: {e}"))?;
477        }
478
479        Ok(())
480    }
481}
482
483impl WebhookRoute {
484    /// Validate webhook route configuration
485    pub fn validate(&self) -> anyhow::Result<()> {
486        if self.path.is_empty() {
487            return Err(anyhow::anyhow!("path cannot be empty"));
488        }
489
490        if !self.path.starts_with('/') {
491            return Err(anyhow::anyhow!("path must start with '/'"));
492        }
493
494        if self.methods.is_empty() {
495            return Err(anyhow::anyhow!("methods cannot be empty"));
496        }
497
498        if self.mappings.is_empty() {
499            return Err(anyhow::anyhow!("mappings cannot be empty"));
500        }
501
502        for (idx, mapping) in self.mappings.iter().enumerate() {
503            mapping
504                .validate()
505                .map_err(|e| anyhow::anyhow!("mappings[{idx}]: {e}"))?;
506        }
507
508        Ok(())
509    }
510}
511
512impl WebhookMapping {
513    /// Validate webhook mapping configuration
514    pub fn validate(&self) -> anyhow::Result<()> {
515        // Must have either static operation or dynamic operation_from
516        if self.operation.is_none() && self.operation_from.is_none() {
517            return Err(anyhow::anyhow!(
518                "either 'operation' or 'operation_from' must be specified"
519            ));
520        }
521
522        // If using operation_from, should have operation_map
523        if self.operation_from.is_some() && self.operation_map.is_none() {
524            return Err(anyhow::anyhow!(
525                "'operation_map' is required when using 'operation_from'"
526            ));
527        }
528
529        // Validate template
530        self.template.validate(&self.element_type)?;
531
532        Ok(())
533    }
534}
535
536impl ElementTemplate {
537    /// Validate element template configuration
538    pub fn validate(&self, element_type: &ElementType) -> anyhow::Result<()> {
539        if self.id.is_empty() {
540            return Err(anyhow::anyhow!("template.id cannot be empty"));
541        }
542
543        if self.labels.is_empty() {
544            return Err(anyhow::anyhow!("template.labels cannot be empty"));
545        }
546
547        // Relations require from and to
548        if *element_type == ElementType::Relation {
549            if self.from.is_none() {
550                return Err(anyhow::anyhow!(
551                    "template.from is required for relation elements"
552                ));
553            }
554            if self.to.is_none() {
555                return Err(anyhow::anyhow!(
556                    "template.to is required for relation elements"
557                ));
558            }
559        }
560
561        Ok(())
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568
569    #[test]
570    fn test_config_deserialization_minimal() {
571        let yaml = r#"
572host: "localhost"
573port: 8080
574"#;
575        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
576        assert_eq!(config.host, "localhost");
577        assert_eq!(config.port, 8080);
578        assert_eq!(config.endpoint, None);
579        assert_eq!(config.timeout_ms, 10000); // default
580        assert_eq!(config.adaptive_enabled, None);
581    }
582
583    #[test]
584    fn test_config_deserialization_full() {
585        let yaml = r#"
586host: "0.0.0.0"
587port: 9000
588endpoint: "/events"
589timeout_ms: 5000
590adaptive_max_batch_size: 1000
591adaptive_min_batch_size: 10
592adaptive_max_wait_ms: 500
593adaptive_min_wait_ms: 10
594adaptive_window_secs: 60
595adaptive_enabled: true
596"#;
597        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
598        assert_eq!(config.host, "0.0.0.0");
599        assert_eq!(config.port, 9000);
600        assert_eq!(config.endpoint, Some("/events".to_string()));
601        assert_eq!(config.timeout_ms, 5000);
602        assert_eq!(config.adaptive_max_batch_size, Some(1000));
603        assert_eq!(config.adaptive_min_batch_size, Some(10));
604        assert_eq!(config.adaptive_max_wait_ms, Some(500));
605        assert_eq!(config.adaptive_min_wait_ms, Some(10));
606        assert_eq!(config.adaptive_window_secs, Some(60));
607        assert_eq!(config.adaptive_enabled, Some(true));
608    }
609
610    #[test]
611    fn test_config_serialization() {
612        let config = HttpSourceConfig {
613            host: "localhost".to_string(),
614            port: 8080,
615            endpoint: Some("/data".to_string()),
616            timeout_ms: 15000,
617            adaptive_max_batch_size: Some(500),
618            adaptive_min_batch_size: Some(5),
619            adaptive_max_wait_ms: Some(1000),
620            adaptive_min_wait_ms: Some(50),
621            adaptive_window_secs: Some(30),
622            adaptive_enabled: Some(false),
623            webhooks: None,
624        };
625
626        let yaml = serde_yaml::to_string(&config).unwrap();
627        let deserialized: HttpSourceConfig = serde_yaml::from_str(&yaml).unwrap();
628        assert_eq!(config, deserialized);
629    }
630
631    #[test]
632    fn test_config_adaptive_batching_disabled() {
633        let yaml = r#"
634host: "localhost"
635port: 8080
636adaptive_enabled: false
637"#;
638        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
639        assert_eq!(config.adaptive_enabled, Some(false));
640    }
641
642    #[test]
643    fn test_config_default_values() {
644        let config = HttpSourceConfig {
645            host: "localhost".to_string(),
646            port: 8080,
647            endpoint: None,
648            timeout_ms: default_timeout_ms(),
649            adaptive_max_batch_size: None,
650            adaptive_min_batch_size: None,
651            adaptive_max_wait_ms: None,
652            adaptive_min_wait_ms: None,
653            adaptive_window_secs: None,
654            adaptive_enabled: None,
655            webhooks: None,
656        };
657
658        assert_eq!(config.timeout_ms, 10000);
659    }
660
661    #[test]
662    fn test_config_port_range() {
663        // Test valid port
664        let yaml = r#"
665host: "localhost"
666port: 65535
667"#;
668        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
669        assert_eq!(config.port, 65535);
670
671        // Test minimum port
672        let yaml = r#"
673host: "localhost"
674port: 1
675"#;
676        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
677        assert_eq!(config.port, 1);
678    }
679
680    #[test]
681    fn test_webhook_config_deserialization() {
682        let yaml = r#"
683host: "0.0.0.0"
684port: 8080
685webhooks:
686  error_behavior: reject
687  routes:
688    - path: "/github/events"
689      methods: ["POST"]
690      auth:
691        signature:
692          type: hmac-sha256
693          secret_env: GITHUB_SECRET
694          header: X-Hub-Signature-256
695          prefix: "sha256="
696        bearer:
697          token_env: GITHUB_TOKEN
698      error_behavior: reject
699      mappings:
700        - when:
701            header: X-GitHub-Event
702            equals: push
703          operation: insert
704          element_type: node
705          effective_from: "{{payload.timestamp}}"
706          template:
707            id: "commit-{{payload.id}}"
708            labels: ["Commit"]
709            properties:
710              message: "{{payload.message}}"
711"#;
712        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
713        assert!(config.webhooks.is_some());
714        let webhooks = config.webhooks.unwrap();
715        assert_eq!(webhooks.error_behavior, ErrorBehavior::Reject);
716        assert_eq!(webhooks.routes.len(), 1);
717
718        let route = &webhooks.routes[0];
719        assert_eq!(route.path, "/github/events");
720        assert_eq!(route.methods, vec![HttpMethod::Post]);
721        assert!(route.auth.is_some());
722
723        let auth = route.auth.as_ref().unwrap();
724        assert!(auth.signature.is_some());
725        assert!(auth.bearer.is_some());
726
727        let sig = auth.signature.as_ref().unwrap();
728        assert_eq!(sig.algorithm, SignatureAlgorithm::HmacSha256);
729        assert_eq!(sig.secret_env, "GITHUB_SECRET");
730        assert_eq!(sig.header, "X-Hub-Signature-256");
731        assert_eq!(sig.prefix, Some("sha256=".to_string()));
732
733        let mapping = &route.mappings[0];
734        assert!(mapping.when.is_some());
735        assert_eq!(mapping.operation, Some(OperationType::Insert));
736        assert_eq!(mapping.element_type, ElementType::Node);
737    }
738
739    #[test]
740    fn test_webhook_config_with_operation_map() {
741        let yaml = r#"
742host: "0.0.0.0"
743port: 8080
744webhooks:
745  routes:
746    - path: "/events"
747      mappings:
748        - operation_from: "payload.action"
749          operation_map:
750            created: insert
751            updated: update
752            deleted: delete
753          element_type: node
754          template:
755            id: "{{payload.id}}"
756            labels: ["Event"]
757"#;
758        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
759        let webhooks = config.webhooks.unwrap();
760        let mapping = &webhooks.routes[0].mappings[0];
761
762        assert_eq!(mapping.operation_from, Some("payload.action".to_string()));
763        assert!(mapping.operation_map.is_some());
764        let op_map = mapping.operation_map.as_ref().unwrap();
765        assert_eq!(op_map.get("created"), Some(&OperationType::Insert));
766        assert_eq!(op_map.get("updated"), Some(&OperationType::Update));
767        assert_eq!(op_map.get("deleted"), Some(&OperationType::Delete));
768    }
769
770    #[test]
771    fn test_webhook_config_relation() {
772        let yaml = r#"
773host: "0.0.0.0"
774port: 8080
775webhooks:
776  routes:
777    - path: "/links"
778      mappings:
779        - operation: insert
780          element_type: relation
781          template:
782            id: "{{payload.id}}"
783            labels: ["LINKS_TO"]
784            from: "{{payload.source_id}}"
785            to: "{{payload.target_id}}"
786"#;
787        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
788        let mapping = &config.webhooks.unwrap().routes[0].mappings[0];
789        assert_eq!(mapping.element_type, ElementType::Relation);
790        assert_eq!(
791            mapping.template.from,
792            Some("{{payload.source_id}}".to_string())
793        );
794        assert_eq!(
795            mapping.template.to,
796            Some("{{payload.target_id}}".to_string())
797        );
798    }
799
800    #[test]
801    fn test_effective_from_simple() {
802        let yaml = r#"
803host: "0.0.0.0"
804port: 8080
805webhooks:
806  routes:
807    - path: "/events"
808      mappings:
809        - operation: insert
810          element_type: node
811          effective_from: "{{payload.timestamp}}"
812          template:
813            id: "{{payload.id}}"
814            labels: ["Event"]
815"#;
816        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
817        let mapping = &config.webhooks.unwrap().routes[0].mappings[0];
818        assert_eq!(
819            mapping.effective_from,
820            Some(EffectiveFromConfig::Simple(
821                "{{payload.timestamp}}".to_string()
822            ))
823        );
824    }
825
826    #[test]
827    fn test_effective_from_explicit() {
828        let yaml = r#"
829host: "0.0.0.0"
830port: 8080
831webhooks:
832  routes:
833    - path: "/events"
834      mappings:
835        - operation: insert
836          element_type: node
837          effective_from:
838            value: "{{payload.created_at}}"
839            format: iso8601
840          template:
841            id: "{{payload.id}}"
842            labels: ["Event"]
843"#;
844        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
845        let mapping = &config.webhooks.unwrap().routes[0].mappings[0];
846        match &mapping.effective_from {
847            Some(EffectiveFromConfig::Explicit { value, format }) => {
848                assert_eq!(value, "{{payload.created_at}}");
849                assert_eq!(*format, TimestampFormat::Iso8601);
850            }
851            _ => panic!("Expected explicit effective_from config"),
852        }
853    }
854
855    #[test]
856    fn test_is_webhook_mode() {
857        let yaml_standard = r#"
858host: "localhost"
859port: 8080
860"#;
861        let config: HttpSourceConfig = serde_yaml::from_str(yaml_standard).unwrap();
862        assert!(!config.is_webhook_mode());
863
864        let yaml_webhook = r#"
865host: "localhost"
866port: 8080
867webhooks:
868  routes:
869    - path: "/events"
870      mappings:
871        - operation: insert
872          element_type: node
873          template:
874            id: "{{payload.id}}"
875            labels: ["Event"]
876"#;
877        let config: HttpSourceConfig = serde_yaml::from_str(yaml_webhook).unwrap();
878        assert!(config.is_webhook_mode());
879    }
880
881    #[test]
882    fn test_webhook_validation_empty_routes() {
883        let yaml = r#"
884host: "localhost"
885port: 8080
886webhooks:
887  routes: []
888"#;
889        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
890        assert!(config.validate().is_err());
891    }
892
893    #[test]
894    fn test_webhook_validation_missing_operation() {
895        let yaml = r#"
896host: "localhost"
897port: 8080
898webhooks:
899  routes:
900    - path: "/events"
901      mappings:
902        - element_type: node
903          template:
904            id: "{{payload.id}}"
905            labels: ["Event"]
906"#;
907        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
908        assert!(config.validate().is_err());
909    }
910
911    #[test]
912    fn test_webhook_validation_relation_missing_from_to() {
913        let yaml = r#"
914host: "localhost"
915port: 8080
916webhooks:
917  routes:
918    - path: "/events"
919      mappings:
920        - operation: insert
921          element_type: relation
922          template:
923            id: "{{payload.id}}"
924            labels: ["LINKS"]
925"#;
926        let config: HttpSourceConfig = serde_yaml::from_str(yaml).unwrap();
927        assert!(config.validate().is_err());
928    }
929}