1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::de::DeserializeOwned;
4use serde::Serialize;
5use serde_yaml::{Mapping, Value};
6use std::collections::BTreeSet;
7use std::fmt::Debug;
8use std::fs;
9use std::io::ErrorKind;
10use std::path::Path;
11use thiserror::Error as ThisError;
12
13#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
14pub enum SecretRefDiscoveryError {
15 #[error("{e}")]
16 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
17 #[error("unable to serialize connector configuration: {error}")]
18 SerializationFailed { error: String },
19 #[error("unable to deserialize connector configuration (error omitted)")]
20 DeserializationFailed,
21}
22
23pub fn discover_secret_references_in_connector_config(
25 connector_config: &ConnectorConfig,
26) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
27 let yaml_value = serde_yaml::to_value(connector_config).map_err(|e| {
28 SecretRefDiscoveryError::SerializationFailed {
29 error: e.to_string(),
30 }
31 })?;
32 let mut result = BTreeSet::new();
33 if let Some(transport_config_yaml) = yaml_value.get("transport").and_then(|v| v.get("config")) {
34 result.extend(discover_secret_references_in_yaml(transport_config_yaml)?);
35 }
36 if let Some(format_config_yaml) = yaml_value.get("format").and_then(|v| v.get("config")) {
37 result.extend(discover_secret_references_in_yaml(format_config_yaml)?);
38 }
39 Ok(result)
40}
41
42fn discover_secret_references_in_yaml(
44 value: &Value,
45) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
46 Ok(match value {
47 Value::Null => BTreeSet::new(),
48 Value::Bool(_b) => BTreeSet::new(),
49 Value::Number(_n) => BTreeSet::new(),
50 Value::String(s) => {
51 if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
52 .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
53 {
54 BTreeSet::from([secret_ref])
55 } else {
56 BTreeSet::new()
57 }
58 }
59 Value::Sequence(seq) => {
60 let mut result = BTreeSet::new();
61 for entry in seq.iter() {
62 result.extend(discover_secret_references_in_yaml(entry)?)
63 }
64 result
65 }
66 Value::Mapping(mapping) => {
67 let mut result = BTreeSet::new();
68 for (_k, v) in mapping.into_iter() {
69 result.extend(discover_secret_references_in_yaml(v)?);
70 }
71 result
72 }
73 Value::Tagged(tag_val) => discover_secret_references_in_yaml(&tag_val.value)?,
74 })
75}
76
77pub fn default_secrets_directory() -> &'static Path {
79 Path::new("/etc/feldera-secrets")
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
83pub enum SecretRefResolutionError {
84 #[error("{e}")]
85 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
86 #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}")]
87 CannotReadSecretFile {
88 secret_ref: SecretRef,
89 path: String,
90 error_kind: ErrorKind,
91 },
92 #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
93 SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
94 #[error(
95 "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
96 )]
97 SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
98 #[error("secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}")]
99 SecretFileExistenceUnknown {
100 secret_ref: SecretRef,
101 path: String,
102 error_kind: ErrorKind,
103 },
104 #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
105 DuplicateKeyInMapping,
106 #[error("unable to serialize connector configuration: {error}")]
107 SerializationFailed { error: String },
108 #[error("unable to deserialize connector configuration (error omitted)")]
109 DeserializationFailed,
110}
111
112pub fn resolve_secret_references_in_connector_config(
114 secrets_dir: &Path,
115 connector_config: &ConnectorConfig,
116) -> Result<ConnectorConfig, SecretRefResolutionError> {
117 let connector_config = connector_config.clone();
118 Ok(ConnectorConfig {
119 transport: resolve_secret_references_via_yaml(secrets_dir, &connector_config.transport)?,
120 format: resolve_secret_references_via_yaml(secrets_dir, &connector_config.format)?,
121 ..connector_config
122 })
123}
124
125pub fn resolve_secret_references_via_yaml<T>(
127 secrets_dir: &Path,
128 value: &T,
129) -> Result<T, SecretRefResolutionError>
130where
131 T: Serialize + DeserializeOwned,
132{
133 let yaml_value =
134 serde_yaml::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
135 error: e.to_string(),
136 })?;
137 let resolved_yaml = resolve_secret_references_in_yaml(secrets_dir, yaml_value)?;
138 serde_yaml::from_value(resolved_yaml)
139 .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
140}
141
142fn resolve_secret_references_in_yaml(
144 secrets_dir: &Path,
145 value: Value,
146) -> Result<Value, SecretRefResolutionError> {
147 Ok(match value {
148 Value::Null => Value::Null,
149 Value::Bool(b) => Value::Bool(b),
150 Value::Number(n) => Value::Number(n),
151 Value::String(s) => {
152 Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
153 }
154 Value::Sequence(seq) => Value::Sequence(
155 seq.into_iter()
156 .map(|v| resolve_secret_references_in_yaml(secrets_dir, v))
157 .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
158 ),
159 Value::Mapping(mapping) => {
160 let mut new_mapping = Mapping::new();
161 for (k, v) in mapping.into_iter() {
162 if let Some(_existing) =
163 new_mapping.insert(k, resolve_secret_references_in_yaml(secrets_dir, v)?)
164 {
165 return Err(SecretRefResolutionError::DuplicateKeyInMapping);
166 }
167 }
168 Value::Mapping(new_mapping)
169 }
170 Value::Tagged(mut tag_val) => {
171 tag_val.value = resolve_secret_references_in_yaml(secrets_dir, tag_val.value)?;
172 Value::Tagged(tag_val)
173 }
174 })
175}
176
177fn resolve_potential_secret_reference_string(
179 secrets_dir: &Path,
180 s: String,
181) -> Result<String, SecretRefResolutionError> {
182 match MaybeSecretRef::new(s) {
183 Ok(maybe_secret_ref) => match maybe_secret_ref {
184 MaybeSecretRef::String(plain_str) => Ok(plain_str),
185 MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
186 SecretRef::Kubernetes { name, data_key } => {
187 let path = Path::new(secrets_dir)
190 .join("kubernetes")
191 .join(name)
192 .join(data_key);
193
194 if path.is_file() {
197 match fs::read_to_string(&path) {
198 Ok(content) => Ok(content),
199 Err(e) => {
200 Err(SecretRefResolutionError::CannotReadSecretFile {
201 secret_ref,
202 path: path.display().to_string(),
203 error_kind: e.kind(), })
205 }
206 }
207 } else {
208 match path.try_exists() {
209 Ok(exists) => {
210 if exists {
211 Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
212 secret_ref,
213 path: path.display().to_string(),
214 })
215 } else {
216 Err(SecretRefResolutionError::SecretFileDoesNotExist {
217 secret_ref,
218 path: path.display().to_string(),
219 })
220 }
221 }
222 Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
223 secret_ref,
224 path: path.display().to_string(),
225 error_kind: e.kind(), }),
227 }
228 }
229 }
230 },
231 },
232 Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
233 }
234}
235
236#[cfg(test)]
237mod tests {
238 use crate::config::{ConnectorConfig, TransportConfig};
239 use crate::secret_ref::{MaybeSecretRef, SecretRef};
240 use crate::secret_resolver::{
241 discover_secret_references_in_connector_config, discover_secret_references_in_yaml,
242 resolve_potential_secret_reference_string, resolve_secret_references_in_connector_config,
243 resolve_secret_references_in_yaml, SecretRefResolutionError,
244 };
245 use serde_json::json;
246 use std::collections::BTreeSet;
247 use std::fs::{create_dir_all, File};
248 use std::io::Write;
249
250 #[test]
251 fn resolve_kubernetes_secret_success() {
252 let dir = tempfile::tempdir().unwrap();
254 let dir_path = dir.path();
255 let name_dir = &dir_path.join("kubernetes").join("a");
256 create_dir_all(name_dir).unwrap();
257 let data_key_file_path = &name_dir.join("b");
258 let mut file = File::create(data_key_file_path).unwrap();
259 file.write_all(b"example").unwrap();
260
261 assert_eq!(
263 resolve_potential_secret_reference_string(
264 dir_path,
265 "${secret:kubernetes:a/b}".to_string()
266 )
267 .unwrap(),
268 "example"
269 );
270 }
271
272 #[test]
273 fn resolve_kubernetes_secret_max_size_success() {
274 let dir = tempfile::tempdir().unwrap();
276 let dir_path = dir.path();
277 let name_dir = &dir_path.join("kubernetes").join("a".repeat(63));
278 create_dir_all(name_dir).unwrap();
279 let data_key_file_path = &name_dir.join("b".repeat(255));
280 let mut file = File::create(data_key_file_path).unwrap();
281 file.write_all(b"example").unwrap();
282
283 assert_eq!(
285 resolve_potential_secret_reference_string(
286 dir_path,
287 format!(
288 "${{secret:kubernetes:{}/{}}}",
289 "a".repeat(63),
290 "b".repeat(255)
291 )
292 )
293 .unwrap(),
294 "example"
295 );
296 }
297
298 #[test]
299 fn resolve_kubernetes_secret_path_not_a_file() {
300 let dir = tempfile::tempdir().unwrap();
302 let dir_path = dir.path();
303 let data_key_file_path = &dir_path.join("kubernetes").join("a").join("b");
304 create_dir_all(data_key_file_path).unwrap();
305
306 let MaybeSecretRef::SecretRef(secret_ref) =
308 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
309 else {
310 unreachable!();
311 };
312 assert_eq!(
313 resolve_potential_secret_reference_string(
314 dir_path,
315 "${secret:kubernetes:a/b}".to_string()
316 )
317 .unwrap_err(),
318 SecretRefResolutionError::SecretPathIsNotRegularFile {
319 secret_ref,
320 path: data_key_file_path.display().to_string()
321 }
322 );
323 }
324
325 #[test]
326 fn resolve_kubernetes_secret_file_does_not_exist() {
327 let dir = tempfile::tempdir().unwrap();
329 let dir_path = dir.path();
330 let name_dir = &dir_path.join("kubernetes").join("a");
331 create_dir_all(name_dir).unwrap();
332 let data_key_file_path = &name_dir.join("b");
333
334 let MaybeSecretRef::SecretRef(secret_ref) =
336 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
337 else {
338 unreachable!();
339 };
340 assert_eq!(
341 resolve_potential_secret_reference_string(
342 dir_path,
343 "${secret:kubernetes:a/b}".to_string()
344 )
345 .unwrap_err(),
346 SecretRefResolutionError::SecretFileDoesNotExist {
347 secret_ref,
348 path: data_key_file_path.display().to_string()
349 }
350 );
351 }
352
353 #[test]
354 fn resolve_secret_ref_cannot_read_file() {
355 let dir = tempfile::tempdir().unwrap();
357 let dir_path = dir.path();
358 let name_dir = &dir_path.join("kubernetes").join("a");
359 create_dir_all(name_dir).unwrap();
360 let data_key_file_path = &name_dir.join("b");
361 let mut file = File::create(data_key_file_path).unwrap();
362 file.write_all(&[255, 255]).unwrap();
363
364 let MaybeSecretRef::SecretRef(secret_ref) =
366 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
367 else {
368 unreachable!();
369 };
370 assert_eq!(
371 resolve_potential_secret_reference_string(
372 dir_path,
373 "${secret:kubernetes:a/b}".to_string()
374 )
375 .unwrap_err(),
376 SecretRefResolutionError::CannotReadSecretFile {
377 secret_ref,
378 path: data_key_file_path.display().to_string(),
379 error_kind: std::io::ErrorKind::InvalidData
380 }
381 );
382 }
383
384 #[test]
385 fn secret_resolution_yaml() {
386 let dir = tempfile::tempdir().unwrap();
390 let dir_path = dir.path();
391 let name_dir = &dir_path.join("kubernetes").join("a");
392 create_dir_all(name_dir).unwrap();
393 let data_key_file_path = &name_dir.join("b");
394 let mut file = File::create(data_key_file_path).unwrap();
395 file.write_all(b"example1").unwrap();
396 let name_dir = &dir_path.join("kubernetes").join("c");
397 create_dir_all(name_dir).unwrap();
398 let data_key_file_path = &name_dir.join("d");
399 let mut file = File::create(data_key_file_path).unwrap();
400 file.write_all(b"example2").unwrap();
401
402 let input = r#"
404 a: null
405 b: false,
406 c: 123
407 d: "val1"
408 e: [1, "2"]
409 f:
410 f1: 1
411 f2: "val2"
412 g: !str "val3"
413 "${secret:kubernetes:a/b}": 123
414 "${secret:kubernetes:e/f}": 456
415 s1: "${secret:kubernetes:a/b}"
416 s2: ["${secret:kubernetes:a/b}"]
417 s3:
418 s31: "${secret:kubernetes:a/b}"
419 s32: ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"]
420 s4: !str "${secret:kubernetes:c/d}"
421 "#;
422 let expectation = r#"
423 a: null
424 b: false,
425 c: 123
426 d: "val1"
427 e: [1, "2"]
428 f:
429 f1: 1
430 f2: "val2"
431 g: !str "val3"
432 "${secret:kubernetes:a/b}": 123
433 "${secret:kubernetes:e/f}": 456
434 s1: "example1"
435 s2: ["example1"]
436 s3:
437 s31: "example1"
438 s32: ["example1", "example2"]
439 s4: !str "example2"
440 "#;
441 assert_eq!(
442 resolve_secret_references_in_yaml(dir_path, serde_yaml::from_str(input).unwrap())
443 .unwrap(),
444 serde_yaml::from_str::<serde_yaml::Value>(expectation).unwrap()
445 );
446 assert_eq!(
447 discover_secret_references_in_yaml(&serde_yaml::from_str(input).unwrap()).unwrap(),
448 BTreeSet::from([
449 SecretRef::Kubernetes {
450 name: "a".to_string(),
451 data_key: "b".to_string(),
452 },
453 SecretRef::Kubernetes {
454 name: "c".to_string(),
455 data_key: "d".to_string(),
456 },
457 ])
458 );
459 assert_eq!(
460 discover_secret_references_in_yaml(&serde_yaml::from_str(expectation).unwrap())
461 .unwrap(),
462 BTreeSet::from([])
463 );
464 }
465
466 #[test]
467 fn secret_resolution_connector_config() {
468 let dir = tempfile::tempdir().unwrap();
472 let dir_path = dir.path();
473 let name_dir = &dir_path.join("kubernetes").join("a");
474 create_dir_all(name_dir).unwrap();
475 let data_key_file_path = &name_dir.join("b");
476 let mut file = File::create(data_key_file_path).unwrap();
477 file.write_all(b"example1").unwrap();
478 let name_dir = &dir_path.join("kubernetes").join("c");
479 create_dir_all(name_dir).unwrap();
480 let data_key_file_path = &name_dir.join("d");
481 let mut file = File::create(data_key_file_path).unwrap();
482 file.write_all(b"example2").unwrap();
483
484 let connector_config_json = json!({
486 "transport": {
487 "name": "datagen",
488 "config": {
489 "plan": [{
490 "limit": 2,
491 "fields": {
492 "col1": { "values": [1, 2] },
493 "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
494 }
495 }]
496 }
497 },
498 "format": {
499 "name": "json",
500 "config": {
501 "example": "${secret:kubernetes:a/b}"
502 }
503 },
504 "index": "${secret:kubernetes:e/f}"
505 });
506 let connector_config: ConnectorConfig =
507 serde_json::from_value(connector_config_json).unwrap();
508 assert_eq!(
509 discover_secret_references_in_connector_config(&connector_config).unwrap(),
510 BTreeSet::from([
511 SecretRef::Kubernetes {
512 name: "a".to_string(),
513 data_key: "b".to_string(),
514 },
515 SecretRef::Kubernetes {
516 name: "c".to_string(),
517 data_key: "d".to_string(),
518 },
519 ])
520 );
521 let connector_config_secrets_resolved =
522 resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();
523
524 let TransportConfig::Datagen(datagen_input_config) =
526 connector_config_secrets_resolved.transport
527 else {
528 unreachable!();
529 };
530 assert_eq!(
531 datagen_input_config.plan[0].fields["col2"]
532 .values
533 .as_ref()
534 .unwrap(),
535 &vec![json!("example1"), json!("example2")]
536 );
537
538 let Some(format_config) = connector_config_secrets_resolved.format else {
540 unreachable!();
541 };
542 let mut expected_mapping = serde_yaml::Mapping::new();
543 expected_mapping.insert(
544 serde_yaml::Value::String("example".to_string()),
545 serde_yaml::Value::String("example1".to_string()),
546 );
547 assert_eq!(
548 format_config.config,
549 serde_yaml::Value::Mapping(expected_mapping)
550 );
551
552 assert_eq!(
554 connector_config.index,
555 Some("${secret:kubernetes:e/f}".to_string())
556 );
557 }
558}