1pub mod config;
172pub use config::HttpSourceConfig;
173
174mod adaptive_batcher;
175mod models;
176mod time;
177
178pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
180
181use anyhow::Result;
182use async_trait::async_trait;
183use axum::{
184 extract::{Path, State},
185 http::StatusCode,
186 response::IntoResponse,
187 routing::{get, post},
188 Json, Router,
189};
190use log::{debug, error, info, trace};
191use serde::{Deserialize, Serialize};
192use std::collections::HashMap;
193use std::sync::Arc;
194use std::time::Duration;
195use tokio::sync::mpsc;
196use tokio::time::timeout;
197
198use drasi_lib::channels::*;
199use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
200use drasi_lib::Source;
201
202use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
203
204#[derive(Debug, Serialize, Deserialize)]
206pub struct EventResponse {
207 pub success: bool,
208 pub message: String,
209 #[serde(skip_serializing_if = "Option::is_none")]
210 pub error: Option<String>,
211}
212
/// Source that accepts change events over HTTP and forwards them, in batches,
/// to the dispatchers held by its [`SourceBase`].
pub struct HttpSource {
    /// Shared source state: id, status, dispatchers, task handle and shutdown sender.
    base: SourceBase,
    /// Listener settings (host, port, endpoint, timeout) plus optional batching overrides.
    config: HttpSourceConfig,
    /// `AdaptiveBatchConfig::default()` with the `adaptive_*` overrides from `config` applied.
    adaptive_config: AdaptiveBatchConfig,
}
232
/// Request body accepted by the batch endpoint: a list of change events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventRequest {
    /// Events to process, in the order they should be enqueued.
    pub events: Vec<HttpSourceChange>,
}
238
/// State shared with every axum handler through `Router::with_state`.
#[derive(Clone)]
struct HttpAppState {
    /// Id of the source this server belongs to; requests naming another id are rejected.
    source_id: String,
    /// Sending half of the channel that feeds the adaptive batcher task.
    batch_tx: mpsc::Sender<SourceChangeEvent>,
}
249
250impl HttpSource {
251 pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
282 let id = id.into();
283 let params = SourceBaseParams::new(id);
284
285 let mut adaptive_config = AdaptiveBatchConfig::default();
287
288 if let Some(max_batch) = config.adaptive_max_batch_size {
290 adaptive_config.max_batch_size = max_batch;
291 }
292 if let Some(min_batch) = config.adaptive_min_batch_size {
293 adaptive_config.min_batch_size = min_batch;
294 }
295 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
296 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
297 }
298 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
299 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
300 }
301 if let Some(window_secs) = config.adaptive_window_secs {
302 adaptive_config.throughput_window = Duration::from_secs(window_secs);
303 }
304 if let Some(enabled) = config.adaptive_enabled {
305 adaptive_config.adaptive_enabled = enabled;
306 }
307
308 Ok(Self {
309 base: SourceBase::new(params)?,
310 config,
311 adaptive_config,
312 })
313 }
314
315 pub fn with_dispatch(
335 id: impl Into<String>,
336 config: HttpSourceConfig,
337 dispatch_mode: Option<DispatchMode>,
338 dispatch_buffer_capacity: Option<usize>,
339 ) -> Result<Self> {
340 let id = id.into();
341 let mut params = SourceBaseParams::new(id);
342 if let Some(mode) = dispatch_mode {
343 params = params.with_dispatch_mode(mode);
344 }
345 if let Some(capacity) = dispatch_buffer_capacity {
346 params = params.with_dispatch_buffer_capacity(capacity);
347 }
348
349 let mut adaptive_config = AdaptiveBatchConfig::default();
350
351 if let Some(max_batch) = config.adaptive_max_batch_size {
352 adaptive_config.max_batch_size = max_batch;
353 }
354 if let Some(min_batch) = config.adaptive_min_batch_size {
355 adaptive_config.min_batch_size = min_batch;
356 }
357 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
358 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
359 }
360 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
361 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
362 }
363 if let Some(window_secs) = config.adaptive_window_secs {
364 adaptive_config.throughput_window = Duration::from_secs(window_secs);
365 }
366 if let Some(enabled) = config.adaptive_enabled {
367 adaptive_config.adaptive_enabled = enabled;
368 }
369
370 Ok(Self {
371 base: SourceBase::new(params)?,
372 config,
373 adaptive_config,
374 })
375 }
376
377 async fn handle_single_event(
382 Path(source_id): Path<String>,
383 State(state): State<HttpAppState>,
384 Json(event): Json<HttpSourceChange>,
385 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
386 debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
387 Self::process_events(&source_id, &state, vec![event]).await
388 }
389
390 async fn handle_batch_events(
395 Path(source_id): Path<String>,
396 State(state): State<HttpAppState>,
397 Json(batch): Json<BatchEventRequest>,
398 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
399 debug!(
400 "[{}] HTTP endpoint received batch of {} events",
401 source_id,
402 batch.events.len()
403 );
404 Self::process_events(&source_id, &state, batch.events).await
405 }
406
407 async fn process_events(
419 source_id: &str,
420 state: &HttpAppState,
421 events: Vec<HttpSourceChange>,
422 ) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
423 trace!("[{}] Processing {} events", source_id, events.len());
424
425 if source_id != state.source_id {
426 error!(
427 "[{}] Source name mismatch. Expected '{}', got '{}'",
428 state.source_id, state.source_id, source_id
429 );
430 return Err((
431 StatusCode::BAD_REQUEST,
432 Json(EventResponse {
433 success: false,
434 message: "Source name mismatch".to_string(),
435 error: Some(format!(
436 "Expected source '{}', got '{}'",
437 state.source_id, source_id
438 )),
439 }),
440 ));
441 }
442
443 let mut success_count = 0;
444 let mut error_count = 0;
445 let mut last_error = None;
446
447 for (idx, event) in events.iter().enumerate() {
448 match convert_http_to_source_change(event, source_id) {
449 Ok(source_change) => {
450 let change_event = SourceChangeEvent {
451 source_id: source_id.to_string(),
452 change: source_change,
453 timestamp: chrono::Utc::now(),
454 };
455
456 if let Err(e) = state.batch_tx.send(change_event).await {
457 error!(
458 "[{}] Failed to send event {} to batch channel: {}",
459 state.source_id,
460 idx + 1,
461 e
462 );
463 error_count += 1;
464 last_error = Some("Internal channel error".to_string());
465 } else {
466 success_count += 1;
467 }
468 }
469 Err(e) => {
470 error!(
471 "[{}] Failed to convert event {}: {}",
472 state.source_id,
473 idx + 1,
474 e
475 );
476 error_count += 1;
477 last_error = Some(e.to_string());
478 }
479 }
480 }
481
482 debug!(
483 "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
484 );
485
486 if error_count > 0 && success_count == 0 {
487 Err((
488 StatusCode::BAD_REQUEST,
489 Json(EventResponse {
490 success: false,
491 message: format!("All {error_count} events failed"),
492 error: last_error,
493 }),
494 ))
495 } else if error_count > 0 {
496 Ok(Json(EventResponse {
497 success: true,
498 message: format!(
499 "Processed {success_count} events successfully, {error_count} failed"
500 ),
501 error: last_error,
502 }))
503 } else {
504 Ok(Json(EventResponse {
505 success: true,
506 message: format!("All {success_count} events processed successfully"),
507 error: None,
508 }))
509 }
510 }
511
512 async fn health_check() -> impl IntoResponse {
513 Json(serde_json::json!({
514 "status": "healthy",
515 "service": "http-source",
516 "features": ["adaptive-batching", "batch-endpoint"]
517 }))
518 }
519
520 async fn run_adaptive_batcher(
521 batch_rx: mpsc::Receiver<SourceChangeEvent>,
522 dispatchers: Arc<
523 tokio::sync::RwLock<
524 Vec<
525 Box<
526 dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
527 >,
528 >,
529 >,
530 >,
531 adaptive_config: AdaptiveBatchConfig,
532 source_id: String,
533 ) {
534 let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
535 let mut total_events = 0u64;
536 let mut total_batches = 0u64;
537
538 info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
539
540 while let Some(batch) = batcher.next_batch().await {
541 if batch.is_empty() {
542 debug!("[{source_id}] Batcher received empty batch, skipping");
543 continue;
544 }
545
546 let batch_size = batch.len();
547 total_events += batch_size as u64;
548 total_batches += 1;
549
550 debug!(
551 "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
552 );
553
554 let mut sent_count = 0;
555 let mut failed_count = 0;
556 for (idx, event) in batch.into_iter().enumerate() {
557 debug!(
558 "[{}] Batch #{}, dispatching event {}/{}",
559 source_id,
560 total_batches,
561 idx + 1,
562 batch_size
563 );
564
565 let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
566 profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
567
568 let wrapper = SourceEventWrapper::with_profiling(
569 event.source_id.clone(),
570 SourceEvent::Change(event.change),
571 event.timestamp,
572 profiling,
573 );
574
575 if let Err(e) =
576 SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
577 .await
578 {
579 error!(
580 "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
581 source_id,
582 total_batches,
583 idx + 1,
584 batch_size,
585 e
586 );
587 failed_count += 1;
588 } else {
589 debug!(
590 "[{}] Batch #{}, successfully dispatched event {}/{}",
591 source_id,
592 total_batches,
593 idx + 1,
594 batch_size
595 );
596 sent_count += 1;
597 }
598 }
599
600 debug!(
601 "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
602 );
603
604 if total_batches.is_multiple_of(100) {
605 info!(
606 "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
607 source_id,
608 total_batches,
609 total_events,
610 total_events as f64 / total_batches as f64
611 );
612 }
613 }
614
615 info!(
616 "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
617 );
618 }
619}
620
621#[async_trait]
622impl Source for HttpSource {
    /// Returns the id stored in the underlying source base.
    fn id(&self) -> &str {
        &self.base.id
    }
626
    /// Returns the fixed type name of this source, `"http"`.
    fn type_name(&self) -> &str {
        "http"
    }
630
631 fn properties(&self) -> HashMap<String, serde_json::Value> {
632 let mut props = HashMap::new();
633 props.insert(
634 "host".to_string(),
635 serde_json::Value::String(self.config.host.clone()),
636 );
637 props.insert(
638 "port".to_string(),
639 serde_json::Value::Number(self.config.port.into()),
640 );
641 if let Some(ref endpoint) = self.config.endpoint {
642 props.insert(
643 "endpoint".to_string(),
644 serde_json::Value::String(endpoint.clone()),
645 );
646 }
647 props
648 }
649
    /// Returns the auto-start flag held by the underlying source base.
    fn auto_start(&self) -> bool {
        self.base.get_auto_start()
    }
653
654 async fn start(&self) -> Result<()> {
655 info!("[{}] Starting adaptive HTTP source", self.base.id);
656
657 self.base.set_status(ComponentStatus::Starting).await;
658 self.base
659 .send_component_event(
660 ComponentStatus::Starting,
661 Some("Starting adaptive HTTP source".to_string()),
662 )
663 .await?;
664
665 let host = self.config.host.clone();
666 let port = self.config.port;
667
668 let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
670 let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
671 info!(
672 "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
673 self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
674 );
675
676 let adaptive_config = self.adaptive_config.clone();
678 let source_id = self.base.id.clone();
679
680 info!("[{source_id}] Starting adaptive batcher task");
681 tokio::spawn(Self::run_adaptive_batcher(
682 batch_rx,
683 self.base.dispatchers.clone(),
684 adaptive_config,
685 source_id.clone(),
686 ));
687
688 let state = HttpAppState {
690 source_id: self.base.id.clone(),
691 batch_tx,
692 };
693
694 let app = Router::new()
696 .route("/health", get(Self::health_check))
697 .route(
698 "/sources/:source_id/events",
699 post(Self::handle_single_event),
700 )
701 .route(
702 "/sources/:source_id/events/batch",
703 post(Self::handle_batch_events),
704 )
705 .with_state(state);
706
707 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
709
710 let host_clone = host.clone();
711
712 let (error_tx, error_rx) = tokio::sync::oneshot::channel();
714 let server_handle = tokio::spawn(async move {
715 let addr = format!("{host}:{port}");
716 info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");
717
718 let listener = match tokio::net::TcpListener::bind(&addr).await {
719 Ok(listener) => {
720 info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
721 listener
722 }
723 Err(e) => {
724 error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
725 let _ = error_tx.send(format!(
726 "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
727 ));
728 return;
729 }
730 };
731
732 if let Err(e) = axum::serve(listener, app)
733 .with_graceful_shutdown(async move {
734 let _ = shutdown_rx.await;
735 })
736 .await
737 {
738 error!("[{source_id}] HTTP server error: {e}");
739 }
740 });
741
742 *self.base.task_handle.write().await = Some(server_handle);
743 *self.base.shutdown_tx.write().await = Some(shutdown_tx);
744
745 match timeout(Duration::from_millis(500), error_rx).await {
747 Ok(Ok(error_msg)) => {
748 self.base.set_status(ComponentStatus::Error).await;
749 return Err(anyhow::anyhow!("{error_msg}"));
750 }
751 _ => {
752 self.base.set_status(ComponentStatus::Running).await;
753 }
754 }
755
756 self.base
757 .send_component_event(
758 ComponentStatus::Running,
759 Some(format!(
760 "Adaptive HTTP source running on {host_clone}:{port} with batch support"
761 )),
762 )
763 .await?;
764
765 Ok(())
766 }
767
768 async fn stop(&self) -> Result<()> {
769 info!("[{}] Stopping adaptive HTTP source", self.base.id);
770
771 self.base.set_status(ComponentStatus::Stopping).await;
772 self.base
773 .send_component_event(
774 ComponentStatus::Stopping,
775 Some("Stopping adaptive HTTP source".to_string()),
776 )
777 .await?;
778
779 if let Some(tx) = self.base.shutdown_tx.write().await.take() {
780 let _ = tx.send(());
781 }
782
783 if let Some(handle) = self.base.task_handle.write().await.take() {
784 let _ = timeout(Duration::from_secs(5), handle).await;
785 }
786
787 self.base.set_status(ComponentStatus::Stopped).await;
788 self.base
789 .send_component_event(
790 ComponentStatus::Stopped,
791 Some("Adaptive HTTP source stopped".to_string()),
792 )
793 .await?;
794
795 Ok(())
796 }
797
    /// Returns the current component status as tracked by the source base.
    async fn status(&self) -> ComponentStatus {
        self.base.get_status().await
    }
801
    /// Subscribes a consumer by delegating to `SourceBase::subscribe_with_bootstrap`.
    ///
    /// NOTE(review): the `"HTTP"` argument is presumably a source-type label
    /// used by the base implementation — confirm against `SourceBase`.
    async fn subscribe(
        &self,
        settings: drasi_lib::config::SourceSubscriptionSettings,
    ) -> Result<SubscriptionResponse> {
        self.base.subscribe_with_bootstrap(&settings, "HTTP").await
    }
808
    /// Exposes this source as `&dyn Any`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
812
    /// Passes the runtime context on to the source base.
    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
        self.base.initialize(context).await;
    }
