streamdal_wasm_transform/
transform.rs

1use conv::prelude::*;
2use serde_json::{Map, Number, Value};
3use streamdal_gjson as gjson;
4use streamdal_gjson::Kind;
5
/// Error returned by every transform in this module.
#[derive(Debug)]
pub enum TransformError {
    /// A human-readable description of what went wrong.
    Generic(String),
}

impl std::fmt::Display for TransformError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TransformError::Generic(msg) => f.write_str(msg),
        }
    }
}

impl std::error::Error for TransformError {}
10
/// How `TruncateOptions::length` is interpreted by [`truncate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncateType {
    /// `length` is the number of characters to keep from the start of the string.
    Chars,
    /// `length` is the percentage of the string to keep from its start.
    Percent,
}
15
/// Options consumed by [`truncate`].
pub struct TruncateOptions {
    /// How much of the string to keep; its unit is selected by `truncate_type`.
    pub length: usize,
    /// Whether `length` counts characters or is a percentage.
    pub truncate_type: TruncateType,
}
20
/// Options consumed by [`extract`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExtractOptions {
    /// When true, each extracted value is keyed by the last `.`-separated segment of its
    /// path instead of being nested under its parent segments.
    pub flatten: bool,
    /// gjson paths to extract; must not be empty.
    pub paths: Vec<String>,
}
25
/// Input shared by every transform in this module.
pub struct Request {
    /// The JSON document to transform, as UTF-8 bytes; must be non-empty valid JSON.
    pub data: Vec<u8>,
    /// gjson path of the field to operate on (not read by [`extract`], which uses
    /// `extract_options.paths` instead).
    pub path: String,
    /// JSON text written at `path` by [`overwrite`] (e.g. `"\"test\""` for a string);
    /// not read by the other transforms in this file.
    pub value: String,

