drasi_source_platform/config.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configuration types for the platform source plugin.
16//!
17//! This module defines the configuration structure for connecting to Redis Streams
18//! and consuming CloudEvent-wrapped change events.
19
20use serde::{Deserialize, Serialize};
21
/// Serde fallback for `PlatformSourceConfig::consumer_group`.
///
/// Returns the group name `"drasi-core"`, used when the field is omitted
/// from the deserialized configuration.
fn default_consumer_group() -> String {
    String::from("drasi-core")
}
25
/// Serde fallback for `PlatformSourceConfig::batch_size`.
///
/// Returns `100`, used when the field is omitted from the deserialized
/// configuration.
fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 100;
    DEFAULT_BATCH_SIZE
}
29
/// Serde fallback for `PlatformSourceConfig::block_ms`.
///
/// Returns `5000` milliseconds (5 seconds), used when the field is omitted
/// from the deserialized configuration.
fn default_block_ms() -> u64 {
    const DEFAULT_BLOCK_MS: u64 = 5_000;
    DEFAULT_BLOCK_MS
}
33
34/// Platform source configuration for Redis Streams consumption.
35///
36/// This configuration defines how the platform source connects to Redis
37/// and consumes events from a stream using consumer groups.
38///
39/// # Example
40///
41/// ```rust
42/// use drasi_source_platform::PlatformSourceConfig;
43///
44/// let config = PlatformSourceConfig {
45/// redis_url: "redis://localhost:6379".to_string(),
46/// stream_key: "my-app-changes".to_string(),
47/// consumer_group: "my-consumers".to_string(),
48/// consumer_name: Some("consumer-1".to_string()),
49/// batch_size: 50,
50/// block_ms: 10000,
51/// };
52/// ```
53///
54/// # YAML Configuration
55///
56/// ```yaml
57/// source_type: platform
58/// properties:
59/// redis_url: "redis://localhost:6379"
60/// stream_key: "my-app-changes"
61/// consumer_group: "my-consumers"
62/// batch_size: 50
63/// block_ms: 10000
64/// ```
65#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
66pub struct PlatformSourceConfig {
67 /// Redis connection URL.
68 ///
69 /// Standard Redis connection string format.
70 ///
71 /// # Examples
72 ///
73 /// - `redis://localhost:6379` - Local Redis without auth
74 /// - `redis://:password@host:6379` - Redis with password
75 /// - `redis://user:password@host:6379` - Redis with username and password
76 /// - `rediss://host:6379` - Redis with TLS
77 pub redis_url: String,
78
79 /// Redis stream key to consume from.
80 ///
81 /// This is the name of the Redis stream that contains the CloudEvent-wrapped
82 /// change events. The stream must exist or will be created automatically
83 /// when the consumer group is created with MKSTREAM.
84 pub stream_key: String,
85
86 /// Consumer group name.
87 ///
88 /// All source instances with the same consumer group share the workload.
89 /// Each message is delivered to only one consumer in the group.
90 ///
91 /// **Default**: `"drasi-core"`
92 #[serde(default = "default_consumer_group")]
93 pub consumer_group: String,
94
95 /// Consumer name (unique within group).
96 ///
97 /// Identifies this specific consumer instance within the consumer group.
98 /// If not specified, a unique name is auto-generated based on the source ID.
99 ///
100 /// **Default**: Auto-generated from source ID
101 #[serde(default, skip_serializing_if = "Option::is_none")]
102 pub consumer_name: Option<String>,
103
104 /// Number of events to read per XREADGROUP call.
105 ///
106 /// Higher values improve throughput but increase memory usage and
107 /// may delay processing of individual events.
108 ///
109 /// **Default**: `100`
110 ///
111 /// **Valid range**: 1-10000 (recommended)
112 #[serde(default = "default_batch_size")]
113 pub batch_size: usize,
114
115 /// Milliseconds to block waiting for new events.
116 ///
117 /// When no events are available, the consumer blocks for this duration
118 /// before returning an empty result. Higher values reduce CPU usage
119 /// but increase latency for detecting source shutdown.
120 ///
121 /// **Default**: `5000` (5 seconds)
122 ///
123 /// **Valid range**: 100-60000 (recommended)
124 #[serde(default = "default_block_ms")]
125 pub block_ms: u64,
126}
127
128impl PlatformSourceConfig {
129 /// Validate the configuration.
130 ///
131 /// # Errors
132 ///
133 /// Returns an error if:
134 /// - `redis_url` is empty
135 /// - `stream_key` is empty
136 /// - `consumer_group` is empty
137 /// - `batch_size` is 0
138 pub fn validate(&self) -> anyhow::Result<()> {
139 if self.redis_url.is_empty() {
140 return Err(anyhow::anyhow!(
141 "Validation error: redis_url cannot be empty. \
142 Please provide a valid Redis connection URL (e.g., redis://localhost:6379)"
143 ));
144 }
145
146 if self.stream_key.is_empty() {
147 return Err(anyhow::anyhow!(
148 "Validation error: stream_key cannot be empty. \
149 Please specify the Redis stream key to consume from"
150 ));
151 }
152
153 if self.consumer_group.is_empty() {
154 return Err(anyhow::anyhow!(
155 "Validation error: consumer_group cannot be empty. \
156 Please specify a consumer group name"
157 ));
158 }
159
160 if self.batch_size == 0 {
161 return Err(anyhow::anyhow!(
162 "Validation error: batch_size cannot be 0. \
163 Please specify a positive batch size"
164 ));
165 }
166
167 Ok(())
168 }
169}