1use crate::config::ConnectorConfig;
2use crate::secret_ref::{MaybeSecretRef, MaybeSecretRefParseError, SecretRef};
3use serde::de::DeserializeOwned;
4use serde::Serialize;
5use serde_json::{Map, Value};
6use std::collections::BTreeSet;
7use std::fmt::Debug;
8use std::fs;
9use std::io::ErrorKind;
10use std::path::Path;
11use thiserror::Error as ThisError;
12
/// Errors returned while collecting the secret references used by a connector configuration.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefDiscoveryError {
    /// A JSON string value was rejected by `MaybeSecretRef::new`; the parse error is
    /// displayed as-is.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// `serde_json::to_value` failed on the connector configuration; `error` holds the
    /// serde error rendered as a string.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// The message intentionally carries no underlying error text.
    // NOTE(review): not constructed by the discovery functions visible in this file —
    // confirm whether other code still relies on this variant.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
22
23pub fn discover_secret_references_in_connector_config(
25 connector_config: &ConnectorConfig,
26) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
27 let json_value = serde_json::to_value(connector_config).map_err(|e| {
28 SecretRefDiscoveryError::SerializationFailed {
29 error: e.to_string(),
30 }
31 })?;
32 let mut result = BTreeSet::new();
33 if let Some(transport_config_json) = json_value.get("transport").and_then(|v| v.get("config")) {
34 result.extend(discover_secret_references_in_json(transport_config_json)?);
35 }
36 if let Some(format_config_json) = json_value.get("format").and_then(|v| v.get("config")) {
37 result.extend(discover_secret_references_in_json(format_config_json)?);
38 }
39 Ok(result)
40}
41
42fn discover_secret_references_in_json(
44 value: &Value,
45) -> Result<BTreeSet<SecretRef>, SecretRefDiscoveryError> {
46 Ok(match value {
47 Value::Null => BTreeSet::new(),
48 Value::Bool(_b) => BTreeSet::new(),
49 Value::Number(_n) => BTreeSet::new(),
50 Value::String(s) => {
51 if let MaybeSecretRef::SecretRef(secret_ref) = MaybeSecretRef::new(s.clone())
52 .map_err(|e| SecretRefDiscoveryError::MaybeSecretRefParseFailed { e })?
53 {
54 BTreeSet::from([secret_ref])
55 } else {
56 BTreeSet::new()
57 }
58 }
59 Value::Array(seq) => {
60 let mut result = BTreeSet::new();
61 for entry in seq.iter() {
62 result.extend(discover_secret_references_in_json(entry)?)
63 }
64 result
65 }
66 Value::Object(mapping) => {
67 let mut result = BTreeSet::new();
68 for (_k, v) in mapping.into_iter() {
69 result.extend(discover_secret_references_in_json(v)?);
70 }
71 result
72 }
73 })
74}
75
/// Returns the default secrets directory path, `/etc/feldera-secrets`.
// NOTE(review): presumably this is what callers pass as `secrets_dir` to the resolution
// functions when nothing else is configured — confirm against the call sites.
pub fn default_secrets_directory() -> &'static Path {
    const DEFAULT_SECRETS_DIRECTORY: &str = "/etc/feldera-secrets";
    Path::new(DEFAULT_SECRETS_DIRECTORY)
}
80
/// Errors returned while replacing secret references with the content of their secret files.
#[derive(Debug, Clone, PartialEq, Eq, ThisError)]
pub enum SecretRefResolutionError {
    /// A string value was rejected by `MaybeSecretRef::new`; the parse error is
    /// displayed as-is.
    #[error("{e}")]
    MaybeSecretRefParseFailed { e: MaybeSecretRefParseError },
    /// The path is a regular file, but reading it as a UTF-8 string failed
    /// (`error_kind` is the kind of the I/O error returned by `fs::read_to_string`).
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does exist but unable to read it due to: {error_kind}")]
    CannotReadSecretFile {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Nothing exists at the path the secret reference maps to.
    #[error("secret reference '{secret_ref}' resolution failed: file '{path}' does not exist")]
    SecretFileDoesNotExist { secret_ref: SecretRef, path: String },
    /// Something exists at the path, but it is not a regular file (e.g. a directory).
    #[error(
        "secret reference '{secret_ref}' resolution failed: path '{path}' is not a regular file"
    )]
    SecretPathIsNotRegularFile { secret_ref: SecretRef, path: String },
    /// The path is not a regular file and checking whether it exists failed
    /// (`error_kind` is the kind of the I/O error returned by `Path::try_exists`).
    #[error("secret reference '{secret_ref}' resolution failed: cannot determine if '{path}' is an existing file due to: {error_kind}")]
    SecretFileExistenceUnknown {
        secret_ref: SecretRef,
        path: String,
        error_kind: ErrorKind,
    },
    /// Inserting a resolved entry into the rebuilt JSON object replaced an existing key.
    #[error("secret resolution led to a duplicate key in the mapping, which should not happen")]
    DuplicateKeyInMapping,
    /// `serde_json::to_value` failed on the value being resolved; `error` holds the
    /// serde error rendered as a string.
    #[error("unable to serialize connector configuration: {error}")]
    SerializationFailed { error: String },
    /// The resolved JSON could not be deserialized back into the original type; the
    /// underlying serde error is discarded.
    // NOTE(review): the error is presumably dropped so that resolved secret values
    // cannot leak through the message — confirm the intent.
    #[error("unable to deserialize connector configuration (error omitted)")]
    DeserializationFailed,
}
110
111pub fn resolve_secret_references_in_connector_config(
113 secrets_dir: &Path,
114 connector_config: &ConnectorConfig,
115) -> Result<ConnectorConfig, SecretRefResolutionError> {
116 let connector_config = connector_config.clone();
117 Ok(ConnectorConfig {
118 transport: resolve_secret_references_via_json(secrets_dir, &connector_config.transport)?,
119 format: resolve_secret_references_via_json(secrets_dir, &connector_config.format)?,
120 ..connector_config
121 })
122}
123
124pub fn resolve_secret_references_via_json<T>(
126 secrets_dir: &Path,
127 value: &T,
128) -> Result<T, SecretRefResolutionError>
129where
130 T: Serialize + DeserializeOwned,
131{
132 let json_value =
133 serde_json::to_value(value).map_err(|e| SecretRefResolutionError::SerializationFailed {
134 error: e.to_string(),
135 })?;
136 let resolved_json = resolve_secret_references_in_json(secrets_dir, json_value)?;
137 serde_json::from_value(resolved_json)
138 .map_err(|_e| SecretRefResolutionError::DeserializationFailed)
139}
140
141fn resolve_secret_references_in_json(
143 secrets_dir: &Path,
144 value: Value,
145) -> Result<Value, SecretRefResolutionError> {
146 Ok(match value {
147 Value::Null => Value::Null,
148 Value::Bool(b) => Value::Bool(b),
149 Value::Number(n) => Value::Number(n),
150 Value::String(s) => {
151 Value::String(resolve_potential_secret_reference_string(secrets_dir, s)?)
152 }
153 Value::Array(seq) => Value::Array(
154 seq.into_iter()
155 .map(|v| resolve_secret_references_in_json(secrets_dir, v))
156 .collect::<Result<Vec<Value>, SecretRefResolutionError>>()?,
157 ),
158 Value::Object(mapping) => {
159 let mut new_mapping = Map::new();
160 for (k, v) in mapping.into_iter() {
161 if let Some(_existing) =
162 new_mapping.insert(k, resolve_secret_references_in_json(secrets_dir, v)?)
163 {
164 return Err(SecretRefResolutionError::DuplicateKeyInMapping);
165 }
166 }
167 Value::Object(new_mapping)
168 }
169 })
170}
171
172fn resolve_potential_secret_reference_string(
174 secrets_dir: &Path,
175 s: String,
176) -> Result<String, SecretRefResolutionError> {
177 match MaybeSecretRef::new(s) {
178 Ok(maybe_secret_ref) => match maybe_secret_ref {
179 MaybeSecretRef::String(plain_str) => Ok(plain_str),
180 MaybeSecretRef::SecretRef(secret_ref) => match &secret_ref {
181 SecretRef::Kubernetes { name, data_key } => {
182 let path = Path::new(secrets_dir)
185 .join("kubernetes")
186 .join(name)
187 .join(data_key);
188
189 if path.is_file() {
192 match fs::read_to_string(&path) {
193 Ok(content) => Ok(content),
194 Err(e) => {
195 Err(SecretRefResolutionError::CannotReadSecretFile {
196 secret_ref,
197 path: path.display().to_string(),
198 error_kind: e.kind(), })
200 }
201 }
202 } else {
203 match path.try_exists() {
204 Ok(exists) => {
205 if exists {
206 Err(SecretRefResolutionError::SecretPathIsNotRegularFile {
207 secret_ref,
208 path: path.display().to_string(),
209 })
210 } else {
211 Err(SecretRefResolutionError::SecretFileDoesNotExist {
212 secret_ref,
213 path: path.display().to_string(),
214 })
215 }
216 }
217 Err(e) => Err(SecretRefResolutionError::SecretFileExistenceUnknown {
218 secret_ref,
219 path: path.display().to_string(),
220 error_kind: e.kind(), }),
222 }
223 }
224 }
225 },
226 },
227 Err(e) => Err(SecretRefResolutionError::MaybeSecretRefParseFailed { e }),
228 }
229}
230
#[cfg(test)]
mod tests {
    use crate::config::{ConnectorConfig, TransportConfig};
    use crate::secret_ref::{MaybeSecretRef, SecretRef};
    use crate::secret_resolver::{
        discover_secret_references_in_connector_config, discover_secret_references_in_json,
        resolve_potential_secret_reference_string, resolve_secret_references_in_connector_config,
        resolve_secret_references_in_json, SecretRefResolutionError,
    };
    use serde_json::json;
    use std::collections::BTreeSet;
    use std::fs::{create_dir_all, File};
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Creates `<secrets_dir>/kubernetes/<name>/<data_key>` containing `content` and
    /// returns the path of the created file.
    fn write_kubernetes_secret(
        secrets_dir: &Path,
        name: &str,
        data_key: &str,
        content: &[u8],
    ) -> PathBuf {
        let name_dir = secrets_dir.join("kubernetes").join(name);
        create_dir_all(&name_dir).unwrap();
        let data_key_file_path = name_dir.join(data_key);
        let mut file = File::create(&data_key_file_path).unwrap();
        file.write_all(content).unwrap();
        data_key_file_path
    }

