sdf_parser_core/config/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
pub mod transform;
pub mod types;
pub mod import;
pub mod dev;
pub mod utils;

use sdf_common::LATEST_STABLE_DATAFLOW;

/// Stable version for service definition configs; an alias of
/// `sdf_common::LATEST_STABLE_DATAFLOW`.
pub const SERVICE_DEFINITION_CONFIG_STABLE_VERSION: &str = LATEST_STABLE_DATAFLOW;
// NOTE(review): the suggested upgrade target "0.5.0" is hard-coded here and is not
// derived from `LATEST_STABLE_DATAFLOW` — confirm it is still the right advice
// whenever the stable version changes.
/// Error message text for an unsupported `ApiVersion`.
pub const VERSION_NOT_SUPPORTED_ERROR: &str = "ApiVersion not supported, try upgrading to 0.5.0";

pub use wrapper::*;

mod wrapper {

    use serde::{Deserialize, Serialize};

    use super::*;

    use self::types::MetadataType;

    /// Common metadata: a name, a version and a namespace, all plain strings.
    ///
    /// None of the fields carries a serde default, so all three must be present
    /// when deserializing. `Default::default()` yields three empty strings.
    #[derive(Serialize, Deserialize, Debug, Default, Clone)]
    pub struct Metadata {
        /// Name of the described item.
        pub name: String,
        /// Version string; this type does not validate its format.
        pub version: String,
        /// Namespace of the described item.
        pub namespace: String,
    }

    /// Fallback settings used by `TopicWrapper::with_defaults` and
    /// `SerdeConfig::with_defaults` when a topic or schema does not set its own.
    ///
    /// Every field is optional: a missing field deserializes to `None`, and a
    /// `None` field is omitted from the serialized output.
    #[derive(Serialize, Deserialize, Debug, Default, Clone)]
    pub struct DefaultConfigs {
        /// Consumer settings used for topics that define no `consumer` section.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub consumer: Option<ConsumerConfigWrapper>,
        /// Converter used for key/value schemas that define no `converter`.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub converter: Option<SerdeConverter>,
        /// Producer settings used for topics that define no `producer` section.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub producer: Option<ProducerConfigWrapper>,
    }

    /// Configuration of a single topic: optional name, optional consumer and
    /// producer settings, optional profile, and a mandatory schema.
    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub struct TopicWrapper {
        /// Explicit topic name. When `None`, `with_defaults` substitutes the
        /// topic id passed to it.
        pub name: Option<String>,
        /// Topic-specific consumer settings; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub consumer: Option<ConsumerConfigWrapper>,
        /// Topic-specific producer settings; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub producer: Option<ProducerConfigWrapper>,
        /// Optional profile name; omitted from output when `None`.
        // NOTE(review): what a "profile" refers to is not visible in this file — confirm.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub profile: Option<String>,
        /// Key/value schema of the records in this topic (required).
        pub schema: SchemaWrapper,
    }

    impl TopicWrapper {
        /// Returns the string produced by `MetadataType::ty` for the topic's
        /// value schema.
        pub fn ty(&self) -> String {
            self.schema.value.ty.ty()
        }

        /// Builds a copy of this topic with every unset option filled in.
        ///
        /// * `name` falls back to `topic_id`.
        /// * `consumer` and `producer` fall back to the matching entry in
        ///   `defaults`. The fallback replaces the whole section; individual
        ///   fields are not merged.
        /// * The key (when present) and value schemas are resolved through
        ///   `SerdeConfig::with_defaults`.
        /// * `profile` is copied unchanged.
        pub fn with_defaults(&self, topic_id: &str, defaults: &DefaultConfigs) -> Self {
            Self {
                name: self.name.clone().or_else(|| Some(topic_id.into())),
                // `or_else` clones the defaults only when the topic has no
                // section of its own.
                consumer: self.consumer.clone().or_else(|| defaults.consumer.clone()),
                producer: self.producer.clone().or_else(|| defaults.producer.clone()),
                profile: self.profile.clone(),
                schema: SchemaWrapper {
                    // Borrow the key rather than cloning the entire schema
                    // (value included) just to reach it.
                    key: self
                        .schema
                        .key
                        .as_ref()
                        .map(|key| key.with_defaults(defaults)),
                    value: self.schema.value.with_defaults(defaults),
                },
            }
        }
    }

    /// Schema of a topic's records: an optional key part and a required value part.
    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub struct SchemaWrapper {
        /// Schema of the record key; deserializes to `None` when absent.
        #[serde(default)]
        pub key: Option<SerdeConfig>,
        /// Schema of the record value (required).
        pub value: SerdeConfig,
    }

