Skip to main content

drasi_source_http/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! HTTP Source Plugin for Drasi
16//!
17//! This plugin exposes HTTP endpoints for receiving data change events. It includes
18//! adaptive batching for optimized throughput and supports both single-event and
19//! batch submission modes.
20//!
21//! # Endpoints
22//!
23//! The HTTP source exposes the following endpoints:
24//!
25//! - **`POST /sources/{source_id}/events`** - Submit a single event
26//! - **`POST /sources/{source_id}/events/batch`** - Submit multiple events
27//! - **`GET /health`** - Health check endpoint
28//!
29//! # Data Format
30//!
31//! Events are submitted as JSON using the `HttpSourceChange` format:
32//!
33//! ## Insert Operation
34//!
35//! ```json
36//! {
37//!     "operation": "insert",
38//!     "element": {
39//!         "type": "node",
40//!         "id": "user-123",
41//!         "labels": ["User"],
42//!         "properties": {
43//!             "name": "Alice",
44//!             "email": "alice@example.com"
45//!         }
46//!     },
47//!     "timestamp": 1699900000000000000
48//! }
49//! ```
50//!
51//! ## Update Operation
52//!
53//! ```json
54//! {
55//!     "operation": "update",
56//!     "element": {
57//!         "type": "node",
58//!         "id": "user-123",
59//!         "labels": ["User"],
60//!         "properties": {
61//!             "name": "Alice Updated"
62//!         }
63//!     }
64//! }
65//! ```
66//!
67//! ## Delete Operation
68//!
69//! ```json
70//! {
71//!     "operation": "delete",
72//!     "id": "user-123",
73//!     "labels": ["User"]
74//! }
75//! ```
76//!
77//! ## Relation Element
78//!
79//! ```json
80//! {
81//!     "operation": "insert",
82//!     "element": {
83//!         "type": "relation",
84//!         "id": "follows-1",
85//!         "labels": ["FOLLOWS"],
86//!         "from": "user-123",
87//!         "to": "user-456",
88//!         "properties": {}
89//!     }
90//! }
91//! ```
92//!
93//! # Batch Submission
94//!
95//! ```json
96//! {
97//!     "events": [
98//!         { "operation": "insert", ... },
99//!         { "operation": "update", ... }
100//!     ]
101//! }
102//! ```
103//!
104//! # Adaptive Batching
105//!
106//! The HTTP source includes adaptive batching to optimize throughput. Events are
107//! buffered and dispatched in batches, with batch size and timing adjusted based
108//! on throughput patterns.
109//!
110//! | Parameter | Default | Description |
111//! |-----------|---------|-------------|
112//! | `adaptive_enabled` | `true` | Enable/disable adaptive batching |
113//! | `adaptive_max_batch_size` | `1000` | Maximum events per batch |
114//! | `adaptive_min_batch_size` | `1` | Minimum events per batch |
115//! | `adaptive_max_wait_ms` | `100` | Maximum wait time before dispatching |
116//! | `adaptive_min_wait_ms` | `10` | Minimum wait time between batches |
117//!
118//! # Configuration
119//!
120//! | Field | Type | Default | Description |
121//! |-------|------|---------|-------------|
122//! | `host` | string | *required* | Host address to bind to |
123//! | `port` | u16 | `8080` | Port to listen on |
124//! | `endpoint` | string | None | Optional custom path prefix |
125//! | `timeout_ms` | u64 | `10000` | Request timeout in milliseconds |
126//!
127//! # Example Configuration (YAML)
128//!
129//! ```yaml
130//! source_type: http
131//! properties:
132//!   host: "0.0.0.0"
133//!   port: 8080
134//!   adaptive_enabled: true
135//!   adaptive_max_batch_size: 500
136//! ```
137//!
138//! # Usage Examples
139//!
140//! ## Rust
141//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! use drasi_source_http::HttpSourceBuilder;
//!
//! // The builder takes the source ID up front and `build()` returns the source.
//! let source = HttpSourceBuilder::new("http-source")
//!     .with_host("0.0.0.0")
//!     .with_port(8080)
//!     .with_adaptive_enabled(true)
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
//! ```
154//!
155//! ## curl (Single Event)
156//!
157//! ```bash
158//! curl -X POST http://localhost:8080/sources/my-source/events \
159//!   -H "Content-Type: application/json" \
160//!   -d '{"operation":"insert","element":{"type":"node","id":"1","labels":["Test"],"properties":{}}}'
161//! ```
162//!
163//! ## curl (Batch)
164//!
165//! ```bash
166//! curl -X POST http://localhost:8080/sources/my-source/events/batch \
167//!   -H "Content-Type: application/json" \
168//!   -d '{"events":[...]}'
169//! ```
170
171pub mod config;
172pub use config::HttpSourceConfig;
173
174mod adaptive_batcher;
175mod models;
176mod time;
177
178// Export HTTP source models and conversion
179pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
180
181use anyhow::Result;
182use async_trait::async_trait;
183use axum::{
184    extract::{Path, State},
185    http::StatusCode,
186    response::IntoResponse,
187    routing::{get, post},
188    Json, Router,
189};
190use log::{debug, error, info, trace};
191use serde::{Deserialize, Serialize};
192use std::collections::HashMap;
193use std::sync::Arc;
194use std::time::Duration;
195use tokio::sync::mpsc;
196use tokio::time::timeout;
197
198use drasi_lib::channels::*;
199use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
200use drasi_lib::Source;
201
202use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
203
204/// Response for event submission
205#[derive(Debug, Serialize, Deserialize)]
206pub struct EventResponse {
207    pub success: bool,
208    pub message: String,
209    #[serde(skip_serializing_if = "Option::is_none")]
210    pub error: Option<String>,
211}
212
213/// HTTP source with configurable adaptive batching.
214///
215/// This source exposes HTTP endpoints for receiving data change events.
216/// It supports both single-event and batch submission modes, with adaptive
217/// batching for optimized throughput.
218///
219/// # Fields
220///
221/// - `base`: Common source functionality (dispatchers, status, lifecycle)
222/// - `config`: HTTP-specific configuration (host, port, timeout)
223/// - `adaptive_config`: Adaptive batching settings for throughput optimization
224pub struct HttpSource {
225    /// Base source implementation providing common functionality
226    base: SourceBase,
227    /// HTTP source configuration
228    config: HttpSourceConfig,
229    /// Adaptive batching configuration for throughput optimization
230    adaptive_config: AdaptiveBatchConfig,
231}
232
233/// Batch event request that can accept multiple events
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub struct BatchEventRequest {
236    pub events: Vec<HttpSourceChange>,
237}
238
239/// HTTP source app state with batching channel.
240///
241/// Shared state passed to Axum route handlers.
242#[derive(Clone)]
243struct HttpAppState {
244    /// The source ID for validation against incoming requests
245    source_id: String,
246    /// Channel for sending events to the adaptive batcher
247    batch_tx: mpsc::Sender<SourceChangeEvent>,
248}
249
250impl HttpSource {
251    /// Create a new HTTP source.
252    ///
253    /// The event channel is automatically injected when the source is added
254    /// to DrasiLib via `add_source()`.
255    ///
256    /// # Arguments
257    ///
258    /// * `id` - Unique identifier for this source instance
259    /// * `config` - HTTP source configuration
260    ///
261    /// # Returns
262    ///
263    /// A new `HttpSource` instance, or an error if construction fails.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if the base source cannot be initialized.
268    ///
269    /// # Example
270    ///
271    /// ```rust,ignore
272    /// use drasi_source_http::{HttpSource, HttpSourceBuilder};
273    ///
274    /// let config = HttpSourceBuilder::new()
275    ///     .with_host("0.0.0.0")
276    ///     .with_port(8080)
277    ///     .build();
278    ///
279    /// let source = HttpSource::new("my-http-source", config)?;
280    /// ```
281    pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
282        let id = id.into();
283        let params = SourceBaseParams::new(id);
284
285        // Configure adaptive batching
286        let mut adaptive_config = AdaptiveBatchConfig::default();
287
288        // Allow overriding adaptive parameters from config
289        if let Some(max_batch) = config.adaptive_max_batch_size {
290            adaptive_config.max_batch_size = max_batch;
291        }
292        if let Some(min_batch) = config.adaptive_min_batch_size {
293            adaptive_config.min_batch_size = min_batch;
294        }
295        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
296            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
297        }
298        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
299            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
300        }
301        if let Some(window_secs) = config.adaptive_window_secs {
302            adaptive_config.throughput_window = Duration::from_secs(window_secs);
303        }
304        if let Some(enabled) = config.adaptive_enabled {
305            adaptive_config.adaptive_enabled = enabled;
306        }
307
308        Ok(Self {
309            base: SourceBase::new(params)?,
310            config,
311            adaptive_config,
312        })
313    }
314
315    /// Create a new HTTP source with custom dispatch settings.
316    ///
317    /// The event channel is automatically injected when the source is added
318    /// to DrasiLib via `add_source()`.
319    ///
320    /// # Arguments
321    ///
322    /// * `id` - Unique identifier for this source instance
323    /// * `config` - HTTP source configuration
324    /// * `dispatch_mode` - Optional dispatch mode (Channel, Direct, etc.)
325    /// * `dispatch_buffer_capacity` - Optional buffer capacity for channel dispatch
326    ///
327    /// # Returns
328    ///
329    /// A new `HttpSource` instance with custom dispatch settings.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if the base source cannot be initialized.
334    pub fn with_dispatch(
335        id: impl Into<String>,
336        config: HttpSourceConfig,
337        dispatch_mode: Option<DispatchMode>,
338        dispatch_buffer_capacity: Option<usize>,
339    ) -> Result<Self> {
340        let id = id.into();
341        let mut params = SourceBaseParams::new(id);
342        if let Some(mode) = dispatch_mode {
343            params = params.with_dispatch_mode(mode);
344        }
345        if let Some(capacity) = dispatch_buffer_capacity {
346            params = params.with_dispatch_buffer_capacity(capacity);
347        }
348
349        let mut adaptive_config = AdaptiveBatchConfig::default();
350
351        if let Some(max_batch) = config.adaptive_max_batch_size {
352            adaptive_config.max_batch_size = max_batch;
353        }
354        if let Some(min_batch) = config.adaptive_min_batch_size {
355            adaptive_config.min_batch_size = min_batch;
356        }
357        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
358            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
359        }
360        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
361            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
362        }
363        if let Some(window_secs) = config.adaptive_window_secs {
364            adaptive_config.throughput_window = Duration::from_secs(window_secs);
365        }
366        if let Some(enabled) = config.adaptive_enabled {
367            adaptive_config.adaptive_enabled = enabled;
368        }
369
370        Ok(Self {
371            base: SourceBase::new(params)?,
372            config,
373            adaptive_config,
374        })
375    }
376
377    /// Handle a single event submission from `POST /sources/{source_id}/events`.
378    ///
379    /// Validates the source ID matches this source and converts the HTTP event
380    /// to a source change before sending to the adaptive batcher.
381    async fn handle_single_event(
382        Path(source_id): Path<String>,
383        State(state): State<HttpAppState>,
384        Json(event): Json<HttpSourceChange>,
385    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
386        debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
387        Self::process_events(&source_id, &state, vec![event]).await
388    }
389
390    /// Handle a batch event submission from `POST /sources/{source_id}/events/batch`.
391    ///
392    /// Validates the source ID and processes all events in the batch,
393    /// returning partial success if some events fail.
394    async fn handle_batch_events(
395        Path(source_id): Path<String>,
396        State(state): State<HttpAppState>,
397        Json(batch): Json<BatchEventRequest>,
398    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
399        debug!(
400            "[{}] HTTP endpoint received batch of {} events",
401            source_id,
402            batch.events.len()
403        );
404        Self::process_events(&source_id, &state, batch.events).await
405    }
406
407    /// Process a list of events, converting and sending them to the batcher.
408    ///
409    /// # Arguments
410    ///
411    /// * `source_id` - Source ID from the request path
412    /// * `state` - Shared app state containing the batch channel
413    /// * `events` - List of HTTP source changes to process
414    ///
415    /// # Returns
416    ///
417    /// Success response with count of processed events, or error if all fail.
418    async fn process_events(
419        source_id: &str,
420        state: &HttpAppState,
421        events: Vec<HttpSourceChange>,
422    ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
423        trace!("[{}] Processing {} events", source_id, events.len());
424
425        if source_id != state.source_id {
426            error!(
427                "[{}] Source name mismatch. Expected '{}', got '{}'",
428                state.source_id, state.source_id, source_id
429            );
430            return Err((
431                StatusCode::BAD_REQUEST,
432                Json(EventResponse {
433                    success: false,
434                    message: "Source name mismatch".to_string(),
435                    error: Some(format!(
436                        "Expected source '{}', got '{}'",
437                        state.source_id, source_id
438                    )),
439                }),
440            ));
441        }
442
443        let mut success_count = 0;
444        let mut error_count = 0;
445        let mut last_error = None;
446
447        for (idx, event) in events.iter().enumerate() {
448            match convert_http_to_source_change(event, source_id) {
449                Ok(source_change) => {
450                    let change_event = SourceChangeEvent {
451                        source_id: source_id.to_string(),
452                        change: source_change,
453                        timestamp: chrono::Utc::now(),
454                    };
455
456                    if let Err(e) = state.batch_tx.send(change_event).await {
457                        error!(
458                            "[{}] Failed to send event {} to batch channel: {}",
459                            state.source_id,
460                            idx + 1,
461                            e
462                        );
463                        error_count += 1;
464                        last_error = Some("Internal channel error".to_string());
465                    } else {
466                        success_count += 1;
467                    }
468                }
469                Err(e) => {
470                    error!(
471                        "[{}] Failed to convert event {}: {}",
472                        state.source_id,
473                        idx + 1,
474                        e
475                    );
476                    error_count += 1;
477                    last_error = Some(e.to_string());
478                }
479            }
480        }
481
482        debug!(
483            "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
484        );
485
486        if error_count > 0 && success_count == 0 {
487            Err((
488                StatusCode::BAD_REQUEST,
489                Json(EventResponse {
490                    success: false,
491                    message: format!("All {error_count} events failed"),
492                    error: last_error,
493                }),
494            ))
495        } else if error_count > 0 {
496            Ok(Json(EventResponse {
497                success: true,
498                message: format!(
499                    "Processed {success_count} events successfully, {error_count} failed"
500                ),
501                error: last_error,
502            }))
503        } else {
504            Ok(Json(EventResponse {
505                success: true,
506                message: format!("All {success_count} events processed successfully"),
507                error: None,
508            }))
509        }
510    }
511
512    async fn health_check() -> impl IntoResponse {
513        Json(serde_json::json!({
514            "status": "healthy",
515            "service": "http-source",
516            "features": ["adaptive-batching", "batch-endpoint"]
517        }))
518    }
519
520    async fn run_adaptive_batcher(
521        batch_rx: mpsc::Receiver<SourceChangeEvent>,
522        dispatchers: Arc<
523            tokio::sync::RwLock<
524                Vec<
525                    Box<
526                        dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
527                    >,
528                >,
529            >,
530        >,
531        adaptive_config: AdaptiveBatchConfig,
532        source_id: String,
533    ) {
534        let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
535        let mut total_events = 0u64;
536        let mut total_batches = 0u64;
537
538        info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
539
540        while let Some(batch) = batcher.next_batch().await {
541            if batch.is_empty() {
542                debug!("[{source_id}] Batcher received empty batch, skipping");
543                continue;
544            }
545
546            let batch_size = batch.len();
547            total_events += batch_size as u64;
548            total_batches += 1;
549
550            debug!(
551                "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
552            );
553
554            let mut sent_count = 0;
555            let mut failed_count = 0;
556            for (idx, event) in batch.into_iter().enumerate() {
557                debug!(
558                    "[{}] Batch #{}, dispatching event {}/{}",
559                    source_id,
560                    total_batches,
561                    idx + 1,
562                    batch_size
563                );
564
565                let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
566                profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
567
568                let wrapper = SourceEventWrapper::with_profiling(
569                    event.source_id.clone(),
570                    SourceEvent::Change(event.change),
571                    event.timestamp,
572                    profiling,
573                );
574
575                if let Err(e) =
576                    SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
577                        .await
578                {
579                    error!(
580                        "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
581                        source_id,
582                        total_batches,
583                        idx + 1,
584                        batch_size,
585                        e
586                    );
587                    failed_count += 1;
588                } else {
589                    debug!(
590                        "[{}] Batch #{}, successfully dispatched event {}/{}",
591                        source_id,
592                        total_batches,
593                        idx + 1,
594                        batch_size
595                    );
596                    sent_count += 1;
597                }
598            }
599
600            debug!(
601                "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
602            );
603
604            if total_batches.is_multiple_of(100) {
605                info!(
606                    "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
607                    source_id,
608                    total_batches,
609                    total_events,
610                    total_events as f64 / total_batches as f64
611                );
612            }
613        }
614
615        info!(
616            "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
617        );
618    }
619}
620
621#[async_trait]
622impl Source for HttpSource {
623    fn id(&self) -> &str {
624        &self.base.id
625    }
626
627    fn type_name(&self) -> &str {
628        "http"
629    }
630
631    fn properties(&self) -> HashMap<String, serde_json::Value> {
632        let mut props = HashMap::new();
633        props.insert(
634            "host".to_string(),
635            serde_json::Value::String(self.config.host.clone()),
636        );
637        props.insert(
638            "port".to_string(),
639            serde_json::Value::Number(self.config.port.into()),
640        );
641        if let Some(ref endpoint) = self.config.endpoint {
642            props.insert(
643                "endpoint".to_string(),
644                serde_json::Value::String(endpoint.clone()),
645            );
646        }
647        props
648    }
649
650    fn auto_start(&self) -> bool {
651        self.base.get_auto_start()
652    }
653
654    async fn start(&self) -> Result<()> {
655        info!("[{}] Starting adaptive HTTP source", self.base.id);
656
657        self.base.set_status(ComponentStatus::Starting).await;
658        self.base
659            .send_component_event(
660                ComponentStatus::Starting,
661                Some("Starting adaptive HTTP source".to_string()),
662            )
663            .await?;
664
665        let host = self.config.host.clone();
666        let port = self.config.port;
667
668        // Create batch channel with capacity based on batch configuration
669        let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
670        let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
671        info!(
672            "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
673            self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
674        );
675
676        // Start adaptive batcher task
677        let adaptive_config = self.adaptive_config.clone();
678        let source_id = self.base.id.clone();
679
680        info!("[{source_id}] Starting adaptive batcher task");
681        tokio::spawn(Self::run_adaptive_batcher(
682            batch_rx,
683            self.base.dispatchers.clone(),
684            adaptive_config,
685            source_id.clone(),
686        ));
687
688        // Create app state
689        let state = HttpAppState {
690            source_id: self.base.id.clone(),
691            batch_tx,
692        };
693
694        // Build router
695        let app = Router::new()
696            .route("/health", get(Self::health_check))
697            .route(
698                "/sources/:source_id/events",
699                post(Self::handle_single_event),
700            )
701            .route(
702                "/sources/:source_id/events/batch",
703                post(Self::handle_batch_events),
704            )
705            .with_state(state);
706
707        // Create shutdown channel
708        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
709
710        let host_clone = host.clone();
711
712        // Start server
713        let (error_tx, error_rx) = tokio::sync::oneshot::channel();
714        let server_handle = tokio::spawn(async move {
715            let addr = format!("{host}:{port}");
716            info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");
717
718            let listener = match tokio::net::TcpListener::bind(&addr).await {
719                Ok(listener) => {
720                    info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
721                    listener
722                }
723                Err(e) => {
724                    error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
725                    let _ = error_tx.send(format!(
726                        "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
727                    ));
728                    return;
729                }
730            };
731
732            if let Err(e) = axum::serve(listener, app)
733                .with_graceful_shutdown(async move {
734                    let _ = shutdown_rx.await;
735                })
736                .await
737            {
738                error!("[{source_id}] HTTP server error: {e}");
739            }
740        });
741
742        *self.base.task_handle.write().await = Some(server_handle);
743        *self.base.shutdown_tx.write().await = Some(shutdown_tx);
744
745        // Check for startup errors with a short timeout
746        match timeout(Duration::from_millis(500), error_rx).await {
747            Ok(Ok(error_msg)) => {
748                self.base.set_status(ComponentStatus::Error).await;
749                return Err(anyhow::anyhow!("{error_msg}"));
750            }
751            _ => {
752                self.base.set_status(ComponentStatus::Running).await;
753            }
754        }
755
756        self.base
757            .send_component_event(
758                ComponentStatus::Running,
759                Some(format!(
760                    "Adaptive HTTP source running on {host_clone}:{port} with batch support"
761                )),
762            )
763            .await?;
764
765        Ok(())
766    }
767
768    async fn stop(&self) -> Result<()> {
769        info!("[{}] Stopping adaptive HTTP source", self.base.id);
770
771        self.base.set_status(ComponentStatus::Stopping).await;
772        self.base
773            .send_component_event(
774                ComponentStatus::Stopping,
775                Some("Stopping adaptive HTTP source".to_string()),
776            )
777            .await?;
778
779        if let Some(tx) = self.base.shutdown_tx.write().await.take() {
780            let _ = tx.send(());
781        }
782
783        if let Some(handle) = self.base.task_handle.write().await.take() {
784            let _ = timeout(Duration::from_secs(5), handle).await;
785        }
786
787        self.base.set_status(ComponentStatus::Stopped).await;
788        self.base
789            .send_component_event(
790                ComponentStatus::Stopped,
791                Some("Adaptive HTTP source stopped".to_string()),
792            )
793            .await?;
794
795        Ok(())
796    }
797
798    async fn status(&self) -> ComponentStatus {
799        self.base.get_status().await
800    }
801
802    async fn subscribe(
803        &self,
804        settings: drasi_lib::config::SourceSubscriptionSettings,
805    ) -> Result<SubscriptionResponse> {
806        self.base.subscribe_with_bootstrap(&settings, "HTTP").await
807    }
808
809    fn as_any(&self) -> &dyn std::any::Any {
810        self
811    }
812
813    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
814        self.base.initialize(context).await;
815    }
816
817    async fn set_bootstrap_provider(
818        &self,
819        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
820    ) {
821        self.base.set_bootstrap_provider(provider).await;
822    }
823}
824
825/// Builder for HttpSource instances.
826///
827/// Provides a fluent API for constructing HTTP sources with sensible defaults
828/// and adaptive batching settings. The builder takes the source ID at construction
829/// and returns a fully constructed `HttpSource` from `build()`.
830///
831/// # Example
832///
833/// ```rust,ignore
834/// use drasi_source_http::HttpSource;
835///
836/// let source = HttpSource::builder("my-source")
837///     .with_host("0.0.0.0")
838///     .with_port(8080)
839///     .with_adaptive_enabled(true)
840///     .with_bootstrap_provider(my_provider)
841///     .build()?;
842/// ```
843pub struct HttpSourceBuilder {
844    id: String,
845    host: String,
846    port: u16,
847    endpoint: Option<String>,
848    timeout_ms: u64,
849    adaptive_max_batch_size: Option<usize>,
850    adaptive_min_batch_size: Option<usize>,
851    adaptive_max_wait_ms: Option<u64>,
852    adaptive_min_wait_ms: Option<u64>,
853    adaptive_window_secs: Option<u64>,
854    adaptive_enabled: Option<bool>,
855    dispatch_mode: Option<DispatchMode>,
856    dispatch_buffer_capacity: Option<usize>,
857    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
858    auto_start: bool,
859}
860
861impl HttpSourceBuilder {
862    /// Create a new HTTP source builder with the given source ID.
863    ///
864    /// # Arguments
865    ///
866    /// * `id` - Unique identifier for the source instance
867    pub fn new(id: impl Into<String>) -> Self {
868        Self {
869            id: id.into(),
870            host: String::new(),
871            port: 8080,
872            endpoint: None,
873            timeout_ms: 10000,
874            adaptive_max_batch_size: None,
875            adaptive_min_batch_size: None,
876            adaptive_max_wait_ms: None,
877            adaptive_min_wait_ms: None,
878            adaptive_window_secs: None,
879            adaptive_enabled: None,
880            dispatch_mode: None,
881            dispatch_buffer_capacity: None,
882            bootstrap_provider: None,
883            auto_start: true,
884        }
885    }
886
887    /// Set the HTTP host
888    pub fn with_host(mut self, host: impl Into<String>) -> Self {
889        self.host = host.into();
890        self
891    }
892
893    /// Set the HTTP port
894    pub fn with_port(mut self, port: u16) -> Self {
895        self.port = port;
896        self
897    }
898
899    /// Set the endpoint path
900    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
901        self.endpoint = Some(endpoint.into());
902        self
903    }
904
905    /// Set the request timeout in milliseconds
906    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
907        self.timeout_ms = timeout_ms;
908        self
909    }
910
911    /// Set the adaptive batching maximum batch size
912    pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
913        self.adaptive_max_batch_size = Some(size);
914        self
915    }
916
917    /// Set the adaptive batching minimum batch size
918    pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
919        self.adaptive_min_batch_size = Some(size);
920        self
921    }
922
923    /// Set the adaptive batching maximum wait time in milliseconds
924    pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
925        self.adaptive_max_wait_ms = Some(wait_ms);
926        self
927    }
928
929    /// Set the adaptive batching minimum wait time in milliseconds
930    pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
931        self.adaptive_min_wait_ms = Some(wait_ms);
932        self
933    }
934
935    /// Set the adaptive batching throughput window in seconds
936    pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
937        self.adaptive_window_secs = Some(secs);
938        self
939    }
940
941    /// Enable or disable adaptive batching
942    pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
943        self.adaptive_enabled = Some(enabled);
944        self
945    }
946
947    /// Set the dispatch mode for event routing.
948    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
949        self.dispatch_mode = Some(mode);
950        self
951    }
952
953    /// Set the dispatch buffer capacity.
954    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
955        self.dispatch_buffer_capacity = Some(capacity);
956        self
957    }
958
959    /// Set the bootstrap provider for initial data delivery.
960    pub fn with_bootstrap_provider(
961        mut self,
962        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
963    ) -> Self {
964        self.bootstrap_provider = Some(Box::new(provider));
965        self
966    }
967
968    /// Set whether this source should auto-start when DrasiLib starts.
969    ///
970    /// Default is `true`. Set to `false` if this source should only be
971    /// started manually via `start_source()`.
972    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
973        self.auto_start = auto_start;
974        self
975    }
976
977    /// Set the full configuration at once
978    pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
979        self.host = config.host;
980        self.port = config.port;
981        self.endpoint = config.endpoint;
982        self.timeout_ms = config.timeout_ms;
983        self.adaptive_max_batch_size = config.adaptive_max_batch_size;
984        self.adaptive_min_batch_size = config.adaptive_min_batch_size;
985        self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
986        self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
987        self.adaptive_window_secs = config.adaptive_window_secs;
988        self.adaptive_enabled = config.adaptive_enabled;
989        self
990    }
991
992    /// Build the HttpSource instance.
993    ///
994    /// # Returns
995    ///
996    /// A fully constructed `HttpSource`, or an error if construction fails.
997    pub fn build(self) -> Result<HttpSource> {
998        let config = HttpSourceConfig {
999            host: self.host,
1000            port: self.port,
1001            endpoint: self.endpoint,
1002            timeout_ms: self.timeout_ms,
1003            adaptive_max_batch_size: self.adaptive_max_batch_size,
1004            adaptive_min_batch_size: self.adaptive_min_batch_size,
1005            adaptive_max_wait_ms: self.adaptive_max_wait_ms,
1006            adaptive_min_wait_ms: self.adaptive_min_wait_ms,
1007            adaptive_window_secs: self.adaptive_window_secs,
1008            adaptive_enabled: self.adaptive_enabled,
1009        };
1010
1011        // Build SourceBaseParams with all settings
1012        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1013        if let Some(mode) = self.dispatch_mode {
1014            params = params.with_dispatch_mode(mode);
1015        }
1016        if let Some(capacity) = self.dispatch_buffer_capacity {
1017            params = params.with_dispatch_buffer_capacity(capacity);
1018        }
1019        if let Some(provider) = self.bootstrap_provider {
1020            params = params.with_bootstrap_provider(provider);
1021        }
1022
1023        // Configure adaptive batching
1024        let mut adaptive_config = AdaptiveBatchConfig::default();
1025        if let Some(max_batch) = config.adaptive_max_batch_size {
1026            adaptive_config.max_batch_size = max_batch;
1027        }
1028        if let Some(min_batch) = config.adaptive_min_batch_size {
1029            adaptive_config.min_batch_size = min_batch;
1030        }
1031        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
1032            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
1033        }
1034        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
1035            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
1036        }
1037        if let Some(window_secs) = config.adaptive_window_secs {
1038            adaptive_config.throughput_window = Duration::from_secs(window_secs);
1039        }
1040        if let Some(enabled) = config.adaptive_enabled {
1041            adaptive_config.adaptive_enabled = enabled;
1042        }
1043
1044        Ok(HttpSource {
1045            base: SourceBase::new(params)?,
1046            config,
1047            adaptive_config,
1048        })
1049    }
1050}
1051
1052impl HttpSource {
1053    /// Create a builder for HttpSource with the given ID.
1054    ///
1055    /// This is the recommended way to construct an HttpSource.
1056    ///
1057    /// # Arguments
1058    ///
1059    /// * `id` - Unique identifier for the source instance
1060    ///
1061    /// # Example
1062    ///
1063    /// ```rust,ignore
1064    /// let source = HttpSource::builder("my-source")
1065    ///     .with_host("0.0.0.0")
1066    ///     .with_port(8080)
1067    ///     .with_bootstrap_provider(my_provider)
1068    ///     .build()?;
1069    /// ```
1070    pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
1071        HttpSourceBuilder::new(id)
1072    }
1073}
1074
#[cfg(test)]
mod tests {
    use super::*;

