package sdf:metadata;
/// Type metadata used by the other interfaces in this file: type
/// descriptions, type references, parameters, and the package header.
interface metadata {
  /// A named type: its name, its type description, and its origin.
  record metadata-type {
    name: string,
    /// type
    %type: sdf-type,
    origin: sdf-type-origin,
  }

  /// Origin of a `metadata-type`: `local` or `imported`.
  variant sdf-type-origin {
    local,
    imported,
  }

  /// Type description. The payload-free cases are `null`, the integer and
  /// float cases, `string`, `bool` and `bytes`; every other case carries a
  /// record (or a `type-ref`) with further detail.
  variant sdf-type {
    null,
    %u8,
    %u16,
    %u32,
    %u64,
    %s8,
    %s16,
    %s32,
    %s64,
    %float32,
    %float64,
    %string,
    %bool,
    %bytes,
    %enum(sdf-enum),
    keyed-state(sdf-keyed-state),
    key-value(sdf-key-value),
    %list(sdf-list),
    %option(sdf-option),
    object(sdf-object),
    named(type-ref),
    arrow-row(sdf-arrow-row),
  }

  /// Key/value pair of type references.
  record sdf-key-value {
    key: type-ref,
    value: type-ref,
  }

  /// Row description: a list of columns plus an optional `ttl`.
  /// NOTE(review): the unit of `ttl` is not stated in this file — confirm.
  record sdf-arrow-row {
    columns: list<sdf-arrow-column>,
    ttl: option<u64>,
  }

  /// A single named column of an `sdf-arrow-row`.
  record sdf-arrow-column {
    name: string,
    %type: arrow-column-kind,
  }

  /// Kinds a column may have; all cases are payload-free.
  variant arrow-column-kind {
    %u8,
    %u16,
    %u32,
    %u64,
    %s8,
    %s16,
    %s32,
    %s64,
    %float32,
    %float64,
    %string,
    %bool,
    timestamp,
  }

  /// Reference to a type by name.
  record type-ref {
    name: string,
  }

  /// Represents an enum, for example (the nesting of this example is
  /// reconstructed; the original comment had lost its indentation):
  ///
  ///     fruit:
  ///       type: enum
  ///       oneOf:
  ///         banana:
  ///           type: string
  ///         apple:
  ///           type: null
  record sdf-enum {
    variants: list<enum-field>,
    tagging: option<enum-tagging>,
  }

  /// Tagging mode of an `sdf-enum`; `untagged` is the only case defined.
  variant enum-tagging {
    untagged,
  }

  /// One variant of an `sdf-enum`: a name, an optional value type, and its
  /// serde configuration.
  record enum-field {
    name: string,
    value: option<type-ref>,
    serde-config: serde-config,
  }

  /// Object type: a list of fields.
  record sdf-object {
    fields: list<object-field>,
  }

  /// One field of an `sdf-object`.
  record object-field {
    name: string,
    %type: type-ref,
    optional: bool,
    serde-config: serde-config,
  }

  /// Optional per-direction (serialize / deserialize) field configuration.
  record serde-config {
    serialize: option<serde-field-config>,
    deserialize: option<serde-field-config>,
  }

  /// Field-level serde settings; currently only an optional `rename`.
  record serde-field-config {
    rename: option<string>,
  }

  /// List type: holds the reference to the item type.
  record sdf-list {
    %item: type-ref,
  }

  /// Option type: holds the reference to the wrapped value type.
  record sdf-option {
    %value: type-ref,
  }

  /// Keyed state: a key type reference and a description of the value.
  record sdf-keyed-state {
    key: type-ref,
    value: sdf-keyed-state-value,
  }

  /// Value side of an `sdf-keyed-state`: an arrow row, `u32`, or a type
  /// reference marked `unresolved`.
  variant sdf-keyed-state-value {
    arrow-row(sdf-arrow-row),
    %u32,
    unresolved(type-ref),
  }

  /// Invocation kinds; all cases are payload-free.
  variant invocation-type {
    filter-map,
    filter,
    map,
    update,
    read,
  }

  /// A parameter that carries a name, a type reference, an `optional` flag,
  /// and a kind (`key` or `value`).
  record named-parameter {
    name: string,
    %type: type-ref,
    optional: bool,
    kind: parameter-kind,
  }

