1pub mod config;
217pub use config::HttpSourceConfig;
218
219mod adaptive_batcher;
220mod models;
221mod time;
222
223pub mod auth;
225pub mod content_parser;
226pub mod route_matcher;
227pub mod template_engine;
228
229pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
231
232use anyhow::Result;
233use async_trait::async_trait;
234use axum::{
235 body::Bytes,
236 extract::{Path, State},
237 http::{header, Method, StatusCode},
238 response::IntoResponse,
239 routing::{delete, get, post, put},
240 Json, Router,
241};
242use log::{debug, error, info, trace, warn};
243use serde::{Deserialize, Serialize};
244use std::collections::HashMap;
245use std::sync::Arc;
246use std::time::Duration;
247use tokio::sync::mpsc;
248use tokio::time::timeout;
249use tower_http::cors::{Any, CorsLayer};
250
251use drasi_lib::channels::{ComponentType, *};
252use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
253use drasi_lib::Source;
254use tracing::Instrument;
255
256use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
257use crate::auth::{verify_auth, AuthResult};
258use crate::config::{CorsConfig, ErrorBehavior, WebhookConfig};
259use crate::content_parser::{parse_content, ContentType};
260use crate::route_matcher::{convert_method, find_matching_mappings, headers_to_map, RouteMatcher};
261use crate::template_engine::{TemplateContext, TemplateEngine};
262
263#[derive(Debug, Serialize, Deserialize)]
265pub struct EventResponse {
266 pub success: bool,
267 pub message: String,
268 #[serde(skip_serializing_if = "Option::is_none")]
269 pub error: Option<String>,
270}
271
272pub struct HttpSource {
284 base: SourceBase,
286 config: HttpSourceConfig,
288 adaptive_config: AdaptiveBatchConfig,
290}
291
292#[derive(Debug, Clone, Serialize, Deserialize)]
294pub struct BatchEventRequest {
295 pub events: Vec<HttpSourceChange>,
296}
297
298#[derive(Clone)]
302struct HttpAppState {
303 source_id: String,
305 batch_tx: mpsc::Sender<SourceChangeEvent>,
307 webhook_config: Option<Arc<WebhookState>>,
309}
310
311struct WebhookState {
313 config: WebhookConfig,
315 route_matcher: RouteMatcher,
317 template_engine: TemplateEngine,
319}
320
321impl HttpSource {
322 pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
353 let id = id.into();
354 let params = SourceBaseParams::new(id);
355
356 let mut adaptive_config = AdaptiveBatchConfig::default();
358
359 if let Some(max_batch) = config.adaptive_max_batch_size {
361 adaptive_config.max_batch_size = max_batch;
362 }
363 if let Some(min_batch) = config.adaptive_min_batch_size {
364 adaptive_config.min_batch_size = min_batch;
365 }
366 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
367 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
368 }
369 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
370 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
371 }
372 if let Some(window_secs) = config.adaptive_window_secs {
373 adaptive_config.throughput_window = Duration::from_secs(window_secs);
374 }
375 if let Some(enabled) = config.adaptive_enabled {
376 adaptive_config.adaptive_enabled = enabled;
377 }
378
379 Ok(Self {
380 base: SourceBase::new(params)?,
381 config,
382 adaptive_config,
383 })
384 }
385
386 pub fn with_dispatch(
406 id: impl Into<String>,
407 config: HttpSourceConfig,
408 dispatch_mode: Option<DispatchMode>,
409 dispatch_buffer_capacity: Option<usize>,
410 ) -> Result<Self> {
411 let id = id.into();
412 let mut params = SourceBaseParams::new(id);
413 if let Some(mode) = dispatch_mode {
414 params = params.with_dispatch_mode(mode);
415 }
416 if let Some(capacity) = dispatch_buffer_capacity {
417 params = params.with_dispatch_buffer_capacity(capacity);
418 }
419
420 let mut adaptive_config = AdaptiveBatchConfig::default();
421
422 if let Some(max_batch) = config.adaptive_max_batch_size {
423 adaptive_config.max_batch_size = max_batch;
424 }
425 if let Some(min_batch) = config.adaptive_min_batch_size {
426 adaptive_config.min_batch_size = min_batch;
427 }
428 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
429 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
430 }
431 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
432 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
433 }
434 if let Some(window_secs) = config.adaptive_window_secs {
435 adaptive_config.throughput_window = Duration::from_secs(window_secs);
436 }
437 if let Some(enabled) = config.adaptive_enabled {
438 adaptive_config.adaptive_enabled = enabled;
439 }
440
441 Ok(Self {
442 base: SourceBase::new(params)?,
443 config,
444 adaptive_config,
445 })
446 }
447
448 async fn handle_single_event(
453 Path(source_id): Path<String>,
454 State(state): State<HttpAppState>,
455 Json(event): Json<HttpSourceChange>,
456 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
457 debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
458 Self::process_events(&source_id, &state, vec![event]).await
459 }
460
461 async fn handle_batch_events(
466 Path(source_id): Path<String>,
467 State(state): State<HttpAppState>,
468 Json(batch): Json<BatchEventRequest>,
469 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
470 debug!(
471 "[{}] HTTP endpoint received batch of {} events",
472 source_id,
473 batch.events.len()
474 );
475 Self::process_events(&source_id, &state, batch.events).await
476 }
477
478 async fn process_events(
490 source_id: &str,
491 state: &HttpAppState,
492 events: Vec<HttpSourceChange>,
493 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
494 trace!("[{}] Processing {} events", source_id, events.len());
495
496 if source_id != state.source_id {
497 error!(
498 "[{}] Source name mismatch. Expected '{}', got '{}'",
499 state.source_id, state.source_id, source_id
500 );
501 return Err((
502 StatusCode::BAD_REQUEST,
503 Json(EventResponse {
504 success: false,
505 message: "Source name mismatch".to_string(),
506 error: Some(format!(
507 "Expected source '{}', got '{}'",
508 state.source_id, source_id
509 )),
510 }),
511 ));
512 }
513
514 let mut success_count = 0;
515 let mut error_count = 0;
516 let mut last_error = None;
517
518 for (idx, event) in events.iter().enumerate() {
519 match convert_http_to_source_change(event, source_id) {
520 Ok(source_change) => {
521 let change_event = SourceChangeEvent {
522 source_id: source_id.to_string(),
523 change: source_change,
524 timestamp: chrono::Utc::now(),
525 };
526
527 if let Err(e) = state.batch_tx.send(change_event).await {
528 error!(
529 "[{}] Failed to send event {} to batch channel: {}",
530 state.source_id,
531 idx + 1,
532 e
533 );
534 error_count += 1;
535 last_error = Some("Internal channel error".to_string());
536 } else {
537 success_count += 1;
538 }
539 }
540 Err(e) => {
541 error!(
542 "[{}] Failed to convert event {}: {}",
543 state.source_id,
544 idx + 1,
545 e
546 );
547 error_count += 1;
548 last_error = Some(e.to_string());
549 }
550 }
551 }
552
553 debug!(
554 "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
555 );
556
557 if error_count > 0 && success_count == 0 {
558 Err((
559 StatusCode::BAD_REQUEST,
560 Json(EventResponse {
561 success: false,
562 message: format!("All {error_count} events failed"),
563 error: last_error,
564 }),
565 ))
566 } else if error_count > 0 {
567 Ok(Json(EventResponse {
568 success: true,
569 message: format!(
570 "Processed {success_count} events successfully, {error_count} failed"
571 ),
572 error: last_error,
573 }))
574 } else {
575 Ok(Json(EventResponse {
576 success: true,
577 message: format!("All {success_count} events processed successfully"),
578 error: None,
579 }))
580 }
581 }
582
583 async fn health_check() -> impl IntoResponse {
584 Json(serde_json::json!({
585 "status": "healthy",
586 "service": "http-source",
587 "features": ["adaptive-batching", "batch-endpoint", "webhooks"]
588 }))
589 }
590
591 async fn handle_webhook(
596 method: axum::http::Method,
597 uri: axum::http::Uri,
598 headers: axum::http::HeaderMap,
599 State(state): State<HttpAppState>,
600 body: Bytes,
601 ) -> impl IntoResponse {
602 let path = uri.path();
603 let source_id = &state.source_id;
604
605 debug!("[{source_id}] Webhook received: {method} {path}");
606
607 let webhook_state = match &state.webhook_config {
609 Some(ws) => ws,
610 None => {
611 error!("[{source_id}] Webhook handler called but no webhook config present");
612 return (
613 StatusCode::INTERNAL_SERVER_ERROR,
614 Json(EventResponse {
615 success: false,
616 message: "Internal configuration error".to_string(),
617 error: Some("Webhook mode not properly configured".to_string()),
618 }),
619 );
620 }
621 };
622
623 let http_method = match convert_method(&method) {
625 Some(m) => m,
626 None => {
627 return handle_error(
628 &webhook_state.config.error_behavior,
629 source_id,
630 StatusCode::METHOD_NOT_ALLOWED,
631 "Method not supported",
632 None,
633 );
634 }
635 };
636
637 let route_match = match webhook_state.route_matcher.match_route(
639 path,
640 &http_method,
641 &webhook_state.config.routes,
642 ) {
643 Some(rm) => rm,
644 None => {
645 debug!("[{source_id}] No matching route for {method} {path}");
646 return handle_error(
647 &webhook_state.config.error_behavior,
648 source_id,
649 StatusCode::NOT_FOUND,
650 "No matching route",
651 None,
652 );
653 }
654 };
655
656 let route = route_match.route;
657 let error_behavior = route
658 .error_behavior
659 .as_ref()
660 .unwrap_or(&webhook_state.config.error_behavior);
661
662 let auth_result = verify_auth(route.auth.as_ref(), &headers, &body);
664 if let AuthResult::Failed(reason) = auth_result {
665 warn!("[{source_id}] Authentication failed for {path}: {reason}");
666 return handle_error(
667 error_behavior,
668 source_id,
669 StatusCode::UNAUTHORIZED,
670 "Authentication failed",
671 Some(&reason),
672 );
673 }
674
675 let content_type = ContentType::from_header(
677 headers
678 .get(axum::http::header::CONTENT_TYPE)
679 .and_then(|v| v.to_str().ok()),
680 );
681
682 let payload = match parse_content(&body, content_type) {
683 Ok(p) => p,
684 Err(e) => {
685 warn!("[{source_id}] Failed to parse payload: {e}");
686 return handle_error(
687 error_behavior,
688 source_id,
689 StatusCode::BAD_REQUEST,
690 "Failed to parse payload",
691 Some(&e.to_string()),
692 );
693 }
694 };
695
696 let headers_map = headers_to_map(&headers);
698 let query_map = parse_query_string(uri.query());
699
700 let context = TemplateContext {
701 payload: payload.clone(),
702 route: route_match.path_params,
703 query: query_map,
704 headers: headers_map.clone(),
705 method: method.to_string(),
706 path: path.to_string(),
707 source_id: source_id.clone(),
708 };
709
710 let matching_mappings = find_matching_mappings(&route.mappings, &headers_map, &payload);
712
713 if matching_mappings.is_empty() {
714 debug!("[{source_id}] No matching mappings for request");
715 return handle_error(
716 error_behavior,
717 source_id,
718 StatusCode::BAD_REQUEST,
719 "No matching mapping for request",
720 None,
721 );
722 }
723
724 let mut success_count = 0;
726 let mut error_count = 0;
727 let mut last_error = None;
728
729 for mapping in matching_mappings {
730 match webhook_state
731 .template_engine
732 .process_mapping(mapping, &context, source_id)
733 {
734 Ok(source_change) => {
735 let event = SourceChangeEvent {
736 source_id: source_id.clone(),
737 change: source_change,
738 timestamp: chrono::Utc::now(),
739 };
740
741 if let Err(e) = state.batch_tx.send(event).await {
742 error!("[{source_id}] Failed to send event to batcher: {e}");
743 error_count += 1;
744 last_error = Some(format!("Failed to queue event: {e}"));
745 } else {
746 success_count += 1;
747 }
748 }
749 Err(e) => {
750 warn!("[{source_id}] Failed to process mapping: {e}");
751 error_count += 1;
752 last_error = Some(e.to_string());
753 }
754 }
755 }
756
757 debug!("[{source_id}] Webhook processing complete: {success_count} succeeded, {error_count} failed");
758
759 if error_count > 0 && success_count == 0 {
760 handle_error(
761 error_behavior,
762 source_id,
763 StatusCode::BAD_REQUEST,
764 &format!("All {error_count} mappings failed"),
765 last_error.as_deref(),
766 )
767 } else if error_count > 0 {
768 (
769 StatusCode::OK,
770 Json(EventResponse {
771 success: true,
772 message: format!("Processed {success_count} events, {error_count} failed"),
773 error: last_error,
774 }),
775 )
776 } else {
777 (
778 StatusCode::OK,
779 Json(EventResponse {
780 success: true,
781 message: format!("Processed {success_count} events successfully"),
782 error: None,
783 }),
784 )
785 }
786 }
787
788 async fn run_adaptive_batcher(
789 batch_rx: mpsc::Receiver<SourceChangeEvent>,
790 dispatchers: Arc<
791 tokio::sync::RwLock<
792 Vec<
793 Box<
794 dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
795 >,
796 >,
797 >,
798 >,
799 adaptive_config: AdaptiveBatchConfig,
800 source_id: String,
801 ) {
802 let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
803 let mut total_events = 0u64;
804 let mut total_batches = 0u64;
805
806 info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
807
808 while let Some(batch) = batcher.next_batch().await {
809 if batch.is_empty() {
810 debug!("[{source_id}] Batcher received empty batch, skipping");
811 continue;
812 }
813
814 let batch_size = batch.len();
815 total_events += batch_size as u64;
816 total_batches += 1;
817
818 debug!(
819 "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
820 );
821
822 let mut sent_count = 0;
823 let mut failed_count = 0;
824 for (idx, event) in batch.into_iter().enumerate() {
825 debug!(
826 "[{}] Batch #{}, dispatching event {}/{}",
827 source_id,
828 total_batches,
829 idx + 1,
830 batch_size
831 );
832
833 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
834 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
835
836 let wrapper = SourceEventWrapper::with_profiling(
837 event.source_id.clone(),
838 SourceEvent::Change(event.change),
839 event.timestamp,
840 profiling,
841 );
842
843 if let Err(e) =
844 SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
845 .await
846 {
847 error!(
848 "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
849 source_id,
850 total_batches,
851 idx + 1,
852 batch_size,
853 e
854 );
855 failed_count += 1;
856 } else {
857 debug!(
858 "[{}] Batch #{}, successfully dispatched event {}/{}",
859 source_id,
860 total_batches,
861 idx + 1,
862 batch_size
863 );
864 sent_count += 1;
865 }
866 }
867
868 debug!(
869 "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
870 );
871
872 if total_batches.is_multiple_of(100) {
873 info!(
874 "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
875 source_id,
876 total_batches,
877 total_events,
878 total_events as f64 / total_batches as f64
879 );
880 }
881 }
882
883 info!(
884 "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
885 );
886 }
887}
888
889#[async_trait]
890impl Source for HttpSource {
891 fn id(&self) -> &str {
892 &self.base.id
893 }
894
895 fn type_name(&self) -> &str {
896 "http"
897 }
898
899 fn properties(&self) -> HashMap<String, serde_json::Value> {
900 let mut props = HashMap::new();
901 props.insert(
902 "host".to_string(),
903 serde_json::Value::String(self.config.host.clone()),
904 );
905 props.insert(
906 "port".to_string(),
907 serde_json::Value::Number(self.config.port.into()),
908 );
909 if let Some(ref endpoint) = self.config.endpoint {
910 props.insert(
911 "endpoint".to_string(),
912 serde_json::Value::String(endpoint.clone()),
913 );
914 }
915 props
916 }
917
918 fn auto_start(&self) -> bool {
919 self.base.get_auto_start()
920 }
921
922 async fn start(&self) -> Result<()> {
923 info!("[{}] Starting adaptive HTTP source", self.base.id);
924
925 self.base.set_status(ComponentStatus::Starting).await;
926 self.base
927 .send_component_event(
928 ComponentStatus::Starting,
929 Some("Starting adaptive HTTP source".to_string()),
930 )
931 .await?;
932
933 let host = self.config.host.clone();
934 let port = self.config.port;
935
936 let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
938 let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
939 info!(
940 "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
941 self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
942 );
943
944 let adaptive_config = self.adaptive_config.clone();
946 let source_id = self.base.id.clone();
947 let dispatchers = self.base.dispatchers.clone();
948
949 let instance_id = self
951 .base
952 .context()
953 .await
954 .map(|c| c.instance_id)
955 .unwrap_or_default();
956
957 info!("[{source_id}] Starting adaptive batcher task");
958 let source_id_for_span = source_id.clone();
959 let span = tracing::info_span!(
960 "http_adaptive_batcher",
961 instance_id = %instance_id,
962 component_id = %source_id_for_span,
963 component_type = "source"
964 );
965 tokio::spawn(
966 async move {
967 Self::run_adaptive_batcher(
968 batch_rx,
969 dispatchers,
970 adaptive_config,
971 source_id.clone(),
972 )
973 .await
974 }
975 .instrument(span),
976 );
977
978 let webhook_state = if let Some(ref webhook_config) = self.config.webhooks {
980 info!(
981 "[{}] Webhook mode enabled with {} routes",
982 self.base.id,
983 webhook_config.routes.len()
984 );
985 Some(Arc::new(WebhookState {
986 config: webhook_config.clone(),
987 route_matcher: RouteMatcher::new(&webhook_config.routes),
988 template_engine: TemplateEngine::new(),
989 }))
990 } else {
991 info!("[{}] Standard mode enabled", self.base.id);
992 None
993 };
994
995 let state = HttpAppState {
996 source_id: self.base.id.clone(),
997 batch_tx,
998 webhook_config: webhook_state,
999 };
1000
1001 let app = if self.config.is_webhook_mode() {
1003 let router = Router::new()
1005 .route("/health", get(Self::health_check))
1006 .fallback(Self::handle_webhook)
1007 .with_state(state);
1008
1009 if let Some(ref webhooks) = self.config.webhooks {
1011 if let Some(ref cors_config) = webhooks.cors {
1012 if cors_config.enabled {
1013 info!("[{}] CORS enabled for webhook endpoints", self.base.id);
1014 router.layer(build_cors_layer(cors_config))
1015 } else {
1016 router
1017 }
1018 } else {
1019 router
1020 }
1021 } else {
1022 router
1023 }
1024 } else {
1025 Router::new()
1027 .route("/health", get(Self::health_check))
1028 .route(
1029 "/sources/:source_id/events",
1030 post(Self::handle_single_event),
1031 )
1032 .route(
1033 "/sources/:source_id/events/batch",
1034 post(Self::handle_batch_events),
1035 )
1036 .with_state(state)
1037 };
1038
1039 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1041
1042 let host_clone = host.clone();
1043
1044 let (error_tx, error_rx) = tokio::sync::oneshot::channel();
1046 let source_id = self.base.id.clone();
1047 let source_id_for_span = source_id.clone();
1048 let span = tracing::info_span!(
1049 "http_source_server",
1050 instance_id = %instance_id,
1051 component_id = %source_id_for_span,
1052 component_type = "source"
1053 );
1054 let server_handle = tokio::spawn(
1055 async move {
1056 let addr = format!("{host}:{port}");
1057 info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");
1058
1059 let listener = match tokio::net::TcpListener::bind(&addr).await {
1060 Ok(listener) => {
1061 info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
1062 listener
1063 }
1064 Err(e) => {
1065 error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
1066 let _ = error_tx.send(format!(
1067 "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
1068 ));
1069 return;
1070 }
1071 };
1072
1073 if let Err(e) = axum::serve(listener, app)
1074 .with_graceful_shutdown(async move {
1075 let _ = shutdown_rx.await;
1076 })
1077 .await
1078 {
1079 error!("[{source_id}] HTTP server error: {e}");
1080 }
1081 }
1082 .instrument(span),
1083 );
1084
1085 *self.base.task_handle.write().await = Some(server_handle);
1086 *self.base.shutdown_tx.write().await = Some(shutdown_tx);
1087
1088 match timeout(Duration::from_millis(500), error_rx).await {
1090 Ok(Ok(error_msg)) => {
1091 self.base.set_status(ComponentStatus::Error).await;
1092 return Err(anyhow::anyhow!("{error_msg}"));
1093 }
1094 _ => {
1095 self.base.set_status(ComponentStatus::Running).await;
1096 }
1097 }
1098
1099 self.base
1100 .send_component_event(
1101 ComponentStatus::Running,
1102 Some(format!(
1103 "Adaptive HTTP source running on {host_clone}:{port} with batch support"
1104 )),
1105 )
1106 .await?;
1107
1108 Ok(())
1109 }
1110
1111 async fn stop(&self) -> Result<()> {
1112 info!("[{}] Stopping adaptive HTTP source", self.base.id);
1113
1114 self.base.set_status(ComponentStatus::Stopping).await;
1115 self.base
1116 .send_component_event(
1117 ComponentStatus::Stopping,
1118 Some("Stopping adaptive HTTP source".to_string()),
1119 )
1120 .await?;
1121
1122 if let Some(tx) = self.base.shutdown_tx.write().await.take() {
1123 let _ = tx.send(());
1124 }
1125
1126 if let Some(handle) = self.base.task_handle.write().await.take() {
1127 let _ = timeout(Duration::from_secs(5), handle).await;
1128 }
1129
1130 self.base.set_status(ComponentStatus::Stopped).await;
1131 self.base
1132 .send_component_event(
1133 ComponentStatus::Stopped,
1134 Some("Adaptive HTTP source stopped".to_string()),
1135 )
1136 .await?;
1137
1138 Ok(())
1139 }
1140
1141 async fn status(&self) -> ComponentStatus {
1142 self.base.get_status().await
1143 }
1144
1145 async fn subscribe(
1146 &self,
1147 settings: drasi_lib::config::SourceSubscriptionSettings,
1148 ) -> Result<SubscriptionResponse> {
1149 self.base.subscribe_with_bootstrap(&settings, "HTTP").await
1150 }
1151
1152 fn as_any(&self) -> &dyn std::any::Any {
1153 self
1154 }
1155
1156 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
1157 self.base.initialize(context).await;
1158 }
1159
1160 async fn set_bootstrap_provider(
1161 &self,
1162 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
1163 ) {
1164 self.base.set_bootstrap_provider(provider).await;
1165 }
1166}
1167
1168pub struct HttpSourceBuilder {
1187 id: String,
1188 host: String,
1189 port: u16,
1190 endpoint: Option<String>,
1191 timeout_ms: u64,
1192 adaptive_max_batch_size: Option<usize>,
1193 adaptive_min_batch_size: Option<usize>,
1194 adaptive_max_wait_ms: Option<u64>,
1195 adaptive_min_wait_ms: Option<u64>,
1196 adaptive_window_secs: Option<u64>,
1197 adaptive_enabled: Option<bool>,
1198 webhooks: Option<WebhookConfig>,
1199 dispatch_mode: Option<DispatchMode>,
1200 dispatch_buffer_capacity: Option<usize>,
1201 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
1202 auto_start: bool,
1203}
1204
1205impl HttpSourceBuilder {
1206 pub fn new(id: impl Into<String>) -> Self {
1212 Self {
1213 id: id.into(),
1214 host: String::new(),
1215 port: 8080,
1216 endpoint: None,
1217 timeout_ms: 10000,
1218 adaptive_max_batch_size: None,
1219 adaptive_min_batch_size: None,
1220 adaptive_max_wait_ms: None,
1221 adaptive_min_wait_ms: None,
1222 adaptive_window_secs: None,
1223 adaptive_enabled: None,
1224 webhooks: None,
1225 dispatch_mode: None,
1226 dispatch_buffer_capacity: None,
1227 bootstrap_provider: None,
1228 auto_start: true,
1229 }
1230 }
1231
1232 pub fn with_host(mut self, host: impl Into<String>) -> Self {
1234 self.host = host.into();
1235 self
1236 }
1237
1238 pub fn with_port(mut self, port: u16) -> Self {
1240 self.port = port;
1241 self
1242 }
1243
1244 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
1246 self.endpoint = Some(endpoint.into());
1247 self
1248 }
1249
1250 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
1252 self.timeout_ms = timeout_ms;
1253 self
1254 }
1255
1256 pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
1258 self.adaptive_max_batch_size = Some(size);
1259 self
1260 }
1261
1262 pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
1264 self.adaptive_min_batch_size = Some(size);
1265 self
1266 }
1267
1268 pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
1270 self.adaptive_max_wait_ms = Some(wait_ms);
1271 self
1272 }
1273
1274 pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
1276 self.adaptive_min_wait_ms = Some(wait_ms);
1277 self
1278 }
1279
1280 pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
1282 self.adaptive_window_secs = Some(secs);
1283 self
1284 }
1285
1286 pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
1288 self.adaptive_enabled = Some(enabled);
1289 self
1290 }
1291
1292 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
1294 self.dispatch_mode = Some(mode);
1295 self
1296 }
1297
1298 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
1300 self.dispatch_buffer_capacity = Some(capacity);
1301 self
1302 }
1303
1304 pub fn with_bootstrap_provider(
1306 mut self,
1307 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
1308 ) -> Self {
1309 self.bootstrap_provider = Some(Box::new(provider));
1310 self
1311 }
1312
1313 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
1318 self.auto_start = auto_start;
1319 self
1320 }
1321
1322 pub fn with_webhooks(mut self, webhooks: WebhookConfig) -> Self {
1327 self.webhooks = Some(webhooks);
1328 self
1329 }
1330
1331 pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
1333 self.host = config.host;
1334 self.port = config.port;
1335 self.endpoint = config.endpoint;
1336 self.timeout_ms = config.timeout_ms;
1337 self.adaptive_max_batch_size = config.adaptive_max_batch_size;
1338 self.adaptive_min_batch_size = config.adaptive_min_batch_size;
1339 self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
1340 self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
1341 self.adaptive_window_secs = config.adaptive_window_secs;
1342 self.adaptive_enabled = config.adaptive_enabled;
1343 self.webhooks = config.webhooks;
1344 self
1345 }
1346
1347 pub fn build(self) -> Result<HttpSource> {
1353 let config = HttpSourceConfig {
1354 host: self.host,
1355 port: self.port,
1356 endpoint: self.endpoint,
1357 timeout_ms: self.timeout_ms,
1358 adaptive_max_batch_size: self.adaptive_max_batch_size,
1359 adaptive_min_batch_size: self.adaptive_min_batch_size,
1360 adaptive_max_wait_ms: self.adaptive_max_wait_ms,
1361 adaptive_min_wait_ms: self.adaptive_min_wait_ms,
1362 adaptive_window_secs: self.adaptive_window_secs,
1363 adaptive_enabled: self.adaptive_enabled,
1364 webhooks: self.webhooks,
1365 };
1366
1367 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1369 if let Some(mode) = self.dispatch_mode {
1370 params = params.with_dispatch_mode(mode);
1371 }
1372 if let Some(capacity) = self.dispatch_buffer_capacity {
1373 params = params.with_dispatch_buffer_capacity(capacity);
1374 }
1375 if let Some(provider) = self.bootstrap_provider {
1376 params = params.with_bootstrap_provider(provider);
1377 }
1378
1379 let mut adaptive_config = AdaptiveBatchConfig::default();
1381 if let Some(max_batch) = config.adaptive_max_batch_size {
1382 adaptive_config.max_batch_size = max_batch;
1383 }
1384 if let Some(min_batch) = config.adaptive_min_batch_size {
1385 adaptive_config.min_batch_size = min_batch;
1386 }
1387 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
1388 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
1389 }
1390 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
1391 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
1392 }
1393 if let Some(window_secs) = config.adaptive_window_secs {
1394 adaptive_config.throughput_window = Duration::from_secs(window_secs);
1395 }
1396 if let Some(enabled) = config.adaptive_enabled {
1397 adaptive_config.adaptive_enabled = enabled;
1398 }
1399
1400 Ok(HttpSource {
1401 base: SourceBase::new(params)?,
1402 config,
1403 adaptive_config,
1404 })
1405 }
1406}
1407
1408impl HttpSource {
1409 pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
1427 HttpSourceBuilder::new(id)
1428 }
1429}
1430
1431fn handle_error(
1433 behavior: &ErrorBehavior,
1434 source_id: &str,
1435 status: StatusCode,
1436 message: &str,
1437 detail: Option<&str>,
1438) -> (StatusCode, Json<EventResponse>) {
1439 match behavior {
1440 ErrorBehavior::Reject => {
1441 debug!("[{source_id}] Rejecting request: {message}");
1442 (
1443 status,
1444 Json(EventResponse {
1445 success: false,
1446 message: message.to_string(),
1447 error: detail.map(String::from),
1448 }),
1449 )
1450 }
1451 ErrorBehavior::AcceptAndLog => {
1452 warn!("[{source_id}] Accepting with error (logged): {message}");
1453 (
1454 StatusCode::OK,
1455 Json(EventResponse {
1456 success: true,
1457 message: format!("Accepted with warning: {message}"),
1458 error: detail.map(String::from),
1459 }),
1460 )
1461 }
1462 ErrorBehavior::AcceptAndSkip => {
1463 trace!("[{source_id}] Accepting silently: {message}");
1464 (
1465 StatusCode::OK,
1466 Json(EventResponse {
1467 success: true,
1468 message: "Accepted".to_string(),
1469 error: None,
1470 }),
1471 )
1472 }
1473 }
1474}
1475
1476fn parse_query_string(query: Option<&str>) -> HashMap<String, String> {
1478 query
1479 .map(|q| {
1480 q.split('&')
1481 .filter_map(|pair| {
1482 let mut parts = pair.splitn(2, '=');
1483 let key = parts.next()?;
1484 let value = parts.next().unwrap_or("");
1485 Some((urlencoding_decode(key), urlencoding_decode(value)))
1486 })
1487 .collect()
1488 })
1489 .unwrap_or_default()
1490}
1491
1492fn urlencoding_decode(s: &str) -> String {
1494 let mut decoded: Vec<u8> = Vec::with_capacity(s.len());
1496 let mut chars = s.chars();
1497
1498 while let Some(c) = chars.next() {
1499 if c == '%' {
1500 let mut hex = String::new();
1501 if let Some(c1) = chars.next() {
1502 hex.push(c1);
1503 }
1504 if let Some(c2) = chars.next() {
1505 hex.push(c2);
1506 }
1507
1508 if hex.len() == 2 {
1509 if let Ok(byte) = u8::from_str_radix(&hex, 16) {
1510 decoded.push(byte);
1511 continue;
1512 }
1513 }
1514
1515 decoded.extend_from_slice(b"%");
1517 decoded.extend_from_slice(hex.as_bytes());
1518 } else if c == '+' {
1519 decoded.push(b' ');
1520 } else {
1521 let mut buf = [0u8; 4];
1523 let encoded = c.encode_utf8(&mut buf);
1524 decoded.extend_from_slice(encoded.as_bytes());
1525 }
1526 }
1527
1528 String::from_utf8_lossy(&decoded).into_owned()
1530}
1531
1532fn build_cors_layer(cors_config: &CorsConfig) -> CorsLayer {
1534 let mut cors = CorsLayer::new();
1535
1536 if cors_config.allow_origins.len() == 1 && cors_config.allow_origins[0] == "*" {
1538 cors = cors.allow_origin(Any);
1539 } else {
1540 let origins: Vec<_> = cors_config
1541 .allow_origins
1542 .iter()
1543 .filter_map(|o| o.parse().ok())
1544 .collect();
1545 cors = cors.allow_origin(origins);
1546 }
1547
1548 let methods: Vec<Method> = cors_config
1550 .allow_methods
1551 .iter()
1552 .filter_map(|m| m.parse().ok())
1553 .collect();
1554 cors = cors.allow_methods(methods);
1555
1556 if cors_config.allow_headers.len() == 1 && cors_config.allow_headers[0] == "*" {
1558 cors = cors.allow_headers(Any);
1559 } else {
1560 let headers: Vec<header::HeaderName> = cors_config
1561 .allow_headers
1562 .iter()
1563 .filter_map(|h| h.parse().ok())
1564 .collect();
1565 cors = cors.allow_headers(headers);
1566 }
1567
1568 if !cors_config.expose_headers.is_empty() {
1570 let exposed: Vec<header::HeaderName> = cors_config
1571 .expose_headers
1572 .iter()
1573 .filter_map(|h| h.parse().ok())
1574 .collect();
1575 cors = cors.expose_headers(exposed);
1576 }
1577
1578 if cors_config.allow_credentials {
1580 cors = cors.allow_credentials(true);
1581 }
1582
1583 cors = cors.max_age(Duration::from_secs(cors_config.max_age));
1585
1586 cors
1587}
1588
1589#[cfg(test)]
1590mod tests {
1591 use super::*;
1592
1593 mod construction {
1594 use super::*;
1595
1596 #[test]
1597 fn test_builder_with_valid_config() {
1598 let source = HttpSourceBuilder::new("test-source")
1599 .with_host("localhost")
1600 .with_port(8080)
1601 .build();
1602 assert!(source.is_ok());
1603 }
1604
1605 #[test]
1606 fn test_builder_with_custom_config() {
1607 let source = HttpSourceBuilder::new("http-source")
1608 .with_host("0.0.0.0")
1609 .with_port(9000)
1610 .with_endpoint("/events")
1611 .build()
1612 .unwrap();
1613 assert_eq!(source.id(), "http-source");
1614 }
1615
1616 #[test]
1617 fn test_with_dispatch_creates_source() {
1618 let config = HttpSourceConfig {
1619 host: "localhost".to_string(),
1620 port: 8080,
1621 endpoint: None,
1622 timeout_ms: 10000,
1623 adaptive_max_batch_size: None,
1624 adaptive_min_batch_size: None,
1625 adaptive_max_wait_ms: None,
1626 adaptive_min_wait_ms: None,
1627 adaptive_window_secs: None,
1628 adaptive_enabled: None,
1629 webhooks: None,
1630 };
1631 let source = HttpSource::with_dispatch(
1632 "dispatch-source",
1633 config,
1634 Some(DispatchMode::Channel),
1635 Some(1000),
1636 );
1637 assert!(source.is_ok());
1638 assert_eq!(source.unwrap().id(), "dispatch-source");
1639 }
1640 }
1641
1642 mod properties {
1643 use super::*;
1644
1645 #[test]
1646 fn test_id_returns_correct_value() {
1647 let source = HttpSourceBuilder::new("my-http-source")
1648 .with_host("localhost")
1649 .build()
1650 .unwrap();
1651 assert_eq!(source.id(), "my-http-source");
1652 }
1653
1654 #[test]
1655 fn test_type_name_returns_http() {
1656 let source = HttpSourceBuilder::new("test")
1657 .with_host("localhost")
1658 .build()
1659 .unwrap();
1660 assert_eq!(source.type_name(), "http");
1661 }
1662
1663 #[test]
1664 fn test_properties_contains_host_and_port() {
1665 let source = HttpSourceBuilder::new("test")
1666 .with_host("192.168.1.1")
1667 .with_port(9000)
1668 .build()
1669 .unwrap();
1670 let props = source.properties();
1671
1672 assert_eq!(
1673 props.get("host"),
1674 Some(&serde_json::Value::String("192.168.1.1".to_string()))
1675 );
1676 assert_eq!(
1677 props.get("port"),
1678 Some(&serde_json::Value::Number(9000.into()))
1679 );
1680 }
1681
1682 #[test]
1683 fn test_properties_includes_endpoint_when_set() {
1684 let source = HttpSourceBuilder::new("test")
1685 .with_host("localhost")
1686 .with_endpoint("/api/v1")
1687 .build()
1688 .unwrap();
1689 let props = source.properties();
1690
1691 assert_eq!(
1692 props.get("endpoint"),
1693 Some(&serde_json::Value::String("/api/v1".to_string()))
1694 );
1695 }
1696
1697 #[test]
1698 fn test_properties_excludes_endpoint_when_none() {
1699 let source = HttpSourceBuilder::new("test")
1700 .with_host("localhost")
1701 .build()
1702 .unwrap();
1703 let props = source.properties();
1704
1705 assert!(!props.contains_key("endpoint"));
1706 }
1707 }
1708
1709 mod lifecycle {
1710 use super::*;
1711
1712 #[tokio::test]
1713 async fn test_initial_status_is_stopped() {
1714 let source = HttpSourceBuilder::new("test")
1715 .with_host("localhost")
1716 .build()
1717 .unwrap();
1718 assert_eq!(source.status().await, ComponentStatus::Stopped);
1719 }
1720 }
1721
1722 mod builder {
1723 use super::*;
1724
1725 #[test]
1726 fn test_http_builder_defaults() {
1727 let source = HttpSourceBuilder::new("test").build().unwrap();
1728 assert_eq!(source.config.port, 8080);
1729 assert_eq!(source.config.timeout_ms, 10000);
1730 assert_eq!(source.config.endpoint, None);
1731 }
1732
1733 #[test]
1734 fn test_http_builder_custom_values() {
1735 let source = HttpSourceBuilder::new("test")
1736 .with_host("api.example.com")
1737 .with_port(9000)
1738 .with_endpoint("/webhook")
1739 .with_timeout_ms(5000)
1740 .build()
1741 .unwrap();
1742
1743 assert_eq!(source.config.host, "api.example.com");
1744 assert_eq!(source.config.port, 9000);
1745 assert_eq!(source.config.endpoint, Some("/webhook".to_string()));
1746 assert_eq!(source.config.timeout_ms, 5000);
1747 }
1748
1749 #[test]
1750 fn test_http_builder_adaptive_batching() {
1751 let source = HttpSourceBuilder::new("test")
1752 .with_host("localhost")
1753 .with_adaptive_max_batch_size(1000)
1754 .with_adaptive_min_batch_size(10)
1755 .with_adaptive_max_wait_ms(500)
1756 .with_adaptive_min_wait_ms(50)
1757 .with_adaptive_window_secs(60)
1758 .with_adaptive_enabled(true)
1759 .build()
1760 .unwrap();
1761
1762 assert_eq!(source.config.adaptive_max_batch_size, Some(1000));
1763 assert_eq!(source.config.adaptive_min_batch_size, Some(10));
1764 assert_eq!(source.config.adaptive_max_wait_ms, Some(500));
1765 assert_eq!(source.config.adaptive_min_wait_ms, Some(50));
1766 assert_eq!(source.config.adaptive_window_secs, Some(60));
1767 assert_eq!(source.config.adaptive_enabled, Some(true));
1768 }
1769
1770 #[test]
1771 fn test_builder_id() {
1772 let source = HttpSource::builder("my-http-source")
1773 .with_host("localhost")
1774 .build()
1775 .unwrap();
1776
1777 assert_eq!(source.base.id, "my-http-source");
1778 }
1779 }
1780
1781 mod event_conversion {
1782 use super::*;
1783
1784 #[test]
1785 fn test_convert_node_insert() {
1786 let mut props = serde_json::Map::new();
1787 props.insert(
1788 "name".to_string(),
1789 serde_json::Value::String("Alice".to_string()),
1790 );
1791 props.insert("age".to_string(), serde_json::Value::Number(30.into()));
1792
1793 let http_change = HttpSourceChange::Insert {
1794 element: HttpElement::Node {
1795 id: "user-1".to_string(),
1796 labels: vec!["User".to_string()],
1797 properties: props,
1798 },
1799 timestamp: Some(1234567890000000000),
1800 };
1801
1802 let result = convert_http_to_source_change(&http_change, "test-source");
1803 assert!(result.is_ok());
1804
1805 match result.unwrap() {
1806 drasi_core::models::SourceChange::Insert { element } => match element {
1807 drasi_core::models::Element::Node {
1808 metadata,
1809 properties,
1810 } => {
1811 assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1812 assert_eq!(metadata.labels.len(), 1);
1813 assert_eq!(metadata.effective_from, 1234567890000);
1814 assert!(properties.get("name").is_some());
1815 assert!(properties.get("age").is_some());
1816 }
1817 _ => panic!("Expected Node element"),
1818 },
1819 _ => panic!("Expected Insert operation"),
1820 }
1821 }
1822
1823 #[test]
1824 fn test_convert_relation_insert() {
1825 let http_change = HttpSourceChange::Insert {
1826 element: HttpElement::Relation {
1827 id: "follows-1".to_string(),
1828 labels: vec!["FOLLOWS".to_string()],
1829 from: "user-1".to_string(),
1830 to: "user-2".to_string(),
1831 properties: serde_json::Map::new(),
1832 },
1833 timestamp: None,
1834 };
1835
1836 let result = convert_http_to_source_change(&http_change, "test-source");
1837 assert!(result.is_ok());
1838
1839 match result.unwrap() {
1840 drasi_core::models::SourceChange::Insert { element } => match element {
1841 drasi_core::models::Element::Relation {
1842 metadata,
1843 out_node,
1844 in_node,
1845 ..
1846 } => {
1847 assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
1848 assert_eq!(out_node.element_id.as_ref(), "user-1");
1849 assert_eq!(in_node.element_id.as_ref(), "user-2");
1850 }
1851 _ => panic!("Expected Relation element"),
1852 },
1853 _ => panic!("Expected Insert operation"),
1854 }
1855 }
1856
1857 #[test]
1858 fn test_convert_delete() {
1859 let http_change = HttpSourceChange::Delete {
1860 id: "user-1".to_string(),
1861 labels: Some(vec!["User".to_string()]),
1862 timestamp: Some(9999999999),
1863 };
1864
1865 let result = convert_http_to_source_change(&http_change, "test-source");
1866 assert!(result.is_ok());
1867
1868 match result.unwrap() {
1869 drasi_core::models::SourceChange::Delete { metadata } => {
1870 assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1871 assert_eq!(metadata.labels.len(), 1);
1872 }
1873 _ => panic!("Expected Delete operation"),
1874 }
1875 }
1876
1877 #[test]
1878 fn test_convert_update() {
1879 let http_change = HttpSourceChange::Update {
1880 element: HttpElement::Node {
1881 id: "user-1".to_string(),
1882 labels: vec!["User".to_string()],
1883 properties: serde_json::Map::new(),
1884 },
1885 timestamp: None,
1886 };
1887
1888 let result = convert_http_to_source_change(&http_change, "test-source");
1889 assert!(result.is_ok());
1890
1891 match result.unwrap() {
1892 drasi_core::models::SourceChange::Update { .. } => {
1893 }
1895 _ => panic!("Expected Update operation"),
1896 }
1897 }
1898 }
1899
1900 mod adaptive_config {
1901 use super::*;
1902
1903 #[test]
1904 fn test_adaptive_config_from_http_config() {
1905 let source = HttpSourceBuilder::new("test")
1906 .with_host("localhost")
1907 .with_adaptive_max_batch_size(500)
1908 .with_adaptive_enabled(true)
1909 .build()
1910 .unwrap();
1911
1912 assert_eq!(source.adaptive_config.max_batch_size, 500);
1914 assert!(source.adaptive_config.adaptive_enabled);
1915 }
1916
1917 #[test]
1918 fn test_adaptive_config_uses_defaults_when_not_specified() {
1919 let source = HttpSourceBuilder::new("test")
1920 .with_host("localhost")
1921 .build()
1922 .unwrap();
1923
1924 let default_config = AdaptiveBatchConfig::default();
1926 assert_eq!(
1927 source.adaptive_config.max_batch_size,
1928 default_config.max_batch_size
1929 );
1930 assert_eq!(
1931 source.adaptive_config.min_batch_size,
1932 default_config.min_batch_size
1933 );
1934 }
1935 }
1936}