1#![allow(unexpected_cfgs)]
2pub mod config;
218pub mod descriptor;
219pub use config::HttpSourceConfig;
220
221mod adaptive_batcher;
222mod models;
223mod time;
224
225pub mod auth;
227pub mod content_parser;
228pub mod route_matcher;
229pub mod template_engine;
230
231pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
233
234use anyhow::Result;
235use async_trait::async_trait;
236use axum::{
237 body::Bytes,
238 extract::{Path, State},
239 http::{header, Method, StatusCode},
240 response::IntoResponse,
241 routing::{delete, get, post, put},
242 Json, Router,
243};
244use log::{debug, error, info, trace, warn};
245use serde::{Deserialize, Serialize};
246use std::collections::HashMap;
247use std::sync::Arc;
248use std::time::Duration;
249use tokio::sync::mpsc;
250use tokio::time::timeout;
251use tower_http::cors::{Any, CorsLayer};
252
253use drasi_lib::channels::{ComponentType, *};
254use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
255use drasi_lib::Source;
256use tracing::Instrument;
257
258use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
259use crate::auth::{verify_auth, AuthResult};
260use crate::config::{CorsConfig, ErrorBehavior, WebhookConfig};
261use crate::content_parser::{parse_content, ContentType};
262use crate::route_matcher::{convert_method, find_matching_mappings, headers_to_map, RouteMatcher};
263use crate::template_engine::{TemplateContext, TemplateEngine};
264
/// JSON body returned to HTTP clients by the event and webhook handlers.
#[derive(Debug, Serialize, Deserialize)]
pub struct EventResponse {
    /// Whether the request is reported to the caller as successful.
    pub success: bool,
    /// Human-readable summary of the outcome.
    pub message: String,
    /// Optional error detail; omitted from the serialized JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
273
/// Source that receives change events over HTTP and forwards them through an
/// adaptive batcher to the dispatchers held by its [`SourceBase`].
pub struct HttpSource {
    /// Shared source state (id, status, dispatchers, task and shutdown handles).
    base: SourceBase,
    /// Configuration this source was built from (host, port, webhooks, adaptive overrides).
    config: HttpSourceConfig,
    /// Batching parameters: `AdaptiveBatchConfig::default()` with the `adaptive_*`
    /// overrides from `config` applied.
    adaptive_config: AdaptiveBatchConfig,
}
293
/// Request body accepted by the batch events endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventRequest {
    /// Events to process, in the order they appear in the request.
    pub events: Vec<HttpSourceChange>,
}
299
/// State shared with every axum handler of this source.
#[derive(Clone)]
struct HttpAppState {
    /// Id of the owning source; requests naming a different source are rejected.
    source_id: String,
    /// Sending half of the channel that feeds the adaptive batcher task.
    batch_tx: mpsc::Sender<SourceChangeEvent>,
    /// Webhook routing state; `None` when the source runs in standard mode.
    webhook_config: Option<Arc<WebhookState>>,
}
312
/// Everything the webhook handler needs to route and transform a request.
struct WebhookState {
    /// Webhook configuration (routes and the default error behavior).
    config: WebhookConfig,
    /// Matcher built from `config.routes`.
    route_matcher: RouteMatcher,
    /// Engine used to turn a matched mapping plus request context into a source change.
    template_engine: TemplateEngine,
}
322
323impl HttpSource {
324 pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
355 let id = id.into();
356 let params = SourceBaseParams::new(id);
357
358 let mut adaptive_config = AdaptiveBatchConfig::default();
360
361 if let Some(max_batch) = config.adaptive_max_batch_size {
363 adaptive_config.max_batch_size = max_batch;
364 }
365 if let Some(min_batch) = config.adaptive_min_batch_size {
366 adaptive_config.min_batch_size = min_batch;
367 }
368 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
369 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
370 }
371 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
372 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
373 }
374 if let Some(window_secs) = config.adaptive_window_secs {
375 adaptive_config.throughput_window = Duration::from_secs(window_secs);
376 }
377 if let Some(enabled) = config.adaptive_enabled {
378 adaptive_config.adaptive_enabled = enabled;
379 }
380
381 Ok(Self {
382 base: SourceBase::new(params)?,
383 config,
384 adaptive_config,
385 })
386 }
387
388 pub fn with_dispatch(
408 id: impl Into<String>,
409 config: HttpSourceConfig,
410 dispatch_mode: Option<DispatchMode>,
411 dispatch_buffer_capacity: Option<usize>,
412 ) -> Result<Self> {
413 let id = id.into();
414 let mut params = SourceBaseParams::new(id);
415 if let Some(mode) = dispatch_mode {
416 params = params.with_dispatch_mode(mode);
417 }
418 if let Some(capacity) = dispatch_buffer_capacity {
419 params = params.with_dispatch_buffer_capacity(capacity);
420 }
421
422 let mut adaptive_config = AdaptiveBatchConfig::default();
423
424 if let Some(max_batch) = config.adaptive_max_batch_size {
425 adaptive_config.max_batch_size = max_batch;
426 }
427 if let Some(min_batch) = config.adaptive_min_batch_size {
428 adaptive_config.min_batch_size = min_batch;
429 }
430 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
431 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
432 }
433 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
434 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
435 }
436 if let Some(window_secs) = config.adaptive_window_secs {
437 adaptive_config.throughput_window = Duration::from_secs(window_secs);
438 }
439 if let Some(enabled) = config.adaptive_enabled {
440 adaptive_config.adaptive_enabled = enabled;
441 }
442
443 Ok(Self {
444 base: SourceBase::new(params)?,
445 config,
446 adaptive_config,
447 })
448 }
449
450 async fn handle_single_event(
455 Path(source_id): Path<String>,
456 State(state): State<HttpAppState>,
457 Json(event): Json<HttpSourceChange>,
458 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
459 debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
460 Self::process_events(&source_id, &state, vec![event]).await
461 }
462
463 async fn handle_batch_events(
468 Path(source_id): Path<String>,
469 State(state): State<HttpAppState>,
470 Json(batch): Json<BatchEventRequest>,
471 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
472 debug!(
473 "[{}] HTTP endpoint received batch of {} events",
474 source_id,
475 batch.events.len()
476 );
477 Self::process_events(&source_id, &state, batch.events).await
478 }
479
480 async fn process_events(
492 source_id: &str,
493 state: &HttpAppState,
494 events: Vec<HttpSourceChange>,
495 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
496 trace!("[{}] Processing {} events", source_id, events.len());
497
498 if source_id != state.source_id {
499 error!(
500 "[{}] Source name mismatch. Expected '{}', got '{}'",
501 state.source_id, state.source_id, source_id
502 );
503 return Err((
504 StatusCode::BAD_REQUEST,
505 Json(EventResponse {
506 success: false,
507 message: "Source name mismatch".to_string(),
508 error: Some(format!(
509 "Expected source '{}', got '{}'",
510 state.source_id, source_id
511 )),
512 }),
513 ));
514 }
515
516 let mut success_count = 0;
517 let mut error_count = 0;
518 let mut last_error = None;
519
520 for (idx, event) in events.iter().enumerate() {
521 match convert_http_to_source_change(event, source_id) {
522 Ok(source_change) => {
523 let change_event = SourceChangeEvent {
524 source_id: source_id.to_string(),
525 change: source_change,
526 timestamp: chrono::Utc::now(),
527 };
528
529 if let Err(e) = state.batch_tx.send(change_event).await {
530 error!(
531 "[{}] Failed to send event {} to batch channel: {}",
532 state.source_id,
533 idx + 1,
534 e
535 );
536 error_count += 1;
537 last_error = Some("Internal channel error".to_string());
538 } else {
539 success_count += 1;
540 }
541 }
542 Err(e) => {
543 error!(
544 "[{}] Failed to convert event {}: {}",
545 state.source_id,
546 idx + 1,
547 e
548 );
549 error_count += 1;
550 last_error = Some(e.to_string());
551 }
552 }
553 }
554
555 debug!(
556 "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
557 );
558
559 if error_count > 0 && success_count == 0 {
560 Err((
561 StatusCode::BAD_REQUEST,
562 Json(EventResponse {
563 success: false,
564 message: format!("All {error_count} events failed"),
565 error: last_error,
566 }),
567 ))
568 } else if error_count > 0 {
569 Ok(Json(EventResponse {
570 success: true,
571 message: format!(
572 "Processed {success_count} events successfully, {error_count} failed"
573 ),
574 error: last_error,
575 }))
576 } else {
577 Ok(Json(EventResponse {
578 success: true,
579 message: format!("All {success_count} events processed successfully"),
580 error: None,
581 }))
582 }
583 }
584
585 async fn health_check() -> impl IntoResponse {
586 Json(serde_json::json!({
587 "status": "healthy",
588 "service": "http-source",
589 "features": ["adaptive-batching", "batch-endpoint", "webhooks"]
590 }))
591 }
592
593 async fn handle_webhook(
598 method: axum::http::Method,
599 uri: axum::http::Uri,
600 headers: axum::http::HeaderMap,
601 State(state): State<HttpAppState>,
602 body: Bytes,
603 ) -> impl IntoResponse {
604 let path = uri.path();
605 let source_id = &state.source_id;
606
607 debug!("[{source_id}] Webhook received: {method} {path}");
608
609 let webhook_state = match &state.webhook_config {
611 Some(ws) => ws,
612 None => {
613 error!("[{source_id}] Webhook handler called but no webhook config present");
614 return (
615 StatusCode::INTERNAL_SERVER_ERROR,
616 Json(EventResponse {
617 success: false,
618 message: "Internal configuration error".to_string(),
619 error: Some("Webhook mode not properly configured".to_string()),
620 }),
621 );
622 }
623 };
624
625 let http_method = match convert_method(&method) {
627 Some(m) => m,
628 None => {
629 return handle_error(
630 &webhook_state.config.error_behavior,
631 source_id,
632 StatusCode::METHOD_NOT_ALLOWED,
633 "Method not supported",
634 None,
635 );
636 }
637 };
638
639 let route_match = match webhook_state.route_matcher.match_route(
641 path,
642 &http_method,
643 &webhook_state.config.routes,
644 ) {
645 Some(rm) => rm,
646 None => {
647 debug!("[{source_id}] No matching route for {method} {path}");
648 return handle_error(
649 &webhook_state.config.error_behavior,
650 source_id,
651 StatusCode::NOT_FOUND,
652 "No matching route",
653 None,
654 );
655 }
656 };
657
658 let route = route_match.route;
659 let error_behavior = route
660 .error_behavior
661 .as_ref()
662 .unwrap_or(&webhook_state.config.error_behavior);
663
664 let auth_result = verify_auth(route.auth.as_ref(), &headers, &body);
666 if let AuthResult::Failed(reason) = auth_result {
667 warn!("[{source_id}] Authentication failed for {path}: {reason}");
668 return handle_error(
669 error_behavior,
670 source_id,
671 StatusCode::UNAUTHORIZED,
672 "Authentication failed",
673 Some(&reason),
674 );
675 }
676
677 let content_type = ContentType::from_header(
679 headers
680 .get(axum::http::header::CONTENT_TYPE)
681 .and_then(|v| v.to_str().ok()),
682 );
683
684 let payload = match parse_content(&body, content_type) {
685 Ok(p) => p,
686 Err(e) => {
687 warn!("[{source_id}] Failed to parse payload: {e}");
688 return handle_error(
689 error_behavior,
690 source_id,
691 StatusCode::BAD_REQUEST,
692 "Failed to parse payload",
693 Some(&e.to_string()),
694 );
695 }
696 };
697
698 let headers_map = headers_to_map(&headers);
700 let query_map = parse_query_string(uri.query());
701
702 let context = TemplateContext {
703 payload: payload.clone(),
704 route: route_match.path_params,
705 query: query_map,
706 headers: headers_map.clone(),
707 method: method.to_string(),
708 path: path.to_string(),
709 source_id: source_id.clone(),
710 };
711
712 let matching_mappings = find_matching_mappings(&route.mappings, &headers_map, &payload);
714
715 if matching_mappings.is_empty() {
716 debug!("[{source_id}] No matching mappings for request");
717 return handle_error(
718 error_behavior,
719 source_id,
720 StatusCode::BAD_REQUEST,
721 "No matching mapping for request",
722 None,
723 );
724 }
725
726 let mut success_count = 0;
728 let mut error_count = 0;
729 let mut last_error = None;
730
731 for mapping in matching_mappings {
732 match webhook_state
733 .template_engine
734 .process_mapping(mapping, &context, source_id)
735 {
736 Ok(source_change) => {
737 let event = SourceChangeEvent {
738 source_id: source_id.clone(),
739 change: source_change,
740 timestamp: chrono::Utc::now(),
741 };
742
743 if let Err(e) = state.batch_tx.send(event).await {
744 error!("[{source_id}] Failed to send event to batcher: {e}");
745 error_count += 1;
746 last_error = Some(format!("Failed to queue event: {e}"));
747 } else {
748 success_count += 1;
749 }
750 }
751 Err(e) => {
752 warn!("[{source_id}] Failed to process mapping: {e}");
753 error_count += 1;
754 last_error = Some(e.to_string());
755 }
756 }
757 }
758
759 debug!("[{source_id}] Webhook processing complete: {success_count} succeeded, {error_count} failed");
760
761 if error_count > 0 && success_count == 0 {
762 handle_error(
763 error_behavior,
764 source_id,
765 StatusCode::BAD_REQUEST,
766 &format!("All {error_count} mappings failed"),
767 last_error.as_deref(),
768 )
769 } else if error_count > 0 {
770 (
771 StatusCode::OK,
772 Json(EventResponse {
773 success: true,
774 message: format!("Processed {success_count} events, {error_count} failed"),
775 error: last_error,
776 }),
777 )
778 } else {
779 (
780 StatusCode::OK,
781 Json(EventResponse {
782 success: true,
783 message: format!("Processed {success_count} events successfully"),
784 error: None,
785 }),
786 )
787 }
788 }
789
790 async fn run_adaptive_batcher(
791 batch_rx: mpsc::Receiver<SourceChangeEvent>,
792 dispatchers: Arc<
793 tokio::sync::RwLock<
794 Vec<
795 Box<
796 dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
797 >,
798 >,
799 >,
800 >,
801 adaptive_config: AdaptiveBatchConfig,
802 source_id: String,
803 ) {
804 let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
805 let mut total_events = 0u64;
806 let mut total_batches = 0u64;
807
808 info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
809
810 while let Some(batch) = batcher.next_batch().await {
811 if batch.is_empty() {
812 debug!("[{source_id}] Batcher received empty batch, skipping");
813 continue;
814 }
815
816 let batch_size = batch.len();
817 total_events += batch_size as u64;
818 total_batches += 1;
819
820 debug!(
821 "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
822 );
823
824 let mut sent_count = 0;
825 let mut failed_count = 0;
826 for (idx, event) in batch.into_iter().enumerate() {
827 debug!(
828 "[{}] Batch #{}, dispatching event {}/{}",
829 source_id,
830 total_batches,
831 idx + 1,
832 batch_size
833 );
834
835 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
836 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
837
838 let wrapper = SourceEventWrapper::with_profiling(
839 event.source_id.clone(),
840 SourceEvent::Change(event.change),
841 event.timestamp,
842 profiling,
843 );
844
845 if let Err(e) =
846 SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
847 .await
848 {
849 error!(
850 "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
851 source_id,
852 total_batches,
853 idx + 1,
854 batch_size,
855 e
856 );
857 failed_count += 1;
858 } else {
859 debug!(
860 "[{}] Batch #{}, successfully dispatched event {}/{}",
861 source_id,
862 total_batches,
863 idx + 1,
864 batch_size
865 );
866 sent_count += 1;
867 }
868 }
869
870 debug!(
871 "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
872 );
873
874 if total_batches.is_multiple_of(100) {
875 info!(
876 "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
877 source_id,
878 total_batches,
879 total_events,
880 total_events as f64 / total_batches as f64
881 );
882 }
883 }
884
885 info!(
886 "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
887 );
888 }
889}
890
891#[async_trait]
892impl Source for HttpSource {
    /// Returns the source id stored in the base.
    fn id(&self) -> &str {
        &self.base.id
    }
896
    /// Returns the type name of this source, always `"http"`.
    fn type_name(&self) -> &str {
        "http"
    }
900
901 fn properties(&self) -> HashMap<String, serde_json::Value> {
902 match serde_json::to_value(&self.config) {
903 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
904 _ => HashMap::new(),
905 }
906 }
907
    /// Returns the auto-start flag held by the base.
    fn auto_start(&self) -> bool {
        self.base.get_auto_start()
    }
911
912 async fn start(&self) -> Result<()> {
913 info!("[{}] Starting adaptive HTTP source", self.base.id);
914
915 self.base
916 .set_status(
917 ComponentStatus::Starting,
918 Some("Starting adaptive HTTP source".to_string()),
919 )
920 .await;
921
922 let host = self.config.host.clone();
923 let port = self.config.port;
924
925 let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
927 let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
928 info!(
929 "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
930 self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
931 );
932
933 let adaptive_config = self.adaptive_config.clone();
935 let source_id = self.base.id.clone();
936 let dispatchers = self.base.dispatchers.clone();
937
938 let instance_id = self
940 .base
941 .context()
942 .await
943 .map(|c| c.instance_id)
944 .unwrap_or_default();
945
946 info!("[{source_id}] Starting adaptive batcher task");
947 let source_id_for_span = source_id.clone();
948 let span = tracing::info_span!(
949 "http_adaptive_batcher",
950 instance_id = %instance_id,
951 component_id = %source_id_for_span,
952 component_type = "source"
953 );
954 tokio::spawn(
955 async move {
956 Self::run_adaptive_batcher(
957 batch_rx,
958 dispatchers,
959 adaptive_config,
960 source_id.clone(),
961 )
962 .await
963 }
964 .instrument(span),
965 );
966
967 let webhook_state = if let Some(ref webhook_config) = self.config.webhooks {
969 info!(
970 "[{}] Webhook mode enabled with {} routes",
971 self.base.id,
972 webhook_config.routes.len()
973 );
974 Some(Arc::new(WebhookState {
975 config: webhook_config.clone(),
976 route_matcher: RouteMatcher::new(&webhook_config.routes),
977 template_engine: TemplateEngine::new(),
978 }))
979 } else {
980 info!("[{}] Standard mode enabled", self.base.id);
981 None
982 };
983
984 let state = HttpAppState {
985 source_id: self.base.id.clone(),
986 batch_tx,
987 webhook_config: webhook_state,
988 };
989
990 let app = if self.config.is_webhook_mode() {
992 let router = Router::new()
994 .route("/health", get(Self::health_check))
995 .fallback(Self::handle_webhook)
996 .with_state(state);
997
998 if let Some(ref webhooks) = self.config.webhooks {
1000 if let Some(ref cors_config) = webhooks.cors {
1001 if cors_config.enabled {
1002 info!("[{}] CORS enabled for webhook endpoints", self.base.id);
1003 router.layer(build_cors_layer(cors_config))
1004 } else {
1005 router
1006 }
1007 } else {
1008 router
1009 }
1010 } else {
1011 router
1012 }
1013 } else {
1014 Router::new()
1016 .route("/health", get(Self::health_check))
1017 .route(
1018 "/sources/:source_id/events",
1019 post(Self::handle_single_event),
1020 )
1021 .route(
1022 "/sources/:source_id/events/batch",
1023 post(Self::handle_batch_events),
1024 )
1025 .with_state(state)
1026 };
1027
1028 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1030
1031 let host_clone = host.clone();
1032
1033 let (error_tx, error_rx) = tokio::sync::oneshot::channel();
1035 let source_id = self.base.id.clone();
1036 let source_id_for_span = source_id.clone();
1037 let span = tracing::info_span!(
1038 "http_source_server",
1039 instance_id = %instance_id,
1040 component_id = %source_id_for_span,
1041 component_type = "source"
1042 );
1043 let server_handle = tokio::spawn(
1044 async move {
1045 let addr = format!("{host}:{port}");
1046 info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");
1047
1048 let listener = match tokio::net::TcpListener::bind(&addr).await {
1049 Ok(listener) => {
1050 info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
1051 listener
1052 }
1053 Err(e) => {
1054 error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
1055 let _ = error_tx.send(format!(
1056 "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
1057 ));
1058 return;
1059 }
1060 };
1061
1062 if let Err(e) = axum::serve(listener, app)
1063 .with_graceful_shutdown(async move {
1064 let _ = shutdown_rx.await;
1065 })
1066 .await
1067 {
1068 error!("[{source_id}] HTTP server error: {e}");
1069 }
1070 }
1071 .instrument(span),
1072 );
1073
1074 *self.base.task_handle.write().await = Some(server_handle);
1075 *self.base.shutdown_tx.write().await = Some(shutdown_tx);
1076
1077 match timeout(Duration::from_millis(500), error_rx).await {
1079 Ok(Ok(error_msg)) => {
1080 self.base.set_status(ComponentStatus::Error, None).await;
1081 return Err(anyhow::anyhow!("{error_msg}"));
1082 }
1083 _ => {
1084 self.base
1085 .set_status(
1086 ComponentStatus::Running,
1087 Some(format!(
1088 "Adaptive HTTP source running on {host_clone}:{port} with batch support"
1089 )),
1090 )
1091 .await;
1092 }
1093 }
1094
1095 Ok(())
1096 }
1097
    /// Stops the source: signals the HTTP server to shut down, waits briefly for
    /// the stored task to finish and marks the source as stopped.
    async fn stop(&self) -> Result<()> {
        info!("[{}] Stopping adaptive HTTP source", self.base.id);

        self.base
            .set_status(
                ComponentStatus::Stopping,
                Some("Stopping adaptive HTTP source".to_string()),
            )
            .await;

        // Fire the graceful-shutdown signal if one is stored; a failed send is ignored.
        if let Some(tx) = self.base.shutdown_tx.write().await.take() {
            let _ = tx.send(());
        }

        // Wait up to 5 seconds for the stored task; the result of the wait
        // (including a timeout) is discarded.
        if let Some(handle) = self.base.task_handle.write().await.take() {
            let _ = timeout(Duration::from_secs(5), handle).await;
        }

        self.base
            .set_status(
                ComponentStatus::Stopped,
                Some("Adaptive HTTP source stopped".to_string()),
            )
            .await;

        Ok(())
    }
1125
    /// Returns the current component status as reported by the base.
    async fn status(&self) -> ComponentStatus {
        self.base.get_status().await
    }
1129
    /// Subscribes to this source by delegating to the base's
    /// `subscribe_with_bootstrap`, passing `"HTTP"` as the second argument.
    async fn subscribe(
        &self,
        settings: drasi_lib::config::SourceSubscriptionSettings,
    ) -> Result<SubscriptionResponse> {
        self.base.subscribe_with_bootstrap(&settings, "HTTP").await
    }
1136
    /// Returns `self` as `&dyn Any` so callers can downcast to `HttpSource`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
1140
    /// Passes the runtime context on to the base.
    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
        self.base.initialize(context).await;
    }
