schema/deployment/
conversion.rs1use std::collections::BTreeMap;
13
14use schema_config_toml::{ConfigToml, EnvOrValue, entities};
15use schema_core::{
16 ConnectionSpec, OpensearchSink, Secret, StdoutSink, TextAnalysis, common::SourceType,
17};
18
19use super::{Config, ServerConfig, Sink, Source};
20
21impl From<ConfigToml> for Config {
24 fn from(toml: ConfigToml) -> Self {
25 let source = convert_source(toml.source);
26 let sinks = toml
27 .sinks
28 .into_iter()
29 .map(|(name, sink)| (name, convert_sink(sink)))
30 .collect();
31
32 Config {
33 source,
34 sinks,
35 indexes: BTreeMap::new(),
36 on_error: toml.on_error,
37 server: ServerConfig {
38 public_address: toml.server.public_address,
39 private_address: toml.server.private_address,
40 },
41 prefix: toml.prefix,
42 }
43 }
44}
45
46fn convert_source(source: entities::Source) -> Source {
47 match source {
48 entities::Source::Postgres(pg) => Source {
49 source_type: SourceType::Postgres,
50 connection: pg.connection_url.map(convert_connection_spec),
51 manage_publication: pg.manage_publication.unwrap_or(true),
52 },
53 }
54}
55
56fn convert_connection_spec(url: entities::ConnectionUrl) -> ConnectionSpec {
60 match url {
61 entities::ConnectionUrl::Url(ev) => ConnectionSpec::Url(to_secret(ev)),
62 entities::ConnectionUrl::Parts {
63 host,
64 port,
65 user,
66 password,
67 database,
68 } => ConnectionSpec::Parts {
69 host,
70 port,
71 user,
72 password: password.map(to_secret),
73 database,
74 },
75 }
76}
77
78fn convert_sink(sink: entities::Sink) -> Sink {
79 match sink {
80 entities::Sink::Opensearch(s) => Sink::Opensearch(OpensearchSink {
81 url: to_secret(s.url),
82 username: s.username.map(to_secret),
83 password: s.password.map(to_secret),
84 tls_verify: s.tls_verify,
85 batch_size: s.batch_size,
86 max_bytes: s.max_bytes,
87 timeout_secs: s.timeout_secs,
88 max_retries: s.max_retries,
89 pipeline: s.pipeline,
90 number_of_shards: s.number_of_shards,
91 number_of_replicas: s.number_of_replicas,
92 refresh_interval: s.refresh_interval,
93 text_analysis: convert_text_analysis(s.text_analysis),
94 auto_subfields: s.auto_subfields,
95 }),
96 entities::Sink::Stdout(s) => Sink::Stdout(StdoutSink { pretty: s.pretty }),
97 }
98}
99
100fn to_secret(value: EnvOrValue) -> Secret {
102 match value {
103 EnvOrValue::Env { env } => Secret::Env(env),
104 EnvOrValue::Value(v) => Secret::Value(v),
105 }
106}
107
108fn convert_text_analysis(value: entities::TextAnalysis) -> TextAnalysis {
109 match value {
110 entities::TextAnalysis::Builtin => TextAnalysis::Builtin,
111 entities::TextAnalysis::Icu => TextAnalysis::Icu,
112 }
113}