1use conv::prelude::*;
2use serde_json::{Map, Number, Value};
3use streamdal_gjson as gjson;
4use streamdal_gjson::Kind;
5
/// Error returned by every transform in this module.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be printed with `{}`
/// and converted into boxed errors with `?`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransformError {
    /// A failure described only by its human-readable message.
    Generic(String),
}

impl std::fmt::Display for TransformError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The message is already complete; print it verbatim.
            TransformError::Generic(message) => f.write_str(message),
        }
    }
}

impl std::error::Error for TransformError {}
10
/// Unit in which [`TruncateOptions::length`] is expressed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncateType {
    /// `length` is an absolute count.
    Chars,
    /// `length` is a percentage of the field's length.
    Percent,
}

/// Settings consumed by `truncate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TruncateOptions {
    /// Amount to truncate by, interpreted according to `truncate_type`.
    pub length: usize,
    /// How `length` is interpreted.
    pub truncate_type: TruncateType,
}

/// Settings consumed by `extract`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExtractOptions {
    /// When `true`, each extracted value is stored at the top level under the last segment of
    /// its path; otherwise the output nests objects following the path segments.
    pub flatten: bool,
    /// gjson paths to pull out of the payload.
    pub paths: Vec<String>,
}

/// Input shared by all transforms in this module.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Request {
    /// Payload bytes; the transforms require them to be UTF-8 encoded, valid JSON.
    pub data: Vec<u8>,
    /// gjson path of the field the single-path transforms operate on.
    pub path: String,
    /// Raw JSON written at `path` by `overwrite`.
    pub value: String,

    /// Required by `truncate`; ignored by the other transforms.
    pub truncate_options: Option<TruncateOptions>,
    /// Required by `extract`; ignored by the other transforms.
    pub extract_options: Option<ExtractOptions>,
}
36
37fn extract_array(value: &streamdal_gjson::Value) -> Result<serde_json::Value, TransformError> {
38 let mut array = Value::Array(Vec::new());
39 for element in value.array() {
40 match element.kind() {
41 Kind::String => {
42 array
43 .as_array_mut()
44 .unwrap()
45 .push(Value::String(element.to_string()));
46 }
47 Kind::Number => {
48 array
49 .as_array_mut()
50 .unwrap()
51 .push(Value::Number(Number::from_f64(element.f64()).unwrap()));
52 }
53 Kind::True => {
54 array.as_array_mut().unwrap().push(Value::Bool(true));
55 }
56 Kind::False => {
57 array.as_array_mut().unwrap().push(Value::Bool(false));
58 }
59 Kind::Null => {
60 array.as_array_mut().unwrap().push(Value::Null);
61 }
62 Kind::Object => {
63 let obj = Value::Object(serde_json::from_str(element.str()).unwrap());
64 array.as_array_mut().unwrap().push(obj);
65 }
66 Kind::Array => match extract_array(&element) {
67 Ok(v) => {
68 array.as_array_mut().unwrap().push(v);
69 }
70 Err(e) => {
71 return Err(e);
72 }
73 },
74 }
75 }
76
77 Ok(array)
78}
79
80fn extract_number(json_str: &str) -> Result<serde_json::Value, TransformError> {
81 let parsed_value: Result<Value, _> = serde_json::from_str(json_str);
83
84 match parsed_value {
85 Ok(value) => {
86 if value.is_number() {
88 Ok(value)
89 } else {
90 Err(TransformError::Generic(
91 "unable to extract data: path is not a valid number".to_string(),
92 ))
93 }
94 }
95
96 Err(e) => Err(TransformError::Generic(e.to_string())), }
98}
99
100fn extract_key(value: &streamdal_gjson::Value) -> Result<serde_json::Value, TransformError> {
101 match value.kind() {
102 Kind::String => Ok(Value::String(value.to_string())),
103 Kind::Number => match extract_number(value.to_string().as_str()) {
104 Ok(num) => Ok(num),
105 Err(e) => Err(e),
106 },
107 Kind::True => Ok(Value::Bool(true)),
108 Kind::False => Ok(Value::Bool(false)),
109 Kind::Null => Ok(Value::Null),
110 Kind::Array => {
111 match extract_array(value) {
113 Ok(array) => Ok(array),
114 Err(e) => Err(e),
115 }
116 }
117 _ => Err(TransformError::Generic(format!(
118 "unable to extract data: unknown type for value: `{}`",
119 value
120 ))),
121 }
122}
123
124pub fn extract(req: &Request) -> Result<String, TransformError> {
125 validate_extract_request(req)?;
126
127 let extract_options = match &req.extract_options {
128 Some(options) => options,
129 None => {
130 return Err(TransformError::Generic(
131 "unable to extract data: options not provided".to_string(),
132 ))
133 }
134 };
135
136 let data_as_str = convert_bytes_to_string(&req.data)?;
137
138 let mut extracted_data = Map::new();
140 for path in &extract_options.paths {
141 let value = gjson::get(data_as_str, path.as_str());
142 if !value.exists() {
143 continue;
144 }
145
146 let path_elements: Vec<&str> = path.split('.').collect();
150
151 if extract_options.flatten {
152 let parsed_value = match extract_key(&value) {
153 Ok(key) => key,
154 Err(e) => return Err(e),
155 };
156
157 extracted_data.insert(path_elements.last().unwrap().to_string(), parsed_value);
158 } else {
159 let mut current_map = &mut extracted_data;
160 for (i, path_element) in path_elements.iter().enumerate() {
161 if i == path_elements.len() - 1 {
163 let parsed_value = match extract_key(&value) {
164 Ok(key) => key,
165 Err(e) => return Err(e),
166 };
167
168 current_map.insert(path_element.to_string(), parsed_value);
169
170 continue;
171 }
172
173 if path_element.ends_with('#') {
175 continue;
176 }
177
178 if current_map.contains_key(*path_element) {
182 let current_map_value = match current_map.get_mut(*path_element) {
183 Some(value) => value,
184 None => {
185 return Err(TransformError::Generic(
186 "unable to extract data: unable to get current map value"
187 .to_string(),
188 ))
189 }
190 };
191
192 current_map = match current_map_value.as_object_mut() {
193 Some(value) => value,
194 None => {
195 return Err(TransformError::Generic(
196 "unable to extract data: unable to get current map as object"
197 .to_string(),
198 ))
199 }
200 };
201 }
202
203 current_map.insert(path_element.to_string(), Value::Object(Map::new()));
204
205 let current_map_value = match current_map.get_mut(*path_element) {
206 Some(value) => value,
207 None => {
208 return Err(TransformError::Generic(
209 "unable to extract data: unable to get current map value".to_string(),
210 ))
211 }
212 };
213
214 current_map = match current_map_value.as_object_mut() {
215 Some(value) => value,
216 None => {
217 return Err(TransformError::Generic(
218 "unable to extract data: unable to get current map as object"
219 .to_string(),
220 ))
221 }
222 };
223 }
224 }
225 }
226
227 if let Ok(res) = serde_json::to_string(&extracted_data) {
229 Ok(res)
230 } else {
231 Err(TransformError::Generic(
232 "unable to extract data: unable to serialize data".to_string(),
233 ))
234 }
235}
236
237pub fn overwrite(req: &Request) -> Result<String, TransformError> {
238 validate_request(req, true)?;
239
240 let data = gjson::set_overwrite(
241 convert_bytes_to_string(&req.data)?,
242 req.path.as_str(),
243 req.value.as_str(),
244 )
245 .map_err(|e| TransformError::Generic(format!("unable to overwrite data: {}", e)))?;
246
247 Ok(data)
248}
249
250pub fn truncate(req: &Request) -> Result<String, TransformError> {
251 validate_request(req, false)?;
252
253 let truncate_options = match &req.truncate_options {
254 Some(options) => options,
255 None => {
256 return Err(TransformError::Generic(
257 "unable to truncate data: options not provided".to_string(),
258 ))
259 }
260 };
261
262 let data_as_str = convert_bytes_to_string(&req.data)?;
263 let value = gjson::get(data_as_str, req.path.as_str());
264 let length_of_field = gjson::get(data_as_str, req.path.as_str()).to_string().len();
265
266 let mut truncate_length = match &truncate_options.truncate_type {
267 #[allow(clippy::clone_on_copy)]
268 TruncateType::Chars => {
269 if truncate_options.length > length_of_field {
270 length_of_field
271 } else {
272 length_of_field - truncate_options.length.clone()
273 }
274 }
275 TruncateType::Percent => {
276 let my_usize_reference =
277 100.0 - &truncate_options.length.value_as::<f64>().unwrap_or(0.0);
278 let num_chars_to_keep: f64 = length_of_field as f64 * (my_usize_reference / 100.0);
279 num_chars_to_keep.round() as usize
280 }
281 };
282
283 truncate_length = truncate_length.clamp(0, length_of_field);
284
285 match value.kind() {
286 gjson::Kind::String => _truncate(data_as_str, req.path.as_str(), &truncate_length),
287 _ => Err(TransformError::Generic(format!(
288 "unable to truncate data: path '{}' is not a string",
289 req.path
290 ))),
291 }
292}
293
294#[allow(clippy::to_string_in_format_args)]
295fn _truncate(data: &str, path: &str, len: &usize) -> Result<String, TransformError> {
296 let contents = gjson::get(data, path);
297
298 let num_chars_to_keep = contents.str().len() - len;
299
300 let truncated = format!("\"{}\"", contents.str()[0..num_chars_to_keep].to_string());
301
302 let data = gjson::set_overwrite(data, path, &truncated)
303 .map_err(|e| TransformError::Generic(format!("unable to truncate data: {}", e)))?;
304
305 Ok(data)
306}
307
308pub fn delete(req: &Request) -> Result<String, TransformError> {
309 validate_request(req, false)?;
310
311 let data_as_str = convert_bytes_to_string(&req.data)?;
312
313 _delete(data_as_str, req.path.as_str())
314}
315
316fn _delete(data: &str, path: &str) -> Result<String, TransformError> {
317 let data = gjson::delete_path(data, path)
318 .map_err(|e| TransformError::Generic(format!("unable to delete data: {}", e)))?;
319
320 Ok(data)
321}
322
323pub fn obfuscate(req: &Request) -> Result<String, TransformError> {
324 validate_request(req, false)?;
325
326 let data_as_str = convert_bytes_to_string(&req.data)?;
327 let value = gjson::get(data_as_str, req.path.as_str());
328
329 match value.kind() {
330 gjson::Kind::String => _obfuscate(data_as_str, req.path.as_str()),
331 _ => Err(TransformError::Generic(format!(
332 "unable to mask data: path '{}' is not a string or number",
333 req.path
334 ))),
335 }
336}
337
338fn _obfuscate(data: &str, path: &str) -> Result<String, TransformError> {
339 let contents = gjson::get(data, path);
340 let hashed = sha256::digest(contents.str().as_bytes());
341
342 let obfuscated = format!("\"sha256:{}\"", hashed);
343
344 gjson::set_overwrite(data, path, &obfuscated)
345 .map_err(|e| TransformError::Generic(format!("unable to obfuscate data: {}", e)))
346}
347
348pub fn mask(req: &Request) -> Result<String, TransformError> {
349 validate_request(req, false)?;
350
351 let data_as_str = convert_bytes_to_string(&req.data)?;
352 let value = gjson::get(data_as_str, req.path.as_str());
353
354 match value.kind() {
355 gjson::Kind::String => _mask(data_as_str, req.path.as_str(), '*', true),
356 gjson::Kind::Number => _mask(data_as_str, req.path.as_str(), '0', false),
357 _ => Err(TransformError::Generic(format!(
358 "unable to mask data: path '{}' is not a string or number",
359 req.path
360 ))),
361 }
362}
363
364fn _mask(data: &str, path: &str, mask_char: char, quote: bool) -> Result<String, TransformError> {
365 let contents = gjson::get(data, path);
366 let num_chars_to_mask = (0.8 * contents.str().len() as f64).round() as usize;
367 let num_chars_to_skip = contents.str().len() - num_chars_to_mask;
368
369 let mut masked = contents.str()[0..num_chars_to_skip].to_string()
370 + mask_char.to_string().repeat(num_chars_to_mask).as_str();
371
372 if quote {
373 masked = format!("\"{}\"", masked);
374 }
375
376 gjson::set_overwrite(data, path, &masked)
377 .map_err(|e| TransformError::Generic(format!("unable to mask data: {}", e)))
378}
379
380fn validate_request(req: &Request, _value_check: bool) -> Result<(), TransformError> {
381 if req.data.is_empty() {
382 return Err(TransformError::Generic("data cannot be empty".to_string()));
383 }
384
385 if !gjson::valid(convert_bytes_to_string(&req.data)?) {
387 return Err(TransformError::Generic(
388 "data is not valid JSON".to_string(),
389 ));
390 }
391
392 if !gjson::get(convert_bytes_to_string(&req.data)?, req.path.as_str()).exists() {
394 return Err(TransformError::Generic(format!(
395 "path '{}' not found in data",
396 req.path
397 )));
398 }
399
400 Ok(())
401}
402
403fn validate_extract_request(req: &Request) -> Result<(), TransformError> {
404 if req.data.is_empty() {
405 return Err(TransformError::Generic("data cannot be empty".to_string()));
406 }
407
408 if !gjson::valid(convert_bytes_to_string(&req.data)?) {
410 return Err(TransformError::Generic(
411 "data is not valid JSON".to_string(),
412 ));
413 }
414
415 if req.extract_options.is_none() {
416 return Err(TransformError::Generic(
417 "extract options not provided".to_string(),
418 ));
419 }
420
421 let extract_options = req.extract_options.as_ref().unwrap();
422 if extract_options.paths.is_empty() {
423 return Err(TransformError::Generic(
424 "extract paths cannot be empty".to_string(),
425 ));
426 }
427
428 Ok(())
429}
430
431fn convert_bytes_to_string(bytes: &Vec<u8>) -> Result<&str, TransformError> {
432 Ok(std::str::from_utf8(bytes.as_slice())
433 .map_err(|e| TransformError::Generic(format!("unable to parse data as UTF-8: {}", e))))?
434}
435
#[cfg(test)]
mod tests {
    use super::*;

