1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use serde_json::{Map, Value};
6use std::collections::BTreeSet;
7use std::env;
8use std::fmt::Debug;
9use std::fs;
10use std::io::ErrorKind;
11use std::path::Path;
12use thiserror::Error as ThisError;
13
14#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
15pub enum SecretRefDiscoveryError {
16 #[error("{e}")]
17 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
18 #[error("unable to serialize connector configuration: {error}")]
19 SerializationFailed { error: String },
20 #[error("unable to deserialize connector configuration (error omitted)")]
21 DeserializationFailed,
22}
23
24pub fn discover_secret_references_in_connector_config(
26 connector_config: &serde_json::Value,
27) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
28 let mut result = BTreeSet::new();
29 if let Some(transport_config_json) = connector_config
30 .get("transport")
31 .and_then(|v| v.get("config"))
32 {
33 result.extend(discover_secret_references_in_json(transport_config_json)?);
34 }
35 if let Some(format_config_json) = connector_config.get("format").and_then(|v| v.get("config")) {
36 result.extend(discover_secret_references_in_json(format_config_json)?);
37 }
38 Ok(result)
39}
40
41fn discover_secret_references_in_json(
43 value: &Value,
44) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
45 Ok(match value {
46 Value::Null => BTreeSet::new(),
47 Value::Bool(_b) => BTreeSet::new(),
48 Value::Number(_n) => BTreeSet::new(),
49 Value::String(s) => {
50 if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
51 .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
52 {
53 BTreeSet::from([secret_ref])
54 } else {
55 BTreeSet::new()
56 }
57 }
58 Value::Array(seq) => {
59 let mut result = BTreeSet::new();
60 for entry in seq.iter() {
61 result.extend(discover_secret_references_in_json(entry)?)
62 }
63 result
64 }
65 Value::Object(mapping) => {
66 let mut result = BTreeSet::new();
67 for (_k, v) in mapping.into_iter() {
68 result.extend(discover_secret_references_in_json(v)?);
69 }
70 result
71 }
72 })
73}
74
/// Returns the default directory in which secrets are looked up.
pub fn default_secrets_directory() -> &'static Path {
    const DEFAULT_SECRETS_DIRECTORY: &str = "/etc/feldera-secrets";
    Path::new(DEFAULT_SECRETS_DIRECTORY)
}
79
80#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
81pub enum SecretRefResolutionError {
82 #[error("{e}")]
83 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
84 #[error(
85 "secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}"
86 )]
87 CannotReadSecretFile {
88 secret_ref: SecretRef,
89 path: String,
90 error_kind: ErrorKind,
91 },
92 #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
93 SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
94 #[error(
95 "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
96 )]
97 SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
98 #[error(
99 "secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}"
100 )]
101 SecretFileExistenceUnknown {
102 secret_ref: SecretRef,
103 path: String,
104 error_kind: ErrorKind,
105 },
106 #[error(
107 "environment variable reference '{env_ref}' resolution failed: environment variable '{name}' is not set"
108 )]
109 EnvVarNotSet { env_ref: SecretRef, name: String },
110 #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
111 DuplicateKeyInMapping,
112 #[error("unable to serialize connector configuration: {error}")]
113 SerializationFailed { error: String },
114 #[error("unable to deserialize connector configuration (error omitted)")]
115 DeserializationFailed,
116}
117
118pub fn resolve_secret_references_in_connector_config(
120 secrets_dir: &Path,
121 connector_config: &ConnectorConfig,
122) -> Result<ConnectorConfig, SecretRefResolutionError> {
123 let connector_config = connector_config.clone();
124 Ok(ConnectorConfig {
125 transport: resolve_secret_references_via_json(secrets_dir, &connector_config.transport)?,
126 format: resolve_secret_references_via_json(secrets_dir, &connector_config.format)?,
127 ..connector_config
128 })
129}
130
131pub fn resolve_secret_references_via_json<T>(
133 secrets_dir: &Path,
134 value: &T,
135) -> Result<T, SecretRefResolutionError>
136where
137 T: Serialize + DeserializeOwned,
138{
139 let json_value =
140 serde_json::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
141 error: e.to_string(),
142 })?;
143 let resolved_json = resolve_secret_references_in_json(secrets_dir, json_value)?;
144 serde_json::from_value(resolved_json)
145 .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
146}
147
148fn resolve_secret_references_in_json(
150 secrets_dir: &Path,
151 value: Value,
152) -> Result<Value, SecretRefResolutionError> {
153 Ok(match value {
154 Value::Null => Value::Null,
155 Value::Bool(b) => Value::Bool(b),
156 Value::Number(n) => Value::Number(n),
157 Value::String(s) => {
158 Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
159 }
160 Value::Array(seq) => Value::Array(
161 seq.into_iter()
162 .map(|v| resolve_secret_references_in_json(secrets_dir, v))
163 .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
164 ),
165 Value::Object(mapping) => {
166 let mut new_mapping = Map::new();
167 for (k, v) in mapping.into_iter() {
168 if let Some(_existing) =
169 new_mapping.insert(k, resolve_secret_references_in_json(secrets_dir, v)?)
170 {
171 return Err(SecretRefResolutionError::DuplicateKeyInMapping);
172 }
173 }
174 Value::Object(new_mapping)
175 }
176 })
177}
178
179fn resolve_potential_secret_reference_string(
181 secrets_dir: &Path,
182 s: String,
183) -> Result<String, SecretRefResolutionError> {
184 match MaybeSecretRef::new(s) {
185 Ok(maybe_secret_ref) => match maybe_secret_ref {
186 MaybeSecretRef::String(plain_str) => Ok(plain_str),
187 MaybeSecretRef::SecretRef(secret_ref) => match secret_ref {
188 SecretRef::Kubernetes {
189 ref name,
190 ref data_key,
191 } => {
192 let path = Path::new(secrets_dir)
195 .join("kubernetes")
196 .join(name)
197 .join(data_key);
198
199 if path.is_file() {
202 match fs::read_to_string(&path) {
203 Ok(content) => Ok(content),
204 Err(e) => {
205 Err(SecretRefResolutionError::CannotReadSecretFile {
206 secret_ref,
207 path: path.display().to_string(),
208 error_kind: e.kind(), })
210 }
211 }
212 } else {
213 match path.try_exists() {
214 Ok(exists) => {
215 if exists {
216 Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
217 secret_ref,
218 path: path.display().to_string(),
219 })
220 } else {
221 Err(SecretRefResolutionError::SecretFileDoesNotExist {
222 secret_ref,
223 path: path.display().to_string(),
224 })
225 }
226 }
227 Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
228 secret_ref,
229 path: path.display().to_string(),
230 error_kind: e.kind(), }),
232 }
233 }
234 }
235 SecretRef::EnvVar { ref name } => {
236 let name = name.clone();
239 match env::var(&name) {
240 Ok(value) => Ok(value),
241 Err(env::VarError::NotPresent) | Err(env::VarError::NotUnicode(_)) => {
242 Err(SecretRefResolutionError::EnvVarNotSet {
243 env_ref: secret_ref,
244 name,
245 })
246 }
247 }
248 }
249 },
250 },
251 Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
252 }
253}
254
#[cfg(test)]
mod tests {
    use crate::config::{ConnectorConfig, TransportConfig};
    use crate::secret_ref::{MaybeSecretRef, SecretRef};
    use crate::secret_resolver::{
        SecretRefResolutionError, discover_secret_references_in_connector_config,
        discover_secret_references_in_json, resolve_potential_secret_reference_string,
        resolve_secret_references_in_connector_config, resolve_secret_references_in_json,
    };
    use serde_json::json;
    use std::collections::BTreeSet;
    use std::fs;
    use std::path::{Path, PathBuf};

