1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde_yaml::{Mapping, Value};
4use std::collections::BTreeSet;
5use std::fmt::Debug;
6use std::fs;
7use std::io::ErrorKind;
8use std::path::Path;
9use thiserror::Error as ThisError;
10
11#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
12pub enum SecretRefDiscoveryError {
13 #[error("{e}")]
14 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
15 #[error("unable to serialize connector configuration: {error}")]
16 SerializationFailed { error: String },
17 #[error("unable to deserialize connector configuration (error omitted)")]
18 DeserializationFailed,
19}
20
21pub fn discover_secret_references_in_connector_config(
23 connector_config: &ConnectorConfig,
24) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
25 let yaml_value = serde_yaml::to_value(connector_config).map_err(|e| {
26 SecretRefDiscoveryError::SerializationFailed {
27 error: e.to_string(),
28 }
29 })?;
30 let mut result = BTreeSet::new();
31 if let Some(transport_config_yaml) = yaml_value.get("transport").and_then(|v| v.get("config")) {
32 result.extend(discover_secret_references_in_yaml(transport_config_yaml)?);
33 }
34 if let Some(format_config_yaml) = yaml_value.get("format").and_then(|v| v.get("config")) {
35 result.extend(discover_secret_references_in_yaml(format_config_yaml)?);
36 }
37 Ok(result)
38}
39
40fn discover_secret_references_in_yaml(
42 value: &Value,
43) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
44 Ok(match value {
45 Value::Null => BTreeSet::new(),
46 Value::Bool(_b) => BTreeSet::new(),
47 Value::Number(_n) => BTreeSet::new(),
48 Value::String(s) => {
49 if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
50 .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
51 {
52 BTreeSet::from([secret_ref])
53 } else {
54 BTreeSet::new()
55 }
56 }
57 Value::Sequence(seq) => {
58 let mut result = BTreeSet::new();
59 for entry in seq.iter() {
60 result.extend(discover_secret_references_in_yaml(entry)?)
61 }
62 result
63 }
64 Value::Mapping(mapping) => {
65 let mut result = BTreeSet::new();
66 for (_k, v) in mapping.into_iter() {
67 result.extend(discover_secret_references_in_yaml(v)?);
68 }
69 result
70 }
71 Value::Tagged(tag_val) => discover_secret_references_in_yaml(&tag_val.value)?,
72 })
73}
74
/// Default filesystem path of the secrets directory.
///
/// NOTE(review): not referenced elsewhere in this file; presumably callers pass
/// it as the `secrets_dir` argument of
/// `resolve_secret_references_in_connector_config` — confirm against callers.
pub const DEFAULT_SECRETS_DIRECTORY_PATH: &str = "/etc/feldera-secrets";
77
78#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
79pub enum SecretRefResolutionError {
80 #[error("{e}")]
81 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
82 #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}")]
83 CannotReadSecretFile {
84 secret_ref: SecretRef,
85 path: String,
86 error_kind: ErrorKind,
87 },
88 #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
89 SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
90 #[error(
91 "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
92 )]
93 SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
94 #[error("secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}")]
95 SecretFileExistenceUnknown {
96 secret_ref: SecretRef,
97 path: String,
98 error_kind: ErrorKind,
99 },
100 #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
101 DuplicateKeyInMapping,
102 #[error("unable to serialize connector configuration: {error}")]
103 SerializationFailed { error: String },
104 #[error("unable to deserialize connector configuration (error omitted)")]
105 DeserializationFailed,
106}
107
108pub fn resolve_secret_references_in_connector_config(
110 secrets_dir: &Path,
111 connector_config: &ConnectorConfig,
112) -> Result<ConnectorConfig, SecretRefResolutionError> {
113 let mut yaml_value = serde_yaml::to_value(connector_config).map_err(|e| {
114 SecretRefResolutionError::SerializationFailed {
115 error: e.to_string(),
116 }
117 })?;
118 if let Some(transport_config_yaml) = yaml_value.get("transport").and_then(|v| v.get("config")) {
119 let transport_config_yaml_resolved =
120 resolve_secret_references_in_yaml(secrets_dir, transport_config_yaml.clone())?;
121 yaml_value["transport"]["config"] = transport_config_yaml_resolved;
122 }
123 if let Some(format_config_yaml) = yaml_value.get("format").and_then(|v| v.get("config")) {
124 let format_config_yaml_resolved =
125 resolve_secret_references_in_yaml(secrets_dir, format_config_yaml.clone())?;
126 yaml_value["format"]["config"] = format_config_yaml_resolved;
127 }
128 let connector_config_resolved = serde_yaml::from_value(yaml_value)
129 .map_err(|_e| SecretRefResolutionError::DeserializationFailed)?;
130 Ok(connector_config_resolved)
131}
132
133fn resolve_secret_references_in_yaml(
135 secrets_dir: &Path,
136 value: Value,
137) -> Result<Value, SecretRefResolutionError> {
138 Ok(match value {
139 Value::Null => Value::Null,
140 Value::Bool(b) => Value::Bool(b),
141 Value::Number(n) => Value::Number(n),
142 Value::String(s) => {
143 Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
144 }
145 Value::Sequence(seq) => Value::Sequence(
146 seq.into_iter()
147 .map(|v| resolve_secret_references_in_yaml(secrets_dir, v))
148 .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
149 ),
150 Value::Mapping(mapping) => {
151 let mut new_mapping = Mapping::new();
152 for (k, v) in mapping.into_iter() {
153 if let Some(_existing) =
154 new_mapping.insert(k, resolve_secret_references_in_yaml(secrets_dir, v)?)
155 {
156 return Err(SecretRefResolutionError::DuplicateKeyInMapping);
157 }
158 }
159 Value::Mapping(new_mapping)
160 }
161 Value::Tagged(mut tag_val) => {
162 tag_val.value = resolve_secret_references_in_yaml(secrets_dir, tag_val.value)?;
163 Value::Tagged(tag_val)
164 }
165 })
166}
167
168fn resolve_potential_secret_reference_string(
170 secrets_dir: &Path,
171 s: String,
172) -> Result<String, SecretRefResolutionError> {
173 match MaybeSecretRef::new(s) {
174 Ok(maybe_secret_ref) => match maybe_secret_ref {
175 MaybeSecretRef::String(plain_str) => Ok(plain_str),
176 MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
177 SecretRef::Kubernetes { name, data_key } => {
178 let path = Path::new(secrets_dir)
181 .join("kubernetes")
182 .join(name)
183 .join(data_key);
184
185 if path.is_file() {
188 match fs::read_to_string(&path) {
189 Ok(content) => Ok(content),
190 Err(e) => {
191 Err(SecretRefResolutionError::CannotReadSecretFile {
192 secret_ref,
193 path: path.display().to_string(),
194 error_kind: e.kind(), })
196 }
197 }
198 } else {
199 match path.try_exists() {
200 Ok(exists) => {
201 if exists {
202 Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
203 secret_ref,
204 path: path.display().to_string(),
205 })
206 } else {
207 Err(SecretRefResolutionError::SecretFileDoesNotExist {
208 secret_ref,
209 path: path.display().to_string(),
210 })
211 }
212 }
213 Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
214 secret_ref,
215 path: path.display().to_string(),
216 error_kind: e.kind(), }),
218 }
219 }
220 }
221 },
222 },
223 Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
224 }
225}
226
#[cfg(test)]
mod tests {
    use crate::config::{ConnectorConfig, TransportConfig};
    use crate::secret_ref::{MaybeSecretRef, SecretRef};
    use crate::secret_resolver::{
        discover_secret_references_in_connector_config, discover_secret_references_in_yaml,
        resolve_potential_secret_reference_string, resolve_secret_references_in_connector_config,
        resolve_secret_references_in_yaml, SecretRefResolutionError,
    };
    use serde_json::json;
    use std::collections::BTreeSet;
    use std::fs::{create_dir_all, File};
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Creates `<secrets_dir>/kubernetes/<name>/<data_key>` holding `content`
    /// and returns the path of the created file.
    fn write_secret(secrets_dir: &Path, name: &str, data_key: &str, content: &[u8]) -> PathBuf {
        let name_dir = secrets_dir.join("kubernetes").join(name);
        create_dir_all(&name_dir).unwrap();
        let file_path = name_dir.join(data_key);
        File::create(&file_path)
            .unwrap()
            .write_all(content)
            .unwrap();
        file_path
    }