    // TODO: we'll eventually have to pass protos directly to those functions
    // TODO: but let's keep changes simple for now ~MG 2024-01-02
    /// Required by [`truncate`]; not read by the other transforms in this file.
    pub truncate_options: Option<TruncateOptions>,
    /// Required by [`extract`]; not read by the other transforms in this file.
    pub extract_options: Option<ExtractOptions>,
}
36
37fn extract_array(value: &streamdal_gjson::Value) -> Result<serde_json::Value, TransformError> {
38    let mut array = Value::Array(Vec::new());
39    for element in value.array() {
40        match element.kind() {
41            Kind::String => {
42                array
43                    .as_array_mut()
44                    .unwrap()
45                    .push(Value::String(element.to_string()));
46            }
47            Kind::Number => {
48                array
49                    .as_array_mut()
50                    .unwrap()
51                    .push(Value::Number(Number::from_f64(element.f64()).unwrap()));
52            }
53            Kind::True => {
54                array.as_array_mut().unwrap().push(Value::Bool(true));
55            }
56            Kind::False => {
57                array.as_array_mut().unwrap().push(Value::Bool(false));
58            }
59            Kind::Null => {
60                array.as_array_mut().unwrap().push(Value::Null);
61            }
62            Kind::Object => {
63                let obj = Value::Object(serde_json::from_str(element.str()).unwrap());
64                array.as_array_mut().unwrap().push(obj);
65            }
66            Kind::Array => match extract_array(&element) {
67                Ok(v) => {
68                    array.as_array_mut().unwrap().push(v);
69                }
70                Err(e) => {
71                    return Err(e);
72                }
73            },
74        }
75    }
76
77    Ok(array)
78}
79
80fn extract_number(json_str: &str) -> Result<serde_json::Value, TransformError> {
81    // Parse the JSON string
82    let parsed_value: Result<Value, _> = serde_json::from_str(json_str);
83
84    match parsed_value {
85        Ok(value) => {
86            // Check if the parsed value is a number
87            if value.is_number() {
88                Ok(value)
89            } else {
90                Err(TransformError::Generic(
91                    "unable to extract data: path is not a valid number".to_string(),
92                ))
93            }
94        }
95
96        Err(e) => Err(TransformError::Generic(e.to_string())), // JSON parsing error
97    }
98}
99
100fn extract_key(value: &streamdal_gjson::Value) -> Result<serde_json::Value, TransformError> {
101    match value.kind() {
102        Kind::String => Ok(Value::String(value.to_string())),
103        Kind::Number => match extract_number(value.to_string().as_str()) {
104            Ok(num) => Ok(num),
105            Err(e) => Err(e),
106        },
107        Kind::True => Ok(Value::Bool(true)),
108        Kind::False => Ok(Value::Bool(false)),
109        Kind::Null => Ok(Value::Null),
110        Kind::Array => {
111            // Convert from gjson to serde type :/
112            match extract_array(value) {
113                Ok(array) => Ok(array),
114                Err(e) => Err(e),
115            }
116        }
117        _ => Err(TransformError::Generic(format!(
118            "unable to extract data: unknown type for value: `{}`",
119            value
120        ))),
121    }
122}
123
124pub fn extract(req: &Request) -> Result<String, TransformError> {
125    validate_extract_request(req)?;
126
127    let extract_options = match &req.extract_options {
128        Some(options) => options,
129        None => {
130            return Err(TransformError::Generic(
131                "unable to extract data: options not provided".to_string(),
132            ))
133        }
134    };
135
136    let data_as_str = convert_bytes_to_string(&req.data)?;
137
138    // For each path in extract_options.paths, get the value and add it to a map
139    let mut extracted_data = Map::new();
140    for path in &extract_options.paths {
141        let value = gjson::get(data_as_str, path.as_str());
142        if !value.exists() {
143            continue;
144        }
145
146        // Split path by ".". If we're flattening it, just get last element and insert into hashmap
147        // Otherwise, we need to recursively create sub-hashmaps for each element in the path before
148        // inserting the value.
149        let path_elements: Vec<&str> = path.split('.').collect();
150
151        if extract_options.flatten {
152            let parsed_value = match extract_key(&value) {
153                Ok(key) => key,
154                Err(e) => return Err(e),
155            };
156
157            extracted_data.insert(path_elements.last().unwrap().to_string(), parsed_value);
158        } else {
159            let mut current_map = &mut extracted_data;
160            for (i, path_element) in path_elements.iter().enumerate() {
161                // Check if we're in the last element of the path, if so, we're inserting the value
162                if i == path_elements.len() - 1 {
163                    let parsed_value = match extract_key(&value) {
164                        Ok(key) => key,
165                        Err(e) => return Err(e),
166                    };
167
168                    current_map.insert(path_element.to_string(), parsed_value);
169
170                    continue;
171                }
172
173                // Exclude gjson array syntax from path
174                if path_element.ends_with('#') {
175                    continue;
176                }
177
178                // We're not at the last element, so we need to insert a new map for this element
179                // Get the key if it exists, otherwise create the key and set it's value to serde_json::Map::new()
180                // On subsequent iterations, we'll get the key and set current_map to the value of that key
181                if current_map.contains_key(*path_element) {
182                    let current_map_value = match current_map.get_mut(*path_element) {
183                        Some(value) => value,
184                        None => {
185                            return Err(TransformError::Generic(
186                                "unable to extract data: unable to get current map value"
187                                    .to_string(),
188                            ))
189                        }
190                    };
191
192                    current_map = match current_map_value.as_object_mut() {
193                        Some(value) => value,
194                        None => {
195                            return Err(TransformError::Generic(
196                                "unable to extract data: unable to get current map as object"
197                                    .to_string(),
198                            ))
199                        }
200                    };
201                }
202
203                current_map.insert(path_element.to_string(), Value::Object(Map::new()));
204
205                let current_map_value = match current_map.get_mut(*path_element) {
206                    Some(value) => value,
207                    None => {
208                        return Err(TransformError::Generic(
209                            "unable to extract data: unable to get current map value".to_string(),
210                        ))
211                    }
212                };
213
214                current_map = match current_map_value.as_object_mut() {
215                    Some(value) => value,
216                    None => {
217                        return Err(TransformError::Generic(
218                            "unable to extract data: unable to get current map as object"
219                                .to_string(),
220                        ))
221                    }
222                };
223            }
224        }
225    }
226
227    // Convert serde map to json string
228    if let Ok(res) = serde_json::to_string(&extracted_data) {
229        Ok(res)
230    } else {
231        Err(TransformError::Generic(
232            "unable to extract data: unable to serialize data".to_string(),
233        ))
234    }
235}
236
237pub fn overwrite(req: &Request) -> Result<String, TransformError> {
238    validate_request(req, true)?;
239
240    let data = gjson::set_overwrite(
241        convert_bytes_to_string(&req.data)?,
242        req.path.as_str(),
243        req.value.as_str(),
244    )
245    .map_err(|e| TransformError::Generic(format!("unable to overwrite data: {}", e)))?;
246
247    Ok(data)
248}
249
250pub fn truncate(req: &Request) -> Result<String, TransformError> {
251    validate_request(req, false)?;
252
253    let truncate_options = match &req.truncate_options {
254        Some(options) => options,
255        None => {
256            return Err(TransformError::Generic(
257                "unable to truncate data: options not provided".to_string(),
258            ))
259        }
260    };
261
262    let data_as_str = convert_bytes_to_string(&req.data)?;
263    let value = gjson::get(data_as_str, req.path.as_str());
264    let length_of_field = gjson::get(data_as_str, req.path.as_str()).to_string().len();
265
266    let mut truncate_length = match &truncate_options.truncate_type {
267        #[allow(clippy::clone_on_copy)]
268        TruncateType::Chars => {
269            if truncate_options.length > length_of_field {
270                length_of_field
271            } else {
272                length_of_field - truncate_options.length.clone()
273            }
274        }
275        TruncateType::Percent => {
276            let my_usize_reference =
277                100.0 - &truncate_options.length.value_as::<f64>().unwrap_or(0.0);
278            let num_chars_to_keep: f64 = length_of_field as f64 * (my_usize_reference / 100.0);
279            num_chars_to_keep.round() as usize
280        }
281    };
282
283    truncate_length = truncate_length.clamp(0, length_of_field);
284
285    match value.kind() {
286        gjson::Kind::String => _truncate(data_as_str, req.path.as_str(), &truncate_length),
287        _ => Err(TransformError::Generic(format!(
288            "unable to truncate data: path '{}' is not a string",
289            req.path
290        ))),
291    }
292}
293
294#[allow(clippy::to_string_in_format_args)]
295fn _truncate(data: &str, path: &str, len: &usize) -> Result<String, TransformError> {
296    let contents = gjson::get(data, path);
297
298    let num_chars_to_keep = contents.str().len() - len;
299
300    let truncated = format!("\"{}\"", contents.str()[0..num_chars_to_keep].to_string());
301
302    let data = gjson::set_overwrite(data, path, &truncated)
303        .map_err(|e| TransformError::Generic(format!("unable to truncate data: {}", e)))?;
304
305    Ok(data)
306}
307
308pub fn delete(req: &Request) -> Result<String, TransformError> {
309    validate_request(req, false)?;
310
311    let data_as_str = convert_bytes_to_string(&req.data)?;
312
313    _delete(data_as_str, req.path.as_str())
314}
315
316fn _delete(data: &str, path: &str) -> Result<String, TransformError> {
317    let data = gjson::delete_path(data, path)
318        .map_err(|e| TransformError::Generic(format!("unable to delete data: {}", e)))?;
319
320    Ok(data)
321}
322
323pub fn obfuscate(req: &Request) -> Result<String, TransformError> {
324    validate_request(req, false)?;
325
326    let data_as_str = convert_bytes_to_string(&req.data)?;
327    let value = gjson::get(data_as_str, req.path.as_str());
328
329    match value.kind() {
330        gjson::Kind::String => _obfuscate(data_as_str, req.path.as_str()),
331        _ => Err(TransformError::Generic(format!(
332            "unable to mask data: path '{}' is not a string or number",
333            req.path
334        ))),
335    }
336}
337
338fn _obfuscate(data: &str, path: &str) -> Result<String, TransformError> {
339    let contents = gjson::get(data, path);
340    let hashed = sha256::digest(contents.str().as_bytes());
341
342    let obfuscated = format!("\"sha256:{}\"", hashed);
343
344    gjson::set_overwrite(data, path, &obfuscated)
345        .map_err(|e| TransformError::Generic(format!("unable to obfuscate data: {}", e)))
346}
347
348pub fn mask(req: &Request) -> Result<String, TransformError> {
349    validate_request(req, false)?;
350
351    let data_as_str = convert_bytes_to_string(&req.data)?;
352    let value = gjson::get(data_as_str, req.path.as_str());
353
354    match value.kind() {
355        gjson::Kind::String => _mask(data_as_str, req.path.as_str(), '*', true),
356        gjson::Kind::Number => _mask(data_as_str, req.path.as_str(), '0', false),
357        _ => Err(TransformError::Generic(format!(
358            "unable to mask data: path '{}' is not a string or number",
359            req.path
360        ))),
361    }
362}
363
364fn _mask(data: &str, path: &str, mask_char: char, quote: bool) -> Result<String, TransformError> {
365    let contents = gjson::get(data, path);
366    let num_chars_to_mask = (0.8 * contents.str().len() as f64).round() as usize;
367    let num_chars_to_skip = contents.str().len() - num_chars_to_mask;
368
369    let mut masked = contents.str()[0..num_chars_to_skip].to_string()
370        + mask_char.to_string().repeat(num_chars_to_mask).as_str();
371
372    if quote {
373        masked = format!("\"{}\"", masked);
374    }
375
376    gjson::set_overwrite(data, path, &masked)
377        .map_err(|e| TransformError::Generic(format!("unable to mask data: {}", e)))
378}
379
380fn validate_request(req: &Request, _value_check: bool) -> Result<(), TransformError> {
381    if req.data.is_empty() {
382        return Err(TransformError::Generic("data cannot be empty".to_string()));
383    }
384
385    // Is this valid JSON?
386    if !gjson::valid(convert_bytes_to_string(&req.data)?) {
387        return Err(TransformError::Generic(
388            "data is not valid JSON".to_string(),
389        ));
390    }
391
392    // Valid path?
393    if !gjson::get(convert_bytes_to_string(&req.data)?, req.path.as_str()).exists() {
394        return Err(TransformError::Generic(format!(
395            "path '{}' not found in data",
396            req.path
397        )));
398    }
399
400    Ok(())
401}
402
403fn validate_extract_request(req: &Request) -> Result<(), TransformError> {
404    if req.data.is_empty() {
405        return Err(TransformError::Generic("data cannot be empty".to_string()));
406    }
407
408    // Is this valid JSON?
409    if !gjson::valid(convert_bytes_to_string(&req.data)?) {
410        return Err(TransformError::Generic(
411            "data is not valid JSON".to_string(),
412        ));
413    }
414
415    if req.extract_options.is_none() {
416        return Err(TransformError::Generic(
417            "extract options not provided".to_string(),
418        ));
419    }
420
421    let extract_options = req.extract_options.as_ref().unwrap();
422    if extract_options.paths.is_empty() {
423        return Err(TransformError::Generic(
424            "extract paths cannot be empty".to_string(),
425        ));
426    }
427
428    Ok(())
429}
430
431fn convert_bytes_to_string(bytes: &Vec<u8>) -> Result<&str, TransformError> {
432    Ok(std::str::from_utf8(bytes.as_slice())
433        .map_err(|e| TransformError::Generic(format!("unable to parse data as UTF-8: {}", e))))?
434}
435
#[cfg(test)]
mod tests {
    use super::*;

