1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use serde_json::{Map, Value};
6use std::collections::BTreeSet;
7use std::fmt::Debug;
8use std::fs;
9use std::io::ErrorKind;
10use std::path::Path;
11use thiserror::Error as ThisError;
12
13#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
14pub enum SecretRefDiscoveryError {
15 #[error("{e}")]
16 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
17 #[error("unable to serialize connector configuration: {error}")]
18 SerializationFailed { error: String },
19 #[error("unable to deserialize connector configuration (error omitted)")]
20 DeserializationFailed,
21}
22
23pub fn discover_secret_references_in_connector_config(
25 connector_config: &serde_json::Value,
26) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
27 let mut result = BTreeSet::new();
28 if let Some(transport_config_json) = connector_config
29 .get("transport")
30 .and_then(|v| v.get("config"))
31 {
32 result.extend(discover_secret_references_in_json(transport_config_json)?);
33 }
34 if let Some(format_config_json) = connector_config.get("format").and_then(|v| v.get("config")) {
35 result.extend(discover_secret_references_in_json(format_config_json)?);
36 }
37 Ok(result)
38}
39
40fn discover_secret_references_in_json(
42 value: &Value,
43) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
44 Ok(match value {
45 Value::Null => BTreeSet::new(),
46 Value::Bool(_b) => BTreeSet::new(),
47 Value::Number(_n) => BTreeSet::new(),
48 Value::String(s) => {
49 if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
50 .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
51 {
52 BTreeSet::from([secret_ref])
53 } else {
54 BTreeSet::new()
55 }
56 }
57 Value::Array(seq) => {
58 let mut result = BTreeSet::new();
59 for entry in seq.iter() {
60 result.extend(discover_secret_references_in_json(entry)?)
61 }
62 result
63 }
64 Value::Object(mapping) => {
65 let mut result = BTreeSet::new();
66 for (_k, v) in mapping.into_iter() {
67 result.extend(discover_secret_references_in_json(v)?);
68 }
69 result
70 }
71 })
72}
73
74pub fn default_secrets_directory() -> &'static Path {
76 Path::new("/etc/feldera-secrets")
77}
78
79#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
80pub enum SecretRefResolutionError {
81 #[error("{e}")]
82 MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
83 #[error(
84 "secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}"
85 )]
86 CannotReadSecretFile {
87 secret_ref: SecretRef,
88 path: String,
89 error_kind: ErrorKind,
90 },
91 #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
92 SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
93 #[error(
94 "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
95 )]
96 SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
97 #[error(
98 "secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}"
99 )]
100 SecretFileExistenceUnknown {
101 secret_ref: SecretRef,
102 path: String,
103 error_kind: ErrorKind,
104 },
105 #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
106 DuplicateKeyInMapping,
107 #[error("unable to serialize connector configuration: {error}")]
108 SerializationFailed { error: String },
109 #[error("unable to deserialize connector configuration (error omitted)")]
110 DeserializationFailed,
111}
112
113pub fn resolve_secret_references_in_connector_config(
115 secrets_dir: &Path,
116 connector_config: &ConnectorConfig,
117) -> Result<ConnectorConfig, SecretRefResolutionError> {
118 let connector_config = connector_config.clone();
119 Ok(ConnectorConfig {
120 transport: resolve_secret_references_via_json(secrets_dir, &connector_config.transport)?,
121 format: resolve_secret_references_via_json(secrets_dir, &connector_config.format)?,
122 ..connector_config
123 })
124}
125
126pub fn resolve_secret_references_via_json<T>(
128 secrets_dir: &Path,
129 value: &T,
130) -> Result<T, SecretRefResolutionError>
131where
132 T: Serialize + DeserializeOwned,
133{
134 let json_value =
135 serde_json::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
136 error: e.to_string(),
137 })?;
138 let resolved_json = resolve_secret_references_in_json(secrets_dir, json_value)?;
139 serde_json::from_value(resolved_json)
140 .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
141}
142
143fn resolve_secret_references_in_json(
145 secrets_dir: &Path,
146 value: Value,
147) -> Result<Value, SecretRefResolutionError> {
148 Ok(match value {
149 Value::Null => Value::Null,
150 Value::Bool(b) => Value::Bool(b),
151 Value::Number(n) => Value::Number(n),
152 Value::String(s) => {
153 Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
154 }
155 Value::Array(seq) => Value::Array(
156 seq.into_iter()
157 .map(|v| resolve_secret_references_in_json(secrets_dir, v))
158 .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
159 ),
160 Value::Object(mapping) => {
161 let mut new_mapping = Map::new();
162 for (k, v) in mapping.into_iter() {
163 if let Some(_existing) =
164 new_mapping.insert(k, resolve_secret_references_in_json(secrets_dir, v)?)
165 {
166 return Err(SecretRefResolutionError::DuplicateKeyInMapping);
167 }
168 }
169 Value::Object(new_mapping)
170 }
171 })
172}
173
174fn resolve_potential_secret_reference_string(
176 secrets_dir: &Path,
177 s: String,
178) -> Result<String, SecretRefResolutionError> {
179 match MaybeSecretRef::new(s) {
180 Ok(maybe_secret_ref) => match maybe_secret_ref {
181 MaybeSecretRef::String(plain_str) => Ok(plain_str),
182 MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
183 SecretRef::Kubernetes { name, data_key } => {
184 let path = Path::new(secrets_dir)
187 .join("kubernetes")
188 .join(name)
189 .join(data_key);
190
191 if path.is_file() {
194 match fs::read_to_string(&path) {
195 Ok(content) => Ok(content),
196 Err(e) => {
197 Err(SecretRefResolutionError::CannotReadSecretFile {
198 secret_ref,
199 path: path.display().to_string(),
200 error_kind: e.kind(), })
202 }
203 }
204 } else {
205 match path.try_exists() {
206 Ok(exists) => {
207 if exists {
208 Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
209 secret_ref,
210 path: path.display().to_string(),
211 })
212 } else {
213 Err(SecretRefResolutionError::SecretFileDoesNotExist {
214 secret_ref,
215 path: path.display().to_string(),
216 })
217 }
218 }
219 Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
220 secret_ref,
221 path: path.display().to_string(),
222 error_kind: e.kind(), }),
224 }
225 }
226 }
227 },
228 },
229 Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use crate::config::{ConnectorConfig, TransportConfig};
236 use crate::secret_ref::{MaybeSecretRef, SecretRef};
237 use crate::secret_resolver::{
238 SecretRefResolutionError, discover_secret_references_in_connector_config,
239 discover_secret_references_in_json, resolve_potential_secret_reference_string,
240 resolve_secret_references_in_connector_config, resolve_secret_references_in_json,
241 };
242 use serde_json::json;
243 use std::collections::BTreeSet;
244 use std::fs::{File, create_dir_all};
245 use std::io::Write;
246
247 #[test]
248 fn resolve_kubernetes_secret_success() {
249 let dir = tempfile::tempdir().unwrap();
251 let dir_path = dir.path();
252 let name_dir = &dir_path.join("kubernetes").join("a");
253 create_dir_all(name_dir).unwrap();
254 let data_key_file_path = &name_dir.join("b");
255 let mut file = File::create(data_key_file_path).unwrap();
256 file.write_all(b"example").unwrap();
257
258 assert_eq!(
260 resolve_potential_secret_reference_string(
261 dir_path,
262 "${secret:kubernetes:a/b}".to_string()
263 )
264 .unwrap(),
265 "example"
266 );
267 }
268
269 #[test]
270 fn resolve_kubernetes_secret_max_size_success() {
271 let dir = tempfile::tempdir().unwrap();
273 let dir_path = dir.path();
274 let name_dir = &dir_path.join("kubernetes").join("a".repeat(63));
275 create_dir_all(name_dir).unwrap();
276 let data_key_file_path = &name_dir.join("b".repeat(255));
277 let mut file = File::create(data_key_file_path).unwrap();
278 file.write_all(b"example").unwrap();
279
280 assert_eq!(
282 resolve_potential_secret_reference_string(
283 dir_path,
284 format!(
285 "${{secret:kubernetes:{}/{}}}",
286 "a".repeat(63),
287 "b".repeat(255)
288 )
289 )
290 .unwrap(),
291 "example"
292 );
293 }
294
295 #[test]
296 fn resolve_kubernetes_secret_path_not_a_file() {
297 let dir = tempfile::tempdir().unwrap();
299 let dir_path = dir.path();
300 let data_key_file_path = &dir_path.join("kubernetes").join("a").join("b");
301 create_dir_all(data_key_file_path).unwrap();
302
303 let MaybeSecretRef::SecretRef(secret_ref) =
305 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
306 else {
307 unreachable!();
308 };
309 assert_eq!(
310 resolve_potential_secret_reference_string(
311 dir_path,
312 "${secret:kubernetes:a/b}".to_string()
313 )
314 .unwrap_err(),
315 SecretRefResolutionError::SecretPathIsNotRegularFile {
316 secret_ref,
317 path: data_key_file_path.display().to_string()
318 }
319 );
320 }
321
322 #[test]
323 fn resolve_kubernetes_secret_file_does_not_exist() {
324 let dir = tempfile::tempdir().unwrap();
326 let dir_path = dir.path();
327 let name_dir = &dir_path.join("kubernetes").join("a");
328 create_dir_all(name_dir).unwrap();
329 let data_key_file_path = &name_dir.join("b");
330
331 let MaybeSecretRef::SecretRef(secret_ref) =
333 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
334 else {
335 unreachable!();
336 };
337 assert_eq!(
338 resolve_potential_secret_reference_string(
339 dir_path,
340 "${secret:kubernetes:a/b}".to_string()
341 )
342 .unwrap_err(),
343 SecretRefResolutionError::SecretFileDoesNotExist {
344 secret_ref,
345 path: data_key_file_path.display().to_string()
346 }
347 );
348 }
349
350 #[test]
351 fn resolve_secret_ref_cannot_read_file() {
352 let dir = tempfile::tempdir().unwrap();
354 let dir_path = dir.path();
355 let name_dir = &dir_path.join("kubernetes").join("a");
356 create_dir_all(name_dir).unwrap();
357 let data_key_file_path = &name_dir.join("b");
358 let mut file = File::create(data_key_file_path).unwrap();
359 file.write_all(&[255, 255]).unwrap();
360
361 let MaybeSecretRef::SecretRef(secret_ref) =
363 MaybeSecretRef::new("${secret:kubernetes:a/b}".to_string()).unwrap()
364 else {
365 unreachable!();
366 };
367 assert_eq!(
368 resolve_potential_secret_reference_string(
369 dir_path,
370 "${secret:kubernetes:a/b}".to_string()
371 )
372 .unwrap_err(),
373 SecretRefResolutionError::CannotReadSecretFile {
374 secret_ref,
375 path: data_key_file_path.display().to_string(),
376 error_kind: std::io::ErrorKind::InvalidData
377 }
378 );
379 }
380
381 #[test]
382 fn secret_resolution_json() {
383 let dir = tempfile::tempdir().unwrap();
387 let dir_path = dir.path();
388 let name_dir = &dir_path.join("kubernetes").join("a");
389 create_dir_all(name_dir).unwrap();
390 let data_key_file_path = &name_dir.join("b");
391 let mut file = File::create(data_key_file_path).unwrap();
392 file.write_all(b"example1").unwrap();
393 let name_dir = &dir_path.join("kubernetes").join("c");
394 create_dir_all(name_dir).unwrap();
395 let data_key_file_path = &name_dir.join("d");
396 let mut file = File::create(data_key_file_path).unwrap();
397 file.write_all(b"example2").unwrap();
398
399 let input = json!({
401 "a": null,
402 "b": "false,",
403 "c": 123,
404 "d": "val1",
405 "e": [
406 1,
407 "2"
408 ],
409 "f": {
410 "f1": 1,
411 "f2": "val2"
412 },
413 "g": "val3",
414 "${secret:kubernetes:a/b}": 123,
415 "${secret:kubernetes:e/f}": 456,
416 "s1": "${secret:kubernetes:a/b}",
417 "s2": [
418 "${secret:kubernetes:a/b}"
419 ],
420 "s3": {
421 "s31": "${secret:kubernetes:a/b}",
422 "s32": [
423 "${secret:kubernetes:a/b}",
424 "${secret:kubernetes:c/d}"
425 ]
426 },
427 "s4": "${secret:kubernetes:c/d}"
428 });
429
430 let expectation = json!({
431 "a": null,
432 "b": "false,",
433 "c": 123,
434 "d": "val1",
435 "e": [
436 1,
437 "2"
438 ],
439 "f": {
440 "f1": 1,
441 "f2": "val2"
442 },
443 "g": "val3",
444 "${secret:kubernetes:a/b}": 123,
445 "${secret:kubernetes:e/f}": 456,
446 "s1": "example1",
447 "s2": [
448 "example1"
449 ],
450 "s3": {
451 "s31": "example1",
452 "s32": [
453 "example1",
454 "example2"
455 ]
456 },
457 "s4": "example2"
458 });
459 assert_eq!(
460 resolve_secret_references_in_json(dir_path, input.clone()).unwrap(),
461 expectation
462 );
463 assert_eq!(
464 discover_secret_references_in_json(&input).unwrap(),
465 BTreeSet::from([
466 SecretRef::Kubernetes {
467 name: "a".to_string(),
468 data_key: "b".to_string(),
469 },
470 SecretRef::Kubernetes {
471 name: "c".to_string(),
472 data_key: "d".to_string(),
473 },
474 ])
475 );
476 assert_eq!(
477 discover_secret_references_in_json(&expectation).unwrap(),
478 BTreeSet::from([])
479 );
480 }
481
482 #[test]
483 fn secret_resolution_connector_config() {
484 let dir = tempfile::tempdir().unwrap();
488 let dir_path = dir.path();
489 let name_dir = &dir_path.join("kubernetes").join("a");
490 create_dir_all(name_dir).unwrap();
491 let data_key_file_path = &name_dir.join("b");
492 let mut file = File::create(data_key_file_path).unwrap();
493 file.write_all(b"example1").unwrap();
494 let name_dir = &dir_path.join("kubernetes").join("c");
495 create_dir_all(name_dir).unwrap();
496 let data_key_file_path = &name_dir.join("d");
497 let mut file = File::create(data_key_file_path).unwrap();
498 file.write_all(b"example2").unwrap();
499
500 let connector_config_json = json!({
502 "transport": {
503 "name": "datagen",
504 "config": {
505 "plan": [{
506 "limit": 2,
507 "fields": {
508 "col1": { "values": [1, 2] },
509 "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
510 }
511 }]
512 }
513 },
514 "format": {
515 "name": "json",
516 "config": {
517 "example": "${secret:kubernetes:a/b}"
518 }
519 },
520 "index": "${secret:kubernetes:e/f}"
521 });
522 assert_eq!(
523 discover_secret_references_in_connector_config(&connector_config_json).unwrap(),
524 BTreeSet::from([
525 SecretRef::Kubernetes {
526 name: "a".to_string(),
527 data_key: "b".to_string(),
528 },
529 SecretRef::Kubernetes {
530 name: "c".to_string(),
531 data_key: "d".to_string(),
532 },
533 ])
534 );
535
536 let connector_config: ConnectorConfig =
537 serde_json::from_value(connector_config_json).unwrap();
538
539 let connector_config_secrets_resolved =
540 resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();
541
542 let TransportConfig::Datagen(datagen_input_config) =
544 connector_config_secrets_resolved.transport
545 else {
546 unreachable!();
547 };
548 assert_eq!(
549 datagen_input_config.plan[0].fields["col2"]
550 .values
551 .as_ref()
552 .unwrap(),
553 &vec![json!("example1"), json!("example2")]
554 );
555
556 let Some(format_config) = connector_config_secrets_resolved.format else {
558 unreachable!();
559 };
560 assert_eq!(format_config.config, json!({"example": "example1"}));
561
562 assert_eq!(
564 connector_config.index,
565 Some("${secret:kubernetes:e/f}".to_string())
566 );
567 }
568}