feldera_types/
secret_resolver.rs

1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::de::DeserializeOwned;
4use serde::Serialize;
5use serde_yaml::{Mapping, Value};
6use std::collections::BTreeSet;
7use std::fmt::Debug;
8use std::fs;
9use std::io::ErrorKind;
10use std::path::Path;
11use thiserror::Error as ThisError;
12
/// Errors that can be returned while discovering the secret references of a
/// connector configuration.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefDiscoveryError {
    /// A string value was rejected by `MaybeSecretRef::new`. The displayed message
    /// is the message of the wrapped parse error.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// The connector configuration could not be converted into a YAML value.
    /// `error` is the stringified serialization error.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// Deserialization failed; the message carries no underlying error.
    /// NOTE(review): no discovery code visible in this file constructs this
    /// variant — confirm whether it is still needed.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
22
23/// Discovers the secret references of the connector configuration.
24pub fn discover_secret_references_in_connector_config(
25    connector_config: &ConnectorConfig,
26) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
27    let yaml_value = serde_yaml::to_value(connector_config).map_err(|e| {
28        SecretRefDiscoveryError::SerializationFailed {
29            error: e.to_string(),
30        }
31    })?;
32    let mut result = BTreeSet::new();
33    if let Some(transport_config_yaml) = yaml_value.get("transport").and_then(|v| v.get("config")) {
34        result.extend(discover_secret_references_in_yaml(transport_config_yaml)?);
35    }
36    if let Some(format_config_yaml) = yaml_value.get("format").and_then(|v| v.get("config")) {
37        result.extend(discover_secret_references_in_yaml(format_config_yaml)?);
38    }
39    Ok(result)
40}
41
42/// Discovers recursively the secret references in the YAML.
43fn discover_secret_references_in_yaml(
44    value: &Value,
45) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
46    Ok(match value {
47        Value::Null => BTreeSet::new(),
48        Value::Bool(_b) => BTreeSet::new(),
49        Value::Number(_n) => BTreeSet::new(),
50        Value::String(s) => {
51            if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
52                .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
53            {
54                BTreeSet::from([secret_ref])
55            } else {
56                BTreeSet::new()
57            }
58        }
59        Value::Sequence(seq) => {
60            let mut result = BTreeSet::new();
61            for entry in seq.iter() {
62                result.extend(discover_secret_references_in_yaml(entry)?)
63            }
64            result
65        }
66        Value::Mapping(mapping) => {
67            let mut result = BTreeSet::new();
68            for (_k, v) in mapping.into_iter() {
69                result.extend(discover_secret_references_in_yaml(v)?);
70            }
71            result
72        }
73        Value::Tagged(tag_val) => discover_secret_references_in_yaml(&tag_val.value)?,
74    })
75}
76
/// Returns the default directory in which secrets are looked up: `/etc/feldera-secrets`.
pub fn default_secrets_directory() -> &'static Path {
    const DEFAULT_SECRETS_DIRECTORY: &str = "/etc/feldera-secrets";
    Path::new(DEFAULT_SECRETS_DIRECTORY)
}
81
/// Errors that can be returned while resolving secret references to the content
/// of their backing files.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefResolutionError {
    /// A string value was rejected by `MaybeSecretRef::new`. The displayed message
    /// is the message of the wrapped parse error.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// The path is a regular file but reading it as a string failed. Only the
    /// `ErrorKind` is kept to prevent displaying any of the secret content.
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}")]
    CannotReadSecretFile {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Nothing exists at the path the secret reference maps to.
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
    SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
    /// Something exists at the path, but it is not a regular file (for example a directory).
    #[error(
        "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
    )]
    SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
    /// The path is not a readable regular file and the existence check itself failed.
    /// Only the `ErrorKind` of that failure is kept.
    #[error("secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}")]
    SecretFileExistenceUnknown {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Rebuilding a YAML mapping with resolved values inserted the same key twice.
    #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
    DuplicateKeyInMapping,
    /// The value could not be converted into a YAML value. `error` is the
    /// stringified serialization error.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// The resolved YAML could not be converted back into the original type. The
    /// underlying error is discarded.
    /// NOTE(review): presumably omitted so resolved secret content cannot leak into
    /// the message — confirm.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
111
112/// Resolves the secret references of the connector configuration.
113pub fn resolve_secret_references_in_connector_config(
114    secrets_dir: &Path,
115    connector_config: &ConnectorConfig,
116) -> Result<ConnectorConfig, SecretRefResolutionError> {
117    let connector_config = connector_config.clone();
118    Ok(ConnectorConfig {
119        transport: resolve_secret_references_via_yaml(secrets_dir, &connector_config.transport)?,
120        format: resolve_secret_references_via_yaml(secrets_dir, &connector_config.format)?,
121        ..connector_config
122    })
123}
124
125/// Resolves secret references in `value`.
126pub fn resolve_secret_references_via_yaml<T>(
127    secrets_dir: &Path,
128    value: &T,
129) -> Result<T, SecretRefResolutionError>
130where
131    T: Serialize + DeserializeOwned,
132{
133    let yaml_value =
134        serde_yaml::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
135            error: e.to_string(),
136        })?;
137    let resolved_yaml = resolve_secret_references_in_yaml(secrets_dir, yaml_value)?;
138    serde_yaml::from_value(resolved_yaml)
139        .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
140}
141
142/// Resolves recursively the secret references in the YAML.
143fn resolve_secret_references_in_yaml(
144    secrets_dir: &Path,
145    value: Value,
146) -> Result<Value, SecretRefResolutionError> {
147    Ok(match value {
148        Value::Null => Value::Null,
149        Value::Bool(b) => Value::Bool(b),
150        Value::Number(n) => Value::Number(n),
151        Value::String(s) => {
152            Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
153        }
154        Value::Sequence(seq) => Value::Sequence(
155            seq.into_iter()
156                .map(|v| resolve_secret_references_in_yaml(secrets_dir, v))
157                .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
158        ),
159        Value::Mapping(mapping) => {
160            let mut new_mapping = Mapping::new();
161            for (k, v) in mapping.into_iter() {
162                if let Some(_existing) =
163                    new_mapping.insert(k, resolve_secret_references_in_yaml(secrets_dir, v)?)
164                {
165                    return Err(SecretRefResolutionError::DuplicateKeyInMapping);
166                }
167            }
168            Value::Mapping(new_mapping)
169        }
170        Value::Tagged(mut tag_val) => {
171            tag_val.value = resolve_secret_references_in_yaml(secrets_dir, tag_val.value)?;
172            Value::Tagged(tag_val)
173        }
174    })
175}
176
177/// Resolves a string which can potentially be a secret reference.
178fn resolve_potential_secret_reference_string(
179    secrets_dir: &Path,
180    s: String,
181) -> Result<String, SecretRefResolutionError> {
182    match MaybeSecretRef::new(s) {
183        Ok(maybe_secret_ref) => match maybe_secret_ref {
184            MaybeSecretRef::String(plain_str) => Ok(plain_str),
185            MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
186                SecretRef::Kubernetes { name, data_key } => {
187                    // Secret reference: `${secret:kubernetes:<name>/<data key>}`
188                    // File location: `<secrets dir>/kubernetes/<name>/<data key>`
189                    let path = Path::new(secrets_dir)
190                        .join("kubernetes")
191                        .join(name)
192                        .join(data_key);
193
194                    // If the file does not exist or is not a regular file, produce a custom error
195                    // informing of this fact rather than generically stating we are unable to read.
196                    if path.is_file() {
197                        match fs::read_to_string(&path) {
198                            Ok(content) => Ok(content),
199                            Err(e) => {
200                                Err(SecretRefResolutionError::CannotReadSecretFile {
201                                    secret_ref,
202                                    path: path.display().to_string(),
203                                    error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
204                                })
205                            }
206                        }
207                    } else {
208                        match path.try_exists() {
209                            Ok(exists) => {
210                                if exists {
211                                    Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
212                                        secret_ref,
213                                        path: path.display().to_string(),
214                                    })
215                                } else {
216                                    Err(SecretRefResolutionError::SecretFileDoesNotExist {
217                                        secret_ref,
218                                        path: path.display().to_string(),
219                                    })
220                                }
221                            }
222                            Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
223                                secret_ref,
224                                path: path.display().to_string(),
225                                error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
226                            }),
227                        }
228                    }
229                }
230            },
231        },
232        Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
233    }
234}
235
#[cfg(test)]
mod tests {
    use crate::config::{ConnectorConfig, TransportConfig};
    use crate::secret_ref::{MaybeSecretRef, SecretRef};
    use crate::secret_resolver::{
        discover_secret_references_in_connector_config, discover_secret_references_in_yaml,
        resolve_potential_secret_reference_string, resolve_secret_references_in_connector_config,
        resolve_secret_references_in_yaml, SecretRefResolutionError,
    };
    use serde_json::json;
    use std::collections::BTreeSet;
    use std::fs::{create_dir_all, File};
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Writes `content` to `<secrets_dir>/kubernetes/<name>/<data_key>`, creating the
    /// parent directories first, and returns the path of the written file.
    fn write_kubernetes_secret(
        secrets_dir: &Path,
        name: &str,
        data_key: &str,
        content: &[u8],
    ) -> PathBuf {
        let name_dir = secrets_dir.join("kubernetes").join(name);
        create_dir_all(&name_dir).unwrap();
        let file_path = name_dir.join(data_key);
        let mut file = File::create(&file_path).unwrap();
        file.write_all(content).unwrap();
        file_path
    }