    /// Kubernetes secret reference used by most of the tests below.
    const SECRET_REF_A_B: &str = "${secret:kubernetes:a/b}";

    /// Creates `<secrets_dir>/kubernetes/<name>/<data_key>` holding `content` and
    /// returns the path of the created file.
    fn write_kubernetes_secret(
        secrets_dir: &Path,
        name: &str,
        data_key: &str,
        content: &[u8],
    ) -> PathBuf {
        let name_dir = secrets_dir.join("kubernetes").join(name);
        fs::create_dir_all(&name_dir).unwrap();
        let path = name_dir.join(data_key);
        fs::write(&path, content).unwrap();
        path
    }

    /// Parses `reference`, panicking unless it is a well-formed secret reference.
    fn parse_secret_ref(reference: &str) -> SecretRef {
        match MaybeSecretRef::new(reference.to_string()).unwrap() {
            MaybeSecretRef::SecretRef(secret_ref) => secret_ref,
            MaybeSecretRef::String(plain) => panic!("'{plain}' is not a secret reference"),
        }
    }

    /// Sets an environment variable for the lifetime of the guard and removes it
    /// again on drop, so the variable is cleaned up even if an assertion fails.
    struct ScopedEnvVar {
        name: &'static str,
    }

    impl ScopedEnvVar {
        fn set(name: &'static str, value: &str) -> Self {
            // SAFETY: every test uses variable names unique to that test.
            // NOTE(review): tests run on several threads; this relies on all
            // concurrent environment access going through `std::env` — confirm.
            unsafe {
                std::env::set_var(name, value);
            }
            Self { name }
        }
    }