    /// Parses `reference`, panicking unless it is a valid secret reference.
    fn parse_secret_ref(reference: &str) -> SecretRef {
        let MaybeSecretRef::SecretRef(secret_ref) =
            MaybeSecretRef::new(reference.to_string()).unwrap()
        else {
            panic!("expected '{reference}' to parse as a secret reference");
        };
        secret_ref
    }

    /// A reference to an existing secret file resolves to the file content.
    #[test]
    fn resolve_kubernetes_secret_success() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example");

        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap(),
            "example"
        );
    }

    /// Resolution works with a 63-character name and a 255-character data key.
    #[test]
    fn resolve_kubernetes_secret_max_size_success() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name = "a".repeat(63);
        let data_key = "b".repeat(255);
        write_kubernetes_secret(dir_path, &name, &data_key, b"example");

        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                format!("${{secret:kubernetes:{}/{}}}", name, data_key)
            )
            .unwrap(),
            "example"
        );
    }

    /// A directory at the secret path is reported as "not a regular file".
    #[test]
    fn resolve_kubernetes_secret_path_not_a_file() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = dir_path.join("kubernetes").join("a").join("b");
        create_dir_all(&data_key_file_path).unwrap();

        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretPathIsNotRegularFile {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A missing data key file is reported as "does not exist".
    #[test]
    fn resolve_kubernetes_secret_file_does_not_exist() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let name_dir = dir_path.join("kubernetes").join("a");
        create_dir_all(&name_dir).unwrap();
        let data_key_file_path = name_dir.join("b");

        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::SecretFileDoesNotExist {
                secret_ref,
                path: data_key_file_path.display().to_string()
            }
        );
    }

    /// A secret file that is not valid UTF-8 cannot be read as a string.
    #[test]
    fn resolve_secret_ref_cannot_read_file() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        let data_key_file_path = write_kubernetes_secret(dir_path, "a", "b", &[255, 255]);

        let secret_ref = parse_secret_ref("${secret:kubernetes:a/b}");
        assert_eq!(
            resolve_potential_secret_reference_string(
                dir_path,
                "${secret:kubernetes:a/b}".to_string()
            )
            .unwrap_err(),
            SecretRefResolutionError::CannotReadSecretFile {
                secret_ref,
                path: data_key_file_path.display().to_string(),
                error_kind: std::io::ErrorKind::InvalidData
            }
        );
    }

    /// String values are resolved at any nesting depth; keys and non-string values are
    /// left untouched.
    #[test]
    fn secret_resolution_json() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        let input = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "${secret:kubernetes:a/b}",
            "s2": [
                "${secret:kubernetes:a/b}"
            ],
            "s3": {
                "s31": "${secret:kubernetes:a/b}",
                "s32": [
                    "${secret:kubernetes:a/b}",
                    "${secret:kubernetes:c/d}"
                ]
            },
            "s4": "${secret:kubernetes:c/d}"
        });

        let expectation = json!({
            "a": null,
            "b": "false,",
            "c": 123,
            "d": "val1",
            "e": [
                1,
                "2"
            ],
            "f": {
                "f1": 1,
                "f2": "val2"
            },
            "g": "val3",
            "${secret:kubernetes:a/b}": 123,
            "${secret:kubernetes:e/f}": 456,
            "s1": "example1",
            "s2": [
                "example1"
            ],
            "s3": {
                "s31": "example1",
                "s32": [
                    "example1",
                    "example2"
                ]
            },
            "s4": "example2"
        });
        assert_eq!(
            resolve_secret_references_in_json(dir_path, input.clone()).unwrap(),
            expectation
        );
        // Only references in values are discovered; the ones used as keys are not.
        assert_eq!(
            discover_secret_references_in_json(&input).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        // After resolution no secret reference is left in any value.
        assert!(discover_secret_references_in_json(&expectation)
            .unwrap()
            .is_empty());
    }

    /// Only the transport and format configuration of a connector are discovered and
    /// resolved; other fields such as `index` are carried over as they are.
    #[test]
    fn secret_resolution_connector_config() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();
        write_kubernetes_secret(dir_path, "a", "b", b"example1");
        write_kubernetes_secret(dir_path, "c", "d", b"example2");

        let connector_config_json = json!({
            "transport": {
                "name": "datagen",
                "config": {
                    "plan": [{
                        "limit": 2,
                        "fields": {
                            "col1": { "values": [1, 2] },
                            "col2": { "values": ["${secret:kubernetes:a/b}", "${secret:kubernetes:c/d}"] }
                        }
                    }]
                }
            },
            "format": {
                "name": "json",
                "config": {
                    "example": "${secret:kubernetes:a/b}"
                }
            },
            "index": "${secret:kubernetes:e/f}"
        });
        let connector_config: ConnectorConfig =
            serde_json::from_value(connector_config_json).unwrap();
        assert_eq!(
            discover_secret_references_in_connector_config(&connector_config).unwrap(),
            BTreeSet::from([
                SecretRef::Kubernetes {
                    name: "a".to_string(),
                    data_key: "b".to_string(),
                },
                SecretRef::Kubernetes {
                    name: "c".to_string(),
                    data_key: "d".to_string(),
                },
            ])
        );
        let connector_config_secrets_resolved =
            resolve_secret_references_in_connector_config(dir_path, &connector_config).unwrap();

        // Fields other than transport and format must come out of resolution unresolved.
        // This is checked on the RESOLVED config (before its fields are moved out below);
        // checking only the input would not exercise the resolver at all.
        assert_eq!(
            connector_config_secrets_resolved.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );

        let TransportConfig::Datagen(datagen_input_config) =
            connector_config_secrets_resolved.transport
        else {
            unreachable!();
        };
        assert_eq!(
            datagen_input_config.plan[0].fields["col2"]
                .values
                .as_ref()
                .unwrap(),
            &vec![json!("example1"), json!("example2")]
        );

        let Some(format_config) = connector_config_secrets_resolved.format else {
            unreachable!();
        };
        assert_eq!(format_config.config, json!({"example": "example1"}));

        // The input configuration itself is left untouched by resolution.
        assert_eq!(
            connector_config.index,
            Some("${secret:kubernetes:e/f}".to_string())
        );
    }
}