    // Construction through the builder and through `with_dispatch`.
    mod construction {
        use super::*;

        #[test]
        fn test_builder_with_valid_config() {
            let builder = HttpSourceBuilder::new("test-source")
                .with_host("localhost")
                .with_port(8080);
            assert!(builder.build().is_ok());
        }

        #[test]
        fn test_builder_with_custom_config() {
            let built = HttpSourceBuilder::new("http-source")
                .with_host("0.0.0.0")
                .with_port(9000)
                .with_endpoint("/events")
                .build();
            let source = built.unwrap();
            assert_eq!(source.id(), "http-source");
        }

        #[test]
        fn test_with_dispatch_creates_source() {
            // A plain configuration with every adaptive option left unset.
            let plain_config = HttpSourceConfig {
                host: "localhost".to_string(),
                port: 8080,
                endpoint: None,
                timeout_ms: 10000,
                adaptive_max_batch_size: None,
                adaptive_min_batch_size: None,
                adaptive_max_wait_ms: None,
                adaptive_min_wait_ms: None,
                adaptive_window_secs: None,
                adaptive_enabled: None,
            };
            let result = HttpSource::with_dispatch(
                "dispatch-source",
                plain_config,
                Some(DispatchMode::Channel),
                Some(1000),
            );
            assert!(result.is_ok());
            let source = result.unwrap();
            assert_eq!(source.id(), "dispatch-source");
        }
    }