    /// Parses `s`, which must be a valid secret reference string.
    fn parse_secret_ref(s: &str) -> SecretRef {
        let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.to_string()).unwrap()
        else {
            unreachable!();
        };
        secret_ref
    }

    /// A reference to an existing file resolves to the content of that file.
    #[test]
    fn resolve_kubernetes_secret_success() {
        // Create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example");

        // Resolve secret: ${secret:kubernetes:a/b}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap(),
            "example"
        );
    }

    /// A 63-character name and a 255-character data key resolve successfully.
    #[test]
    fn resolve_kubernetes_secret_max_size_success() {
        // Create file at: <tempdir>/kubernetes/a.../b...
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name = "a".repeat(63);
        let data_key = "b".repeat(255);
        write_kubernetes_secret(dir_path, &name, &data_key, b"example");

        // Resolve secret: ${secret:kubernetes:a.../b...}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                format!("${{secret:kubernetes:{}/{}}}", name, data_key)
            )
            .unwrap(),
            "example"
        );
    }

    /// A reference whose path is a directory fails with `SecretPathIsNotRegularFile`.
    #[test]
    fn resolve_kubernetes_secret_path_not_a_file() {
        // Create directory at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = &dir_path.join("kubernetes").join("a").join("b");
        create_dir_all(data_key_file_path).unwrap();

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretPathIsNotRegularFile {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A reference whose file is missing fails with `SecretFileDoesNotExist`.
    #[test]
    fn resolve_kubernetes_secret_file_does_not_exist() {
        // Do not create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name_dir = &dir_path.join("kubernetes").join("a");
        create_dir_all(name_dir).unwrap();
        let data_key_file_path = &name_dir.join("b");

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretFileDoesNotExist {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A file with non UTF-8 content fails with `CannotReadSecretFile`, carrying only
    /// the error kind.
    #[test]
    fn resolve_secret_ref_cannot_read_file() {
        // Create file with non UTF-8 content at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = write_kubernetes_secret(dir_path, "a", "b", &[255, 255]);

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::CannotReadSecretFile {
                secret_ref,
                path: data_key_file_path.display().to_string(),
                error_kind: std::io::ErrorKind::InvalidData
            }
        );
    }

    /// Resolution and discovery over a YAML document covering every value kind.
    /// Mapping keys that look like secret references must be neither resolved nor
    /// discovered.
    #[test]
    fn secret_resolution_yaml() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        // Resolve secrets in YAML.
        // `b` must be a real boolean (no trailing comma) so the bool branch is exercised.
        let input = r#"
        a: null
        b: false
        c: 123
        d: "val1"
        e: [1, "2"]
        f:
          f1: 1
          f2: "val2"
        g: !str "val3"
        "${secret:kubernetes:a/b}": 123
        "${secret:kubernetes:e/f}": 456
        s1: "${secret:kubernetes:a/b}"
        s2: ["${secret:kubernetes:a/b}"]
        s3:
          s31: "${secret:kubernetes:a/b}"
          s32: ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"]
        s4: !str "${secret:kubernetes:c/d}"
        "#;
        let expectation = r#"
        a: null
        b: false
        c: 123
        d: "val1"
        e: [1, "2"]
        f:
          f1: 1
          f2: "val2"
        g: !str "val3"
        "${secret:kubernetes:a/b}": 123
        "${secret:kubernetes:e/f}": 456
        s1: "example1"
        s2: ["example1"]
        s3:
          s31: "example1"
          s32: ["example1", "example2"]
        s4: !str "example2"
        "#;
        assert_eq!(
            resolve_secret_references_in_yaml(dir_path, serde_yaml::from_str(input).unwrap())
                .unwrap(),
            serde_yaml::from_str::<serde_yaml::Value>(expectation).unwrap()
        );
        assert_eq!(
            discover_secret_references_in_yaml(&serde_yaml::from_str(input).unwrap()).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        assert_eq!(
            discover_secret_references_in_yaml(&serde_yaml::from_str(expectation).unwrap())
                .unwrap(),
            BTreeSet::from([])
        );
    }

    /// Discovery and resolution over a full connector configuration: only the
    /// transport and format sections are affected.
    #[test]
    fn secret_resolution_connector_config() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        // Resolve a connector configuration
        let connector_config_json = json!({
            "transport": {
              "name": "datagen",
              "config": {
                "plan": [{
                    "limit": 2,
                    "fields": {
                        "col1": { "values": [1, 2] },
                        "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
                    }
                }]
              }
            },
            "format": {
              "name": "json",
              "config": {
                "example": "${secret:kubernetes:a/b}"
              }
            },
            "index": "${secret:kubernetes:e/f}"
        });
        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();
        assert_eq!(
            discover_secret_references_in_connector_config(&connector_config).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        let connector_config_secrets_resolved =
            resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();

        // Transport configuration resolution
        let TransportConfig::Datagen(datagen_input_config) =
            connector_config_secrets_resolved.transport
        else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("example1"), json!("example2")]
        );

        // Format configuration resolution
        let Some(format_config) = connector_config_secrets_resolved.format else {
            unreachable!();
        };
        let mut expected_mapping = serde_yaml::Mapping::new();
        expected_mapping.insert(
            serde_yaml::Value::String("example".to_string()),
            serde_yaml::Value::String("example1".to_string()),
        );
        assert_eq!(
            format_config.config,
            serde_yaml::Value::Mapping(expected_mapping)
        );

        // Other fields should not be resolved. Check the RESOLVED configuration: the
        // input is never mutated, so asserting on it would pass trivially.
        assert_eq!(
            connector_config_secrets_resolved.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );
    }
}