// sdf-parser-df 0.14.0
//
// Core parser for the SDF YAML format.
//
// Documentation: integration tests for `sdf_parser_df::parse`.
use sdf_parser_core::config::types::{MetadataTypeInner, MetadataTypeTagged};
use sdf_parser_df::parse;

/// Parses a two-service definition and checks that the `ttl` declared on the
/// `arrow-row` value of the `count-per-word` keyed state (declared by the
/// `word-count-processing` service) comes back from the parser as `"60s"`.
///
/// The document also contains a second service, `map-to-occurrences`, that
/// references the same state via `from: word-count-processing.count-per-word`;
/// the whole document must parse successfully for the test to proceed.
#[test]
fn test_parse_ttl() {
    let yaml = r#"
apiVersion: 0.6.0
meta:
  name: my-df
  version: 0.1.0
  namespace: my-org
# default config
config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: Beginning
topics:
  sentence:
    name: sentence-ref-state
    schema:
      value:
        type: string
        converter: raw
  input:
    name: input-ref-state
    schema:
      value:
        type: string
        converter: raw
  word-count:
    name: output-ref-state
    schema:
      value:
        type: string
        converter: json

services:
  word-count-processing:
    sources:
      - type: topic
        id: sentence
    states:
      count-per-word:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              occurrences:
                type: u32
            ttl: 60s
    transforms:
      - operator: flat-map
        run: |
          fn split_sequence(sentence: String) -> Result<Vec<String>> {
            Ok(sentence.split_whitespace().map(|word| word.chars().filter(|c| c.is_alphanumeric()).collect::<String>()).filter(|word| !word.is_empty()).collect())
          }
    partition:
      assign-key:
        run: |
          fn assign_key_word(word: String) -> Result<String> {
            println!("got {}", word);
            Ok(word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect())
          }
      update-state:
        run: |
          fn count_word(word: String) -> Result<()> {
            let mut state = count_per_word();
            state.occurrences += 1;
            println!("word: {}, count: {}", word, state.occurrences);
            state.update()?;
            Ok(())
          }

  map-to-occurrences:
    sources:
      - type: topic
        id: input
    states:
      count-per-word:
        from: word-count-processing.count-per-word

    transforms:
      - operator: flat-map
        run: |
          fn split_sequencetwo(sentence: String) -> Result<Vec<String>> {
            Ok(sentence.split_whitespace().map(|word| word.chars().filter(|c| c.is_alphanumeric()).collect::<String>()).filter(|word| !word.is_empty()).collect())
          }
    partition:
      assign-key:
        run: |
          fn assign_key_sort_by(_word: String) -> Result<String> {
            Ok("default".to_string())
          }
      transforms:
        - operator: map
          run: |
            fn map_words_to_occurrence(key: String) -> Result<String> {
                println!("key: {key}");
                let count = sql(&format!("select * from count_per_word where _key = '{}'", key))?;
                let rows = count.rows()?;
                let columns = count.schema(["_key","occurrences"])?;
                match &columns[..] {
                  [k,v] => {
                    if rows.next() {
                      let w = rows.str(&k)?;
                      let c = rows.u32(&v)?;
                      return Ok(format!("key: {} count: {}",w,c))
                    } else {
                      return Ok(format!("key: {} not found",key));
                    }
                },
                  _ => panic!("unexpected schema"),
              }
            }
    sinks:
      - type: topic
        id: word-count
    "#
    .to_string();

    let config = parse(&yaml).expect("parse yaml");

    // Walk service -> state -> typed metadata. Each step is fallible, so each
    // gets its own `expect` message to pinpoint which layer failed.
    let state = config
        .services
        .get("word-count-processing")
        .expect("service exists")
        .valid_data()
        .expect("service is valid")
        .states
        .get("count-per-word")
        .expect("state exists")
        .valid_data()
        .expect("state is valid")
        .inner_type()
        .expect("state type is typed");

    // The state is declared with `type: keyed-state` in the YAML above.
    let MetadataTypeInner::MetadataTypeTagged(MetadataTypeTagged::KeyedState(kvstate)) = &state.ty
    else {
        panic!("Expected KeyedState type");
    };

    // Its value is declared with `type: arrow-row`. Borrow it in place rather
    // than cloning the whole type just to pattern-match on it.
    let MetadataTypeInner::MetadataTypeTagged(MetadataTypeTagged::ArrowRow(row)) =
        &kvstate.properties.value.ty
    else {
        panic!("Expected ArrowRow type");
    };

    // `ttl: 60s` is expected to come back as the literal string from the YAML.
    let ttl = row.ttl.as_ref().expect("ttl is set");
    assert_eq!(ttl, "60s");
}