    // Values reported by `id()`, `type_name()` and `properties()`.
    mod properties {
        use super::*;

        #[test]
        fn test_id_returns_correct_value() {
            let builder = HttpSourceBuilder::new("my-http-source").with_host("localhost");
            let source = builder.build().unwrap();
            assert_eq!(source.id(), "my-http-source");
        }

        #[test]
        fn test_type_name_returns_http() {
            let builder = HttpSourceBuilder::new("test").with_host("localhost");
            let source = builder.build().unwrap();
            assert_eq!(source.type_name(), "http");
        }

        #[test]
        fn test_properties_contains_host_and_port() {
            let builder = HttpSourceBuilder::new("test")
                .with_host("192.168.1.1")
                .with_port(9000);
            let source = builder.build().unwrap();
            let props = source.properties();

            let expected_host = serde_json::Value::String("192.168.1.1".to_string());
            let expected_port = serde_json::Value::Number(9000.into());
            assert_eq!(props.get("host"), Some(&expected_host));
            assert_eq!(props.get("port"), Some(&expected_port));
        }

        #[test]
        fn test_properties_includes_endpoint_when_set() {
            let builder = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .with_endpoint("/api/v1");
            let source = builder.build().unwrap();
            let props = source.properties();

            let expected_endpoint = serde_json::Value::String("/api/v1".to_string());
            assert_eq!(props.get("endpoint"), Some(&expected_endpoint));
        }

