Skip to main content

drasi_source_http/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP Source Plugin for Drasi
16//!
17//! This plugin exposes HTTP endpoints for receiving data change events. It supports
18//! two mutually exclusive modes:
19//!
20//! - **Standard Mode**: Fixed-format `HttpSourceChange` endpoints with adaptive batching
21//! - **Webhook Mode**: Configurable routes with template-based payload transformation
22//!
23//! # Standard Mode
24//!
25//! When no `webhooks` configuration is present, the source operates in standard mode
26//! with the following endpoints:
27//!
28//! - **`POST /sources/{source_id}/events`** - Submit a single event
29//! - **`POST /sources/{source_id}/events/batch`** - Submit multiple events
30//! - **`GET /health`** - Health check endpoint
31//!
32//! ## Data Format
33//!
34//! Events are submitted as JSON using the `HttpSourceChange` format:
35//!
36//! ### Insert Operation
37//!
38//! ```json
39//! {
40//!     "operation": "insert",
41//!     "element": {
42//!         "type": "node",
43//!         "id": "user-123",
44//!         "labels": ["User"],
45//!         "properties": {
46//!             "name": "Alice",
47//!             "email": "alice@example.com"
48//!         }
49//!     },
50//!     "timestamp": 1699900000000000000
51//! }
52//! ```
53//!
54//! ### Update Operation
55//!
56//! ```json
57//! {
58//!     "operation": "update",
59//!     "element": {
60//!         "type": "node",
61//!         "id": "user-123",
62//!         "labels": ["User"],
63//!         "properties": {
64//!             "name": "Alice Updated"
65//!         }
66//!     }
67//! }
68//! ```
69//!
70//! ### Delete Operation
71//!
72//! ```json
73//! {
74//!     "operation": "delete",
75//!     "id": "user-123",
76//!     "labels": ["User"]
77//! }
78//! ```
79//!
80//! # Webhook Mode
81//!
82//! When a `webhooks` section is present in the configuration, the source operates
83//! in webhook mode. This enables:
84//!
85//! - Custom routes with path parameters (e.g., `/github/events`, `/users/:id/hooks`)
86//! - Multiple HTTP methods per route (POST, PUT, PATCH, DELETE, GET)
87//! - Handlebars template-based payload transformation
88//! - HMAC signature verification (GitHub, Shopify style)
89//! - Bearer token authentication
90//! - Support for JSON, XML, YAML, and plain text payloads
91//!
92//! ## Webhook Configuration Example
93//!
94//! ```yaml
95//! webhooks:
96//!   error_behavior: accept_and_log
97//!   routes:
98//!     - path: "/github/events"
99//!       methods: ["POST"]
100//!       auth:
101//!         signature:
102//!           type: hmac-sha256
103//!           secret_env: GITHUB_WEBHOOK_SECRET
104//!           header: X-Hub-Signature-256
105//!           prefix: "sha256="
106//!       error_behavior: reject
107//!       mappings:
108//!         - when:
109//!             header: X-GitHub-Event
110//!             equals: push
111//!           operation: insert
112//!           element_type: node
113//!           effective_from: "{{payload.head_commit.timestamp}}"
114//!           template:
115//!             id: "commit-{{payload.head_commit.id}}"
116//!             labels: ["Commit"]
117//!             properties:
118//!               message: "{{payload.head_commit.message}}"
119//!               author: "{{payload.head_commit.author.name}}"
120//! ```
121//!
122//! ## Relation Element (Standard Mode `HttpSourceChange` Format)
123//!
124//! ```json
125//! {
126//!     "operation": "insert",
127//!     "element": {
128//!         "type": "relation",
129//!         "id": "follows-1",
130//!         "labels": ["FOLLOWS"],
131//!         "from": "user-123",
132//!         "to": "user-456",
133//!         "properties": {}
134//!     }
135//! }
136//! ```
137//!
138//! # Batch Submission (Standard Mode, `POST /sources/{source_id}/events/batch`)
139//!
140//! ```json
141//! {
142//!     "events": [
143//!         { "operation": "insert", ... },
144//!         { "operation": "update", ... }
145//!     ]
146//! }
147//! ```
148//!
149//! # Adaptive Batching
150//!
151//! The HTTP source includes adaptive batching to optimize throughput. Events are
152//! buffered and dispatched in batches, with batch size and timing adjusted based
153//! on throughput patterns.
154//!
155//! | Parameter | Default | Description |
156//! |-----------|---------|-------------|
157//! | `adaptive_enabled` | `true` | Enable/disable adaptive batching |
158//! | `adaptive_max_batch_size` | `1000` | Maximum events per batch |
159//! | `adaptive_min_batch_size` | `1` | Minimum events per batch |
160//! | `adaptive_max_wait_ms` | `100` | Maximum wait time before dispatching |
161//! | `adaptive_min_wait_ms` | `10` | Minimum wait time between batches |
162//!
163//! # Configuration
164//!
165//! | Field | Type | Default | Description |
166//! |-------|------|---------|-------------|
167//! | `host` | string | *required* | Host address to bind to |
168//! | `port` | u16 | `8080` | Port to listen on |
169//! | `endpoint` | string | None | Optional custom path prefix |
170//! | `timeout_ms` | u64 | `10000` | Request timeout in milliseconds |
171//!
172//! # Example Configuration (YAML)
173//!
174//! ```yaml
175//! source_type: http
176//! properties:
177//!   host: "0.0.0.0"
178//!   port: 8080
179//!   adaptive_enabled: true
180//!   adaptive_max_batch_size: 500
181//! ```
182//!
183//! # Usage Examples
184//!
185//! ## Rust
186//!
187//! ```rust,ignore
188//! use drasi_source_http::{HttpSource, HttpSourceBuilder};
189//!
190//! let config = HttpSourceBuilder::new()
191//!     .with_host("0.0.0.0")
192//!     .with_port(8080)
193//!     .with_adaptive_enabled(true)
194//!     .build();
195//!
196//! let source = Arc::new(HttpSource::new("http-source", config)?);
197//! drasi.add_source(source).await?;
198//! ```
199//!
200//! ## curl (Single Event)
201//!
202//! ```bash
203//! curl -X POST http://localhost:8080/sources/my-source/events \
204//!   -H "Content-Type: application/json" \
205//!   -d '{"operation":"insert","element":{"type":"node","id":"1","labels":["Test"],"properties":{}}}'
206//! ```
207//!
208//! ## curl (Batch)
209//!
210//! ```bash
211//! curl -X POST http://localhost:8080/sources/my-source/events/batch \
212//!   -H "Content-Type: application/json" \
213//!   -d '{"events":[...]}'
214//! ```
215
216pub mod config;
217pub use config::HttpSourceConfig;
218
219mod adaptive_batcher;
220mod models;
221mod time;
222
223// Webhook support modules
224pub mod auth;
225pub mod content_parser;
226pub mod route_matcher;
227pub mod template_engine;
228
229// Export HTTP source models and conversion
230pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
231
232use anyhow::Result;
233use async_trait::async_trait;
234use axum::{
235    body::Bytes,
236    extract::{Path, State},
237    http::{header, Method, StatusCode},
238    response::IntoResponse,
239    routing::{delete, get, post, put},
240    Json, Router,
241};
242use log::{debug, error, info, trace, warn};
243use serde::{Deserialize, Serialize};
244use std::collections::HashMap;
245use std::sync::Arc;
246use std::time::Duration;
247use tokio::sync::mpsc;
248use tokio::time::timeout;
249use tower_http::cors::{Any, CorsLayer};
250
251use drasi_lib::channels::{ComponentType, *};
252use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
253use drasi_lib::Source;
254use tracing::Instrument;
255
256use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
257use crate::auth::{verify_auth, AuthResult};
258use crate::config::{CorsConfig, ErrorBehavior, WebhookConfig};
259use crate::content_parser::{parse_content, ContentType};
260use crate::route_matcher::{convert_method, find_matching_mappings, headers_to_map, RouteMatcher};
261use crate::template_engine::{TemplateContext, TemplateEngine};
262
263/// Response for event submission
264#[derive(Debug, Serialize, Deserialize)]
265pub struct EventResponse {
266    pub success: bool,
267    pub message: String,
268    #[serde(skip_serializing_if = "Option::is_none")]
269    pub error: Option<String>,
270}
271
272/// HTTP source with configurable adaptive batching.
273///
274/// This source exposes HTTP endpoints for receiving data change events.
275/// It supports both single-event and batch submission modes, with adaptive
276/// batching for optimized throughput.
277///
278/// # Fields
279///
280/// - `base`: Common source functionality (dispatchers, status, lifecycle)
281/// - `config`: HTTP-specific configuration (host, port, timeout)
282/// - `adaptive_config`: Adaptive batching settings for throughput optimization
283pub struct HttpSource {
284    /// Base source implementation providing common functionality
285    base: SourceBase,
286    /// HTTP source configuration
287    config: HttpSourceConfig,
288    /// Adaptive batching configuration for throughput optimization
289    adaptive_config: AdaptiveBatchConfig,
290}
291
292/// Batch event request that can accept multiple events
293#[derive(Debug, Clone, Serialize, Deserialize)]
294pub struct BatchEventRequest {
295    pub events: Vec<HttpSourceChange>,
296}
297
298/// HTTP source app state with batching channel.
299///
300/// Shared state passed to Axum route handlers.
301#[derive(Clone)]
302struct HttpAppState {
303    /// The source ID for validation against incoming requests
304    source_id: String,
305    /// Channel for sending events to the adaptive batcher
306    batch_tx: mpsc::Sender<SourceChangeEvent>,
307    /// Webhook configuration (if in webhook mode)
308    webhook_config: Option<Arc<WebhookState>>,
309}
310
311/// State for webhook mode processing
312struct WebhookState {
313    /// Webhook configuration
314    config: WebhookConfig,
315    /// Route matcher for incoming requests
316    route_matcher: RouteMatcher,
317    /// Template engine for payload transformation
318    template_engine: TemplateEngine,
319}
320
321impl HttpSource {
322    /// Create a new HTTP source.
323    ///
324    /// The event channel is automatically injected when the source is added
325    /// to DrasiLib via `add_source()`.
326    ///
327    /// # Arguments
328    ///
329    /// * `id` - Unique identifier for this source instance
330    /// * `config` - HTTP source configuration
331    ///
332    /// # Returns
333    ///
334    /// A new `HttpSource` instance, or an error if construction fails.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the base source cannot be initialized.
339    ///
340    /// # Example
341    ///
342    /// ```rust,ignore
343    /// use drasi_source_http::{HttpSource, HttpSourceBuilder};
344    ///
345    /// let config = HttpSourceBuilder::new()
346    ///     .with_host("0.0.0.0")
347    ///     .with_port(8080)
348    ///     .build();
349    ///
350    /// let source = HttpSource::new("my-http-source", config)?;
351    /// ```
352    pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
353        let id = id.into();
354        let params = SourceBaseParams::new(id);
355
356        // Configure adaptive batching
357        let mut adaptive_config = AdaptiveBatchConfig::default();
358
359        // Allow overriding adaptive parameters from config
360        if let Some(max_batch) = config.adaptive_max_batch_size {
361            adaptive_config.max_batch_size = max_batch;
362        }
363        if let Some(min_batch) = config.adaptive_min_batch_size {
364            adaptive_config.min_batch_size = min_batch;
365        }
366        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
367            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
368        }
369        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
370            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
371        }
372        if let Some(window_secs) = config.adaptive_window_secs {
373            adaptive_config.throughput_window = Duration::from_secs(window_secs);
374        }
375        if let Some(enabled) = config.adaptive_enabled {
376            adaptive_config.adaptive_enabled = enabled;
377        }
378
379        Ok(Self {
380            base: SourceBase::new(params)?,
381            config,
382            adaptive_config,
383        })
384    }
385
386    /// Create a new HTTP source with custom dispatch settings.
387    ///
388    /// The event channel is automatically injected when the source is added
389    /// to DrasiLib via `add_source()`.
390    ///
391    /// # Arguments
392    ///
393    /// * `id` - Unique identifier for this source instance
394    /// * `config` - HTTP source configuration
395    /// * `dispatch_mode` - Optional dispatch mode (Channel, Direct, etc.)
396    /// * `dispatch_buffer_capacity` - Optional buffer capacity for channel dispatch
397    ///
398    /// # Returns
399    ///
400    /// A new `HttpSource` instance with custom dispatch settings.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if the base source cannot be initialized.
405    pub fn with_dispatch(
406        id: impl Into<String>,
407        config: HttpSourceConfig,
408        dispatch_mode: Option<DispatchMode>,
409        dispatch_buffer_capacity: Option<usize>,
410    ) -> Result<Self> {
411        let id = id.into();
412        let mut params = SourceBaseParams::new(id);
413        if let Some(mode) = dispatch_mode {
414            params = params.with_dispatch_mode(mode);
415        }
416        if let Some(capacity) = dispatch_buffer_capacity {
417            params = params.with_dispatch_buffer_capacity(capacity);
418        }
419
420        let mut adaptive_config = AdaptiveBatchConfig::default();
421
422        if let Some(max_batch) = config.adaptive_max_batch_size {
423            adaptive_config.max_batch_size = max_batch;
424        }
425        if let Some(min_batch) = config.adaptive_min_batch_size {
426            adaptive_config.min_batch_size = min_batch;
427        }
428        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
429            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
430        }
431        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
432            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
433        }
434        if let Some(window_secs) = config.adaptive_window_secs {
435            adaptive_config.throughput_window = Duration::from_secs(window_secs);
436        }
437        if let Some(enabled) = config.adaptive_enabled {
438            adaptive_config.adaptive_enabled = enabled;
439        }
440
441        Ok(Self {
442            base: SourceBase::new(params)?,
443            config,
444            adaptive_config,
445        })
446    }
447
448    /// Handle a single event submission from `POST /sources/{source_id}/events`.
449    ///
450    /// Validates the source ID matches this source and converts the HTTP event
451    /// to a source change before sending to the adaptive batcher.
452    async fn handle_single_event(
453        Path(source_id): Path<String>,
454        State(state): State<HttpAppState>,
455        Json(event): Json<HttpSourceChange>,
456    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
457        debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
458        Self::process_events(&source_id, &state, vec![event]).await
459    }
460
461    /// Handle a batch event submission from `POST /sources/{source_id}/events/batch`.
462    ///
463    /// Validates the source ID and processes all events in the batch,
464    /// returning partial success if some events fail.
465    async fn handle_batch_events(
466        Path(source_id): Path<String>,
467        State(state): State<HttpAppState>,
468        Json(batch): Json<BatchEventRequest>,
469    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
470        debug!(
471            "[{}] HTTP endpoint received batch of {} events",
472            source_id,
473            batch.events.len()
474        );
475        Self::process_events(&source_id, &state, batch.events).await
476    }
477
478    /// Process a list of events, converting and sending them to the batcher.
479    ///
480    /// # Arguments
481    ///
482    /// * `source_id` - Source ID from the request path
483    /// * `state` - Shared app state containing the batch channel
484    /// * `events` - List of HTTP source changes to process
485    ///
486    /// # Returns
487    ///
488    /// Success response with count of processed events, or error if all fail.
489    async fn process_events(
490        source_id: &str,
491        state: &HttpAppState,
492        events: Vec<HttpSourceChange>,
493    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
494        trace!("[{}] Processing {} events", source_id, events.len());
495
496        if source_id != state.source_id {
497            error!(
498                "[{}] Source name mismatch. Expected '{}', got '{}'",
499                state.source_id, state.source_id, source_id
500            );
501            return Err((
502                StatusCode::BAD_REQUEST,
503                Json(EventResponse {
504                    success: false,
505                    message: "Source name mismatch".to_string(),
506                    error: Some(format!(
507                        "Expected source '{}', got '{}'",
508                        state.source_id, source_id
509                    )),
510                }),
511            ));
512        }
513
514        let mut success_count = 0;
515        let mut error_count = 0;
516        let mut last_error = None;
517
518        for (idx, event) in events.iter().enumerate() {
519            match convert_http_to_source_change(event, source_id) {
520                Ok(source_change) => {
521                    let change_event = SourceChangeEvent {
522                        source_id: source_id.to_string(),
523                        change: source_change,
524                        timestamp: chrono::Utc::now(),
525                    };
526
527                    if let Err(e) = state.batch_tx.send(change_event).await {
528                        error!(
529                            "[{}] Failed to send event {} to batch channel: {}",
530                            state.source_id,
531                            idx + 1,
532                            e
533                        );
534                        error_count += 1;
535                        last_error = Some("Internal channel error".to_string());
536                    } else {
537                        success_count += 1;
538                    }
539                }
540                Err(e) => {
541                    error!(
542                        "[{}] Failed to convert event {}: {}",
543                        state.source_id,
544                        idx + 1,
545                        e
546                    );
547                    error_count += 1;
548                    last_error = Some(e.to_string());
549                }
550            }
551        }
552
553        debug!(
554            "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
555        );
556
557        if error_count > 0 && success_count == 0 {
558            Err((
559                StatusCode::BAD_REQUEST,
560                Json(EventResponse {
561                    success: false,
562                    message: format!("All {error_count} events failed"),
563                    error: last_error,
564                }),
565            ))
566        } else if error_count > 0 {
567            Ok(Json(EventResponse {
568                success: true,
569                message: format!(
570                    "Processed {success_count} events successfully, {error_count} failed"
571                ),
572                error: last_error,
573            }))
574        } else {
575            Ok(Json(EventResponse {
576                success: true,
577                message: format!("All {success_count} events processed successfully"),
578                error: None,
579            }))
580        }
581    }
582
583    async fn health_check() -> impl IntoResponse {
584        Json(serde_json::json!({
585            "status": "healthy",
586            "service": "http-source",
587            "features": ["adaptive-batching", "batch-endpoint", "webhooks"]
588        }))
589    }
590
591    /// Handle webhook requests
592    ///
593    /// This handler processes requests in webhook mode, matching against
594    /// configured routes and transforming payloads using templates.
595    async fn handle_webhook(
596        method: axum::http::Method,
597        uri: axum::http::Uri,
598        headers: axum::http::HeaderMap,
599        State(state): State<HttpAppState>,
600        body: Bytes,
601    ) -> impl IntoResponse {
602        let path = uri.path();
603        let source_id = &state.source_id;
604
605        debug!("[{source_id}] Webhook received: {method} {path}");
606
607        // Get webhook state - should always be present in webhook mode
608        let webhook_state = match &state.webhook_config {
609            Some(ws) => ws,
610            None => {
611                error!("[{source_id}] Webhook handler called but no webhook config present");
612                return (
613                    StatusCode::INTERNAL_SERVER_ERROR,
614                    Json(EventResponse {
615                        success: false,
616                        message: "Internal configuration error".to_string(),
617                        error: Some("Webhook mode not properly configured".to_string()),
618                    }),
619                );
620            }
621        };
622
623        // Convert method
624        let http_method = match convert_method(&method) {
625            Some(m) => m,
626            None => {
627                return handle_error(
628                    &webhook_state.config.error_behavior,
629                    source_id,
630                    StatusCode::METHOD_NOT_ALLOWED,
631                    "Method not supported",
632                    None,
633                );
634            }
635        };
636
637        // Match route
638        let route_match = match webhook_state.route_matcher.match_route(
639            path,
640            &http_method,
641            &webhook_state.config.routes,
642        ) {
643            Some(rm) => rm,
644            None => {
645                debug!("[{source_id}] No matching route for {method} {path}");
646                return handle_error(
647                    &webhook_state.config.error_behavior,
648                    source_id,
649                    StatusCode::NOT_FOUND,
650                    "No matching route",
651                    None,
652                );
653            }
654        };
655
656        let route = route_match.route;
657        let error_behavior = route
658            .error_behavior
659            .as_ref()
660            .unwrap_or(&webhook_state.config.error_behavior);
661
662        // Verify authentication
663        let auth_result = verify_auth(route.auth.as_ref(), &headers, &body);
664        if let AuthResult::Failed(reason) = auth_result {
665            warn!("[{source_id}] Authentication failed for {path}: {reason}");
666            return handle_error(
667                error_behavior,
668                source_id,
669                StatusCode::UNAUTHORIZED,
670                "Authentication failed",
671                Some(&reason),
672            );
673        }
674
675        // Parse content
676        let content_type = ContentType::from_header(
677            headers
678                .get(axum::http::header::CONTENT_TYPE)
679                .and_then(|v| v.to_str().ok()),
680        );
681
682        let payload = match parse_content(&body, content_type) {
683            Ok(p) => p,
684            Err(e) => {
685                warn!("[{source_id}] Failed to parse payload: {e}");
686                return handle_error(
687                    error_behavior,
688                    source_id,
689                    StatusCode::BAD_REQUEST,
690                    "Failed to parse payload",
691                    Some(&e.to_string()),
692                );
693            }
694        };
695
696        // Build template context
697        let headers_map = headers_to_map(&headers);
698        let query_map = parse_query_string(uri.query());
699
700        let context = TemplateContext {
701            payload: payload.clone(),
702            route: route_match.path_params,
703            query: query_map,
704            headers: headers_map.clone(),
705            method: method.to_string(),
706            path: path.to_string(),
707            source_id: source_id.clone(),
708        };
709
710        // Find matching mappings
711        let matching_mappings = find_matching_mappings(&route.mappings, &headers_map, &payload);
712
713        if matching_mappings.is_empty() {
714            debug!("[{source_id}] No matching mappings for request");
715            return handle_error(
716                error_behavior,
717                source_id,
718                StatusCode::BAD_REQUEST,
719                "No matching mapping for request",
720                None,
721            );
722        }
723
724        // Process each matching mapping
725        let mut success_count = 0;
726        let mut error_count = 0;
727        let mut last_error = None;
728
729        for mapping in matching_mappings {
730            match webhook_state
731                .template_engine
732                .process_mapping(mapping, &context, source_id)
733            {
734                Ok(source_change) => {
735                    let event = SourceChangeEvent {
736                        source_id: source_id.clone(),
737                        change: source_change,
738                        timestamp: chrono::Utc::now(),
739                    };
740
741                    if let Err(e) = state.batch_tx.send(event).await {
742                        error!("[{source_id}] Failed to send event to batcher: {e}");
743                        error_count += 1;
744                        last_error = Some(format!("Failed to queue event: {e}"));
745                    } else {
746                        success_count += 1;
747                    }
748                }
749                Err(e) => {
750                    warn!("[{source_id}] Failed to process mapping: {e}");
751                    error_count += 1;
752                    last_error = Some(e.to_string());
753                }
754            }
755        }
756
757        debug!("[{source_id}] Webhook processing complete: {success_count} succeeded, {error_count} failed");
758
759        if error_count > 0 && success_count == 0 {
760            handle_error(
761                error_behavior,
762                source_id,
763                StatusCode::BAD_REQUEST,
764                &format!("All {error_count} mappings failed"),
765                last_error.as_deref(),
766            )
767        } else if error_count > 0 {
768            (
769                StatusCode::OK,
770                Json(EventResponse {
771                    success: true,
772                    message: format!("Processed {success_count} events, {error_count} failed"),
773                    error: last_error,
774                }),
775            )
776        } else {
777            (
778                StatusCode::OK,
779                Json(EventResponse {
780                    success: true,
781                    message: format!("Processed {success_count} events successfully"),
782                    error: None,
783                }),
784            )
785        }
786    }
787
788    async fn run_adaptive_batcher(
789        batch_rx: mpsc::Receiver<SourceChangeEvent>,
790        dispatchers: Arc<
791            tokio::sync::RwLock<
792                Vec<
793                    Box<
794                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
795                    >,
796                >,
797            >,
798        >,
799        adaptive_config: AdaptiveBatchConfig,
800        source_id: String,
801    ) {
802        let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
803        let mut total_events = 0u64;
804        let mut total_batches = 0u64;
805
806        info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
807
808        while let Some(batch) = batcher.next_batch().await {
809            if batch.is_empty() {
810                debug!("[{source_id}] Batcher received empty batch, skipping");
811                continue;
812            }
813
814            let batch_size = batch.len();
815            total_events += batch_size as u64;
816            total_batches += 1;
817
818            debug!(
819                "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
820            );
821
822            let mut sent_count = 0;
823            let mut failed_count = 0;
824            for (idx, event) in batch.into_iter().enumerate() {
825                debug!(
826                    "[{}] Batch #{}, dispatching event {}/{}",
827                    source_id,
828                    total_batches,
829                    idx + 1,
830                    batch_size
831                );
832
833                let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
834                profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
835
836                let wrapper = SourceEventWrapper::with_profiling(
837                    event.source_id.clone(),
838                    SourceEvent::Change(event.change),
839                    event.timestamp,
840                    profiling,
841                );
842
843                if let Err(e) =
844                    SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
845                        .await
846                {
847                    error!(
848                        "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
849                        source_id,
850                        total_batches,
851                        idx + 1,
852                        batch_size,
853                        e
854                    );
855                    failed_count += 1;
856                } else {
857                    debug!(
858                        "[{}] Batch #{}, successfully dispatched event {}/{}",
859                        source_id,
860                        total_batches,
861                        idx + 1,
862                        batch_size
863                    );
864                    sent_count += 1;
865                }
866            }
867
868            debug!(
869                "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
870            );
871
872            if total_batches.is_multiple_of(100) {
873                info!(
874                    "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
875                    source_id,
876                    total_batches,
877                    total_events,
878                    total_events as f64 / total_batches as f64
879                );
880            }
881        }
882
883        info!(
884            "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
885        );
886    }
887}
888
889#[async_trait]
890impl Source for HttpSource {
891    fn id(&self) -> &str {
892        &self.base.id
893    }
894
895    fn type_name(&self) -> &str {
896        "http"
897    }
898
899    fn properties(&self) -> HashMap<String, serde_json::Value> {
900        let mut props = HashMap::new();
901        props.insert(
902            "host".to_string(),
903            serde_json::Value::String(self.config.host.clone()),
904        );
905        props.insert(
906            "port".to_string(),
907            serde_json::Value::Number(self.config.port.into()),
908        );
909        if let Some(ref endpoint) = self.config.endpoint {
910            props.insert(
911                "endpoint".to_string(),
912                serde_json::Value::String(endpoint.clone()),
913            );
914        }
915        props
916    }
917
918    fn auto_start(&self) -> bool {
919        self.base.get_auto_start()
920    }
921
922    async fn start(&self) -> Result<()> {
923        info!("[{}] Starting adaptive HTTP source", self.base.id);
924
925        self.base.set_status(ComponentStatus::Starting).await;
926        self.base
927            .send_component_event(
928                ComponentStatus::Starting,
929                Some("Starting adaptive HTTP source".to_string()),
930            )
931            .await?;
932
933        let host = self.config.host.clone();
934        let port = self.config.port;
935
936        // Create batch channel with capacity based on batch configuration
937        let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
938        let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
939        info!(
940            "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
941            self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
942        );
943
944        // Start adaptive batcher task
945        let adaptive_config = self.adaptive_config.clone();
946        let source_id = self.base.id.clone();
947        let dispatchers = self.base.dispatchers.clone();
948
949        // Get instance_id from context for log routing isolation
950        let instance_id = self
951            .base
952            .context()
953            .await
954            .map(|c| c.instance_id)
955            .unwrap_or_default();
956
957        info!("[{source_id}] Starting adaptive batcher task");
958        let source_id_for_span = source_id.clone();
959        let span = tracing::info_span!(
960            "http_adaptive_batcher",
961            instance_id = %instance_id,
962            component_id = %source_id_for_span,
963            component_type = "source"
964        );
965        tokio::spawn(
966            async move {
967                Self::run_adaptive_batcher(
968                    batch_rx,
969                    dispatchers,
970                    adaptive_config,
971                    source_id.clone(),
972                )
973                .await
974            }
975            .instrument(span),
976        );
977
978        // Create app state
979        let webhook_state = if let Some(ref webhook_config) = self.config.webhooks {
980            info!(
981                "[{}] Webhook mode enabled with {} routes",
982                self.base.id,
983                webhook_config.routes.len()
984            );
985            Some(Arc::new(WebhookState {
986                config: webhook_config.clone(),
987                route_matcher: RouteMatcher::new(&webhook_config.routes),
988                template_engine: TemplateEngine::new(),
989            }))
990        } else {
991            info!("[{}] Standard mode enabled", self.base.id);
992            None
993        };
994
995        let state = HttpAppState {
996            source_id: self.base.id.clone(),
997            batch_tx,
998            webhook_config: webhook_state,
999        };
1000
1001        // Build router based on mode
1002        let app = if self.config.is_webhook_mode() {
1003            // Webhook mode: only health check + catch-all webhook handler
1004            let router = Router::new()
1005                .route("/health", get(Self::health_check))
1006                .fallback(Self::handle_webhook)
1007                .with_state(state);
1008
1009            // Apply CORS if configured
1010            if let Some(ref webhooks) = self.config.webhooks {
1011                if let Some(ref cors_config) = webhooks.cors {
1012                    if cors_config.enabled {
1013                        info!("[{}] CORS enabled for webhook endpoints", self.base.id);
1014                        router.layer(build_cors_layer(cors_config))
1015                    } else {
1016                        router
1017                    }
1018                } else {
1019                    router
1020                }
1021            } else {
1022                router
1023            }
1024        } else {
1025            // Standard mode: original endpoints
1026            Router::new()
1027                .route("/health", get(Self::health_check))
1028                .route(
1029                    "/sources/:source_id/events",
1030                    post(Self::handle_single_event),
1031                )
1032                .route(
1033                    "/sources/:source_id/events/batch",
1034                    post(Self::handle_batch_events),
1035                )
1036                .with_state(state)
1037        };
1038
1039        // Create shutdown channel
1040        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1041
1042        let host_clone = host.clone();
1043
1044        // Start server
1045        let (error_tx, error_rx) = tokio::sync::oneshot::channel();
1046        let source_id = self.base.id.clone();
1047        let source_id_for_span = source_id.clone();
1048        let span = tracing::info_span!(
1049            "http_source_server",
1050            instance_id = %instance_id,
1051            component_id = %source_id_for_span,
1052            component_type = "source"
1053        );
1054        let server_handle = tokio::spawn(
1055            async move {
1056                let addr = format!("{host}:{port}");
1057                info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");
1058
1059                let listener = match tokio::net::TcpListener::bind(&addr).await {
1060                    Ok(listener) => {
1061                        info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
1062                        listener
1063                    }
1064                    Err(e) => {
1065                        error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
1066                        let _ = error_tx.send(format!(
1067                        "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
1068                    ));
1069                    return;
1070                }
1071            };
1072
1073                if let Err(e) = axum::serve(listener, app)
1074                    .with_graceful_shutdown(async move {
1075                        let _ = shutdown_rx.await;
1076                    })
1077                    .await
1078                {
1079                    error!("[{source_id}] HTTP server error: {e}");
1080                }
1081            }
1082            .instrument(span),
1083        );
1084
1085        *self.base.task_handle.write().await = Some(server_handle);
1086        *self.base.shutdown_tx.write().await = Some(shutdown_tx);
1087
1088        // Check for startup errors with a short timeout
1089        match timeout(Duration::from_millis(500), error_rx).await {
1090            Ok(Ok(error_msg)) => {
1091                self.base.set_status(ComponentStatus::Error).await;
1092                return Err(anyhow::anyhow!("{error_msg}"));
1093            }
1094            _ => {
1095                self.base.set_status(ComponentStatus::Running).await;
1096            }
1097        }
1098
1099        self.base
1100            .send_component_event(
1101                ComponentStatus::Running,
1102                Some(format!(
1103                    "Adaptive HTTP source running on {host_clone}:{port} with batch support"
1104                )),
1105            )
1106            .await?;
1107
1108        Ok(())
1109    }
1110
1111    async fn stop(&self) -> Result<()> {
1112        info!("[{}] Stopping adaptive HTTP source", self.base.id);
1113
1114        self.base.set_status(ComponentStatus::Stopping).await;
1115        self.base
1116            .send_component_event(
1117                ComponentStatus::Stopping,
1118                Some("Stopping adaptive HTTP source".to_string()),
1119            )
1120            .await?;
1121
1122        if let Some(tx) = self.base.shutdown_tx.write().await.take() {
1123            let _ = tx.send(());
1124        }
1125
1126        if let Some(handle) = self.base.task_handle.write().await.take() {
1127            let _ = timeout(Duration::from_secs(5), handle).await;
1128        }
1129
1130        self.base.set_status(ComponentStatus::Stopped).await;
1131        self.base
1132            .send_component_event(
1133                ComponentStatus::Stopped,
1134                Some("Adaptive HTTP source stopped".to_string()),
1135            )
1136            .await?;
1137
1138        Ok(())
1139    }
1140
1141    async fn status(&self) -> ComponentStatus {
1142        self.base.get_status().await
1143    }
1144
1145    async fn subscribe(
1146        &self,
1147        settings: drasi_lib::config::SourceSubscriptionSettings,
1148    ) -> Result<SubscriptionResponse> {
1149        self.base.subscribe_with_bootstrap(&settings, "HTTP").await
1150    }
1151
1152    fn as_any(&self) -> &dyn std::any::Any {
1153        self
1154    }
1155
1156    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1157        self.base.initialize(context).await;
1158    }
1159
1160    async fn set_bootstrap_provider(
1161        &self,
1162        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1163    ) {
1164        self.base.set_bootstrap_provider(provider).await;
1165    }
1166}
1167
1168/// Builder for HttpSource instances.
1169///
1170/// Provides a fluent API for constructing HTTP sources with sensible defaults
1171/// and adaptive batching settings. The builder takes the source ID at construction
1172/// and returns a fully constructed `HttpSource` from `build()`.
1173///
1174/// # Example
1175///
1176/// ```rust,ignore
1177/// use drasi_source_http::HttpSource;
1178///
1179/// let source = HttpSource::builder("my-source")
1180///     .with_host("0.0.0.0")
1181///     .with_port(8080)
1182///     .with_adaptive_enabled(true)
1183///     .with_bootstrap_provider(my_provider)
1184///     .build()?;
1185/// ```
1186pub struct HttpSourceBuilder {
1187    id: String,
1188    host: String,
1189    port: u16,
1190    endpoint: Option<String>,
1191    timeout_ms: u64,
1192    adaptive_max_batch_size: Option<usize>,
1193    adaptive_min_batch_size: Option<usize>,
1194    adaptive_max_wait_ms: Option<u64>,
1195    adaptive_min_wait_ms: Option<u64>,
1196    adaptive_window_secs: Option<u64>,
1197    adaptive_enabled: Option<bool>,
1198    webhooks: Option<WebhookConfig>,
1199    dispatch_mode: Option<DispatchMode>,
1200    dispatch_buffer_capacity: Option<usize>,
1201    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
1202    auto_start: bool,
1203}
1204
1205impl HttpSourceBuilder {
1206    /// Create a new HTTP source builder with the given source ID.
1207    ///
1208    /// # Arguments
1209    ///
1210    /// * `id` - Unique identifier for the source instance
1211    pub fn new(id: impl Into<String>) -> Self {
1212        Self {
1213            id: id.into(),
1214            host: String::new(),
1215            port: 8080,
1216            endpoint: None,
1217            timeout_ms: 10000,
1218            adaptive_max_batch_size: None,
1219            adaptive_min_batch_size: None,
1220            adaptive_max_wait_ms: None,
1221            adaptive_min_wait_ms: None,
1222            adaptive_window_secs: None,
1223            adaptive_enabled: None,
1224            webhooks: None,
1225            dispatch_mode: None,
1226            dispatch_buffer_capacity: None,
1227            bootstrap_provider: None,
1228            auto_start: true,
1229        }
1230    }
1231
1232    /// Set the HTTP host
1233    pub fn with_host(mut self, host: impl Into<String>) -> Self {
1234        self.host = host.into();
1235        self
1236    }
1237
1238    /// Set the HTTP port
1239    pub fn with_port(mut self, port: u16) -> Self {
1240        self.port = port;
1241        self
1242    }
1243
1244    /// Set the endpoint path
1245    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
1246        self.endpoint = Some(endpoint.into());
1247        self
1248    }
1249
1250    /// Set the request timeout in milliseconds
1251    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
1252        self.timeout_ms = timeout_ms;
1253        self
1254    }
1255
1256    /// Set the adaptive batching maximum batch size
1257    pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
1258        self.adaptive_max_batch_size = Some(size);
1259        self
1260    }
1261
1262    /// Set the adaptive batching minimum batch size
1263    pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
1264        self.adaptive_min_batch_size = Some(size);
1265        self
1266    }
1267
1268    /// Set the adaptive batching maximum wait time in milliseconds
1269    pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
1270        self.adaptive_max_wait_ms = Some(wait_ms);
1271        self
1272    }
1273
1274    /// Set the adaptive batching minimum wait time in milliseconds
1275    pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
1276        self.adaptive_min_wait_ms = Some(wait_ms);
1277        self
1278    }
1279
1280    /// Set the adaptive batching throughput window in seconds
1281    pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
1282        self.adaptive_window_secs = Some(secs);
1283        self
1284    }
1285
1286    /// Enable or disable adaptive batching
1287    pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
1288        self.adaptive_enabled = Some(enabled);
1289        self
1290    }
1291
1292    /// Set the dispatch mode for event routing.
1293    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
1294        self.dispatch_mode = Some(mode);
1295        self
1296    }
1297
1298    /// Set the dispatch buffer capacity.
1299    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
1300        self.dispatch_buffer_capacity = Some(capacity);
1301        self
1302    }
1303
1304    /// Set the bootstrap provider for initial data delivery.
1305    pub fn with_bootstrap_provider(
1306        mut self,
1307        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
1308    ) -> Self {
1309        self.bootstrap_provider = Some(Box::new(provider));
1310        self
1311    }
1312
1313    /// Set whether this source should auto-start when DrasiLib starts.
1314    ///
1315    /// Default is `true`. Set to `false` if this source should only be
1316    /// started manually via `start_source()`.
1317    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
1318        self.auto_start = auto_start;
1319        self
1320    }
1321
1322    /// Set the webhook configuration to enable webhook mode.
1323    ///
1324    /// When webhook mode is enabled, the standard `HttpSourceChange` endpoints
1325    /// are disabled and custom webhook routes are used instead.
1326    pub fn with_webhooks(mut self, webhooks: WebhookConfig) -> Self {
1327        self.webhooks = Some(webhooks);
1328        self
1329    }
1330
1331    /// Set the full configuration at once
1332    pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
1333        self.host = config.host;
1334        self.port = config.port;
1335        self.endpoint = config.endpoint;
1336        self.timeout_ms = config.timeout_ms;
1337        self.adaptive_max_batch_size = config.adaptive_max_batch_size;
1338        self.adaptive_min_batch_size = config.adaptive_min_batch_size;
1339        self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
1340        self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
1341        self.adaptive_window_secs = config.adaptive_window_secs;
1342        self.adaptive_enabled = config.adaptive_enabled;
1343        self.webhooks = config.webhooks;
1344        self
1345    }
1346
1347    /// Build the HttpSource instance.
1348    ///
1349    /// # Returns
1350    ///
1351    /// A fully constructed `HttpSource`, or an error if construction fails.
1352    pub fn build(self) -> Result<HttpSource> {
1353        let config = HttpSourceConfig {
1354            host: self.host,
1355            port: self.port,
1356            endpoint: self.endpoint,
1357            timeout_ms: self.timeout_ms,
1358            adaptive_max_batch_size: self.adaptive_max_batch_size,
1359            adaptive_min_batch_size: self.adaptive_min_batch_size,
1360            adaptive_max_wait_ms: self.adaptive_max_wait_ms,
1361            adaptive_min_wait_ms: self.adaptive_min_wait_ms,
1362            adaptive_window_secs: self.adaptive_window_secs,
1363            adaptive_enabled: self.adaptive_enabled,
1364            webhooks: self.webhooks,
1365        };
1366
1367        // Build SourceBaseParams with all settings
1368        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1369        if let Some(mode) = self.dispatch_mode {
1370            params = params.with_dispatch_mode(mode);
1371        }
1372        if let Some(capacity) = self.dispatch_buffer_capacity {
1373            params = params.with_dispatch_buffer_capacity(capacity);
1374        }
1375        if let Some(provider) = self.bootstrap_provider {
1376            params = params.with_bootstrap_provider(provider);
1377        }
1378
1379        // Configure adaptive batching
1380        let mut adaptive_config = AdaptiveBatchConfig::default();
1381        if let Some(max_batch) = config.adaptive_max_batch_size {
1382            adaptive_config.max_batch_size = max_batch;
1383        }
1384        if let Some(min_batch) = config.adaptive_min_batch_size {
1385            adaptive_config.min_batch_size = min_batch;
1386        }
1387        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
1388            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
1389        }
1390        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
1391            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
1392        }
1393        if let Some(window_secs) = config.adaptive_window_secs {
1394            adaptive_config.throughput_window = Duration::from_secs(window_secs);
1395        }
1396        if let Some(enabled) = config.adaptive_enabled {
1397            adaptive_config.adaptive_enabled = enabled;
1398        }
1399
1400        Ok(HttpSource {
1401            base: SourceBase::new(params)?,
1402            config,
1403            adaptive_config,
1404        })
1405    }
1406}
1407
1408impl HttpSource {
1409    /// Create a builder for HttpSource with the given ID.
1410    ///
1411    /// This is the recommended way to construct an HttpSource.
1412    ///
1413    /// # Arguments
1414    ///
1415    /// * `id` - Unique identifier for the source instance
1416    ///
1417    /// # Example
1418    ///
1419    /// ```rust,ignore
1420    /// let source = HttpSource::builder("my-source")
1421    ///     .with_host("0.0.0.0")
1422    ///     .with_port(8080)
1423    ///     .with_bootstrap_provider(my_provider)
1424    ///     .build()?;
1425    /// ```
1426    pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
1427        HttpSourceBuilder::new(id)
1428    }
1429}
1430
1431/// Handle errors according to configured error behavior
1432fn handle_error(
1433    behavior: &ErrorBehavior,
1434    source_id: &str,
1435    status: StatusCode,
1436    message: &str,
1437    detail: Option<&str>,
1438) -> (StatusCode, Json<EventResponse>) {
1439    match behavior {
1440        ErrorBehavior::Reject => {
1441            debug!("[{source_id}] Rejecting request: {message}");
1442            (
1443                status,
1444                Json(EventResponse {
1445                    success: false,
1446                    message: message.to_string(),
1447                    error: detail.map(String::from),
1448                }),
1449            )
1450        }
1451        ErrorBehavior::AcceptAndLog => {
1452            warn!("[{source_id}] Accepting with error (logged): {message}");
1453            (
1454                StatusCode::OK,
1455                Json(EventResponse {
1456                    success: true,
1457                    message: format!("Accepted with warning: {message}"),
1458                    error: detail.map(String::from),
1459                }),
1460            )
1461        }
1462        ErrorBehavior::AcceptAndSkip => {
1463            trace!("[{source_id}] Accepting silently: {message}");
1464            (
1465                StatusCode::OK,
1466                Json(EventResponse {
1467                    success: true,
1468                    message: "Accepted".to_string(),
1469                    error: None,
1470                }),
1471            )
1472        }
1473    }
1474}
1475
1476/// Parse query string into a HashMap
1477fn parse_query_string(query: Option<&str>) -> HashMap<String, String> {
1478    query
1479        .map(|q| {
1480            q.split('&')
1481                .filter_map(|pair| {
1482                    let mut parts = pair.splitn(2, '=');
1483                    let key = parts.next()?;
1484                    let value = parts.next().unwrap_or("");
1485                    Some((urlencoding_decode(key), urlencoding_decode(value)))
1486                })
1487                .collect()
1488        })
1489        .unwrap_or_default()
1490}
1491
/// Decode a URL-encoded (form-style) string.
///
/// * `%XX`, where both `X` are ASCII hex digits, becomes the byte `0xXX`.
/// * `+` becomes a space.
/// * A `%` that is not followed by two hex digits is kept literally, and the
///   characters after it are processed normally (so `%%41` yields `%A`, and a
///   sign character such as in `%+1` is never mistaken for a hex digit).
///
/// Decoded bytes are collected first and converted to a `String` at the end,
/// so multi-byte UTF-8 sequences spread over several escapes decode correctly;
/// invalid UTF-8 is replaced with U+FFFD.
fn urlencoding_decode(s: &str) -> String {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn hex_value(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    let bytes = s.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let high = bytes.get(i + 1).and_then(|&b| hex_value(b));
                let low = bytes.get(i + 2).and_then(|&b| hex_value(b));
                if let (Some(high), Some(low)) = (high, low) {
                    decoded.push((high << 4) | low);
                    i += 3;
                    continue;
                }
                // Not a valid escape: keep the '%' and let the following
                // bytes be handled on their own.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            // Any other byte (including the bytes of multi-byte characters)
            // is copied through unchanged.
            other => decoded.push(other),
        }
        i += 1;
    }

    // Convert bytes to string, replacing invalid UTF-8 sequences
    String::from_utf8_lossy(&decoded).into_owned()
}
1531
1532/// Build a CORS layer from configuration
1533fn build_cors_layer(cors_config: &CorsConfig) -> CorsLayer {
1534    let mut cors = CorsLayer::new();
1535
1536    // Configure allowed origins
1537    if cors_config.allow_origins.len() == 1 && cors_config.allow_origins[0] == "*" {
1538        cors = cors.allow_origin(Any);
1539    } else {
1540        let origins: Vec<_> = cors_config
1541            .allow_origins
1542            .iter()
1543            .filter_map(|o| o.parse().ok())
1544            .collect();
1545        cors = cors.allow_origin(origins);
1546    }
1547
1548    // Configure allowed methods
1549    let methods: Vec<Method> = cors_config
1550        .allow_methods
1551        .iter()
1552        .filter_map(|m| m.parse().ok())
1553        .collect();
1554    cors = cors.allow_methods(methods);
1555
1556    // Configure allowed headers
1557    if cors_config.allow_headers.len() == 1 && cors_config.allow_headers[0] == "*" {
1558        cors = cors.allow_headers(Any);
1559    } else {
1560        let headers: Vec<header::HeaderName> = cors_config
1561            .allow_headers
1562            .iter()
1563            .filter_map(|h| h.parse().ok())
1564            .collect();
1565        cors = cors.allow_headers(headers);
1566    }
1567
1568    // Configure exposed headers
1569    if !cors_config.expose_headers.is_empty() {
1570        let exposed: Vec<header::HeaderName> = cors_config
1571            .expose_headers
1572            .iter()
1573            .filter_map(|h| h.parse().ok())
1574            .collect();
1575        cors = cors.expose_headers(exposed);
1576    }
1577
1578    // Configure credentials
1579    if cors_config.allow_credentials {
1580        cors = cors.allow_credentials(true);
1581    }
1582
1583    // Configure max age
1584    cors = cors.max_age(Duration::from_secs(cors_config.max_age));
1585
1586    cors
1587}
1588
1589#[cfg(test)]
1590mod tests {
1591    use super::*;
1592
1593    mod construction {
1594        use super::*;
1595
1596        #[test]
1597        fn test_builder_with_valid_config() {
1598            let source = HttpSourceBuilder::new("test-source")
1599                .with_host("localhost")
1600                .with_port(8080)
1601                .build();
1602            assert!(source.is_ok());
1603        }
1604
1605        #[test]
1606        fn test_builder_with_custom_config() {
1607            let source = HttpSourceBuilder::new("http-source")
1608                .with_host("0.0.0.0")
1609                .with_port(9000)
1610                .with_endpoint("/events")
1611                .build()
1612                .unwrap();
1613            assert_eq!(source.id(), "http-source");
1614        }
1615
1616        #[test]
1617        fn test_with_dispatch_creates_source() {
1618            let config = HttpSourceConfig {
1619                host: "localhost".to_string(),
1620                port: 8080,
1621                endpoint: None,
1622                timeout_ms: 10000,
1623                adaptive_max_batch_size: None,
1624                adaptive_min_batch_size: None,
1625                adaptive_max_wait_ms: None,
1626                adaptive_min_wait_ms: None,
1627                adaptive_window_secs: None,
1628                adaptive_enabled: None,
1629                webhooks: None,
1630            };
1631            let source = HttpSource::with_dispatch(
1632                "dispatch-source",
1633                config,
1634                Some(DispatchMode::Channel),
1635                Some(1000),
1636            );
1637            assert!(source.is_ok());
1638            assert_eq!(source.unwrap().id(), "dispatch-source");
1639        }
1640    }
1641
1642    mod properties {
1643        use super::*;
1644
1645        #[test]
1646        fn test_id_returns_correct_value() {
1647            let source = HttpSourceBuilder::new("my-http-source")
1648                .with_host("localhost")
1649                .build()
1650                .unwrap();
1651            assert_eq!(source.id(), "my-http-source");
1652        }
1653
1654        #[test]
1655        fn test_type_name_returns_http() {
1656            let source = HttpSourceBuilder::new("test")
1657                .with_host("localhost")
1658                .build()
1659                .unwrap();
1660            assert_eq!(source.type_name(), "http");
1661        }
1662
1663        #[test]
1664        fn test_properties_contains_host_and_port() {
1665            let source = HttpSourceBuilder::new("test")
1666                .with_host("192.168.1.1")
1667                .with_port(9000)
1668                .build()
1669                .unwrap();
1670            let props = source.properties();
1671
1672            assert_eq!(
1673                props.get("host"),
1674                Some(&serde_json::Value::String("192.168.1.1".to_string()))
1675            );
1676            assert_eq!(
1677                props.get("port"),
1678                Some(&serde_json::Value::Number(9000.into()))
1679            );
1680        }
1681
1682        #[test]
1683        fn test_properties_includes_endpoint_when_set() {
1684            let source = HttpSourceBuilder::new("test")
1685                .with_host("localhost")
1686                .with_endpoint("/api/v1")
1687                .build()
1688                .unwrap();
1689            let props = source.properties();
1690
1691            assert_eq!(
1692                props.get("endpoint"),
1693                Some(&serde_json::Value::String("/api/v1".to_string()))
1694            );
1695        }
1696
1697        #[test]
1698        fn test_properties_excludes_endpoint_when_none() {
1699            let source = HttpSourceBuilder::new("test")
1700                .with_host("localhost")
1701                .build()
1702                .unwrap();
1703            let props = source.properties();
1704
1705            assert!(!props.contains_key("endpoint"));
1706        }
1707    }
1708
1709    mod lifecycle {
1710        use super::*;
1711
1712        #[tokio::test]
1713        async fn test_initial_status_is_stopped() {
1714            let source = HttpSourceBuilder::new("test")
1715                .with_host("localhost")
1716                .build()
1717                .unwrap();
1718            assert_eq!(source.status().await, ComponentStatus::Stopped);
1719        }
1720    }
1721
/// Checks that `HttpSourceBuilder` stores defaults and explicit overrides
/// in the resulting source's configuration.
mod builder {
    use super::*;

    /// Without any overrides the built config carries port 8080, a
    /// 10000 ms timeout and no endpoint.
    #[test]
    fn test_http_builder_defaults() {
        let built = HttpSourceBuilder::new("test").build();
        let http_source = built.unwrap();
        let config = &http_source.config;

        assert_eq!(config.port, 8080);
        assert_eq!(config.timeout_ms, 10000);
        assert_eq!(config.endpoint, None);
    }

    /// Host, port, endpoint and timeout setters are each reflected in the
    /// built config.
    #[test]
    fn test_http_builder_custom_values() {
        let built = HttpSourceBuilder::new("test")
            .with_host("api.example.com")
            .with_port(9000)
            .with_endpoint("/webhook")
            .with_timeout_ms(5000)
            .build();
        let http_source = built.unwrap();
        let config = &http_source.config;

        assert_eq!(config.host, "api.example.com");
        assert_eq!(config.port, 9000);
        assert_eq!(config.endpoint.as_deref(), Some("/webhook"));
        assert_eq!(config.timeout_ms, 5000);
    }

    /// Every adaptive-batching setter ends up as `Some(value)` in the
    /// built config.
    #[test]
    fn test_http_builder_adaptive_batching() {
        let built = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .with_adaptive_max_batch_size(1000)
            .with_adaptive_min_batch_size(10)
            .with_adaptive_max_wait_ms(500)
            .with_adaptive_min_wait_ms(50)
            .with_adaptive_window_secs(60)
            .with_adaptive_enabled(true)
            .build();
        let http_source = built.unwrap();
        let config = &http_source.config;

        assert_eq!(config.adaptive_max_batch_size, Some(1000));
        assert_eq!(config.adaptive_min_batch_size, Some(10));
        assert_eq!(config.adaptive_max_wait_ms, Some(500));
        assert_eq!(config.adaptive_min_wait_ms, Some(50));
        assert_eq!(config.adaptive_window_secs, Some(60));
        assert_eq!(config.adaptive_enabled, Some(true));
    }

    /// The id handed to `HttpSource::builder` becomes the id of the
    /// source's base component.
    #[test]
    fn test_builder_id() {
        let built = HttpSource::builder("my-http-source")
            .with_host("localhost")
            .build();
        let http_source = built.unwrap();

        assert_eq!(http_source.base.id, "my-http-source");
    }
}
1780
1781    mod event_conversion {
1782        use super::*;
1783
1784        #[test]
1785        fn test_convert_node_insert() {
1786            let mut props = serde_json::Map::new();
1787            props.insert(
1788                "name".to_string(),
1789                serde_json::Value::String("Alice".to_string()),
1790            );
1791            props.insert("age".to_string(), serde_json::Value::Number(30.into()));
1792
1793            let http_change = HttpSourceChange::Insert {
1794                element: HttpElement::Node {
1795                    id: "user-1".to_string(),
1796                    labels: vec!["User".to_string()],
1797                    properties: props,
1798                },
1799                timestamp: Some(1234567890000000000),
1800            };
1801
1802            let result = convert_http_to_source_change(&http_change, "test-source");
1803            assert!(result.is_ok());
1804
1805            match result.unwrap() {
1806                drasi_core::models::SourceChange::Insert { element } => match element {
1807                    drasi_core::models::Element::Node {
1808                        metadata,
1809                        properties,
1810                    } => {
1811                        assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1812                        assert_eq!(metadata.labels.len(), 1);
1813                        assert_eq!(metadata.effective_from, 1234567890000);
1814                        assert!(properties.get("name").is_some());
1815                        assert!(properties.get("age").is_some());
1816                    }
1817                    _ => panic!("Expected Node element"),
1818                },
1819                _ => panic!("Expected Insert operation"),
1820            }
1821        }
1822
1823        #[test]
1824        fn test_convert_relation_insert() {
1825            let http_change = HttpSourceChange::Insert {
1826                element: HttpElement::Relation {
1827                    id: "follows-1".to_string(),
1828                    labels: vec!["FOLLOWS".to_string()],
1829                    from: "user-1".to_string(),
1830                    to: "user-2".to_string(),
1831                    properties: serde_json::Map::new(),
1832                },
1833                timestamp: None,
1834            };
1835
1836            let result = convert_http_to_source_change(&http_change, "test-source");
1837            assert!(result.is_ok());
1838
1839            match result.unwrap() {
1840                drasi_core::models::SourceChange::Insert { element } => match element {
1841                    drasi_core::models::Element::Relation {
1842                        metadata,
1843                        out_node,
1844                        in_node,
1845                        ..
1846                    } => {
1847                        assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
1848                        assert_eq!(out_node.element_id.as_ref(), "user-1");
1849                        assert_eq!(in_node.element_id.as_ref(), "user-2");
1850                    }
1851                    _ => panic!("Expected Relation element"),
1852                },
1853                _ => panic!("Expected Insert operation"),
1854            }
1855        }
1856
1857        #[test]
1858        fn test_convert_delete() {
1859            let http_change = HttpSourceChange::Delete {
1860                id: "user-1".to_string(),
1861                labels: Some(vec!["User".to_string()]),
1862                timestamp: Some(9999999999),
1863            };
1864
1865            let result = convert_http_to_source_change(&http_change, "test-source");
1866            assert!(result.is_ok());
1867
1868            match result.unwrap() {
1869                drasi_core::models::SourceChange::Delete { metadata } => {
1870                    assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1871                    assert_eq!(metadata.labels.len(), 1);
1872                }
1873                _ => panic!("Expected Delete operation"),
1874            }
1875        }
1876
1877        #[test]
1878        fn test_convert_update() {
1879            let http_change = HttpSourceChange::Update {
1880                element: HttpElement::Node {
1881                    id: "user-1".to_string(),
1882                    labels: vec!["User".to_string()],
1883                    properties: serde_json::Map::new(),
1884                },
1885                timestamp: None,
1886            };
1887
1888            let result = convert_http_to_source_change(&http_change, "test-source");
1889            assert!(result.is_ok());
1890
1891            match result.unwrap() {
1892                drasi_core::models::SourceChange::Update { .. } => {
1893                    // Success
1894                }
1895                _ => panic!("Expected Update operation"),
1896            }
1897        }
1898    }
1899
/// Checks how the source's `adaptive_config` is derived from the builder
/// settings.
mod adaptive_config {
    use super::*;

    /// Adaptive values set on the builder are carried into the source's
    /// `adaptive_config`.
    #[test]
    fn test_adaptive_config_from_http_config() {
        let built = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .with_adaptive_max_batch_size(500)
            .with_adaptive_enabled(true)
            .build();
        let http_source = built.unwrap();
        let adaptive = &http_source.adaptive_config;

        // Values supplied through the builder must win over the defaults.
        assert_eq!(adaptive.max_batch_size, 500);
        assert!(adaptive.adaptive_enabled);
    }

    /// When no adaptive values are set, the batch-size bounds equal those
    /// of `AdaptiveBatchConfig::default()`.
    #[test]
    fn test_adaptive_config_uses_defaults_when_not_specified() {
        let built = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .build();
        let http_source = built.unwrap();
        let adaptive = &http_source.adaptive_config;

        // Reference values come straight from the type's Default impl.
        let expected = AdaptiveBatchConfig::default();
        assert_eq!(adaptive.max_batch_size, expected.max_batch_size);
        assert_eq!(adaptive.min_batch_size, expected.min_batch_size);
    }
}
1936}