use crate::{PlatformSourceBuilder, PlatformSourceConfig};
use drasi_plugin_sdk::prelude::*;
use utoipa::OpenApi;
// Serialized (DTO) form of the platform source configuration.
//
// Keys are camelCase on the wire and unknown keys are rejected
// (`deny_unknown_fields`). Each value is wrapped in `ConfigValue` and is
// resolved into the runtime `PlatformSourceConfig` by `create_source`.
//
// NOTE: plain `//` comments are used deliberately. utoipa emits `///` doc
// comments on a `ToSchema` type as schema `description` values, which would
// change the generated schema JSON.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    utoipa::ToSchema
)]
#[schema(as = source::platform::PlatformSourceConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct PlatformSourceConfigDto {
    // Required (no serde default); key `redisUrl`.
    pub redis_url: ConfigValue<String>,

    // Required (no serde default); key `streamKey`.
    pub stream_key: ConfigValue<String>,

    // Optional; key `consumerGroup`. Falls back to `default_consumer_group()`.
    #[serde(default = "default_consumer_group")]
    pub consumer_group: ConfigValue<String>,

    // Optional; key `consumerName`. `None` when absent and omitted from
    // serialized output when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub consumer_name: Option<ConfigValue<String>>,

    // Optional; key `batchSize`. Falls back to `default_batch_size()`.
    #[serde(default = "default_batch_size")]
    pub batch_size: ConfigValue<usize>,

    // Optional; key `blockMs`. Falls back to `default_block_ms()`.
    #[serde(default = "default_block_ms")]
    pub block_ms: ConfigValue<u64>,
}
/// Serde default for `consumer_group`: the static value `"drasi-core"`.
fn default_consumer_group() -> ConfigValue<String> {
    let group = String::from("drasi-core");
    ConfigValue::Static(group)
}
/// Serde default for `batch_size`: the static value `100`.
fn default_batch_size() -> ConfigValue<usize> {
    const DEFAULT_BATCH_SIZE: usize = 100;
    ConfigValue::Static(DEFAULT_BATCH_SIZE)
}
/// Serde default for `block_ms`: the static value `5000`
/// (presumably milliseconds, going by the field name).
fn default_block_ms() -> ConfigValue<u64> {
    const DEFAULT_BLOCK_MS: u64 = 5_000;
    ConfigValue::Static(DEFAULT_BLOCK_MS)
}
// Private carrier type for the utoipa-derived OpenAPI document. Its only
// registered component schema is `PlatformSourceConfigDto`; the descriptor's
// `config_schema_json` reads the schemas back via `PlatformSourceSchemas::openapi()`.
#[derive(OpenApi)]
#[openapi(components(schemas(PlatformSourceConfigDto)))]
struct PlatformSourceSchemas;
/// Plugin descriptor for the `"platform"` source kind.
///
/// A field-less unit struct; all behavior comes from its
/// `SourcePluginDescriptor` implementation in this file.
pub struct PlatformSourceDescriptor;
#[async_trait]
impl SourcePluginDescriptor for PlatformSourceDescriptor {
fn kind(&self) -> &str {
"platform"
}
fn config_version(&self) -> &str {
"1.0.0"
}
fn config_schema_name(&self) -> &str {
"source.platform.PlatformSourceConfig"
}
fn config_schema_json(&self) -> String {
let api = PlatformSourceSchemas::openapi();
serde_json::to_string(
&api.components
.as_ref()
.expect("OpenAPI components missing")
.schemas,
)
.expect("Failed to serialize config schema")
}
async fn create_source(
&self,
id: &str,
config_json: &serde_json::Value,
auto_start: bool,
) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
let dto: PlatformSourceConfigDto = serde_json::from_value(config_json.clone())?;
let mapper = DtoMapper::new();
let config = PlatformSourceConfig {
redis_url: mapper.resolve_string(&dto.redis_url)?,
stream_key: mapper.resolve_string(&dto.stream_key)?,
consumer_group: mapper.resolve_string(&dto.consumer_group)?,
consumer_name: mapper.resolve_optional(&dto.consumer_name)?,
batch_size: mapper.resolve_typed(&dto.batch_size)?,
block_ms: mapper.resolve_typed(&dto.block_ms)?,
};
let source = PlatformSourceBuilder::new(id)
.with_config(config)
.with_auto_start(auto_start)
.build()?;
Ok(Box::new(source))
}
}