816
    /// Installs a bootstrap provider on the source base.
    async fn set_bootstrap_provider(
        &self,
        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
    ) {
        self.base.set_bootstrap_provider(provider).await;
    }
823}
824
/// Fluent builder for [`HttpSource`].
///
/// Collects the listener settings, the optional adaptive-batching overrides and
/// the source-base options, then assembles the source in `build`.
pub struct HttpSourceBuilder {
    /// Id given to the source.
    id: String,
    /// Host copied into `HttpSourceConfig::host`.
    host: String,
    /// Port copied into `HttpSourceConfig::port`.
    port: u16,
    /// Optional endpoint copied into `HttpSourceConfig::endpoint`.
    endpoint: Option<String>,
    /// Timeout in milliseconds copied into `HttpSourceConfig::timeout_ms`.
    timeout_ms: u64,
    /// Override for `AdaptiveBatchConfig::max_batch_size`.
    adaptive_max_batch_size: Option<usize>,
    /// Override for `AdaptiveBatchConfig::min_batch_size`.
    adaptive_min_batch_size: Option<usize>,
    /// Override, in milliseconds, for `AdaptiveBatchConfig::max_wait_time`.
    adaptive_max_wait_ms: Option<u64>,
    /// Override, in milliseconds, for `AdaptiveBatchConfig::min_wait_time`.
    adaptive_min_wait_ms: Option<u64>,
    /// Override, in seconds, for `AdaptiveBatchConfig::throughput_window`.
    adaptive_window_secs: Option<u64>,
    /// Override for `AdaptiveBatchConfig::adaptive_enabled`.
    adaptive_enabled: Option<bool>,
    /// Optional dispatch mode passed to `SourceBaseParams`.
    dispatch_mode: Option<DispatchMode>,
    /// Optional dispatch buffer capacity passed to `SourceBaseParams`.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider passed to `SourceBaseParams`.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Auto-start flag passed to `SourceBaseParams`.
    auto_start: bool,
}
860
861impl HttpSourceBuilder {
    /// Creates a builder for a source with the given id.
    ///
    /// Defaults: empty host, port `8080`, no endpoint, a timeout of `10000` ms,
    /// no adaptive or dispatch overrides, no bootstrap provider, and
    /// `auto_start` set to `true`.
    pub fn new(id: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            host: String::new(),
            port: 8080,
            endpoint: None,
            timeout_ms: 10000,
            adaptive_max_batch_size: None,
            adaptive_min_batch_size: None,
            adaptive_max_wait_ms: None,
            adaptive_min_wait_ms: None,
            adaptive_window_secs: None,
            adaptive_enabled: None,
            dispatch_mode: None,
            dispatch_buffer_capacity: None,
            bootstrap_provider: None,
            auto_start: true,
        }
    }