  /// Kind of a `named-parameter`.
  variant parameter-kind {
    key,
    value,
  }

  /// An unnamed parameter: an `output-type` plus an `optional` flag.
  record parameter {
    %type: output-type,
    optional: bool,
  }

  /// Type of a `parameter`: a single type reference or a key/value pair.
  variant output-type {
    ref(type-ref),
    key-value(sdf-key-value),
  }

  /// Header record: name, version and namespace, all strings.
  record header {
    name: string,
    version: string,
    namespace: string,
  }
}
/// State declarations: typed states, references to states of another
/// service, and system states.
interface states {
  use metadata.{sdf-keyed-state};

  /// A state is one of: typed, a reference, or a system state.
  variant state {
    typed(state-typed),
    reference(state-ref),
    system(system-state),
  }

  /// A named state together with its keyed-state type.
  record state-typed {
    name: string,
    %type: sdf-keyed-state,
  }

  /// Reference to a state by `ref-service` and `name`.
  record state-ref {
    ref-service: string,
    name: string,
  }

  /// A system state, identified by `name` and `system`.
  record system-state {
    name: string,
    system: string,
  }
}
/// Topic definitions together with their schema and consumer/producer
/// configuration.
interface io {
  use metadata.{type-ref};

  /// Converter choices: `json` or `raw`.
  variant serde-converter {
    json,
    raw,
  }

  /// A named topic with its schema and optional consumer, producer and
  /// profile settings.
  record topic {
    name: string,
    schema: topic-schema,
    consumer: option<consumer-config>,
    producer: option<producer-config>,
    profile: option<string>,
  }

  /// Schema of a topic: an optional key schema and a mandatory value schema.
  record topic-schema {
    key: option<schema-ser-de>,
    value: schema-ser-de,
  }

  /// config fluvio consumer for reading
  record consumer-config {
    default-starting-offset: option<offset>,
    max-bytes: option<s32>,
    isolation: option<isolation>,
  }

  /// config fluvio producer for write
  record producer-config {
    linger-ms: option<u64>,
    batch-size-bytes: option<s64>,
    isolation: option<isolation>,
    compression: option<compression>,
    timeout-ms: option<u64>,
  }

  /// Compression choices available to `producer-config`.
  variant compression {
    gzip,
    snappy,
    lz4,
    zstd,
  }

  /// A type reference paired with an optional converter.
  record schema-ser-de {
    converter: option<serde-converter>,
    %type: type-ref,
  }

  /// Offset specification: `beginning` and `end` carry a `u32`, `offset`
  /// carries an `s64`.
  variant offset {
    beginning(u32),
    end(u32),
    offset(s64),
  }

  /// Isolation choices used by consumer and producer configuration.
  variant isolation {
    read-uncommitted,
    read-committed,
  }
}
/// Operator definitions: transform steps, step invocations and their code
/// information, partitioning, and windowing.
interface operator {
  use metadata.{named-parameter, parameter, header};
  use states.{state-typed};

  /// An operator is either a post-transform or a transform operator.
  variant operator {
    post-transforms(post-transforms),
    transforms(transform-operator),
  }

  /// A transform step; every case carries a `step-invocation`.
  variant transform-operator {
    map(step-invocation),
    filter-map(step-invocation),
    filter(step-invocation),
    flat-map(step-invocation),
  }

  /// Payload-free list of operator kinds.
  variant operator-type {
    map,
    filter-map,
    filter,
    flat-map,
    update-state,
    window-aggregate,
    assign-timestamp,
    assign-key,
  }

  /// invoke in the step
  record step-invocation {
    %uses: string,
    inputs: list<named-parameter>,
    output: option<parameter>,
    states: list<step-state>,
    code-info: code-info,
    system: bool,
    imported-function-metadata: option<imported-function-metadata>,
    /// using option for backward compatibility
    params: option<list<tuple<string, string>>>,
  }

  /// Code attached to a step: language, optional source text, and extra
  /// dependencies.
  record code-info {
    lang: code-lang,
    code: option<string>,
    extra-deps: list<code-dep>,
  }

  /// Code languages; `rust` is the only case defined.
  variant code-lang {
    rust,
  }

  /// A dependency entry: name, version specification, feature list, and
  /// the `default-features` flag.
  record code-dep {
    name: string,
    version: code-dep-version,
    features: list<string>,
    default-features: bool,
  }

