Skip to main content

schema/deployment/
conversion.rs

1//! Lifting the parsed `flusso.toml` ([`ConfigToml`]) into the assembled
2//! [`Config`].
3//!
4//! The toml parser ([`schema_config_toml`]) produces neutral entity types that
5//! mirror the file; turning those into a `Config` is a composition step, so it
6//! lives here next to `Config` rather than in the parser. Secrets are **not**
7//! resolved here — a `{ env = "VAR" }` / literal becomes a deferred
8//! [`Secret`], read in the environment that runs the
9//! pipeline. The `index` entries are left empty; the loader fills them in by
10//! reading each referenced YAML schema.
11
12use std::collections::BTreeMap;
13
14use schema_config_toml::{ConfigToml, EnvOrValue, entities};
15use schema_core::{
16    ConnectionSpec, OpensearchSink, Secret, StdoutSink, TextAnalysis, common::SourceType,
17};
18
19use super::{Config, ServerConfig, Sink, Source};
20
21/// Infallible (secrets are deferred, URLs validated at resolution time), so this
22/// is a `From`; the blanket impl still gives callers a `TryFrom<ConfigToml>`.
23impl From<ConfigToml> for Config {
24    fn from(toml: ConfigToml) -> Self {
25        let source = convert_source(toml.source);
26        let sinks = toml
27            .sinks
28            .into_iter()
29            .map(|(name, sink)| (name, convert_sink(sink)))
30            .collect();
31
32        Config {
33            source,
34            sinks,
35            indexes: BTreeMap::new(),
36            on_error: toml.on_error,
37            server: ServerConfig {
38                public_address: toml.server.public_address,
39                private_address: toml.server.private_address,
40            },
41        }
42    }
43}
44
45fn convert_source(source: entities::Source) -> Source {
46    match source {
47        entities::Source::Postgres(pg) => Source {
48            source_type: SourceType::Postgres,
49            connection: pg.connection_url.map(convert_connection_spec),
50        },
51    }
52}
53
54/// Map a parsed connection form into the deferred core [`ConnectionSpec`].
55/// Nothing is resolved here — `{ env = "X" }` becomes a [`Secret::Env`] and a
56/// literal a [`Secret::Value`], read in the environment that runs the pipeline.
57fn convert_connection_spec(url: entities::ConnectionUrl) -> ConnectionSpec {
58    match url {
59        entities::ConnectionUrl::Url(ev) => ConnectionSpec::Url(to_secret(ev)),
60        entities::ConnectionUrl::Parts {
61            host,
62            port,
63            user,
64            password,
65            database,
66        } => ConnectionSpec::Parts {
67            host,
68            port,
69            user,
70            password: password.map(to_secret),
71            database,
72        },
73    }
74}
75
76fn convert_sink(sink: entities::Sink) -> Sink {
77    match sink {
78        entities::Sink::Opensearch(s) => Sink::Opensearch(OpensearchSink {
79            url: to_secret(s.url),
80            username: s.username.map(to_secret),
81            password: s.password.map(to_secret),
82            tls_verify: s.tls_verify,
83            batch_size: s.batch_size,
84            max_bytes: s.max_bytes,
85            timeout_secs: s.timeout_secs,
86            max_retries: s.max_retries,
87            pipeline: s.pipeline,
88            number_of_shards: s.number_of_shards,
89            number_of_replicas: s.number_of_replicas,
90            refresh_interval: s.refresh_interval,
91            text_analysis: convert_text_analysis(s.text_analysis),
92            auto_subfields: s.auto_subfields,
93        }),
94        entities::Sink::Stdout(s) => Sink::Stdout(StdoutSink { pretty: s.pretty }),
95    }
96}
97
98/// A parsed `{ env = "X" }` / literal becomes a deferred [`Secret`].
99fn to_secret(value: EnvOrValue) -> Secret {
100    match value {
101        EnvOrValue::Env { env } => Secret::Env(env),
102        EnvOrValue::Value(v) => Secret::Value(v),
103    }
104}
105
106fn convert_text_analysis(value: entities::TextAnalysis) -> TextAnalysis {
107    match value {
108        entities::TextAnalysis::Builtin => TextAnalysis::Builtin,
109        entities::TextAnalysis::Icu => TextAnalysis::Icu,
110    }
111}