Skip to main content

schema/deployment/
conversion.rs

1//! Lifting the parsed `flusso.toml` ([`ConfigToml`]) into the assembled
2//! [`Config`].
3//!
4//! The toml parser ([`schema_config_toml`]) produces neutral entity types that
5//! mirror the file; turning those into a `Config` is a composition step, so it
6//! lives here next to `Config` rather than in the parser. Secrets are **not**
7//! resolved here — a `{ env = "VAR" }` / literal becomes a deferred
8//! [`Secret`], read in the environment that runs the
9//! pipeline. The `index` entries are left empty; the loader fills them in by
10//! reading each referenced YAML schema.
11
12use std::collections::BTreeMap;
13
14use schema_config_toml::{ConfigToml, EnvOrValue, entities};
15use schema_core::{
16    ConnectionSpec, OpensearchSink, Secret, StdoutSink, TextAnalysis, common::SourceType,
17};
18
19use super::{Config, ServerConfig, Sink, Source};
20
21/// Infallible (secrets are deferred, URLs validated at resolution time), so this
22/// is a `From`; the blanket impl still gives callers a `TryFrom<ConfigToml>`.
23impl From<ConfigToml> for Config {
24    fn from(toml: ConfigToml) -> Self {
25        let source = convert_source(toml.source);
26        let sinks = toml
27            .sinks
28            .into_iter()
29            .map(|(name, sink)| (name, convert_sink(sink)))
30            .collect();
31
32        Config {
33            source,
34            sinks,
35            indexes: BTreeMap::new(),
36            on_error: toml.on_error,
37            server: ServerConfig {
38                public_address: toml.server.public_address,
39                private_address: toml.server.private_address,
40            },
41        }
42    }
43}
44
45fn convert_source(source: entities::Source) -> Source {
46    match source {
47        entities::Source::Postgres(pg) => Source {
48            source_type: SourceType::Postgres,
49            connection: pg.connection_url.map(convert_connection_spec),
50            manage_publication: pg.manage_publication.unwrap_or(true),
51        },
52    }
53}
54
55/// Map a parsed connection form into the deferred core [`ConnectionSpec`].
56/// Nothing is resolved here — `{ env = "X" }` becomes a [`Secret::Env`] and a
57/// literal a [`Secret::Value`], read in the environment that runs the pipeline.
58fn convert_connection_spec(url: entities::ConnectionUrl) -> ConnectionSpec {
59    match url {
60        entities::ConnectionUrl::Url(ev) => ConnectionSpec::Url(to_secret(ev)),
61        entities::ConnectionUrl::Parts {
62            host,
63            port,
64            user,
65            password,
66            database,
67        } => ConnectionSpec::Parts {
68            host,
69            port,
70            user,
71            password: password.map(to_secret),
72            database,
73        },
74    }
75}
76
77fn convert_sink(sink: entities::Sink) -> Sink {
78    match sink {
79        entities::Sink::Opensearch(s) => Sink::Opensearch(OpensearchSink {
80            url: to_secret(s.url),
81            username: s.username.map(to_secret),
82            password: s.password.map(to_secret),
83            tls_verify: s.tls_verify,
84            batch_size: s.batch_size,
85            max_bytes: s.max_bytes,
86            timeout_secs: s.timeout_secs,
87            max_retries: s.max_retries,
88            pipeline: s.pipeline,
89            number_of_shards: s.number_of_shards,
90            number_of_replicas: s.number_of_replicas,
91            refresh_interval: s.refresh_interval,
92            text_analysis: convert_text_analysis(s.text_analysis),
93            auto_subfields: s.auto_subfields,
94        }),
95        entities::Sink::Stdout(s) => Sink::Stdout(StdoutSink { pretty: s.pretty }),
96    }
97}
98
99/// A parsed `{ env = "X" }` / literal becomes a deferred [`Secret`].
100fn to_secret(value: EnvOrValue) -> Secret {
101    match value {
102        EnvOrValue::Env { env } => Secret::Env(env),
103        EnvOrValue::Value(v) => Secret::Value(v),
104    }
105}
106
107fn convert_text_analysis(value: entities::TextAnalysis) -> TextAnalysis {
108    match value {
109        entities::TextAnalysis::Builtin => TextAnalysis::Builtin,
110        entities::TextAnalysis::Icu => TextAnalysis::Icu,
111    }
112}