Skip to main content

feldera_types/
secret_resolver.rs

1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use serde_json::{Map, Value};
6use std::collections::BTreeSet;
7use std::env;
8use std::fmt::Debug;
9use std::fs;
10use std::io::ErrorKind;
11use std::path::Path;
12use thiserror::Error as ThisError;
13
14#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
15pub enum SecretRefDiscoveryError {
16    #[error("{e}")]
17    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
18    #[error("unable to serialize connector configuration: {error}")]
19    SerializationFailed { error: String },
20    #[error("unable to deserialize connector configuration (error omitted)")]
21    DeserializationFailed,
22}
23
24/// Discovers the secret references of the connector configuration.
25pub fn discover_secret_references_in_connector_config(
26    connector_config: &serde_json::Value,
27) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
28    let mut result = BTreeSet::new();
29    if let Some(transport_config_json) = connector_config
30        .get("transport")
31        .and_then(|v| v.get("config"))
32    {
33        result.extend(discover_secret_references_in_json(transport_config_json)?);
34    }
35    if let Some(format_config_json) = connector_config.get("format").and_then(|v| v.get("config")) {
36        result.extend(discover_secret_references_in_json(format_config_json)?);
37    }
38    Ok(result)
39}
40
41/// Discovers recursively the secret references in the JSON.
42fn discover_secret_references_in_json(
43    value: &Value,
44) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
45    Ok(match value {
46        Value::Null => BTreeSet::new(),
47        Value::Bool(_b) => BTreeSet::new(),
48        Value::Number(_n) => BTreeSet::new(),
49        Value::String(s) => {
50            if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
51                .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
52            {
53                BTreeSet::from([secret_ref])
54            } else {
55                BTreeSet::new()
56            }
57        }
58        Value::Array(seq) => {
59            let mut result = BTreeSet::new();
60            for entry in seq.iter() {
61                result.extend(discover_secret_references_in_json(entry)?)
62            }
63            result
64        }
65        Value::Object(mapping) => {
66            let mut result = BTreeSet::new();
67            for (_k, v) in mapping.into_iter() {
68                result.extend(discover_secret_references_in_json(v)?);
69            }
70            result
71        }
72    })
73}
74
/// Returns the path of the default secrets directory, `/etc/feldera-secrets`.
pub fn default_secrets_directory() -> &'static Path {
    const DEFAULT_SECRETS_DIRECTORY: &str = "/etc/feldera-secrets";
    Path::new(DEFAULT_SECRETS_DIRECTORY)
}
79
80#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
81pub enum SecretRefResolutionError {
82    #[error("{e}")]
83    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
84    #[error(
85        "secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}"
86    )]
87    CannotReadSecretFile {
88        secret_ref: SecretRef,
89        path: String,
90        error_kind: ErrorKind,
91    },
92    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
93    SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
94    #[error(
95        "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
96    )]
97    SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
98    #[error(
99        "secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}"
100    )]
101    SecretFileExistenceUnknown {
102        secret_ref: SecretRef,
103        path: String,
104        error_kind: ErrorKind,
105    },
106    #[error(
107        "environment variable reference '{env_ref}' resolution failed: environment variable '{name}' is not set"
108    )]
109    EnvVarNotSet { env_ref: SecretRef, name: String },
110    #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
111    DuplicateKeyInMapping,
112    #[error("unable to serialize connector configuration: {error}")]
113    SerializationFailed { error: String },
114    #[error("unable to deserialize connector configuration (error omitted)")]
115    DeserializationFailed,
116}
117
118/// Resolves the secret references of the connector configuration.
119pub fn resolve_secret_references_in_connector_config(
120    secrets_dir: &Path,
121    connector_config: &ConnectorConfig,
122) -> Result<ConnectorConfig, SecretRefResolutionError> {
123    let connector_config = connector_config.clone();
124    Ok(ConnectorConfig {
125        transport: resolve_secret_references_via_json(secrets_dir, &connector_config.transport)?,
126        format: resolve_secret_references_via_json(secrets_dir, &connector_config.format)?,
127        ..connector_config
128    })
129}
130
131/// Resolves secret references in `value`.
132pub fn resolve_secret_references_via_json<T>(
133    secrets_dir: &Path,
134    value: &T,
135) -> Result<T, SecretRefResolutionError>
136where
137    T: Serialize + DeserializeOwned,
138{
139    let json_value =
140        serde_json::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
141            error: e.to_string(),
142        })?;
143    let resolved_json = resolve_secret_references_in_json(secrets_dir, json_value)?;
144    serde_json::from_value(resolved_json)
145        .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
146}
147
148/// Resolves recursively the secret references in the JSON.
149fn resolve_secret_references_in_json(
150    secrets_dir: &Path,
151    value: Value,
152) -> Result<Value, SecretRefResolutionError> {
153    Ok(match value {
154        Value::Null => Value::Null,
155        Value::Bool(b) => Value::Bool(b),
156        Value::Number(n) => Value::Number(n),
157        Value::String(s) => {
158            Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
159        }
160        Value::Array(seq) => Value::Array(
161            seq.into_iter()
162                .map(|v| resolve_secret_references_in_json(secrets_dir, v))
163                .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
164        ),
165        Value::Object(mapping) => {
166            let mut new_mapping = Map::new();
167            for (k, v) in mapping.into_iter() {
168                if let Some(_existing) =
169                    new_mapping.insert(k, resolve_secret_references_in_json(secrets_dir, v)?)
170                {
171                    return Err(SecretRefResolutionError::DuplicateKeyInMapping);
172                }
173            }
174            Value::Object(new_mapping)
175        }
176    })
177}
178
179/// Resolves a string which can potentially be a secret reference or an environment variable reference.
180fn resolve_potential_secret_reference_string(
181    secrets_dir: &Path,
182    s: String,
183) -> Result<String, SecretRefResolutionError> {
184    match MaybeSecretRef::new(s) {
185        Ok(maybe_secret_ref) => match maybe_secret_ref {
186            MaybeSecretRef::String(plain_str) => Ok(plain_str),
187            MaybeSecretRef::SecretRef(secret_ref) => match secret_ref {
188                SecretRef::Kubernetes {
189                    ref name,
190                    ref data_key,
191                } => {
192                    // Secret reference: `${secret:kubernetes:<name>/<data key>}`
193                    // File location: `<secrets dir>/kubernetes/<name>/<data key>`
194                    let path = Path::new(secrets_dir)
195                        .join("kubernetes")
196                        .join(name)
197                        .join(data_key);
198
199                    // If the file does not exist or is not a regular file, produce a custom error
200                    // informing of this fact rather than generically stating we are unable to read.
201                    if path.is_file() {
202                        match fs::read_to_string(&path) {
203                            Ok(content) => Ok(content),
204                            Err(e) => {
205                                Err(SecretRefResolutionError::CannotReadSecretFile {
206                                    secret_ref,
207                                    path: path.display().to_string(),
208                                    error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
209                                })
210                            }
211                        }
212                    } else {
213                        match path.try_exists() {
214                            Ok(exists) => {
215                                if exists {
216                                    Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
217                                        secret_ref,
218                                        path: path.display().to_string(),
219                                    })
220                                } else {
221                                    Err(SecretRefResolutionError::SecretFileDoesNotExist {
222                                        secret_ref,
223                                        path: path.display().to_string(),
224                                    })
225                                }
226                            }
227                            Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
228                                secret_ref,
229                                path: path.display().to_string(),
230                                error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
231                            }),
232                        }
233                    }
234                }
235                SecretRef::EnvVar { ref name } => {
236                    // Environment variable reference: `${env:<name>}`
237                    // Resolved by reading the named environment variable from the process.
238                    let name = name.clone();
239                    match env::var(&name) {
240                        Ok(value) => Ok(value),
241                        Err(env::VarError::NotPresent) | Err(env::VarError::NotUnicode(_)) => {
242                            Err(SecretRefResolutionError::EnvVarNotSet {
243                                env_ref: secret_ref,
244                                name,
245                            })
246                        }
247                    }
248                }
249            },
250        },
251        Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
252    }
253}
254
#[cfg(test)]
mod tests {
    use crate::config::{ConnectorConfig, TransportConfig};
    use crate::secret_ref::{MaybeSecretRef, SecretRef};
    use crate::secret_resolver::{
        SecretRefResolutionError, discover_secret_references_in_connector_config,
        discover_secret_references_in_json, resolve_potential_secret_reference_string,
        resolve_secret_references_in_connector_config, resolve_secret_references_in_json,
    };
    use serde_json::json;
    use std::collections::BTreeSet;
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Creates the file `<secrets_dir>/kubernetes/<name>/<data_key>` (including its parent
    /// directories) with the given content and returns its path.
    fn write_kubernetes_secret_file(
        secrets_dir: &Path,
        name: &str,
        data_key: &str,
        content: &[u8],
    ) -> PathBuf {
        let name_dir = secrets_dir.join("kubernetes").join(name);
        create_dir_all(&name_dir).unwrap();
        let file_path = name_dir.join(data_key);
        let mut file = File::create(&file_path).unwrap();
        file.write_all(content).unwrap();
        file_path
    }

    /// Parses `reference` and returns the secret reference it denotes, panicking if the
    /// string turns out to be a plain string.
    fn parse_secret_ref(reference: &str) -> SecretRef {
        match MaybeSecretRef::new(reference.to_string()).unwrap() {
            MaybeSecretRef::SecretRef(secret_ref) => secret_ref,
            MaybeSecretRef::String(_) => {
                panic!("expected '{reference}' to be parsed as a secret reference")
            }
        }
    }

    #[test]
    fn resolve_kubernetes_secret_success() {
        // Create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        write_kubernetes_secret_file(dir.path(), "a", "b", b"example");

        // Resolve secret: ${secret:kubernetes:a/b}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir.path(),
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap(),
            "example"
        );
    }

