#![allow(unexpected_cfgs)]
pub mod config;
pub mod descriptor;
pub use config::HttpSourceConfig;
mod adaptive_batcher;
mod models;
mod time;
pub mod auth;
pub mod content_parser;
pub mod route_matcher;
pub mod template_engine;
pub use models::{convert_http_to_source_change, HttpElement, HttpSourceChange};
use anyhow::Result;
use async_trait::async_trait;
use axum::{
body::Bytes,
extract::{Path, State},
http::{header, Method, StatusCode},
response::IntoResponse,
routing::{delete, get, post, put},
Json, Router,
};
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tower_http::cors::{Any, CorsLayer};
use drasi_lib::channels::{ComponentType, *};
use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
use drasi_lib::Source;
use tracing::Instrument;
use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
use crate::auth::{verify_auth, AuthResult};
use crate::config::{CorsConfig, ErrorBehavior, WebhookConfig};
use crate::content_parser::{parse_content, ContentType};
use crate::route_matcher::{convert_method, find_matching_mappings, headers_to_map, RouteMatcher};
use crate::template_engine::{TemplateContext, TemplateEngine};
/// JSON body returned by every HTTP endpoint in this source.
#[derive(Debug, Serialize, Deserialize)]
pub struct EventResponse {
    /// Whether the request was accepted (partial failures still report `true`).
    pub success: bool,
    /// Human-readable summary of the outcome.
    pub message: String,
    /// Detail for the most recent error, if any; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
/// A source that ingests change events over HTTP and batches them adaptively
/// before dispatching to subscribers.
pub struct HttpSource {
    // Shared source plumbing: id, status, dispatchers, task/shutdown handles.
    base: SourceBase,
    // User-supplied HTTP configuration (host, port, webhooks, ...).
    config: HttpSourceConfig,
    // Adaptive batching parameters derived from `config` at construction time.
    adaptive_config: AdaptiveBatchConfig,
}
/// Request body for the batch ingestion endpoint: a list of changes processed
/// in order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchEventRequest {
    pub events: Vec<HttpSourceChange>,
}
/// Shared axum handler state, cloned per request.
#[derive(Clone)]
struct HttpAppState {
    // Id of the owning source; requests addressed to other ids are rejected.
    source_id: String,
    // Channel feeding accepted events into the adaptive batcher task.
    batch_tx: mpsc::Sender<SourceChangeEvent>,
    // Present only when webhook mode is configured.
    webhook_config: Option<Arc<WebhookState>>,
}
/// Immutable webhook-mode machinery shared across requests.
struct WebhookState {
    config: WebhookConfig,
    // Matches incoming method + path pairs against configured routes.
    route_matcher: RouteMatcher,
    // Renders route mappings into source changes.
    template_engine: TemplateEngine,
}
impl HttpSource {
pub fn new(id: impl Into<String>, config: HttpSourceConfig) -> Result<Self> {
let id = id.into();
let params = SourceBaseParams::new(id);
let mut adaptive_config = AdaptiveBatchConfig::default();
if let Some(max_batch) = config.adaptive_max_batch_size {
adaptive_config.max_batch_size = max_batch;
}
if let Some(min_batch) = config.adaptive_min_batch_size {
adaptive_config.min_batch_size = min_batch;
}
if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
}
if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
}
if let Some(window_secs) = config.adaptive_window_secs {
adaptive_config.throughput_window = Duration::from_secs(window_secs);
}
if let Some(enabled) = config.adaptive_enabled {
adaptive_config.adaptive_enabled = enabled;
}
Ok(Self {
base: SourceBase::new(params)?,
config,
adaptive_config,
})
}
pub fn with_dispatch(
id: impl Into<String>,
config: HttpSourceConfig,
dispatch_mode: Option<DispatchMode>,
dispatch_buffer_capacity: Option<usize>,
) -> Result<Self> {
let id = id.into();
let mut params = SourceBaseParams::new(id);
if let Some(mode) = dispatch_mode {
params = params.with_dispatch_mode(mode);
}
if let Some(capacity) = dispatch_buffer_capacity {
params = params.with_dispatch_buffer_capacity(capacity);
}
let mut adaptive_config = AdaptiveBatchConfig::default();
if let Some(max_batch) = config.adaptive_max_batch_size {
adaptive_config.max_batch_size = max_batch;
}
if let Some(min_batch) = config.adaptive_min_batch_size {
adaptive_config.min_batch_size = min_batch;
}
if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
}
if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
}
if let Some(window_secs) = config.adaptive_window_secs {
adaptive_config.throughput_window = Duration::from_secs(window_secs);
}
if let Some(enabled) = config.adaptive_enabled {
adaptive_config.adaptive_enabled = enabled;
}
Ok(Self {
base: SourceBase::new(params)?,
config,
adaptive_config,
})
}
/// Axum handler for `POST /sources/:source_id/events` — accepts one change
/// event and forwards it through the shared processing path.
async fn handle_single_event(
    Path(source_id): Path<String>,
    State(state): State<HttpAppState>,
    Json(event): Json<HttpSourceChange>,
) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
    debug!("[{source_id}] HTTP endpoint received single event: {event:?}");
    // Wrap the single event in a Vec so both endpoints share one code path.
    Self::process_events(&source_id, &state, vec![event]).await
}
/// Axum handler for `POST /sources/:source_id/events/batch` — accepts a list
/// of change events and forwards them through the shared processing path.
async fn handle_batch_events(
    Path(source_id): Path<String>,
    State(state): State<HttpAppState>,
    Json(batch): Json<BatchEventRequest>,
) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
    debug!(
        "[{}] HTTP endpoint received batch of {} events",
        source_id,
        batch.events.len()
    );
    Self::process_events(&source_id, &state, batch.events).await
}
/// Validate and forward a set of change events to the batch channel.
///
/// Returns `Ok` when at least one event succeeded (partial failures are
/// reported in the response body with `success: true`), and
/// `Err(BAD_REQUEST)` when the source id does not match or every event
/// failed. Only the most recent error detail is surfaced to the caller.
async fn process_events(
    source_id: &str,
    state: &HttpAppState,
    events: Vec<HttpSourceChange>,
) -> Result<impl IntoResponse, (StatusCode, Json<EventResponse>)> {
    trace!("[{}] Processing {} events", source_id, events.len());
    // Reject events addressed to a different source than the one we serve.
    if source_id != state.source_id {
        error!(
            "[{}] Source name mismatch. Expected '{}', got '{}'",
            state.source_id, state.source_id, source_id
        );
        return Err((
            StatusCode::BAD_REQUEST,
            Json(EventResponse {
                success: false,
                message: "Source name mismatch".to_string(),
                error: Some(format!(
                    "Expected source '{}', got '{}'",
                    state.source_id, source_id
                )),
            }),
        ));
    }
    let mut success_count = 0;
    let mut error_count = 0;
    // Tracks the last error seen; earlier error details are log-only.
    let mut last_error = None;
    for (idx, event) in events.iter().enumerate() {
        match convert_http_to_source_change(event, source_id) {
            Ok(source_change) => {
                let change_event = SourceChangeEvent {
                    source_id: source_id.to_string(),
                    change: source_change,
                    timestamp: chrono::Utc::now(),
                };
                // `send` awaits when the channel is full, providing
                // backpressure toward the HTTP client.
                if let Err(e) = state.batch_tx.send(change_event).await {
                    error!(
                        "[{}] Failed to send event {} to batch channel: {}",
                        state.source_id,
                        idx + 1,
                        e
                    );
                    error_count += 1;
                    last_error = Some("Internal channel error".to_string());
                } else {
                    success_count += 1;
                }
            }
            Err(e) => {
                error!(
                    "[{}] Failed to convert event {}: {}",
                    state.source_id,
                    idx + 1,
                    e
                );
                error_count += 1;
                last_error = Some(e.to_string());
            }
        }
    }
    debug!(
        "[{source_id}] Event processing complete: {success_count} succeeded, {error_count} failed"
    );
    if error_count > 0 && success_count == 0 {
        // Total failure: report a client error with the last error detail.
        Err((
            StatusCode::BAD_REQUEST,
            Json(EventResponse {
                success: false,
                message: format!("All {error_count} events failed"),
                error: last_error,
            }),
        ))
    } else if error_count > 0 {
        // Partial success: HTTP 200 with the failure count in the message.
        Ok(Json(EventResponse {
            success: true,
            message: format!(
                "Processed {success_count} events successfully, {error_count} failed"
            ),
            error: last_error,
        }))
    } else {
        Ok(Json(EventResponse {
            success: true,
            message: format!("All {success_count} events processed successfully"),
            error: None,
        }))
    }
}
/// Liveness endpoint (`GET /health`); always reports healthy with a static
/// feature list.
async fn health_check() -> impl IntoResponse {
    Json(serde_json::json!({
        "status": "healthy",
        "service": "http-source",
        "features": ["adaptive-batching", "batch-endpoint", "webhooks"]
    }))
}
/// Fallback axum handler used in webhook mode: every request not handled by
/// an explicit route (only `/health`) lands here.
///
/// Pipeline: resolve webhook config → match method/path against configured
/// routes → authenticate → parse the body by content type → build a template
/// context → evaluate each matching mapping into a source change and queue
/// it for batching. Error responses honor the configured error behavior
/// (route-level override first, then the global default).
async fn handle_webhook(
    method: axum::http::Method,
    uri: axum::http::Uri,
    headers: axum::http::HeaderMap,
    State(state): State<HttpAppState>,
    body: Bytes,
) -> impl IntoResponse {
    let path = uri.path();
    let source_id = &state.source_id;
    debug!("[{source_id}] Webhook received: {method} {path}");
    // Should always be Some in webhook mode; None here indicates a wiring bug
    // in `start`, not a client error.
    let webhook_state = match &state.webhook_config {
        Some(ws) => ws,
        None => {
            error!("[{source_id}] Webhook handler called but no webhook config present");
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(EventResponse {
                    success: false,
                    message: "Internal configuration error".to_string(),
                    error: Some("Webhook mode not properly configured".to_string()),
                }),
            );
        }
    };
    let http_method = match convert_method(&method) {
        Some(m) => m,
        None => {
            return handle_error(
                &webhook_state.config.error_behavior,
                source_id,
                StatusCode::METHOD_NOT_ALLOWED,
                "Method not supported",
                None,
            );
        }
    };
    let route_match = match webhook_state.route_matcher.match_route(
        path,
        &http_method,
        &webhook_state.config.routes,
    ) {
        Some(rm) => rm,
        None => {
            debug!("[{source_id}] No matching route for {method} {path}");
            return handle_error(
                &webhook_state.config.error_behavior,
                source_id,
                StatusCode::NOT_FOUND,
                "No matching route",
                None,
            );
        }
    };
    let route = route_match.route;
    // Route-level error behavior overrides the global default.
    let error_behavior = route
        .error_behavior
        .as_ref()
        .unwrap_or(&webhook_state.config.error_behavior);
    // Verify authentication (signature/token) before touching the payload.
    let auth_result = verify_auth(route.auth.as_ref(), &headers, &body);
    if let AuthResult::Failed(reason) = auth_result {
        warn!("[{source_id}] Authentication failed for {path}: {reason}");
        return handle_error(
            error_behavior,
            source_id,
            StatusCode::UNAUTHORIZED,
            "Authentication failed",
            Some(&reason),
        );
    }
    let content_type = ContentType::from_header(
        headers
            .get(axum::http::header::CONTENT_TYPE)
            .and_then(|v| v.to_str().ok()),
    );
    let payload = match parse_content(&body, content_type) {
        Ok(p) => p,
        Err(e) => {
            warn!("[{source_id}] Failed to parse payload: {e}");
            return handle_error(
                error_behavior,
                source_id,
                StatusCode::BAD_REQUEST,
                "Failed to parse payload",
                Some(&e.to_string()),
            );
        }
    };
    let headers_map = headers_to_map(&headers);
    let query_map = parse_query_string(uri.query());
    // Everything the mapping templates may reference.
    let context = TemplateContext {
        payload: payload.clone(),
        route: route_match.path_params,
        query: query_map,
        headers: headers_map.clone(),
        method: method.to_string(),
        path: path.to_string(),
        source_id: source_id.clone(),
    };
    let matching_mappings = find_matching_mappings(&route.mappings, &headers_map, &payload);
    if matching_mappings.is_empty() {
        debug!("[{source_id}] No matching mappings for request");
        return handle_error(
            error_behavior,
            source_id,
            StatusCode::BAD_REQUEST,
            "No matching mapping for request",
            None,
        );
    }
    let mut success_count = 0;
    let mut error_count = 0;
    let mut last_error = None;
    // Each matching mapping may yield one source change event.
    for mapping in matching_mappings {
        match webhook_state
            .template_engine
            .process_mapping(mapping, &context, source_id)
        {
            Ok(source_change) => {
                let event = SourceChangeEvent {
                    source_id: source_id.clone(),
                    change: source_change,
                    timestamp: chrono::Utc::now(),
                };
                if let Err(e) = state.batch_tx.send(event).await {
                    error!("[{source_id}] Failed to send event to batcher: {e}");
                    error_count += 1;
                    last_error = Some(format!("Failed to queue event: {e}"));
                } else {
                    success_count += 1;
                }
            }
            Err(e) => {
                warn!("[{source_id}] Failed to process mapping: {e}");
                error_count += 1;
                last_error = Some(e.to_string());
            }
        }
    }
    debug!("[{source_id}] Webhook processing complete: {success_count} succeeded, {error_count} failed");
    if error_count > 0 && success_count == 0 {
        // All mappings failed: defer to the configured error behavior.
        handle_error(
            error_behavior,
            source_id,
            StatusCode::BAD_REQUEST,
            &format!("All {error_count} mappings failed"),
            last_error.as_deref(),
        )
    } else if error_count > 0 {
        // Partial success still returns HTTP 200 with the last error detail.
        (
            StatusCode::OK,
            Json(EventResponse {
                success: true,
                message: format!("Processed {success_count} events, {error_count} failed"),
                error: last_error,
            }),
        )
    } else {
        (
            StatusCode::OK,
            Json(EventResponse {
                success: true,
                message: format!("Processed {success_count} events successfully"),
                error: None,
            }),
        )
    }
}
/// Long-running task that drains `batch_rx`, groups events into adaptive
/// batches, and dispatches each event to all subscribers.
///
/// Runs until the channel closes (all senders dropped at shutdown). Logs an
/// aggregate throughput summary every 100 batches.
async fn run_adaptive_batcher(
    batch_rx: mpsc::Receiver<SourceChangeEvent>,
    dispatchers: Arc<
        tokio::sync::RwLock<
            Vec<
                Box<
                    dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync,
                >,
            >,
        >,
    >,
    adaptive_config: AdaptiveBatchConfig,
    source_id: String,
) {
    let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config.clone());
    let mut total_events = 0u64;
    let mut total_batches = 0u64;
    info!("[{source_id}] Adaptive HTTP batcher started with config: {adaptive_config:?}");
    while let Some(batch) = batcher.next_batch().await {
        if batch.is_empty() {
            debug!("[{source_id}] Batcher received empty batch, skipping");
            continue;
        }
        let batch_size = batch.len();
        total_events += batch_size as u64;
        total_batches += 1;
        debug!(
            "[{source_id}] Batcher forwarding batch #{total_batches} with {batch_size} events to dispatchers"
        );
        let mut sent_count = 0;
        let mut failed_count = 0;
        for (idx, event) in batch.into_iter().enumerate() {
            debug!(
                "[{}] Batch #{}, dispatching event {}/{}",
                source_id,
                total_batches,
                idx + 1,
                batch_size
            );
            // Stamp profiling metadata at send time so downstream stages can
            // measure end-to-end latency.
            let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
            profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
            let wrapper = SourceEventWrapper::with_profiling(
                event.source_id.clone(),
                SourceEvent::Change(event.change),
                event.timestamp,
                profiling,
            );
            if let Err(e) =
                SourceBase::dispatch_from_task(dispatchers.clone(), wrapper.clone(), &source_id)
                    .await
            {
                error!(
                    "[{}] Batch #{}, failed to dispatch event {}/{} (no subscribers): {}",
                    source_id,
                    total_batches,
                    idx + 1,
                    batch_size,
                    e
                );
                failed_count += 1;
            } else {
                debug!(
                    "[{}] Batch #{}, successfully dispatched event {}/{}",
                    source_id,
                    total_batches,
                    idx + 1,
                    batch_size
                );
                sent_count += 1;
            }
        }
        debug!(
            "[{source_id}] Batch #{total_batches} complete: {sent_count} dispatched, {failed_count} failed"
        );
        // Periodic throughput summary (every 100th batch).
        if total_batches.is_multiple_of(100) {
            info!(
                "[{}] Adaptive HTTP metrics - Batches: {}, Events: {}, Avg batch size: {:.1}",
                source_id,
                total_batches,
                total_events,
                total_events as f64 / total_batches as f64
            );
        }
    }
    info!(
        "[{source_id}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total events: {total_events}"
    );
}
}
#[async_trait]
impl Source for HttpSource {
    /// Unique id of this source instance.
    fn id(&self) -> &str {
        &self.base.id
    }
    /// Static type tag for registration and diagnostics.
    fn type_name(&self) -> &str {
        "http"
    }
    /// Expose the configuration as a flat property map (via serde);
    /// returns an empty map if serialization does not yield an object.
    fn properties(&self) -> HashMap<String, serde_json::Value> {
        match serde_json::to_value(&self.config) {
            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
            _ => HashMap::new(),
        }
    }
    fn auto_start(&self) -> bool {
        self.base.get_auto_start()
    }
    /// Start the HTTP server and the adaptive batcher.
    ///
    /// Spawns two tasks: the batcher (drains the event channel and forwards
    /// batches to dispatchers) and the axum server. In webhook mode the
    /// router is `/health` plus a catch-all fallback (optionally wrapped in
    /// CORS); otherwise it exposes the single/batch event endpoints. Waits
    /// up to 500ms for a bind error before reporting Running, so a port
    /// conflict surfaces as a startup error rather than a silent failure.
    async fn start(&self) -> Result<()> {
        info!("[{}] Starting adaptive HTTP source", self.base.id);
        self.base
            .set_status(
                ComponentStatus::Starting,
                Some("Starting adaptive HTTP source".to_string()),
            )
            .await;
        let host = self.config.host.clone();
        let port = self.config.port;
        // Size the channel from the batch config so bursts can queue up.
        let batch_channel_capacity = self.adaptive_config.recommended_channel_capacity();
        let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
        info!(
            "[{}] HttpSource using batch channel capacity: {} (max_batch_size: {} x 5)",
            self.base.id, batch_channel_capacity, self.adaptive_config.max_batch_size
        );
        let adaptive_config = self.adaptive_config.clone();
        let source_id = self.base.id.clone();
        let dispatchers = self.base.dispatchers.clone();
        let instance_id = self
            .base
            .context()
            .await
            .map(|c| c.instance_id)
            .unwrap_or_default();
        info!("[{source_id}] Starting adaptive batcher task");
        let source_id_for_span = source_id.clone();
        let span = tracing::info_span!(
            "http_adaptive_batcher",
            instance_id = %instance_id,
            component_id = %source_id_for_span,
            component_type = "source"
        );
        tokio::spawn(
            async move {
                Self::run_adaptive_batcher(
                    batch_rx,
                    dispatchers,
                    adaptive_config,
                    source_id.clone(),
                )
                .await
            }
            .instrument(span),
        );
        // Webhook machinery is built once and shared across all requests.
        let webhook_state = if let Some(ref webhook_config) = self.config.webhooks {
            info!(
                "[{}] Webhook mode enabled with {} routes",
                self.base.id,
                webhook_config.routes.len()
            );
            Some(Arc::new(WebhookState {
                config: webhook_config.clone(),
                route_matcher: RouteMatcher::new(&webhook_config.routes),
                template_engine: TemplateEngine::new(),
            }))
        } else {
            info!("[{}] Standard mode enabled", self.base.id);
            None
        };
        let state = HttpAppState {
            source_id: self.base.id.clone(),
            batch_tx,
            webhook_config: webhook_state,
        };
        let app = if self.config.is_webhook_mode() {
            // Webhook mode: only /health is routed explicitly; everything
            // else falls through to the webhook handler.
            let router = Router::new()
                .route("/health", get(Self::health_check))
                .fallback(Self::handle_webhook)
                .with_state(state);
            if let Some(ref webhooks) = self.config.webhooks {
                if let Some(ref cors_config) = webhooks.cors {
                    if cors_config.enabled {
                        info!("[{}] CORS enabled for webhook endpoints", self.base.id);
                        router.layer(build_cors_layer(cors_config))
                    } else {
                        router
                    }
                } else {
                    router
                }
            } else {
                router
            }
        } else {
            // Standard mode: explicit single-event and batch endpoints.
            Router::new()
                .route("/health", get(Self::health_check))
                .route(
                    "/sources/:source_id/events",
                    post(Self::handle_single_event),
                )
                .route(
                    "/sources/:source_id/events/batch",
                    post(Self::handle_batch_events),
                )
                .with_state(state)
        };
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let host_clone = host.clone();
        // One-shot channel through which the server task reports bind errors.
        let (error_tx, error_rx) = tokio::sync::oneshot::channel();
        let source_id = self.base.id.clone();
        let source_id_for_span = source_id.clone();
        let span = tracing::info_span!(
            "http_source_server",
            instance_id = %instance_id,
            component_id = %source_id_for_span,
            component_type = "source"
        );
        let server_handle = tokio::spawn(
            async move {
                let addr = format!("{host}:{port}");
                info!("[{source_id}] Adaptive HTTP source attempting to bind to {addr}");
                let listener = match tokio::net::TcpListener::bind(&addr).await {
                    Ok(listener) => {
                        info!("[{source_id}] Adaptive HTTP source successfully listening on {addr}");
                        listener
                    }
                    Err(e) => {
                        error!("[{source_id}] Failed to bind HTTP server to {addr}: {e}");
                        let _ = error_tx.send(format!(
                            "Failed to bind HTTP server to {addr}: {e}. Common causes: port already in use, insufficient permissions"
                        ));
                        return;
                    }
                };
                if let Err(e) = axum::serve(listener, app)
                    .with_graceful_shutdown(async move {
                        let _ = shutdown_rx.await;
                    })
                    .await
                {
                    error!("[{source_id}] HTTP server error: {e}");
                }
            }
            .instrument(span),
        );
        *self.base.task_handle.write().await = Some(server_handle);
        *self.base.shutdown_tx.write().await = Some(shutdown_tx);
        // A bind failure usually arrives within this window; timing out is
        // treated as a successful start.
        match timeout(Duration::from_millis(500), error_rx).await {
            Ok(Ok(error_msg)) => {
                self.base.set_status(ComponentStatus::Error, None).await;
                return Err(anyhow::anyhow!("{error_msg}"));
            }
            _ => {
                self.base
                    .set_status(
                        ComponentStatus::Running,
                        Some(format!(
                            "Adaptive HTTP source running on {host_clone}:{port} with batch support"
                        )),
                    )
                    .await;
            }
        }
        Ok(())
    }
    /// Stop the HTTP server: signal graceful shutdown and wait up to 5s for
    /// the server task to finish.
    async fn stop(&self) -> Result<()> {
        info!("[{}] Stopping adaptive HTTP source", self.base.id);
        self.base
            .set_status(
                ComponentStatus::Stopping,
                Some("Stopping adaptive HTTP source".to_string()),
            )
            .await;
        if let Some(tx) = self.base.shutdown_tx.write().await.take() {
            let _ = tx.send(());
        }
        if let Some(handle) = self.base.task_handle.write().await.take() {
            let _ = timeout(Duration::from_secs(5), handle).await;
        }
        self.base
            .set_status(
                ComponentStatus::Stopped,
                Some("Adaptive HTTP source stopped".to_string()),
            )
            .await;
        Ok(())
    }
    async fn status(&self) -> ComponentStatus {
        self.base.get_status().await
    }
    /// Register a subscriber; bootstrap handling is delegated to the base.
    async fn subscribe(
        &self,
        settings: drasi_lib::config::SourceSubscriptionSettings,
    ) -> Result<SubscriptionResponse> {
        self.base.subscribe_with_bootstrap(&settings, "HTTP").await
    }
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
        self.base.initialize(context).await;
    }
    async fn set_bootstrap_provider(
        &self,
        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
    ) {
        self.base.set_bootstrap_provider(provider).await;
    }
}
/// Fluent builder for [`HttpSource`]; mirrors the fields of
/// [`HttpSourceConfig`] plus dispatch, bootstrap, and auto-start options.
pub struct HttpSourceBuilder {
    id: String,
    host: String,
    port: u16,
    endpoint: Option<String>,
    timeout_ms: u64,
    // Adaptive-batching overrides; None means "use the default".
    adaptive_max_batch_size: Option<usize>,
    adaptive_min_batch_size: Option<usize>,
    adaptive_max_wait_ms: Option<u64>,
    adaptive_min_wait_ms: Option<u64>,
    adaptive_window_secs: Option<u64>,
    adaptive_enabled: Option<bool>,
    webhooks: Option<WebhookConfig>,
    dispatch_mode: Option<DispatchMode>,
    dispatch_buffer_capacity: Option<usize>,
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    auto_start: bool,
}
impl HttpSourceBuilder {
    /// Start a builder with defaults: port 8080, 10s timeout, auto-start
    /// enabled, and no adaptive-batching overrides.
    pub fn new(id: impl Into<String>) -> Self {
        Self {
            id: id.into(),
            host: String::new(),
            port: 8080,
            endpoint: None,
            timeout_ms: 10000,
            adaptive_max_batch_size: None,
            adaptive_min_batch_size: None,
            adaptive_max_wait_ms: None,
            adaptive_min_wait_ms: None,
            adaptive_window_secs: None,
            adaptive_enabled: None,
            webhooks: None,
            dispatch_mode: None,
            dispatch_buffer_capacity: None,
            bootstrap_provider: None,
            auto_start: true,
        }
    }
    /// Set the bind host (e.g. `"0.0.0.0"` or `"localhost"`).
    pub fn with_host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }
    /// Set the bind port.
    pub fn with_port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }
    /// Set the optional endpoint path.
    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
        self.endpoint = Some(endpoint.into());
        self
    }
    /// Set the request timeout in milliseconds.
    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }
    /// Override the adaptive batcher's maximum batch size.
    pub fn with_adaptive_max_batch_size(mut self, size: usize) -> Self {
        self.adaptive_max_batch_size = Some(size);
        self
    }
    /// Override the adaptive batcher's minimum batch size.
    pub fn with_adaptive_min_batch_size(mut self, size: usize) -> Self {
        self.adaptive_min_batch_size = Some(size);
        self
    }
    /// Override the adaptive batcher's maximum wait time (milliseconds).
    pub fn with_adaptive_max_wait_ms(mut self, wait_ms: u64) -> Self {
        self.adaptive_max_wait_ms = Some(wait_ms);
        self
    }
    /// Override the adaptive batcher's minimum wait time (milliseconds).
    pub fn with_adaptive_min_wait_ms(mut self, wait_ms: u64) -> Self {
        self.adaptive_min_wait_ms = Some(wait_ms);
        self
    }
    /// Override the adaptive batcher's throughput window (seconds).
    pub fn with_adaptive_window_secs(mut self, secs: u64) -> Self {
        self.adaptive_window_secs = Some(secs);
        self
    }
    /// Explicitly enable or disable adaptive batching.
    pub fn with_adaptive_enabled(mut self, enabled: bool) -> Self {
        self.adaptive_enabled = Some(enabled);
        self
    }
    /// Override the dispatch mode of the underlying source base.
    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
        self.dispatch_mode = Some(mode);
        self
    }
    /// Override the dispatch buffer capacity of the underlying source base.
    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
        self.dispatch_buffer_capacity = Some(capacity);
        self
    }
    /// Attach a bootstrap provider used to serve initial data to subscribers.
    pub fn with_bootstrap_provider(
        mut self,
        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
    ) -> Self {
        self.bootstrap_provider = Some(Box::new(provider));
        self
    }
    /// Control whether the source starts automatically.
    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
        self.auto_start = auto_start;
        self
    }
    /// Enable webhook mode with the given configuration.
    pub fn with_webhooks(mut self, webhooks: WebhookConfig) -> Self {
        self.webhooks = Some(webhooks);
        self
    }
    /// Copy all HTTP settings from an existing config, overwriting values set
    /// so far. Dispatch, bootstrap, and auto-start options are untouched.
    pub fn with_config(mut self, config: HttpSourceConfig) -> Self {
        self.host = config.host;
        self.port = config.port;
        self.endpoint = config.endpoint;
        self.timeout_ms = config.timeout_ms;
        self.adaptive_max_batch_size = config.adaptive_max_batch_size;
        self.adaptive_min_batch_size = config.adaptive_min_batch_size;
        self.adaptive_max_wait_ms = config.adaptive_max_wait_ms;
        self.adaptive_min_wait_ms = config.adaptive_min_wait_ms;
        self.adaptive_window_secs = config.adaptive_window_secs;
        self.adaptive_enabled = config.adaptive_enabled;
        self.webhooks = config.webhooks;
        self
    }
    /// Assemble the [`HttpSource`] from the collected settings.
    ///
    /// # Errors
    /// Returns an error if the underlying [`SourceBase`] cannot be created.
    pub fn build(self) -> Result<HttpSource> {
        let config = HttpSourceConfig {
            host: self.host,
            port: self.port,
            endpoint: self.endpoint,
            timeout_ms: self.timeout_ms,
            adaptive_max_batch_size: self.adaptive_max_batch_size,
            adaptive_min_batch_size: self.adaptive_min_batch_size,
            adaptive_max_wait_ms: self.adaptive_max_wait_ms,
            adaptive_min_wait_ms: self.adaptive_min_wait_ms,
            adaptive_window_secs: self.adaptive_window_secs,
            adaptive_enabled: self.adaptive_enabled,
            webhooks: self.webhooks,
        };
        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
        if let Some(mode) = self.dispatch_mode {
            params = params.with_dispatch_mode(mode);
        }
        if let Some(capacity) = self.dispatch_buffer_capacity {
            params = params.with_dispatch_buffer_capacity(capacity);
        }
        if let Some(provider) = self.bootstrap_provider {
            params = params.with_bootstrap_provider(provider);
        }
        // Apply the optional adaptive overrides on top of the defaults.
        let mut adaptive_config = AdaptiveBatchConfig::default();
        if let Some(max_batch) = config.adaptive_max_batch_size {
            adaptive_config.max_batch_size = max_batch;
        }
        if let Some(min_batch) = config.adaptive_min_batch_size {
            adaptive_config.min_batch_size = min_batch;
        }
        if let Some(max_wait_ms) = config.adaptive_max_wait_ms {
            adaptive_config.max_wait_time = Duration::from_millis(max_wait_ms);
        }
        if let Some(min_wait_ms) = config.adaptive_min_wait_ms {
            adaptive_config.min_wait_time = Duration::from_millis(min_wait_ms);
        }
        if let Some(window_secs) = config.adaptive_window_secs {
            adaptive_config.throughput_window = Duration::from_secs(window_secs);
        }
        if let Some(enabled) = config.adaptive_enabled {
            adaptive_config.adaptive_enabled = enabled;
        }
        Ok(HttpSource {
            base: SourceBase::new(params)?,
            config,
            adaptive_config,
        })
    }
}
impl HttpSource {
    /// Convenience entry point: `HttpSource::builder("id")....build()`.
    pub fn builder(id: impl Into<String>) -> HttpSourceBuilder {
        HttpSourceBuilder::new(id)
    }
}
/// Map an error condition onto an HTTP response according to the configured
/// error behavior: reject outright, accept but log, or accept silently.
///
/// Returns the status code and JSON body to hand back to the client.
fn handle_error(
    behavior: &ErrorBehavior,
    source_id: &str,
    status: StatusCode,
    message: &str,
    detail: Option<&str>,
) -> (StatusCode, Json<EventResponse>) {
    // Owned copy of the optional detail, used by the first two variants.
    let error = detail.map(String::from);
    match behavior {
        ErrorBehavior::Reject => {
            // Surface the failure to the caller with the supplied status.
            debug!("[{source_id}] Rejecting request: {message}");
            let body = EventResponse {
                success: false,
                message: message.to_string(),
                error,
            };
            (status, Json(body))
        }
        ErrorBehavior::AcceptAndLog => {
            // Report success to the caller but record the problem in the logs.
            warn!("[{source_id}] Accepting with error (logged): {message}");
            let body = EventResponse {
                success: true,
                message: format!("Accepted with warning: {message}"),
                error,
            };
            (StatusCode::OK, Json(body))
        }
        ErrorBehavior::AcceptAndSkip => {
            // Swallow the problem entirely; the response carries no detail.
            trace!("[{source_id}] Accepting silently: {message}");
            let body = EventResponse {
                success: true,
                message: "Accepted".to_string(),
                error: None,
            };
            (StatusCode::OK, Json(body))
        }
    }
}
/// Parse a raw URL query string (e.g. `a=1&b=two`) into a key/value map.
///
/// Keys and values are percent-decoded and `+` becomes a space, following
/// `application/x-www-form-urlencoded` conventions. A key without `=` maps
/// to an empty string. Empty segments (as in `"a=1&&b=2"`, a trailing `&`,
/// or an empty query) are skipped instead of producing a bogus empty-key
/// entry. Duplicate keys keep the last value. `None` yields an empty map.
fn parse_query_string(query: Option<&str>) -> HashMap<String, String> {
    query
        .map(|q| {
            q.split('&')
                // Skip empty segments so "a=1&&b=2" or a trailing '&'
                // does not insert an entry under the empty key.
                .filter(|pair| !pair.is_empty())
                .filter_map(|pair| {
                    let mut parts = pair.splitn(2, '=');
                    let key = parts.next()?;
                    let value = parts.next().unwrap_or("");
                    Some((urlencoding_decode(key), urlencoding_decode(value)))
                })
                .collect()
        })
        .unwrap_or_default()
}
/// Minimal percent-decoder for query-string components.
///
/// Decodes `%XX` hex escapes byte-wise, so multi-byte UTF-8 sequences such
/// as `%E4%B8%AD` decode correctly; `+` maps to a space. Malformed escapes
/// (`%`, `%4`, `%zz`) are passed through verbatim, and invalid UTF-8 in the
/// decoded bytes is replaced with U+FFFD.
fn urlencoding_decode(s: &str) -> String {
    let mut decoded: Vec<u8> = Vec::with_capacity(s.len());
    let mut chars = s.chars();
    while let Some(c) = chars.next() {
        if c == '%' {
            // Collect up to two following characters as the hex byte.
            let mut hex = String::new();
            if let Some(c1) = chars.next() {
                hex.push(c1);
            }
            if let Some(c2) = chars.next() {
                hex.push(c2);
            }
            if hex.len() == 2 {
                if let Ok(byte) = u8::from_str_radix(&hex, 16) {
                    decoded.push(byte);
                    continue;
                }
            }
            // Malformed escape: emit the '%' and what followed it unchanged.
            decoded.extend_from_slice(b"%");
            decoded.extend_from_slice(hex.as_bytes());
        } else if c == '+' {
            decoded.push(b' ');
        } else {
            // Ordinary character: append its UTF-8 encoding.
            let mut buf = [0u8; 4];
            let encoded = c.encode_utf8(&mut buf);
            decoded.extend_from_slice(encoded.as_bytes());
        }
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
/// Translate a [`CorsConfig`] into a tower-http [`CorsLayer`].
///
/// A list containing the single entry `"*"` acts as a wildcard for origins
/// and request headers; entries that fail to parse are silently skipped.
fn build_cors_layer(cors_config: &CorsConfig) -> CorsLayer {
    // A one-element list holding "*" means "allow anything".
    fn is_wildcard(values: &[String]) -> bool {
        matches!(values, [only] if only == "*")
    }
    let mut layer = CorsLayer::new();
    layer = if is_wildcard(&cors_config.allow_origins) {
        layer.allow_origin(Any)
    } else {
        let origins: Vec<_> = cors_config
            .allow_origins
            .iter()
            .filter_map(|o| o.parse().ok())
            .collect();
        layer.allow_origin(origins)
    };
    let methods = cors_config
        .allow_methods
        .iter()
        .filter_map(|name| name.parse::<Method>().ok())
        .collect::<Vec<_>>();
    layer = layer.allow_methods(methods);
    layer = if is_wildcard(&cors_config.allow_headers) {
        layer.allow_headers(Any)
    } else {
        let names = cors_config
            .allow_headers
            .iter()
            .filter_map(|name| name.parse::<header::HeaderName>().ok())
            .collect::<Vec<_>>();
        layer.allow_headers(names)
    };
    if !cors_config.expose_headers.is_empty() {
        let exposed = cors_config
            .expose_headers
            .iter()
            .filter_map(|name| name.parse::<header::HeaderName>().ok())
            .collect::<Vec<_>>();
        layer = layer.expose_headers(exposed);
    }
    if cors_config.allow_credentials {
        layer = layer.allow_credentials(true);
    }
    // max_age is always applied, even when zero.
    layer.max_age(Duration::from_secs(cors_config.max_age))
}
#[cfg(test)]
mod tests {
use super::*;
// Construction paths: builder and `with_dispatch`.
mod construction {
    use super::*;
    #[test]
    fn test_builder_with_valid_config() {
        let source = HttpSourceBuilder::new("test-source")
            .with_host("localhost")
            .with_port(8080)
            .build();
        assert!(source.is_ok());
    }
    #[test]
    fn test_builder_with_custom_config() {
        let source = HttpSourceBuilder::new("http-source")
            .with_host("0.0.0.0")
            .with_port(9000)
            .with_endpoint("/events")
            .build()
            .unwrap();
        assert_eq!(source.id(), "http-source");
    }
    #[test]
    fn test_with_dispatch_creates_source() {
        let config = HttpSourceConfig {
            host: "localhost".to_string(),
            port: 8080,
            endpoint: None,
            timeout_ms: 10000,
            adaptive_max_batch_size: None,
            adaptive_min_batch_size: None,
            adaptive_max_wait_ms: None,
            adaptive_min_wait_ms: None,
            adaptive_window_secs: None,
            adaptive_enabled: None,
            webhooks: None,
        };
        let source = HttpSource::with_dispatch(
            "dispatch-source",
            config,
            Some(DispatchMode::Channel),
            Some(1000),
        );
        assert!(source.is_ok());
        assert_eq!(source.unwrap().id(), "dispatch-source");
    }
}
// Identity and property-map accessors.
mod properties {
    use super::*;
    #[test]
    fn test_id_returns_correct_value() {
        let source = HttpSourceBuilder::new("my-http-source")
            .with_host("localhost")
            .build()
            .unwrap();
        assert_eq!(source.id(), "my-http-source");
    }
    #[test]
    fn test_type_name_returns_http() {
        let source = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .build()
            .unwrap();
        assert_eq!(source.type_name(), "http");
    }
    #[test]
    fn test_properties_contains_host_and_port() {
        let source = HttpSourceBuilder::new("test")
            .with_host("192.168.1.1")
            .with_port(9000)
            .build()
            .unwrap();
        let props = source.properties();
        assert_eq!(
            props.get("host"),
            Some(&serde_json::Value::String("192.168.1.1".to_string()))
        );
        assert_eq!(
            props.get("port"),
            Some(&serde_json::Value::Number(9000.into()))
        );
    }
    #[test]
    fn test_properties_includes_endpoint_when_set() {
        let source = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .with_endpoint("/api/v1")
            .build()
            .unwrap();
        let props = source.properties();
        assert_eq!(
            props.get("endpoint"),
            Some(&serde_json::Value::String("/api/v1".to_string()))
        );
    }
    #[test]
    fn test_properties_excludes_endpoint_when_none() {
        // A None endpoint should be skipped entirely, not serialized as null.
        let source = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .build()
            .unwrap();
        let props = source.properties();
        assert!(!props.contains_key("endpoint"));
    }
}
// Lifecycle status checks.
mod lifecycle {
    use super::*;
    #[test]
    // A freshly built source must report Stopped before start() is called.
    #[tokio::test]
    async fn test_initial_status_is_stopped() {
        let source = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .build()
            .unwrap();
        assert_eq!(source.status().await, ComponentStatus::Stopped);
    }
}
// Builder defaults and override propagation into the config.
mod builder {
    use super::*;
    #[test]
    fn test_http_builder_defaults() {
        let source = HttpSourceBuilder::new("test").build().unwrap();
        assert_eq!(source.config.port, 8080);
        assert_eq!(source.config.timeout_ms, 10000);
        assert_eq!(source.config.endpoint, None);
    }
    #[test]
    fn test_http_builder_custom_values() {
        let source = HttpSourceBuilder::new("test")
            .with_host("api.example.com")
            .with_port(9000)
            .with_endpoint("/webhook")
            .with_timeout_ms(5000)
            .build()
            .unwrap();
        assert_eq!(source.config.host, "api.example.com");
        assert_eq!(source.config.port, 9000);
        assert_eq!(source.config.endpoint, Some("/webhook".to_string()));
        assert_eq!(source.config.timeout_ms, 5000);
    }
    #[test]
    fn test_http_builder_adaptive_batching() {
        let source = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .with_adaptive_max_batch_size(1000)
            .with_adaptive_min_batch_size(10)
            .with_adaptive_max_wait_ms(500)
            .with_adaptive_min_wait_ms(50)
            .with_adaptive_window_secs(60)
            .with_adaptive_enabled(true)
            .build()
            .unwrap();
        assert_eq!(source.config.adaptive_max_batch_size, Some(1000));
        assert_eq!(source.config.adaptive_min_batch_size, Some(10));
        assert_eq!(source.config.adaptive_max_wait_ms, Some(500));
        assert_eq!(source.config.adaptive_min_wait_ms, Some(50));
        assert_eq!(source.config.adaptive_window_secs, Some(60));
        assert_eq!(source.config.adaptive_enabled, Some(true));
    }
    #[test]
    fn test_builder_id() {
        let source = HttpSource::builder("my-http-source")
            .with_host("localhost")
            .build()
            .unwrap();
        assert_eq!(source.base.id, "my-http-source");
    }
}
// Conversion of HTTP-shaped changes into drasi-core SourceChange values.
mod event_conversion {
    use super::*;
    #[test]
    fn test_convert_node_insert() {
        let mut props = serde_json::Map::new();
        props.insert(
            "name".to_string(),
            serde_json::Value::String("Alice".to_string()),
        );
        props.insert("age".to_string(), serde_json::Value::Number(30.into()));
        let http_change = HttpSourceChange::Insert {
            element: HttpElement::Node {
                id: "user-1".to_string(),
                labels: vec!["User".to_string()],
                properties: props,
            },
            // Nanosecond timestamp on the wire ...
            timestamp: Some(1234567890000000000),
        };
        let result = convert_http_to_source_change(&http_change, "test-source");
        assert!(result.is_ok());
        match result.unwrap() {
            drasi_core::models::SourceChange::Insert { element } => match element {
                drasi_core::models::Element::Node {
                    metadata,
                    properties,
                } => {
                    assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
                    assert_eq!(metadata.labels.len(), 1);
                    // ... is converted to milliseconds in effective_from.
                    assert_eq!(metadata.effective_from, 1234567890000);
                    assert!(properties.get("name").is_some());
                    assert!(properties.get("age").is_some());
                }
                _ => panic!("Expected Node element"),
            },
            _ => panic!("Expected Insert operation"),
        }
    }
    #[test]
    fn test_convert_relation_insert() {
        let http_change = HttpSourceChange::Insert {
            element: HttpElement::Relation {
                id: "follows-1".to_string(),
                labels: vec!["FOLLOWS".to_string()],
                from: "user-1".to_string(),
                to: "user-2".to_string(),
                properties: serde_json::Map::new(),
            },
            timestamp: None,
        };
        let result = convert_http_to_source_change(&http_change, "test-source");
        assert!(result.is_ok());
        match result.unwrap() {
            drasi_core::models::SourceChange::Insert { element } => match element {
                drasi_core::models::Element::Relation {
                    metadata,
                    out_node,
                    in_node,
                    ..
                } => {
                    assert_eq!(metadata.reference.element_id.as_ref(), "follows-1");
                    // `from` maps to in_node and `to` maps to out_node.
                    assert_eq!(in_node.element_id.as_ref(), "user-1");
                    assert_eq!(out_node.element_id.as_ref(), "user-2");
                }
                _ => panic!("Expected Relation element"),
            },
            _ => panic!("Expected Insert operation"),
        }
    }
    #[test]
    fn test_convert_delete() {
        let http_change = HttpSourceChange::Delete {
            id: "user-1".to_string(),
            labels: Some(vec!["User".to_string()]),
            timestamp: Some(9999999999),
        };
        let result = convert_http_to_source_change(&http_change, "test-source");
        assert!(result.is_ok());
        match result.unwrap() {
            drasi_core::models::SourceChange::Delete { metadata } => {
                assert_eq!(metadata.reference.element_id.as_ref(), "user-1");
                assert_eq!(metadata.labels.len(), 1);
            }
            _ => panic!("Expected Delete operation"),
        }
    }
    #[test]
    fn test_convert_update() {
        let http_change = HttpSourceChange::Update {
            element: HttpElement::Node {
                id: "user-1".to_string(),
                labels: vec!["User".to_string()],
                properties: serde_json::Map::new(),
            },
            timestamp: None,
        };
        let result = convert_http_to_source_change(&http_change, "test-source");
        assert!(result.is_ok());
        match result.unwrap() {
            drasi_core::models::SourceChange::Update { .. } => {
            }
            _ => panic!("Expected Update operation"),
        }
    }
}
// Derivation of AdaptiveBatchConfig from builder overrides and defaults.
mod adaptive_config {
    use super::*;
    #[test]
    fn test_adaptive_config_from_http_config() {
        let source = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .with_adaptive_max_batch_size(500)
            .with_adaptive_enabled(true)
            .build()
            .unwrap();
        assert_eq!(source.adaptive_config.max_batch_size, 500);
        assert!(source.adaptive_config.adaptive_enabled);
    }
    #[test]
    fn test_adaptive_config_uses_defaults_when_not_specified() {
        let source = HttpSourceBuilder::new("test")
            .with_host("localhost")
            .build()
            .unwrap();
        let default_config = AdaptiveBatchConfig::default();
        assert_eq!(
            source.adaptive_config.max_batch_size,
            default_config.max_batch_size
        );
        assert_eq!(
            source.adaptive_config.min_batch_size,
            default_config.min_batch_size
        );
    }
}
}
// When compiled as a dynamic plugin, export this crate's descriptors so a
// plugin host can discover the HTTP source at load time. All versions are
// pinned to the crate version so host/plugin mismatches can be detected.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "http-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::HttpSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);