  /// Version specification of a `code-dep`; each of the three forms is
  /// optional.
  record code-dep-version {
    version: option<string>,
    path-version: option<string>,
    git-version: option<git-version>,
  }

  /// Git source: a `git` string plus optional `tag`, `branch` and `rev`.
  record git-version {
    git: string,
    tag: option<string>,
    branch: option<string>,
    rev: option<string>,
  }

  /// Metadata recorded for an imported function: its original name, the
  /// package path, and the package header.
  record imported-function-metadata {
    original-name: string,
    package-path: string,
    package-metadata: header,
  }

  /// State used by a step: resolved to a typed state, or an unresolved
  /// import.
  variant step-state {
    resolved(state-typed),
    unresolved(state-import),
  }

  /// A state import, identified by name only.
  record state-import {
    name: string,
  }

  /// Post-transform stage: a partition operator or a window
  /// (`assign-timestamp` case).
  variant post-transforms {
    partition(partition-operator),
    assign-timestamp(window),
  }

  /// Partition stage: an `assign-key` invocation, transforms, and an
  /// optional `update-state` invocation.
  record partition-operator {
    assign-key: step-invocation,
    transforms: transforms,
    update-state: option<step-invocation>,
  }

  /// Window stage: properties, an `assign-timestamp` invocation,
  /// transforms, and optional partition and flush parts.
  record window {
    properties: window-properties,
    assign-timestamp: step-invocation,
    transforms: transforms,
    partition: option<partition-operator>,
    flush: option<step-invocation>,
  }

  /// A list of transform steps.
  record transforms {
    steps: list<transform-operator>,
  }

  /// Window settings: the window kind and its watermark configuration.
  record window-properties {
    kind: window-kind,
    watermark-config: watermark-config,
  }

  /// Window kinds: tumbling or sliding.
  variant window-kind {
    tumbling(tumbling-window),
    sliding(sliding-window),
  }

  /// Parameters of a tumbling window.
  record tumbling-window {
    /// size of window in milliseconds
    duration: u64,
    /// offset of window in milliseconds
    offset: u64,
  }

  /// Parameters of a sliding window.
  record sliding-window {
    /// size of window in milliseconds
    duration: u64,
    /// slide of window in milliseconds
    slide: u64,
    /// offset of window in milliseconds
    offset: u64,
  }

  /// Watermark settings; both values are optional.
  record watermark-config {
    /// size of watermark idleness time in milliseconds
    idleness: option<u64>,
    /// grace period to allow late events
    grace-period: option<u64>,
  }
}
/// Package-level definitions: the package definition record, its imports,
/// and the dev configuration.
interface package-interface {
  use metadata.{metadata-type, named-parameter, parameter, type-ref, header};
  use states.{state-typed};
  use operator.{operator, operator-type, step-invocation};
  use io.{serde-converter, topic};

  /// A package: header, API version, imports, types, states, functions
  /// (each a step invocation paired with its operator type), and an
  /// optional dev configuration.
  record package-definition {
    meta: header,
    api-version: string,
    imports: list<package-import>,
    types: list<metadata-type>,
    states: list<state-typed>,
    functions: list<tuple<step-invocation, operator-type>>,
    dev: option<dev-config>,
  }

  /// An import of another package: its header, an optional path, and the
  /// names of the imported types, states and functions.
  record package-import {
    metadata: header,
    path: option<string>,
    types: list<string>,
    states: list<string>,
    functions: list<function-import>,
  }

  /// An imported function name with an optional alias.
  record function-import {
    name: string,
    alias: option<string>,
  }

  /// Dev configuration: an optional converter, imports, and a list of
  /// (string, topic) pairs.
  record dev-config {
    converter: option<serde-converter>,
    imports: list<package-import>,
    topics: list<tuple<string, topic>>,
  }
}
/// define dataflow
interface dataflow {
  use metadata.{header, metadata-type, named-parameter, parameter, type-ref};
  use states.{state};
  use package-interface.{dev-config, package-definition, package-import};
  use io.{consumer-config, producer-config, serde-converter, topic};
  use operator.{operator, post-transforms, step-invocation, transform-operator, transforms};

