use sdf_parser_core::config::types::{MetadataTypeInner, MetadataTypeTagged};
use sdf_parser_df::parse;
/// Checks that a `ttl` declared on an `arrow-row` state value survives parsing.
///
/// The dataflow below declares a `count-per-word` keyed state in the
/// `word-count-processing` service whose value is an `arrow-row` with
/// `ttl: 60s`. After parsing, the test walks from the service down to that
/// state's value type and asserts the parsed `ttl` equals `"60s"`.
#[test]
fn test_parse_ttl() {
    let yaml = r#"
apiVersion: 0.6.0
meta:
name: my-df
version: 0.1.0
namespace: my-org
# default config
config:
converter: json
consumer:
default_starting_offset:
value: 0
position: Beginning
topics:
sentence:
name: sentence-ref-state
schema:
value:
type: string
converter: raw
input:
name: input-ref-state
schema:
value:
type: string
converter: raw
word-count:
name: output-ref-state
schema:
value:
type: string
converter: json
services:
word-count-processing:
sources:
- type: topic
id: sentence
states:
count-per-word:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
occurrences:
type: u32
ttl: 60s
transforms:
- operator: flat-map
run: |
fn split_sequence(sentence: String) -> Result<Vec<String>> {
Ok(sentence.split_whitespace().map(|word| word.chars().filter(|c| c.is_alphanumeric()).collect::<String>()).filter(|word| !word.is_empty()).collect())
}
partition:
assign-key:
run: |
fn assign_key_word(word: String) -> Result<String> {
println!("got {}", word);
Ok(word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect())
}
update-state:
run: |
fn count_word(word: String) -> Result<()> {
let mut state = count_per_word();
state.occurrences += 1;
println!("word: {}, count: {}", word, state.occurrences);
state.update()?;
Ok(())
}
map-to-occurrences:
sources:
- type: topic
id: input
states:
count-per-word:
from: word-count-processing.count-per-word
transforms:
- operator: flat-map
run: |
fn split_sequencetwo(sentence: String) -> Result<Vec<String>> {
Ok(sentence.split_whitespace().map(|word| word.chars().filter(|c| c.is_alphanumeric()).collect::<String>()).filter(|word| !word.is_empty()).collect())
}
partition:
assign-key:
run: |
fn assign_key_sort_by(_word: String) -> Result<String> {
Ok("default".to_string())
}
transforms:
- operator: map
run: |
fn map_words_to_occurrence(key: String) -> Result<String> {
println!("key: {key}");
let count = sql(&format!("select * from count_per_word where _key = '{}'", key))?;
let rows = count.rows()?;
let columns = count.schema(["_key","occurrences"])?;
match &columns[..] {
[k,v] => {
if rows.next() {
let w = rows.str(&k)?;
let c = rows.u32(&v)?;
return Ok(format!("key: {} count: {}",w,c))
} else {
return Ok(format!("key: {} not found",key));
}
},
_ => panic!("unexpected schema"),
}
}
sinks:
- type: topic
id: word-count
"#
    .to_string();

    let config = parse(&yaml).expect("parse yaml");

    // Walk from the service down to the typed definition of its
    // `count-per-word` state; every step is expected to succeed for this input.
    let state = config
        .services
        .get("word-count-processing")
        .expect("service exists")
        .valid_data()
        .expect("service is valid")
        .states
        .get("count-per-word")
        .expect("state exists")
        .valid_data()
        .expect("state is valid")
        .inner_type()
        .expect("state type is typed");

    // The state is declared as `type: keyed-state`, so this is the variant to expect.
    let MetadataTypeInner::MetadataTypeTagged(MetadataTypeTagged::KeyedState(kvstate)) = &state.ty
    else {
        panic!("Expected KeyedState type");
    };

    // The keyed state's value is declared as `type: arrow-row`; borrowing is
    // enough here because the value is only inspected.
    let MetadataTypeInner::MetadataTypeTagged(MetadataTypeTagged::ArrowRow(row)) =
        &kvstate.properties.value.ty
    else {
        panic!("Expected ArrowRow type");
    };

    let ttl = row.ttl.as_ref().expect("ttl is set");
    assert_eq!(ttl, "60s");
}