Skip to main content

drasi_source_http/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! HTTP Source Plugin for Drasi
17//!
18//! This plugin exposes HTTP endpoints for receiving data change events. It supports
19//! two mutually exclusive modes:
20//!
21//! - **Standard Mode**: Fixed-format `HttpSourceChange` endpoints with adaptive batching
22//! - **Webhook Mode**: Configurable routes with template-based payload transformation
23//!
24//! # Standard Mode
25//!
26//! When no `webhooks` configuration is present, the source operates in standard mode
27//! with the following endpoints:
28//!
29//! - **`POST /sources/{source_id}/events`** - Submit a single event
30//! - **`POST /sources/{source_id}/events/batch`** - Submit multiple events
31//! - **`GET /health`** - Health check endpoint
32//!
33//! ## Data Format
34//!
35//! Events are submitted as JSON using the `HttpSourceChange` format:
36//!
37//! ### Insert Operation
38//!
39//! ```json
40//! {
41//!     "operation": "insert",
42//!     "element": {
43//!         "type": "node",
44//!         "id": "user-123",
45//!         "labels": ["User"],
46//!         "properties": {
47//!             "name": "Alice",
48//!             "email": "alice@example.com"
49//!         }
50//!     },
51//!     "timestamp": 1699900000000000000
52//! }
53//! ```
54//!
55//! ### Update Operation
56//!
57//! ```json
58//! {
59//!     "operation": "update",
60//!     "element": {
61//!         "type": "node",
62//!         "id": "user-123",
63//!         "labels": ["User"],
64//!         "properties": {
65//!             "name": "Alice Updated"
66//!         }
67//!     }
68//! }
69//! ```
70//!
71//! ### Delete Operation
72//!
73//! ```json
74//! {
75//!     "operation": "delete",
76//!     "id": "user-123",
77//!     "labels": ["User"]
78//! }
79//! ```
80//!
81//! # Webhook Mode
82//!
83//! When a `webhooks` section is present in the configuration, the source operates
84//! in webhook mode. This enables:
85//!
//! - Custom routes, optionally with path parameters (e.g., `/github/events`, `/users/:id/hooks`)
87//! - Multiple HTTP methods per route (POST, PUT, PATCH, DELETE, GET)
88//! - Handlebars template-based payload transformation
89//! - HMAC signature verification (GitHub, Shopify style)
90//! - Bearer token authentication
91//! - Support for JSON, XML, YAML, and plain text payloads
92//!
93//! ## Webhook Configuration Example
94//!
95//! ```yaml
96//! webhooks:
97//!   error_behavior: accept_and_log
98//!   routes:
99//!     - path: "/github/events"
100//!       methods: ["POST"]
101//!       auth:
102//!         signature:
103//!           type: hmac-sha256
104//!           secret_env: GITHUB_WEBHOOK_SECRET
105//!           header: X-Hub-Signature-256
106//!           prefix: "sha256="
107//!       error_behavior: reject
108//!       mappings:
109//!         - when:
110//!             header: X-GitHub-Event
111//!             equals: push
112//!           operation: insert
113//!           element_type: node
114//!           effective_from: "{{payload.head_commit.timestamp}}"
115//!           template:
116//!             id: "commit-{{payload.head_commit.id}}"
117//!             labels: ["Commit"]
118//!             properties:
119//!               message: "{{payload.head_commit.message}}"
120//!               author: "{{payload.head_commit.author.name}}"
121//! ```
122//!
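//! ## Relation Element
//!
//! A relation uses the same `HttpSourceChange` envelope as a node, with
//! `"type": "relation"` and additional `from` and `to` fields: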
124//!
125//! ```json
126//! {
127//!     "operation": "insert",
128//!     "element": {
129//!         "type": "relation",
130//!         "id": "follows-1",
131//!         "labels": ["FOLLOWS"],
132//!         "from": "user-123",
133//!         "to": "user-456",
134//!         "properties": {}
135//!     }
136//! }
137//! ```
138//!
139//! # Batch Submission
140//!
141//! ```json
142//! {
143//!     "events": [
144//!         { "operation": "insert", ... },
145//!         { "operation": "update", ... }
146//!     ]
147//! }
148//! ```
149//!
150//! # Adaptive Batching
151//!
152//! The HTTP source includes adaptive batching to optimize throughput. Events are
153//! buffered and dispatched in batches, with batch size and timing adjusted based
154//! on throughput patterns.
155//!
156//! | Parameter | Default | Description |
157//! |-----------|---------|-------------|
158//! | `adaptive_enabled` | `true` | Enable/disable adaptive batching |
159//! | `adaptive_max_batch_size` | `1000` | Maximum events per batch |
160//! | `adaptive_min_batch_size` | `1` | Minimum events per batch |
161//! | `adaptive_max_wait_ms` | `100` | Maximum wait time before dispatching |
162//! | `adaptive_min_wait_ms` | `10` | Minimum wait time between batches |
163//!
164//! # Configuration
165//!
166//! | Field | Type | Default | Description |
167//! |-------|------|---------|-------------|
168//! | `host` | string | *required* | Host address to bind to |
169//! | `port` | u16 | `8080` | Port to listen on |
170//! | `endpoint` | string | None | Optional custom path prefix |
171//! | `timeout_ms` | u64 | `10000` | Request timeout in milliseconds |
172//!
173//! # Example Configuration (YAML)
174//!
175//! ```yaml
176//! source_type: http
177//! properties:
178//!   host: "0.0.0.0"
179//!   port: 8080
180//!   adaptive_enabled: true
181//!   adaptive_max_batch_size: 500
182//! ```
183//!
184//! # Usage Examples
185//!
186//! ## Rust
187//!
188//! ```rust,ignore
189//! use drasi_source_http::{HttpSource, HttpSourceBuilder};
190//!
191//! let config = HttpSourceBuilder::new()
192//!     .with_host("0.0.0.0")
193//!     .with_port(8080)
194//!     .with_adaptive_enabled(true)
195//!     .build();
196//!
197//! let source = Arc::new(HttpSource::new("http-source", config)?);
198//! drasi.add_source(source).await?;
199//! ```
200//!
201//! ## curl (Single Event)
202//!
203//! ```bash
204//! curl -X POST http://localhost:8080/sources/my-source/events \
205//!   -H "Content-Type: application/json" \
206//!   -d '{"operation":"insert","element":{"type":"node","id":"1","labels":["Test"],"properties":{}}}'
207//! ```
208//!
209//! ## curl (Batch)
210//!
211//! ```bash
212//! curl -X POST http://localhost:8080/sources/my-source/events/batch \
213//!   -H "Content-Type: application/json" \
214//!   -d '{"events":[...]}'
215//! ```
216
217pub mod config;
218pub mod descriptor;
219pub use config::HttpSourceConfig;
220
221mod adaptive_batcher;
222mod models;
223mod time;
224
225// Webhook support modules
226pub mod auth;
227pub mod content_parser;
228pub mod route_matcher;
229pub mod template_engine;
230
231// Export HTTP source models and conversion
232pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
233
234use anyhow::Result;
235use async_trait::async_trait;
236use axum::{
237    body::Bytes,
238    extract::{Path, State},
239    http::{header, Method, StatusCode},
240    response::IntoResponse,
241    routing::{delete, get, post, put},
242    Json, Router,
243};
244use log::{debug, error, info, trace, warn};
245use serde::{Deserialize, Serialize};
246use std::collections::HashMap;
247use std::sync::Arc;
248use std::time::Duration;
249use tokio::sync::mpsc;
250use tokio::time::timeout;
251use tower_http::cors::{Any, CorsLayer};
252
253use drasi_lib::channels::{ComponentType, *};
254use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
255use drasi_lib::Source;
256use tracing::Instrument;
257
258use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
259use crate::auth::{verify_auth, AuthResult};
260use crate::config::{CorsConfig, ErrorBehavior, WebhookConfig};
261use crate::content_parser::{parse_content, ContentType};
262use crate::route_matcher::{convert_method, find_matching_mappings, headers_to_map, RouteMatcher};
263use crate::template_engine::{TemplateContext, TemplateEngine};
264
/// JSON response body returned to clients after an event submission.
///
/// The same shape is used for both successful and failed submissions.
#[derive(Debug, Serialize, Deserialize)]
pub struct EventResponse {
    /// Whether the submission was treated as successful.
    pub success: bool,
    /// Human-readable summary of the outcome.
    pub message: String,
    /// Optional error detail; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
273
/// HTTP source with configurable adaptive batching.
///
/// This source exposes HTTP endpoints for receiving data change events.
/// It supports both single-event and batch submission modes, with adaptive
/// batching for optimized throughput.
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle)
/// - `config`: HTTP-specific configuration (host, port, timeout)
/// - `adaptive_config`: Adaptive batching settings for throughput optimization,
///   derived by the constructors from `AdaptiveBatchConfig::default()` plus any
///   `adaptive_*` overrides present in `config`
pub struct HttpSource {
    /// Base source implementation providing common functionality
    base: SourceBase,
    /// HTTP source configuration
    config: HttpSourceConfig,
    /// Adaptive batching configuration for throughput optimization
    adaptive_config: AdaptiveBatchConfig,
}
293
/// Request body for the batch endpoint: a list of changes submitted together.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventRequest {
    /// The changes to process, in submission order.
    pub events: Vec<HttpSourceChange>,
}
299
/// HTTP source app state with batching channel.
///
/// Shared state passed to Axum route handlers.
#[derive(Clone)]
struct HttpAppState {
    /// The source ID for validation against incoming requests
    source_id: String,
    /// Channel for sending events to the adaptive batcher
    batch_tx: mpsc::Sender<SourceChangeEvent>,
    /// Webhook configuration (if in webhook mode); the webhook handler treats
    /// `None` as an internal configuration error
    webhook_config: Option<Arc<WebhookState>>,
}
312
/// State for webhook mode processing.
///
/// Bundles everything the webhook handler needs to route, authenticate and
/// transform an incoming request.
struct WebhookState {
    /// Webhook configuration (routes and the global error behavior)
    config: WebhookConfig,
    /// Route matcher for incoming requests
    route_matcher: RouteMatcher,
    /// Template engine for payload transformation
    template_engine: TemplateEngine,
}
322
323impl HttpSource {
324    /// Create a new HTTP source.
325    ///
326    /// The event channel is automatically injected when the source is added
327    /// to DrasiLib via `add_source()`.
328    ///
329    /// # Arguments
330    ///
331    /// * `id` - Unique identifier for this source instance
332    /// * `config` - HTTP source configuration
333    ///
334    /// # Returns
335    ///
336    /// A new `HttpSource` instance, or an error if construction fails.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the base source cannot be initialized.
341    ///
342    /// # Example
343    ///
344    /// ```rust,ignore
345    /// use drasi_source_http::{HttpSource, HttpSourceBuilder};
346    ///
347    /// let config = HttpSourceBuilder::new()
348    ///     .with_host("0.0.0.0")
349    ///     .with_port(8080)
350    ///     .build();
351    ///
352    /// let source = HttpSource::new("my-http-source", config)?;
353    /// ```
354    pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
355        let id = id.into();
356        let params = SourceBaseParams::new(id);
357
358        // Configure adaptive batching
359        let mut adaptive_config = AdaptiveBatchConfig::default();
360
361        // Allow overriding adaptive parameters from config
362        if let Some(max_batch) = config.adaptive_max_batch_size {
363            adaptive_config.max_batch_size = max_batch;
364        }
365        if let Some(min_batch) = config.adaptive_min_batch_size {
366            adaptive_config.min_batch_size = min_batch;
367        }
368        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
369            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
370        }
371        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
372            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
373        }
374        if let Some(window_secs) = config.adaptive_window_secs {
375            adaptive_config.throughput_window = Duration::from_secs(window_secs);
376        }
377        if let Some(enabled) = config.adaptive_enabled {
378            adaptive_config.adaptive_enabled = enabled;
379        }
380
381        Ok(Self {
382            base: SourceBase::new(params)?,
383            config,
384            adaptive_config,
385        })
386    }
387
388    /// Create a new HTTP source with custom dispatch settings.
389    ///
390    /// The event channel is automatically injected when the source is added
391    /// to DrasiLib via `add_source()`.
392    ///
393    /// # Arguments
394    ///
395    /// * `id` - Unique identifier for this source instance
396    /// * `config` - HTTP source configuration
397    /// * `dispatch_mode` - Optional dispatch mode (Channel, Direct, etc.)
398    /// * `dispatch_buffer_capacity` - Optional buffer capacity for channel dispatch
399    ///
400    /// # Returns
401    ///
402    /// A new `HttpSource` instance with custom dispatch settings.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the base source cannot be initialized.
407    pub fn with_dispatch(
408        id: impl Into<String>,
409        config: HttpSourceConfig,
410        dispatch_mode: Option<DispatchMode>,
411        dispatch_buffer_capacity: Option<usize>,
412    ) -> Result<Self> {
413        let id = id.into();
414        let mut params = SourceBaseParams::new(id);
415        if let Some(mode) = dispatch_mode {
416            params = params.with_dispatch_mode(mode);
417        }
418        if let Some(capacity) = dispatch_buffer_capacity {
419            params = params.with_dispatch_buffer_capacity(capacity);
420        }
421
422        let mut adaptive_config = AdaptiveBatchConfig::default();
423
424        if let Some(max_batch) = config.adaptive_max_batch_size {
425            adaptive_config.max_batch_size = max_batch;
426        }
427        if let Some(min_batch) = config.adaptive_min_batch_size {
428            adaptive_config.min_batch_size = min_batch;
429        }
430        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
431            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
432        }
433        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
434            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
435        }
436        if let Some(window_secs) = config.adaptive_window_secs {
437            adaptive_config.throughput_window = Duration::from_secs(window_secs);
438        }
439        if let Some(enabled) = config.adaptive_enabled {
440            adaptive_config.adaptive_enabled = enabled;
441        }
442
443        Ok(Self {
444            base: SourceBase::new(params)?,
445            config,
446            adaptive_config,
447        })
448    }
449
450    /// Handle a single event submission from `POST /sources/{source_id}/events`.
451    ///
452    /// Validates the source ID matches this source and converts the HTTP event
453    /// to a source change before sending to the adaptive batcher.
454    async fn handle_single_event(
455        Path(source_id): Path<String>,
456        State(state): State<HttpAppState>,
457        Json(event): Json<HttpSourceChange>,
458    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
459        debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
460        Self::process_events(&source_id, &state, vec![event]).await
461    }
462
463    /// Handle a batch event submission from `POST /sources/{source_id}/events/batch`.
464    ///
465    /// Validates the source ID and processes all events in the batch,
466    /// returning partial success if some events fail.
467    async fn handle_batch_events(
468        Path(source_id): Path<String>,
469        State(state): State<HttpAppState>,
470        Json(batch): Json<BatchEventRequest>,
471    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
472        debug!(
473            "[{}] HTTP endpoint received batch of {} events",
474            source_id,
475            batch.events.len()
476        );
477        Self::process_events(&source_id, &state, batch.events).await
478    }
479
480    /// Process a list of events, converting and sending them to the batcher.
481    ///
482    /// # Arguments
483    ///
484    /// * `source_id` - Source ID from the request path
485    /// * `state` - Shared app state containing the batch channel
486    /// * `events` - List of HTTP source changes to process
487    ///
488    /// # Returns
489    ///
490    /// Success response with count of processed events, or error if all fail.
491    async fn process_events(
492        source_id: &str,
493        state: &HttpAppState,
494        events: Vec<HttpSourceChange>,
495    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
496        trace!("[{}] Processing {} events", source_id, events.len());
497
498        if source_id != state.source_id {
499            error!(
500                "[{}] Source name mismatch. Expected '{}', got '{}'",
501                state.source_id, state.source_id, source_id
502            );
503            return Err((
504                StatusCode::BAD_REQUEST,
505                Json(EventResponse {
506                    success: false,
507                    message: "Source name mismatch".to_string(),
508                    error: Some(format!(
509                        "Expected source '{}', got '{}'",
510                        state.source_id, source_id
511                    )),
512                }),
513            ));
514        }
515
516        let mut success_count = 0;
517        let mut error_count = 0;
518        let mut last_error = None;
519
520        for (idx, event) in events.iter().enumerate() {
521            match convert_http_to_source_change(event, source_id) {
522                Ok(source_change) => {
523                    let change_event = SourceChangeEvent {
524                        source_id: source_id.to_string(),
525                        change: source_change,
526                        timestamp: chrono::Utc::now(),
527                    };
528
529                    if let Err(e) = state.batch_tx.send(change_event).await {
530                        error!(
531                            "[{}] Failed to send event {} to batch channel: {}",
532                            state.source_id,
533                            idx + 1,
534                            e
535                        );
536                        error_count += 1;
537                        last_error = Some("Internal channel error".to_string());
538                    } else {
539                        success_count += 1;
540                    }
541                }
542                Err(e) => {
543                    error!(
544                        "[{}] Failed to convert event {}: {}",
545                        state.source_id,
546                        idx + 1,
547                        e
548                    );
549                    error_count += 1;
550                    last_error = Some(e.to_string());
551                }
552            }
553        }
554
555        debug!(
556            "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
557        );
558
559        if error_count > 0 && success_count == 0 {
560            Err((
561                StatusCode::BAD_REQUEST,
562                Json(EventResponse {
563                    success: false,
564                    message: format!("All {error_count} events failed"),
565                    error: last_error,
566                }),
567            ))
568        } else if error_count > 0 {
569            Ok(Json(EventResponse {
570                success: true,
571                message: format!(
572                    "Processed {success_count} events successfully, {error_count} failed"
573                ),
574                error: last_error,
575            }))
576        } else {
577            Ok(Json(EventResponse {
578                success: true,
579                message: format!("All {success_count} events processed successfully"),
580                error: None,
581            }))
582        }
583    }
584
585    async fn health_check() -> impl IntoResponse {
586        Json(serde_json::json!({
587            "status": "healthy",
588            "service": "http-source",
589            "features": ["adaptive-batching", "batch-endpoint", "webhooks"]
590        }))
591    }
592
593    /// Handle webhook requests
594    ///
595    /// This handler processes requests in webhook mode, matching against
596    /// configured routes and transforming payloads using templates.
597    async fn handle_webhook(
598        method: axum::http::Method,
599        uri: axum::http::Uri,
600        headers: axum::http::HeaderMap,
601        State(state): State<HttpAppState>,
602        body: Bytes,
603    ) -> impl IntoResponse {
604        let path = uri.path();
605        let source_id = &state.source_id;
606
607        debug!("[{source_id}] Webhook received: {method} {path}");
608
609        // Get webhook state - should always be present in webhook mode
610        let webhook_state = match &state.webhook_config {
611            Some(ws) => ws,
612            None => {
613                error!("[{source_id}] Webhook handler called but no webhook config present");
614                return (
615                    StatusCode::INTERNAL_SERVER_ERROR,
616                    Json(EventResponse {
617                        success: false,
618                        message: "Internal configuration error".to_string(),
619                        error: Some("Webhook mode not properly configured".to_string()),
620                    }),
621                );
622            }
623        };
624
625        // Convert method
626        let http_method = match convert_method(&method) {
627            Some(m) => m,
628            None => {
629                return handle_error(
630                    &webhook_state.config.error_behavior,
631                    source_id,
632                    StatusCode::METHOD_NOT_ALLOWED,
633                    "Method not supported",
634                    None,
635                );
636            }
637        };
638
639        // Match route
640        let route_match = match webhook_state.route_matcher.match_route(
641            path,
642            &http_method,
643            &webhook_state.config.routes,
644        ) {
645            Some(rm) => rm,
646            None => {
647                debug!("[{source_id}] No matching route for {method} {path}");
648                return handle_error(
649                    &webhook_state.config.error_behavior,
650                    source_id,
651                    StatusCode::NOT_FOUND,
652                    "No matching route",
653                    None,
654                );
655            }
656        };
657
658        let route = route_match.route;
659        let error_behavior = route
660            .error_behavior
661            .as_ref()
662            .unwrap_or(&webhook_state.config.error_behavior);
663
664        // Verify authentication
665        let auth_result = verify_auth(route.auth.as_ref(), &headers, &body);
666        if let AuthResult::Failed(reason) = auth_result {
667            warn!("[{source_id}] Authentication failed for {path}: {reason}");
668            return handle_error(
669                error_behavior,
670                source_id,
671                StatusCode::UNAUTHORIZED,
672                "Authentication failed",
673                Some(&reason),
674            );
675        }
676
677        // Parse content
678        let content_type = ContentType::from_header(
679            headers
680                .get(axum::http::header::CONTENT_TYPE)
681                .and_then(|v| v.to_str().ok()),
682        );
683
684        let payload = match parse_content(&body, content_type) {
685            Ok(p) => p,
686            Err(e) => {
687                warn!("[{source_id}] Failed to parse payload: {e}");
688                return handle_error(
689                    error_behavior,
690                    source_id,
691                    StatusCode::BAD_REQUEST,
692                    "Failed to parse payload",
693                    Some(&e.to_string()),
694                );
695            }
696        };
697
698        // Build template context
699        let headers_map = headers_to_map(&headers);
700        let query_map = parse_query_string(uri.query());
701
702        let context = TemplateContext {
703            payload: payload.clone(),
704            route: route_match.path_params,
705            query: query_map,
706            headers: headers_map.clone(),
707            method: method.to_string(),
708            path: path.to_string(),
709            source_id: source_id.clone(),
710        };
711
712        // Find matching mappings
713        let matching_mappings = find_matching_mappings(&route.mappings, &headers_map, &payload);
714
715        if matching_mappings.is_empty() {
716            debug!("[{source_id}] No matching mappings for request");
717            return handle_error(
718                error_behavior,
719                source_id,
720                StatusCode::BAD_REQUEST,
721                "No matching mapping for request",
722                None,
723            );
724        }
725
726        // Process each matching mapping
727        let mut success_count = 0;
728        let mut error_count = 0;
729        let mut last_error = None;
730
731        for mapping in matching_mappings {
732            match webhook_state
733                .template_engine
734                .process_mapping(mapping, &context, source_id)
735            {
736                Ok(source_change) => {
737                    let event = SourceChangeEvent {
738                        source_id: source_id.clone(),
739                        change: source_change,
740                        timestamp: chrono::Utc::now(),
741                    };
742
743                    if let Err(e) = state.batch_tx.send(event).await {
744                        error!("[{source_id}] Failed to send event to batcher: {e}");
745                        error_count += 1;
746                        last_error = Some(format!("Failed to queue event: {e}"));
747                    } else {
748                        success_count += 1;
749                    }
750                }
751                Err(e) => {
752                    warn!("[{source_id}] Failed to process mapping: {e}");
753                    error_count += 1;
754                    last_error = Some(e.to_string());
755                }
756            }
757        }
758
759        debug!("[{source_id}] Webhook processing complete: {success_count} succeeded, {error_count} failed");
760
761        if error_count > 0 && success_count == 0 {
762            handle_error(
763                error_behavior,
764                source_id,
765                StatusCode::BAD_REQUEST,
766                &format!("All {error_count} mappings failed"),
767                last_error.as_deref(),
768            )
769        } else if error_count > 0 {
770            (
771                StatusCode::OK,
772                Json(EventResponse {
773                    success: true,
774                    message: format!("Processed {success_count} events, {error_count} failed"),
775                    error: last_error,
776                }),
777            )
778        } else {
779            (
780                StatusCode::OK,
781                Json(EventResponse {
782                    success: true,
783                    message: format!("Processed {success_count} events successfully"),
784                    error: None,
785                }),
786            )
787        }
788    }
789
790    async fn run_adaptive_batcher(
791        batch_rx: mpsc::Receiver<SourceChangeEvent>,
792        dispatchers: Arc<
793            tokio::sync::RwLock<
794                Vec<
795                    Box<
796                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
797                    >,
798                >,
799            >,
800        >,
801        adaptive_config: AdaptiveBatchConfig,
802        source_id: String,
803    ) {
804        let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
805        let mut total_events = 0u64;
806        let mut total_batches = 0u64;
807
808        info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
809
810        while let Some(batch) = batcher.next_batch().await {
811            if batch.is_empty() {
812                debug!("[{source_id}] Batcher received empty batch, skipping");
813                continue;
814            }
815
816            let batch_size = batch.len();
817            total_events += batch_size as u64;
818            total_batches += 1;
819
820            debug!(
821                "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
822            );
823
824            let mut sent_count = 0;
825            let mut failed_count = 0;
826            for (idx, event) in batch.into_iter().enumerate() {
827                debug!(
828                    "[{}] Batch #{}, dispatching event {}/{}",
829                    source_id,
830                    total_batches,
831                    idx + 1,
832                    batch_size
833                );
834
835                let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
836                profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
837
838                let wrapper = SourceEventWrapper::with_profiling(
839                    event.source_id.clone(),
840                    SourceEvent::Change(event.change),
841                    event.timestamp,
842                    profiling,
843                );
844
845                if let Err(e) =
846                    SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
847                        .await
848                {
849                    error!(
850                        "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
851                        source_id,
852                        total_batches,
853                        idx + 1,
854                        batch_size,
855                        e
856                    );
857                    failed_count += 1;
858                } else {
859                    debug!(
860                        "[{}] Batch #{}, successfully dispatched event {}/{}",
861                        source_id,
862                        total_batches,
863                        idx + 1,
864                        batch_size
865                    );
866                    sent_count += 1;
867                }
868            }
869
870            debug!(
871                "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
872            );
873
874            if total_batches.is_multiple_of(100) {
875                info!(
876                    "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
877                    source_id,
878                    total_batches,
879                    total_events,
880                    total_events as f64 / total_batches as f64
881                );
882            }
883        }
884
885        info!(
886            "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
887        );
888    }
889}
890
891#[async_trait]
892impl Source for HttpSource {
893    fn id(&self) -> &str {
894        &self.base.id
895    }
896
897    fn type_name(&self) -> &str {
898        "http"
899    }
900
901    fn properties(&self) -> HashMap<String, serde_json::Value> {
902        let mut props = HashMap::new();
903        props.insert(
904            "host".to_string(),
905            serde_json::Value::String(self.config.host.clone()),
906        );
907        props.insert(
908            "port".to_string(),
909            serde_json::Value::Number(self.config.port.into()),
910        );
911        if let Some(ref endpoint) = self.config.endpoint {
912            props.insert(
913                "endpoint".to_string(),
914                serde_json::Value::String(endpoint.clone()),
915            );
916        }
917        props
918    }
919
920    fn auto_start(&self) -> bool {
921        self.base.get_auto_start()
922    }
923
924    async fn start(&self) -> Result<()> {
925        info!("[{}] Starting adaptive HTTP source", self.base.id);
926
927        self.base.set_status(ComponentStatus::Starting).await;
928        self.base
929            .send_component_event(
930                ComponentStatus::Starting,
931                Some("Starting adaptive HTTP source".to_string()),
932            )
933            .await?;
934
935        let host = self.config.host.clone();
936        let port = self.config.port;
937
938        // Create batch channel with capacity based on batch configuration
939        let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
940        let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
941        info!(
942            "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
943            self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
944        );
945
946        // Start adaptive batcher task
947        let adaptive_config = self.adaptive_config.clone();
948        let source_id = self.base.id.clone();
949        let dispatchers = self.base.dispatchers.clone();
950
951        // Get instance_id from context for log routing isolation
952        let instance_id = self
953            .base
954            .context()
955            .await
956            .map(|c| c.instance_id)
957            .unwrap_or_default();
958
959        info!("[{source_id}] Starting adaptive batcher task");
960        let source_id_for_span = source_id.clone();
961        let span = tracing::info_span!(
962            "http_adaptive_batcher",
963            instance_id = %instance_id,
964            component_id = %source_id_for_span,
965            component_type = "source"
966        );
967        tokio::spawn(
968            async move {
969                Self::run_adaptive_batcher(
970                    batch_rx,
971                    dispatchers,
972                    adaptive_config,
973                    source_id.clone(),
974                )
975                .await
976            }
977            .instrument(span),
978        );
979
980        // Create app state
981        let webhook_state = if let Some(ref webhook_config) = self.config.webhooks {
982            info!(
983                "[{}] Webhook mode enabled with {} routes",
984                self.base.id,
985                webhook_config.routes.len()
986            );
987            Some(Arc::new(WebhookState {
988                config: webhook_config.clone(),
989                route_matcher: RouteMatcher::new(&webhook_config.routes),
990                template_engine: TemplateEngine::new(),
991            }))
992        } else {
993            info!("[{}] Standard mode enabled", self.base.id);
994            None
995        };
996
997        let state = HttpAppState {
998            source_id: self.base.id.clone(),
999            batch_tx,
1000            webhook_config: webhook_state,
1001        };
1002
1003        // Build router based on mode
1004        let app = if self.config.is_webhook_mode() {
1005            // Webhook mode: only health check + catch-all webhook handler
1006            let router = Router::new()
1007                .route("/health", get(Self::health_check))
1008                .fallback(Self::handle_webhook)
1009                .with_state(state);
1010
1011            // Apply CORS if configured
1012            if let Some(ref webhooks) = self.config.webhooks {
1013                if let Some(ref cors_config) = webhooks.cors {
1014                    if cors_config.enabled {
1015                        info!("[{}] CORS enabled for webhook endpoints", self.base.id);
1016                        router.layer(build_cors_layer(cors_config))
1017                    } else {
1018                        router
1019                    }
1020                } else {
1021                    router
1022                }
1023            } else {
1024                router
1025            }
1026        } else {
1027            // Standard mode: original endpoints
1028            Router::new()
1029                .route("/health", get(Self::health_check))
1030                .route(
1031                    "/sources/:source_id/events",
1032                    post(Self::handle_single_event),
1033                )
1034                .route(
1035                    "/sources/:source_id/events/batch",
1036                    post(Self::handle_batch_events),
1037                )
1038                .with_state(state)
1039        };
1040
1041        // Create shutdown channel
1042        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1043
1044        let host_clone = host.clone();
1045
1046        // Start server
1047        let (error_tx, error_rx) = tokio::sync::oneshot::channel();
1048        let source_id = self.base.id.clone();
1049        let source_id_for_span = source_id.clone();
1050        let span = tracing::info_span!(
1051            "http_source_server",
1052            instance_id = %instance_id,
1053            component_id = %source_id_for_span,
1054            component_type = "source"
1055        );
1056        let server_handle = tokio::spawn(
1057            async move {
1058                let addr = format!("{host}:{port}");
1059                info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");
1060
1061                let listener = match tokio::net::TcpListener::bind(&addr).await {
1062                    Ok(listener) => {
1063                        info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
1064                        listener
1065                    }
1066                    Err(e) => {
1067                        error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
1068                        let _ = error_tx.send(format!(
1069                        "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
1070                    ));
1071                    return;
1072                }
1073            };
1074
1075                if let Err(e) = axum::serve(listener, app)
1076                    .with_graceful_shutdown(async move {
1077                        let _ = shutdown_rx.await;
1078                    })
1079                    .await
1080                {
1081                    error!("[{source_id}] HTTP server error: {e}");
1082                }
1083            }
1084            .instrument(span),
1085        );
1086
1087        *self.base.task_handle.write().await = Some(server_handle);
1088        *self.base.shutdown_tx.write().await = Some(shutdown_tx);
1089
1090        // Check for startup errors with a short timeout
1091        match timeout(Duration::from_millis(500), error_rx).await {
1092            Ok(Ok(error_msg)) => {
1093                self.base.set_status(ComponentStatus::Error).await;
1094                return Err(anyhow::anyhow!("{error_msg}"));
1095            }
1096            _ => {
1097                self.base.set_status(ComponentStatus::Running).await;
1098            }
1099        }
1100
1101        self.base
1102            .send_component_event(
1103                ComponentStatus::Running,
1104                Some(format!(
1105                    "Adaptive HTTP source running on {host_clone}:{port} with batch support"
1106                )),
1107            )
1108            .await?;
1109
1110        Ok(())
1111    }
1112
1113    async fn stop(&self) -> Result<()> {
1114        info!("[{}] Stopping adaptive HTTP source", self.base.id);
1115
1116        self.base.set_status(ComponentStatus::Stopping).await;
1117        self.base
1118            .send_component_event(
1119                ComponentStatus::Stopping,
1120                Some("Stopping adaptive HTTP source".to_string()),
1121            )
1122            .await?;
1123
1124        if let Some(tx) = self.base.shutdown_tx.write().await.take() {
1125            let _ = tx.send(());
1126        }
1127
1128        if let Some(handle) = self.base.task_handle.write().await.take() {
1129            let _ = timeout(Duration::from_secs(5), handle).await;
1130        }
1131
1132        self.base.set_status(ComponentStatus::Stopped).await;
1133        self.base
1134            .send_component_event(
1135                ComponentStatus::Stopped,
1136                Some("Adaptive HTTP source stopped".to_string()),
1137            )
1138            .await?;
1139
1140        Ok(())
1141    }
1142
1143    async fn status(&self) -> ComponentStatus {
1144        self.base.get_status().await
1145    }
1146
1147    async fn subscribe(
1148        &self,
1149        settings: drasi_lib::config::SourceSubscriptionSettings,
1150    ) -> Result<SubscriptionResponse> {
1151        self.base.subscribe_with_bootstrap(&settings, "HTTP").await
1152    }
1153
1154    fn as_any(&self) -> &dyn std::any::Any {
1155        self
1156    }
1157
1158    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1159        self.base.initialize(context).await;
1160    }
1161
1162    async fn set_bootstrap_provider(
1163        &self,
1164        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1165    ) {
1166        self.base.set_bootstrap_provider(provider).await;
1167    }
1168}
1169
/// Builder for HttpSource instances.
///
/// Provides a fluent API for constructing HTTP sources with sensible defaults
/// and adaptive batching settings. The builder takes the source ID at construction
/// and returns a fully constructed `HttpSource` from `build()`.
///
/// Adaptive batching fields left as `None` keep the corresponding value from
/// `AdaptiveBatchConfig::default()` when `build()` runs.
///
/// # Example
///
/// ```rust,ignore
/// use drasi_source_http::HttpSource;
///
/// let source = HttpSource::builder("my-source")
///     .with_host("0.0.0.0")
///     .with_port(8080)
///     .with_adaptive_enabled(true)
///     .with_bootstrap_provider(my_provider)
///     .build()?;
/// ```
pub struct HttpSourceBuilder {
    /// Identifier of the source instance being built.
    id: String,
    /// Host the HTTP server binds to.
    /// NOTE(review): `new()` initialises this to an empty string — confirm callers
    /// always set a host before starting the source.
    host: String,
    /// Port the HTTP server binds to (`new()` defaults to 8080).
    port: u16,
    /// Optional endpoint path; reported via `properties()` when set.
    endpoint: Option<String>,
    /// Request timeout in milliseconds (`new()` defaults to 10000).
    timeout_ms: u64,
    /// Override for the adaptive batcher's maximum batch size.
    adaptive_max_batch_size: Option<usize>,
    /// Override for the adaptive batcher's minimum batch size.
    adaptive_min_batch_size: Option<usize>,
    /// Override for the adaptive batcher's maximum wait time, in milliseconds.
    adaptive_max_wait_ms: Option<u64>,
    /// Override for the adaptive batcher's minimum wait time, in milliseconds.
    adaptive_min_wait_ms: Option<u64>,
    /// Override for the adaptive batcher's throughput window, in seconds.
    adaptive_window_secs: Option<u64>,
    /// Override for whether adaptive batching is enabled.
    adaptive_enabled: Option<bool>,
    /// Webhook configuration; when present the source runs in webhook mode.
    webhooks: Option<WebhookConfig>,
    /// Dispatch mode passed to `SourceBaseParams` when set.
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity passed to `SourceBaseParams` when set.
    dispatch_buffer_capacity: Option<usize>,
    /// Bootstrap provider passed to `SourceBaseParams` when set.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether the source should auto-start (`new()` defaults to `true`).
    auto_start: bool,
}
1206
1207impl HttpSourceBuilder {
1208    /// Create a new HTTP source builder with the given source ID.
1209    ///
1210    /// # Arguments
1211    ///
1212    /// * `id` - Unique identifier for the source instance
1213    pub fn new(id: impl Into<String>) -> Self {
1214        Self {
1215            id: id.into(),
1216            host: String::new(),
1217            port: 8080,
1218            endpoint: None,
1219            timeout_ms: 10000,
1220            adaptive_max_batch_size: None,
1221            adaptive_min_batch_size: None,
1222            adaptive_max_wait_ms: None,
1223            adaptive_min_wait_ms: None,
1224            adaptive_window_secs: None,
1225            adaptive_enabled: None,
1226            webhooks: None,
1227            dispatch_mode: None,
1228            dispatch_buffer_capacity: None,
1229            bootstrap_provider: None,
1230            auto_start: true,
1231        }
1232    }
1233
1234    /// Set the HTTP host
1235    pub fn with_host(mut self, host: impl Into<String>) -> Self {
1236        self.host = host.into();
1237        self
1238    }
1239
1240    /// Set the HTTP port
1241    pub fn with_port(mut self, port: u16) -> Self {
1242        self.port = port;
1243        self
1244    }
1245
1246    /// Set the endpoint path
1247    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
1248        self.endpoint = Some(endpoint.into());
1249        self
1250    }
1251
1252    /// Set the request timeout in milliseconds
1253    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
1254        self.timeout_ms = timeout_ms;
1255        self
1256    }
1257
1258    /// Set the adaptive batching maximum batch size
1259    pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
1260        self.adaptive_max_batch_size = Some(size);
1261        self
1262    }
1263
1264    /// Set the adaptive batching minimum batch size
1265    pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
1266        self.adaptive_min_batch_size = Some(size);
1267        self
1268    }
1269
1270    /// Set the adaptive batching maximum wait time in milliseconds
1271    pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
1272        self.adaptive_max_wait_ms = Some(wait_ms);
1273        self
1274    }
1275
1276    /// Set the adaptive batching minimum wait time in milliseconds
1277    pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
1278        self.adaptive_min_wait_ms = Some(wait_ms);
1279        self
1280    }
1281
1282    /// Set the adaptive batching throughput window in seconds
1283    pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
1284        self.adaptive_window_secs = Some(secs);
1285        self
1286    }
1287
1288    /// Enable or disable adaptive batching
1289    pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
1290        self.adaptive_enabled = Some(enabled);
1291        self
1292    }
1293
1294    /// Set the dispatch mode for event routing.
1295    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
1296        self.dispatch_mode = Some(mode);
1297        self
1298    }
1299
1300    /// Set the dispatch buffer capacity.
1301    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
1302        self.dispatch_buffer_capacity = Some(capacity);
1303        self
1304    }
1305
1306    /// Set the bootstrap provider for initial data delivery.
1307    pub fn with_bootstrap_provider(
1308        mut self,
1309        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
1310    ) -> Self {
1311        self.bootstrap_provider = Some(Box::new(provider));
1312        self
1313    }
1314
1315    /// Set whether this source should auto-start when DrasiLib starts.
1316    ///
1317    /// Default is `true`. Set to `false` if this source should only be
1318    /// started manually via `start_source()`.
1319    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
1320        self.auto_start = auto_start;
1321        self
1322    }
1323
1324    /// Set the webhook configuration to enable webhook mode.
1325    ///
1326    /// When webhook mode is enabled, the standard `HttpSourceChange` endpoints
1327    /// are disabled and custom webhook routes are used instead.
1328    pub fn with_webhooks(mut self, webhooks: WebhookConfig) -> Self {
1329        self.webhooks = Some(webhooks);
1330        self
1331    }
1332
1333    /// Set the full configuration at once
1334    pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
1335        self.host = config.host;
1336        self.port = config.port;
1337        self.endpoint = config.endpoint;
1338        self.timeout_ms = config.timeout_ms;
1339        self.adaptive_max_batch_size = config.adaptive_max_batch_size;
1340        self.adaptive_min_batch_size = config.adaptive_min_batch_size;
1341        self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
1342        self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
1343        self.adaptive_window_secs = config.adaptive_window_secs;
1344        self.adaptive_enabled = config.adaptive_enabled;
1345        self.webhooks = config.webhooks;
1346        self
1347    }
1348
1349    /// Build the HttpSource instance.
1350    ///
1351    /// # Returns
1352    ///
1353    /// A fully constructed `HttpSource`, or an error if construction fails.
1354    pub fn build(self) -> Result<HttpSource> {
1355        let config = HttpSourceConfig {
1356            host: self.host,
1357            port: self.port,
1358            endpoint: self.endpoint,
1359            timeout_ms: self.timeout_ms,
1360            adaptive_max_batch_size: self.adaptive_max_batch_size,
1361            adaptive_min_batch_size: self.adaptive_min_batch_size,
1362            adaptive_max_wait_ms: self.adaptive_max_wait_ms,
1363            adaptive_min_wait_ms: self.adaptive_min_wait_ms,
1364            adaptive_window_secs: self.adaptive_window_secs,
1365            adaptive_enabled: self.adaptive_enabled,
1366            webhooks: self.webhooks,
1367        };
1368
1369        // Build SourceBaseParams with all settings
1370        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1371        if let Some(mode) = self.dispatch_mode {
1372            params = params.with_dispatch_mode(mode);
1373        }
1374        if let Some(capacity) = self.dispatch_buffer_capacity {
1375            params = params.with_dispatch_buffer_capacity(capacity);
1376        }
1377        if let Some(provider) = self.bootstrap_provider {
1378            params = params.with_bootstrap_provider(provider);
1379        }
1380
1381        // Configure adaptive batching
1382        let mut adaptive_config = AdaptiveBatchConfig::default();
1383        if let Some(max_batch) = config.adaptive_max_batch_size {
1384            adaptive_config.max_batch_size = max_batch;
1385        }
1386        if let Some(min_batch) = config.adaptive_min_batch_size {
1387            adaptive_config.min_batch_size = min_batch;
1388        }
1389        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
1390            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
1391        }
1392        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
1393            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
1394        }
1395        if let Some(window_secs) = config.adaptive_window_secs {
1396            adaptive_config.throughput_window = Duration::from_secs(window_secs);
1397        }
1398        if let Some(enabled) = config.adaptive_enabled {
1399            adaptive_config.adaptive_enabled = enabled;
1400        }
1401
1402        Ok(HttpSource {
1403            base: SourceBase::new(params)?,
1404            config,
1405            adaptive_config,
1406        })
1407    }
1408}
1409
impl HttpSource {
    /// Create a builder for HttpSource with the given ID.
    ///
    /// This is the recommended way to construct an HttpSource. It is a
    /// convenience wrapper around `HttpSourceBuilder::new`.
    ///
    /// # Arguments
    ///
    /// * `id` - Unique identifier for the source instance
    ///
    /// # Returns
    ///
    /// An `HttpSourceBuilder` initialised with the builder's defaults.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let source = HttpSource::builder("my-source")
    ///     .with_host("0.0.0.0")
    ///     .with_port(8080)
    ///     .with_bootstrap_provider(my_provider)
    ///     .build()?;
    /// ```
    pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
        HttpSourceBuilder::new(id)
    }
}
1432
1433/// Handle errors according to configured error behavior
1434fn handle_error(
1435    behavior: &ErrorBehavior,
1436    source_id: &str,
1437    status: StatusCode,
1438    message: &str,
1439    detail: Option<&str>,
1440) -> (StatusCode, Json<EventResponse>) {
1441    match behavior {
1442        ErrorBehavior::Reject => {
1443            debug!("[{source_id}] Rejecting request: {message}");
1444            (
1445                status,
1446                Json(EventResponse {
1447                    success: false,
1448                    message: message.to_string(),
1449                    error: detail.map(String::from),
1450                }),
1451            )
1452        }
1453        ErrorBehavior::AcceptAndLog => {
1454            warn!("[{source_id}] Accepting with error (logged): {message}");
1455            (
1456                StatusCode::OK,
1457                Json(EventResponse {
1458                    success: true,
1459                    message: format!("Accepted with warning: {message}"),
1460                    error: detail.map(String::from),
1461                }),
1462            )
1463        }
1464        ErrorBehavior::AcceptAndSkip => {
1465            trace!("[{source_id}] Accepting silently: {message}");
1466            (
1467                StatusCode::OK,
1468                Json(EventResponse {
1469                    success: true,
1470                    message: "Accepted".to_string(),
1471                    error: None,
1472                }),
1473            )
1474        }
1475    }
1476}
1477
1478/// Parse query string into a HashMap
1479fn parse_query_string(query: Option<&str>) -> HashMap<String, String> {
1480    query
1481        .map(|q| {
1482            q.split('&')
1483                .filter_map(|pair| {
1484                    let mut parts = pair.splitn(2, '=');
1485                    let key = parts.next()?;
1486                    let value = parts.next().unwrap_or("");
1487                    Some((urlencoding_decode(key), urlencoding_decode(value)))
1488                })
1489                .collect()
1490        })
1491        .unwrap_or_default()
1492}
1493
/// Decode a URL-encoded query component.
///
/// * `%XX`, where both `X` are hexadecimal digits, becomes the byte `0xXX`.
/// * `+` becomes a space.
/// * A `%` that is not followed by two hexadecimal digits is kept literally, and
///   the characters after it are decoded normally.
///
/// Decoding works on bytes so that multi-byte UTF-8 sequences spread across
/// several `%XX` escapes are reassembled; any byte sequence that is not valid
/// UTF-8 is replaced with U+FFFD.
fn urlencoding_decode(s: &str) -> String {
    /// Value of a single ASCII hexadecimal digit, or `None` for any other byte.
    fn hex_value(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    let bytes = s.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut pos = 0;

    while pos < bytes.len() {
        match bytes[pos] {
            b'%' => {
                // Only consume the two following bytes when both are hex digits;
                // a sign character such as '+' must not be accepted here.
                let high = bytes.get(pos + 1).copied().and_then(hex_value);
                let low = bytes.get(pos + 2).copied().and_then(hex_value);
                if let Some((high, low)) = high.zip(low) {
                    decoded.push((high << 4) | low);
                    pos += 3;
                    continue;
                }
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        pos += 1;
    }

    // Convert bytes to string, replacing invalid UTF-8 sequences
    String::from_utf8_lossy(&decoded).into_owned()
}
1533
1534/// Build a CORS layer from configuration
1535fn build_cors_layer(cors_config: &CorsConfig) -> CorsLayer {
1536    let mut cors = CorsLayer::new();
1537
1538    // Configure allowed origins
1539    if cors_config.allow_origins.len() == 1 && cors_config.allow_origins[0] == "*" {
1540        cors = cors.allow_origin(Any);
1541    } else {
1542        let origins: Vec<_> = cors_config
1543            .allow_origins
1544            .iter()
1545            .filter_map(|o| o.parse().ok())
1546            .collect();
1547        cors = cors.allow_origin(origins);
1548    }
1549
1550    // Configure allowed methods
1551    let methods: Vec<Method> = cors_config
1552        .allow_methods
1553        .iter()
1554        .filter_map(|m| m.parse().ok())
1555        .collect();
1556    cors = cors.allow_methods(methods);
1557
1558    // Configure allowed headers
1559    if cors_config.allow_headers.len() == 1 && cors_config.allow_headers[0] == "*" {
1560        cors = cors.allow_headers(Any);
1561    } else {
1562        let headers: Vec<header::HeaderName> = cors_config
1563            .allow_headers
1564            .iter()
1565            .filter_map(|h| h.parse().ok())
1566            .collect();
1567        cors = cors.allow_headers(headers);
1568    }
1569
1570    // Configure exposed headers
1571    if !cors_config.expose_headers.is_empty() {
1572        let exposed: Vec<header::HeaderName> = cors_config
1573            .expose_headers
1574            .iter()
1575            .filter_map(|h| h.parse().ok())
1576            .collect();
1577        cors = cors.expose_headers(exposed);
1578    }
1579
1580    // Configure credentials
1581    if cors_config.allow_credentials {
1582        cors = cors.allow_credentials(true);
1583    }
1584
1585    // Configure max age
1586    cors = cors.max_age(Duration::from_secs(cors_config.max_age));
1587
1588    cors
1589}
1590
1591#[cfg(test)]
1592mod tests {
1593    use super::*;
1594
    // Tests covering the different ways of constructing an `HttpSource`.
    mod construction {
        use super::*;

        // A builder given only a host and port builds successfully.
        #[test]
        fn test_builder_with_valid_config() {
            let source = HttpSourceBuilder::new("test-source")
                .with_host("localhost")
                .with_port(8080)
                .build();
            assert!(source.is_ok());
        }

        // The id passed to the builder is the id reported by the built source.
        #[test]
        fn test_builder_with_custom_config() {
            let source = HttpSourceBuilder::new("http-source")
                .with_host("0.0.0.0")
                .with_port(9000)
                .with_endpoint("/events")
                .build()
                .unwrap();
            assert_eq!(source.id(), "http-source");
        }

        // Constructing from an explicit config plus dispatch settings succeeds
        // and preserves the id.
        #[test]
        fn test_with_dispatch_creates_source() {
            let config = HttpSourceConfig {
                host: "localhost".to_string(),
                port: 8080,
                endpoint: None,
                timeout_ms: 10000,
                adaptive_max_batch_size: None,
                adaptive_min_batch_size: None,
                adaptive_max_wait_ms: None,
                adaptive_min_wait_ms: None,
                adaptive_window_secs: None,
                adaptive_enabled: None,
                webhooks: None,
            };
            let source = HttpSource::with_dispatch(
                "dispatch-source",
                config,
                Some(DispatchMode::Channel),
                Some(1000),
            );
            assert!(source.is_ok());
            assert_eq!(source.unwrap().id(), "dispatch-source");
        }
    }
1643
    // Tests covering the `Source` trait accessors: id, type name and properties.
    mod properties {
        use super::*;

        // `id()` returns the id given to the builder.
        #[test]
        fn test_id_returns_correct_value() {
            let source = HttpSourceBuilder::new("my-http-source")
                .with_host("localhost")
                .build()
                .unwrap();
            assert_eq!(source.id(), "my-http-source");
        }

        // `type_name()` is the fixed string "http".
        #[test]
        fn test_type_name_returns_http() {
            let source = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .build()
                .unwrap();
            assert_eq!(source.type_name(), "http");
        }

        // `properties()` exposes host as a JSON string and port as a JSON number.
        #[test]
        fn test_properties_contains_host_and_port() {
            let source = HttpSourceBuilder::new("test")
                .with_host("192.168.1.1")
                .with_port(9000)
                .build()
                .unwrap();
            let props = source.properties();

            assert_eq!(
                props.get("host"),
                Some(&serde_json::Value::String("192.168.1.1".to_string()))
            );
            assert_eq!(
                props.get("port"),
                Some(&serde_json::Value::Number(9000.into()))
            );
        }

        // A configured endpoint appears in `properties()`.
        #[test]
        fn test_properties_includes_endpoint_when_set() {
            let source = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .with_endpoint("/api/v1")
                .build()
                .unwrap();
            let props = source.properties();

            assert_eq!(
                props.get("endpoint"),
                Some(&serde_json::Value::String("/api/v1".to_string()))
            );
        }

        // Without an endpoint, `properties()` has no "endpoint" key at all.
        #[test]
        fn test_properties_excludes_endpoint_when_none() {
            let source = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .build()
                .unwrap();
            let props = source.properties();

            assert!(!props.contains_key("endpoint"));
        }
    }
1710
    // Tests covering component status transitions.
    mod lifecycle {
        use super::*;

        // A freshly built source that has never been started reports `Stopped`.
        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let source = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .build()
                .unwrap();
            assert_eq!(source.status().await, ComponentStatus::Stopped);
        }
    }
1723
    // Tests covering `HttpSourceBuilder` defaults and setters.
    mod builder {
        use super::*;

        // With no setters called, port, timeout and endpoint hold the builder defaults.
        #[test]
        fn test_http_builder_defaults() {
            let source = HttpSourceBuilder::new("test").build().unwrap();
            assert_eq!(source.config.port, 8080);
            assert_eq!(source.config.timeout_ms, 10000);
            assert_eq!(source.config.endpoint, None);
        }

        // Values supplied through the setters end up in the built config.
        #[test]
        fn test_http_builder_custom_values() {
            let source = HttpSourceBuilder::new("test")
                .with_host("api.example.com")
                .with_port(9000)
                .with_endpoint("/webhook")
                .with_timeout_ms(5000)
                .build()
                .unwrap();

            assert_eq!(source.config.host, "api.example.com");
            assert_eq!(source.config.port, 9000);
            assert_eq!(source.config.endpoint, Some("/webhook".to_string()));
            assert_eq!(source.config.timeout_ms, 5000);
        }

        // Each adaptive batching setter is recorded as `Some(value)` in the config.
        #[test]
        fn test_http_builder_adaptive_batching() {
            let source = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .with_adaptive_max_batch_size(1000)
                .with_adaptive_min_batch_size(10)
                .with_adaptive_max_wait_ms(500)
                .with_adaptive_min_wait_ms(50)
                .with_adaptive_window_secs(60)
                .with_adaptive_enabled(true)
                .build()
                .unwrap();

            assert_eq!(source.config.adaptive_max_batch_size, Some(1000));
            assert_eq!(source.config.adaptive_min_batch_size, Some(10));
            assert_eq!(source.config.adaptive_max_wait_ms, Some(500));
            assert_eq!(source.config.adaptive_min_wait_ms, Some(50));
            assert_eq!(source.config.adaptive_window_secs, Some(60));
            assert_eq!(source.config.adaptive_enabled, Some(true));
        }

        // `HttpSource::builder` passes the id through to the underlying base.
        #[test]
        fn test_builder_id() {
            let source = HttpSource::builder("my-http-source")
                .with_host("localhost")
                .build()
                .unwrap();

            assert_eq!(source.base.id, "my-http-source");
        }
    }
1782
1783    mod event_conversion {
1784        use super::*;
1785
1786        #[test]
1787        fn test_convert_node_insert() {
1788            let mut props = serde_json::Map::new();
1789            props.insert(
1790                "name".to_string(),
1791                serde_json::Value::String("Alice".to_string()),
1792            );
1793            props.insert("age".to_string(), serde_json::Value::Number(30.into()));
1794
1795            let http_change = HttpSourceChange::Insert {
1796                element: HttpElement::Node {
1797                    id: "user-1".to_string(),
1798                    labels: vec!["User".to_string()],
1799                    properties: props,
1800                },
1801                timestamp: Some(1234567890000000000),
1802            };
1803
1804            let result = convert_http_to_source_change(&http_change, "test-source");
1805            assert!(result.is_ok());
1806
1807            match result.unwrap() {
1808                drasi_core::models::SourceChange::Insert { element } => match element {
1809                    drasi_core::models::Element::Node {
1810                        metadata,
1811                        properties,
1812                    } => {
1813                        assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1814                        assert_eq!(metadata.labels.len(), 1);
1815                        assert_eq!(metadata.effective_from, 1234567890000);
1816                        assert!(properties.get("name").is_some());
1817                        assert!(properties.get("age").is_some());
1818                    }
1819                    _ => panic!("Expected Node element"),
1820                },
1821                _ => panic!("Expected Insert operation"),
1822            }
1823        }
1824
1825        #[test]
1826        fn test_convert_relation_insert() {
1827            let http_change = HttpSourceChange::Insert {
1828                element: HttpElement::Relation {
1829                    id: "follows-1".to_string(),
1830                    labels: vec!["FOLLOWS".to_string()],
1831                    from: "user-1".to_string(),
1832                    to: "user-2".to_string(),
1833                    properties: serde_json::Map::new(),
1834                },
1835                timestamp: None,
1836            };
1837
1838            let result = convert_http_to_source_change(&http_change, "test-source");
1839            assert!(result.is_ok());
1840
1841            match result.unwrap() {
1842                drasi_core::models::SourceChange::Insert { element } => match element {
1843                    drasi_core::models::Element::Relation {
1844                        metadata,
1845                        out_node,
1846                        in_node,
1847                        ..
1848                    } => {
1849                        assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
1850                        assert_eq!(in_node.element_id.as_ref(), "user-1");
1851                        assert_eq!(out_node.element_id.as_ref(), "user-2");
1852                    }
1853                    _ => panic!("Expected Relation element"),
1854                },
1855                _ => panic!("Expected Insert operation"),
1856            }
1857        }
1858
1859        #[test]
1860        fn test_convert_delete() {
1861            let http_change = HttpSourceChange::Delete {
1862                id: "user-1".to_string(),
1863                labels: Some(vec!["User".to_string()]),
1864                timestamp: Some(9999999999),
1865            };
1866
1867            let result = convert_http_to_source_change(&http_change, "test-source");
1868            assert!(result.is_ok());
1869
1870            match result.unwrap() {
1871                drasi_core::models::SourceChange::Delete { metadata } => {
1872                    assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1873                    assert_eq!(metadata.labels.len(), 1);
1874                }
1875                _ => panic!("Expected Delete operation"),
1876            }
1877        }
1878
1879        #[test]
1880        fn test_convert_update() {
1881            let http_change = HttpSourceChange::Update {
1882                element: HttpElement::Node {
1883                    id: "user-1".to_string(),
1884                    labels: vec!["User".to_string()],
1885                    properties: serde_json::Map::new(),
1886                },
1887                timestamp: None,
1888            };
1889
1890            let result = convert_http_to_source_change(&http_change, "test-source");
1891            assert!(result.is_ok());
1892
1893            match result.unwrap() {
1894                drasi_core::models::SourceChange::Update { .. } => {
1895                    // Success
1896                }
1897                _ => panic!("Expected Update operation"),
1898            }
1899        }
1900    }
1901
1902    mod adaptive_config {
1903        use super::*;
1904
1905        #[test]
1906        fn test_adaptive_config_from_http_config() {
1907            let source = HttpSourceBuilder::new("test")
1908                .with_host("localhost")
1909                .with_adaptive_max_batch_size(500)
1910                .with_adaptive_enabled(true)
1911                .build()
1912                .unwrap();
1913
1914            // The adaptive config should be initialized from the http config
1915            assert_eq!(source.adaptive_config.max_batch_size, 500);
1916            assert!(source.adaptive_config.adaptive_enabled);
1917        }
1918
1919        #[test]
1920        fn test_adaptive_config_uses_defaults_when_not_specified() {
1921            let source = HttpSourceBuilder::new("test")
1922                .with_host("localhost")
1923                .build()
1924                .unwrap();
1925
1926            // Should use AdaptiveBatchConfig defaults
1927            let default_config = AdaptiveBatchConfig::default();
1928            assert_eq!(
1929                source.adaptive_config.max_batch_size,
1930                default_config.max_batch_size
1931            );
1932            assert_eq!(
1933                source.adaptive_config.min_batch_size,
1934                default_config.min_batch_size
1935            );
1936        }
1937    }
1938}
1939
/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled. Invokes the plugin
/// SDK's `export_plugin!` macro with the plugin id `"http-source"`, listing
/// `descriptor::HttpSourceDescriptor` as the only source descriptor and no
/// reaction or bootstrap descriptors. The core, lib and plugin versions are all
/// taken from this crate's `CARGO_PKG_VERSION` at compile time.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "http-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::HttpSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);