    #[test]
    fn resolve_kubernetes_secret_max_size_success() {
        // Create file at: <tempdir>/kubernetes/a.../b...
        let dir = tempfile::tempdir().unwrap();
        let name = "a".repeat(63);
        let data_key = "b".repeat(255);
        write_kubernetes_secret_file(dir.path(), &name, &data_key, b"example");

        // Resolve secret: ${secret:kubernetes:a.../b...}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir.path(),
                format!("${{secret:kubernetes:{name}/{data_key}}}")
            )
            .unwrap(),
            "example"
        );
    }

    #[test]
    fn resolve_kubernetes_secret_path_not_a_file() {
        // Create directory at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let data_key_path = dir.path().join("kubernetes").join("a").join("b");
        create_dir_all(&data_key_path).unwrap();

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir.path(),
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretPathIsNotRegularFile {
                secret_ref,
                path: data_key_path.display().to_string()
            }
        );
    }

    #[test]
    fn resolve_kubernetes_secret_file_does_not_exist() {
        // Do not create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let name_dir = dir.path().join("kubernetes").join("a");
        create_dir_all(&name_dir).unwrap();
        let data_key_file_path = name_dir.join("b");

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir.path(),
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretFileDoesNotExist {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    #[test]
    fn resolve_secret_ref_cannot_read_file() {
        // Create file with non UTF-8 content at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let data_key_file_path = write_kubernetes_secret_file(dir.path(), "a", "b", &[255, 255]);

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir.path(),
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::CannotReadSecretFile {
                secret_ref,
                path: data_key_file_path.display().to_string(),
                error_kind: std::io::ErrorKind::InvalidData
            }
        );
    }

    #[test]
    fn secret_resolution_json() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret_file(dir_path, "a", "b", b"example1");
        write_kubernetes_secret_file(dir_path, "c", "d", b"example2");

        // Resolve secrets in JSON. Secret references used as object keys must be left
        // untouched: only string values are resolved.
        let input = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "${secret:kubernetes:a/b}",
            "s2": [
                "${secret:kubernetes:a/b}"
            ],
            "s3": {
                "s31": "${secret:kubernetes:a/b}",
                "s32": [
                    "${secret:kubernetes:a/b}",
                    "${secret:kubernetes:c/d}"
                ]
            },
            "s4": "${secret:kubernetes:c/d}"
        });

        let expectation = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "example1",
            "s2": [
                "example1"
            ],
            "s3": {
                "s31": "example1",
                "s32": [
                    "example1",
                    "example2"
                ]
            },
            "s4": "example2"
        });
        assert_eq!(
            resolve_secret_references_in_json(dir_path, input.clone()).unwrap(),
            expectation
        );
        assert_eq!(
            discover_secret_references_in_json(&input).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        assert_eq!(
            discover_secret_references_in_json(&expectation).unwrap(),
            BTreeSet::from([])
        );
    }

    #[test]
    fn secret_resolution_connector_config() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret_file(dir_path, "a", "b", b"example1");
        write_kubernetes_secret_file(dir_path, "c", "d", b"example2");

        // Resolve a connector configuration
        let connector_config_json = json!({
            "transport": {
              "name": "datagen",
              "config": {
                "plan": [{
                    "limit": 2,
                    "fields": {
                        "col1": { "values": [1, 2] },
                        "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
                    }
                }]
              }
            },
            "format": {
              "name": "json",
              "config": {
                "example": "${secret:kubernetes:a/b}"
              }
            },
            "index": "${secret:kubernetes:e/f}"
        });
        assert_eq!(
            discover_secret_references_in_connector_config(&connector_config_json).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );

        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();

        let connector_config_secrets_resolved =
            resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();

        // Other fields should not be resolved. This must be asserted on the resolved
        // configuration: the input is only borrowed by the resolver, so checking it
        // would hold trivially. It is checked first, before fields are moved out below.
        assert_eq!(
            connector_config_secrets_resolved.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );

        // Transport configuration resolution
        let TransportConfig::Datagen(datagen_input_config) =
            connector_config_secrets_resolved.transport
        else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("example1"), json!("example2")]
        );

        // Format configuration resolution
        let Some(format_config) = connector_config_secrets_resolved.format else {
            unreachable!();
        };
        assert_eq!(format_config.config, json!({"example": "example1"}));
    }

    #[test]
    fn resolve_env_var_success() {
        // Set the environment variable
        // SAFETY: the variable name is used by this test only; this assumes nothing else
        // in the test process touches the environment outside of `std::env` meanwhile.
        unsafe {
            std::env::set_var("FELDERA_TEST_ENV_VAR_ABC123", "my_value");
        }

        let dir = tempfile::tempdir().unwrap();
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir.path(),
                "${env:FELDERA_TEST_ENV_VAR_ABC123}".to_string()
            )
            .unwrap(),
            "my_value"
        );

        // SAFETY: same reasoning as for setting the variable above.
        unsafe {
            std::env::remove_var("FELDERA_TEST_ENV_VAR_ABC123");
        }
    }

    #[test]
    fn resolve_env_var_not_set() {
        let dir = tempfile::tempdir().unwrap();
        let env_ref_str = "${env:FELDERA_TEST_ENV_VAR_NOT_SET_XYZ}";
        // Make sure the variable is absent even if the surrounding environment defines it.
        // SAFETY: the variable name is used by this test only; this assumes nothing else
        // in the test process touches the environment outside of `std::env` meanwhile.
        unsafe {
            std::env::remove_var("FELDERA_TEST_ENV_VAR_NOT_SET_XYZ");
        }

        let expected_ref = parse_secret_ref(env_ref_str);

        assert_eq!(
            resolve_potential_secret_reference_string(dir.path(), env_ref_str.to_string())
                .unwrap_err(),
            SecretRefResolutionError::EnvVarNotSet {
                env_ref: expected_ref,
                name: "FELDERA_TEST_ENV_VAR_NOT_SET_XYZ".to_string(),
            }
        );
    }

    #[test]
    fn resolve_env_var_in_connector_config() {
        // SAFETY: the variable names are used by this test only; this assumes nothing else
        // in the test process touches the environment outside of `std::env` meanwhile.
        unsafe {
            std::env::set_var("FELDERA_TEST_CONN_VAR_A", "resolved_value_a");
            std::env::set_var("FELDERA_TEST_CONN_VAR_B", "resolved_value_b");
        }

        let connector_config_json = json!({
            "transport": {
              "name": "datagen",
              "config": {
                "plan": [{
                    "limit": 2,
                    "fields": {
                        "col1": { "values": [1, 2] },
                        "col2": { "values": ["${env:FELDERA_TEST_CONN_VAR_A}", "${env:FELDERA_TEST_CONN_VAR_B}"] }
                    }
                }]
              }
            },
            "format": {
              "name": "json",
              "config": {
                "example": "${env:FELDERA_TEST_CONN_VAR_A}"
              }
            }
        });

        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();

        let dir = tempfile::tempdir().unwrap();
        let resolved =
            resolve_secret_references_in_connector_config(dir.path(), &connector_config).unwrap();

        let TransportConfig::Datagen(datagen_input_config) = resolved.transport else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("resolved_value_a"), json!("resolved_value_b")]
        );

        let Some(format_config) = resolved.format else {
            unreachable!();
        };
        assert_eq!(format_config.config, json!({"example": "resolved_value_a"}));

        // SAFETY: same reasoning as for setting the variables above.
        unsafe {
            std::env::remove_var("FELDERA_TEST_CONN_VAR_A");
            std::env::remove_var("FELDERA_TEST_CONN_VAR_B");
        }
    }
}