  /// Optional default converter, consumer and producer settings.
  record default-configurations {
    converter: option<serde-converter>,
    consumer: option<consumer-config>,
    producer: option<producer-config>,
  }

  /// A complete dataflow: header, API version, imports, types, services,
  /// topics, packages, and optional dev, schedule and default-config parts.
  record dataflow-definition {
    meta: header,
    api-version: string,
    imports: list<package-import>,
    types: list<metadata-type>,
    services: list<operations>,
    topics: list<tuple<string, topic>>,
    dev: option<dev-config>,
    packages: list<package-definition>,
    schedule: option<list<schedule-config>>,
    default-config: option<default-configurations>,
  }

  /// One entry of `dataflow-definition.services`: a name, sources, sinks,
  /// transforms, optional post-transforms, and states.
  record operations {
    name: string,
    sources: list<io-ref>,
    sinks: list<io-ref>,
    transforms: transforms,
    post-transforms: option<post-transforms>,
    states: list<state>,
  }

  /// A source or sink reference: its type, an id, and a list of transform
  /// steps.
  record io-ref {
    %type: io-type,
    id: string,
    steps: list<transform-operator>,
  }

  /// Kinds of `io-ref`.
  variant io-type {
    topic,
    no-target,
    schedule,
  }

  /// A named schedule.
  record schedule-config {
    name: string,
    schedule: schedule,
  }

  /// Schedule specification; `cron` (carrying a string) is the only case.
  variant schedule {
    cron(string),
  }

  /// Wraps the `aggregate` step invocation.
  record window-aggregate-operator {
    aggregate: step-invocation,
  }

  /// Single-case variant: `state`, carrying a string.
  variant step-invocation-apply {
    state(string),
  }

  /// A parameter with a name, a type reference, and an optional string
  /// value.
  record step-invocation-param {
    name: string,
    %type: type-ref,
    value: option<string>,
  }

  /// NOTE(review): `ouptut-type` looks like a misspelling of `output-type`.
  /// The field name is part of the generated bindings, so it is left
  /// unchanged here; rename only together with all consumers.
  record emit-trigger {
    name: string,
    ouptut-type: string,
  }

  /// A trigger, identified by name only.
  record trigger {
    name: string,
  }

  /// Adaptor kinds; `http` is the only case defined.
  variant operator-adaptor {
    http,
  }
}
/// Functions that take or return a `dataflow-definition`, addressed by
/// name.
interface dataflow-graph {
  use dataflow.{dataflow-definition};

  /// Takes a dataflow name and its definition; returns nothing.
  add-dataflow: func(name: string, def: dataflow-definition);

  /// Returns the definition for `name` as an option.
  /// NOTE(review): `none` presumably means no dataflow with that name —
  /// confirm against the implementation.
  get-dataflow: func(name: string) -> option<dataflow-definition>;

  /// Takes a dataflow name; returns nothing.
  print-dataflow: func(name: string);
}
/// Session lifecycle functions.
interface dataflow-session {
  /// Start a session; this starts a new operator session for `service`.
  /// NOTE(review): the returned optional string is presumably the session
  /// identifier later passed to `terminate-session` — confirm.
  create-session: func(service: string) -> option<string>;

  /// Terminate a session; this terminates the operator session `session`.
  terminate-session: func(session: string);
}
/// basic interface for handling any records
interface record-handler {
  /// A record made of an optional key and a value, both byte lists.
  record flv-record {
    key: option<list<u8>>,
    value: list<u8>,
  }
}
/// Byte-valued store addressed by string keys.
interface state-store {
  /// Returns the bytes for `key`.
  /// NOTE(review): the signature cannot express a missing key; how that
  /// case is reported is not visible here — confirm.
  get: func(key: string) -> list<u8>;

  /// Takes a key and the bytes to associate with it; returns nothing.
  put: func(key: string, value: list<u8>);
}
/// Trigger emission.
interface trigger {
  /// Takes a name and a byte-list value; returns nothing.
  emit: func(name: string, value: list<u8>);
}
/// World that imports the dataflow-related interfaces and exports
/// `dataflow-graph`.
world dataflow-guest {
  import dataflow;
  import package-interface;
  import metadata;
  import states;
  import dataflow-session;
  import dataflow-graph;

  use dataflow.{dataflow-definition};

  export dataflow-graph;
}