Skip to main content

feldera_types/
secret_resolver.rs

1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use serde_json::{Map, Value};
6use std::collections::BTreeSet;
7use std::fmt::Debug;
8use std::fs;
9use std::io::ErrorKind;
10use std::path::Path;
11use thiserror::Error as ThisError;
12
/// Errors that can be reported while discovering secret references.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefDiscoveryError {
    /// A JSON string could not be parsed by `MaybeSecretRef::new`; the parse error is
    /// displayed as-is.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// Serializing the connector configuration failed; `error` holds the message.
    /// NOTE(review): not constructed in this part of the file — presumably used by callers.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// Deserializing the connector configuration failed. The message deliberately carries no
    /// underlying error detail ("error omitted").
    /// NOTE(review): not constructed in this part of the file — presumably used by callers.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
22
23/// Discovers the secret references of the connector configuration.
24pub fn discover_secret_references_in_connector_config(
25    connector_config: &serde_json::Value,
26) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
27    let mut result = BTreeSet::new();
28    if let Some(transport_config_json) = connector_config
29        .get("transport")
30        .and_then(|v| v.get("config"))
31    {
32        result.extend(discover_secret_references_in_json(transport_config_json)?);
33    }
34    if let Some(format_config_json) = connector_config.get("format").and_then(|v| v.get("config")) {
35        result.extend(discover_secret_references_in_json(format_config_json)?);
36    }
37    Ok(result)
38}
39
40/// Discovers recursively the secret references in the JSON.
41fn discover_secret_references_in_json(
42    value: &Value,
43) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
44    Ok(match value {
45        Value::Null => BTreeSet::new(),
46        Value::Bool(_b) => BTreeSet::new(),
47        Value::Number(_n) => BTreeSet::new(),
48        Value::String(s) => {
49            if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
50                .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
51            {
52                BTreeSet::from([secret_ref])
53            } else {
54                BTreeSet::new()
55            }
56        }
57        Value::Array(seq) => {
58            let mut result = BTreeSet::new();
59            for entry in seq.iter() {
60                result.extend(discover_secret_references_in_json(entry)?)
61            }
62            result
63        }
64        Value::Object(mapping) => {
65            let mut result = BTreeSet::new();
66            for (_k, v) in mapping.into_iter() {
67                result.extend(discover_secret_references_in_json(v)?);
68            }
69            result
70        }
71    })
72}
73
/// Returns the directory in which secrets are looked up by default.
pub fn default_secrets_directory() -> &'static Path {
    const DEFAULT_SECRETS_DIR: &str = "/etc/feldera-secrets";
    Path::new(DEFAULT_SECRETS_DIR)
}
78
/// Errors that can be reported while resolving secret references to their values.
///
/// Where an I/O failure is involved only its `ErrorKind` is stored, so that the error text
/// cannot end up displaying any of the secret content.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefResolutionError {
    /// A string could not be parsed by `MaybeSecretRef::new`; the parse error is displayed
    /// as-is.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// The secret path is a regular file, but reading it as a string failed.
    #[error(
        "secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}"
    )]
    CannotReadSecretFile {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Nothing exists at the secret path.
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
    SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
    /// Something exists at the secret path, but it is not a regular file (e.g., a directory).
    #[error(
        "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
    )]
    SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
    /// The existence check on the secret path itself failed, so it is unknown whether the
    /// file exists.
    #[error(
        "secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}"
    )]
    SecretFileExistenceUnknown {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Rebuilding a JSON object after resolution overwrote an already inserted key.
    #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
    DuplicateKeyInMapping,
    /// Converting the value to JSON failed; `error` holds the serializer's message.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// Converting the resolved JSON back to the target type failed. The underlying error is
    /// dropped on purpose ("error omitted").
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
112
113/// Resolves the secret references of the connector configuration.
114pub fn resolve_secret_references_in_connector_config(
115    secrets_dir: &Path,
116    connector_config: &ConnectorConfig,
117) -> Result<ConnectorConfig, SecretRefResolutionError> {
118    let connector_config = connector_config.clone();
119    Ok(ConnectorConfig {
120        transport: resolve_secret_references_via_json(secrets_dir, &connector_config.transport)?,
121        format: resolve_secret_references_via_json(secrets_dir, &connector_config.format)?,
122        ..connector_config
123    })
124}
125
126/// Resolves secret references in `value`.
127pub fn resolve_secret_references_via_json<T>(
128    secrets_dir: &Path,
129    value: &T,
130) -> Result<T, SecretRefResolutionError>
131where
132    T: Serialize + DeserializeOwned,
133{
134    let json_value =
135        serde_json::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
136            error: e.to_string(),
137        })?;
138    let resolved_json = resolve_secret_references_in_json(secrets_dir, json_value)?;
139    serde_json::from_value(resolved_json)
140        .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
141}
142
143/// Resolves recursively the secret references in the JSON.
144fn resolve_secret_references_in_json(
145    secrets_dir: &Path,
146    value: Value,
147) -> Result<Value, SecretRefResolutionError> {
148    Ok(match value {
149        Value::Null => Value::Null,
150        Value::Bool(b) => Value::Bool(b),
151        Value::Number(n) => Value::Number(n),
152        Value::String(s) => {
153            Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
154        }
155        Value::Array(seq) => Value::Array(
156            seq.into_iter()
157                .map(|v| resolve_secret_references_in_json(secrets_dir, v))
158                .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
159        ),
160        Value::Object(mapping) => {
161            let mut new_mapping = Map::new();
162            for (k, v) in mapping.into_iter() {
163                if let Some(_existing) =
164                    new_mapping.insert(k, resolve_secret_references_in_json(secrets_dir, v)?)
165                {
166                    return Err(SecretRefResolutionError::DuplicateKeyInMapping);
167                }
168            }
169            Value::Object(new_mapping)
170        }
171    })
172}
173
174/// Resolves a string which can potentially be a secret reference.
175fn resolve_potential_secret_reference_string(
176    secrets_dir: &Path,
177    s: String,
178) -> Result<String, SecretRefResolutionError> {
179    match MaybeSecretRef::new(s) {
180        Ok(maybe_secret_ref) => match maybe_secret_ref {
181            MaybeSecretRef::String(plain_str) => Ok(plain_str),
182            MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
183                SecretRef::Kubernetes { name, data_key } => {
184                    // Secret reference: `${secret:kubernetes:<name>/<data key>}`
185                    // File location: `<secrets dir>/kubernetes/<name>/<data key>`
186                    let path = Path::new(secrets_dir)
187                        .join("kubernetes")
188                        .join(name)
189                        .join(data_key);
190
191                    // If the file does not exist or is not a regular file, produce a custom error
192                    // informing of this fact rather than generically stating we are unable to read.
193                    if path.is_file() {
194                        match fs::read_to_string(&path) {
195                            Ok(content) => Ok(content),
196                            Err(e) => {
197                                Err(SecretRefResolutionError::CannotReadSecretFile {
198                                    secret_ref,
199                                    path: path.display().to_string(),
200                                    error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
201                                })
202                            }
203                        }
204                    } else {
205                        match path.try_exists() {
206                            Ok(exists) => {
207                                if exists {
208                                    Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
209                                        secret_ref,
210                                        path: path.display().to_string(),
211                                    })
212                                } else {
213                                    Err(SecretRefResolutionError::SecretFileDoesNotExist {
214                                        secret_ref,
215                                        path: path.display().to_string(),
216                                    })
217                                }
218                            }
219                            Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
220                                secret_ref,
221                                path: path.display().to_string(),
222                                error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
223                            }),
224                        }
225                    }
226                }
227            },
228        },
229        Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
230    }
231}
232
#[cfg(test)]
mod tests {
    use crate::config::{ConnectorConfig, TransportConfig};
    use crate::secret_ref::{MaybeSecretRef, SecretRef};
    use crate::secret_resolver::{
        SecretRefResolutionError, discover_secret_references_in_connector_config,
        discover_secret_references_in_json, resolve_potential_secret_reference_string,
        resolve_secret_references_in_connector_config, resolve_secret_references_in_json,
    };
    use serde_json::json;
    use std::collections::BTreeSet;
    use std::fs::{File, create_dir_all};
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Creates the file `<secrets_dir>/kubernetes/<name>/<data_key>` (including its parent
    /// directories), fills it with `content` and returns its path.
    fn write_kubernetes_secret(
        secrets_dir: &Path,
        name: &str,
        data_key: &str,
        content: &[u8],
    ) -> PathBuf {
        let name_dir = secrets_dir.join("kubernetes").join(name);
        create_dir_all(&name_dir).unwrap();
        let data_key_file_path = name_dir.join(data_key);
        let mut file = File::create(&data_key_file_path).unwrap();
        file.write_all(content).unwrap();
        data_key_file_path
    }

    /// Parses `s`, which must be a valid secret reference string, into its `SecretRef`.
    fn parse_secret_ref(s: &str) -> SecretRef {
        let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.to_string()).unwrap()
        else {
            unreachable!();
        };
        secret_ref
    }

