feldera_types/
secret_resolver.rs

1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::de::DeserializeOwned;
4use serde::Serialize;
5use serde_json::{Map, Value};
6use std::collections::BTreeSet;
7use std::fmt::Debug;
8use std::fs;
9use std::io::ErrorKind;
10use std::path::Path;
11use thiserror::Error as ThisError;
12
/// Errors that can be returned while discovering the secret references used by a
/// connector configuration.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefDiscoveryError {
    /// A JSON string value was rejected by `MaybeSecretRef::new`; `e` is the parse
    /// error, which is also used verbatim as the error message.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// The connector configuration could not be converted to a JSON value; `error`
    /// is the stringified serialization error.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// Deserialization of the connector configuration failed. The variant carries no
    /// details about the underlying error.
    // NOTE(review): this variant is not constructed by the discovery functions visible
    // in this file — confirm whether it is still needed.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
22
23/// Discovers the secret references of the connector configuration.
24pub fn discover_secret_references_in_connector_config(
25    connector_config: &ConnectorConfig,
26) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
27    let json_value = serde_json::to_value(connector_config).map_err(|e| {
28        SecretRefDiscoveryError::SerializationFailed {
29            error: e.to_string(),
30        }
31    })?;
32    let mut result = BTreeSet::new();
33    if let Some(transport_config_json) = json_value.get("transport").and_then(|v| v.get("config")) {
34        result.extend(discover_secret_references_in_json(transport_config_json)?);
35    }
36    if let Some(format_config_json) = json_value.get("format").and_then(|v| v.get("config")) {
37        result.extend(discover_secret_references_in_json(format_config_json)?);
38    }
39    Ok(result)
40}
41
42/// Discovers recursively the secret references in the JSON.
43fn discover_secret_references_in_json(
44    value: &Value,
45) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
46    Ok(match value {
47        Value::Null => BTreeSet::new(),
48        Value::Bool(_b) => BTreeSet::new(),
49        Value::Number(_n) => BTreeSet::new(),
50        Value::String(s) => {
51            if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
52                .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
53            {
54                BTreeSet::from([secret_ref])
55            } else {
56                BTreeSet::new()
57            }
58        }
59        Value::Array(seq) => {
60            let mut result = BTreeSet::new();
61            for entry in seq.iter() {
62                result.extend(discover_secret_references_in_json(entry)?)
63            }
64            result
65        }
66        Value::Object(mapping) => {
67            let mut result = BTreeSet::new();
68            for (_k, v) in mapping.into_iter() {
69                result.extend(discover_secret_references_in_json(v)?);
70            }
71            result
72        }
73    })
74}
75
/// Returns the directory in which secrets are looked up by default.
pub fn default_secrets_directory() -> &'static Path {
    const DEFAULT_SECRETS_DIRECTORY: &str = "/etc/feldera-secrets";
    Path::new(DEFAULT_SECRETS_DIRECTORY)
}
80
/// Errors that can be returned while replacing secret references by the content of
/// the files they point to.
///
/// Variants that relate to file access carry only an [`ErrorKind`], never the full
/// I/O error, to prevent any of the secret content from being displayed.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefResolutionError {
    /// A string was rejected by `MaybeSecretRef::new`; `e` is the parse error, which
    /// is also used verbatim as the error message.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// `path` is a regular file, but reading it as a UTF-8 string failed with
    /// `error_kind` (e.g., `InvalidData` when the content is not valid UTF-8).
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}")]
    CannotReadSecretFile {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Nothing exists at `path`.
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
    SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
    /// Something exists at `path`, but it is not a regular file (e.g., a directory).
    #[error(
        "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
    )]
    SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
    /// `path` is not a regular file and checking whether it exists at all failed with
    /// `error_kind`.
    #[error("secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}")]
    SecretFileExistenceUnknown {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// While rebuilding a JSON object with resolved values, inserting an entry replaced
    /// one that was already present under the same key.
    #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
    DuplicateKeyInMapping,
    /// The value could not be converted to JSON; `error` is the stringified
    /// serialization error.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// The JSON with resolved secrets could not be converted back to the original
    /// type. The underlying error is deliberately not included.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
110
111/// Resolves the secret references of the connector configuration.
112pub fn resolve_secret_references_in_connector_config(
113    secrets_dir: &Path,
114    connector_config: &ConnectorConfig,
115) -> Result<ConnectorConfig, SecretRefResolutionError> {
116    let connector_config = connector_config.clone();
117    Ok(ConnectorConfig {
118        transport: resolve_secret_references_via_json(secrets_dir, &connector_config.transport)?,
119        format: resolve_secret_references_via_json(secrets_dir, &connector_config.format)?,
120        ..connector_config
121    })
122}
123
124/// Resolves secret references in `value`.
125pub fn resolve_secret_references_via_json<T>(
126    secrets_dir: &Path,
127    value: &T,
128) -> Result<T, SecretRefResolutionError>
129where
130    T: Serialize + DeserializeOwned,
131{
132    let json_value =
133        serde_json::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
134            error: e.to_string(),
135        })?;
136    let resolved_json = resolve_secret_references_in_json(secrets_dir, json_value)?;
137    serde_json::from_value(resolved_json)
138        .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
139}
140
141/// Resolves recursively the secret references in the JSON.
142fn resolve_secret_references_in_json(
143    secrets_dir: &Path,
144    value: Value,
145) -> Result<Value, SecretRefResolutionError> {
146    Ok(match value {
147        Value::Null => Value::Null,
148        Value::Bool(b) => Value::Bool(b),
149        Value::Number(n) => Value::Number(n),
150        Value::String(s) => {
151            Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
152        }
153        Value::Array(seq) => Value::Array(
154            seq.into_iter()
155                .map(|v| resolve_secret_references_in_json(secrets_dir, v))
156                .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
157        ),
158        Value::Object(mapping) => {
159            let mut new_mapping = Map::new();
160            for (k, v) in mapping.into_iter() {
161                if let Some(_existing) =
162                    new_mapping.insert(k, resolve_secret_references_in_json(secrets_dir, v)?)
163                {
164                    return Err(SecretRefResolutionError::DuplicateKeyInMapping);
165                }
166            }
167            Value::Object(new_mapping)
168        }
169    })
170}
171
172/// Resolves a string which can potentially be a secret reference.
173fn resolve_potential_secret_reference_string(
174    secrets_dir: &Path,
175    s: String,
176) -> Result<String, SecretRefResolutionError> {
177    match MaybeSecretRef::new(s) {
178        Ok(maybe_secret_ref) => match maybe_secret_ref {
179            MaybeSecretRef::String(plain_str) => Ok(plain_str),
180            MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
181                SecretRef::Kubernetes { name, data_key } => {
182                    // Secret reference: `${secret:kubernetes:<name>/<data key>}`
183                    // File location: `<secrets dir>/kubernetes/<name>/<data key>`
184                    let path = Path::new(secrets_dir)
185                        .join("kubernetes")
186                        .join(name)
187                        .join(data_key);
188
189                    // If the file does not exist or is not a regular file, produce a custom error
190                    // informing of this fact rather than generically stating we are unable to read.
191                    if path.is_file() {
192                        match fs::read_to_string(&path) {
193                            Ok(content) => Ok(content),
194                            Err(e) => {
195                                Err(SecretRefResolutionError::CannotReadSecretFile {
196                                    secret_ref,
197                                    path: path.display().to_string(),
198                                    error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
199                                })
200                            }
201                        }
202                    } else {
203                        match path.try_exists() {
204                            Ok(exists) => {
205                                if exists {
206                                    Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
207                                        secret_ref,
208                                        path: path.display().to_string(),
209                                    })
210                                } else {
211                                    Err(SecretRefResolutionError::SecretFileDoesNotExist {
212                                        secret_ref,
213                                        path: path.display().to_string(),
214                                    })
215                                }
216                            }
217                            Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
218                                secret_ref,
219                                path: path.display().to_string(),
220                                error_kind: e.kind(), // Only error kind to prevent displaying any of the secret content
221                            }),
222                        }
223                    }
224                }
225            },
226        },
227        Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
228    }
229}
230
/// Tests for secret reference discovery and resolution.
///
/// Each test builds its own secrets directory inside a temporary directory, laid out
/// as `<tempdir>/kubernetes/<name>/<data key>`.
#[cfg(test)]
mod tests {
    use crate::config::{ConnectorConfig, TransportConfig};
    use crate::secret_ref::{MaybeSecretRef, SecretRef};
    use crate::secret_resolver::{
        discover_secret_references_in_connector_config, discover_secret_references_in_json,
        resolve_potential_secret_reference_string, resolve_secret_references_in_connector_config,
        resolve_secret_references_in_json, SecretRefResolutionError,
    };
    use serde_json::json;
    use std::collections::BTreeSet;
    use std::fs::{create_dir_all, File};
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Creates the file `<secrets_dir>/kubernetes/<name>/<data_key>` (including its parent
    /// directories) with the given content, and returns the path of the file.
    fn write_kubernetes_secret(
        secrets_dir: &Path,
        name: &str,
        data_key: &str,
        content: &[u8],
    ) -> PathBuf {
        let name_dir = secrets_dir.join("kubernetes").join(name);
        create_dir_all(&name_dir).unwrap();
        let file_path = name_dir.join(data_key);
        let mut file = File::create(&file_path).unwrap();
        file.write_all(content).unwrap();
        file_path
    }

    /// Parses a string which is known to be a valid secret reference.
    fn parse_secret_ref(s: &str) -> SecretRef {
        let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.to_string()).unwrap()
        else {
            unreachable!();
        };
        secret_ref
    }

