Skip to main content

drasi_source_platform/
config.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configuration types for the platform source plugin.
16//!
17//! This module defines the configuration structure for connecting to Redis Streams
18//! and consuming CloudEvent-wrapped change events.
19
20use serde::{Deserialize, Serialize};
21
/// Serde fallback for `PlatformSourceConfig::consumer_group` when the field
/// is absent from the input.
fn default_consumer_group() -> String {
    String::from("drasi-core")
}
25
/// Serde fallback for `PlatformSourceConfig::batch_size` when the field
/// is absent from the input.
fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 100;
    DEFAULT_BATCH_SIZE
}
29
/// Serde fallback for `PlatformSourceConfig::block_ms` when the field
/// is absent from the input (value is in milliseconds).
fn default_block_ms() -> u64 {
    const DEFAULT_BLOCK_MS: u64 = 5_000;
    DEFAULT_BLOCK_MS
}
33
/// Platform source configuration for Redis Streams consumption.
///
/// This configuration defines how the platform source connects to Redis
/// and consumes events from a stream using consumer groups.
///
/// When deserializing, `redis_url` and `stream_key` are required; every other
/// field falls back to a default if it is omitted. Deserialization does not
/// run [`PlatformSourceConfig::validate`]; call it explicitly to reject empty
/// strings and a zero batch size.
///
/// # Example
///
/// ```rust
/// use drasi_source_platform::PlatformSourceConfig;
///
/// let config = PlatformSourceConfig {
///     redis_url: "redis://localhost:6379".to_string(),
///     stream_key: "my-app-changes".to_string(),
///     consumer_group: "my-consumers".to_string(),
///     consumer_name: Some("consumer-1".to_string()),
///     batch_size: 50,
///     block_ms: 10000,
/// };
/// ```
///
/// # YAML Configuration
///
/// `consumer_name` is omitted below, so it deserializes to `None`.
///
/// ```yaml
/// source_type: platform
/// properties:
///   redis_url: "redis://localhost:6379"
///   stream_key: "my-app-changes"
///   consumer_group: "my-consumers"
///   batch_size: 50
///   block_ms: 10000
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PlatformSourceConfig {
    /// Redis connection URL (required).
    ///
    /// Standard Redis connection string format. `validate` only checks that
    /// this is non-empty; it does not parse the URL.
    ///
    /// # Examples
    ///
    /// - `redis://localhost:6379` - Local Redis without auth
    /// - `redis://:password@host:6379` - Redis with password
    /// - `redis://user:password@host:6379` - Redis with username and password
    /// - `rediss://host:6379` - Redis with TLS
    pub redis_url: String,

    /// Redis stream key to consume from (required).
    ///
    /// This is the name of the Redis stream that contains the CloudEvent-wrapped
    /// change events. The stream must exist or will be created automatically
    /// when the consumer group is created with MKSTREAM.
    ///
    /// `validate` rejects an empty key.
    pub stream_key: String,

    /// Consumer group name.
    ///
    /// All source instances with the same consumer group share the workload.
    /// Each message is delivered to only one consumer in the group.
    ///
    /// `validate` rejects an empty group name.
    ///
    /// **Default**: `"drasi-core"`
    #[serde(default = "default_consumer_group")]
    pub consumer_group: String,

    /// Consumer name (unique within group).
    ///
    /// Identifies this specific consumer instance within the consumer group.
    /// If not specified, a unique name is auto-generated based on the source ID.
    ///
    /// When this is `None` the field is left out of serialized output.
    /// `validate` does not inspect this field.
    ///
    /// **Default**: Auto-generated from source ID
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub consumer_name: Option<String>,

    /// Number of events to read per XREADGROUP call.
    ///
    /// Higher values improve throughput but increase memory usage and
    /// may delay processing of individual events.
    ///
    /// **Default**: `100`
    ///
    /// **Valid range**: 1-10000 (recommended). `validate` rejects only `0`;
    /// the upper bound is a recommendation and is not enforced there.
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,

    /// Milliseconds to block waiting for new events.
    ///
    /// When no events are available, the consumer blocks for this duration
    /// before returning an empty result. Higher values reduce CPU usage
    /// but increase latency for detecting source shutdown.
    ///
    /// **Default**: `5000` (5 seconds)
    ///
    /// **Valid range**: 100-60000 (recommended). `validate` does not check
    /// this field, so the range is advisory.
    #[serde(default = "default_block_ms")]
    pub block_ms: u64,
}
127
128impl PlatformSourceConfig {
129    /// Validate the configuration.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if:
134    /// - `redis_url` is empty
135    /// - `stream_key` is empty
136    /// - `consumer_group` is empty
137    /// - `batch_size` is 0
138    pub fn validate(&self) -> anyhow::Result<()> {
139        if self.redis_url.is_empty() {
140            return Err(anyhow::anyhow!(
141                "Validation error: redis_url cannot be empty. \
142                 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
143            ));
144        }
145
146        if self.stream_key.is_empty() {
147            return Err(anyhow::anyhow!(
148                "Validation error: stream_key cannot be empty. \
149                 Please specify the Redis stream key to consume from"
150            ));
151        }
152
153        if self.consumer_group.is_empty() {
154            return Err(anyhow::anyhow!(
155                "Validation error: consumer_group cannot be empty. \
156                 Please specify a consumer group name"
157            ));
158        }
159
160        if self.batch_size == 0 {
161            return Err(anyhow::anyhow!(
162                "Validation error: batch_size cannot be 0. \
163                 Please specify a positive batch size"
164            ));
165        }
166
167        Ok(())
168    }
169}