    #[test]
    fn resolve_kubernetes_secret_success() {
        // Create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example");

        // Resolve secret: ${secret:kubernetes:a/b}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap(),
            "example"
        );
    }

    #[test]
    fn resolve_kubernetes_secret_max_size_success() {
        // Create file at: <tempdir>/kubernetes/a.../b...
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, &"a".repeat(63), &"b".repeat(255), b"example");

        // Resolve secret: ${secret:kubernetes:a.../b...}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                format!(
                    "${{secret:kubernetes:{}/{}}}",
                    "a".repeat(63),
                    "b".repeat(255)
                )
            )
            .unwrap(),
            "example"
        );
    }

    #[test]
    fn resolve_kubernetes_secret_path_not_a_file() {
        // Create directory at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = &dir_path.join("kubernetes").join("a").join("b");
        create_dir_all(data_key_file_path).unwrap();

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretPathIsNotRegularFile {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    #[test]
    fn resolve_kubernetes_secret_file_does_not_exist() {
        // Do not create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name_dir = &dir_path.join("kubernetes").join("a");
        create_dir_all(name_dir).unwrap();
        let data_key_file_path = &name_dir.join("b");

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretFileDoesNotExist {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    #[test]
    fn resolve_secret_ref_cannot_read_file() {
        // Create file with non UTF-8 content at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = write_kubernetes_secret(dir_path, "a", "b", &[255, 255]);

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::CannotReadSecretFile {
                secret_ref,
                path: data_key_file_path.display().to_string(),
                error_kind: std::io::ErrorKind::InvalidData
            }
        );
    }

    #[test]
    fn secret_resolution_json() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        // Resolve secrets in JSON. Secret references used as object keys must be left alone,
        // which is why `e/f` (for which no file exists) does not cause a failure.
        let input = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "${secret:kubernetes:a/b}",
            "s2": [
                "${secret:kubernetes:a/b}"
            ],
            "s3": {
                "s31": "${secret:kubernetes:a/b}",
                "s32": [
                    "${secret:kubernetes:a/b}",
                    "${secret:kubernetes:c/d}"
                ]
            },
            "s4": "${secret:kubernetes:c/d}"
        });

        let expectation = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "example1",
            "s2": [
                "example1"
            ],
            "s3": {
                "s31": "example1",
                "s32": [
                    "example1",
                    "example2"
                ]
            },
            "s4": "example2"
        });
        assert_eq!(
            resolve_secret_references_in_json(dir_path, input.clone()).unwrap(),
            expectation
        );
        assert_eq!(
            discover_secret_references_in_json(&input).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        assert_eq!(
            discover_secret_references_in_json(&expectation).unwrap(),
            BTreeSet::from([])
        );
    }

    #[test]
    fn secret_resolution_connector_config() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        // Resolve a connector configuration
        let connector_config_json = json!({
            "transport": {
              "name": "datagen",
              "config": {
                "plan": [{
                    "limit": 2,
                    "fields": {
                        "col1": { "values": [1, 2] },
                        "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
                    }
                }]
              }
            },
            "format": {
              "name": "json",
              "config": {
                "example": "${secret:kubernetes:a/b}"
              }
            },
            "index": "${secret:kubernetes:e/f}"
        });
        assert_eq!(
            discover_secret_references_in_connector_config(&connector_config_json).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );

        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();

        let connector_config_secrets_resolved =
            resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();

        // Transport configuration resolution
        let TransportConfig::Datagen(datagen_input_config) =
            connector_config_secrets_resolved.transport
        else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("example1"), json!("example2")]
        );

        // Format configuration resolution
        let Some(format_config) = connector_config_secrets_resolved.format else {
            unreachable!();
        };
        assert_eq!(format_config.config, json!({"example": "example1"}));

        // Other fields should not be resolved. This must be checked on the *resolved*
        // configuration: the input is only borrowed immutably, so asserting on it could
        // never fail.
        assert_eq!(
            connector_config_secrets_resolved.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );
    }
}