schema/deployment/
conversion.rs1use std::collections::BTreeMap;
13
14use schema_config_toml::{ConfigToml, EnvOrValue, entities};
15use schema_core::{
16 ConnectionSpec, OpensearchSink, Secret, StdoutSink, TextAnalysis, common::SourceType,
17};
18
19use super::{Config, ServerConfig, Sink, Source};
20
21impl From<ConfigToml> for Config {
24 fn from(toml: ConfigToml) -> Self {
25 let source = convert_source(toml.source);
26 let sinks = toml
27 .sinks
28 .into_iter()
29 .map(|(name, sink)| (name, convert_sink(sink)))
30 .collect();
31
32 Config {
33 source,
34 sinks,
35 indexes: BTreeMap::new(),
36 on_error: toml.on_error,
37 server: ServerConfig {
38 public_address: toml.server.public_address,
39 private_address: toml.server.private_address,
40 },
41 }
42 }
43}
44
45fn convert_source(source: entities::Source) -> Source {
46 match source {
47 entities::Source::Postgres(pg) => Source {
48 source_type: SourceType::Postgres,
49 connection: pg.connection_url.map(convert_connection_spec),
50 manage_publication: pg.manage_publication.unwrap_or(true),
51 },
52 }
53}
54
55fn convert_connection_spec(url: entities::ConnectionUrl) -> ConnectionSpec {
59 match url {
60 entities::ConnectionUrl::Url(ev) => ConnectionSpec::Url(to_secret(ev)),
61 entities::ConnectionUrl::Parts {
62 host,
63 port,
64 user,
65 password,
66 database,
67 } => ConnectionSpec::Parts {
68 host,
69 port,
70 user,
71 password: password.map(to_secret),
72 database,
73 },
74 }
75}
76
77fn convert_sink(sink: entities::Sink) -> Sink {
78 match sink {
79 entities::Sink::Opensearch(s) => Sink::Opensearch(OpensearchSink {
80 url: to_secret(s.url),
81 username: s.username.map(to_secret),
82 password: s.password.map(to_secret),
83 tls_verify: s.tls_verify,
84 batch_size: s.batch_size,
85 max_bytes: s.max_bytes,
86 timeout_secs: s.timeout_secs,
87 max_retries: s.max_retries,
88 pipeline: s.pipeline,
89 number_of_shards: s.number_of_shards,
90 number_of_replicas: s.number_of_replicas,
91 refresh_interval: s.refresh_interval,
92 text_analysis: convert_text_analysis(s.text_analysis),
93 auto_subfields: s.auto_subfields,
94 }),
95 entities::Sink::Stdout(s) => Sink::Stdout(StdoutSink { pretty: s.pretty }),
96 }
97}
98
99fn to_secret(value: EnvOrValue) -> Secret {
101 match value {
102 EnvOrValue::Env { env } => Secret::Env(env),
103 EnvOrValue::Value(v) => Secret::Value(v),
104 }
105}
106
107fn convert_text_analysis(value: entities::TextAnalysis) -> TextAnalysis {
108 match value {
109 entities::TextAnalysis::Builtin => TextAnalysis::Builtin,
110 entities::TextAnalysis::Icu => TextAnalysis::Icu,
111 }
112}