    impl Drop for ScopedEnvVar {
        fn drop(&mut self) {
            // SAFETY: same reasoning as in `ScopedEnvVar::set`.
            unsafe {
                std::env::remove_var(self.name);
            }
        }
    }

    #[test]
    fn resolve_kubernetes_secret_success() {
        let dir = tempfile::tempdir().unwrap();
        write_kubernetes_secret(dir.path(), "a", "b", b"example");

        assert_eq!(
            resolve_potential_secret_reference_string(dir.path(), SECRET_REF_A_B.to_string())
                .unwrap(),
            "example"
        );
    }

    #[test]
    fn resolve_kubernetes_secret_max_size_success() {
        let dir = tempfile::tempdir().unwrap();
        let name = "a".repeat(63);
        let data_key = "b".repeat(255);
        write_kubernetes_secret(dir.path(), &name, &data_key, b"example");

        assert_eq!(
            resolve_potential_secret_reference_string(
                dir.path(),
                format!("${{secret:kubernetes:{}/{}}}", name, data_key)
            )
            .unwrap(),
            "example"
        );
    }

    #[test]
    fn resolve_kubernetes_secret_path_not_a_file() {
        let dir = tempfile::tempdir().unwrap();
        // A directory sits where the secret file is expected.
        let data_key_path = dir.path().join("kubernetes").join("a").join("b");
        fs::create_dir_all(&data_key_path).unwrap();

        assert_eq!(
            resolve_potential_secret_reference_string(dir.path(), SECRET_REF_A_B.to_string())
                .unwrap_err(),
            SecretRefResolutionError::SecretPathIsNotRegularFile {
                secret_ref: parse_secret_ref(SECRET_REF_A_B),
                path: data_key_path.display().to_string()
            }
        );
    }

    #[test]
    fn resolve_kubernetes_secret_file_does_not_exist() {
        let dir = tempfile::tempdir().unwrap();
        // Only the directory of the secret exists, not the data key file itself.
        let name_dir = dir.path().join("kubernetes").join("a");
        fs::create_dir_all(&name_dir).unwrap();
        let data_key_path = name_dir.join("b");

        assert_eq!(
            resolve_potential_secret_reference_string(dir.path(), SECRET_REF_A_B.to_string())
                .unwrap_err(),
            SecretRefResolutionError::SecretFileDoesNotExist {
                secret_ref: parse_secret_ref(SECRET_REF_A_B),
                path: data_key_path.display().to_string()
            }
        );
    }

    #[test]
    fn resolve_secret_ref_cannot_read_file() {
        let dir = tempfile::tempdir().unwrap();
        // The bytes 0xFF 0xFF are not valid UTF-8, so reading the file as a string fails.
        let data_key_path = write_kubernetes_secret(dir.path(), "a", "b", &[255, 255]);

        assert_eq!(
            resolve_potential_secret_reference_string(dir.path(), SECRET_REF_A_B.to_string())
                .unwrap_err(),
            SecretRefResolutionError::CannotReadSecretFile {
                secret_ref: parse_secret_ref(SECRET_REF_A_B),
                path: data_key_path.display().to_string(),
                error_kind: std::io::ErrorKind::InvalidData
            }
        );
    }