1144
    /// Hands the given bootstrap provider to the base.
    async fn set_bootstrap_provider(
        &self,
        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
    ) {
        self.base.set_bootstrap_provider(provider).await;
    }
1151}
1152
/// Builder for [`HttpSource`]. Collects the fields of `HttpSourceConfig` plus the
/// options passed to `SourceBaseParams`, and assembles the source in `build`.
pub struct HttpSourceBuilder {
    /// Source id.
    id: String,
    /// Host the HTTP server binds to.
    host: String,
    /// Port the HTTP server binds to.
    port: u16,
    /// Optional endpoint value copied into the config.
    endpoint: Option<String>,
    /// Timeout in milliseconds copied into the config.
    timeout_ms: u64,
    /// Override for the adaptive batcher's maximum batch size.
    adaptive_max_batch_size: Option<usize>,
    /// Override for the adaptive batcher's minimum batch size.
    adaptive_min_batch_size: Option<usize>,
    /// Override for the adaptive batcher's maximum wait, in milliseconds.
    adaptive_max_wait_ms: Option<u64>,
    /// Override for the adaptive batcher's minimum wait, in milliseconds.
    adaptive_min_wait_ms: Option<u64>,
    /// Override for the adaptive batcher's throughput window, in seconds.
    adaptive_window_secs: Option<u64>,
    /// Override for the adaptive batcher's `adaptive_enabled` flag.
    adaptive_enabled: Option<bool>,
    /// Webhook configuration; when set it is stored in the config's `webhooks` field.
    webhooks: Option<WebhookConfig>,
    /// Optional dispatch mode passed to `SourceBaseParams`.
    dispatch_mode: Option<DispatchMode>,
    /// Optional dispatch buffer capacity passed to `SourceBaseParams`.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider passed to `SourceBaseParams`.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Auto-start flag passed to `SourceBaseParams`.
    auto_start: bool,
}
1189
1190impl HttpSourceBuilder {
1191 pub fn new(id: impl Into<String>) -> Self {
1197 Self {
1198 id: id.into(),
1199 host: String::new(),
1200 port: 8080,
1201 endpoint: None,
1202 timeout_ms: 10000,
1203 adaptive_max_batch_size: None,
1204 adaptive_min_batch_size: None,
1205 adaptive_max_wait_ms: None,
1206 adaptive_min_wait_ms: None,
1207 adaptive_window_secs: None,
1208 adaptive_enabled: None,
1209 webhooks: None,
1210 dispatch_mode: None,
1211 dispatch_buffer_capacity: None,
1212 bootstrap_provider: None,
1213 auto_start: true,
1214 }
1215 }
1216
1217 pub fn with_host(mut self, host: impl Into<String>) -> Self {
1219 self.host = host.into();
1220 self
1221 }
1222
1223 pub fn with_port(mut self, port: u16) -> Self {
1225 self.port = port;
1226 self
1227 }
1228
1229 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
1231 self.endpoint = Some(endpoint.into());
1232 self
1233 }
1234
1235 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
1237 self.timeout_ms = timeout_ms;
1238 self
1239 }
1240
1241 pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
1243 self.adaptive_max_batch_size = Some(size);
1244 self
1245 }
1246
1247 pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
1249 self.adaptive_min_batch_size = Some(size);
1250 self
1251 }
1252
1253 pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
1255 self.adaptive_max_wait_ms = Some(wait_ms);
1256 self
1257 }
1258
1259 pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
1261 self.adaptive_min_wait_ms = Some(wait_ms);
1262 self
1263 }
1264
1265 pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
1267 self.adaptive_window_secs = Some(secs);
1268 self
1269 }
1270
1271 pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
1273 self.adaptive_enabled = Some(enabled);
1274 self
1275 }
1276
1277 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
1279 self.dispatch_mode = Some(mode);
1280 self
1281 }
1282
1283 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
1285 self.dispatch_buffer_capacity = Some(capacity);
1286 self
1287 }
1288
1289 pub fn with_bootstrap_provider(
1291 mut self,
1292 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
1293 ) -> Self {
1294 self.bootstrap_provider = Some(Box::new(provider));
1295 self
1296 }
1297
1298 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
1303 self.auto_start = auto_start;
1304 self
1305 }
1306
1307 pub fn with_webhooks(mut self, webhooks: WebhookConfig) -> Self {
1312 self.webhooks = Some(webhooks);
1313 self
1314 }
1315
1316 pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
1318 self.host = config.host;
1319 self.port = config.port;
1320 self.endpoint = config.endpoint;
1321 self.timeout_ms = config.timeout_ms;
1322 self.adaptive_max_batch_size = config.adaptive_max_batch_size;
1323 self.adaptive_min_batch_size = config.adaptive_min_batch_size;
1324 self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
1325 self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
1326 self.adaptive_window_secs = config.adaptive_window_secs;
1327 self.adaptive_enabled = config.adaptive_enabled;
1328 self.webhooks = config.webhooks;
1329 self
1330 }
1331
1332 pub fn build(self) -> Result<HttpSource> {
1338 let config = HttpSourceConfig {
1339 host: self.host,
1340 port: self.port,
1341 endpoint: self.endpoint,
1342 timeout_ms: self.timeout_ms,
1343 adaptive_max_batch_size: self.adaptive_max_batch_size,
1344 adaptive_min_batch_size: self.adaptive_min_batch_size,
1345 adaptive_max_wait_ms: self.adaptive_max_wait_ms,
1346 adaptive_min_wait_ms: self.adaptive_min_wait_ms,
1347 adaptive_window_secs: self.adaptive_window_secs,
1348 adaptive_enabled: self.adaptive_enabled,
1349 webhooks: self.webhooks,
1350 };
1351
1352 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1354 if let Some(mode) = self.dispatch_mode {
1355 params = params.with_dispatch_mode(mode);
1356 }
1357 if let Some(capacity) = self.dispatch_buffer_capacity {
1358 params = params.with_dispatch_buffer_capacity(capacity);
1359 }
1360 if let Some(provider) = self.bootstrap_provider {
1361 params = params.with_bootstrap_provider(provider);
1362 }
1363
1364 let mut adaptive_config = AdaptiveBatchConfig::default();
1366 if let Some(max_batch) = config.adaptive_max_batch_size {
1367 adaptive_config.max_batch_size = max_batch;
1368 }
1369 if let Some(min_batch) = config.adaptive_min_batch_size {
1370 adaptive_config.min_batch_size = min_batch;
1371 }
1372 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
1373 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
1374 }
1375 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
1376 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
1377 }
1378 if let Some(window_secs) = config.adaptive_window_secs {
1379 adaptive_config.throughput_window = Duration::from_secs(window_secs);
1380 }
1381 if let Some(enabled) = config.adaptive_enabled {
1382 adaptive_config.adaptive_enabled = enabled;
1383 }
1384
1385 Ok(HttpSource {
1386 base: SourceBase::new(params)?,
1387 config,
1388 adaptive_config,
1389 })
1390 }
1391}
1392
impl HttpSource {
    /// Returns an [`HttpSourceBuilder`] for a source with the given id.
    pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
        HttpSourceBuilder::new(id)
    }
}
1415
1416fn handle_error(
1418 behavior: &ErrorBehavior,
1419 source_id: &str,
1420 status: StatusCode,
1421 message: &str,
1422 detail: Option<&str>,
1423) -> (StatusCode, Json<EventResponse>) {
1424 match behavior {
1425 ErrorBehavior::Reject => {
1426 debug!("[{source_id}] Rejecting request: {message}");
1427 (
1428 status,
1429 Json(EventResponse {
1430 success: false,
1431 message: message.to_string(),
1432 error: detail.map(String::from),
1433 }),
1434 )
1435 }
1436 ErrorBehavior::AcceptAndLog => {
1437 warn!("[{source_id}] Accepting with error (logged): {message}");
1438 (
1439 StatusCode::OK,
1440 Json(EventResponse {
1441 success: true,
1442 message: format!("Accepted with warning: {message}"),
1443 error: detail.map(String::from),
1444 }),
1445 )
1446 }
1447 ErrorBehavior::AcceptAndSkip => {
1448 trace!("[{source_id}] Accepting silently: {message}");
1449 (
1450 StatusCode::OK,
1451 Json(EventResponse {
1452 success: true,
1453 message: "Accepted".to_string(),
1454 error: None,
1455 }),
1456 )
1457 }
1458 }
1459}
1460
1461fn parse_query_string(query: Option<&str>) -> HashMap<String, String> {
1463 query
1464 .map(|q| {
1465 q.split('&')
1466 .filter_map(|pair| {
1467 let mut parts = pair.splitn(2, '=');
1468 let key = parts.next()?;
1469 let value = parts.next().unwrap_or("");
1470 Some((urlencoding_decode(key), urlencoding_decode(value)))
1471 })
1472 .collect()
1473 })
1474 .unwrap_or_default()
1475}
1476
/// Decodes a form-urlencoded component: `+` becomes a space and `%XX` (two hex
/// digits) becomes the byte `0xXX`.
///
/// A `%` that is not followed by exactly two hexadecimal digits is kept as a
/// literal `%`, and decoding resumes with the very next character, so a
/// malformed escape never swallows or mis-decodes the characters after it
/// (for example `%+1` yields `% 1` and `%%41` yields `%A`). Decoded bytes that
/// do not form valid UTF-8 are replaced with U+FFFD.
fn urlencoding_decode(s: &str) -> String {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn hex_value(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    // Working on bytes is safe: `%` and `+` are ASCII and can never appear
    // inside a multi-byte UTF-8 sequence.
    let bytes = s.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let high = bytes.get(i + 1).copied().and_then(hex_value);
                let low = bytes.get(i + 2).copied().and_then(hex_value);
                if let Some((high, low)) = high.zip(low) {
                    decoded.push((high << 4) | low);
                    i += 3;
                    continue;
                }
                // Malformed escape: keep the '%' and re-scan what follows.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }

    String::from_utf8_lossy(&decoded).into_owned()
}
1516
1517fn build_cors_layer(cors_config: &CorsConfig) -> CorsLayer {
1519 let mut cors = CorsLayer::new();
1520
1521 if cors_config.allow_origins.len() == 1 && cors_config.allow_origins[0] == "*" {
1523 cors = cors.allow_origin(Any);
1524 } else {
1525 let origins: Vec<_> = cors_config
1526 .allow_origins
1527 .iter()
1528 .filter_map(|o| o.parse().ok())
1529 .collect();
1530 cors = cors.allow_origin(origins);
1531 }
1532
1533 let methods: Vec<Method> = cors_config
1535 .allow_methods
1536 .iter()
1537 .filter_map(|m| m.parse().ok())
1538 .collect();
1539 cors = cors.allow_methods(methods);
1540
1541 if cors_config.allow_headers.len() == 1 && cors_config.allow_headers[0] == "*" {
1543 cors = cors.allow_headers(Any);
1544 } else {
1545 let headers: Vec<header::HeaderName> = cors_config
1546 .allow_headers
1547 .iter()
1548 .filter_map(|h| h.parse().ok())
1549 .collect();
1550 cors = cors.allow_headers(headers);
1551 }
1552
1553 if !cors_config.expose_headers.is_empty() {
1555 let exposed: Vec<header::HeaderName> = cors_config
1556 .expose_headers
1557 .iter()
1558 .filter_map(|h| h.parse().ok())
1559 .collect();
1560 cors = cors.expose_headers(exposed);
1561 }
1562
1563 if cors_config.allow_credentials {
1565 cors = cors.allow_credentials(true);
1566 }
1567
1568 cors = cors.max_age(Duration::from_secs(cors_config.max_age));
1570
1571 cors
1572}
1573
1574#[cfg(test)]
1575mod tests {
1576 use super::*;
1577
1578 mod construction {
1579 use super::*;
1580
1581 #[test]
1582 fn test_builder_with_valid_config() {
1583 let source = HttpSourceBuilder::new("test-source")
1584 .with_host("localhost")
1585 .with_port(8080)
1586 .build();
1587 assert!(source.is_ok());
1588 }
1589
1590 #[test]
1591 fn test_builder_with_custom_config() {
1592 let source = HttpSourceBuilder::new("http-source")
1593 .with_host("0.0.0.0")
1594 .with_port(9000)
1595 .with_endpoint("/events")
1596 .build()
1597 .unwrap();
1598 assert_eq!(source.id(), "http-source");
1599 }
1600
1601 #[test]
1602 fn test_with_dispatch_creates_source() {
1603 let config = HttpSourceConfig {
1604 host: "localhost".to_string(),
1605 port: 8080,
1606 endpoint: None,
1607 timeout_ms: 10000,
1608 adaptive_max_batch_size: None,
1609 adaptive_min_batch_size: None,
1610 adaptive_max_wait_ms: None,
1611 adaptive_min_wait_ms: None,
1612 adaptive_window_secs: None,
1613 adaptive_enabled: None,
1614 webhooks: None,
1615 };
1616 let source = HttpSource::with_dispatch(
1617 "dispatch-source",
1618 config,
1619 Some(DispatchMode::Channel),
1620 Some(1000),
1621 );
1622 assert!(source.is_ok());
1623 assert_eq!(source.unwrap().id(), "dispatch-source");
1624 }
1625 }
1626
1627 mod properties {
1628 use super::*;
1629
1630 #[test]
1631 fn test_id_returns_correct_value() {
1632 let source = HttpSourceBuilder::new("my-http-source")
1633 .with_host("localhost")
1634 .build()
1635 .unwrap();
1636 assert_eq!(source.id(), "my-http-source");
1637 }
1638
1639 #[test]
1640 fn test_type_name_returns_http() {
1641 let source = HttpSourceBuilder::new("test")
1642 .with_host("localhost")
1643 .build()
1644 .unwrap();
1645 assert_eq!(source.type_name(), "http");
1646 }
1647
1648 #[test]
1649 fn test_properties_contains_host_and_port() {
1650 let source = HttpSourceBuilder::new("test")
1651 .with_host("192.168.1.1")
1652 .with_port(9000)
1653 .build()
1654 .unwrap();
1655 let props = source.properties();
1656
1657 assert_eq!(
1658 props.get("host"),
1659 Some(&serde_json::Value::String("192.168.1.1".to_string()))
1660 );
1661 assert_eq!(
1662 props.get("port"),
1663 Some(&serde_json::Value::Number(9000.into()))
1664 );
1665 }
1666
1667 #[test]
1668 fn test_properties_includes_endpoint_when_set() {
1669 let source = HttpSourceBuilder::new("test")
1670 .with_host("localhost")
1671 .with_endpoint("/api/v1")
1672 .build()
1673 .unwrap();
1674 let props = source.properties();
1675
1676 assert_eq!(
1677 props.get("endpoint"),
1678 Some(&serde_json::Value::String("/api/v1".to_string()))
1679 );
1680 }
1681
1682 #[test]
1683 fn test_properties_excludes_endpoint_when_none() {
1684 let source = HttpSourceBuilder::new("test")
1685 .with_host("localhost")
1686 .build()
1687 .unwrap();
1688 let props = source.properties();
1689
1690 assert!(!props.contains_key("endpoint"));
1691 }
1692 }
1693
1694 mod lifecycle {
1695 use super::*;
1696
1697 #[tokio::test]
1698 async fn test_initial_status_is_stopped() {
1699 let source = HttpSourceBuilder::new("test")
1700 .with_host("localhost")
1701 .build()
1702 .unwrap();
1703 assert_eq!(source.status().await, ComponentStatus::Stopped);
1704 }
1705 }
1706
1707 mod builder {
1708 use super::*;
1709
1710 #[test]
1711 fn test_http_builder_defaults() {
1712 let source = HttpSourceBuilder::new("test").build().unwrap();
1713 assert_eq!(source.config.port, 8080);
1714 assert_eq!(source.config.timeout_ms, 10000);
1715 assert_eq!(source.config.endpoint, None);
1716 }
1717
1718 #[test]
1719 fn test_http_builder_custom_values() {
1720 let source = HttpSourceBuilder::new("test")
1721 .with_host("api.example.com")
1722 .with_port(9000)
1723 .with_endpoint("/webhook")
1724 .with_timeout_ms(5000)
1725 .build()
1726 .unwrap();
1727
1728 assert_eq!(source.config.host, "api.example.com");
1729 assert_eq!(source.config.port, 9000);
1730 assert_eq!(source.config.endpoint, Some("/webhook".to_string()));
1731 assert_eq!(source.config.timeout_ms, 5000);
1732 }
1733
1734 #[test]
1735 fn test_http_builder_adaptive_batching() {
1736 let source = HttpSourceBuilder::new("test")
1737 .with_host("localhost")
1738 .with_adaptive_max_batch_size(1000)
1739 .with_adaptive_min_batch_size(10)
1740 .with_adaptive_max_wait_ms(500)
1741 .with_adaptive_min_wait_ms(50)
1742 .with_adaptive_window_secs(60)
1743 .with_adaptive_enabled(true)
1744 .build()
1745 .unwrap();
1746
1747 assert_eq!(source.config.adaptive_max_batch_size, Some(1000));
1748 assert_eq!(source.config.adaptive_min_batch_size, Some(10));
1749 assert_eq!(source.config.adaptive_max_wait_ms, Some(500));
1750 assert_eq!(source.config.adaptive_min_wait_ms, Some(50));
1751 assert_eq!(source.config.adaptive_window_secs, Some(60));
1752 assert_eq!(source.config.adaptive_enabled, Some(true));
1753 }
1754
1755 #[test]
1756 fn test_builder_id() {
1757 let source = HttpSource::builder("my-http-source")
1758 .with_host("localhost")
1759 .build()
1760 .unwrap();
1761
1762 assert_eq!(source.base.id, "my-http-source");
1763 }
1764 }
1765
1766 mod event_conversion {
1767 use super::*;
1768
1769 #[test]
1770 fn test_convert_node_insert() {
1771 let mut props = serde_json::Map::new();
1772 props.insert(
1773 "name".to_string(),
1774 serde_json::Value::String("Alice".to_string()),
1775 );
1776 props.insert("age".to_string(), serde_json::Value::Number(30.into()));
1777
1778 let http_change = HttpSourceChange::Insert {
1779 element: HttpElement::Node {
1780 id: "user-1".to_string(),
1781 labels: vec!["User".to_string()],
1782 properties: props,
1783 },
1784 timestamp: Some(1234567890000000000),
1785 };
1786
1787 let result = convert_http_to_source_change(&http_change, "test-source");
1788 assert!(result.is_ok());
1789
1790 match result.unwrap() {
1791 drasi_core::models::SourceChange::Insert { element } => match element {
1792 drasi_core::models::Element::Node {
1793 metadata,
1794 properties,
1795 } => {
1796 assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1797 assert_eq!(metadata.labels.len(), 1);
1798 assert_eq!(metadata.effective_from, 1234567890000);
1799 assert!(properties.get("name").is_some());
1800 assert!(properties.get("age").is_some());
1801 }
1802 _ => panic!("Expected Node element"),
1803 },
1804 _ => panic!("Expected Insert operation"),
1805 }
1806 }
1807
1808 #[test]
1809 fn test_convert_relation_insert() {
1810 let http_change = HttpSourceChange::Insert {
1811 element: HttpElement::Relation {
1812 id: "follows-1".to_string(),
1813 labels: vec!["FOLLOWS".to_string()],
1814 from: "user-1".to_string(),
1815 to: "user-2".to_string(),
1816 properties: serde_json::Map::new(),
1817 },
1818 timestamp: None,
1819 };
1820
1821 let result = convert_http_to_source_change(&http_change, "test-source");
1822 assert!(result.is_ok());
1823
1824 match result.unwrap() {
1825 drasi_core::models::SourceChange::Insert { element } => match element {
1826 drasi_core::models::Element::Relation {
1827 metadata,
1828 out_node,
1829 in_node,
1830 ..
1831 } => {
1832 assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
1833 assert_eq!(in_node.element_id.as_ref(), "user-1");
1834 assert_eq!(out_node.element_id.as_ref(), "user-2");
1835 }
1836 _ => panic!("Expected Relation element"),
1837 },
1838 _ => panic!("Expected Insert operation"),
1839 }
1840 }
1841
1842 #[test]
1843 fn test_convert_delete() {
1844 let http_change = HttpSourceChange::Delete {
1845 id: "user-1".to_string(),
1846 labels: Some(vec!["User".to_string()]),
1847 timestamp: Some(9999999999),
1848 };
1849
1850 let result = convert_http_to_source_change(&http_change, "test-source");
1851 assert!(result.is_ok());
1852
1853 match result.unwrap() {
1854 drasi_core::models::SourceChange::Delete { metadata } => {
1855 assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1856 assert_eq!(metadata.labels.len(), 1);
1857 }
1858 _ => panic!("Expected Delete operation"),
1859 }
1860 }
1861
1862 #[test]
1863 fn test_convert_update() {
1864 let http_change = HttpSourceChange::Update {
1865 element: HttpElement::Node {
1866 id: "user-1".to_string(),
1867 labels: vec!["User".to_string()],
1868 properties: serde_json::Map::new(),
1869 },
1870 timestamp: None,
1871 };
1872
1873 let result = convert_http_to_source_change(&http_change, "test-source");
1874 assert!(result.is_ok());
1875
1876 match result.unwrap() {
1877 drasi_core::models::SourceChange::Update { .. } => {
1878 }
1880 _ => panic!("Expected Update operation"),
1881 }
1882 }
1883 }
1884
1885 mod adaptive_config {
1886 use super::*;
1887
1888 #[test]
1889 fn test_adaptive_config_from_http_config() {
1890 let source = HttpSourceBuilder::new("test")
1891 .with_host("localhost")
1892 .with_adaptive_max_batch_size(500)
1893 .with_adaptive_enabled(true)
1894 .build()
1895 .unwrap();
1896
1897 assert_eq!(source.adaptive_config.max_batch_size, 500);
1899 assert!(source.adaptive_config.adaptive_enabled);
1900 }
1901
1902 #[test]
1903 fn test_adaptive_config_uses_defaults_when_not_specified() {
1904 let source = HttpSourceBuilder::new("test")
1905 .with_host("localhost")
1906 .build()
1907 .unwrap();
1908
1909 let default_config = AdaptiveBatchConfig::default();
1911 assert_eq!(
1912 source.adaptive_config.max_batch_size,
1913 default_config.max_batch_size
1914 );
1915 assert_eq!(
1916 source.adaptive_config.min_batch_size,
1917 default_config.min_batch_size
1918 );
1919 }
1920 }
1921}
1922
// Dynamic-plugin export, compiled only when the `dynamic-plugin` Cargo feature
// is enabled. Invokes `drasi_plugin_sdk::export_plugin!` with the plugin id
// "http-source", `descriptor::HttpSourceDescriptor` as the only source
// descriptor, and empty reaction and bootstrap descriptor lists.
// NOTE(review): `core_version`, `lib_version` and `plugin_version` all expand
// to this crate's own `CARGO_PKG_VERSION` - confirm that is the value the SDK
// expects for the core and lib versions.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "http-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::HttpSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);