    /// A type description paired with an optional converter.
    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub struct SerdeConfig {
        /// The described type. Because of `flatten`, the fields of
        /// `MetadataType` appear directly at this struct's level in the
        /// serialized form instead of under a `ty` key.
        #[serde(flatten)]
        pub ty: MetadataType,
        /// Converter for this schema part; when `None`, `with_defaults`
        /// substitutes `DefaultConfigs::converter`.
        pub converter: Option<SerdeConverter>,
    }

    impl SerdeConfig {
        /// Returns a copy of this config whose converter is the one set here
        /// or, when none is set, the converter from `defaults` (which may
        /// itself be `None`). The type description is copied as-is.
        pub fn with_defaults(&self, defaults: &DefaultConfigs) -> Self {
            let ty = self.ty.clone();
            let converter = match &self.converter {
                Some(own) => Some(own.clone()),
                None => defaults.converter.clone(),
            };
            Self { ty, converter }
        }
    }

    /// Converter selection for a schema part.
    ///
    /// Variants are (de)serialized in kebab-case, i.e. as `json` and `raw`.
    // NOTE(review): presumably selects how record bytes are encoded/decoded —
    // the conversion logic itself is not in this file; confirm.
    #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
    #[serde(rename_all = "kebab-case")]
    pub enum SerdeConverter {
        /// Serialized as `json`.
        Json,
        /// Serialized as `raw`.
        Raw,
    }

    /// Optional consumer settings.
    ///
    /// Every field may be left out: a missing field deserializes to `None`,
    /// and a `None` field is omitted from the serialized output. Field names
    /// are used verbatim (snake_case) as keys.
    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub struct ConsumerConfigWrapper {
        /// Offset at which consumption starts by default.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub default_starting_offset: Option<OffsetWrapper>,
        /// Byte limit for the consumer.
        // NOTE(review): presumably the maximum bytes per fetch — confirm against the consumer API.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub max_bytes: Option<i32>,
        /// Isolation level for reads.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub isolation: Option<Isolation>,
    }

    /// A consumer starting position.
    ///
    /// Uses serde's adjacently tagged representation: the variant name is
    /// stored under the `position` key and its payload under the `value` key,
    /// e.g. `{ position: Offset, value: 5 }`.
    #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
    #[serde(tag = "position", content = "value")]
    pub enum OffsetWrapper {
        /// A signed 64-bit offset.
        // NOTE(review): presumably an absolute offset — confirm.
        Offset(i64),
        /// An unsigned 32-bit value associated with the beginning.
        // NOTE(review): presumably a distance forward from the first record — confirm.
        Beginning(u32),
        /// An unsigned 32-bit value associated with the end.
        // NOTE(review): presumably a distance back from the last record — confirm.
        End(u32),
    }

    /// Isolation level option used by both consumer and producer settings.
    ///
    /// No `rename_all` is applied, so the variants are (de)serialized under
    /// their Rust names: `ReadUncommitted` and `ReadCommitted`.
    // NOTE(review): the sibling enums in this module use kebab-case; confirm the
    // PascalCase wire names here are intentional.
    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub enum Isolation {
        /// Serialized as `ReadUncommitted`.
        ReadUncommitted,
        /// Serialized as `ReadCommitted`.
        ReadCommitted,
    }

    /// Compression algorithm choice for producer settings.
    ///
    /// Variants are (de)serialized in kebab-case: `gzip`, `snappy`, `lz4`, `zstd`.
    #[derive(Serialize, Deserialize, Debug, Clone)]
    #[serde(rename_all = "kebab-case")]
    pub enum Compression {
        /// Serialized as `gzip`.
        Gzip,
        /// Serialized as `snappy`.
        Snappy,
        /// Serialized as `lz4`.
        Lz4,
        /// Serialized as `zstd`.
        Zstd,
    }

    /// Optional producer settings.
    ///
    /// Every field may be left out: a missing field deserializes to `None`,
    /// and a `None` field is omitted from the serialized output. Field names
    /// are used verbatim (snake_case) as keys.
    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub struct ProducerConfigWrapper {
        /// Batch size for the producer.
        // NOTE(review): unit is presumably bytes — confirm against the producer API.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub batch_size: Option<i64>,
        /// Linger setting; the `_ms` suffix suggests milliseconds — TODO confirm.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub linger_ms: Option<u64>,
        /// Compression algorithm to use.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub compression: Option<Compression>,
        /// Timeout setting; the `_ms` suffix suggests milliseconds — TODO confirm.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub timeout_ms: Option<u64>,
        /// Isolation level for the producer.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub isolation: Option<Isolation>,
    }
}