    /// A reference to an existing secret file resolves to the content of that file.
    #[test]
    fn resolve_kubernetes_secret_success() {
        // Create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example");

        // Resolve secret: ${secret:kubernetes:a/b}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap(),
            "example"
        );
    }

    /// A 63-character name and a 255-character data key can be resolved.
    #[test]
    fn resolve_kubernetes_secret_max_size_success() {
        // Create file at: <tempdir>/kubernetes/a.../b...
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name = "a".repeat(63);
        let data_key = "b".repeat(255);
        write_kubernetes_secret(dir_path, &name, &data_key, b"example");

        // Resolve secret: ${secret:kubernetes:a.../b...}
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                format!("${{secret:kubernetes:{}/{}}}", name, data_key)
            )
            .unwrap(),
            "example"
        );
    }

    /// A reference pointing at a directory is reported as not being a regular file.
    #[test]
    fn resolve_kubernetes_secret_path_not_a_file() {
        // Create directory at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = dir_path.join("kubernetes").join("a").join("b");
        create_dir_all(&data_key_file_path).unwrap();

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretPathIsNotRegularFile {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A reference pointing at a path where nothing exists is reported as such.
    #[test]
    fn resolve_kubernetes_secret_file_does_not_exist() {
        // Do not create file at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name_dir = dir_path.join("kubernetes").join("a");
        create_dir_all(&name_dir).unwrap();
        let data_key_file_path = name_dir.join("b");

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretFileDoesNotExist {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A secret file which is not valid UTF-8 cannot be read as a string.
    #[test]
    fn resolve_secret_ref_cannot_read_file() {
        // Create file with non UTF-8 content at: <tempdir>/kubernetes/a/b
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = write_kubernetes_secret(dir_path, "a", "b", &[255, 255]);

        // Resolve secret: ${secret:kubernetes:a/b}
        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::CannotReadSecretFile {
                secret_ref,
                path: data_key_file_path.display().to_string(),
                error_kind: std::io::ErrorKind::InvalidData
            }
        );
    }

    /// Resolution and discovery only consider string values at any nesting depth; object
    /// keys that look like secret references are left alone.
    #[test]
    fn secret_resolution_json() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        // Resolve secrets in JSON
        let input = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "${secret:kubernetes:a/b}",
            "s2": [
                "${secret:kubernetes:a/b}"
            ],
            "s3": {
                "s31": "${secret:kubernetes:a/b}",
                "s32": [
                    "${secret:kubernetes:a/b}",
                    "${secret:kubernetes:c/d}"
                ]
            },
            "s4": "${secret:kubernetes:c/d}"
        });

        let expectation = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "example1",
            "s2": [
                "example1"
            ],
            "s3": {
                "s31": "example1",
                "s32": [
                    "example1",
                    "example2"
                ]
            },
            "s4": "example2"
        });
        assert_eq!(
            resolve_secret_references_in_json(dir_path, input.clone()).unwrap(),
            expectation
        );
        assert_eq!(
            discover_secret_references_in_json(&input).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        // Once resolved, no secret reference is left to discover.
        assert_eq!(
            discover_secret_references_in_json(&expectation).unwrap(),
            BTreeSet::from([])
        );
    }

    /// Only the transport and format of a connector configuration are subject to
    /// discovery and resolution.
    #[test]
    fn secret_resolution_connector_config() {
        // Create file at:
        // - <tempdir>/kubernetes/a/b
        // - <tempdir>/kubernetes/c/d
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        // Resolve a connector configuration
        let connector_config_json = json!({
            "transport": {
              "name": "datagen",
              "config": {
                "plan": [{
                    "limit": 2,
                    "fields": {
                        "col1": { "values": [1, 2] },
                        "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
                    }
                }]
              }
            },
            "format": {
              "name": "json",
              "config": {
                "example": "${secret:kubernetes:a/b}"
              }
            },
            "index": "${secret:kubernetes:e/f}"
        });
        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();
        assert_eq!(
            discover_secret_references_in_connector_config(&connector_config).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        let connector_config_secrets_resolved =
            resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();

        // Transport configuration resolution
        let TransportConfig::Datagen(datagen_input_config) =
            &connector_config_secrets_resolved.transport
        else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("example1"), json!("example2")]
        );

        // Format configuration resolution
        let Some(format_config) = &connector_config_secrets_resolved.format else {
            unreachable!();
        };
        assert_eq!(format_config.config, json!({"example": "example1"}));

        // Other fields should not be resolved. This must be checked on the *resolved*
        // configuration: the input configuration is never modified, so asserting on it
        // would pass regardless of what the resolution does.
        assert_eq!(
            connector_config_secrets_resolved.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );
    }
}