        #[test]
        fn test_properties_excludes_endpoint_when_none() {
            let builder = HttpSourceBuilder::new("test").with_host("localhost");
            let source = builder.build().unwrap();
            let props = source.properties();

            assert!(!props.contains_key("endpoint"));
        }
    }

    // Status reported by a freshly built source.
    mod lifecycle {
        use super::*;

        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let builder = HttpSourceBuilder::new("test").with_host("localhost");
            let source = builder.build().unwrap();
            let status = source.status().await;
            assert_eq!(status, ComponentStatus::Stopped);
        }
    }

    // Builder defaults and propagation of explicitly set values into the config.
    mod builder {
        use super::*;

        #[test]
        fn test_http_builder_defaults() {
            let source = HttpSourceBuilder::new("test").build().unwrap();
            let cfg = &source.config;
            assert_eq!(cfg.port, 8080);
            assert_eq!(cfg.timeout_ms, 10000);
            assert_eq!(cfg.endpoint, None);
        }

        #[test]
        fn test_http_builder_custom_values() {
            let builder = HttpSourceBuilder::new("test")
                .with_host("api.example.com")
                .with_port(9000)
                .with_endpoint("/webhook")
                .with_timeout_ms(5000);
            let source = builder.build().unwrap();
            let cfg = &source.config;

            assert_eq!(cfg.host, "api.example.com");
            assert_eq!(cfg.port, 9000);
            assert_eq!(cfg.endpoint, Some("/webhook".to_string()));
            assert_eq!(cfg.timeout_ms, 5000);
        }

        #[test]
        fn test_http_builder_adaptive_batching() {
            let builder = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .with_adaptive_max_batch_size(1000)
                .with_adaptive_min_batch_size(10)
                .with_adaptive_max_wait_ms(500)
                .with_adaptive_min_wait_ms(50)
                .with_adaptive_window_secs(60)
                .with_adaptive_enabled(true);
            let source = builder.build().unwrap();
            let cfg = &source.config;

            assert_eq!(cfg.adaptive_max_batch_size, Some(1000));
            assert_eq!(cfg.adaptive_min_batch_size, Some(10));
            assert_eq!(cfg.adaptive_max_wait_ms, Some(500));
            assert_eq!(cfg.adaptive_min_wait_ms, Some(50));
            assert_eq!(cfg.adaptive_window_secs, Some(60));
            assert_eq!(cfg.adaptive_enabled, Some(true));
        }

        #[test]
        fn test_builder_id() {
            let builder = HttpSource::builder("my-http-source").with_host("localhost");
            let source = builder.build().unwrap();

            assert_eq!(source.base.id, "my-http-source");
        }
    }

    // Conversion of `HttpSourceChange` payloads into core `SourceChange` values.
    mod event_conversion {
        use super::*;

        #[test]
        fn test_convert_node_insert() {
            use drasi_core::models::{Element, SourceChange};

            let mut node_props = serde_json::Map::new();
            node_props.insert(
                "name".to_string(),
                serde_json::Value::String("Alice".to_string()),
            );
            node_props.insert("age".to_string(), serde_json::Value::Number(30.into()));

            let node = HttpElement::Node {
                id: "user-1".to_string(),
                labels: vec!["User".to_string()],
                properties: node_props,
            };
            let http_change = HttpSourceChange::Insert {
                element: node,
                timestamp: Some(1234567890000000000),
            };

            let converted = convert_http_to_source_change(&http_change, "test-source");
            assert!(converted.is_ok());

            match converted.unwrap() {
                SourceChange::Insert {
                    element:
                        Element::Node {
                            metadata,
                            properties,
                        },
                } => {
                    assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
                    assert_eq!(metadata.labels.len(), 1);
                    // The expected value is the input timestamp divided by 1_000_000.
                    assert_eq!(metadata.effective_from, 1234567890000);
                    assert!(properties.get("name").is_some());
                    assert!(properties.get("age").is_some());
                }
                SourceChange::Insert { .. } => panic!("Expected Node element"),
                _ => panic!("Expected Insert operation"),
            }
        }

        #[test]
        fn test_convert_relation_insert() {
            use drasi_core::models::{Element, SourceChange};

            let relation = HttpElement::Relation {
                id: "follows-1".to_string(),
                labels: vec!["FOLLOWS".to_string()],
                from: "user-1".to_string(),
                to: "user-2".to_string(),
                properties: serde_json::Map::new(),
            };
            let http_change = HttpSourceChange::Insert {
                element: relation,
                timestamp: None,
            };

            let converted = convert_http_to_source_change(&http_change, "test-source");
            assert!(converted.is_ok());

            match converted.unwrap() {
                SourceChange::Insert {
                    element:
                        Element::Relation {
                            metadata,
                            out_node,
                            in_node,
                            ..
                        },
                } => {
                    assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
                    assert_eq!(out_node.element_id.as_ref(), "user-1");
                    assert_eq!(in_node.element_id.as_ref(), "user-2");
                }
                SourceChange::Insert { .. } => panic!("Expected Relation element"),
                _ => panic!("Expected Insert operation"),
            }
        }

        #[test]
        fn test_convert_delete() {
            use drasi_core::models::SourceChange;

            let http_change = HttpSourceChange::Delete {
                id: "user-1".to_string(),
                labels: Some(vec!["User".to_string()]),
                timestamp: Some(9999999999),
            };

            let converted = convert_http_to_source_change(&http_change, "test-source");
            assert!(converted.is_ok());

            match converted.unwrap() {
                SourceChange::Delete { metadata } => {
                    assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
                    assert_eq!(metadata.labels.len(), 1);
                }
                _ => panic!("Expected Delete operation"),
            }
        }

        #[test]
        fn test_convert_update() {
            use drasi_core::models::SourceChange;

            let node = HttpElement::Node {
                id: "user-1".to_string(),
                labels: vec!["User".to_string()],
                properties: serde_json::Map::new(),
            };
            let http_change = HttpSourceChange::Update {
                element: node,
                timestamp: None,
            };

            let converted = convert_http_to_source_change(&http_change, "test-source");
            assert!(converted.is_ok());

            // Only the variant matters here; its contents are not inspected.
            if !matches!(converted.unwrap(), SourceChange::Update { .. }) {
                panic!("Expected Update operation");
            }
        }
    }

    // Derivation of `AdaptiveBatchConfig` from the builder's adaptive options.
    mod adaptive_config {
        use super::*;

        #[test]
        fn test_adaptive_config_from_http_config() {
            let builder = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .with_adaptive_max_batch_size(500)
                .with_adaptive_enabled(true);
            let source = builder.build().unwrap();

            // Explicitly configured values must be reflected in the adaptive config.
            let adaptive = &source.adaptive_config;
            assert_eq!(adaptive.max_batch_size, 500);
            assert!(adaptive.adaptive_enabled);
        }

        #[test]
        fn test_adaptive_config_uses_defaults_when_not_specified() {
            let builder = HttpSourceBuilder::new("test").with_host("localhost");
            let source = builder.build().unwrap();

            // With nothing configured, the batch sizes must equal the defaults.
            let defaults = AdaptiveBatchConfig::default();
            let adaptive = &source.adaptive_config;
            assert_eq!(adaptive.max_batch_size, defaults.max_batch_size);
            assert_eq!(adaptive.min_batch_size, defaults.min_batch_size);
        }
    }
}