Skip to main content

drasi_source_http/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! HTTP Source Plugin for Drasi
17//!
18//! This plugin exposes HTTP endpoints for receiving data change events. It supports
19//! two mutually exclusive modes:
20//!
21//! - **Standard Mode**: Fixed-format `HttpSourceChange` endpoints with adaptive batching
22//! - **Webhook Mode**: Configurable routes with template-based payload transformation
23//!
24//! # Standard Mode
25//!
26//! When no `webhooks` configuration is present, the source operates in standard mode
27//! with the following endpoints:
28//!
29//! - **`POST /sources/{source_id}/events`** - Submit a single event
30//! - **`POST /sources/{source_id}/events/batch`** - Submit multiple events
31//! - **`GET /health`** - Health check endpoint
32//!
33//! ## Data Format
34//!
35//! Events are submitted as JSON using the `HttpSourceChange` format:
36//!
37//! ### Insert Operation
38//!
39//! ```json
40//! {
41//!     "operation": "insert",
42//!     "element": {
43//!         "type": "node",
44//!         "id": "user-123",
45//!         "labels": ["User"],
46//!         "properties": {
47//!             "name": "Alice",
48//!             "email": "alice@example.com"
49//!         }
50//!     },
51//!     "timestamp": 1699900000000000000
52//! }
53//! ```
54//!
55//! ### Update Operation
56//!
57//! ```json
58//! {
59//!     "operation": "update",
60//!     "element": {
61//!         "type": "node",
62//!         "id": "user-123",
63//!         "labels": ["User"],
64//!         "properties": {
65//!             "name": "Alice Updated"
66//!         }
67//!     }
68//! }
69//! ```
70//!
71//! ### Delete Operation
72//!
73//! ```json
74//! {
75//!     "operation": "delete",
76//!     "id": "user-123",
77//!     "labels": ["User"]
78//! }
79//! ```
80//!
81//! # Webhook Mode
82//!
83//! When a `webhooks` section is present in the configuration, the source operates
84//! in webhook mode. This enables:
85//!
86//! - Custom routes with path parameters (e.g., `/github/events`, `/users/:id/hooks`)
87//! - Multiple HTTP methods per route (POST, PUT, PATCH, DELETE, GET)
88//! - Handlebars template-based payload transformation
89//! - HMAC signature verification (GitHub, Shopify style)
90//! - Bearer token authentication
91//! - Support for JSON, XML, YAML, and plain text payloads
92//!
93//! ## Webhook Configuration Example
94//!
95//! ```yaml
96//! webhooks:
97//!   error_behavior: accept_and_log
98//!   routes:
99//!     - path: "/github/events"
100//!       methods: ["POST"]
101//!       auth:
102//!         signature:
103//!           type: hmac-sha256
104//!           secret_env: GITHUB_WEBHOOK_SECRET
105//!           header: X-Hub-Signature-256
106//!           prefix: "sha256="
107//!       error_behavior: reject
108//!       mappings:
109//!         - when:
110//!             header: X-GitHub-Event
111//!             equals: push
112//!           operation: insert
113//!           element_type: node
114//!           effective_from: "{{payload.head_commit.timestamp}}"
115//!           template:
116//!             id: "commit-{{payload.head_commit.id}}"
117//!             labels: ["Commit"]
118//!             properties:
119//!               message: "{{payload.head_commit.message}}"
120//!               author: "{{payload.head_commit.author.name}}"
121//! ```
122//!
//! ## Relation Element (Standard Mode `HttpSourceChange` Format)
124//!
125//! ```json
126//! {
127//!     "operation": "insert",
128//!     "element": {
129//!         "type": "relation",
130//!         "id": "follows-1",
131//!         "labels": ["FOLLOWS"],
132//!         "from": "user-123",
133//!         "to": "user-456",
134//!         "properties": {}
135//!     }
136//! }
137//! ```
138//!
139//! # Batch Submission
140//!
141//! ```json
142//! {
143//!     "events": [
144//!         { "operation": "insert", ... },
145//!         { "operation": "update", ... }
146//!     ]
147//! }
148//! ```
149//!
150//! # Adaptive Batching
151//!
152//! The HTTP source includes adaptive batching to optimize throughput. Events are
153//! buffered and dispatched in batches, with batch size and timing adjusted based
154//! on throughput patterns.
155//!
156//! | Parameter | Default | Description |
157//! |-----------|---------|-------------|
158//! | `adaptive_enabled` | `true` | Enable/disable adaptive batching |
159//! | `adaptive_max_batch_size` | `1000` | Maximum events per batch |
160//! | `adaptive_min_batch_size` | `1` | Minimum events per batch |
161//! | `adaptive_max_wait_ms` | `100` | Maximum wait time before dispatching |
162//! | `adaptive_min_wait_ms` | `10` | Minimum wait time between batches |
163//!
164//! # Configuration
165//!
166//! | Field | Type | Default | Description |
167//! |-------|------|---------|-------------|
168//! | `host` | string | *required* | Host address to bind to |
169//! | `port` | u16 | `8080` | Port to listen on |
170//! | `endpoint` | string | None | Optional custom path prefix |
171//! | `timeout_ms` | u64 | `10000` | Request timeout in milliseconds |
172//!
173//! # Example Configuration (YAML)
174//!
175//! ```yaml
176//! source_type: http
177//! properties:
178//!   host: "0.0.0.0"
179//!   port: 8080
180//!   adaptive_enabled: true
181//!   adaptive_max_batch_size: 500
182//! ```
183//!
184//! # Usage Examples
185//!
186//! ## Rust
187//!
188//! ```rust,ignore
189//! use drasi_source_http::{HttpSource, HttpSourceBuilder};
190//!
191//! let config = HttpSourceBuilder::new()
192//!     .with_host("0.0.0.0")
193//!     .with_port(8080)
194//!     .with_adaptive_enabled(true)
195//!     .build();
196//!
197//! let source = Arc::new(HttpSource::new("http-source", config)?);
198//! drasi.add_source(source).await?;
199//! ```
200//!
201//! ## curl (Single Event)
202//!
203//! ```bash
204//! curl -X POST http://localhost:8080/sources/my-source/events \
205//!   -H "Content-Type: application/json" \
206//!   -d '{"operation":"insert","element":{"type":"node","id":"1","labels":["Test"],"properties":{}}}'
207//! ```
208//!
209//! ## curl (Batch)
210//!
211//! ```bash
212//! curl -X POST http://localhost:8080/sources/my-source/events/batch \
213//!   -H "Content-Type: application/json" \
214//!   -d '{"events":[...]}'
215//! ```
216
217pub mod config;
218pub mod descriptor;
219pub use config::HttpSourceConfig;
220
221mod adaptive_batcher;
222mod models;
223mod time;
224
225// Webhook support modules
226pub mod auth;
227pub mod content_parser;
228pub mod route_matcher;
229pub mod template_engine;
230
231// Export HTTP source models and conversion
232pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
233
234use anyhow::Result;
235use async_trait::async_trait;
236use axum::{
237    body::Bytes,
238    extract::{Path, State},
239    http::{header, Method, StatusCode},
240    response::IntoResponse,
241    routing::{delete, get, post, put},
242    Json, Router,
243};
244use log::{debug, error, info, trace, warn};
245use serde::{Deserialize, Serialize};
246use std::collections::HashMap;
247use std::sync::Arc;
248use std::time::Duration;
249use tokio::sync::mpsc;
250use tokio::time::timeout;
251use tower_http::cors::{Any, CorsLayer};
252
253use drasi_lib::channels::{ComponentType, *};
254use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
255use drasi_lib::Source;
256use tracing::Instrument;
257
258use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
259use crate::auth::{verify_auth, AuthResult};
260use crate::config::{CorsConfig, ErrorBehavior, WebhookConfig};
261use crate::content_parser::{parse_content, ContentType};
262use crate::route_matcher::{convert_method, find_matching_mappings, headers_to_map, RouteMatcher};
263use crate::template_engine::{TemplateContext, TemplateEngine};
264
/// Response body returned by the event submission and webhook handlers.
///
/// Serialized as JSON. The `error` field is left out of the serialized output
/// entirely when it is `None`.
#[derive(Debug, Serialize, Deserialize)]
pub struct EventResponse {
    /// Whether the request is reported as successful. The handlers in this file
    /// report partial success (some events accepted, some failed) as `true`.
    pub success: bool,
    /// Human-readable summary of the outcome.
    pub message: String,
    /// Detail for the most recent failure, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
273
/// HTTP source with configurable adaptive batching.
///
/// This source exposes HTTP endpoints for receiving data change events.
/// It supports both single-event and batch submission modes, with adaptive
/// batching for optimized throughput.
///
/// Instances are built with [`HttpSource::new`] or
/// [`HttpSource::with_dispatch`]; both resolve the adaptive batching overrides
/// found in the supplied configuration at construction time.
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle)
/// - `config`: HTTP-specific configuration (host, port, timeout)
/// - `adaptive_config`: Adaptive batching settings for throughput optimization
pub struct HttpSource {
    /// Base source implementation providing common functionality
    base: SourceBase,
    /// HTTP source configuration, kept as supplied by the caller
    config: HttpSourceConfig,
    /// Adaptive batching configuration: the defaults with any `adaptive_*`
    /// overrides from `config` applied
    adaptive_config: AdaptiveBatchConfig,
}
293
/// Request body for the batch endpoint: a JSON object wrapping a list of events.
///
/// Each entry uses the same `HttpSourceChange` format as the single-event
/// endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventRequest {
    /// Events to process, in the order they appear in the request.
    pub events: Vec<HttpSourceChange>,
}
299
/// HTTP source app state with batching channel.
///
/// Shared state passed to Axum route handlers. It is `Clone` so that it can be
/// handed to handlers through the `State` extractor.
#[derive(Clone)]
struct HttpAppState {
    /// The source ID for validation against incoming requests
    source_id: String,
    /// Channel for sending events to the adaptive batcher
    batch_tx: mpsc::Sender<SourceChangeEvent>,
    /// Webhook configuration (if in webhook mode). The webhook handler treats
    /// `None` here as an internal configuration error.
    webhook_config: Option<Arc<WebhookState>>,
}
312
/// State for webhook mode processing.
///
/// Bundles everything the webhook handler needs to turn an incoming request
/// into source change events: the configured routes, the matcher that selects
/// a route, and the engine that renders mappings.
struct WebhookState {
    /// Webhook configuration (routes and the global error behavior)
    config: WebhookConfig,
    /// Route matcher for incoming requests
    route_matcher: RouteMatcher,
    /// Template engine for payload transformation
    template_engine: TemplateEngine,
}
322
323impl HttpSource {
324    /// Create a new HTTP source.
325    ///
326    /// The event channel is automatically injected when the source is added
327    /// to DrasiLib via `add_source()`.
328    ///
329    /// # Arguments
330    ///
331    /// * `id` - Unique identifier for this source instance
332    /// * `config` - HTTP source configuration
333    ///
334    /// # Returns
335    ///
336    /// A new `HttpSource` instance, or an error if construction fails.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the base source cannot be initialized.
341    ///
342    /// # Example
343    ///
344    /// ```rust,ignore
345    /// use drasi_source_http::{HttpSource, HttpSourceBuilder};
346    ///
347    /// let config = HttpSourceBuilder::new()
348    ///     .with_host("0.0.0.0")
349    ///     .with_port(8080)
350    ///     .build();
351    ///
352    /// let source = HttpSource::new("my-http-source", config)?;
353    /// ```
354    pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
355        let id = id.into();
356        let params = SourceBaseParams::new(id);
357
358        // Configure adaptive batching
359        let mut adaptive_config = AdaptiveBatchConfig::default();
360
361        // Allow overriding adaptive parameters from config
362        if let Some(max_batch) = config.adaptive_max_batch_size {
363            adaptive_config.max_batch_size = max_batch;
364        }
365        if let Some(min_batch) = config.adaptive_min_batch_size {
366            adaptive_config.min_batch_size = min_batch;
367        }
368        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
369            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
370        }
371        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
372            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
373        }
374        if let Some(window_secs) = config.adaptive_window_secs {
375            adaptive_config.throughput_window = Duration::from_secs(window_secs);
376        }
377        if let Some(enabled) = config.adaptive_enabled {
378            adaptive_config.adaptive_enabled = enabled;
379        }
380
381        Ok(Self {
382            base: SourceBase::new(params)?,
383            config,
384            adaptive_config,
385        })
386    }
387
388    /// Create a new HTTP source with custom dispatch settings.
389    ///
390    /// The event channel is automatically injected when the source is added
391    /// to DrasiLib via `add_source()`.
392    ///
393    /// # Arguments
394    ///
395    /// * `id` - Unique identifier for this source instance
396    /// * `config` - HTTP source configuration
397    /// * `dispatch_mode` - Optional dispatch mode (Channel, Direct, etc.)
398    /// * `dispatch_buffer_capacity` - Optional buffer capacity for channel dispatch
399    ///
400    /// # Returns
401    ///
402    /// A new `HttpSource` instance with custom dispatch settings.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the base source cannot be initialized.
407    pub fn with_dispatch(
408        id: impl Into<String>,
409        config: HttpSourceConfig,
410        dispatch_mode: Option<DispatchMode>,
411        dispatch_buffer_capacity: Option<usize>,
412    ) -> Result<Self> {
413        let id = id.into();
414        let mut params = SourceBaseParams::new(id);
415        if let Some(mode) = dispatch_mode {
416            params = params.with_dispatch_mode(mode);
417        }
418        if let Some(capacity) = dispatch_buffer_capacity {
419            params = params.with_dispatch_buffer_capacity(capacity);
420        }
421
422        let mut adaptive_config = AdaptiveBatchConfig::default();
423
424        if let Some(max_batch) = config.adaptive_max_batch_size {
425            adaptive_config.max_batch_size = max_batch;
426        }
427        if let Some(min_batch) = config.adaptive_min_batch_size {
428            adaptive_config.min_batch_size = min_batch;
429        }
430        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
431            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
432        }
433        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
434            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
435        }
436        if let Some(window_secs) = config.adaptive_window_secs {
437            adaptive_config.throughput_window = Duration::from_secs(window_secs);
438        }
439        if let Some(enabled) = config.adaptive_enabled {
440            adaptive_config.adaptive_enabled = enabled;
441        }
442
443        Ok(Self {
444            base: SourceBase::new(params)?,
445            config,
446            adaptive_config,
447        })
448    }
449
450    /// Handle a single event submission from `POST /sources/{source_id}/events`.
451    ///
452    /// Validates the source ID matches this source and converts the HTTP event
453    /// to a source change before sending to the adaptive batcher.
454    async fn handle_single_event(
455        Path(source_id): Path<String>,
456        State(state): State<HttpAppState>,
457        Json(event): Json<HttpSourceChange>,
458    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
459        debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
460        Self::process_events(&source_id, &state, vec![event]).await
461    }
462
463    /// Handle a batch event submission from `POST /sources/{source_id}/events/batch`.
464    ///
465    /// Validates the source ID and processes all events in the batch,
466    /// returning partial success if some events fail.
467    async fn handle_batch_events(
468        Path(source_id): Path<String>,
469        State(state): State<HttpAppState>,
470        Json(batch): Json<BatchEventRequest>,
471    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
472        debug!(
473            "[{}] HTTP endpoint received batch of {} events",
474            source_id,
475            batch.events.len()
476        );
477        Self::process_events(&source_id, &state, batch.events).await
478    }
479
480    /// Process a list of events, converting and sending them to the batcher.
481    ///
482    /// # Arguments
483    ///
484    /// * `source_id` - Source ID from the request path
485    /// * `state` - Shared app state containing the batch channel
486    /// * `events` - List of HTTP source changes to process
487    ///
488    /// # Returns
489    ///
490    /// Success response with count of processed events, or error if all fail.
491    async fn process_events(
492        source_id: &str,
493        state: &HttpAppState,
494        events: Vec<HttpSourceChange>,
495    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
496        trace!("[{}] Processing {} events", source_id, events.len());
497
498        if source_id != state.source_id {
499            error!(
500                "[{}] Source name mismatch. Expected '{}', got '{}'",
501                state.source_id, state.source_id, source_id
502            );
503            return Err((
504                StatusCode::BAD_REQUEST,
505                Json(EventResponse {
506                    success: false,
507                    message: "Source name mismatch".to_string(),
508                    error: Some(format!(
509                        "Expected source '{}', got '{}'",
510                        state.source_id, source_id
511                    )),
512                }),
513            ));
514        }
515
516        let mut success_count = 0;
517        let mut error_count = 0;
518        let mut last_error = None;
519
520        for (idx, event) in events.iter().enumerate() {
521            match convert_http_to_source_change(event, source_id) {
522                Ok(source_change) => {
523                    let change_event = SourceChangeEvent {
524                        source_id: source_id.to_string(),
525                        change: source_change,
526                        timestamp: chrono::Utc::now(),
527                    };
528
529                    if let Err(e) = state.batch_tx.send(change_event).await {
530                        error!(
531                            "[{}] Failed to send event {} to batch channel: {}",
532                            state.source_id,
533                            idx + 1,
534                            e
535                        );
536                        error_count += 1;
537                        last_error = Some("Internal channel error".to_string());
538                    } else {
539                        success_count += 1;
540                    }
541                }
542                Err(e) => {
543                    error!(
544                        "[{}] Failed to convert event {}: {}",
545                        state.source_id,
546                        idx + 1,
547                        e
548                    );
549                    error_count += 1;
550                    last_error = Some(e.to_string());
551                }
552            }
553        }
554
555        debug!(
556            "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
557        );
558
559        if error_count > 0 && success_count == 0 {
560            Err((
561                StatusCode::BAD_REQUEST,
562                Json(EventResponse {
563                    success: false,
564                    message: format!("All {error_count} events failed"),
565                    error: last_error,
566                }),
567            ))
568        } else if error_count > 0 {
569            Ok(Json(EventResponse {
570                success: true,
571                message: format!(
572                    "Processed {success_count} events successfully, {error_count} failed"
573                ),
574                error: last_error,
575            }))
576        } else {
577            Ok(Json(EventResponse {
578                success: true,
579                message: format!("All {success_count} events processed successfully"),
580                error: None,
581            }))
582        }
583    }
584
585    async fn health_check() -> impl IntoResponse {
586        Json(serde_json::json!({
587            "status": "healthy",
588            "service": "http-source",
589            "features": ["adaptive-batching", "batch-endpoint", "webhooks"]
590        }))
591    }
592
593    /// Handle webhook requests
594    ///
595    /// This handler processes requests in webhook mode, matching against
596    /// configured routes and transforming payloads using templates.
597    async fn handle_webhook(
598        method: axum::http::Method,
599        uri: axum::http::Uri,
600        headers: axum::http::HeaderMap,
601        State(state): State<HttpAppState>,
602        body: Bytes,
603    ) -> impl IntoResponse {
604        let path = uri.path();
605        let source_id = &state.source_id;
606
607        debug!("[{source_id}] Webhook received: {method} {path}");
608
609        // Get webhook state - should always be present in webhook mode
610        let webhook_state = match &state.webhook_config {
611            Some(ws) => ws,
612            None => {
613                error!("[{source_id}] Webhook handler called but no webhook config present");
614                return (
615                    StatusCode::INTERNAL_SERVER_ERROR,
616                    Json(EventResponse {
617                        success: false,
618                        message: "Internal configuration error".to_string(),
619                        error: Some("Webhook mode not properly configured".to_string()),
620                    }),
621                );
622            }
623        };
624
625        // Convert method
626        let http_method = match convert_method(&method) {
627            Some(m) => m,
628            None => {
629                return handle_error(
630                    &webhook_state.config.error_behavior,
631                    source_id,
632                    StatusCode::METHOD_NOT_ALLOWED,
633                    "Method not supported",
634                    None,
635                );
636            }
637        };
638
639        // Match route
640        let route_match = match webhook_state.route_matcher.match_route(
641            path,
642            &http_method,
643            &webhook_state.config.routes,
644        ) {
645            Some(rm) => rm,
646            None => {
647                debug!("[{source_id}] No matching route for {method} {path}");
648                return handle_error(
649                    &webhook_state.config.error_behavior,
650                    source_id,
651                    StatusCode::NOT_FOUND,
652                    "No matching route",
653                    None,
654                );
655            }
656        };
657
658        let route = route_match.route;
659        let error_behavior = route
660            .error_behavior
661            .as_ref()
662            .unwrap_or(&webhook_state.config.error_behavior);
663
664        // Verify authentication
665        let auth_result = verify_auth(route.auth.as_ref(), &headers, &body);
666        if let AuthResult::Failed(reason) = auth_result {
667            warn!("[{source_id}] Authentication failed for {path}: {reason}");
668            return handle_error(
669                error_behavior,
670                source_id,
671                StatusCode::UNAUTHORIZED,
672                "Authentication failed",
673                Some(&reason),
674            );
675        }
676
677        // Parse content
678        let content_type = ContentType::from_header(
679            headers
680                .get(axum::http::header::CONTENT_TYPE)
681                .and_then(|v| v.to_str().ok()),
682        );
683
684        let payload = match parse_content(&body, content_type) {
685            Ok(p) => p,
686            Err(e) => {
687                warn!("[{source_id}] Failed to parse payload: {e}");
688                return handle_error(
689                    error_behavior,
690                    source_id,
691                    StatusCode::BAD_REQUEST,
692                    "Failed to parse payload",
693                    Some(&e.to_string()),
694                );
695            }
696        };
697
698        // Build template context
699        let headers_map = headers_to_map(&headers);
700        let query_map = parse_query_string(uri.query());
701
702        let context = TemplateContext {
703            payload: payload.clone(),
704            route: route_match.path_params,
705            query: query_map,
706            headers: headers_map.clone(),
707            method: method.to_string(),
708            path: path.to_string(),
709            source_id: source_id.clone(),
710        };
711
712        // Find matching mappings
713        let matching_mappings = find_matching_mappings(&route.mappings, &headers_map, &payload);
714
715        if matching_mappings.is_empty() {
716            debug!("[{source_id}] No matching mappings for request");
717            return handle_error(
718                error_behavior,
719                source_id,
720                StatusCode::BAD_REQUEST,
721                "No matching mapping for request",
722                None,
723            );
724        }
725
726        // Process each matching mapping
727        let mut success_count = 0;
728        let mut error_count = 0;
729        let mut last_error = None;
730
731        for mapping in matching_mappings {
732            match webhook_state
733                .template_engine
734                .process_mapping(mapping, &context, source_id)
735            {
736                Ok(source_change) => {
737                    let event = SourceChangeEvent {
738                        source_id: source_id.clone(),
739                        change: source_change,
740                        timestamp: chrono::Utc::now(),
741                    };
742
743                    if let Err(e) = state.batch_tx.send(event).await {
744                        error!("[{source_id}] Failed to send event to batcher: {e}");
745                        error_count += 1;
746                        last_error = Some(format!("Failed to queue event: {e}"));
747                    } else {
748                        success_count += 1;
749                    }
750                }
751                Err(e) => {
752                    warn!("[{source_id}] Failed to process mapping: {e}");
753                    error_count += 1;
754                    last_error = Some(e.to_string());
755                }
756            }
757        }
758
759        debug!("[{source_id}] Webhook processing complete: {success_count} succeeded, {error_count} failed");
760
761        if error_count > 0 && success_count == 0 {
762            handle_error(
763                error_behavior,
764                source_id,
765                StatusCode::BAD_REQUEST,
766                &format!("All {error_count} mappings failed"),
767                last_error.as_deref(),
768            )
769        } else if error_count > 0 {
770            (
771                StatusCode::OK,
772                Json(EventResponse {
773                    success: true,
774                    message: format!("Processed {success_count} events, {error_count} failed"),
775                    error: last_error,
776                }),
777            )
778        } else {
779            (
780                StatusCode::OK,
781                Json(EventResponse {
782                    success: true,
783                    message: format!("Processed {success_count} events successfully"),
784                    error: None,
785                }),
786            )
787        }
788    }
789
790    async fn run_adaptive_batcher(
791        batch_rx: mpsc::Receiver<SourceChangeEvent>,
792        dispatchers: Arc<
793            tokio::sync::RwLock<
794                Vec<
795                    Box<
796                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
797                    >,
798                >,
799            >,
800        >,
801        adaptive_config: AdaptiveBatchConfig,
802        source_id: String,
803    ) {
804        let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
805        let mut total_events = 0u64;
806        let mut total_batches = 0u64;
807
808        info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
809
810        while let Some(batch) = batcher.next_batch().await {
811            if batch.is_empty() {
812                debug!("[{source_id}] Batcher received empty batch, skipping");
813                continue;
814            }
815
816            let batch_size = batch.len();
817            total_events += batch_size as u64;
818            total_batches += 1;
819
820            debug!(
821                "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
822            );
823
824            let mut sent_count = 0;
825            let mut failed_count = 0;
826            for (idx, event) in batch.into_iter().enumerate() {
827                debug!(
828                    "[{}] Batch #{}, dispatching event {}/{}",
829                    source_id,
830                    total_batches,
831                    idx + 1,
832                    batch_size
833                );
834
835                let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
836                profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
837
838                let wrapper = SourceEventWrapper::with_profiling(
839                    event.source_id.clone(),
840                    SourceEvent::Change(event.change),
841                    event.timestamp,
842                    profiling,
843                );
844
845                if let Err(e) =
846                    SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
847                        .await
848                {
849                    error!(
850                        "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
851                        source_id,
852                        total_batches,
853                        idx + 1,
854                        batch_size,
855                        e
856                    );
857                    failed_count += 1;
858                } else {
859                    debug!(
860                        "[{}] Batch #{}, successfully dispatched event {}/{}",
861                        source_id,
862                        total_batches,
863                        idx + 1,
864                        batch_size
865                    );
866                    sent_count += 1;
867                }
868            }
869
870            debug!(
871                "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
872            );
873
874            if total_batches.is_multiple_of(100) {
875                info!(
876                    "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
877                    source_id,
878                    total_batches,
879                    total_events,
880                    total_events as f64 / total_batches as f64
881                );
882            }
883        }
884
885        info!(
886            "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
887        );
888    }
889}
890
891#[async_trait]
892impl Source for HttpSource {
893    fn id(&self) -> &str {
894        &self.base.id
895    }
896
897    fn type_name(&self) -> &str {
898        "http"
899    }
900
901    fn properties(&self) -> HashMap<String, serde_json::Value> {
902        match serde_json::to_value(&self.config) {
903            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
904            _ => HashMap::new(),
905        }
906    }
907
908    fn auto_start(&self) -> bool {
909        self.base.get_auto_start()
910    }
911
912    async fn start(&self) -> Result<()> {
913        info!("[{}] Starting adaptive HTTP source", self.base.id);
914
915        self.base
916            .set_status(
917                ComponentStatus::Starting,
918                Some("Starting adaptive HTTP source".to_string()),
919            )
920            .await;
921
922        let host = self.config.host.clone();
923        let port = self.config.port;
924
925        // Create batch channel with capacity based on batch configuration
926        let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
927        let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
928        info!(
929            "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
930            self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
931        );
932
933        // Start adaptive batcher task
934        let adaptive_config = self.adaptive_config.clone();
935        let source_id = self.base.id.clone();
936        let dispatchers = self.base.dispatchers.clone();
937
938        // Get instance_id from context for log routing isolation
939        let instance_id = self
940            .base
941            .context()
942            .await
943            .map(|c| c.instance_id)
944            .unwrap_or_default();
945
946        info!("[{source_id}] Starting adaptive batcher task");
947        let source_id_for_span = source_id.clone();
948        let span = tracing::info_span!(
949            "http_adaptive_batcher",
950            instance_id = %instance_id,
951            component_id = %source_id_for_span,
952            component_type = "source"
953        );
954        tokio::spawn(
955            async move {
956                Self::run_adaptive_batcher(
957                    batch_rx,
958                    dispatchers,
959                    adaptive_config,
960                    source_id.clone(),
961                )
962                .await
963            }
964            .instrument(span),
965        );
966
967        // Create app state
968        let webhook_state = if let Some(ref webhook_config) = self.config.webhooks {
969            info!(
970                "[{}] Webhook mode enabled with {} routes",
971                self.base.id,
972                webhook_config.routes.len()
973            );
974            Some(Arc::new(WebhookState {
975                config: webhook_config.clone(),
976                route_matcher: RouteMatcher::new(&webhook_config.routes),
977                template_engine: TemplateEngine::new(),
978            }))
979        } else {
980            info!("[{}] Standard mode enabled", self.base.id);
981            None
982        };
983
984        let state = HttpAppState {
985            source_id: self.base.id.clone(),
986            batch_tx,
987            webhook_config: webhook_state,
988        };
989
990        // Build router based on mode
991        let app = if self.config.is_webhook_mode() {
992            // Webhook mode: only health check + catch-all webhook handler
993            let router = Router::new()
994                .route("/health", get(Self::health_check))
995                .fallback(Self::handle_webhook)
996                .with_state(state);
997
998            // Apply CORS if configured
999            if let Some(ref webhooks) = self.config.webhooks {
1000                if let Some(ref cors_config) = webhooks.cors {
1001                    if cors_config.enabled {
1002                        info!("[{}] CORS enabled for webhook endpoints", self.base.id);
1003                        router.layer(build_cors_layer(cors_config))
1004                    } else {
1005                        router
1006                    }
1007                } else {
1008                    router
1009                }
1010            } else {
1011                router
1012            }
1013        } else {
1014            // Standard mode: original endpoints
1015            Router::new()
1016                .route("/health", get(Self::health_check))
1017                .route(
1018                    "/sources/:source_id/events",
1019                    post(Self::handle_single_event),
1020                )
1021                .route(
1022                    "/sources/:source_id/events/batch",
1023                    post(Self::handle_batch_events),
1024                )
1025                .with_state(state)
1026        };
1027
1028        // Create shutdown channel
1029        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1030
1031        let host_clone = host.clone();
1032
1033        // Start server
1034        let (error_tx, error_rx) = tokio::sync::oneshot::channel();
1035        let source_id = self.base.id.clone();
1036        let source_id_for_span = source_id.clone();
1037        let span = tracing::info_span!(
1038            "http_source_server",
1039            instance_id = %instance_id,
1040            component_id = %source_id_for_span,
1041            component_type = "source"
1042        );
1043        let server_handle = tokio::spawn(
1044            async move {
1045                let addr = format!("{host}:{port}");
1046                info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");
1047
1048                let listener = match tokio::net::TcpListener::bind(&addr).await {
1049                    Ok(listener) => {
1050                        info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
1051                        listener
1052                    }
1053                    Err(e) => {
1054                        error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
1055                        let _ = error_tx.send(format!(
1056                        "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
1057                    ));
1058                    return;
1059                }
1060            };
1061
1062                if let Err(e) = axum::serve(listener, app)
1063                    .with_graceful_shutdown(async move {
1064                        let _ = shutdown_rx.await;
1065                    })
1066                    .await
1067                {
1068                    error!("[{source_id}] HTTP server error: {e}");
1069                }
1070            }
1071            .instrument(span),
1072        );
1073
1074        *self.base.task_handle.write().await = Some(server_handle);
1075        *self.base.shutdown_tx.write().await = Some(shutdown_tx);
1076
1077        // Check for startup errors with a short timeout
1078        match timeout(Duration::from_millis(500), error_rx).await {
1079            Ok(Ok(error_msg)) => {
1080                self.base.set_status(ComponentStatus::Error, None).await;
1081                return Err(anyhow::anyhow!("{error_msg}"));
1082            }
1083            _ => {
1084                self.base
1085                    .set_status(
1086                        ComponentStatus::Running,
1087                        Some(format!(
1088                    "Adaptive HTTP source running on {host_clone}:{port} with batch support"
1089                )),
1090                    )
1091                    .await;
1092            }
1093        }
1094
1095        Ok(())
1096    }
1097
1098    async fn stop(&self) -> Result<()> {
1099        info!("[{}] Stopping adaptive HTTP source", self.base.id);
1100
1101        self.base
1102            .set_status(
1103                ComponentStatus::Stopping,
1104                Some("Stopping adaptive HTTP source".to_string()),
1105            )
1106            .await;
1107
1108        if let Some(tx) = self.base.shutdown_tx.write().await.take() {
1109            let _ = tx.send(());
1110        }
1111
1112        if let Some(handle) = self.base.task_handle.write().await.take() {
1113            let _ = timeout(Duration::from_secs(5), handle).await;
1114        }
1115
1116        self.base
1117            .set_status(
1118                ComponentStatus::Stopped,
1119                Some("Adaptive HTTP source stopped".to_string()),
1120            )
1121            .await;
1122
1123        Ok(())
1124    }
1125
1126    async fn status(&self) -> ComponentStatus {
1127        self.base.get_status().await
1128    }
1129
1130    async fn subscribe(
1131        &self,
1132        settings: drasi_lib::config::SourceSubscriptionSettings,
1133    ) -> Result<SubscriptionResponse> {
1134        self.base.subscribe_with_bootstrap(&settings, "HTTP").await
1135    }
1136
1137    fn as_any(&self) -> &dyn std::any::Any {
1138        self
1139    }
1140
1141    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1142        self.base.initialize(context).await;
1143    }
1144
1145    async fn set_bootstrap_provider(
1146        &self,
1147        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1148    ) {
1149        self.base.set_bootstrap_provider(provider).await;
1150    }
1151}
1152
/// Builder for HttpSource instances.
///
/// Provides a fluent API for constructing HTTP sources with sensible defaults
/// and adaptive batching settings. The builder takes the source ID at construction
/// and returns a fully constructed `HttpSource` from `build()`.
///
/// Every `adaptive_*` field is an optional override: when left as `None`,
/// `build()` keeps the corresponding value from `AdaptiveBatchConfig::default()`.
///
/// # Example
///
/// ```rust,ignore
/// use drasi_source_http::HttpSource;
///
/// let source = HttpSource::builder("my-source")
///     .with_host("0.0.0.0")
///     .with_port(8080)
///     .with_adaptive_enabled(true)
///     .with_bootstrap_provider(my_provider)
///     .build()?;
/// ```
pub struct HttpSourceBuilder {
    /// Unique identifier for the source instance.
    id: String,
    /// HTTP host copied into the source configuration (empty until set).
    host: String,
    /// HTTP port copied into the source configuration (8080 until set).
    port: u16,
    /// Optional endpoint path.
    endpoint: Option<String>,
    /// Request timeout in milliseconds (10000 until set).
    timeout_ms: u64,
    /// Override for the adaptive batching maximum batch size.
    adaptive_max_batch_size: Option<usize>,
    /// Override for the adaptive batching minimum batch size.
    adaptive_min_batch_size: Option<usize>,
    /// Override for the adaptive batching maximum wait time, in milliseconds.
    adaptive_max_wait_ms: Option<u64>,
    /// Override for the adaptive batching minimum wait time, in milliseconds.
    adaptive_min_wait_ms: Option<u64>,
    /// Override for the adaptive batching throughput window, in seconds.
    adaptive_window_secs: Option<u64>,
    /// Override for whether adaptive batching is enabled.
    adaptive_enabled: Option<bool>,
    /// Webhook configuration; setting it enables webhook mode.
    webhooks: Option<WebhookConfig>,
    /// Dispatch mode forwarded to `SourceBaseParams` when set.
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity forwarded to `SourceBaseParams` when set.
    dispatch_buffer_capacity: Option<usize>,
    /// Bootstrap provider forwarded to `SourceBaseParams` when set.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether the source should auto-start (true until changed).
    auto_start: bool,
}
1189
1190impl HttpSourceBuilder {
1191    /// Create a new HTTP source builder with the given source ID.
1192    ///
1193    /// # Arguments
1194    ///
1195    /// * `id` - Unique identifier for the source instance
1196    pub fn new(id: impl Into<String>) -> Self {
1197        Self {
1198            id: id.into(),
1199            host: String::new(),
1200            port: 8080,
1201            endpoint: None,
1202            timeout_ms: 10000,
1203            adaptive_max_batch_size: None,
1204            adaptive_min_batch_size: None,
1205            adaptive_max_wait_ms: None,
1206            adaptive_min_wait_ms: None,
1207            adaptive_window_secs: None,
1208            adaptive_enabled: None,
1209            webhooks: None,
1210            dispatch_mode: None,
1211            dispatch_buffer_capacity: None,
1212            bootstrap_provider: None,
1213            auto_start: true,
1214        }
1215    }
1216
1217    /// Set the HTTP host
1218    pub fn with_host(mut self, host: impl Into<String>) -> Self {
1219        self.host = host.into();
1220        self
1221    }
1222
1223    /// Set the HTTP port
1224    pub fn with_port(mut self, port: u16) -> Self {
1225        self.port = port;
1226        self
1227    }
1228
1229    /// Set the endpoint path
1230    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
1231        self.endpoint = Some(endpoint.into());
1232        self
1233    }
1234
1235    /// Set the request timeout in milliseconds
1236    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
1237        self.timeout_ms = timeout_ms;
1238        self
1239    }
1240
1241    /// Set the adaptive batching maximum batch size
1242    pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
1243        self.adaptive_max_batch_size = Some(size);
1244        self
1245    }
1246
1247    /// Set the adaptive batching minimum batch size
1248    pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
1249        self.adaptive_min_batch_size = Some(size);
1250        self
1251    }
1252
1253    /// Set the adaptive batching maximum wait time in milliseconds
1254    pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
1255        self.adaptive_max_wait_ms = Some(wait_ms);
1256        self
1257    }
1258
1259    /// Set the adaptive batching minimum wait time in milliseconds
1260    pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
1261        self.adaptive_min_wait_ms = Some(wait_ms);
1262        self
1263    }
1264
1265    /// Set the adaptive batching throughput window in seconds
1266    pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
1267        self.adaptive_window_secs = Some(secs);
1268        self
1269    }
1270
1271    /// Enable or disable adaptive batching
1272    pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
1273        self.adaptive_enabled = Some(enabled);
1274        self
1275    }
1276
1277    /// Set the dispatch mode for event routing.
1278    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
1279        self.dispatch_mode = Some(mode);
1280        self
1281    }
1282
1283    /// Set the dispatch buffer capacity.
1284    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
1285        self.dispatch_buffer_capacity = Some(capacity);
1286        self
1287    }
1288
1289    /// Set the bootstrap provider for initial data delivery.
1290    pub fn with_bootstrap_provider(
1291        mut self,
1292        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
1293    ) -> Self {
1294        self.bootstrap_provider = Some(Box::new(provider));
1295        self
1296    }
1297
1298    /// Set whether this source should auto-start when DrasiLib starts.
1299    ///
1300    /// Default is `true`. Set to `false` if this source should only be
1301    /// started manually via `start_source()`.
1302    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
1303        self.auto_start = auto_start;
1304        self
1305    }
1306
1307    /// Set the webhook configuration to enable webhook mode.
1308    ///
1309    /// When webhook mode is enabled, the standard `HttpSourceChange` endpoints
1310    /// are disabled and custom webhook routes are used instead.
1311    pub fn with_webhooks(mut self, webhooks: WebhookConfig) -> Self {
1312        self.webhooks = Some(webhooks);
1313        self
1314    }
1315
1316    /// Set the full configuration at once
1317    pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
1318        self.host = config.host;
1319        self.port = config.port;
1320        self.endpoint = config.endpoint;
1321        self.timeout_ms = config.timeout_ms;
1322        self.adaptive_max_batch_size = config.adaptive_max_batch_size;
1323        self.adaptive_min_batch_size = config.adaptive_min_batch_size;
1324        self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
1325        self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
1326        self.adaptive_window_secs = config.adaptive_window_secs;
1327        self.adaptive_enabled = config.adaptive_enabled;
1328        self.webhooks = config.webhooks;
1329        self
1330    }
1331
1332    /// Build the HttpSource instance.
1333    ///
1334    /// # Returns
1335    ///
1336    /// A fully constructed `HttpSource`, or an error if construction fails.
1337    pub fn build(self) -> Result<HttpSource> {
1338        let config = HttpSourceConfig {
1339            host: self.host,
1340            port: self.port,
1341            endpoint: self.endpoint,
1342            timeout_ms: self.timeout_ms,
1343            adaptive_max_batch_size: self.adaptive_max_batch_size,
1344            adaptive_min_batch_size: self.adaptive_min_batch_size,
1345            adaptive_max_wait_ms: self.adaptive_max_wait_ms,
1346            adaptive_min_wait_ms: self.adaptive_min_wait_ms,
1347            adaptive_window_secs: self.adaptive_window_secs,
1348            adaptive_enabled: self.adaptive_enabled,
1349            webhooks: self.webhooks,
1350        };
1351
1352        // Build SourceBaseParams with all settings
1353        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1354        if let Some(mode) = self.dispatch_mode {
1355            params = params.with_dispatch_mode(mode);
1356        }
1357        if let Some(capacity) = self.dispatch_buffer_capacity {
1358            params = params.with_dispatch_buffer_capacity(capacity);
1359        }
1360        if let Some(provider) = self.bootstrap_provider {
1361            params = params.with_bootstrap_provider(provider);
1362        }
1363
1364        // Configure adaptive batching
1365        let mut adaptive_config = AdaptiveBatchConfig::default();
1366        if let Some(max_batch) = config.adaptive_max_batch_size {
1367            adaptive_config.max_batch_size = max_batch;
1368        }
1369        if let Some(min_batch) = config.adaptive_min_batch_size {
1370            adaptive_config.min_batch_size = min_batch;
1371        }
1372        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
1373            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
1374        }
1375        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
1376            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
1377        }
1378        if let Some(window_secs) = config.adaptive_window_secs {
1379            adaptive_config.throughput_window = Duration::from_secs(window_secs);
1380        }
1381        if let Some(enabled) = config.adaptive_enabled {
1382            adaptive_config.adaptive_enabled = enabled;
1383        }
1384
1385        Ok(HttpSource {
1386            base: SourceBase::new(params)?,
1387            config,
1388            adaptive_config,
1389        })
1390    }
1391}
1392
1393impl HttpSource {
1394    /// Create a builder for HttpSource with the given ID.
1395    ///
1396    /// This is the recommended way to construct an HttpSource.
1397    ///
1398    /// # Arguments
1399    ///
1400    /// * `id` - Unique identifier for the source instance
1401    ///
1402    /// # Example
1403    ///
1404    /// ```rust,ignore
1405    /// let source = HttpSource::builder("my-source")
1406    ///     .with_host("0.0.0.0")
1407    ///     .with_port(8080)
1408    ///     .with_bootstrap_provider(my_provider)
1409    ///     .build()?;
1410    /// ```
1411    pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
1412        HttpSourceBuilder::new(id)
1413    }
1414}
1415
1416/// Handle errors according to configured error behavior
1417fn handle_error(
1418    behavior: &ErrorBehavior,
1419    source_id: &str,
1420    status: StatusCode,
1421    message: &str,
1422    detail: Option<&str>,
1423) -> (StatusCode, Json<EventResponse>) {
1424    match behavior {
1425        ErrorBehavior::Reject => {
1426            debug!("[{source_id}] Rejecting request: {message}");
1427            (
1428                status,
1429                Json(EventResponse {
1430                    success: false,
1431                    message: message.to_string(),
1432                    error: detail.map(String::from),
1433                }),
1434            )
1435        }
1436        ErrorBehavior::AcceptAndLog => {
1437            warn!("[{source_id}] Accepting with error (logged): {message}");
1438            (
1439                StatusCode::OK,
1440                Json(EventResponse {
1441                    success: true,
1442                    message: format!("Accepted with warning: {message}"),
1443                    error: detail.map(String::from),
1444                }),
1445            )
1446        }
1447        ErrorBehavior::AcceptAndSkip => {
1448            trace!("[{source_id}] Accepting silently: {message}");
1449            (
1450                StatusCode::OK,
1451                Json(EventResponse {
1452                    success: true,
1453                    message: "Accepted".to_string(),
1454                    error: None,
1455                }),
1456            )
1457        }
1458    }
1459}
1460
1461/// Parse query string into a HashMap
1462fn parse_query_string(query: Option<&str>) -> HashMap<String, String> {
1463    query
1464        .map(|q| {
1465            q.split('&')
1466                .filter_map(|pair| {
1467                    let mut parts = pair.splitn(2, '=');
1468                    let key = parts.next()?;
1469                    let value = parts.next().unwrap_or("");
1470                    Some((urlencoding_decode(key), urlencoding_decode(value)))
1471                })
1472                .collect()
1473        })
1474        .unwrap_or_default()
1475}
1476
/// Decode a URL-encoded string (`%XX` escapes and `+` as space).
///
/// A `%` is treated as an escape only when it is immediately followed by two
/// ASCII hexadecimal digits; otherwise the `%` is kept literally and the
/// characters after it are decoded normally (so `"%+A"` becomes `"% A"` and
/// `"%%41"` becomes `"%A"`). Decoded bytes are collected first and converted
/// to a string at the end, so multi-byte UTF-8 sequences spread over several
/// escapes are reassembled; byte sequences that are not valid UTF-8 are
/// replaced with U+FFFD.
fn urlencoding_decode(s: &str) -> String {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn hex_value(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    // Working on bytes is safe here: `%` and `+` are ASCII and therefore never
    // occur inside a multi-byte UTF-8 sequence.
    let bytes = s.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        let byte = bytes[i];

        if byte == b'%' {
            let hi = bytes.get(i + 1).copied().and_then(hex_value);
            let lo = bytes.get(i + 2).copied().and_then(hex_value);
            if let (Some(hi), Some(lo)) = (hi, lo) {
                decoded.push((hi << 4) | lo);
                i += 3;
                continue;
            }
            // Not a valid escape: fall through and keep the '%' itself, without
            // consuming the bytes that follow it.
        }

        decoded.push(if byte == b'+' { b' ' } else { byte });
        i += 1;
    }

    // Convert bytes to string, replacing invalid UTF-8 sequences
    String::from_utf8_lossy(&decoded).into_owned()
}
1516
1517/// Build a CORS layer from configuration
1518fn build_cors_layer(cors_config: &CorsConfig) -> CorsLayer {
1519    let mut cors = CorsLayer::new();
1520
1521    // Configure allowed origins
1522    if cors_config.allow_origins.len() == 1 && cors_config.allow_origins[0] == "*" {
1523        cors = cors.allow_origin(Any);
1524    } else {
1525        let origins: Vec<_> = cors_config
1526            .allow_origins
1527            .iter()
1528            .filter_map(|o| o.parse().ok())
1529            .collect();
1530        cors = cors.allow_origin(origins);
1531    }
1532
1533    // Configure allowed methods
1534    let methods: Vec<Method> = cors_config
1535        .allow_methods
1536        .iter()
1537        .filter_map(|m| m.parse().ok())
1538        .collect();
1539    cors = cors.allow_methods(methods);
1540
1541    // Configure allowed headers
1542    if cors_config.allow_headers.len() == 1 && cors_config.allow_headers[0] == "*" {
1543        cors = cors.allow_headers(Any);
1544    } else {
1545        let headers: Vec<header::HeaderName> = cors_config
1546            .allow_headers
1547            .iter()
1548            .filter_map(|h| h.parse().ok())
1549            .collect();
1550        cors = cors.allow_headers(headers);
1551    }
1552
1553    // Configure exposed headers
1554    if !cors_config.expose_headers.is_empty() {
1555        let exposed: Vec<header::HeaderName> = cors_config
1556            .expose_headers
1557            .iter()
1558            .filter_map(|h| h.parse().ok())
1559            .collect();
1560        cors = cors.expose_headers(exposed);
1561    }
1562
1563    // Configure credentials
1564    if cors_config.allow_credentials {
1565        cors = cors.allow_credentials(true);
1566    }
1567
1568    // Configure max age
1569    cors = cors.max_age(Duration::from_secs(cors_config.max_age));
1570
1571    cors
1572}
1573
1574#[cfg(test)]
1575mod tests {
1576    use super::*;
1577
1578    mod construction {
1579        use super::*;
1580
1581        #[test]
1582        fn test_builder_with_valid_config() {
1583            let source = HttpSourceBuilder::new("test-source")
1584                .with_host("localhost")
1585                .with_port(8080)
1586                .build();
1587            assert!(source.is_ok());
1588        }
1589
1590        #[test]
1591        fn test_builder_with_custom_config() {
1592            let source = HttpSourceBuilder::new("http-source")
1593                .with_host("0.0.0.0")
1594                .with_port(9000)
1595                .with_endpoint("/events")
1596                .build()
1597                .unwrap();
1598            assert_eq!(source.id(), "http-source");
1599        }
1600
1601        #[test]
1602        fn test_with_dispatch_creates_source() {
1603            let config = HttpSourceConfig {
1604                host: "localhost".to_string(),
1605                port: 8080,
1606                endpoint: None,
1607                timeout_ms: 10000,
1608                adaptive_max_batch_size: None,
1609                adaptive_min_batch_size: None,
1610                adaptive_max_wait_ms: None,
1611                adaptive_min_wait_ms: None,
1612                adaptive_window_secs: None,
1613                adaptive_enabled: None,
1614                webhooks: None,
1615            };
1616            let source = HttpSource::with_dispatch(
1617                "dispatch-source",
1618                config,
1619                Some(DispatchMode::Channel),
1620                Some(1000),
1621            );
1622            assert!(source.is_ok());
1623            assert_eq!(source.unwrap().id(), "dispatch-source");
1624        }
1625    }
1626
1627    mod properties {
1628        use super::*;
1629
1630        #[test]
1631        fn test_id_returns_correct_value() {
1632            let source = HttpSourceBuilder::new("my-http-source")
1633                .with_host("localhost")
1634                .build()
1635                .unwrap();
1636            assert_eq!(source.id(), "my-http-source");
1637        }
1638
1639        #[test]
1640        fn test_type_name_returns_http() {
1641            let source = HttpSourceBuilder::new("test")
1642                .with_host("localhost")
1643                .build()
1644                .unwrap();
1645            assert_eq!(source.type_name(), "http");
1646        }
1647
1648        #[test]
1649        fn test_properties_contains_host_and_port() {
1650            let source = HttpSourceBuilder::new("test")
1651                .with_host("192.168.1.1")
1652                .with_port(9000)
1653                .build()
1654                .unwrap();
1655            let props = source.properties();
1656
1657            assert_eq!(
1658                props.get("host"),
1659                Some(&serde_json::Value::String("192.168.1.1".to_string()))
1660            );
1661            assert_eq!(
1662                props.get("port"),
1663                Some(&serde_json::Value::Number(9000.into()))
1664            );
1665        }
1666
1667        #[test]
1668        fn test_properties_includes_endpoint_when_set() {
1669            let source = HttpSourceBuilder::new("test")
1670                .with_host("localhost")
1671                .with_endpoint("/api/v1")
1672                .build()
1673                .unwrap();
1674            let props = source.properties();
1675
1676            assert_eq!(
1677                props.get("endpoint"),
1678                Some(&serde_json::Value::String("/api/v1".to_string()))
1679            );
1680        }
1681
1682        #[test]
1683        fn test_properties_excludes_endpoint_when_none() {
1684            let source = HttpSourceBuilder::new("test")
1685                .with_host("localhost")
1686                .build()
1687                .unwrap();
1688            let props = source.properties();
1689
1690            assert!(!props.contains_key("endpoint"));
1691        }
1692    }
1693
1694    mod lifecycle {
1695        use super::*;
1696
1697        #[tokio::test]
1698        async fn test_initial_status_is_stopped() {
1699            let source = HttpSourceBuilder::new("test")
1700                .with_host("localhost")
1701                .build()
1702                .unwrap();
1703            assert_eq!(source.status().await, ComponentStatus::Stopped);
1704        }
1705    }
1706
1707    mod builder {
1708        use super::*;
1709
1710        #[test]
1711        fn test_http_builder_defaults() {
1712            let source = HttpSourceBuilder::new("test").build().unwrap();
1713            assert_eq!(source.config.port, 8080);
1714            assert_eq!(source.config.timeout_ms, 10000);
1715            assert_eq!(source.config.endpoint, None);
1716        }
1717
1718        #[test]
1719        fn test_http_builder_custom_values() {
1720            let source = HttpSourceBuilder::new("test")
1721                .with_host("api.example.com")
1722                .with_port(9000)
1723                .with_endpoint("/webhook")
1724                .with_timeout_ms(5000)
1725                .build()
1726                .unwrap();
1727
1728            assert_eq!(source.config.host, "api.example.com");
1729            assert_eq!(source.config.port, 9000);
1730            assert_eq!(source.config.endpoint, Some("/webhook".to_string()));
1731            assert_eq!(source.config.timeout_ms, 5000);
1732        }
1733
/// Checks that every adaptive-batching setter on the builder stores its
/// argument as `Some(value)` in the corresponding configuration field.
#[test]
fn test_http_builder_adaptive_batching() {
    let built = HttpSourceBuilder::new("test")
        .with_host("localhost")
        .with_adaptive_max_batch_size(1000)
        .with_adaptive_min_batch_size(10)
        .with_adaptive_max_wait_ms(500)
        .with_adaptive_min_wait_ms(50)
        .with_adaptive_window_secs(60)
        .with_adaptive_enabled(true)
        .build()
        .unwrap();
    let cfg = &built.config;

    // Batch-size bounds.
    assert_eq!(cfg.adaptive_max_batch_size, Some(1000));
    assert_eq!(cfg.adaptive_min_batch_size, Some(10));
    // Wait-time bounds.
    assert_eq!(cfg.adaptive_max_wait_ms, Some(500));
    assert_eq!(cfg.adaptive_min_wait_ms, Some(50));
    // Window length and the on/off switch.
    assert_eq!(cfg.adaptive_window_secs, Some(60));
    assert_eq!(cfg.adaptive_enabled, Some(true));
}
1754
/// Checks that the id handed to `HttpSource::builder` ends up in
/// `base.id` of the built source.
#[test]
fn test_builder_id() {
    let expected_id = "my-http-source";

    let built = HttpSource::builder(expected_id)
        .with_host("localhost")
        .build()
        .unwrap();

    assert_eq!(built.base.id, expected_id);
}
1764    }
1765
1766    mod event_conversion {
1767        use super::*;
1768
1769        #[test]
1770        fn test_convert_node_insert() {
1771            let mut props = serde_json::Map::new();
1772            props.insert(
1773                "name".to_string(),
1774                serde_json::Value::String("Alice".to_string()),
1775            );
1776            props.insert("age".to_string(), serde_json::Value::Number(30.into()));
1777
1778            let http_change = HttpSourceChange::Insert {
1779                element: HttpElement::Node {
1780                    id: "user-1".to_string(),
1781                    labels: vec!["User".to_string()],
1782                    properties: props,
1783                },
1784                timestamp: Some(1234567890000000000),
1785            };
1786
1787            let result = convert_http_to_source_change(&http_change, "test-source");
1788            assert!(result.is_ok());
1789
1790            match result.unwrap() {
1791                drasi_core::models::SourceChange::Insert { element } => match element {
1792                    drasi_core::models::Element::Node {
1793                        metadata,
1794                        properties,
1795                    } => {
1796                        assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1797                        assert_eq!(metadata.labels.len(), 1);
1798                        assert_eq!(metadata.effective_from, 1234567890000);
1799                        assert!(properties.get("name").is_some());
1800                        assert!(properties.get("age").is_some());
1801                    }
1802                    _ => panic!("Expected Node element"),
1803                },
1804                _ => panic!("Expected Insert operation"),
1805            }
1806        }
1807
1808        #[test]
1809        fn test_convert_relation_insert() {
1810            let http_change = HttpSourceChange::Insert {
1811                element: HttpElement::Relation {
1812                    id: "follows-1".to_string(),
1813                    labels: vec!["FOLLOWS".to_string()],
1814                    from: "user-1".to_string(),
1815                    to: "user-2".to_string(),
1816                    properties: serde_json::Map::new(),
1817                },
1818                timestamp: None,
1819            };
1820
1821            let result = convert_http_to_source_change(&http_change, "test-source");
1822            assert!(result.is_ok());
1823
1824            match result.unwrap() {
1825                drasi_core::models::SourceChange::Insert { element } => match element {
1826                    drasi_core::models::Element::Relation {
1827                        metadata,
1828                        out_node,
1829                        in_node,
1830                        ..
1831                    } => {
1832                        assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
1833                        assert_eq!(in_node.element_id.as_ref(), "user-1");
1834                        assert_eq!(out_node.element_id.as_ref(), "user-2");
1835                    }
1836                    _ => panic!("Expected Relation element"),
1837                },
1838                _ => panic!("Expected Insert operation"),
1839            }
1840        }
1841
1842        #[test]
1843        fn test_convert_delete() {
1844            let http_change = HttpSourceChange::Delete {
1845                id: "user-1".to_string(),
1846                labels: Some(vec!["User".to_string()]),
1847                timestamp: Some(9999999999),
1848            };
1849
1850            let result = convert_http_to_source_change(&http_change, "test-source");
1851            assert!(result.is_ok());
1852
1853            match result.unwrap() {
1854                drasi_core::models::SourceChange::Delete { metadata } => {
1855                    assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1856                    assert_eq!(metadata.labels.len(), 1);
1857                }
1858                _ => panic!("Expected Delete operation"),
1859            }
1860        }
1861
1862        #[test]
1863        fn test_convert_update() {
1864            let http_change = HttpSourceChange::Update {
1865                element: HttpElement::Node {
1866                    id: "user-1".to_string(),
1867                    labels: vec!["User".to_string()],
1868                    properties: serde_json::Map::new(),
1869                },
1870                timestamp: None,
1871            };
1872
1873            let result = convert_http_to_source_change(&http_change, "test-source");
1874            assert!(result.is_ok());
1875
1876            match result.unwrap() {
1877                drasi_core::models::SourceChange::Update { .. } => {
1878                    // Success
1879                }
1880                _ => panic!("Expected Update operation"),
1881            }
1882        }
1883    }
1884
mod adaptive_config {
    use super::*;

    /// Adaptive settings given to the builder should be reflected in the
    /// built source's `adaptive_config`.
    #[test]
    fn test_adaptive_config_from_http_config() {
        let built = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .with_adaptive_max_batch_size(500)
            .with_adaptive_enabled(true)
            .build()
            .unwrap();
        let adaptive = &built.adaptive_config;

        // Both explicitly configured values must have been carried over.
        assert_eq!(adaptive.max_batch_size, 500);
        assert!(adaptive.adaptive_enabled);
    }

    /// With no adaptive settings supplied, the batch-size bounds should
    /// equal those of `AdaptiveBatchConfig::default()`.
    #[test]
    fn test_adaptive_config_uses_defaults_when_not_specified() {
        let built = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .build()
            .unwrap();

        let expected = AdaptiveBatchConfig::default();
        let actual = &built.adaptive_config;

        assert_eq!(actual.max_batch_size, expected.max_batch_size);
        assert_eq!(actual.min_batch_size, expected.min_batch_size);
    }
}
1921}
1922
/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled. Invokes
/// `drasi_plugin_sdk::export_plugin!` with the plugin id `http-source`,
/// this crate's compile-time `CARGO_PKG_VERSION` for all three version
/// fields, and `descriptor::HttpSourceDescriptor` as the only source
/// descriptor; the reaction and bootstrap descriptor lists are empty.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "http-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::HttpSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);