    const TEST_DATA: &str = r#"{
    "foo": "bar",
    "baz": {
        "qux": "quux"
    },
    "bool": true
}"#;

    const USERS_DATA: &str = r#"{"users": [
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 31}
    ]}"#;

    /// Builds a request over `data` targeting `path`, with no value and no options set.
    fn request(data: &str, path: &str) -> Request {
        Request {
            data: data.as_bytes().to_vec(),
            path: path.to_string(),
            value: String::new(),
            truncate_options: None,
            extract_options: None,
        }
    }

    /// Builds a truncate request against `baz.qux` in `TEST_DATA`.
    fn truncate_request(length: usize, truncate_type: TruncateType) -> Request {
        let mut req = request(TEST_DATA, "baz.qux");
        req.truncate_options = Some(TruncateOptions {
            length,
            truncate_type,
        });
        req
    }

    /// Builds an extract request over `data` for the given paths.
    fn extract_request(data: &str, flatten: bool, paths: &[&str]) -> Request {
        let mut req = request(data, "");
        req.extract_options = Some(ExtractOptions {
            flatten,
            paths: paths.iter().map(|p| p.to_string()).collect(),
        });
        req
    }

    /// Asserts that the untouched fixture still reads `quux` at `baz.qux`.
    fn assert_fixture_untouched() {
        assert!(gjson::valid(TEST_DATA));
        assert_eq!(gjson::get(TEST_DATA, "baz.qux").str(), "quux");
    }

    #[test]
    fn test_overwrite() {
        let mut req = request(TEST_DATA, "baz.qux");
        req.value = "\"test\"".to_string();

        let result = overwrite(&req).unwrap();

        assert_fixture_untouched();
        assert!(gjson::valid(&result));
        assert_eq!(result, TEST_DATA.replace("quux", "test"));
        assert_eq!(gjson::get(result.as_str(), "baz.qux").str(), "test");

        req.path = "does-not-exist".to_string();
        assert!(
            overwrite(&req).is_err(),
            "should error when path does not exist"
        );

        req.path = "bool".to_string();
        assert!(
            overwrite(&req).is_ok(),
            "should be able to replace any value, regardless of type"
        );
    }

    #[test]
    fn test_obfuscate() {
        let mut req = request(TEST_DATA, "baz.qux");

        let result = obfuscate(&req).unwrap();
        let hashed_value = sha256::digest("quux".as_bytes());

        assert_fixture_untouched();
        assert!(gjson::valid(&result));
        assert_eq!(
            gjson::get(result.as_str(), "baz.qux").str(),
            format!("sha256:{}", hashed_value)
        );

        // Exercise obfuscate's own error paths (these previously called `mask`).
        req.path = "does-not-exist".to_string();
        assert!(obfuscate(&req).is_err());

        req.path = "bool".to_string();
        assert!(obfuscate(&req).is_err());
    }

    #[test]
    fn test_mask() {
        let mut req = request(TEST_DATA, "baz.qux");

        let result = mask(&req).unwrap();

        assert_fixture_untouched();
        assert!(gjson::valid(&result));
        assert_eq!(gjson::get(result.as_str(), "baz.qux").str(), "q***");

        req.path = "does-not-exist".to_string();
        assert!(mask(&req).is_err());

        req.path = "bool".to_string();
        assert!(mask(&req).is_err());
    }

    #[test]
    fn test_truncate_chars() {
        let mut req = truncate_request(1, TruncateType::Chars);

        let result = truncate(&req).unwrap();

        assert_fixture_untouched();
        assert!(gjson::valid(&result));
        assert_eq!(gjson::get(result.as_str(), "baz.qux").str(), "q");

        req.path = "does-not-exist".to_string();
        assert!(truncate(&req).is_err());

        req.path = "bool".to_string();
        assert!(truncate(&req).is_err());
    }

    #[test]
    fn test_truncate_chars_over_length() {
        let req = truncate_request(5, TruncateType::Chars);

        let result = truncate(&req).unwrap();

        assert_fixture_untouched();
        assert!(gjson::valid(&result));
        assert_eq!(gjson::get(result.as_str(), "baz.qux").str(), "");
    }

    #[test]
    fn test_truncate_percent() {
        let req = truncate_request(25, TruncateType::Percent);

        let result = truncate(&req).unwrap();

        assert_fixture_untouched();
        assert!(gjson::valid(&result));
        assert_eq!(gjson::get(result.as_str(), "baz.qux").str(), "q");
    }

    #[test]
    fn test_delete() {
        let req = request(TEST_DATA, "baz.qux");

        let result = delete(&req).unwrap();

        assert_fixture_untouched();
        assert!(gjson::valid(&result));
        assert!(!gjson::get(result.as_str(), "baz.qux").exists());
    }

    #[test]
    fn test_extract_flatten() {
        let req = extract_request(TEST_DATA, true, &["foo", "baz.qux"]);

        let result = extract(&req).unwrap();

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, r#"{"foo":"bar","qux":"quux"}"#);
    }

    #[test]
    fn test_extract_no_flatten() {
        let req = extract_request(TEST_DATA, false, &["foo", "baz.qux"]);

        let result = extract(&req).unwrap();

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, r#"{"baz":{"qux":"quux"},"foo":"bar"}"#);
    }

    #[test]
    fn test_extract_scalar_types() {
        let data = r#"{
            "string": "bar",
            "number": 1,
            "float": 1.0,
            "bigint": 9007199254740991,
            "signed_int": -1,
            "bool": true,
            "null": null
        }"#;
        let req = extract_request(
            data,
            true,
            &[
                "string",
                "number",
                "bool",
                "null",
                "float",
                "bigint",
                "signed_int",
            ],
        );

        let result = extract(&req).unwrap();

        assert!(gjson::valid(result.as_str()));
        assert_eq!(
            result,
            r#"{"bigint":9007199254740991,"bool":true,"float":1.0,"null":null,"number":1,"signed_int":-1,"string":"bar"}"#
        );
    }

    #[test]
    fn test_extract_arrays() {
        let req = extract_request(USERS_DATA, false, &["users"]);

        let result = extract(&req).unwrap();

        let expected = r#"{"users":[{"age":30,"name":"Alice"},{"age":31,"name":"Bob"}]}"#;

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, expected);
    }

    #[test]
    fn test_extract_array_object_field() {
        let req = extract_request(USERS_DATA, false, &["users.#.name"]);

        let result = extract(&req).unwrap();

        let expected = r#"{"users":{"name":["Alice","Bob"]}}"#;

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, expected);
    }
}