    const TEST_DATA: &str = r#"{
    "foo": "bar",
    "baz": {
        "qux": "quux"
    },
    "bool": true
}"#;

    #[test]
    fn test_overwrite() {
        let mut req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "baz.qux".to_string(),
            value: "\"test\"".to_string(),
            truncate_options: None,
            extract_options: None,
        };

        let result = overwrite(&req).unwrap();

        assert!(gjson::valid(TEST_DATA));
        assert!(gjson::valid(&result));
        assert_eq!(result, TEST_DATA.replace("quux", "test"));

        let v = gjson::get(TEST_DATA, "baz.qux");
        assert_eq!(v.str(), "quux");

        let v = gjson::get(result.as_str(), "baz.qux");
        assert_eq!(v.str(), "test");

        req.path = "does-not-exist".to_string();
        assert!(
            overwrite(&req).is_err(),
            "should error when path does not exist"
        );

        // Can overwrite anything
        req.path = "bool".to_string();
        assert!(
            overwrite(&req).is_ok(),
            "should be able to replace any value, regardless of type"
        );
    }

    #[test]
    fn test_obfuscate() {
        let mut req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "baz.qux".to_string(),
            value: "".to_string(), // needs a default
            truncate_options: None,
            extract_options: None,
        };

        let result = obfuscate(&req).unwrap();
        let hashed_value = sha256::digest("quux".as_bytes());

        assert!(gjson::valid(TEST_DATA));
        assert!(gjson::valid(&result));

        let v = gjson::get(TEST_DATA, "baz.qux");
        assert_eq!(v.str(), "quux");

        let v = gjson::get(result.as_str(), "baz.qux");
        assert_eq!(v.str(), format!("sha256:{}", hashed_value));

        // path does not exist
        req.path = "does-not-exist".to_string();
        assert!(obfuscate(&req).is_err());

        // path not a string
        req.path = "bool".to_string();
        assert!(obfuscate(&req).is_err());
    }

    #[test]
    fn test_mask() {
        let mut req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "baz.qux".to_string(),
            value: "".to_string(), // needs a default
            truncate_options: None,
            extract_options: None,
        };

        let result = mask(&req).unwrap();

        assert!(gjson::valid(TEST_DATA));
        assert!(gjson::valid(&result));

        let v = gjson::get(TEST_DATA, "baz.qux");
        assert_eq!(v.str(), "quux");

        let v = gjson::get(result.as_str(), "baz.qux");
        assert_eq!(v.str(), "q***");

        // path does not exist
        req.path = "does-not-exist".to_string();
        assert!(mask(&req).is_err());

        // path not a string
        req.path = "bool".to_string();
        assert!(mask(&req).is_err());
    }

    #[test]
    fn test_truncate_chars() {
        let mut req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "baz.qux".to_string(),
            value: "".to_string(), // needs a default
            truncate_options: Some(TruncateOptions {
                length: 1,
                truncate_type: TruncateType::Chars,
            }),
            extract_options: None,
        };

        let result = truncate(&req).unwrap();

        assert!(gjson::valid(TEST_DATA));
        assert!(gjson::valid(&result));

        let v = gjson::get(TEST_DATA, "baz.qux");
        assert_eq!(v.str(), "quux");

        let v = gjson::get(result.as_str(), "baz.qux");
        assert_eq!(v.str(), "q");

        // path does not exist
        req.path = "does-not-exist".to_string();
        assert!(truncate(&req).is_err());

        // path not a string
        req.path = "bool".to_string();
        assert!(truncate(&req).is_err());
    }

    #[test]
    fn test_truncate_chars_over_length() {
        let req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "baz.qux".to_string(),
            value: "".to_string(), // needs a default
            truncate_options: Some(TruncateOptions {
                length: 5,
                truncate_type: TruncateType::Chars,
            }),
            extract_options: None,
        };

        let result = truncate(&req).unwrap();

        assert!(gjson::valid(TEST_DATA));
        assert!(gjson::valid(&result));

        let v = gjson::get(TEST_DATA, "baz.qux");
        assert_eq!(v.str(), "quux");

        let v = gjson::get(result.as_str(), "baz.qux");
        assert_eq!(v.str(), "");
    }

    #[test]
    fn test_truncate_percent() {
        let req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "baz.qux".to_string(),
            value: "".to_string(), // needs a default
            truncate_options: Some(TruncateOptions {
                length: 25,
                truncate_type: TruncateType::Percent,
            }),
            extract_options: None,
        };

        let result = truncate(&req).unwrap();

        assert!(gjson::valid(TEST_DATA));
        assert!(gjson::valid(&result));

        let v = gjson::get(TEST_DATA, "baz.qux");
        assert_eq!(v.str(), "quux");

        let v = gjson::get(result.as_str(), "baz.qux");
        assert_eq!(v.str(), "q");
    }

    #[test]
    fn test_delete() {
        let req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "baz.qux".to_string(),
            value: "".to_string(), // needs a default
            truncate_options: None,
            extract_options: None,
        };

        let result = delete(&req).unwrap();

        assert!(gjson::valid(TEST_DATA));
        assert!(gjson::valid(&result));

        let v = gjson::get(TEST_DATA, "baz.qux");
        assert_eq!(v.str(), "quux");

        let v = gjson::get(result.as_str(), "baz.qux");
        assert!(!v.exists());
    }

    #[test]
    fn test_empty_data_is_rejected() {
        let req = Request {
            data: Vec::new(),
            path: "foo".to_string(),
            value: "\"test\"".to_string(),
            truncate_options: Some(TruncateOptions {
                length: 1,
                truncate_type: TruncateType::Chars,
            }),
            extract_options: Some(ExtractOptions {
                flatten: true,
                paths: vec!["foo".to_string()],
            }),
        };

        assert!(overwrite(&req).is_err());
        assert!(truncate(&req).is_err());
        assert!(delete(&req).is_err());
        assert!(obfuscate(&req).is_err());
        assert!(mask(&req).is_err());
        assert!(extract(&req).is_err());
    }

    #[test]
    fn test_extract_flatten() {
        let req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "".to_string(),
            value: "".to_string(),
            truncate_options: None,
            extract_options: Some(ExtractOptions {
                flatten: true,
                paths: vec!["foo".to_string(), "baz.qux".to_string()],
            }),
        };

        let result = extract(&req).unwrap();

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, r#"{"foo":"bar","qux":"quux"}"#);
    }

    #[test]
    fn test_extract_no_flatten() {
        let req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "".to_string(),
            value: "".to_string(),
            truncate_options: None,
            extract_options: Some(ExtractOptions {
                flatten: false,
                paths: vec!["foo".to_string(), "baz.qux".to_string()],
            }),
        };

        let result = extract(&req).unwrap();

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, r#"{"baz":{"qux":"quux"},"foo":"bar"}"#);
    }

    #[test]
    fn test_extract_skips_missing_paths() {
        let req = Request {
            data: TEST_DATA.as_bytes().to_vec(),
            path: "".to_string(),
            value: "".to_string(),
            truncate_options: None,
            extract_options: Some(ExtractOptions {
                flatten: false,
                paths: vec!["foo".to_string(), "does-not-exist".to_string()],
            }),
        };

        let result = extract(&req).unwrap();

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, r#"{"foo":"bar"}"#);
    }

    #[test]
    fn test_extract_scalar_types() {
        let req = Request {
            data: r#"{
                "string": "bar",
                "number": 1,
                "float": 1.0,
                "bigint": 9007199254740991,
                "signed_int": -1,
                "bool": true,
                "null": null
            }"#
            .as_bytes()
            .to_vec(),
            path: "".to_string(),
            value: "".to_string(),
            truncate_options: None,
            extract_options: Some(ExtractOptions {
                flatten: true,
                paths: vec![
                    "string".to_string(),
                    "number".to_string(),
                    "bool".to_string(),
                    "null".to_string(),
                    "float".to_string(),
                    "bigint".to_string(),
                    "signed_int".to_string(),
                ],
            }),
        };

        let result = extract(&req).unwrap();

        assert!(gjson::valid(result.as_str()));
        assert_eq!(
            result,
            r#"{"bigint":9007199254740991,"bool":true,"float":1.0,"null":null,"number":1,"signed_int":-1,"string":"bar"}"#
        );
    }

    #[test]
    fn test_extract_arrays() {
        let req = Request {
            data: r#"{"users": [
                {"name": "Alice", "age": 30},
                {"name": "Bob", "age": 31}
            ]}"#
            .as_bytes()
            .to_vec(),
            path: "".to_string(),
            value: "".to_string(),
            truncate_options: None,
            extract_options: Some(ExtractOptions {
                flatten: false,
                paths: vec!["users".to_string()],
            }),
        };

        let result = extract(&req).unwrap();

        let expected = r#"{"users":[{"age":30,"name":"Alice"},{"age":31,"name":"Bob"}]}"#;

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, expected);
    }

    #[test]
    fn test_extract_array_object_field() {
        let req = Request {
            data: r#"{"users": [
                {"name": "Alice", "age": 30},
                {"name": "Bob", "age": 31}
            ]}"#
            .as_bytes()
            .to_vec(),
            path: "".to_string(),
            value: "".to_string(),
            truncate_options: None,
            extract_options: Some(ExtractOptions {
                flatten: false,
                paths: vec!["users.#.name".to_string()],
            }),
        };

        let result = extract(&req).unwrap();

        let expected = r#"{"users":{"name":["Alice","Bob"]}}"#;

        assert!(gjson::valid(result.as_str()));
        assert_eq!(result, expected);
    }
}