    #[test]
    fn secret_resolution_json() {
        let dir = tempfile::tempdir().unwrap();
        write_kubernetes_secret(dir.path(), "a", "b", b"example1");
        write_kubernetes_secret(dir.path(), "c", "d", b"example2");

        // References used as object keys (including one to the non-existent secret
        // `e/f`) must be left alone; only string values are resolved.
        let input = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "${secret:kubernetes:a/b}",
            "s2": [
                "${secret:kubernetes:a/b}"
            ],
            "s3": {
                "s31": "${secret:kubernetes:a/b}",
                "s32": [
                    "${secret:kubernetes:a/b}",
                    "${secret:kubernetes:c/d}"
                ]
            },
            "s4": "${secret:kubernetes:c/d}"
        });

        let expectation = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "example1",
            "s2": [
                "example1"
            ],
            "s3": {
                "s31": "example1",
                "s32": [
                    "example1",
                    "example2"
                ]
            },
            "s4": "example2"
        });
        assert_eq!(
            resolve_secret_references_in_json(dir.path(), input.clone()).unwrap(),
            expectation
        );
        assert_eq!(
            discover_secret_references_in_json(&input).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        // Once resolved, no references remain to be discovered.
        assert_eq!(
            discover_secret_references_in_json(&expectation).unwrap(),
            BTreeSet::from([])
        );
    }

    #[test]
    fn secret_resolution_connector_config() {
        let dir = tempfile::tempdir().unwrap();
        write_kubernetes_secret(dir.path(), "a", "b", b"example1");
        write_kubernetes_secret(dir.path(), "c", "d", b"example2");

        let connector_config_json = json!({
            "transport": {
                "name": "datagen",
                "config": {
                    "plan": [{
                        "limit": 2,
                        "fields": {
                            "col1": { "values": [1, 2] },
                            "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
                        }
                    }]
                }
            },
            "format": {
                "name": "json",
                "config": {
                    "example": "${secret:kubernetes:a/b}"
                }
            },
            "index": "${secret:kubernetes:e/f}"
        });
        // The reference in `index` lies outside transport/format config and is
        // therefore not discovered.
        assert_eq!(
            discover_secret_references_in_connector_config(&connector_config_json).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );

        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();

        let connector_config_secrets_resolved =
            resolve_secret_references_in_connector_config(dir.path(), &connector_config).unwrap();

        let TransportConfig::Datagen(datagen_input_config) =
            connector_config_secrets_resolved.transport
        else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("example1"), json!("example2")]
        );

        let Some(format_config) = connector_config_secrets_resolved.format else {
            unreachable!();
        };
        assert_eq!(format_config.config, json!({"example": "example1"}));

        // Check the *resolved* configuration: fields other than transport and
        // format must come through resolution untouched.
        assert_eq!(
            connector_config_secrets_resolved.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );
    }

    #[test]
    fn resolve_env_var_success() {
        let _env_var = ScopedEnvVar::set("FELDERA_TEST_ENV_VAR_ABC123", "my_value");

        let dir = tempfile::tempdir().unwrap();
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir.path(),
                "${env:FELDERA_TEST_ENV_VAR_ABC123}".to_string()
            )
            .unwrap(),
            "my_value"
        );
    }

    #[test]
    fn resolve_env_var_not_set() {
        let dir = tempfile::tempdir().unwrap();
        let env_ref_str = "${env:FELDERA_TEST_ENV_VAR_NOT_SET_XYZ}";
        // SAFETY: the variable name is unique to this test.
        // NOTE(review): tests run on several threads; this relies on all concurrent
        // environment access going through `std::env` — confirm.
        unsafe {
            std::env::remove_var("FELDERA_TEST_ENV_VAR_NOT_SET_XYZ");
        }

        assert_eq!(
            resolve_potential_secret_reference_string(dir.path(), env_ref_str.to_string())
                .unwrap_err(),
            SecretRefResolutionError::EnvVarNotSet {
                env_ref: parse_secret_ref(env_ref_str),
                name: "FELDERA_TEST_ENV_VAR_NOT_SET_XYZ".to_string(),
            }
        );
    }

    #[test]
    fn resolve_env_var_in_connector_config() {
        let _env_var_a = ScopedEnvVar::set("FELDERA_TEST_CONN_VAR_A", "resolved_value_a");
        let _env_var_b = ScopedEnvVar::set("FELDERA_TEST_CONN_VAR_B", "resolved_value_b");

        let connector_config_json = json!({
            "transport": {
                "name": "datagen",
                "config": {
                    "plan": [{
                        "limit": 2,
                        "fields": {
                            "col1": { "values": [1, 2] },
                            "col2": { "values": ["${env:FELDERA_TEST_CONN_VAR_A}", "${env:FELDERA_TEST_CONN_VAR_B}"] }
                        }
                    }]
                }
            },
            "format": {
                "name": "json",
                "config": {
                    "example": "${env:FELDERA_TEST_CONN_VAR_A}"
                }
            }
        });

        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();

        let dir = tempfile::tempdir().unwrap();
        let resolved =
            resolve_secret_references_in_connector_config(dir.path(), &connector_config).unwrap();

        let TransportConfig::Datagen(datagen_input_config) = resolved.transport else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("resolved_value_a"), json!("resolved_value_b")]
        );

        let Some(format_config) = resolved.format else {
            unreachable!();
        };
        assert_eq!(format_config.config, json!({"example": "resolved_value_a"}));
    }
}