Skip to main content

drasi_source_platform/
descriptor.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Platform source plugin descriptor and configuration DTOs.
16
17use crate::{PlatformSourceBuilder, PlatformSourceConfig};
18use drasi_plugin_sdk::prelude::*;
19use utoipa::OpenApi;
20
// Wire-format shape of the platform source configuration as accepted by
// `create_source`. Keys are camelCase on the wire (e.g. `redisUrl`) and any
// unknown key is rejected during deserialization.
//
// NOTE(review): field notes below are plain `//` comments on purpose; `///`
// text may be surfaced by the schema derive as descriptions — confirm before
// converting them.
/// Platform source configuration DTO.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(as = source::platform::PlatformSourceConfig)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct PlatformSourceConfigDto {
    // Required (no serde default). Resolved to a string in `create_source`.
    // Presumably the Redis connection URL — confirm against `PlatformSourceConfig`.
    pub redis_url: ConfigValue<String>,
    // Required (no serde default). Resolved to a string in `create_source`.
    // Presumably the key of the stream to read from — TODO confirm.
    pub stream_key: ConfigValue<String>,
    // Falls back to the static value "drasi-core" when the key is omitted.
    #[serde(default = "default_consumer_group")]
    pub consumer_group: ConfigValue<String>,
    // Optional: absent on input becomes `None`, and `None` is left out of
    // serialized output entirely.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub consumer_name: Option<ConfigValue<String>>,
    // Falls back to the static value 100 when the key is omitted.
    #[serde(default = "default_batch_size")]
    pub batch_size: ConfigValue<usize>,
    // Falls back to the static value 5000 when the key is omitted.
    // Presumably a blocking timeout in milliseconds, per the name — TODO confirm.
    #[serde(default = "default_block_ms")]
    pub block_ms: ConfigValue<u64>,
}
37
38fn default_consumer_group() -> ConfigValue<String> {
39    ConfigValue::Static("drasi-core".to_string())
40}
41
42fn default_batch_size() -> ConfigValue<usize> {
43    ConfigValue::Static(100)
44}
45
46fn default_block_ms() -> ConfigValue<u64> {
47    ConfigValue::Static(5000)
48}
49
// Private helper type whose derived `openapi()` document registers
// `PlatformSourceConfigDto` as a component schema; `config_schema_json`
// reads the schemas back out of that document.
#[derive(OpenApi)]
#[openapi(components(schemas(PlatformSourceConfigDto)))]
struct PlatformSourceSchemas;
53
/// Descriptor for the platform source plugin.
///
/// A field-less unit struct: everything it does is provided by its
/// `SourcePluginDescriptor` implementation, which reports the plugin kind
/// (`"platform"`), exposes the configuration schema, and builds a source
/// from a JSON configuration value.
pub struct PlatformSourceDescriptor;
56
57#[async_trait]
58impl SourcePluginDescriptor for PlatformSourceDescriptor {
59    fn kind(&self) -> &str {
60        "platform"
61    }
62
63    fn config_version(&self) -> &str {
64        "1.0.0"
65    }
66
67    fn config_schema_name(&self) -> &str {
68        "source.platform.PlatformSourceConfig"
69    }
70
71    fn config_schema_json(&self) -> String {
72        let api = PlatformSourceSchemas::openapi();
73        serde_json::to_string(
74            &api.components
75                .as_ref()
76                .expect("OpenAPI components missing")
77                .schemas,
78        )
79        .expect("Failed to serialize config schema")
80    }
81
82    async fn create_source(
83        &self,
84        id: &str,
85        config_json: &serde_json::Value,
86        auto_start: bool,
87    ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
88        let dto: PlatformSourceConfigDto = serde_json::from_value(config_json.clone())?;
89        let mapper = DtoMapper::new();
90
91        let config = PlatformSourceConfig {
92            redis_url: mapper.resolve_string(&dto.redis_url)?,
93            stream_key: mapper.resolve_string(&dto.stream_key)?,
94            consumer_group: mapper.resolve_string(&dto.consumer_group)?,
95            consumer_name: mapper.resolve_optional(&dto.consumer_name)?,
96            batch_size: mapper.resolve_typed(&dto.batch_size)?,
97            block_ms: mapper.resolve_typed(&dto.block_ms)?,
98        };
99
100        let source = PlatformSourceBuilder::new(id)
101            .with_config(config)
102            .with_auto_start(auto_start)
103            .build()?;
104
105        Ok(Box::new(source))
106    }
107}