1#![allow(unexpected_cfgs)]
2pub mod config;
218pub mod descriptor;
219pub use config::HttpSourceConfig;
220
221mod adaptive_batcher;
222mod models;
223mod time;
224
225pub mod auth;
227pub mod content_parser;
228pub mod route_matcher;
229pub mod template_engine;
230
231pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
233
234use anyhow::Result;
235use async_trait::async_trait;
236use axum::{
237 body::Bytes,
238 extract::{Path, State},
239 http::{header, Method, StatusCode},
240 response::IntoResponse,
241 routing::{delete, get, post, put},
242 Json, Router,
243};
244use log::{debug, error, info, trace, warn};
245use serde::{Deserialize, Serialize};
246use std::collections::HashMap;
247use std::sync::Arc;
248use std::time::Duration;
249use tokio::sync::mpsc;
250use tokio::time::timeout;
251use tower_http::cors::{Any, CorsLayer};
252
253use drasi_lib::channels::{ComponentType, *};
254use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
255use drasi_lib::Source;
256use tracing::Instrument;
257
258use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
259use crate::auth::{verify_auth, AuthResult};
260use crate::config::{CorsConfig, ErrorBehavior, WebhookConfig};
261use crate::content_parser::{parse_content, ContentType};
262use crate::route_matcher::{convert_method, find_matching_mappings, headers_to_map, RouteMatcher};
263use crate::template_engine::{TemplateContext, TemplateEngine};
264
/// JSON body returned to HTTP clients after an event or webhook request is handled.
#[derive(Debug, Serialize, Deserialize)]
pub struct EventResponse {
    /// Whether the request is reported to the caller as accepted.
    pub success: bool,
    /// Human-readable summary of the outcome.
    pub message: String,
    /// Optional error detail; left out of the serialized JSON when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
273
/// Source that accepts change events over HTTP and forwards them to its
/// dispatchers through an adaptive batcher.
pub struct HttpSource {
    /// Shared source plumbing (id, status, dispatchers, task and shutdown handles).
    base: SourceBase,
    /// Configuration this source was created with (host, port, endpoint, webhooks, ...).
    config: HttpSourceConfig,
    /// Batching parameters derived from the `adaptive_*` options of `config`.
    adaptive_config: AdaptiveBatchConfig,
}
293
/// Request body accepted by the batch endpoint: a list of changes under `events`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventRequest {
    /// Changes to process, in the order they appear in the request.
    pub events: Vec<HttpSourceChange>,
}
299
/// State handed to the axum router and made available to every handler.
#[derive(Clone)]
struct HttpAppState {
    /// Id of the owning source; standard-mode requests must name it in their path.
    source_id: String,
    /// Sending half of the channel that feeds the adaptive batcher task.
    batch_tx: mpsc::Sender<SourceChangeEvent>,
    /// Webhook routing state; `None` when the source has no webhook configuration.
    webhook_config: Option<Arc<WebhookState>>,
}
312
/// Everything the webhook handler needs, built once when the source starts.
struct WebhookState {
    /// Webhook configuration (routes, default error behavior, CORS).
    config: WebhookConfig,
    /// Matcher constructed from `config.routes`.
    route_matcher: RouteMatcher,
    /// Engine that turns a matched mapping plus request context into a source change.
    template_engine: TemplateEngine,
}
322
323impl HttpSource {
324 pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
355 let id = id.into();
356 let params = SourceBaseParams::new(id);
357
358 let mut adaptive_config = AdaptiveBatchConfig::default();
360
361 if let Some(max_batch) = config.adaptive_max_batch_size {
363 adaptive_config.max_batch_size = max_batch;
364 }
365 if let Some(min_batch) = config.adaptive_min_batch_size {
366 adaptive_config.min_batch_size = min_batch;
367 }
368 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
369 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
370 }
371 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
372 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
373 }
374 if let Some(window_secs) = config.adaptive_window_secs {
375 adaptive_config.throughput_window = Duration::from_secs(window_secs);
376 }
377 if let Some(enabled) = config.adaptive_enabled {
378 adaptive_config.adaptive_enabled = enabled;
379 }
380
381 Ok(Self {
382 base: SourceBase::new(params)?,
383 config,
384 adaptive_config,
385 })
386 }
387
388 pub fn with_dispatch(
408 id: impl Into<String>,
409 config: HttpSourceConfig,
410 dispatch_mode: Option<DispatchMode>,
411 dispatch_buffer_capacity: Option<usize>,
412 ) -> Result<Self> {
413 let id = id.into();
414 let mut params = SourceBaseParams::new(id);
415 if let Some(mode) = dispatch_mode {
416 params = params.with_dispatch_mode(mode);
417 }
418 if let Some(capacity) = dispatch_buffer_capacity {
419 params = params.with_dispatch_buffer_capacity(capacity);
420 }
421
422 let mut adaptive_config = AdaptiveBatchConfig::default();
423
424 if let Some(max_batch) = config.adaptive_max_batch_size {
425 adaptive_config.max_batch_size = max_batch;
426 }
427 if let Some(min_batch) = config.adaptive_min_batch_size {
428 adaptive_config.min_batch_size = min_batch;
429 }
430 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
431 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
432 }
433 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
434 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
435 }
436 if let Some(window_secs) = config.adaptive_window_secs {
437 adaptive_config.throughput_window = Duration::from_secs(window_secs);
438 }
439 if let Some(enabled) = config.adaptive_enabled {
440 adaptive_config.adaptive_enabled = enabled;
441 }
442
443 Ok(Self {
444 base: SourceBase::new(params)?,
445 config,
446 adaptive_config,
447 })
448 }
449
450 async fn handle_single_event(
455 Path(source_id): Path<String>,
456 State(state): State<HttpAppState>,
457 Json(event): Json<HttpSourceChange>,
458 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
459 debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
460 Self::process_events(&source_id, &state, vec![event]).await
461 }
462
463 async fn handle_batch_events(
468 Path(source_id): Path<String>,
469 State(state): State<HttpAppState>,
470 Json(batch): Json<BatchEventRequest>,
471 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
472 debug!(
473 "[{}] HTTP endpoint received batch of {} events",
474 source_id,
475 batch.events.len()
476 );
477 Self::process_events(&source_id, &state, batch.events).await
478 }
479
480 async fn process_events(
492 source_id: &str,
493 state: &HttpAppState,
494 events: Vec<HttpSourceChange>,
495 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
496 trace!("[{}] Processing {} events", source_id, events.len());
497
498 if source_id != state.source_id {
499 error!(
500 "[{}] Source name mismatch. Expected '{}', got '{}'",
501 state.source_id, state.source_id, source_id
502 );
503 return Err((
504 StatusCode::BAD_REQUEST,
505 Json(EventResponse {
506 success: false,
507 message: "Source name mismatch".to_string(),
508 error: Some(format!(
509 "Expected source '{}', got '{}'",
510 state.source_id, source_id
511 )),
512 }),
513 ));
514 }
515
516 let mut success_count = 0;
517 let mut error_count = 0;
518 let mut last_error = None;
519
520 for (idx, event) in events.iter().enumerate() {
521 match convert_http_to_source_change(event, source_id) {
522 Ok(source_change) => {
523 let change_event = SourceChangeEvent {
524 source_id: source_id.to_string(),
525 change: source_change,
526 timestamp: chrono::Utc::now(),
527 };
528
529 if let Err(e) = state.batch_tx.send(change_event).await {
530 error!(
531 "[{}] Failed to send event {} to batch channel: {}",
532 state.source_id,
533 idx + 1,
534 e
535 );
536 error_count += 1;
537 last_error = Some("Internal channel error".to_string());
538 } else {
539 success_count += 1;
540 }
541 }
542 Err(e) => {
543 error!(
544 "[{}] Failed to convert event {}: {}",
545 state.source_id,
546 idx + 1,
547 e
548 );
549 error_count += 1;
550 last_error = Some(e.to_string());
551 }
552 }
553 }
554
555 debug!(
556 "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
557 );
558
559 if error_count > 0 && success_count == 0 {
560 Err((
561 StatusCode::BAD_REQUEST,
562 Json(EventResponse {
563 success: false,
564 message: format!("All {error_count} events failed"),
565 error: last_error,
566 }),
567 ))
568 } else if error_count > 0 {
569 Ok(Json(EventResponse {
570 success: true,
571 message: format!(
572 "Processed {success_count} events successfully, {error_count} failed"
573 ),
574 error: last_error,
575 }))
576 } else {
577 Ok(Json(EventResponse {
578 success: true,
579 message: format!("All {success_count} events processed successfully"),
580 error: None,
581 }))
582 }
583 }
584
585 async fn health_check() -> impl IntoResponse {
586 Json(serde_json::json!({
587 "status": "healthy",
588 "service": "http-source",
589 "features": ["adaptive-batching", "batch-endpoint", "webhooks"]
590 }))
591 }
592
593 async fn handle_webhook(
598 method: axum::http::Method,
599 uri: axum::http::Uri,
600 headers: axum::http::HeaderMap,
601 State(state): State<HttpAppState>,
602 body: Bytes,
603 ) -> impl IntoResponse {
604 let path = uri.path();
605 let source_id = &state.source_id;
606
607 debug!("[{source_id}] Webhook received: {method} {path}");
608
609 let webhook_state = match &state.webhook_config {
611 Some(ws) => ws,
612 None => {
613 error!("[{source_id}] Webhook handler called but no webhook config present");
614 return (
615 StatusCode::INTERNAL_SERVER_ERROR,
616 Json(EventResponse {
617 success: false,
618 message: "Internal configuration error".to_string(),
619 error: Some("Webhook mode not properly configured".to_string()),
620 }),
621 );
622 }
623 };
624
625 let http_method = match convert_method(&method) {
627 Some(m) => m,
628 None => {
629 return handle_error(
630 &webhook_state.config.error_behavior,
631 source_id,
632 StatusCode::METHOD_NOT_ALLOWED,
633 "Method not supported",
634 None,
635 );
636 }
637 };
638
639 let route_match = match webhook_state.route_matcher.match_route(
641 path,
642 &http_method,
643 &webhook_state.config.routes,
644 ) {
645 Some(rm) => rm,
646 None => {
647 debug!("[{source_id}] No matching route for {method} {path}");
648 return handle_error(
649 &webhook_state.config.error_behavior,
650 source_id,
651 StatusCode::NOT_FOUND,
652 "No matching route",
653 None,
654 );
655 }
656 };
657
658 let route = route_match.route;
659 let error_behavior = route
660 .error_behavior
661 .as_ref()
662 .unwrap_or(&webhook_state.config.error_behavior);
663
664 let auth_result = verify_auth(route.auth.as_ref(), &headers, &body);
666 if let AuthResult::Failed(reason) = auth_result {
667 warn!("[{source_id}] Authentication failed for {path}: {reason}");
668 return handle_error(
669 error_behavior,
670 source_id,
671 StatusCode::UNAUTHORIZED,
672 "Authentication failed",
673 Some(&reason),
674 );
675 }
676
677 let content_type = ContentType::from_header(
679 headers
680 .get(axum::http::header::CONTENT_TYPE)
681 .and_then(|v| v.to_str().ok()),
682 );
683
684 let payload = match parse_content(&body, content_type) {
685 Ok(p) => p,
686 Err(e) => {
687 warn!("[{source_id}] Failed to parse payload: {e}");
688 return handle_error(
689 error_behavior,
690 source_id,
691 StatusCode::BAD_REQUEST,
692 "Failed to parse payload",
693 Some(&e.to_string()),
694 );
695 }
696 };
697
698 let headers_map = headers_to_map(&headers);
700 let query_map = parse_query_string(uri.query());
701
702 let context = TemplateContext {
703 payload: payload.clone(),
704 route: route_match.path_params,
705 query: query_map,
706 headers: headers_map.clone(),
707 method: method.to_string(),
708 path: path.to_string(),
709 source_id: source_id.clone(),
710 };
711
712 let matching_mappings = find_matching_mappings(&route.mappings, &headers_map, &payload);
714
715 if matching_mappings.is_empty() {
716 debug!("[{source_id}] No matching mappings for request");
717 return handle_error(
718 error_behavior,
719 source_id,
720 StatusCode::BAD_REQUEST,
721 "No matching mapping for request",
722 None,
723 );
724 }
725
726 let mut success_count = 0;
728 let mut error_count = 0;
729 let mut last_error = None;
730
731 for mapping in matching_mappings {
732 match webhook_state
733 .template_engine
734 .process_mapping(mapping, &context, source_id)
735 {
736 Ok(source_change) => {
737 let event = SourceChangeEvent {
738 source_id: source_id.clone(),
739 change: source_change,
740 timestamp: chrono::Utc::now(),
741 };
742
743 if let Err(e) = state.batch_tx.send(event).await {
744 error!("[{source_id}] Failed to send event to batcher: {e}");
745 error_count += 1;
746 last_error = Some(format!("Failed to queue event: {e}"));
747 } else {
748 success_count += 1;
749 }
750 }
751 Err(e) => {
752 warn!("[{source_id}] Failed to process mapping: {e}");
753 error_count += 1;
754 last_error = Some(e.to_string());
755 }
756 }
757 }
758
759 debug!("[{source_id}] Webhook processing complete: {success_count} succeeded, {error_count} failed");
760
761 if error_count > 0 && success_count == 0 {
762 handle_error(
763 error_behavior,
764 source_id,
765 StatusCode::BAD_REQUEST,
766 &format!("All {error_count} mappings failed"),
767 last_error.as_deref(),
768 )
769 } else if error_count > 0 {
770 (
771 StatusCode::OK,
772 Json(EventResponse {
773 success: true,
774 message: format!("Processed {success_count} events, {error_count} failed"),
775 error: last_error,
776 }),
777 )
778 } else {
779 (
780 StatusCode::OK,
781 Json(EventResponse {
782 success: true,
783 message: format!("Processed {success_count} events successfully"),
784 error: None,
785 }),
786 )
787 }
788 }
789
790 async fn run_adaptive_batcher(
791 batch_rx: mpsc::Receiver<SourceChangeEvent>,
792 dispatchers: Arc<
793 tokio::sync::RwLock<
794 Vec<
795 Box<
796 dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
797 >,
798 >,
799 >,
800 >,
801 adaptive_config: AdaptiveBatchConfig,
802 source_id: String,
803 ) {
804 let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
805 let mut total_events = 0u64;
806 let mut total_batches = 0u64;
807
808 info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
809
810 while let Some(batch) = batcher.next_batch().await {
811 if batch.is_empty() {
812 debug!("[{source_id}] Batcher received empty batch, skipping");
813 continue;
814 }
815
816 let batch_size = batch.len();
817 total_events += batch_size as u64;
818 total_batches += 1;
819
820 debug!(
821 "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
822 );
823
824 let mut sent_count = 0;
825 let mut failed_count = 0;
826 for (idx, event) in batch.into_iter().enumerate() {
827 debug!(
828 "[{}] Batch #{}, dispatching event {}/{}",
829 source_id,
830 total_batches,
831 idx + 1,
832 batch_size
833 );
834
835 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
836 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
837
838 let wrapper = SourceEventWrapper::with_profiling(
839 event.source_id.clone(),
840 SourceEvent::Change(event.change),
841 event.timestamp,
842 profiling,
843 );
844
845 if let Err(e) =
846 SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
847 .await
848 {
849 error!(
850 "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
851 source_id,
852 total_batches,
853 idx + 1,
854 batch_size,
855 e
856 );
857 failed_count += 1;
858 } else {
859 debug!(
860 "[{}] Batch #{}, successfully dispatched event {}/{}",
861 source_id,
862 total_batches,
863 idx + 1,
864 batch_size
865 );
866 sent_count += 1;
867 }
868 }
869
870 debug!(
871 "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
872 );
873
874 if total_batches.is_multiple_of(100) {
875 info!(
876 "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
877 source_id,
878 total_batches,
879 total_events,
880 total_events as f64 / total_batches as f64
881 );
882 }
883 }
884
885 info!(
886 "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
887 );
888 }
889}
890
891#[async_trait]
892impl Source for HttpSource {
/// Returns the id stored in the underlying `SourceBase`.
fn id(&self) -> &str {
    &self.base.id
}
896
/// Returns the fixed type name of this source, `"http"`.
fn type_name(&self) -> &str {
    "http"
}
900
901 fn properties(&self) -> HashMap<String, serde_json::Value> {
902 let mut props = HashMap::new();
903 props.insert(
904 "host".to_string(),
905 serde_json::Value::String(self.config.host.clone()),
906 );
907 props.insert(
908 "port".to_string(),
909 serde_json::Value::Number(self.config.port.into()),
910 );
911 if let Some(ref endpoint) = self.config.endpoint {
912 props.insert(
913 "endpoint".to_string(),
914 serde_json::Value::String(endpoint.clone()),
915 );
916 }
917 props
918 }
919
/// Returns the auto-start flag held by the underlying `SourceBase`.
fn auto_start(&self) -> bool {
    self.base.get_auto_start()
}
923
/// Starts the source: spawns the adaptive batcher task and the HTTP server task.
///
/// Sequence:
/// 1. Set status to `Starting` and emit the matching component event.
/// 2. Create the batch channel and spawn `run_adaptive_batcher` on its receiver.
/// 3. Build the router: webhook mode serves `/health` plus a fallback webhook
///    handler (optionally wrapped in a CORS layer); standard mode serves `/health`
///    and the single/batch event endpoints.
/// 4. Spawn the server task, which binds `host:port` and serves until the shutdown
///    signal fires. The task handle and shutdown sender are stored on `base`.
/// 5. Wait up to 500 ms for the server task to report a bind failure.
///
/// # Errors
/// Returns an error when a component event cannot be sent, or when the server task
/// reports within the 500 ms window that it could not bind; in the latter case the
/// status is set to `Error` first.
async fn start(&self) -> Result<()> {
    info!("[{}] Starting adaptive HTTP source", self.base.id);

    self.base.set_status(ComponentStatus::Starting).await;
    self.base
        .send_component_event(
            ComponentStatus::Starting,
            Some("Starting adaptive HTTP source".to_string()),
        )
        .await?;

    let host = self.config.host.clone();
    let port = self.config.port;

    // Channel between the HTTP handlers (senders) and the batcher task (receiver).
    // NOTE(review): the log text below says the capacity is max_batch_size x 5; that
    // depends on `recommended_channel_capacity`, which is not visible here - confirm.
    let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
    let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
    info!(
        "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
        self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
    );

    // Owned copies that are moved into the batcher task.
    let adaptive_config = self.adaptive_config.clone();
    let source_id = self.base.id.clone();
    let dispatchers = self.base.dispatchers.clone();

    // Instance id used to tag tracing spans; falls back to the type's default value
    // when no context is available.
    let instance_id = self
        .base
        .context()
        .await
        .map(|c| c.instance_id)
        .unwrap_or_default();

    info!("[{source_id}] Starting adaptive batcher task");
    let source_id_for_span = source_id.clone();
    let span = tracing::info_span!(
        "http_adaptive_batcher",
        instance_id = %instance_id,
        component_id = %source_id_for_span,
        component_type = "source"
    );
    // The batcher's join handle is not kept; the task ends when `next_batch`
    // returns `None` (see `run_adaptive_batcher`).
    tokio::spawn(
        async move {
            Self::run_adaptive_batcher(
                batch_rx,
                dispatchers,
                adaptive_config,
                source_id.clone(),
            )
            .await
        }
        .instrument(span),
    );

    // Webhook routing state is only built when a webhook configuration exists.
    let webhook_state = if let Some(ref webhook_config) = self.config.webhooks {
        info!(
            "[{}] Webhook mode enabled with {} routes",
            self.base.id,
            webhook_config.routes.len()
        );
        Some(Arc::new(WebhookState {
            config: webhook_config.clone(),
            route_matcher: RouteMatcher::new(&webhook_config.routes),
            template_engine: TemplateEngine::new(),
        }))
    } else {
        info!("[{}] Standard mode enabled", self.base.id);
        None
    };

    let state = HttpAppState {
        source_id: self.base.id.clone(),
        batch_tx,
        webhook_config: webhook_state,
    };

    let app = if self.config.is_webhook_mode() {
        // Webhook mode: every path other than /health goes to the webhook handler.
        let router = Router::new()
            .route("/health", get(Self::health_check))
            .fallback(Self::handle_webhook)
            .with_state(state);

        // Add the CORS layer only when it is configured and enabled.
        if let Some(ref webhooks) = self.config.webhooks {
            if let Some(ref cors_config) = webhooks.cors {
                if cors_config.enabled {
                    info!("[{}] CORS enabled for webhook endpoints", self.base.id);
                    router.layer(build_cors_layer(cors_config))
                } else {
                    router
                }
            } else {
                router
            }
        } else {
            router
        }
    } else {
        // Standard mode: fixed endpoints for single events and batches.
        Router::new()
            .route("/health", get(Self::health_check))
            .route(
                "/sources/:source_id/events",
                post(Self::handle_single_event),
            )
            .route(
                "/sources/:source_id/events/batch",
                post(Self::handle_batch_events),
            )
            .with_state(state)
    };

    // Fired by `stop` to begin graceful shutdown of the server.
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

    let host_clone = host.clone();

    // Lets the server task report a bind failure back to this function.
    let (error_tx, error_rx) = tokio::sync::oneshot::channel();
    let source_id = self.base.id.clone();
    let source_id_for_span = source_id.clone();
    let span = tracing::info_span!(
        "http_source_server",
        instance_id = %instance_id,
        component_id = %source_id_for_span,
        component_type = "source"
    );
    let server_handle = tokio::spawn(
        async move {
            let addr = format!("{host}:{port}");
            info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");

            let listener = match tokio::net::TcpListener::bind(&addr).await {
                Ok(listener) => {
                    info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
                    listener
                }
                Err(e) => {
                    error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
                    // A send error only means `start` already stopped waiting.
                    let _ = error_tx.send(format!(
                        "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
                    ));
                    return;
                }
            };

            if let Err(e) = axum::serve(listener, app)
                .with_graceful_shutdown(async move {
                    let _ = shutdown_rx.await;
                })
                .await
            {
                error!("[{source_id}] HTTP server error: {e}");
            }
        }
        .instrument(span),
    );

    // Stored so that `stop` can signal shutdown and wait for the task.
    *self.base.task_handle.write().await = Some(server_handle);
    *self.base.shutdown_tx.write().await = Some(shutdown_tx);

    // Only an explicit error message within 500 ms counts as failure; a timeout or
    // a dropped sender (bind succeeded) both lead to `Running`.
    match timeout(Duration::from_millis(500), error_rx).await {
        Ok(Ok(error_msg)) => {
            self.base.set_status(ComponentStatus::Error).await;
            return Err(anyhow::anyhow!("{error_msg}"));
        }
        _ => {
            self.base.set_status(ComponentStatus::Running).await;
        }
    }

    self.base
        .send_component_event(
            ComponentStatus::Running,
            Some(format!(
                "Adaptive HTTP source running on {host_clone}:{port} with batch support"
            )),
        )
        .await?;

    Ok(())
}
1112
1113 async fn stop(&self) -> Result<()> {
1114 info!("[{}] Stopping adaptive HTTP source", self.base.id);
1115
1116 self.base.set_status(ComponentStatus::Stopping).await;
1117 self.base
1118 .send_component_event(
1119 ComponentStatus::Stopping,
1120 Some("Stopping adaptive HTTP source".to_string()),
1121 )
1122 .await?;
1123
1124 if let Some(tx) = self.base.shutdown_tx.write().await.take() {
1125 let _ = tx.send(());
1126 }
1127
1128 if let Some(handle) = self.base.task_handle.write().await.take() {
1129 let _ = timeout(Duration::from_secs(5), handle).await;
1130 }
1131
1132 self.base.set_status(ComponentStatus::Stopped).await;
1133 self.base
1134 .send_component_event(
1135 ComponentStatus::Stopped,
1136 Some("Adaptive HTTP source stopped".to_string()),
1137 )
1138 .await?;
1139
1140 Ok(())
1141 }
1142
/// Returns the current status as reported by the underlying `SourceBase`.
async fn status(&self) -> ComponentStatus {
    self.base.get_status().await
}
1146
/// Subscribes to this source by delegating to
/// `SourceBase::subscribe_with_bootstrap`, passing `"HTTP"` as the source label.
async fn subscribe(
    &self,
    settings: drasi_lib::config::SourceSubscriptionSettings,
) -> Result<SubscriptionResponse> {
    self.base.subscribe_with_bootstrap(&settings, "HTTP").await
}
1153
/// Exposes the source as `&dyn Any` so callers can downcast to `HttpSource`.
fn as_any(&self) -> &dyn std::any::Any {
    self
}
1157
/// Passes the runtime context on to the underlying `SourceBase`.
async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
    self.base.initialize(context).await;
}
1161
/// Installs a bootstrap provider by forwarding it to the underlying `SourceBase`.
async fn set_bootstrap_provider(
    &self,
    provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
) {
    self.base.set_bootstrap_provider(provider).await;
}
1168}
1169
/// Builder for [`HttpSource`]; collects settings and creates the source in `build`.
pub struct HttpSourceBuilder {
    /// Id of the source to create.
    id: String,
    /// Host the HTTP server binds to (empty until set).
    host: String,
    /// Port the HTTP server binds to (8080 unless changed).
    port: u16,
    /// Optional endpoint value copied into the configuration.
    endpoint: Option<String>,
    /// Timeout in milliseconds copied into the configuration (10000 unless changed).
    timeout_ms: u64,
    /// Override for the batcher's maximum batch size.
    adaptive_max_batch_size: Option<usize>,
    /// Override for the batcher's minimum batch size.
    adaptive_min_batch_size: Option<usize>,
    /// Override for the batcher's maximum wait time, in milliseconds.
    adaptive_max_wait_ms: Option<u64>,
    /// Override for the batcher's minimum wait time, in milliseconds.
    adaptive_min_wait_ms: Option<u64>,
    /// Override for the batcher's throughput window, in seconds.
    adaptive_window_secs: Option<u64>,
    /// Override for whether adaptive batching is enabled.
    adaptive_enabled: Option<bool>,
    /// Webhook configuration; `None` leaves it unset in the configuration.
    webhooks: Option<WebhookConfig>,
    /// Dispatch mode override for the base parameters.
    dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity override for the base parameters.
    dispatch_buffer_capacity: Option<usize>,
    /// Bootstrap provider to attach to the base parameters, if any.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Auto-start flag passed to the base parameters (true unless changed).
    auto_start: bool,
}
1206
1207impl HttpSourceBuilder {
1208 pub fn new(id: impl Into<String>) -> Self {
1214 Self {
1215 id: id.into(),
1216 host: String::new(),
1217 port: 8080,
1218 endpoint: None,
1219 timeout_ms: 10000,
1220 adaptive_max_batch_size: None,
1221 adaptive_min_batch_size: None,
1222 adaptive_max_wait_ms: None,
1223 adaptive_min_wait_ms: None,
1224 adaptive_window_secs: None,
1225 adaptive_enabled: None,
1226 webhooks: None,
1227 dispatch_mode: None,
1228 dispatch_buffer_capacity: None,
1229 bootstrap_provider: None,
1230 auto_start: true,
1231 }
1232 }
1233
1234 pub fn with_host(mut self, host: impl Into<String>) -> Self {
1236 self.host = host.into();
1237 self
1238 }
1239
1240 pub fn with_port(mut self, port: u16) -> Self {
1242 self.port = port;
1243 self
1244 }
1245
1246 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
1248 self.endpoint = Some(endpoint.into());
1249 self
1250 }
1251
1252 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
1254 self.timeout_ms = timeout_ms;
1255 self
1256 }
1257
1258 pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
1260 self.adaptive_max_batch_size = Some(size);
1261 self
1262 }
1263
1264 pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
1266 self.adaptive_min_batch_size = Some(size);
1267 self
1268 }
1269
1270 pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
1272 self.adaptive_max_wait_ms = Some(wait_ms);
1273 self
1274 }
1275
1276 pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
1278 self.adaptive_min_wait_ms = Some(wait_ms);
1279 self
1280 }
1281
1282 pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
1284 self.adaptive_window_secs = Some(secs);
1285 self
1286 }
1287
1288 pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
1290 self.adaptive_enabled = Some(enabled);
1291 self
1292 }
1293
1294 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
1296 self.dispatch_mode = Some(mode);
1297 self
1298 }
1299
1300 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
1302 self.dispatch_buffer_capacity = Some(capacity);
1303 self
1304 }
1305
1306 pub fn with_bootstrap_provider(
1308 mut self,
1309 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
1310 ) -> Self {
1311 self.bootstrap_provider = Some(Box::new(provider));
1312 self
1313 }
1314
1315 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
1320 self.auto_start = auto_start;
1321 self
1322 }
1323
1324 pub fn with_webhooks(mut self, webhooks: WebhookConfig) -> Self {
1329 self.webhooks = Some(webhooks);
1330 self
1331 }
1332
1333 pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
1335 self.host = config.host;
1336 self.port = config.port;
1337 self.endpoint = config.endpoint;
1338 self.timeout_ms = config.timeout_ms;
1339 self.adaptive_max_batch_size = config.adaptive_max_batch_size;
1340 self.adaptive_min_batch_size = config.adaptive_min_batch_size;
1341 self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
1342 self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
1343 self.adaptive_window_secs = config.adaptive_window_secs;
1344 self.adaptive_enabled = config.adaptive_enabled;
1345 self.webhooks = config.webhooks;
1346 self
1347 }
1348
1349 pub fn build(self) -> Result<HttpSource> {
1355 let config = HttpSourceConfig {
1356 host: self.host,
1357 port: self.port,
1358 endpoint: self.endpoint,
1359 timeout_ms: self.timeout_ms,
1360 adaptive_max_batch_size: self.adaptive_max_batch_size,
1361 adaptive_min_batch_size: self.adaptive_min_batch_size,
1362 adaptive_max_wait_ms: self.adaptive_max_wait_ms,
1363 adaptive_min_wait_ms: self.adaptive_min_wait_ms,
1364 adaptive_window_secs: self.adaptive_window_secs,
1365 adaptive_enabled: self.adaptive_enabled,
1366 webhooks: self.webhooks,
1367 };
1368
1369 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1371 if let Some(mode) = self.dispatch_mode {
1372 params = params.with_dispatch_mode(mode);
1373 }
1374 if let Some(capacity) = self.dispatch_buffer_capacity {
1375 params = params.with_dispatch_buffer_capacity(capacity);
1376 }
1377 if let Some(provider) = self.bootstrap_provider {
1378 params = params.with_bootstrap_provider(provider);
1379 }
1380
1381 let mut adaptive_config = AdaptiveBatchConfig::default();
1383 if let Some(max_batch) = config.adaptive_max_batch_size {
1384 adaptive_config.max_batch_size = max_batch;
1385 }
1386 if let Some(min_batch) = config.adaptive_min_batch_size {
1387 adaptive_config.min_batch_size = min_batch;
1388 }
1389 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
1390 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
1391 }
1392 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
1393 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
1394 }
1395 if let Some(window_secs) = config.adaptive_window_secs {
1396 adaptive_config.throughput_window = Duration::from_secs(window_secs);
1397 }
1398 if let Some(enabled) = config.adaptive_enabled {
1399 adaptive_config.adaptive_enabled = enabled;
1400 }
1401
1402 Ok(HttpSource {
1403 base: SourceBase::new(params)?,
1404 config,
1405 adaptive_config,
1406 })
1407 }
1408}
1409
impl HttpSource {
    /// Returns an [`HttpSourceBuilder`] for a source with the given `id`.
    ///
    /// Shorthand for `HttpSourceBuilder::new(id)`.
    pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
        HttpSourceBuilder::new(id)
    }
}
1432
1433fn handle_error(
1435 behavior: &ErrorBehavior,
1436 source_id: &str,
1437 status: StatusCode,
1438 message: &str,
1439 detail: Option<&str>,
1440) -> (StatusCode, Json<EventResponse>) {
1441 match behavior {
1442 ErrorBehavior::Reject => {
1443 debug!("[{source_id}] Rejecting request: {message}");
1444 (
1445 status,
1446 Json(EventResponse {
1447 success: false,
1448 message: message.to_string(),
1449 error: detail.map(String::from),
1450 }),
1451 )
1452 }
1453 ErrorBehavior::AcceptAndLog => {
1454 warn!("[{source_id}] Accepting with error (logged): {message}");
1455 (
1456 StatusCode::OK,
1457 Json(EventResponse {
1458 success: true,
1459 message: format!("Accepted with warning: {message}"),
1460 error: detail.map(String::from),
1461 }),
1462 )
1463 }
1464 ErrorBehavior::AcceptAndSkip => {
1465 trace!("[{source_id}] Accepting silently: {message}");
1466 (
1467 StatusCode::OK,
1468 Json(EventResponse {
1469 success: true,
1470 message: "Accepted".to_string(),
1471 error: None,
1472 }),
1473 )
1474 }
1475 }
1476}
1477
1478fn parse_query_string(query: Option<&str>) -> HashMap<String, String> {
1480 query
1481 .map(|q| {
1482 q.split('&')
1483 .filter_map(|pair| {
1484 let mut parts = pair.splitn(2, '=');
1485 let key = parts.next()?;
1486 let value = parts.next().unwrap_or("");
1487 Some((urlencoding_decode(key), urlencoding_decode(value)))
1488 })
1489 .collect()
1490 })
1491 .unwrap_or_default()
1492}
1493
/// Decodes a URL-encoded (form style) string.
///
/// * `%XX`, where both `X` are ASCII hex digits, becomes the byte `0xXX`.
/// * `+` becomes a space.
/// * A `%` that is not followed by two hex digits is kept literally, and decoding
///   continues with the very next character. Signs are not hex digits, so `%+1`
///   yields `"% 1"` rather than a control byte.
///
/// The decoded bytes are interpreted as UTF-8; invalid sequences are replaced with
/// U+FFFD.
fn urlencoding_decode(s: &str) -> String {
    /// Value of an ASCII hex digit, or `None` for any other byte.
    fn hex_value(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    // Work on bytes: escapes are ASCII, and non-ASCII input bytes are copied as-is.
    let bytes = s.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let high = bytes.get(i + 1).copied().and_then(hex_value);
                let low = bytes.get(i + 2).copied().and_then(hex_value);
                if let Some((high, low)) = high.zip(low) {
                    decoded.push((high << 4) | low);
                    i += 3;
                    continue;
                }
                // Malformed escape: keep the '%' and re-scan what follows normally.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }

    String::from_utf8_lossy(&decoded).into_owned()
}
1533
1534fn build_cors_layer(cors_config: &CorsConfig) -> CorsLayer {
1536 let mut cors = CorsLayer::new();
1537
1538 if cors_config.allow_origins.len() == 1 && cors_config.allow_origins[0] == "*" {
1540 cors = cors.allow_origin(Any);
1541 } else {
1542 let origins: Vec<_> = cors_config
1543 .allow_origins
1544 .iter()
1545 .filter_map(|o| o.parse().ok())
1546 .collect();
1547 cors = cors.allow_origin(origins);
1548 }
1549
1550 let methods: Vec<Method> = cors_config
1552 .allow_methods
1553 .iter()
1554 .filter_map(|m| m.parse().ok())
1555 .collect();
1556 cors = cors.allow_methods(methods);
1557
1558 if cors_config.allow_headers.len() == 1 && cors_config.allow_headers[0] == "*" {
1560 cors = cors.allow_headers(Any);
1561 } else {
1562 let headers: Vec<header::HeaderName> = cors_config
1563 .allow_headers
1564 .iter()
1565 .filter_map(|h| h.parse().ok())
1566 .collect();
1567 cors = cors.allow_headers(headers);
1568 }
1569
1570 if !cors_config.expose_headers.is_empty() {
1572 let exposed: Vec<header::HeaderName> = cors_config
1573 .expose_headers
1574 .iter()
1575 .filter_map(|h| h.parse().ok())
1576 .collect();
1577 cors = cors.expose_headers(exposed);
1578 }
1579
1580 if cors_config.allow_credentials {
1582 cors = cors.allow_credentials(true);
1583 }
1584
1585 cors = cors.max_age(Duration::from_secs(cors_config.max_age));
1587
1588 cors
1589}
1590
#[cfg(test)]
mod tests {
    use super::*;

    /// Different ways of constructing an `HttpSource`.
    mod construction {
        use super::*;

        /// Building with an explicit host and port succeeds.
        #[test]
        fn test_builder_with_valid_config() {
            let outcome = HttpSourceBuilder::new("test-source")
                .with_host("localhost")
                .with_port(8080)
                .build();

            assert!(outcome.is_ok());
        }

        /// The id handed to the builder is the id reported by the source.
        #[test]
        fn test_builder_with_custom_config() {
            let built = HttpSourceBuilder::new("http-source")
                .with_host("0.0.0.0")
                .with_port(9000)
                .with_endpoint("/events")
                .build()
                .unwrap();

            assert_eq!(built.id(), "http-source");
        }

        /// `HttpSource::with_dispatch` accepts a hand-written config together
        /// with an explicit dispatch mode and a second `Some(1000)` argument.
        #[test]
        fn test_with_dispatch_creates_source() {
            let config = HttpSourceConfig {
                host: "localhost".to_string(),
                port: 8080,
                endpoint: None,
                timeout_ms: 10000,
                adaptive_max_batch_size: None,
                adaptive_min_batch_size: None,
                adaptive_max_wait_ms: None,
                adaptive_min_wait_ms: None,
                adaptive_window_secs: None,
                adaptive_enabled: None,
                webhooks: None,
            };

            let created = HttpSource::with_dispatch(
                "dispatch-source",
                config,
                Some(DispatchMode::Channel),
                Some(1000),
            );
            assert!(created.is_ok());

            let dispatched = created.unwrap();
            assert_eq!(dispatched.id(), "dispatch-source");
        }
    }

    /// Values reported through the source's accessor methods.
    mod properties {
        use super::*;

        /// `id()` echoes the id given to the builder.
        #[test]
        fn test_id_returns_correct_value() {
            let built = HttpSourceBuilder::new("my-http-source")
                .with_host("localhost")
                .build()
                .unwrap();

            assert_eq!(built.id(), "my-http-source");
        }

        /// `type_name()` is the fixed string `"http"`.
        #[test]
        fn test_type_name_returns_http() {
            let built = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .build()
                .unwrap();

            assert_eq!(built.type_name(), "http");
        }

        /// `properties()` exposes the configured host and port as JSON values.
        #[test]
        fn test_properties_contains_host_and_port() {
            let built = HttpSourceBuilder::new("test")
                .with_host("192.168.1.1")
                .with_port(9000)
                .build()
                .unwrap();
            let reported = built.properties();

            let expected_host = serde_json::Value::String("192.168.1.1".to_string());
            let expected_port = serde_json::Value::Number(9000.into());

            assert_eq!(reported.get("host"), Some(&expected_host));
            assert_eq!(reported.get("port"), Some(&expected_port));
        }

        /// A configured endpoint shows up under the `"endpoint"` key.
        #[test]
        fn test_properties_includes_endpoint_when_set() {
            let built = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .with_endpoint("/api/v1")
                .build()
                .unwrap();
            let reported = built.properties();

            let expected_endpoint = serde_json::Value::String("/api/v1".to_string());

            assert_eq!(reported.get("endpoint"), Some(&expected_endpoint));
        }

        /// Without an endpoint there is no `"endpoint"` key at all.
        #[test]
        fn test_properties_excludes_endpoint_when_none() {
            let built = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .build()
                .unwrap();
            let reported = built.properties();

            assert!(!reported.contains_key("endpoint"));
        }
    }

    /// Component lifecycle state.
    mod lifecycle {
        use super::*;

        /// A freshly built source reports `ComponentStatus::Stopped`.
        #[tokio::test]
        async fn test_initial_status_is_stopped() {
            let built = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .build()
                .unwrap();

            let current = built.status().await;
            assert_eq!(current, ComponentStatus::Stopped);
        }
    }

    /// Builder defaults and setters, checked against the stored config.
    mod builder {
        use super::*;

        /// With nothing set: port 8080, a 10000 ms timeout and no endpoint.
        #[test]
        fn test_http_builder_defaults() {
            let built = HttpSourceBuilder::new("test").build().unwrap();
            let stored = &built.config;

            assert_eq!(stored.port, 8080);
            assert_eq!(stored.timeout_ms, 10000);
            assert_eq!(stored.endpoint, None);
        }

        /// Each basic setter lands in the matching config field.
        #[test]
        fn test_http_builder_custom_values() {
            let built = HttpSourceBuilder::new("test")
                .with_host("api.example.com")
                .with_port(9000)
                .with_endpoint("/webhook")
                .with_timeout_ms(5000)
                .build()
                .unwrap();
            let stored = &built.config;

            assert_eq!(stored.host, "api.example.com");
            assert_eq!(stored.port, 9000);
            assert_eq!(stored.endpoint, Some("/webhook".to_string()));
            assert_eq!(stored.timeout_ms, 5000);
        }

        /// Each adaptive-batching setter is stored as `Some(value)`.
        #[test]
        fn test_http_builder_adaptive_batching() {
            let built = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .with_adaptive_max_batch_size(1000)
                .with_adaptive_min_batch_size(10)
                .with_adaptive_max_wait_ms(500)
                .with_adaptive_min_wait_ms(50)
                .with_adaptive_window_secs(60)
                .with_adaptive_enabled(true)
                .build()
                .unwrap();
            let stored = &built.config;

            assert_eq!(stored.adaptive_max_batch_size, Some(1000));
            assert_eq!(stored.adaptive_min_batch_size, Some(10));
            assert_eq!(stored.adaptive_max_wait_ms, Some(500));
            assert_eq!(stored.adaptive_min_wait_ms, Some(50));
            assert_eq!(stored.adaptive_window_secs, Some(60));
            assert_eq!(stored.adaptive_enabled, Some(true));
        }

        /// `HttpSource::builder` passes the id through to the source base.
        #[test]
        fn test_builder_id() {
            let built = HttpSource::builder("my-http-source")
                .with_host("localhost")
                .build()
                .unwrap();

            assert_eq!(built.base.id, "my-http-source");
        }
    }

    /// Conversion of HTTP payload models into core source changes.
    mod event_conversion {
        use super::*;

        /// A node insert keeps its id, label count and properties. The
        /// expected `effective_from` is the input timestamp divided by
        /// 1_000_000 (presumably nanoseconds to milliseconds).
        #[test]
        fn test_convert_node_insert() {
            let mut fields = serde_json::Map::new();
            fields.insert(
                "name".to_string(),
                serde_json::Value::String("Alice".to_string()),
            );
            fields.insert("age".to_string(), serde_json::Value::Number(30.into()));

            let http_change = HttpSourceChange::Insert {
                element: HttpElement::Node {
                    id: "user-1".to_string(),
                    labels: vec!["User".to_string()],
                    properties: fields,
                },
                timestamp: Some(1234567890000000000),
            };

            let converted = convert_http_to_source_change(&http_change, "test-source");
            assert!(converted.is_ok());

            let inserted = match converted.unwrap() {
                drasi_core::models::SourceChange::Insert { element } => element,
                _ => panic!("Expected Insert operation"),
            };
            let (metadata, properties) = match inserted {
                drasi_core::models::Element::Node {
                    metadata,
                    properties,
                } => (metadata, properties),
                _ => panic!("Expected Node element"),
            };

            assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
            assert_eq!(metadata.labels.len(), 1);
            assert_eq!(metadata.effective_from, 1234567890000);
            assert!(properties.get("name").is_some());
            assert!(properties.get("age").is_some());
        }

        /// A relation insert keeps its id; `from` is expected to surface as
        /// `in_node` and `to` as `out_node`.
        #[test]
        fn test_convert_relation_insert() {
            let http_change = HttpSourceChange::Insert {
                element: HttpElement::Relation {
                    id: "follows-1".to_string(),
                    labels: vec!["FOLLOWS".to_string()],
                    from: "user-1".to_string(),
                    to: "user-2".to_string(),
                    properties: serde_json::Map::new(),
                },
                timestamp: None,
            };

            let converted = convert_http_to_source_change(&http_change, "test-source");
            assert!(converted.is_ok());

            let inserted = match converted.unwrap() {
                drasi_core::models::SourceChange::Insert { element } => element,
                _ => panic!("Expected Insert operation"),
            };
            let (metadata, in_node, out_node) = match inserted {
                drasi_core::models::Element::Relation {
                    metadata,
                    out_node,
                    in_node,
                    ..
                } => (metadata, in_node, out_node),
                _ => panic!("Expected Relation element"),
            };

            assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
            assert_eq!(in_node.element_id.as_ref(), "user-1");
            assert_eq!(out_node.element_id.as_ref(), "user-2");
        }

        /// A delete carries the element id and its labels through.
        #[test]
        fn test_convert_delete() {
            let http_change = HttpSourceChange::Delete {
                id: "user-1".to_string(),
                labels: Some(vec!["User".to_string()]),
                timestamp: Some(9999999999),
            };

            let converted = convert_http_to_source_change(&http_change, "test-source");
            assert!(converted.is_ok());

            let metadata = match converted.unwrap() {
                drasi_core::models::SourceChange::Delete { metadata } => metadata,
                _ => panic!("Expected Delete operation"),
            };

            assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
            assert_eq!(metadata.labels.len(), 1);
        }

        /// An update converts to the `Update` variant; its payload is not
        /// inspected here.
        #[test]
        fn test_convert_update() {
            let http_change = HttpSourceChange::Update {
                element: HttpElement::Node {
                    id: "user-1".to_string(),
                    labels: vec!["User".to_string()],
                    properties: serde_json::Map::new(),
                },
                timestamp: None,
            };

            let converted = convert_http_to_source_change(&http_change, "test-source");
            assert!(converted.is_ok());

            let change = converted.unwrap();
            if !matches!(change, drasi_core::models::SourceChange::Update { .. }) {
                panic!("Expected Update operation");
            }
        }
    }

    /// Derivation of the adaptive batcher settings from the HTTP config.
    mod adaptive_config {
        use super::*;

        /// Explicit builder values are reflected in `adaptive_config`.
        #[test]
        fn test_adaptive_config_from_http_config() {
            let built = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .with_adaptive_max_batch_size(500)
                .with_adaptive_enabled(true)
                .build()
                .unwrap();
            let adaptive = &built.adaptive_config;

            assert_eq!(adaptive.max_batch_size, 500);
            assert!(adaptive.adaptive_enabled);
        }

        /// Batch-size bounds left unset match `AdaptiveBatchConfig::default()`.
        #[test]
        fn test_adaptive_config_uses_defaults_when_not_specified() {
            let built = HttpSourceBuilder::new("test")
                .with_host("localhost")
                .build()
                .unwrap();
            let adaptive = &built.adaptive_config;

            let defaults = AdaptiveBatchConfig::default();

            assert_eq!(adaptive.max_batch_size, defaults.max_batch_size);
            assert_eq!(adaptive.min_batch_size, defaults.min_batch_size);
        }
    }
}
1939
// Dynamic-plugin export. Compiled only when the `dynamic-plugin` Cargo feature
// is enabled.
//
// Invokes `drasi_plugin_sdk::export_plugin!` with the plugin id "http-source"
// and `descriptor::HttpSourceDescriptor` as the only source descriptor; the
// reaction and bootstrap descriptor lists are empty. All three version fields
// are filled from this crate's `CARGO_PKG_VERSION` at compile time.
//
// NOTE(review): the macro presumably generates the entry points a host uses to
// load this crate as a plugin - confirm against the drasi_plugin_sdk docs.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "http-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::HttpSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);