pub use super::config::PlatformReactionConfig;
use crate::publisher;
use crate::transformer;
use crate::types::{CloudEvent, CloudEventConfig, ControlSignal, ResultControlEvent, ResultEvent};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use drasi_lib::channels::ComponentStatus;
use drasi_lib::managers::log_component_start;
use drasi_lib::reactions::common::base::{ReactionBase, ReactionBaseParams};
use drasi_lib::Reaction;
use publisher::{PublisherConfig, RedisStreamPublisher};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Reaction that publishes query results to a Redis Stream as CloudEvents,
/// following the Drasi platform pub/sub contract.
pub struct PlatformReaction {
    /// Shared reaction plumbing (id, query list, status, priority queue, lifecycle).
    base: ReactionBase,
    /// Original configuration, retained so `properties()` can report it.
    config: PlatformReactionConfig,
    /// Publisher used to write CloudEvents to the Redis stream.
    publisher: Arc<RedisStreamPublisher>,
    /// Monotonic sequence number shared between the processing task and
    /// control-event emission; incremented under a write lock.
    sequence_counter: Arc<RwLock<i64>>,
    /// Pubsub/source naming applied to every outgoing CloudEvent envelope.
    cloud_event_config: CloudEventConfig,
    /// When false, control signals (Running/Stopped/bootstrap) are not published.
    emit_control_events: bool,
    /// When true, results are buffered and published in batches.
    batch_enabled: bool,
    /// Flush the batch once it reaches this many events (validated > 0).
    batch_max_size: usize,
    /// Flush the batch once this much time has elapsed since the last flush.
    batch_max_wait_ms: u64,
}
use super::PlatformReactionBuilder;
impl PlatformReaction {
    /// Entry point for the fluent builder API.
    pub fn builder(id: impl Into<String>) -> PlatformReactionBuilder {
        PlatformReactionBuilder::new(id)
    }

    /// Creates a reaction with the default queue capacity and auto-start enabled.
    ///
    /// # Errors
    /// Returns an error if the configuration is invalid (e.g. `batch_max_size`
    /// is zero) or the Redis publisher cannot be created.
    pub fn new(
        id: impl Into<String>,
        queries: Vec<String>,
        config: PlatformReactionConfig,
    ) -> Result<Self> {
        Self::create_internal(id.into(), queries, config, None, true)
    }

    /// Used by [`PlatformReactionBuilder`] to construct the reaction with an
    /// explicit queue capacity and auto-start setting.
    pub(crate) fn from_builder(
        id: String,
        queries: Vec<String>,
        config: PlatformReactionConfig,
        priority_queue_capacity: Option<usize>,
        auto_start: bool,
    ) -> Result<Self> {
        Self::create_internal(id, queries, config, priority_queue_capacity, auto_start)
    }

    /// Validates the configuration, builds the Redis publisher, and assembles
    /// the reaction. Shared by [`Self::new`] and [`Self::from_builder`].
    fn create_internal(
        id: String,
        queries: Vec<String>,
        config: PlatformReactionConfig,
        priority_queue_capacity: Option<usize>,
        auto_start: bool,
    ) -> Result<Self> {
        let batch_enabled = config.batch_enabled;
        let batch_max_size = config.batch_max_size;
        let batch_max_wait_ms = config.batch_max_wait_ms;

        // A zero batch size is a hard error; the remaining checks only warn.
        if batch_max_size == 0 {
            return Err(anyhow!("batch_max_size must be greater than 0"));
        }
        if batch_max_size > 10000 {
            log::warn!(
                "batch_max_size {batch_max_size} is very large, consider using a smaller value (recommended: 100-1000)"
            );
        }
        if batch_max_wait_ms > 1000 {
            log::warn!(
                "batch_max_wait_ms {batch_max_wait_ms} is large and may increase latency (recommended: 1-100ms)"
            );
        }

        let publisher = RedisStreamPublisher::new(
            &config.redis_url,
            PublisherConfig {
                max_stream_length: config.max_stream_length,
            },
        )
        .context("Failed to create Redis Stream Publisher")?;

        // Fall back to the platform defaults when names are not configured.
        let cloud_event_config = CloudEventConfig::with_values(
            config
                .pubsub_name
                .clone()
                .unwrap_or_else(|| "drasi-pubsub".to_string()),
            config
                .source_name
                .clone()
                .unwrap_or_else(|| "drasi-core".to_string()),
        );

        let base_params = match priority_queue_capacity {
            Some(capacity) => ReactionBaseParams::new(id, queries)
                .with_auto_start(auto_start)
                .with_priority_queue_capacity(capacity),
            None => ReactionBaseParams::new(id, queries).with_auto_start(auto_start),
        };

        Ok(Self {
            base: ReactionBase::new(base_params),
            emit_control_events: config.emit_control_events,
            publisher: Arc::new(publisher),
            sequence_counter: Arc::new(RwLock::new(0)),
            cloud_event_config,
            batch_enabled,
            batch_max_size,
            batch_max_wait_ms,
            config,
        })
    }