    /// Parses `reference`, panicking unless it is a valid secret reference.
    fn parse_secret_ref(reference: &str) -> SecretRef {
        match MaybeSecretRef::new(reference.to_string()).unwrap() {
            MaybeSecretRef::SecretRef(secret_ref) => secret_ref,
            MaybeSecretRef::String(_) => panic!("expected a secret reference"),
        }
    }

    /// A reference to an existing file resolves to the file content.
    #[test]
    fn resolve_kubernetes_secret_success() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_secret(dir_path, "a", "b", b"example");

        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap(),
            "example"
        );
    }

    /// A 63-character name combined with a 255-character data key resolves.
    #[test]
    fn resolve_kubernetes_secret_max_size_success() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name = "a".repeat(63);
        let data_key = "b".repeat(255);
        write_secret(dir_path, &name, &data_key, b"example");

        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                format!("${{secret:kubernetes:{}/{}}}", name, data_key)
            )
            .unwrap(),
            "example"
        );
    }

    /// A directory at the secret path is reported as "not a regular file".
    #[test]
    fn resolve_kubernetes_secret_path_not_a_file() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = dir_path.join("kubernetes").join("a").join("b");
        create_dir_all(&data_key_file_path).unwrap();

        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretPathIsNotRegularFile {
                secret_ref: parse_secret_ref("${secret:kubernetes:a/b}"),
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A missing file inside an existing name directory is reported as such.
    #[test]
    fn resolve_kubernetes_secret_file_does_not_exist() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name_dir = dir_path.join("kubernetes").join("a");
        create_dir_all(&name_dir).unwrap();
        let data_key_file_path = name_dir.join("b");

        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretFileDoesNotExist {
                secret_ref: parse_secret_ref("${secret:kubernetes:a/b}"),
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A file whose bytes are not valid UTF-8 cannot be read as a string.
    #[test]
    fn resolve_secret_ref_cannot_read_file() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = write_secret(dir_path, "a", "b", &[255, 255]);

        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::CannotReadSecretFile {
                secret_ref: parse_secret_ref("${secret:kubernetes:a/b}"),
                path: data_key_file_path.display().to_string(),
                error_kind: std::io::ErrorKind::InvalidData
            }
        );
    }

    /// Values at every nesting level are resolved and discovered, while
    /// mapping keys that look like references are left untouched.
    #[test]
    fn secret_resolution_yaml() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_secret(dir_path, "a", "b", b"example1");
        write_secret(dir_path, "c", "d", b"example2");

        let input = r#"
            a: null
            b: false,
            c: 123
            d: "val1"
            e: [1, "2"]
            f:
                f1: 1
                f2: "val2"
            g: !str "val3"
            "${secret:kubernetes:a/b}": 123
            "${secret:kubernetes:e/f}": 456
            s1: "${secret:kubernetes:a/b}"
            s2: ["${secret:kubernetes:a/b}"]
            s3:
                s31: "${secret:kubernetes:a/b}"
                s32: ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"]
            s4: !str "${secret:kubernetes:c/d}"
        "#;
        let expectation = r#"
            a: null
            b: false,
            c: 123
            d: "val1"
            e: [1, "2"]
            f:
                f1: 1
                f2: "val2"
            g: !str "val3"
            "${secret:kubernetes:a/b}": 123
            "${secret:kubernetes:e/f}": 456
            s1: "example1"
            s2: ["example1"]
            s3:
                s31: "example1"
                s32: ["example1", "example2"]
            s4: !str "example2"
        "#;
        assert_eq!(
            resolve_secret_references_in_yaml(dir_path, serde_yaml::from_str(input).unwrap())
                .unwrap(),
            serde_yaml::from_str::<serde_yaml::Value>(expectation).unwrap()
        );
        assert_eq!(
            discover_secret_references_in_yaml(&serde_yaml::from_str(input).unwrap()).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        assert_eq!(
            discover_secret_references_in_yaml(&serde_yaml::from_str(expectation).unwrap())
                .unwrap(),
            BTreeSet::from([])
        );
    }

    /// Only `transport.config` and `format.config` take part in discovery and
    /// resolution; a reference-looking `index` is ignored by both.
    #[test]
    fn secret_resolution_connector_config() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_secret(dir_path, "a", "b", b"example1");
        write_secret(dir_path, "c", "d", b"example2");

        let connector_config_json = json!({
            "transport": {
                "name": "datagen",
                "config": {
                    "plan": [{
                        "limit": 2,
                        "fields": {
                            "col1": { "values": [1, 2] },
                            "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
                        }
                    }]
                }
            },
            "format": {
                "name": "json",
                "config": {
                    "example": "${secret:kubernetes:a/b}"
                }
            },
            "index": "${secret:kubernetes:e/f}"
        });
        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();
        assert_eq!(
            discover_secret_references_in_connector_config(&connector_config).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        let connector_config_secrets_resolved =
            resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();

        // Check the RESOLVED configuration: `index` lies outside the resolved
        // subtrees and must come through verbatim. Done before the fields
        // below are moved out of the struct.
        assert_eq!(
            connector_config_secrets_resolved.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );

        // Transport configuration: both references were replaced.
        let TransportConfig::Datagen(datagen_input_config) =
            connector_config_secrets_resolved.transport
        else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("example1"), json!("example2")]
        );

        // Format configuration: the single reference was replaced.
        let Some(format_config) = connector_config_secrets_resolved.format else {
            unreachable!();
        };
        let mut expected_mapping = serde_yaml::Mapping::new();
        expected_mapping.insert(
            serde_yaml::Value::String("example".to_string()),
            serde_yaml::Value::String("example1".to_string()),
        );
        assert_eq!(
            format_config.config,
            serde_yaml::Value::Mapping(expected_mapping)
        );
    }
}