use drasi_lib::reactions::Reaction;
use drasi_plugin_sdk::prelude::*;
use utoipa::OpenApi;
use crate::PlatformReactionBuilder;
// Wire-format configuration for the `platform` reaction.
//
// `create_reaction` deserializes the reaction's JSON config into this struct and
// forwards each field to the matching `PlatformReactionBuilder::with_*` setter.
// JSON keys are the camelCase form of the field names (e.g. `redisUrl`,
// `maxStreamLength`). The schema is published under the name
// `reaction::platform::PlatformReactionConfig`.
//
// Every field is a `ConfigValue<T>`, which `create_reaction` resolves through a
// `DtoMapper` before use. Optional fields that are `None` are omitted when
// serializing, and their builder setter is simply not called.
//
// NOTE(review): plain `//` comments are used here on purpose. utoipa's
// `ToSchema` derive copies `///` doc comments into the generated schema as
// descriptions, which would change the output of `config_schema_json`.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(as = reaction::platform::PlatformReactionConfig)]
#[serde(rename_all = "camelCase")]
pub struct PlatformReactionConfigDto {
// Required. Resolved as a string and passed to `with_redis_url`.
// Presumably the Redis connection URL; confirm against the builder.
#[schema(value_type = ConfigValueString)]
pub redis_url: ConfigValue<String>,
// Optional. Resolved as a string and passed to `with_pubsub_name`.
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<ConfigValueString>)]
pub pubsub_name: Option<ConfigValue<String>>,
// Optional. Resolved as a string and passed to `with_source_name`.
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<ConfigValueString>)]
pub source_name: Option<ConfigValue<String>>,
// Optional. Passed to `with_max_stream_length`.
// Presumably an upper bound on the output stream's length; TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<ConfigValueUsize>)]
pub max_stream_length: Option<ConfigValue<usize>>,
// Optional. Passed to `with_emit_control_events`.
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<ConfigValueBool>)]
pub emit_control_events: Option<ConfigValue<bool>>,
// Optional. Passed to `with_batch_enabled`.
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<ConfigValueBool>)]
pub batch_enabled: Option<ConfigValue<bool>>,
// Optional. Passed to `with_batch_max_size`.
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<ConfigValueUsize>)]
pub batch_max_size: Option<ConfigValue<usize>>,
// Optional. Passed to `with_batch_max_wait_ms`.
// Presumably a duration in milliseconds, going by the name; TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<ConfigValueU64>)]
pub batch_max_wait_ms: Option<ConfigValue<u64>>,
}
// Private carrier type for the utoipa-generated OpenAPI document.
// Its only registered component schema is `PlatformReactionConfigDto`;
// `config_schema_json` calls `PlatformReactionSchemas::openapi()` and
// serializes the resulting component schemas.
#[derive(OpenApi)]
#[openapi(components(schemas(PlatformReactionConfigDto)))]
struct PlatformReactionSchemas;
/// Descriptor for the `platform` reaction plugin.
///
/// A stateless unit type: its `ReactionPluginDescriptor` implementation reports
/// the reaction kind, the configuration version and schema, and builds reaction
/// instances from JSON configuration.
pub struct PlatformReactionDescriptor;
#[async_trait]
impl ReactionPluginDescriptor for PlatformReactionDescriptor {
fn kind(&self) -> &str {
"platform"
}
fn config_version(&self) -> &str {
"1.0.0"
}
fn config_schema_name(&self) -> &str {
"reaction.platform.PlatformReactionConfig"
}
fn config_schema_json(&self) -> String {
let api = PlatformReactionSchemas::openapi();
serde_json::to_string(
&api.components
.as_ref()
.expect("OpenAPI components missing")
.schemas,
)
.expect("Failed to serialize config schema")
}
async fn create_reaction(
&self,
id: &str,
query_ids: Vec<String>,
config_json: &serde_json::Value,
auto_start: bool,
) -> anyhow::Result<Box<dyn Reaction>> {
let dto: PlatformReactionConfigDto = serde_json::from_value(config_json.clone())?;
let mapper = DtoMapper::new();
let mut builder = PlatformReactionBuilder::new(id)
.with_queries(query_ids)
.with_auto_start(auto_start)
.with_redis_url(mapper.resolve_string(&dto.redis_url)?);
if let Some(ref v) = dto.pubsub_name {
builder = builder.with_pubsub_name(mapper.resolve_string(v)?);
}
if let Some(ref v) = dto.source_name {
builder = builder.with_source_name(mapper.resolve_string(v)?);
}
if let Some(ref v) = dto.max_stream_length {
builder = builder.with_max_stream_length(mapper.resolve_typed(v)?);
}
if let Some(ref v) = dto.emit_control_events {
builder = builder.with_emit_control_events(mapper.resolve_typed(v)?);
}
if let Some(ref v) = dto.batch_enabled {
builder = builder.with_batch_enabled(mapper.resolve_typed(v)?);
}
if let Some(ref v) = dto.batch_max_size {
builder = builder.with_batch_max_size(mapper.resolve_typed(v)?);
}
if let Some(ref v) = dto.batch_max_wait_ms {
builder = builder.with_batch_max_wait_ms(mapper.resolve_typed(v)?);
}
let reaction = builder.build()?;
Ok(Box::new(reaction))
}
}