feldera_types/
secret_resolver.rs

1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde_yaml::{Mapping, Value};
4use std::collections::BTreeSet;
5use std::fmt::Debug;
6use std::fs;
7use std::io::ErrorKind;
8use std::path::Path;
9use thiserror::Error as ThisError;
10
/// Errors that can occur while discovering the secret references of a connector configuration.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefDiscoveryError {
    /// `MaybeSecretRef::new` rejected a string value found in the configuration; `e` is the
    /// parse error it returned and is displayed as-is.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// `serde_yaml::to_value` failed on the connector configuration; `error` holds the
    /// stringified serialization error.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// Deserialization failure whose underlying error is deliberately left out of the message.
    /// NOTE(review): not constructed by the discovery functions visible here -- confirm whether
    /// it is still needed.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
20
21/// Discovers the secret references of the connector configuration.
22pub fn discover_secret_references_in_connector_config(
23    connector_config: &ConnectorConfig,
24) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
25    let yaml_value = serde_yaml::to_value(connector_config).map_err(|e| {
26        SecretRefDiscoveryError::SerializationFailed {
27            error: e.to_string(),
28        }
29    })?;
30    let mut result = BTreeSet::new();
31    if let Some(transport_config_yaml) = yaml_value.get("transport").and_then(|v| v.get("config")) {
32        result.extend(discover_secret_references_in_yaml(transport_config_yaml)?);
33    }
34    if let Some(format_config_yaml) = yaml_value.get("format").and_then(|v| v.get("config")) {
35        result.extend(discover_secret_references_in_yaml(format_config_yaml)?);
36    }
37    Ok(result)
38}
39
40/// Discovers recursively the secret references in the YAML.
41fn discover_secret_references_in_yaml(
42    value: &Value,
43) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
44    Ok(match value {
45        Value::Null => BTreeSet::new(),
46        Value::Bool(_b) => BTreeSet::new(),
47        Value::Number(_n) => BTreeSet::new(),
48        Value::String(s) => {
49            if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
50                .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
51            {
52                BTreeSet::from([secret_ref])
53            } else {
54                BTreeSet::new()
55            }
56        }
57        Value::Sequence(seq) => {
58            let mut result = BTreeSet::new();
59            for entry in seq.iter() {
60                result.extend(discover_secret_references_in_yaml(entry)?)
61            }
62            result
63        }
64        Value::Mapping(mapping) => {
65            let mut result = BTreeSet::new();
66            for (_k, v) in mapping.into_iter() {
67                result.extend(discover_secret_references_in_yaml(v)?);
68            }
69            result
70        }
71        Value::Tagged(tag_val) => discover_secret_references_in_yaml(&tag_val.value)?,
72    })
73}
74
/// Path of the default secrets directory.
///
/// NOTE(review): not referenced by the code visible here; presumably callers pass it as the
/// `secrets_dir` argument of the resolution functions -- confirm against call sites.
pub const DEFAULT_SECRETS_DIRECTORY_PATH: &str = "/etc/feldera-secrets";
77
/// Errors that can occur while resolving the secret references of a connector configuration.
///
/// The file-related variants carry only the secret reference, the file path and (where
/// applicable) the I/O error kind.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefResolutionError {
    /// `MaybeSecretRef::new` rejected a string value; `e` is the parse error it returned.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// The path is a regular file, but reading it as a UTF-8 string failed.
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}")]
    CannotReadSecretFile {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Nothing exists at the path the secret reference maps to.
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
    SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
    /// Something exists at the path, but it is not a regular file (e.g., it is a directory).
    #[error(
        "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
    )]
    SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
    /// The existence check on the path itself failed with an I/O error.
    #[error("secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}")]
    SecretFileExistenceUnknown {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Rebuilding a YAML mapping with resolved values overwrote an already inserted key.
    #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
    DuplicateKeyInMapping,
    /// `serde_yaml::to_value` failed on the connector configuration; `error` holds the
    /// stringified serialization error.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// The YAML with resolved secrets could not be converted back into a connector
    /// configuration. The underlying error is dropped -- presumably so that resolved secret
    /// content cannot end up in the message.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
107
108/// Resolves the secret references of the connector configuration.
109pub fn resolve_secret_references_in_connector_config(
110    secrets_dir: &Path,
111    connector_config: &ConnectorConfig,
112) -> Result<ConnectorConfig, SecretRefResolutionError> {
113    let mut yaml_value = serde_yaml::to_value(connector_config).map_err(|e| {
114        SecretRefResolutionError::SerializationFailed {
115            error: e.to_string(),
116        }
117    })?;
118    if let Some(transport_config_yaml) = yaml_value.get("transport").and_then(|v| v.get("config")) {
119        let transport_config_yaml_resolved =
120            resolve_secret_references_in_yaml(secrets_dir, transport_config_yaml.clone())?;
121        yaml_value["transport"]["config"] = transport_config_yaml_resolved;
122    }
123    if let Some(format_config_yaml) = yaml_value.get("format").and_then(|v| v.get("config")) {
124        let format_config_yaml_resolved =
125            resolve_secret_references_in_yaml(secrets_dir, format_config_yaml.clone())?;
126        yaml_value["format"]["config"] = format_config_yaml_resolved;
127    }
128    let connector_config_resolved = serde_yaml::from_value(yaml_value)
129        .map_err(|_e| SecretRefResolutionError::DeserializationFailed)?;
130    Ok(connector_config_resolved)
131}
132
133/// Resolves recursively the secret references in the YAML.
134fn resolve_secret_references_in_yaml(
135    secrets_dir: &Path,
136    value: Value,
137) -> Result<Value, SecretRefResolutionError> {
138    Ok(match value {
139        Value::Null => Value::Null,
140        Value::Bool(b) => Value::Bool(b),
141        Value::Number(n) => Value::Number(n),
142        Value::String(s) => {
143            Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
144        }
145        Value::Sequence(seq) => Value::Sequence(
146            seq.into_iter()
147                .map(|v| resolve_secret_references_in_yaml(secrets_dir, v))
148                .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
149        ),
150        Value::Mapping(mapping) => {
151            let mut new_mapping = Mapping::new();
152            for (k, v) in mapping.into_iter() {
153                if let Some(_existing) =
154                    new_mapping.insert(k, resolve_secret_references_in_yaml(secrets_dir, v)?)
155                {
156                    return Err(SecretRefResolutionError::DuplicateKeyInMapping);
157                }
158            }
159            Value::Mapping(new_mapping)
160        }
161        Value::Tagged(mut tag_val) => {
162            tag_val.value = resolve_secret_references_in_yaml(secrets_dir, tag_val.value)?;
163            Value::Tagged(tag_val)
164        }
165    })
166}
167
168/// Resolves a string which can potentially be a secret reference.
169fn resolve_potential_secret_reference_string(
170    secrets_dir: &Path,
171    s: String,
172) -> Result<String, SecretRefResolutionError> {
173    match MaybeSecretRef::new(s) {
174        Ok(maybe_secret_ref) => match maybe_secret_ref {
175            MaybeSecretRef::String(plain_str) => Ok(plain_str),
176            MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
177                SecretRef::Kubernetes { name, data_key } => {
178                    // Secret reference: `${secret:kubernetes:<name>/<data key>}`
179                    // File location: `<secrets dir>/kubernetes/<name>/<data key>`
180                    let path = Path::new(secrets_dir)
181                        .join("kubernetes")
182                        .join(name)
183                        .join(data_key);
184
185                    // If the file does not exist or is not a regular file, produce a custom error
186                    // informing of this fact rather than generically stating we are unable to read.
187                    if path.is_file() {
188                        match fs::read_to_string(&path) {
189                            Ok(content) => Ok(content),
190                            Err(e) => {
191                                Err(SecretRefResolutionError::CannotReadSecretFile {
192                                    secret_ref,
193                                    path: path.display().to_string(),
194                                    error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
195                                })
196                            }
197                        }
198                    } else {
199                        match path.try_exists() {
200                            Ok(exists) => {
201                                if exists {
202                                    Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
203                                        secret_ref,
204                                        path: path.display().to_string(),
205                                    })
206                                } else {
207                                    Err(SecretRefResolutionError::SecretFileDoesNotExist {
208                                        secret_ref,
209                                        path: path.display().to_string(),
210                                    })
211                                }
212                            }
213                            Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
214                                secret_ref,
215                                path: path.display().to_string(),
216                                error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
217                            }),
218                        }
219                    }
220                }
221            },
222        },
223        Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
224    }
225}
226
#[cfg(test)]
mod tests {
    use crate::config::{ConnectorConfig, TransportConfig};
    use crate::secret_ref::{MaybeSecretRef, SecretRef};
    use crate::secret_resolver::{
        discover_secret_references_in_connector_config, discover_secret_references_in_yaml,
        resolve_potential_secret_reference_string, resolve_secret_references_in_connector_config,
        resolve_secret_references_in_yaml, SecretRefResolutionError,
    };
    use serde_json::json;
    use std::collections::BTreeSet;
    use std::fs::{create_dir_all, File};
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Creates the file `<secrets_dir>/kubernetes/<name>/<data_key>` (including its parent
    /// directories) with the given content, and returns the path of the file.
    fn write_kubernetes_secret(
        secrets_dir: &Path,
        name: &str,
        data_key: &str,
        content: &[u8],
    ) -> PathBuf {
        let name_dir = secrets_dir.join("kubernetes").join(name);
        create_dir_all(&name_dir).unwrap();
        let data_key_file_path = name_dir.join(data_key);
        let mut file = File::create(&data_key_file_path).unwrap();
        file.write_all(content).unwrap();
        data_key_file_path
    }

    /// Parses a string which the test knows to be a valid secret reference.
    fn parse_secret_ref(s: &str) -> SecretRef {
        let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.to_string()).unwrap()
        else {
            unreachable!();
        };
        secret_ref
    }