    /// Publishes a [`ControlSignal`] as a CloudEvent addressed to the first
    /// configured query. No-op when control-event emission is disabled.
    ///
    /// # Errors
    /// Fails when no queries are configured or the publish itself fails.
    async fn emit_control_event(&self, signal: ControlSignal) -> Result<()> {
        if !self.emit_control_events {
            return Ok(());
        }
        let query_id = match self.base.queries.first() {
            Some(q) => q,
            None => return Err(anyhow!("No queries configured for reaction")),
        };
        // Take the next sequence number; keep the lock scope minimal.
        let sequence = {
            let mut guard = self.sequence_counter.write().await;
            *guard += 1;
            *guard as u64
        };
        let event = ResultEvent::from_control_signal(
            query_id,
            sequence,
            chrono::Utc::now().timestamp_millis() as u64,
            signal,
        );
        self.publisher
            .publish(CloudEvent::new(event, query_id, &self.cloud_event_config))
            .await?;
        Ok(())
    }
}
#[async_trait]
impl Reaction for PlatformReaction {
fn id(&self) -> &str {
&self.base.id
}
fn type_name(&self) -> &str {
"platform"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
use crate::descriptor::PlatformReactionConfigDto;
use drasi_plugin_sdk::ConfigValue;
let dto = PlatformReactionConfigDto {
redis_url: ConfigValue::Static(self.config.redis_url.clone()),
pubsub_name: self
.config
.pubsub_name
.as_ref()
.map(|n| ConfigValue::Static(n.clone())),
source_name: self
.config
.source_name
.as_ref()
.map(|n| ConfigValue::Static(n.clone())),
max_stream_length: self.config.max_stream_length.map(ConfigValue::Static),
emit_control_events: Some(ConfigValue::Static(self.config.emit_control_events)),
batch_enabled: Some(ConfigValue::Static(self.config.batch_enabled)),
batch_max_size: Some(ConfigValue::Static(self.config.batch_max_size)),
batch_max_wait_ms: Some(ConfigValue::Static(self.config.batch_max_wait_ms)),
};
match serde_json::to_value(&dto) {
Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
_ => HashMap::new(),
}
}
fn query_ids(&self) -> Vec<String> {
self.base.queries.clone()
}
fn auto_start(&self) -> bool {
self.base.get_auto_start()
}
async fn initialize(&self, context: drasi_lib::context::ReactionRuntimeContext) {
self.base.initialize(context).await;
}
async fn start(&self) -> Result<()> {
log_component_start("Reaction", &self.base.id);
self.base
.set_status(
ComponentStatus::Starting,
Some("Starting platform reaction".to_string()),
)
.await;
if let Err(e) = self.emit_control_event(ControlSignal::Running).await {
log::warn!("Failed to emit Running control event: {e}");
}
self.base
.set_status(
ComponentStatus::Running,
Some("Platform reaction started".to_string()),
)
.await;
let mut shutdown_rx = self.base.create_shutdown_channel().await;
let publisher = self.publisher.clone();
let sequence_counter = self.sequence_counter.clone();
let cloud_event_config = self.cloud_event_config.clone();
let reaction_id = self.base.id.clone();
let emit_control_events = self.emit_control_events;
let priority_queue = self.base.priority_queue.clone();
let batch_enabled = self.batch_enabled;
let batch_max_size = self.batch_max_size;
let batch_max_wait_ms = self.batch_max_wait_ms;
let processing_task_handle = tokio::spawn(async move {
log::debug!("Platform Reaction '{reaction_id}' processing task started");
let mut event_buffer: Vec<CloudEvent<ResultEvent>> = Vec::new();
let mut last_flush_time = std::time::Instant::now();
loop {
let query_result = tokio::select! {
biased;
_ = &mut shutdown_rx => {
log::debug!("[{reaction_id}] Received shutdown signal, exiting processing loop");
break;
}
result = priority_queue.dequeue() => result,
};
if let Some(control_signal) = query_result.metadata.get("control_signal") {
if !event_buffer.is_empty() {
log::debug!(
"Flushing {} buffered events before control signal",
event_buffer.len()
);
if let Err(e) = publisher
.publish_batch_with_retry(event_buffer.clone(), 3)
.await
{
log::error!(
"Failed to publish buffered events before control signal: {e}"
);
}
event_buffer.clear();
last_flush_time = std::time::Instant::now();
}
if emit_control_events {
if let Some(signal_type) = control_signal.as_str() {
let control_signal = match signal_type {
"bootstrapStarted" => ControlSignal::BootstrapStarted,
"bootstrapCompleted" => ControlSignal::BootstrapCompleted,
_ => {
log::debug!("Unknown control signal type: {signal_type}");
continue;
}
};
let control_sequence = {
let mut counter = sequence_counter.write().await;
*counter += 1;
*counter
};
let control_event = ResultControlEvent {
query_id: query_result.query_id.clone(),
sequence: control_sequence as u64,
source_time_ms: query_result.timestamp.timestamp_millis() as u64,
metadata: None,
control_signal: control_signal.clone(),
};
let cloud_event = CloudEvent::new(
ResultEvent::Control(control_event),
&query_result.query_id,
&cloud_event_config,
);
if let Err(e) = publisher.publish_with_retry(cloud_event, 3).await {
log::warn!("Failed to emit control event {signal_type}: {e}");
} else {
log::info!("Emitted control event: {signal_type}");
}
}
}
continue;
}
let mut query_result_mut = (*query_result).clone();
if let Some(ref mut profiling) = query_result_mut.profiling {
profiling.reaction_receive_ns = Some(drasi_lib::profiling::timestamp_ns());
}
let sequence = {
let mut counter = sequence_counter.write().await;
*counter += 1;
*counter
};
match transformer::transform_query_result(
query_result_mut.clone(),
sequence,
sequence as u64,
) {
Ok(result_event) => {
let cloud_event = CloudEvent::new(
result_event,
&query_result_mut.query_id,
&cloud_event_config,
);
if batch_enabled {
event_buffer.push(cloud_event);
let should_flush = event_buffer.len() >= batch_max_size
|| last_flush_time.elapsed().as_millis()
>= batch_max_wait_ms as u128;
if should_flush {
log::debug!(
"Flushing batch of {} events (size: {}, time_elapsed: {}ms)",
event_buffer.len(),
event_buffer.len() >= batch_max_size,
last_flush_time.elapsed().as_millis()
);
match publisher
.publish_batch_with_retry(event_buffer.clone(), 3)
.await
{
Ok(message_ids) => {
log::debug!(
"Published batch of {} query results",
message_ids.len()
);
}
Err(e) => {
log::error!(
"Failed to publish batch of {} query results: {}",
event_buffer.len(),
e
);
}
}
event_buffer.clear();
last_flush_time = std::time::Instant::now();
}
} else {
match publisher.publish_with_retry(cloud_event, 3).await {
Ok(message_id) => {
log::debug!(
"Published query result for '{}' (sequence: {}, message_id: {})",
query_result_mut.query_id,
sequence,
message_id
);
}
Err(e) => {
log::error!(
"Failed to publish query result for '{}': {}",
query_result_mut.query_id,
e
);
}
}
}
}
Err(e) => {
log::error!(
"Failed to transform query result for '{}': {}",
query_result_mut.query_id,
e
);
}
}
}
});
self.base.set_processing_task(processing_task_handle).await;
Ok(())
}
async fn stop(&self) -> Result<()> {
self.base.stop_common().await?;
if let Err(e) = self.emit_control_event(ControlSignal::Stopped).await {
log::warn!("Failed to emit Stopped control event: {e}");
}
self.base
.set_status(
ComponentStatus::Stopped,
Some("Platform reaction stopped".to_string()),
)
.await;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.get_status().await
}
async fn enqueue_query_result(
&self,
result: drasi_lib::channels::QueryResult,
) -> anyhow::Result<()> {
self.base.enqueue_query_result(result).await
}
}