use anyhow::Result;
use async_trait::async_trait;
use handlebars::Handlebars;
use log::{debug, error, info, warn};
use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue},
Client, Method,
};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use drasi_lib::channels::{ComponentStatus, ResultDiff};
use drasi_lib::reactions::common::base::{ReactionBase, ReactionBaseParams};
use drasi_lib::Reaction;
use crate::adaptive_batcher::{AdaptiveBatchConfig, AdaptiveBatcher};
use drasi_reaction_http::QueryConfig;
pub use super::config::HttpAdaptiveReactionConfig;
use super::HttpAdaptiveReactionBuilder;
#[cfg(test)]
mod tests;
/// Envelope POSTed to the `{base_url}/batch` endpoint: every result
/// accumulated for one query in the current adaptive batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchResult {
    /// ID of the continuous query that produced these results.
    pub query_id: String,
    /// The result diffs grouped into this batch.
    pub results: Vec<ResultDiff>,
    /// RFC 3339 timestamp captured when the envelope was built.
    pub timestamp: String,
    /// Number of entries in `results` (denormalized for the receiver's convenience).
    pub count: usize,
}
/// HTTP reaction that forwards continuous-query results to a remote
/// endpoint, grouping sends through an adaptive batcher.
pub struct AdaptiveHttpReaction {
    /// Shared reaction plumbing: id, subscribed queries, status, priority queue.
    base: ReactionBase,
    /// Full user-supplied configuration (retained for `properties()` reporting).
    config: HttpAdaptiveReactionConfig,
    /// Base URL that per-route paths and the `/batch` suffix are appended to.
    base_url: String,
    /// Optional bearer token sent as an `Authorization` header.
    token: Option<String>,
    /// Request timeout in milliseconds (also baked into `client` at build time).
    timeout_ms: u64,
    /// Per-query routing: URL/method/body templates keyed by query id.
    query_configs: HashMap<String, QueryConfig>,
    /// Tuning parameters handed to the adaptive batcher.
    adaptive_config: AdaptiveBatchConfig,
    /// Reusable pooled HTTP client (timeout preconfigured).
    client: Client,
    /// When true, multi-result batches are POSTed to `{base_url}/batch`
    /// instead of being dispatched one result at a time.
    batch_endpoints_enabled: bool,
}
impl AdaptiveHttpReaction {
/// Returns a builder for configuring an [`AdaptiveHttpReaction`] step by step.
pub fn builder(id: impl Into<String>) -> HttpAdaptiveReactionBuilder {
    let reaction_id = id.into();
    HttpAdaptiveReactionBuilder::new(reaction_id)
}
/// Creates a reaction with default priority-queue capacity and
/// auto-start enabled; use [`Self::builder`] for finer control.
pub fn new(
    id: impl Into<String>,
    queries: Vec<String>,
    config: HttpAdaptiveReactionConfig,
) -> Self {
    let reaction_id: String = id.into();
    Self::create_internal(reaction_id, queries, config, None, true)
}
/// Constructor used by [`HttpAdaptiveReactionBuilder`]; exposes the
/// builder-only knobs (queue capacity, auto-start) on top of `new`.
pub(crate) fn from_builder(
    id: String,
    queries: Vec<String>,
    config: HttpAdaptiveReactionConfig,
    priority_queue_capacity: Option<usize>,
    auto_start: bool,
) -> Self {
    Self::create_internal(id, queries, config, priority_queue_capacity, auto_start)
}
fn create_internal(
id: String,
queries: Vec<String>,
config: HttpAdaptiveReactionConfig,
priority_queue_capacity: Option<usize>,
auto_start: bool,
) -> Self {
let utils_adaptive_config = AdaptiveBatchConfig {
min_batch_size: config.adaptive.adaptive_min_batch_size,
max_batch_size: config.adaptive.adaptive_max_batch_size,
throughput_window: Duration::from_millis(
config.adaptive.adaptive_window_size as u64 * 100,
),
max_wait_time: Duration::from_millis(config.adaptive.adaptive_batch_timeout_ms),
min_wait_time: Duration::from_millis(100),
adaptive_enabled: true,
};
let batch_endpoints_enabled = true;
let client = Client::builder()
.timeout(Duration::from_millis(config.timeout_ms))
.pool_idle_timeout(Duration::from_secs(90))
.pool_max_idle_per_host(10)
.http2_prior_knowledge() .build()
.unwrap_or_else(|_| Client::new());
let mut params = ReactionBaseParams::new(id, queries).with_auto_start(auto_start);
if let Some(capacity) = priority_queue_capacity {
params = params.with_priority_queue_capacity(capacity);
}
Self {
base: ReactionBase::new(params),
base_url: config.base_url.clone(),
token: config.token.clone(),
timeout_ms: config.timeout_ms,
query_configs: config.routes.clone(),
adaptive_config: utils_adaptive_config,
client,
batch_endpoints_enabled,
config,
}
}
/// Sends one adaptive batch.
///
/// Results are first regrouped by query id. When batch endpoints are
/// enabled and at least one query carries more than one result, everything
/// is POSTed as a single JSON array of [`BatchResult`] to
/// `{base_url}/batch`; otherwise each result is dispatched individually
/// through the per-route templates via [`Self::send_single_result`].
///
/// Non-success HTTP responses are logged (best effort) rather than
/// returned; `Err` is reserved for serialization/header/transport errors.
async fn send_batch(
    &self,
    batch: Vec<(String, Vec<ResultDiff>)>,
    reaction_name: &str,
) -> Result<()> {
    if batch.is_empty() {
        return Ok(());
    }
    // Merge partial batches that belong to the same query.
    let mut batches_by_query: HashMap<String, Vec<ResultDiff>> = HashMap::new();
    for (query_id, results) in batch {
        batches_by_query
            .entry(query_id)
            .or_default()
            .extend(results);
    }
    if self.batch_endpoints_enabled && batches_by_query.values().any(|v| v.len() > 1) {
        let batch_results: Vec<BatchResult> = batches_by_query
            .into_iter()
            .map(|(query_id, results)| BatchResult {
                // `query_id` is moved (the previous `.clone()` was redundant).
                query_id,
                count: results.len(),
                results,
                timestamp: chrono::Utc::now().to_rfc3339(),
            })
            .collect();
        let batch_url = format!("{}/batch", self.base_url);
        let body = serde_json::to_string(&batch_results)?;
        let mut headers = HeaderMap::new();
        headers.insert("Content-Type", HeaderValue::from_static("application/json"));
        if let Some(ref token) = self.token {
            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Bearer {token}"))?,
            );
        }
        debug!(
            "[{}] Sending batch of {} results to {}",
            reaction_name,
            batch_results.iter().map(|b| b.count).sum::<usize>(),
            batch_url
        );
        let response = self
            .client
            .post(&batch_url)
            .headers(headers)
            .body(body)
            .send()
            .await?;
        let status = response.status();
        if !status.is_success() {
            let error_body = response
                .text()
                .await
                .unwrap_or_else(|_| "Unable to read response body".to_string());
            warn!(
                "[{}] Batch HTTP request failed with status {}: {}",
                reaction_name,
                status.as_u16(),
                error_body
            );
        } else {
            debug!("[{reaction_name}] Batch sent successfully");
        }
    } else {
        // Single-result path: route each diff through its configured template.
        for (query_id, results) in batches_by_query {
            for result in results {
                if let Err(e) = self
                    .send_single_result(&query_id, &result, reaction_name)
                    .await
                {
                    error!("[{reaction_name}] Failed to send result: {e}");
                }
            }
        }
    }
    Ok(())
}
/// Dispatches one [`ResultDiff`] through the route configured for its
/// query and operation kind ("added" / "updated" / "deleted").
///
/// `Noop` diffs and results with no matching route are silently skipped.
/// Non-2xx responses are logged as warnings (best effort); `Err` is only
/// returned for template, header, or transport failures.
async fn send_single_result(
    &self,
    query_id: &str,
    result: &ResultDiff,
    reaction_name: &str,
) -> Result<()> {
    // Map the diff variant onto the route selector used by QueryConfig;
    // aggregations are treated as updates.
    let operation = match result {
        ResultDiff::Add { .. } => "added",
        ResultDiff::Update { .. } | ResultDiff::Aggregation { .. } => "updated",
        ResultDiff::Delete { .. } => "deleted",
        ResultDiff::Noop => return Ok(()),
    };
    // Look up the call template for this query/operation, if any.
    let call_spec = match self.query_configs.get(query_id) {
        Some(config) => match operation {
            "added" => config.added.as_ref(),
            "updated" => config.updated.as_ref(),
            "deleted" => config.deleted.as_ref(),
            _ => None,
        },
        None => None,
    };
    if let Some(call_spec) = call_spec {
        // Template context exposed to Handlebars: the serialized diff plus
        // the query id and operation name.
        let mut context = Map::new();
        let data =
            serde_json::to_value(result).expect("ResultDiff serialization should succeed");
        context.insert("data".to_string(), data.clone());
        context.insert("query_id".to_string(), Value::String(query_id.to_string()));
        context.insert(
            "operation".to_string(),
            Value::String(operation.to_string()),
        );
        // NOTE(review): a default Handlebars instance HTML-escapes rendered
        // values — confirm that is intended for URLs and JSON bodies.
        let handlebars = Handlebars::new();
        let full_url = handlebars
            .render_template(&format!("{}{}", self.base_url, call_spec.url), &context)?;
        // An empty body template means "send the raw serialized diff".
        let body = if !call_spec.body.is_empty() {
            handlebars.render_template(&call_spec.body, &context)?
        } else {
            serde_json::to_string(&data)?
        };
        let mut headers = HeaderMap::new();
        headers.insert("Content-Type", HeaderValue::from_static("application/json"));
        if let Some(ref token) = self.token {
            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Bearer {token}"))?,
            );
        }
        // User-defined header values may themselves be Handlebars templates.
        for (key, value) in &call_spec.headers {
            let header_name = HeaderName::from_bytes(key.as_bytes())?;
            let header_value =
                HeaderValue::from_str(&handlebars.render_template(value, &context)?)?;
            headers.insert(header_name, header_value);
        }
        // Unknown method strings fall back to POST.
        let method = match call_spec.method.to_uppercase().as_str() {
            "GET" => Method::GET,
            "POST" => Method::POST,
            "PUT" => Method::PUT,
            "DELETE" => Method::DELETE,
            "PATCH" => Method::PATCH,
            _ => Method::POST,
        };
        let response = self
            .client
            .request(method, &full_url)
            .headers(headers)
            .body(body)
            .send()
            .await?;
        let status = response.status();
        if !status.is_success() {
            let error_body = response
                .text()
                .await
                .unwrap_or_else(|_| "Unable to read response body".to_string());
            warn!(
                "[{}] HTTP request failed with status {}: {}",
                reaction_name,
                status.as_u16(),
                error_body
            );
        }
    }
    Ok(())
}
/// Main processing loop, spawned by `start`.
///
/// Spawns a batcher task that drains an internal channel through
/// [`AdaptiveBatcher`] and ships each batch via `send_batch`, then loops
/// dequeuing query results from the priority queue and feeding them into
/// that channel until a shutdown signal arrives or the status leaves
/// `Running`.
async fn run_internal(
    reaction_name: String,
    base: ReactionBase,
    adaptive_config: AdaptiveBatchConfig,
    sender: Arc<Self>,
    mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
) {
    // Channel capacity is derived from the batcher's max batch size.
    let batch_channel_capacity = adaptive_config.recommended_channel_capacity();
    let (batch_tx, batch_rx) = mpsc::channel(batch_channel_capacity);
    debug!(
        "[{}] HttpAdaptiveReaction using batch channel capacity: {} (max_batch_size: {} × 5)",
        reaction_name, batch_channel_capacity, adaptive_config.max_batch_size
    );
    // Batcher task: consumes the channel, emits adaptively-sized batches,
    // and keeps simple throughput counters for periodic metrics logging.
    let batcher_handle = tokio::spawn({
        let reaction_name = reaction_name.clone();
        let sender = sender.clone();
        async move {
            let mut batcher = AdaptiveBatcher::new(batch_rx, adaptive_config);
            let mut total_batches = 0u64;
            let mut total_results = 0u64;
            info!("[{reaction_name}] Adaptive HTTP batcher started");
            while let Some(batch) = batcher.next_batch().await {
                if batch.is_empty() {
                    continue;
                }
                let batch_size = batch
                    .iter()
                    .map(|(_, v): &(String, Vec<ResultDiff>)| v.len())
                    .sum::<usize>();
                total_results += batch_size as u64;
                total_batches += 1;
                debug!("[{reaction_name}] Processing adaptive batch of {batch_size} results");
                if let Err(e) = sender.send_batch(batch, &reaction_name).await {
                    error!("[{reaction_name}] Failed to send batch: {e}");
                }
                // Log aggregate metrics every 100 batches.
                if total_batches.is_multiple_of(100) {
                    info!(
                        "[{}] Adaptive HTTP metrics - Batches: {}, Results: {}, Avg batch size: {:.1}",
                        reaction_name,
                        total_batches,
                        total_results,
                        total_results as f64 / total_batches as f64
                    );
                }
            }
            info!(
                "[{reaction_name}] Adaptive HTTP batcher stopped - Total batches: {total_batches}, Total results: {total_results}"
            );
        }
    });
    // Ingest loop: `biased` gives the shutdown branch priority over dequeues.
    loop {
        let query_result_arc = tokio::select! {
            biased;
            _ = &mut shutdown_rx => {
                debug!("[{reaction_name}] Received shutdown signal, exiting processing loop");
                break;
            }
            result = base.priority_queue.dequeue() => result,
        };
        let query_result = query_result_arc.as_ref();
        // Double-check status after each dequeue so a stop request that
        // didn't fire the oneshot still terminates the loop.
        if !matches!(base.get_status().await, ComponentStatus::Running) {
            info!("[{reaction_name}] Reaction status changed to non-running, exiting");
            break;
        }
        if query_result.results.is_empty() {
            continue;
        }
        if batch_tx
            .send((query_result.query_id.clone(), query_result.results.clone()))
            .await
            .is_err()
        {
            error!("[{reaction_name}] Failed to send to batch channel");
            break;
        }
    }
    // Closing the channel lets the batcher flush and finish; wait for it
    // (bounded) so in-flight batches get a chance to drain.
    drop(batch_tx);
    let _ = tokio::time::timeout(Duration::from_secs(5), batcher_handle).await;
    info!("[{reaction_name}] Adaptive HTTP reaction completed");
}
}
#[async_trait]
impl Reaction for AdaptiveHttpReaction {
/// Unique identifier of this reaction instance.
fn id(&self) -> &str {
    self.base.id.as_str()
}
/// Component type tag used by the runtime for registration/reporting.
fn type_name(&self) -> &str {
    const TYPE_NAME: &str = "http_adaptive";
    TYPE_NAME
}
/// Serializes the reaction's configuration into a flat property map by
/// round-tripping it through the descriptor DTO types.
///
/// Returns an empty map if DTO serialization fails or does not produce a
/// JSON object (best effort — used for introspection/reporting only).
fn properties(&self) -> HashMap<String, serde_json::Value> {
    use crate::descriptor::{CallSpecDto, HttpAdaptiveReactionConfigDto, HttpQueryConfigDto};
    use drasi_plugin_sdk::ConfigValue;
    // Local converters: runtime route config -> serializable DTOs.
    fn map_call_to_dto(cs: &drasi_reaction_http::CallSpec) -> CallSpecDto {
        CallSpecDto {
            url: cs.url.clone(),
            method: cs.method.clone(),
            body: cs.body.clone(),
            headers: cs.headers.clone(),
        }
    }
    fn map_qc_to_dto(qc: &drasi_reaction_http::QueryConfig) -> HttpQueryConfigDto {
        HttpQueryConfigDto {
            added: qc.added.as_ref().map(map_call_to_dto),
            updated: qc.updated.as_ref().map(map_call_to_dto),
            deleted: qc.deleted.as_ref().map(map_call_to_dto),
        }
    }
    // All values are reported as static config values.
    let dto = HttpAdaptiveReactionConfigDto {
        base_url: ConfigValue::Static(self.config.base_url.clone()),
        token: self
            .config
            .token
            .as_ref()
            .map(|t| ConfigValue::Static(t.clone())),
        timeout_ms: Some(ConfigValue::Static(self.config.timeout_ms)),
        routes: self
            .config
            .routes
            .iter()
            .map(|(k, v)| (k.clone(), map_qc_to_dto(v)))
            .collect(),
        adaptive_min_batch_size: Some(ConfigValue::Static(
            self.config.adaptive.adaptive_min_batch_size,
        )),
        adaptive_max_batch_size: Some(ConfigValue::Static(
            self.config.adaptive.adaptive_max_batch_size,
        )),
        adaptive_window_size: Some(ConfigValue::Static(
            self.config.adaptive.adaptive_window_size,
        )),
        adaptive_batch_timeout_ms: Some(ConfigValue::Static(
            self.config.adaptive.adaptive_batch_timeout_ms,
        )),
    };
    match serde_json::to_value(&dto) {
        Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
        _ => HashMap::new(),
    }
}
/// IDs of the continuous queries this reaction is subscribed to.
fn query_ids(&self) -> Vec<String> {
    self.base.queries.to_vec()
}
/// Whether the runtime should start this reaction automatically.
fn auto_start(&self) -> bool {
    self.base.get_auto_start()
}
/// Wires the runtime context into the shared reaction base.
async fn initialize(&self, context: drasi_lib::context::ReactionRuntimeContext) {
    self.base.initialize(context).await;
}
/// Starts the reaction: publishes `Starting`/`Running` status, then spawns
/// the processing loop (`run_internal`) with an `Arc` clone of this
/// reaction for use inside the batcher task.
async fn start(&self) -> Result<()> {
    info!("[{}] Starting adaptive HTTP reaction", self.base.id);
    self.base
        .set_status(
            ComponentStatus::Starting,
            Some("Starting adaptive HTTP reaction".to_string()),
        )
        .await;
    self.base
        .set_status(
            ComponentStatus::Running,
            Some(format!(
                "Adaptive HTTP reaction running - Base URL: {}, Batch endpoints: {}",
                self.base_url,
                if self.batch_endpoints_enabled {
                    "enabled"
                } else {
                    "disabled"
                }
            )),
        )
        .await;
    // Build an owned copy of self for the spawned task. Fields are cloned
    // individually because `base` must go through `clone_shared` so both
    // copies observe the same status/queue state.
    let base_for_arc = self.base.clone_shared();
    let self_arc = Arc::new(Self {
        base: base_for_arc,
        config: self.config.clone(),
        base_url: self.base_url.clone(),
        token: self.token.clone(),
        timeout_ms: self.timeout_ms,
        query_configs: self.query_configs.clone(),
        adaptive_config: self.adaptive_config.clone(),
        // reqwest::Client clones share the same connection pool.
        client: self.client.clone(),
        batch_endpoints_enabled: self.batch_endpoints_enabled,
    });
    let shutdown_rx = self.base.create_shutdown_channel().await;
    let reaction_name = self.base.id.clone();
    let base = self.base.clone_shared();
    let adaptive_config = self.adaptive_config.clone();
    let processing_task_handle = tokio::spawn(Self::run_internal(
        reaction_name,
        base,
        adaptive_config,
        self_arc,
        shutdown_rx,
    ));
    // Hand the task handle to the base so `stop` can await/abort it.
    self.base.set_processing_task(processing_task_handle).await;
    Ok(())
}
/// Stops the reaction: publishes `Stopping`, runs the shared shutdown
/// path, then publishes `Stopped`.
async fn stop(&self) -> Result<()> {
    info!("[{}] Stopping adaptive HTTP reaction", self.base.id);
    self.base
        .set_status(
            ComponentStatus::Stopping,
            Some("Stopping adaptive HTTP reaction".to_string()),
        )
        .await;
    self.base.stop_common().await?;
    // NOTE(review): fixed 100 ms grace pause for in-flight work — appears
    // arbitrary; confirm whether `stop_common` already waits for the task.
    tokio::time::sleep(Duration::from_millis(100)).await;
    self.base
        .set_status(
            ComponentStatus::Stopped,
            Some("Adaptive HTTP reaction stopped successfully".to_string()),
        )
        .await;
    Ok(())
}
/// Current lifecycle status, read from the shared base.
async fn status(&self) -> ComponentStatus {
    self.base.get_status().await
}
/// Enqueues an incoming query result onto the base's priority queue,
/// where `run_internal` picks it up for batching.
async fn enqueue_query_result(
    &self,
    result: drasi_lib::channels::QueryResult,
) -> anyhow::Result<()> {
    self.base.enqueue_query_result(result).await
}
}