    /// A reference to an existing secret file resolves to the content of that file.
    #[test]
    fn resolve_kubernetes_secret_success() {
        // Create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example");

        // Resolve secret: ${secret:kubernetes:a/b}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap(),
            "example"
        );
    }

    /// Resolution works with a 63-character name and a 255-character data key.
    #[test]
    fn resolve_kubernetes_secret_max_size_success() {
        // Create file at: <tempdir>/kubernetes/a.../b...
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name = "a".repeat(63);
        let data_key = "b".repeat(255);
        write_kubernetes_secret(dir_path, &name, &data_key, b"example");

        // Resolve secret: ${secret:kubernetes:a.../b...}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                format!("${{secret:kubernetes:{}/{}}}", name, data_key)
            )
            .unwrap(),
            "example"
        );
    }

    /// A directory at the location of the secret file is reported as not being a regular file.
    #[test]
    fn resolve_kubernetes_secret_path_not_a_file() {
        // Create directory at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = dir_path.join("kubernetes").join("a").join("b");
        create_dir_all(&data_key_file_path).unwrap();

        // Resolve secret: ${secret:kubernetes:a/b}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretPathIsNotRegularFile {
                secret_ref: parse_secret_ref("${secret:kubernetes:a/b}"),
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A missing secret file is reported as such.
    #[test]
    fn resolve_kubernetes_secret_file_does_not_exist() {
        // Do not create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name_dir = dir_path.join("kubernetes").join("a");
        create_dir_all(&name_dir).unwrap();
        let data_key_file_path = name_dir.join("b");

        // Resolve secret: ${secret:kubernetes:a/b}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretFileDoesNotExist {
                secret_ref: parse_secret_ref("${secret:kubernetes:a/b}"),
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A secret file which is not valid UTF-8 cannot be read as a string.
    #[test]
    fn resolve_secret_ref_cannot_read_file() {
        // Create file with non UTF-8 content at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = write_kubernetes_secret(dir_path, "a", "b", &[255, 255]);

        // Resolve secret: ${secret:kubernetes:a/b}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::CannotReadSecretFile {
                secret_ref: parse_secret_ref("${secret:kubernetes:a/b}"),
                path: data_key_file_path.display().to_string(),
                error_kind: std::io::ErrorKind::InvalidData
            }
        );
    }

    /// Discovery and resolution over a YAML document covering every kind of value: only string
    /// values are resolved, mapping keys are left as they are.
    #[test]
    fn secret_resolution_yaml() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        // Resolve secrets in YAML.
        // Note: `b` must be a plain `false` (no trailing comma, which would turn it into the
        // string "false,") such that the boolean case is exercised.
        let input = r#"
        a: null
        b: false
        c: 123
        d: "val1"
        e: [1, "2"]
        f:
          f1: 1
          f2: "val2"
        g: !str "val3"
        "${secret:kubernetes:a/b}": 123
        "${secret:kubernetes:e/f}": 456
        s1: "${secret:kubernetes:a/b}"
        s2: ["${secret:kubernetes:a/b}"]
        s3:
          s31: "${secret:kubernetes:a/b}"
          s32: ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"]
        s4: !str "${secret:kubernetes:c/d}"
        "#;
        let expectation = r#"
        a: null
        b: false
        c: 123
        d: "val1"
        e: [1, "2"]
        f:
          f1: 1
          f2: "val2"
        g: !str "val3"
        "${secret:kubernetes:a/b}": 123
        "${secret:kubernetes:e/f}": 456
        s1: "example1"
        s2: ["example1"]
        s3:
          s31: "example1"
          s32: ["example1", "example2"]
        s4: !str "example2"
        "#;
        assert_eq!(
            resolve_secret_references_in_yaml(dir_path, serde_yaml::from_str(input).unwrap())
                .unwrap(),
            serde_yaml::from_str::<serde_yaml::Value>(expectation).unwrap()
        );
        assert_eq!(
            discover_secret_references_in_yaml(&serde_yaml::from_str(input).unwrap()).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        assert_eq!(
            discover_secret_references_in_yaml(&serde_yaml::from_str(expectation).unwrap())
                .unwrap(),
            BTreeSet::from([])
        );
    }

    /// Discovery and resolution over a connector configuration: only the transport and format
    /// configurations are considered, other fields are left as they are.
    #[test]
    fn secret_resolution_connector_config() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        // Resolve a connector configuration
        let connector_config_json = json!({
            "transport": {
              "name": "datagen",
              "config": {
                "plan": [{
                    "limit": 2,
                    "fields": {
                        "col1": { "values": [1, 2] },
                        "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
                    }
                }]
              }
            },
            "format": {
              "name": "json",
              "config": {
                "example": "${secret:kubernetes:a/b}"
              }
            },
            "index": "${secret:kubernetes:e/f}"
        });
        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();
        assert_eq!(
            discover_secret_references_in_connector_config(&connector_config).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        let connector_config_secrets_resolved =
            resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();

        // Transport configuration resolution
        let TransportConfig::Datagen(datagen_input_config) =
            connector_config_secrets_resolved.transport
        else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("example1"), json!("example2")]
        );

        // Format configuration resolution
        let Some(format_config) = connector_config_secrets_resolved.format else {
            unreachable!();
        };
        let mut expected_mapping = serde_yaml::Mapping::new();
        expected_mapping.insert(
            serde_yaml::Value::String("example".to_string()),
            serde_yaml::Value::String("example1".to_string()),
        );
        assert_eq!(
            format_config.config,
            serde_yaml::Value::Mapping(expected_mapping)
        );

        // Other fields should not be resolved: check the *resolved* configuration (checking
        // the input configuration would pass regardless of what the resolution does).
        assert_eq!(
            connector_config_secrets_resolved.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );
    }
}