1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::de::DeserializeOwned;
4use serde::Serialize;
5use serde_json::{Map, Value};
6use std::collections::BTreeSet;
7use std::fmt::Debug;
8use std::fs;
9use std::io::ErrorKind;
10use std::path::Path;
11use thiserror::Error as ThisError;
12
13#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
14pub enum SecretRefDiscoveryError {
15 #[error("{e}")]
16 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
17 #[error("unable to serialize connector configuration: {error}")]
18 SerializationFailed { error: String },
19 #[error("unable to deserialize connector configuration (error omitted)")]
20 DeserializationFailed,
21}
22
23pub fn discover_secret_references_in_connector_config(
25 connector_config: &serde_json::Value,
26) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
27 let mut result = BTreeSet::new();
28 if let Some(transport_config_json) = connector_config
29 .get("transport")
30 .and_then(|v| v.get("config"))
31 {
32 result.extend(discover_secret_references_in_json(transport_config_json)?);
33 }
34 if let Some(format_config_json) = connector_config.get("format").and_then(|v| v.get("config")) {
35 result.extend(discover_secret_references_in_json(format_config_json)?);
36 }
37 Ok(result)
38}
39
40fn discover_secret_references_in_json(
42 value: &Value,
43) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
44 Ok(match value {
45 Value::Null => BTreeSet::new(),
46 Value::Bool(_b) => BTreeSet::new(),
47 Value::Number(_n) => BTreeSet::new(),
48 Value::String(s) => {
49 if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
50 .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
51 {
52 BTreeSet::from([secret_ref])
53 } else {
54 BTreeSet::new()
55 }
56 }
57 Value::Array(seq) => {
58 let mut result = BTreeSet::new();
59 for entry in seq.iter() {
60 result.extend(discover_secret_references_in_json(entry)?)
61 }
62 result
63 }
64 Value::Object(mapping) => {
65 let mut result = BTreeSet::new();
66 for (_k, v) in mapping.into_iter() {
67 result.extend(discover_secret_references_in_json(v)?);
68 }
69 result
70 }
71 })
72}
73
74pub fn default_secrets_directory() -> &'static Path {
76 Path::new("/etc/feldera-secrets")
77}
78
79#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
80pub enum SecretRefResolutionError {
81 #[error("{e}")]
82 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
83 #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}")]
84 CannotReadSecretFile {
85 secret_ref: SecretRef,
86 path: String,
87 error_kind: ErrorKind,
88 },
89 #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
90 SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
91 #[error(
92 "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
93 )]
94 SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
95 #[error("secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}")]
96 SecretFileExistenceUnknown {
97 secret_ref: SecretRef,
98 path: String,
99 error_kind: ErrorKind,
100 },
101 #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
102 DuplicateKeyInMapping,
103 #[error("unable to serialize connector configuration: {error}")]
104 SerializationFailed { error: String },
105 #[error("unable to deserialize connector configuration (error omitted)")]
106 DeserializationFailed,
107}
108
109pub fn resolve_secret_references_in_connector_config(
111 secrets_dir: &Path,
112 connector_config: &ConnectorConfig,
113) -> Result<ConnectorConfig, SecretRefResolutionError> {
114 let connector_config = connector_config.clone();
115 Ok(ConnectorConfig {
116 transport: resolve_secret_references_via_json(secrets_dir, &connector_config.transport)?,
117 format: resolve_secret_references_via_json(secrets_dir, &connector_config.format)?,
118 ..connector_config
119 })
120}
121
122pub fn resolve_secret_references_via_json<T>(
124 secrets_dir: &Path,
125 value: &T,
126) -> Result<T, SecretRefResolutionError>
127where
128 T: Serialize + DeserializeOwned,
129{
130 let json_value =
131 serde_json::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
132 error: e.to_string(),
133 })?;
134 let resolved_json = resolve_secret_references_in_json(secrets_dir, json_value)?;
135 serde_json::from_value(resolved_json)
136 .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
137}
138
139fn resolve_secret_references_in_json(
141 secrets_dir: &Path,
142 value: Value,
143) -> Result<Value, SecretRefResolutionError> {
144 Ok(match value {
145 Value::Null => Value::Null,
146 Value::Bool(b) => Value::Bool(b),
147 Value::Number(n) => Value::Number(n),
148 Value::String(s) => {
149 Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
150 }
151 Value::Array(seq) => Value::Array(
152 seq.into_iter()
153 .map(|v| resolve_secret_references_in_json(secrets_dir, v))
154 .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
155 ),
156 Value::Object(mapping) => {
157 let mut new_mapping = Map::new();
158 for (k, v) in mapping.into_iter() {
159 if let Some(_existing) =
160 new_mapping.insert(k, resolve_secret_references_in_json(secrets_dir, v)?)
161 {
162 return Err(SecretRefResolutionError::DuplicateKeyInMapping);
163 }
164 }
165 Value::Object(new_mapping)
166 }
167 })
168}
169
170fn resolve_potential_secret_reference_string(
172 secrets_dir: &Path,
173 s: String,
174) -> Result<String, SecretRefResolutionError> {
175 match MaybeSecretRef::new(s) {
176 Ok(maybe_secret_ref) => match maybe_secret_ref {
177 MaybeSecretRef::String(plain_str) => Ok(plain_str),
178 MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
179 SecretRef::Kubernetes { name, data_key } => {
180 let path = Path::new(secrets_dir)
183 .join("kubernetes")
184 .join(name)
185 .join(data_key);
186
187 if path.is_file() {
190 match fs::read_to_string(&path) {
191 Ok(content) => Ok(content),
192 Err(e) => {
193 Err(SecretRefResolutionError::CannotReadSecretFile {
194 secret_ref,
195 path: path.display().to_string(),
196 error_kind: e.kind(), })
198 }
199 }
200 } else {
201 match path.try_exists() {
202 Ok(exists) => {
203 if exists {
204 Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
205 secret_ref,
206 path: path.display().to_string(),
207 })
208 } else {
209 Err(SecretRefResolutionError::SecretFileDoesNotExist {
210 secret_ref,
211 path: path.display().to_string(),
212 })
213 }
214 }
215 Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
216 secret_ref,
217 path: path.display().to_string(),
218 error_kind: e.kind(), }),
220 }
221 }
222 }
223 },
224 },
225 Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 use crate::config::{ConnectorConfig, TransportConfig};
232 use crate::secret_ref::{MaybeSecretRef, SecretRef};
233 use crate::secret_resolver::{
234 discover_secret_references_in_connector_config, discover_secret_references_in_json,
235 resolve_potential_secret_reference_string, resolve_secret_references_in_connector_config,
236 resolve_secret_references_in_json, SecretRefResolutionError,
237 };
238 use serde_json::json;
239 use std::collections::BTreeSet;
240 use std::fs::{create_dir_all, File};
241 use std::io::Write;
242
243 #[test]
244 fn resolve_kubernetes_secret_success() {
245 let dir = tempfile::tempdir().unwrap();
247 let dir_path = dir.path();
248 let name_dir = &dir_path.join("kubernetes").join("a");
249 create_dir_all(name_dir).unwrap();
250 let data_key_file_path = &name_dir.join("b");
251 let mut file = File::create(data_key_file_path).unwrap();
252 file.write_all(b"example").unwrap();
253
254 assert_eq!(
256 resolve_potential_secret_reference_string(
257 dir_path,
258 "${secret:kubernetes:a/b}".to_string()
259 )
260 .unwrap(),
261 "example"
262 );
263 }
264
265 #[test]
266 fn resolve_kubernetes_secret_max_size_success() {
267 let dir = tempfile::tempdir().unwrap();
269 let dir_path = dir.path();
270 let name_dir = &dir_path.join("kubernetes").join("a".repeat(63));
271 create_dir_all(name_dir).unwrap();
272 let data_key_file_path = &name_dir.join("b".repeat(255));
273 let mut file = File::create(data_key_file_path).unwrap();
274 file.write_all(b"example").unwrap();
275
276 assert_eq!(
278 resolve_potential_secret_reference_string(
279 dir_path,
280 format!(
281 "${{secret:kubernetes:{}/{}}}",
282 "a".repeat(63),
283 "b".repeat(255)
284 )
285 )
286 .unwrap(),
287 "example"
288 );
289 }
290
291 #[test]
292 fn resolve_kubernetes_secret_path_not_a_file() {
293 let dir = tempfile::tempdir().unwrap();
295 let dir_path = dir.path();
296 let data_key_file_path = &dir_path.join("kubernetes").join("a").join("b");
297 create_dir_all(data_key_file_path).unwrap();
298
299 let MaybeSecretRef::SecretRef(secret_ref) =
301 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
302 else {
303 unreachable!();
304 };
305 assert_eq!(
306 resolve_potential_secret_reference_string(
307 dir_path,
308 "${secret:kubernetes:a/b}".to_string()
309 )
310 .unwrap_err(),
311 SecretRefResolutionError::SecretPathIsNotRegularFile {
312 secret_ref,
313 path: data_key_file_path.display().to_string()
314 }
315 );
316 }
317
318 #[test]
319 fn resolve_kubernetes_secret_file_does_not_exist() {
320 let dir = tempfile::tempdir().unwrap();
322 let dir_path = dir.path();
323 let name_dir = &dir_path.join("kubernetes").join("a");
324 create_dir_all(name_dir).unwrap();
325 let data_key_file_path = &name_dir.join("b");
326
327 let MaybeSecretRef::SecretRef(secret_ref) =
329 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
330 else {
331 unreachable!();
332 };
333 assert_eq!(
334 resolve_potential_secret_reference_string(
335 dir_path,
336 "${secret:kubernetes:a/b}".to_string()
337 )
338 .unwrap_err(),
339 SecretRefResolutionError::SecretFileDoesNotExist {
340 secret_ref,
341 path: data_key_file_path.display().to_string()
342 }
343 );
344 }
345
346 #[test]
347 fn resolve_secret_ref_cannot_read_file() {
348 let dir = tempfile::tempdir().unwrap();
350 let dir_path = dir.path();
351 let name_dir = &dir_path.join("kubernetes").join("a");
352 create_dir_all(name_dir).unwrap();
353 let data_key_file_path = &name_dir.join("b");
354 let mut file = File::create(data_key_file_path).unwrap();
355 file.write_all(&[255, 255]).unwrap();
356
357 let MaybeSecretRef::SecretRef(secret_ref) =
359 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
360 else {
361 unreachable!();
362 };
363 assert_eq!(
364 resolve_potential_secret_reference_string(
365 dir_path,
366 "${secret:kubernetes:a/b}".to_string()
367 )
368 .unwrap_err(),
369 SecretRefResolutionError::CannotReadSecretFile {
370 secret_ref,
371 path: data_key_file_path.display().to_string(),
372 error_kind: std::io::ErrorKind::InvalidData
373 }
374 );
375 }
376
377 #[test]
378 fn secret_resolution_json() {
379 let dir = tempfile::tempdir().unwrap();
383 let dir_path = dir.path();
384 let name_dir = &dir_path.join("kubernetes").join("a");
385 create_dir_all(name_dir).unwrap();
386 let data_key_file_path = &name_dir.join("b");
387 let mut file = File::create(data_key_file_path).unwrap();
388 file.write_all(b"example1").unwrap();
389 let name_dir = &dir_path.join("kubernetes").join("c");
390 create_dir_all(name_dir).unwrap();
391 let data_key_file_path = &name_dir.join("d");
392 let mut file = File::create(data_key_file_path).unwrap();
393 file.write_all(b"example2").unwrap();
394
395 let input = json!({
397 "a": null,
398 "b": "false,",
399 "c": 123,
400 "d": "val1",
401 "e": [
402 1,
403 "2"
404 ],
405 "f": {
406 "f1": 1,
407 "f2": "val2"
408 },
409 "g": "val3",
410 "${secret:kubernetes:a/b}": 123,
411 "${secret:kubernetes:e/f}": 456,
412 "s1": "${secret:kubernetes:a/b}",
413 "s2": [
414 "${secret:kubernetes:a/b}"
415 ],
416 "s3": {
417 "s31": "${secret:kubernetes:a/b}",
418 "s32": [
419 "${secret:kubernetes:a/b}",
420 "${secret:kubernetes:c/d}"
421 ]
422 },
423 "s4": "${secret:kubernetes:c/d}"
424 });
425
426 let expectation = json!({
427 "a": null,
428 "b": "false,",
429 "c": 123,
430 "d": "val1",
431 "e": [
432 1,
433 "2"
434 ],
435 "f": {
436 "f1": 1,
437 "f2": "val2"
438 },
439 "g": "val3",
440 "${secret:kubernetes:a/b}": 123,
441 "${secret:kubernetes:e/f}": 456,
442 "s1": "example1",
443 "s2": [
444 "example1"
445 ],
446 "s3": {
447 "s31": "example1",
448 "s32": [
449 "example1",
450 "example2"
451 ]
452 },
453 "s4": "example2"
454 });
455 assert_eq!(
456 resolve_secret_references_in_json(dir_path, input.clone()).unwrap(),
457 expectation
458 );
459 assert_eq!(
460 discover_secret_references_in_json(&input).unwrap(),
461 BTreeSet::from([
462 SecretRef::Kubernetes {
463 name: "a".to_string(),
464 data_key: "b".to_string(),
465 },
466 SecretRef::Kubernetes {
467 name: "c".to_string(),
468 data_key: "d".to_string(),
469 },
470 ])
471 );
472 assert_eq!(
473 discover_secret_references_in_json(&expectation).unwrap(),
474 BTreeSet::from([])
475 );
476 }
477
478 #[test]
479 fn secret_resolution_connector_config() {
480 let dir = tempfile::tempdir().unwrap();
484 let dir_path = dir.path();
485 let name_dir = &dir_path.join("kubernetes").join("a");
486 create_dir_all(name_dir).unwrap();
487 let data_key_file_path = &name_dir.join("b");
488 let mut file = File::create(data_key_file_path).unwrap();
489 file.write_all(b"example1").unwrap();
490 let name_dir = &dir_path.join("kubernetes").join("c");
491 create_dir_all(name_dir).unwrap();
492 let data_key_file_path = &name_dir.join("d");
493 let mut file = File::create(data_key_file_path).unwrap();
494 file.write_all(b"example2").unwrap();
495
496 let connector_config_json = json!({
498 "transport": {
499 "name": "datagen",
500 "config": {
501 "plan": [{
502 "limit": 2,
503 "fields": {
504 "col1": { "values": [1, 2] },
505 "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
506 }
507 }]
508 }
509 },
510 "format": {
511 "name": "json",
512 "config": {
513 "example": "${secret:kubernetes:a/b}"
514 }
515 },
516 "index": "${secret:kubernetes:e/f}"
517 });
518 assert_eq!(
519 discover_secret_references_in_connector_config(&connector_config_json).unwrap(),
520 BTreeSet::from([
521 SecretRef::Kubernetes {
522 name: "a".to_string(),
523 data_key: "b".to_string(),
524 },
525 SecretRef::Kubernetes {
526 name: "c".to_string(),
527 data_key: "d".to_string(),
528 },
529 ])
530 );
531
532 let connector_config: ConnectorConfig =
533 serde_json::from_value(connector_config_json).unwrap();
534
535 let connector_config_secrets_resolved =
536 resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();
537
538 let TransportConfig::Datagen(datagen_input_config) =
540 connector_config_secrets_resolved.transport
541 else {
542 unreachable!();
543 };
544 assert_eq!(
545 datagen_input_config.plan[0].fields["col2"]
546 .values
547 .as_ref()
548 .unwrap(),
549 &vec![json!("example1"), json!("example2")]
550 );
551
552 let Some(format_config) = connector_config_secrets_resolved.format else {
554 unreachable!();
555 };
556 assert_eq!(format_config.config, json!({"example": "example1"}));
557
558 assert_eq!(
560 connector_config.index,
561 Some("${secret:kubernetes:e/f}".to_string())
562 );
563 }
564}