Skip to main content

schema/deployment/
conversion.rs

1//! Lifting the parsed `flusso.toml` ([`ConfigToml`]) into the assembled
2//! [`Config`].
3//!
4//! The toml parser ([`schema_config_toml`]) produces neutral entity types that
5//! mirror the file; turning those into a `Config` is a composition step, so it
6//! lives here next to `Config` rather than in the parser. Secrets are **not**
7//! resolved here — a `{ env = "VAR" }` / literal becomes a deferred
8//! [`Secret`], read in the environment that runs the
9//! pipeline. The `index` entries are left empty; the loader fills them in by
10//! reading each referenced YAML schema.
11
12use std::collections::BTreeMap;
13
14use schema_config_toml::{ConfigToml, EnvOrValue, entities};
15use schema_core::{
16    ConnectionSpec, OpensearchSink, Secret, StdoutSink, TextAnalysis, common::SourceType,
17};
18
19use super::{Config, ServerConfig, Sink, Source};
20
21/// Infallible (secrets are deferred, URLs validated at resolution time), so this
22/// is a `From`; the blanket impl still gives callers a `TryFrom<ConfigToml>`.
23impl From<ConfigToml> for Config {
24    fn from(toml: ConfigToml) -> Self {
25        let source = convert_source(toml.source);
26        let sinks = toml
27            .sinks
28            .into_iter()
29            .map(|(name, sink)| (name, convert_sink(sink)))
30            .collect();
31
32        Config {
33            source,
34            sinks,
35            indexes: BTreeMap::new(),
36            on_error: toml.on_error,
37            server: ServerConfig {
38                public_address: toml.server.public_address,
39                private_address: toml.server.private_address,
40            },
41            prefix: toml.prefix,
42        }
43    }
44}
45
46fn convert_source(source: entities::Source) -> Source {
47    match source {
48        entities::Source::Postgres(pg) => Source {
49            source_type: SourceType::Postgres,
50            connection: pg.connection_url.map(convert_connection_spec),
51            manage_publication: pg.manage_publication.unwrap_or(true),
52        },
53    }
54}
55
56/// Map a parsed connection form into the deferred core [`ConnectionSpec`].
57/// Nothing is resolved here — `{ env = "X" }` becomes a [`Secret::Env`] and a
58/// literal a [`Secret::Value`], read in the environment that runs the pipeline.
59fn convert_connection_spec(url: entities::ConnectionUrl) -> ConnectionSpec {
60    match url {
61        entities::ConnectionUrl::Url(ev) => ConnectionSpec::Url(to_secret(ev)),
62        entities::ConnectionUrl::Parts {
63            host,
64            port,
65            user,
66            password,
67            database,
68        } => ConnectionSpec::Parts {
69            host,
70            port,
71            user,
72            password: password.map(to_secret),
73            database,
74        },
75    }
76}
77
78fn convert_sink(sink: entities::Sink) -> Sink {
79    match sink {
80        entities::Sink::Opensearch(s) => Sink::Opensearch(OpensearchSink {
81            url: to_secret(s.url),
82            username: s.username.map(to_secret),
83            password: s.password.map(to_secret),
84            tls_verify: s.tls_verify,
85            batch_size: s.batch_size,
86            max_bytes: s.max_bytes,
87            timeout_secs: s.timeout_secs,
88            max_retries: s.max_retries,
89            pipeline: s.pipeline,
90            number_of_shards: s.number_of_shards,
91            number_of_replicas: s.number_of_replicas,
92            refresh_interval: s.refresh_interval,
93            text_analysis: convert_text_analysis(s.text_analysis),
94            auto_subfields: s.auto_subfields,
95        }),
96        entities::Sink::Stdout(s) => Sink::Stdout(StdoutSink { pretty: s.pretty }),
97    }
98}
99
100/// A parsed `{ env = "X" }` / literal becomes a deferred [`Secret`].
101fn to_secret(value: EnvOrValue) -> Secret {
102    match value {
103        EnvOrValue::Env { env } => Secret::Env(env),
104        EnvOrValue::Value(v) => Secret::Value(v),
105    }
106}
107
108fn convert_text_analysis(value: entities::TextAnalysis) -> TextAnalysis {
109    match value {
110        entities::TextAnalysis::Builtin => TextAnalysis::Builtin,
111        entities::TextAnalysis::Icu => TextAnalysis::Icu,
112    }
113}