886
887 pub fn with_host(mut self, host: impl Into<String>) -> Self {
889 self.host = host.into();
890 self
891 }
892
893 pub fn with_port(mut self, port: u16) -> Self {
895 self.port = port;
896 self
897 }
898
899 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
901 self.endpoint = Some(endpoint.into());
902 self
903 }
904
905 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
907 self.timeout_ms = timeout_ms;
908 self
909 }
910
911 pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
913 self.adaptive_max_batch_size = Some(size);
914 self
915 }
916
917 pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
919 self.adaptive_min_batch_size = Some(size);
920 self
921 }
922
923 pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
925 self.adaptive_max_wait_ms = Some(wait_ms);
926 self
927 }
928
929 pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
931 self.adaptive_min_wait_ms = Some(wait_ms);
932 self
933 }
934
935 pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
937 self.adaptive_window_secs = Some(secs);
938 self
939 }
940
941 pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
943 self.adaptive_enabled = Some(enabled);
944 self
945 }
946
947 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
949 self.dispatch_mode = Some(mode);
950 self
951 }
952
953 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
955 self.dispatch_buffer_capacity = Some(capacity);
956 self
957 }
958
959 pub fn with_bootstrap_provider(
961 mut self,
962 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
963 ) -> Self {
964 self.bootstrap_provider = Some(Box::new(provider));
965 self
966 }
967
968 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
973 self.auto_start = auto_start;
974 self
975 }
976
977 pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
979 self.host = config.host;
980 self.port = config.port;
981 self.endpoint = config.endpoint;
982 self.timeout_ms = config.timeout_ms;
983 self.adaptive_max_batch_size = config.adaptive_max_batch_size;
984 self.adaptive_min_batch_size = config.adaptive_min_batch_size;
985 self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
986 self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
987 self.adaptive_window_secs = config.adaptive_window_secs;
988 self.adaptive_enabled = config.adaptive_enabled;
989 self
990 }
991
992 pub fn build(self) -> Result<HttpSource> {
998 let config = HttpSourceConfig {
999 host: self.host,
1000 port: self.port,
1001 endpoint: self.endpoint,
1002 timeout_ms: self.timeout_ms,
1003 adaptive_max_batch_size: self.adaptive_max_batch_size,
1004 adaptive_min_batch_size: self.adaptive_min_batch_size,
1005 adaptive_max_wait_ms: self.adaptive_max_wait_ms,
1006 adaptive_min_wait_ms: self.adaptive_min_wait_ms,
1007 adaptive_window_secs: self.adaptive_window_secs,
1008 adaptive_enabled: self.adaptive_enabled,
1009 };
1010
1011 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1013 if let Some(mode) = self.dispatch_mode {
1014 params = params.with_dispatch_mode(mode);
1015 }
1016 if let Some(capacity) = self.dispatch_buffer_capacity {
1017 params = params.with_dispatch_buffer_capacity(capacity);
1018 }
1019 if let Some(provider) = self.bootstrap_provider {
1020 params = params.with_bootstrap_provider(provider);
1021 }
1022
1023 let mut adaptive_config = AdaptiveBatchConfig::default();
1025 if let Some(max_batch) = config.adaptive_max_batch_size {
1026 adaptive_config.max_batch_size = max_batch;
1027 }
1028 if let Some(min_batch) = config.adaptive_min_batch_size {
1029 adaptive_config.min_batch_size = min_batch;
1030 }
1031 if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
1032 adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
1033 }
1034 if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
1035 adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
1036 }
1037 if let Some(window_secs) = config.adaptive_window_secs {
1038 adaptive_config.throughput_window = Duration::from_secs(window_secs);
1039 }
1040 if let Some(enabled) = config.adaptive_enabled {
1041 adaptive_config.adaptive_enabled = enabled;
1042 }
1043
1044 Ok(HttpSource {
1045 base: SourceBase::new(params)?,
1046 config,
1047 adaptive_config,
1048 })
1049 }
1050}
1051
impl HttpSource {
    /// Returns an [`HttpSourceBuilder`] for a source with the given id.
    ///
    /// Shorthand for `HttpSourceBuilder::new(id)`.
    pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
        HttpSourceBuilder::new(id)
    }
}
1074
1075#[cfg(test)]
1076mod tests {
1077 use super::*;
1078
1079 mod construction {
1080 use super::*;
1081
1082 #[test]
1083 fn test_builder_with_valid_config() {
1084 let source = HttpSourceBuilder::new("test-source")
1085 .with_host("localhost")
1086 .with_port(8080)
1087 .build();
1088 assert!(source.is_ok());
1089 }
1090
1091 #[test]
1092 fn test_builder_with_custom_config() {
1093 let source = HttpSourceBuilder::new("http-source")
1094 .with_host("0.0.0.0")
1095 .with_port(9000)
1096 .with_endpoint("/events")
1097 .build()
1098 .unwrap();
1099 assert_eq!(source.id(), "http-source");
1100 }
1101
1102 #[test]
1103 fn test_with_dispatch_creates_source() {
1104 let config = HttpSourceConfig {
1105 host: "localhost".to_string(),
1106 port: 8080,
1107 endpoint: None,
1108 timeout_ms: 10000,
1109 adaptive_max_batch_size: None,
1110 adaptive_min_batch_size: None,
1111 adaptive_max_wait_ms: None,
1112 adaptive_min_wait_ms: None,
1113 adaptive_window_secs: None,
1114 adaptive_enabled: None,
1115 };
1116 let source = HttpSource::with_dispatch(
1117 "dispatch-source",
1118 config,
1119 Some(DispatchMode::Channel),
1120 Some(1000),
1121 );
1122 assert!(source.is_ok());
1123 assert_eq!(source.unwrap().id(), "dispatch-source");
1124 }
1125 }
1126
1127 mod properties {
1128 use super::*;
1129
1130 #[test]
1131 fn test_id_returns_correct_value() {
1132 let source = HttpSourceBuilder::new("my-http-source")
1133 .with_host("localhost")
1134 .build()
1135 .unwrap();
1136 assert_eq!(source.id(), "my-http-source");
1137 }
1138
1139 #[test]
1140 fn test_type_name_returns_http() {
1141 let source = HttpSourceBuilder::new("test")
1142 .with_host("localhost")
1143 .build()
1144 .unwrap();
1145 assert_eq!(source.type_name(), "http");
1146 }
1147
1148 #[test]
1149 fn test_properties_contains_host_and_port() {
1150 let source = HttpSourceBuilder::new("test")
1151 .with_host("192.168.1.1")
1152 .with_port(9000)
1153 .build()
1154 .unwrap();
1155 let props = source.properties();
1156
1157 assert_eq!(
1158 props.get("host"),
1159 Some(&serde_json::Value::String("192.168.1.1".to_string()))
1160 );
1161 assert_eq!(
1162 props.get("port"),
1163 Some(&serde_json::Value::Number(9000.into()))
1164 );
1165 }
1166
1167 #[test]
1168 fn test_properties_includes_endpoint_when_set() {
1169 let source = HttpSourceBuilder::new("test")
1170 .with_host("localhost")
1171 .with_endpoint("/api/v1")
1172 .build()
1173 .unwrap();
1174 let props = source.properties();
1175
1176 assert_eq!(
1177 props.get("endpoint"),
1178 Some(&serde_json::Value::String("/api/v1".to_string()))
1179 );
1180 }
1181
1182 #[test]
1183 fn test_properties_excludes_endpoint_when_none() {
1184 let source = HttpSourceBuilder::new("test")
1185 .with_host("localhost")
1186 .build()
1187 .unwrap();
1188 let props = source.properties();
1189
1190 assert!(!props.contains_key("endpoint"));
1191 }
1192 }
1193
1194 mod lifecycle {
1195 use super::*;
1196
1197 #[tokio::test]
1198 async fn test_initial_status_is_stopped() {
1199 let source = HttpSourceBuilder::new("test")
1200 .with_host("localhost")
1201 .build()
1202 .unwrap();
1203 assert_eq!(source.status().await, ComponentStatus::Stopped);
1204 }
1205 }
1206
1207 mod builder {
1208 use super::*;
1209
1210 #[test]
1211 fn test_http_builder_defaults() {
1212 let source = HttpSourceBuilder::new("test").build().unwrap();
1213 assert_eq!(source.config.port, 8080);
1214 assert_eq!(source.config.timeout_ms, 10000);
1215 assert_eq!(source.config.endpoint, None);
1216 }
1217
1218 #[test]
1219 fn test_http_builder_custom_values() {
1220 let source = HttpSourceBuilder::new("test")
1221 .with_host("api.example.com")
1222 .with_port(9000)
1223 .with_endpoint("/webhook")
1224 .with_timeout_ms(5000)
1225 .build()
1226 .unwrap();
1227
1228 assert_eq!(source.config.host, "api.example.com");
1229 assert_eq!(source.config.port, 9000);
1230 assert_eq!(source.config.endpoint, Some("/webhook".to_string()));
1231 assert_eq!(source.config.timeout_ms, 5000);
1232 }
1233
1234 #[test]
1235 fn test_http_builder_adaptive_batching() {
1236 let source = HttpSourceBuilder::new("test")
1237 .with_host("localhost")
1238 .with_adaptive_max_batch_size(1000)
1239 .with_adaptive_min_batch_size(10)
1240 .with_adaptive_max_wait_ms(500)
1241 .with_adaptive_min_wait_ms(50)
1242 .with_adaptive_window_secs(60)
1243 .with_adaptive_enabled(true)
1244 .build()
1245 .unwrap();
1246
1247 assert_eq!(source.config.adaptive_max_batch_size, Some(1000));
1248 assert_eq!(source.config.adaptive_min_batch_size, Some(10));
1249 assert_eq!(source.config.adaptive_max_wait_ms, Some(500));
1250 assert_eq!(source.config.adaptive_min_wait_ms, Some(50));
1251 assert_eq!(source.config.adaptive_window_secs, Some(60));
1252 assert_eq!(source.config.adaptive_enabled, Some(true));
1253 }
1254
1255 #[test]
1256 fn test_builder_id() {
1257 let source = HttpSource::builder("my-http-source")
1258 .with_host("localhost")
1259 .build()
1260 .unwrap();
1261
1262 assert_eq!(source.base.id, "my-http-source");
1263 }
1264 }
1265
1266 mod event_conversion {
1267 use super::*;
1268
1269 #[test]
1270 fn test_convert_node_insert() {
1271 let mut props = serde_json::Map::new();
1272 props.insert(
1273 "name".to_string(),
1274 serde_json::Value::String("Alice".to_string()),
1275 );
1276 props.insert("age".to_string(), serde_json::Value::Number(30.into()));
1277
1278 let http_change = HttpSourceChange::Insert {
1279 element: HttpElement::Node {
1280 id: "user-1".to_string(),
1281 labels: vec!["User".to_string()],
1282 properties: props,
1283 },
1284 timestamp: Some(1234567890000000000),
1285 };
1286
1287 let result = convert_http_to_source_change(&http_change, "test-source");
1288 assert!(result.is_ok());
1289
1290 match result.unwrap() {
1291 drasi_core::models::SourceChange::Insert { element } => match element {
1292 drasi_core::models::Element::Node {
1293 metadata,
1294 properties,
1295 } => {
1296 assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1297 assert_eq!(metadata.labels.len(), 1);
1298 assert_eq!(metadata.effective_from, 1234567890000);
1299 assert!(properties.get("name").is_some());
1300 assert!(properties.get("age").is_some());
1301 }
1302 _ => panic!("Expected Node element"),
1303 },
1304 _ => panic!("Expected Insert operation"),
1305 }
1306 }
1307
1308 #[test]
1309 fn test_convert_relation_insert() {
1310 let http_change = HttpSourceChange::Insert {
1311 element: HttpElement::Relation {
1312 id: "follows-1".to_string(),
1313 labels: vec!["FOLLOWS".to_string()],
1314 from: "user-1".to_string(),
1315 to: "user-2".to_string(),
1316 properties: serde_json::Map::new(),
1317 },
1318 timestamp: None,
1319 };
1320
1321 let result = convert_http_to_source_change(&http_change, "test-source");
1322 assert!(result.is_ok());
1323
1324 match result.unwrap() {
1325 drasi_core::models::SourceChange::Insert { element } => match element {
1326 drasi_core::models::Element::Relation {
1327 metadata,
1328 out_node,
1329 in_node,
1330 ..
1331 } => {
1332 assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
1333 assert_eq!(out_node.element_id.as_ref(), "user-1");
1334 assert_eq!(in_node.element_id.as_ref(), "user-2");
1335 }
1336 _ => panic!("Expected Relation element"),
1337 },
1338 _ => panic!("Expected Insert operation"),
1339 }
1340 }
1341
1342 #[test]
1343 fn test_convert_delete() {
1344 let http_change = HttpSourceChange::Delete {
1345 id: "user-1".to_string(),
1346 labels: Some(vec!["User".to_string()]),
1347 timestamp: Some(9999999999),
1348 };
1349
1350 let result = convert_http_to_source_change(&http_change, "test-source");
1351 assert!(result.is_ok());
1352
1353 match result.unwrap() {
1354 drasi_core::models::SourceChange::Delete { metadata } => {
1355 assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
1356 assert_eq!(metadata.labels.len(), 1);
1357 }
1358 _ => panic!("Expected Delete operation"),
1359 }
1360 }
1361
1362 #[test]
1363 fn test_convert_update() {
1364 let http_change = HttpSourceChange::Update {
1365 element: HttpElement::Node {
1366 id: "user-1".to_string(),
1367 labels: vec!["User".to_string()],
1368 properties: serde_json::Map::new(),
1369 },
1370 timestamp: None,
1371 };
1372
1373 let result = convert_http_to_source_change(&http_change, "test-source");
1374 assert!(result.is_ok());
1375
1376 match result.unwrap() {
1377 drasi_core::models::SourceChange::Update { .. } => {
1378 }
1380 _ => panic!("Expected Update operation"),
1381 }
1382 }
1383 }
1384
1385 mod adaptive_config {
1386 use super::*;
1387
1388 #[test]
1389 fn test_adaptive_config_from_http_config() {
1390 let source = HttpSourceBuilder::new("test")
1391 .with_host("localhost")
1392 .with_adaptive_max_batch_size(500)
1393 .with_adaptive_enabled(true)
1394 .build()
1395 .unwrap();
1396
1397 assert_eq!(source.adaptive_config.max_batch_size, 500);
1399 assert!(source.adaptive_config.adaptive_enabled);
1400 }
1401
1402 #[test]
1403 fn test_adaptive_config_uses_defaults_when_not_specified() {
1404 let source = HttpSourceBuilder::new("test")
1405 .with_host("localhost")
1406 .build()
1407 .unwrap();
1408
1409 let default_config = AdaptiveBatchConfig::default();
1411 assert_eq!(
1412 source.adaptive_config.max_batch_size,
1413 default_config.max_batch_size
1414 );
1415 assert_eq!(
1416 source.adaptive_config.min_batch_size,
1417 default_config.min_batch_size
1418 );
1419 }
1420 }
1421}