feldera_types/
secret_resolver.rs

1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::de::DeserializeOwned;
4use serde::Serialize;
5use serde_json::{Map, Value};
6use std::collections::BTreeSet;
7use std::fmt::Debug;
8use std::fs;
9use std::io::ErrorKind;
10use std::path::Path;
11use thiserror::Error as ThisError;
12
13#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
14pub enum SecretRefDiscoveryError {
15    #[error("{e}")]
16    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
17    #[error("unable to serialize connector configuration: {error}")]
18    SerializationFailed { error: String },
19    #[error("unable to deserialize connector configuration (error omitted)")]
20    DeserializationFailed,
21}
22
23/// Discovers the secret references of the connector configuration.
24pub fn discover_secret_references_in_connector_config(
25    connector_config: &serde_json::Value,
26) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
27    let mut result = BTreeSet::new();
28    if let Some(transport_config_json) = connector_config
29        .get("transport")
30        .and_then(|v| v.get("config"))
31    {
32        result.extend(discover_secret_references_in_json(transport_config_json)?);
33    }
34    if let Some(format_config_json) = connector_config.get("format").and_then(|v| v.get("config")) {
35        result.extend(discover_secret_references_in_json(format_config_json)?);
36    }
37    Ok(result)
38}
39
40/// Discovers recursively the secret references in the JSON.
41fn discover_secret_references_in_json(
42    value: &Value,
43) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
44    Ok(match value {
45        Value::Null => BTreeSet::new(),
46        Value::Bool(_b) => BTreeSet::new(),
47        Value::Number(_n) => BTreeSet::new(),
48        Value::String(s) => {
49            if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
50                .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
51            {
52                BTreeSet::from([secret_ref])
53            } else {
54                BTreeSet::new()
55            }
56        }
57        Value::Array(seq) => {
58            let mut result = BTreeSet::new();
59            for entry in seq.iter() {
60                result.extend(discover_secret_references_in_json(entry)?)
61            }
62            result
63        }
64        Value::Object(mapping) => {
65            let mut result = BTreeSet::new();
66            for (_k, v) in mapping.into_iter() {
67                result.extend(discover_secret_references_in_json(v)?);
68            }
69            result
70        }
71    })
72}
73
74/// Path of the default secrets directory.
75pub fn default_secrets_directory() -> &'static Path {
76    Path::new("/etc/feldera-secrets")
77}
78
79#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
80pub enum SecretRefResolutionError {
81    #[error("{e}")]
82    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
83    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}")]
84    CannotReadSecretFile {
85        secret_ref: SecretRef,
86        path: String,
87        error_kind: ErrorKind,
88    },
89    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
90    SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
91    #[error(
92        "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
93    )]
94    SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
95    #[error("secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}")]
96    SecretFileExistenceUnknown {
97        secret_ref: SecretRef,
98        path: String,
99        error_kind: ErrorKind,
100    },
101    #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
102    DuplicateKeyInMapping,
103    #[error("unable to serialize connector configuration: {error}")]
104    SerializationFailed { error: String },
105    #[error("unable to deserialize connector configuration (error omitted)")]
106    DeserializationFailed,
107}
108
109/// Resolves the secret references of the connector configuration.
110pub fn resolve_secret_references_in_connector_config(
111    secrets_dir: &Path,
112    connector_config: &ConnectorConfig,
113) -> Result<ConnectorConfig, SecretRefResolutionError> {
114    let connector_config = connector_config.clone();
115    Ok(ConnectorConfig {
116        transport: resolve_secret_references_via_json(secrets_dir, &connector_config.transport)?,
117        format: resolve_secret_references_via_json(secrets_dir, &connector_config.format)?,
118        ..connector_config
119    })
120}
121
122/// Resolves secret references in `value`.
123pub fn resolve_secret_references_via_json<T>(
124    secrets_dir: &Path,
125    value: &T,
126) -> Result<T, SecretRefResolutionError>
127where
128    T: Serialize + DeserializeOwned,
129{
130    let json_value =
131        serde_json::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
132            error: e.to_string(),
133        })?;
134    let resolved_json = resolve_secret_references_in_json(secrets_dir, json_value)?;
135    serde_json::from_value(resolved_json)
136        .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
137}
138
139/// Resolves recursively the secret references in the JSON.
140fn resolve_secret_references_in_json(
141    secrets_dir: &Path,
142    value: Value,
143) -> Result<Value, SecretRefResolutionError> {
144    Ok(match value {
145        Value::Null => Value::Null,
146        Value::Bool(b) => Value::Bool(b),
147        Value::Number(n) => Value::Number(n),
148        Value::String(s) => {
149            Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
150        }
151        Value::Array(seq) => Value::Array(
152            seq.into_iter()
153                .map(|v| resolve_secret_references_in_json(secrets_dir, v))
154                .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
155        ),
156        Value::Object(mapping) => {
157            let mut new_mapping = Map::new();
158            for (k, v) in mapping.into_iter() {
159                if let Some(_existing) =
160                    new_mapping.insert(k, resolve_secret_references_in_json(secrets_dir, v)?)
161                {
162                    return Err(SecretRefResolutionError::DuplicateKeyInMapping);
163                }
164            }
165            Value::Object(new_mapping)
166        }
167    })
168}
169
170/// Resolves a string which can potentially be a secret reference.
171fn resolve_potential_secret_reference_string(
172    secrets_dir: &Path,
173    s: String,
174) -> Result<String, SecretRefResolutionError> {
175    match MaybeSecretRef::new(s) {
176        Ok(maybe_secret_ref) => match maybe_secret_ref {
177            MaybeSecretRef::String(plain_str) => Ok(plain_str),
178            MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
179                SecretRef::Kubernetes { name, data_key } => {
180                    // Secret reference: `${secret:kubernetes:<name>/<data key>}`
181                    // File location: `<secrets dir>/kubernetes/<name>/<data key>`
182                    let path = Path::new(secrets_dir)
183                        .join("kubernetes")
184                        .join(name)
185                        .join(data_key);
186
187                    // If the file does not exist or is not a regular file, produce a custom error
188                    // informing of this fact rather than generically stating we are unable to read.
189                    if path.is_file() {
190                        match fs::read_to_string(&path) {
191                            Ok(content) => Ok(content),
192                            Err(e) => {
193                                Err(SecretRefResolutionError::CannotReadSecretFile {
194                                    secret_ref,
195                                    path: path.display().to_string(),
196                                    error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
197                                })
198                            }
199                        }
200                    } else {
201                        match path.try_exists() {
202                            Ok(exists) => {
203                                if exists {
204                                    Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
205                                        secret_ref,
206                                        path: path.display().to_string(),
207                                    })
208                                } else {
209                                    Err(SecretRefResolutionError::SecretFileDoesNotExist {
210                                        secret_ref,
211                                        path: path.display().to_string(),
212                                    })
213                                }
214                            }
215                            Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
216                                secret_ref,
217                                path: path.display().to_string(),
218                                error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
219                            }),
220                        }
221                    }
222                }
223            },
224        },
225        Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
226    }
227}
228
229#[cfg(test)]
230mod tests {
231    use crate::config::{ConnectorConfig, TransportConfig};
232    use crate::secret_ref::{MaybeSecretRef, SecretRef};
233    use crate::secret_resolver::{
234        discover_secret_references_in_connector_config, discover_secret_references_in_json,
235        resolve_potential_secret_reference_string, resolve_secret_references_in_connector_config,
236        resolve_secret_references_in_json, SecretRefResolutionError,
237    };
238    use serde_json::json;
239    use std::collections::BTreeSet;
240    use std::fs::{create_dir_all, File};
241    use std::io::Write;
242
243    #[test]
244    fn resolve_kubernetes_secret_success() {
245        // Create file at: <tempdir>/kubernetes/a/b
246        let dir = tempfile::tempdir().unwrap();
247        let dir_path = dir.path();
248        let name_dir = &dir_path.join("kubernetes").join("a");
249        create_dir_all(name_dir).unwrap();
250        let data_key_file_path = &name_dir.join("b");
251        let mut file = File::create(data_key_file_path).unwrap();
252        file.write_all(b"example").unwrap();
253
254        // Resolve secret: ${secret:kubernetes:a/b}
255        assert_eq!(
256            resolve_potential_secret_reference_string(
257                dir_path,
258                "${secret:kubernetes:a/b}".to_string()
259            )
260            .unwrap(),
261            "example"
262        );
263    }
264
265    #[test]
266    fn resolve_kubernetes_secret_max_size_success() {
267        // Create file at: <tempdir>/kubernetes/a.../b...
268        let dir = tempfile::tempdir().unwrap();
269        let dir_path = dir.path();
270        let name_dir = &dir_path.join("kubernetes").join("a".repeat(63));
271        create_dir_all(name_dir).unwrap();
272        let data_key_file_path = &name_dir.join("b".repeat(255));
273        let mut file = File::create(data_key_file_path).unwrap();
274        file.write_all(b"example").unwrap();
275
276        // Resolve secret: ${secret:kubernetes:a.../b...}
277        assert_eq!(
278            resolve_potential_secret_reference_string(
279                dir_path,
280                format!(
281                    "${{secret:kubernetes:{}/{}}}",
282                    "a".repeat(63),
283                    "b".repeat(255)
284                )
285            )
286            .unwrap(),
287            "example"
288        );
289    }
290
291    #[test]
292    fn resolve_kubernetes_secret_path_not_a_file() {
293        // Create directory at: <tempdir>/kubernetes/a/b
294        let dir = tempfile::tempdir().unwrap();
295        let dir_path = dir.path();
296        let data_key_file_path = &dir_path.join("kubernetes").join("a").join("b");
297        create_dir_all(data_key_file_path).unwrap();
298
299        // Resolve secret: ${secret:kubernetes:a/b}
300        let MaybeSecretRef::SecretRef(secret_ref) =
301            MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
302        else {
303            unreachable!();
304        };
305        assert_eq!(
306            resolve_potential_secret_reference_string(
307                dir_path,
308                "${secret:kubernetes:a/b}".to_string()
309            )
310            .unwrap_err(),
311            SecretRefResolutionError::SecretPathIsNotRegularFile {
312                secret_ref,
313                path: data_key_file_path.display().to_string()
314            }
315        );
316    }
317
318    #[test]
319    fn resolve_kubernetes_secret_file_does_not_exist() {
320        // Do not create file at: <tempdir>/kubernetes/a/b
321        let dir = tempfile::tempdir().unwrap();
322        let dir_path = dir.path();
323        let name_dir = &dir_path.join("kubernetes").join("a");
324        create_dir_all(name_dir).unwrap();
325        let data_key_file_path = &name_dir.join("b");
326
327        // Resolve secret: ${secret:kubernetes:a/b}
328        let MaybeSecretRef::SecretRef(secret_ref) =
329            MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
330        else {
331            unreachable!();
332        };
333        assert_eq!(
334            resolve_potential_secret_reference_string(
335                dir_path,
336                "${secret:kubernetes:a/b}".to_string()
337            )
338            .unwrap_err(),
339            SecretRefResolutionError::SecretFileDoesNotExist {
340                secret_ref,
341                path: data_key_file_path.display().to_string()
342            }
343        );
344    }
345
346    #[test]
347    fn resolve_secret_ref_cannot_read_file() {
348        // Create file with non UTF-8 content at: <tempdir>/kubernetes/a/b
349        let dir = tempfile::tempdir().unwrap();
350        let dir_path = dir.path();
351        let name_dir = &dir_path.join("kubernetes").join("a");
352        create_dir_all(name_dir).unwrap();
353        let data_key_file_path = &name_dir.join("b");
354        let mut file = File::create(data_key_file_path).unwrap();
355        file.write_all(&[255, 255]).unwrap();
356
357        // Resolve secret: ${secret:kubernetes:a/b}
358        let MaybeSecretRef::SecretRef(secret_ref) =
359            MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
360        else {
361            unreachable!();
362        };
363        assert_eq!(
364            resolve_potential_secret_reference_string(
365                dir_path,
366                "${secret:kubernetes:a/b}".to_string()
367            )
368            .unwrap_err(),
369            SecretRefResolutionError::CannotReadSecretFile {
370                secret_ref,
371                path: data_key_file_path.display().to_string(),
372                error_kind: std::io::ErrorKind::InvalidData
373            }
374        );
375    }
376
377    #[test]
378    fn secret_resolution_json() {
379        // Create file at:
380        // - <tempdir>/kubernetes/a/b
381        // - <tempdir>/kubernetes/c/d
382        let dir = tempfile::tempdir().unwrap();
383        let dir_path = dir.path();
384        let name_dir = &dir_path.join("kubernetes").join("a");
385        create_dir_all(name_dir).unwrap();
386        let data_key_file_path = &name_dir.join("b");
387        let mut file = File::create(data_key_file_path).unwrap();
388        file.write_all(b"example1").unwrap();
389        let name_dir = &dir_path.join("kubernetes").join("c");
390        create_dir_all(name_dir).unwrap();
391        let data_key_file_path = &name_dir.join("d");
392        let mut file = File::create(data_key_file_path).unwrap();
393        file.write_all(b"example2").unwrap();
394
395        // Resolve secrets in JSON
396        let input = json!({
397            "a": null,
398            "b": "false,",
399            "c": 123,
400            "d": "val1",
401            "e": [
402                1,
403                "2"
404            ],
405            "f": {
406                "f1": 1,
407                "f2": "val2"
408            },
409            "g": "val3",
410            "${secret:kubernetes:a/b}": 123,
411            "${secret:kubernetes:e/f}": 456,
412            "s1": "${secret:kubernetes:a/b}",
413            "s2": [
414                "${secret:kubernetes:a/b}"
415            ],
416            "s3": {
417                "s31": "${secret:kubernetes:a/b}",
418                "s32": [
419                    "${secret:kubernetes:a/b}",
420                    "${secret:kubernetes:c/d}"
421                ]
422            },
423            "s4": "${secret:kubernetes:c/d}"
424        });
425
426        let expectation = json!({
427            "a": null,
428            "b": "false,",
429            "c": 123,
430            "d": "val1",
431            "e": [
432                1,
433                "2"
434            ],
435            "f": {
436                "f1": 1,
437                "f2": "val2"
438            },
439            "g": "val3",
440            "${secret:kubernetes:a/b}": 123,
441            "${secret:kubernetes:e/f}": 456,
442            "s1": "example1",
443            "s2": [
444                "example1"
445            ],
446            "s3": {
447                "s31": "example1",
448                "s32": [
449                    "example1",
450                    "example2"
451                ]
452            },
453            "s4": "example2"
454        });
455        assert_eq!(
456            resolve_secret_references_in_json(dir_path, input.clone()).unwrap(),
457            expectation
458        );
459        assert_eq!(
460            discover_secret_references_in_json(&input).unwrap(),
461            BTreeSet::from([
462                SecretRef::Kubernetes {
463                    name: "a".to_string(),
464                    data_key: "b".to_string(),
465                },
466                SecretRef::Kubernetes {
467                    name: "c".to_string(),
468                    data_key: "d".to_string(),
469                },
470            ])
471        );
472        assert_eq!(
473            discover_secret_references_in_json(&expectation).unwrap(),
474            BTreeSet::from([])
475        );
476    }
477
478    #[test]
479    fn secret_resolution_connector_config() {
480        // Create file at:
481        // - <tempdir>/kubernetes/a/b
482        // - <tempdir>/kubernetes/c/d
483        let dir = tempfile::tempdir().unwrap();
484        let dir_path = dir.path();
485        let name_dir = &dir_path.join("kubernetes").join("a");
486        create_dir_all(name_dir).unwrap();
487        let data_key_file_path = &name_dir.join("b");
488        let mut file = File::create(data_key_file_path).unwrap();
489        file.write_all(b"example1").unwrap();
490        let name_dir = &dir_path.join("kubernetes").join("c");
491        create_dir_all(name_dir).unwrap();
492        let data_key_file_path = &name_dir.join("d");
493        let mut file = File::create(data_key_file_path).unwrap();
494        file.write_all(b"example2").unwrap();
495
496        // Resolve a connector configuration
497        let connector_config_json = json!({
498            "transport": {
499              "name": "datagen",
500              "config": {
501                "plan": [{
502                    "limit": 2,
503                    "fields": {
504                        "col1": { "values": [1, 2] },
505                        "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
506                    }
507                }]
508              }
509            },
510            "format": {
511              "name": "json",
512              "config": {
513                "example": "${secret:kubernetes:a/b}"
514              }
515            },
516            "index": "${secret:kubernetes:e/f}"
517        });
518        assert_eq!(
519            discover_secret_references_in_connector_config(&connector_config_json).unwrap(),
520            BTreeSet::from([
521                SecretRef::Kubernetes {
522                    name: "a".to_string(),
523                    data_key: "b".to_string(),
524                },
525                SecretRef::Kubernetes {
526                    name: "c".to_string(),
527                    data_key: "d".to_string(),
528                },
529            ])
530        );
531
532        let connector_config: ConnectorConfig =
533            serde_json::from_value(connector_config_json).unwrap();
534
535        let connector_config_secrets_resolved =
536            resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();
537
538        // Transport configuration resolution
539        let TransportConfig::Datagen(datagen_input_config) =
540            connector_config_secrets_resolved.transport
541        else {
542            unreachable!();
543        };
544        assert_eq!(
545            datagen_input_config.plan[0].fields["col2"]
546                .values
547                .as_ref()
548                .unwrap(),
549            &vec![json!("example1"), json!("example2")]
550        );
551
552        // Format configuration resolution
553        let Some(format_config) = connector_config_secrets_resolved.format else {
554            unreachable!();
555        };
556        assert_eq!(format_config.config, json!({"example": "example1"}));
557
558        // Other fields should not be resolved
559        assert_eq!(
560            connector_config.index,
561            Some("${secret:kubernetes:e/f}".to_string())
562        );
563    }
564}