drasi_source_platform/
descriptor.rs1use crate::{PlatformSourceBuilder, PlatformSourceConfig};
18use drasi_plugin_sdk::prelude::*;
19use utoipa::OpenApi;
20
21#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
23#[schema(as = source::platform::PlatformSourceConfig)]
24#[serde(rename_all = "camelCase", deny_unknown_fields)]
25pub struct PlatformSourceConfigDto {
26 pub redis_url: ConfigValue<String>,
27 pub stream_key: ConfigValue<String>,
28 #[serde(default = "default_consumer_group")]
29 pub consumer_group: ConfigValue<String>,
30 #[serde(default, skip_serializing_if = "Option::is_none")]
31 pub consumer_name: Option<ConfigValue<String>>,
32 #[serde(default = "default_batch_size")]
33 pub batch_size: ConfigValue<usize>,
34 #[serde(default = "default_block_ms")]
35 pub block_ms: ConfigValue<u64>,
36}
37
38fn default_consumer_group() -> ConfigValue<String> {
39 ConfigValue::Static("drasi-core".to_string())
40}
41
42fn default_batch_size() -> ConfigValue<usize> {
43 ConfigValue::Static(100)
44}
45
46fn default_block_ms() -> ConfigValue<u64> {
47 ConfigValue::Static(5000)
48}
49
50#[derive(OpenApi)]
51#[openapi(components(schemas(PlatformSourceConfigDto)))]
52struct PlatformSourceSchemas;
53
54pub struct PlatformSourceDescriptor;
56
57#[async_trait]
58impl SourcePluginDescriptor for PlatformSourceDescriptor {
59 fn kind(&self) -> &str {
60 "platform"
61 }
62
63 fn config_version(&self) -> &str {
64 "1.0.0"
65 }
66
67 fn config_schema_name(&self) -> &str {
68 "source.platform.PlatformSourceConfig"
69 }
70
71 fn config_schema_json(&self) -> String {
72 let api = PlatformSourceSchemas::openapi();
73 serde_json::to_string(
74 &api.components
75 .as_ref()
76 .expect("OpenAPI components missing")
77 .schemas,
78 )
79 .expect("Failed to serialize config schema")
80 }
81
82 async fn create_source(
83 &self,
84 id: &str,
85 config_json: &serde_json::Value,
86 auto_start: bool,
87 ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
88 let dto: PlatformSourceConfigDto = serde_json::from_value(config_json.clone())?;
89 let mapper = DtoMapper::new();
90
91 let config = PlatformSourceConfig {
92 redis_url: mapper.resolve_string(&dto.redis_url)?,
93 stream_key: mapper.resolve_string(&dto.stream_key)?,
94 consumer_group: mapper.resolve_string(&dto.consumer_group)?,
95 consumer_name: mapper.resolve_optional(&dto.consumer_name)?,
96 batch_size: mapper.resolve_typed(&dto.batch_size)?,
97 block_ms: mapper.resolve_typed(&dto.block_ms)?,
98 };
99
100 let source = PlatformSourceBuilder::new(id)
101 .with_config(config)
102 .with_auto_start(auto_start)
103 .build()?;
104
105 Ok(Box::new(source))
106 }
107}