pub use super::config::ApplicationReactionConfig;
use crate::subscription::{Subscription, SubscriptionOptions};
use anyhow::Result;
use async_trait::async_trait;
use log::{debug, error, info};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use drasi_lib::channels::{ComponentStatus, QueryResult};
use drasi_lib::managers::log_component_start;
use drasi_lib::reactions::common::base::{ReactionBase, ReactionBaseParams};
use drasi_lib::Reaction;
use std::collections::HashMap;
#[derive(Clone)]
pub struct ApplicationReactionHandle {
rx: Arc<RwLock<Option<mpsc::Receiver<QueryResult>>>>,
reaction_id: String,
}
impl ApplicationReactionHandle {
pub async fn take_receiver(&self) -> Option<mpsc::Receiver<QueryResult>> {
self.rx.write().await.take()
}
pub async fn subscribe<F>(&self, mut callback: F) -> Result<()>
where
F: FnMut(QueryResult) + Send + 'static,
{
if let Some(mut rx) = self.take_receiver().await {
tokio::spawn(async move {
while let Some(result) = rx.recv().await {
callback(result);
}
});
Ok(())
} else {
Err(anyhow::anyhow!("Receiver already taken"))
}
}
pub async fn subscribe_filtered<F>(
&self,
query_filter: Vec<String>,
mut callback: F,
) -> Result<()>
where
F: FnMut(QueryResult) + Send + 'static,
{
if let Some(mut rx) = self.take_receiver().await {
tokio::spawn(async move {
while let Some(result) = rx.recv().await {
if query_filter.is_empty() || query_filter.contains(&result.query_id) {
callback(result);
}
}
});
Ok(())
} else {
Err(anyhow::anyhow!("Receiver already taken"))
}
}
pub async fn as_stream(&self) -> Option<ResultStream> {
self.take_receiver().await.map(ResultStream::new)
}
pub async fn subscribe_with_options(
&self,
options: SubscriptionOptions,
) -> Result<Subscription> {
if let Some(rx) = self.take_receiver().await {
Ok(Subscription::new(rx, options))
} else {
Err(anyhow::anyhow!("Receiver already taken"))
}
}
pub fn reaction_id(&self) -> &str {
&self.reaction_id
}
}
pub struct ResultStream {
rx: mpsc::Receiver<QueryResult>,
}
impl ResultStream {
fn new(rx: mpsc::Receiver<QueryResult>) -> Self {
Self { rx }
}
pub async fn next(&mut self) -> Option<QueryResult> {
self.rx.recv().await
}
pub fn try_next(&mut self) -> Option<QueryResult> {
self.rx.try_recv().ok()
}
}
use super::ApplicationReactionBuilder;
pub struct ApplicationReaction {
base: ReactionBase,
app_tx: mpsc::Sender<QueryResult>,
}
impl ApplicationReaction {
pub fn builder(id: impl Into<String>) -> ApplicationReactionBuilder {
ApplicationReactionBuilder::new(id)
}
pub fn new(id: impl Into<String>, queries: Vec<String>) -> (Self, ApplicationReactionHandle) {
Self::create_internal(id.into(), queries, None, true)
}
pub(crate) fn from_builder(
id: String,
queries: Vec<String>,
priority_queue_capacity: Option<usize>,
auto_start: bool,
) -> (Self, ApplicationReactionHandle) {
Self::create_internal(id, queries, priority_queue_capacity, auto_start)
}
fn create_internal(
id: String,
queries: Vec<String>,
priority_queue_capacity: Option<usize>,
auto_start: bool,
) -> (Self, ApplicationReactionHandle) {
let (app_tx, app_rx) = mpsc::channel(1000);
let handle = ApplicationReactionHandle {
rx: Arc::new(RwLock::new(Some(app_rx))),
reaction_id: id.clone(),
};
let mut params = ReactionBaseParams::new(id, queries).with_auto_start(auto_start);
if let Some(capacity) = priority_queue_capacity {
params = params.with_priority_queue_capacity(capacity);
}
let reaction = Self {
base: ReactionBase::new(params),
app_tx,
};
(reaction, handle)
}
}
#[async_trait]
impl Reaction for ApplicationReaction {
fn id(&self) -> &str {
&self.base.id
}
fn type_name(&self) -> &str {
"application"
}
fn properties(&self) -> HashMap<String, serde_json::Value> {
HashMap::new()
}
fn query_ids(&self) -> Vec<String> {
self.base.queries.clone()
}
fn auto_start(&self) -> bool {
self.base.get_auto_start()
}
async fn initialize(&self, context: drasi_lib::context::ReactionRuntimeContext) {
self.base.initialize(context).await;
}
async fn start(&self) -> Result<()> {
log_component_start("Reaction", &self.base.id);
self.base
.set_status_with_event(
ComponentStatus::Starting,
Some("Starting application reaction".to_string()),
)
.await?;
self.base
.set_status_with_event(
ComponentStatus::Running,
Some("Application reaction started".to_string()),
)
.await?;
let mut shutdown_rx = self.base.create_shutdown_channel().await;
let priority_queue = self.base.priority_queue.clone();
let reaction_name = self.base.id.clone();
let app_tx = self.app_tx.clone();
let query_filter = self.base.queries.clone();
let processing_task = tokio::spawn(async move {
info!("ApplicationReaction '{reaction_name}' result processor started");
loop {
let query_result_arc = tokio::select! {
biased;
_ = &mut shutdown_rx => {
debug!("[{reaction_name}] Received shutdown signal, exiting processing loop");
break;
}
result = priority_queue.dequeue() => result,
};
let query_result = (*query_result_arc).clone();
if !query_filter.is_empty() && !query_filter.contains(&query_result.query_id) {
continue;
}
debug!(
"ApplicationReaction '{}' forwarding result from query '{}': {} results",
reaction_name,
query_result.query_id,
query_result.results.len()
);
if let Err(e) = app_tx.send(query_result).await {
error!("Failed to send result to application: {e}");
break;
}
}
});
self.base.set_processing_task(processing_task).await;
Ok(())
}
async fn stop(&self) -> Result<()> {
self.base.stop_common().await?;
self.base
.set_status_with_event(
ComponentStatus::Stopped,
Some("Application reaction stopped".to_string()),
)
.await?;
Ok(())
}
async fn status(&self) -> ComponentStatus {
self.base.get_status().await
}
async fn enqueue_query_result(&self, result: QueryResult) -> anyhow::Result<()> {
self.base.enqueue_query_result(result).await
}
}
#[cfg(test)]
mod tests;