schema_core/config/sink.rs
1use serde::{Deserialize, Serialize};
2
3use crate::common::{HttpUrl, SinkName};
4
5use super::{ResolveError, Secret, http_url, resolve_optional, resolve_required, sink_var_prefix};
6
/// Per-backend configuration for an OpenSearch destination. The `Sink` enum that
/// selects between this and [`StdoutSink`] is a composition concern and lives in
/// the `schema` crate; the backend sinks read these settings directly.
///
/// # Serialization
///
/// The optional fields (`username`, `password`, `pipeline`) may be absent when
/// deserializing and are omitted from serialized output when they are `None`.
/// Every other field carries no `#[serde(default)]` attribute, so it must be
/// present in the input for deserialization to succeed.
///
/// NOTE(review): the "Default" values quoted on the fields below are not
/// applied by this type's serde derive — presumably they are filled in by
/// whatever layer constructs this struct; confirm against the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpensearchSink {
    /// Cluster URL, resolved at runtime (`<NAME>_OPENSEARCH_URL` overrides).
    /// See [`OpensearchSink::resolve_url`].
    pub url: Secret,
    /// Basic-auth user, resolved at runtime (`<NAME>_OPENSEARCH_USERNAME`).
    /// See [`OpensearchSink::resolve_username`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub username: Option<Secret>,
    /// Basic-auth password, resolved at runtime (`<NAME>_OPENSEARCH_PASSWORD`).
    /// See [`OpensearchSink::resolve_password`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub password: Option<Secret>,
    /// Verify TLS certificates. Set false for local dev. Default: true.
    pub tls_verify: bool,
    /// Documents per bulk request. Default: 1000.
    pub batch_size: u32,
    /// Maximum serialized size of a single bulk request, in bytes. A flush is
    /// split so no request exceeds this, independent of `batch_size`, keeping
    /// requests under OpenSearch's `http.max_content_length` (100 MB default).
    /// Default: 10 MiB. A single document larger than this is sent on its own.
    pub max_bytes: u64,
    /// Request timeout in seconds. Default: 30.
    pub timeout_secs: u64,
    /// Transient failure retries. Default: 3.
    pub max_retries: u32,
    /// Optional ingest pipeline applied on index. Omitted from serialized
    /// output when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pipeline: Option<String>,
    /// Primary shards for each created index. Default: 1.
    pub number_of_shards: u32,
    /// Replica shards for each created index. Default: 1.
    pub number_of_replicas: u32,
    /// OpenSearch `refresh_interval` applied to each index once its backfill
    /// completes — the steady-state visibility ceiling (e.g. `"10s"`, `"1s"`,
    /// or `"-1"` to disable automatic refresh). Indexes are seeded with refresh
    /// off (`-1`) and handed this value afterwards. flusso forces an immediate
    /// refresh on any flush that catches the pipeline up, so this only bounds
    /// search staleness while a backlog is draining. Default: `"10s"`.
    pub refresh_interval: String,
    /// Which analysis backend the built-in `flusso_*` analyzers use. Default:
    /// [`Builtin`](TextAnalysis::Builtin).
    pub text_analysis: TextAnalysis,
    /// Whether the sink automatically enriches `text`/`keyword` fields with a
    /// good analyzer and the `keyword` / `keyword_lowercase` / `text` subfields.
    /// A field's explicit mapping always wins. Default: true.
    pub auto_subfields: bool,
}
55
56impl OpensearchSink {
57 /// Resolve the cluster URL in the current environment, applying the
58 /// `<NAME>_OPENSEARCH_URL` deployment override for the sink named `name`.
59 pub fn resolve_url(&self, name: &SinkName) -> Result<HttpUrl, ResolveError> {
60 let var = format!("{}_OPENSEARCH_URL", sink_var_prefix(name));
61 http_url(resolve_required(&self.url, &var)?)
62 }
63
64 /// Resolve the basic-auth username, applying `<NAME>_OPENSEARCH_USERNAME`.
65 pub fn resolve_username(&self, name: &SinkName) -> Result<Option<String>, ResolveError> {
66 let var = format!("{}_OPENSEARCH_USERNAME", sink_var_prefix(name));
67 resolve_optional(self.username.as_ref(), &var)
68 }
69
70 /// Resolve the basic-auth password, applying `<NAME>_OPENSEARCH_PASSWORD`.
71 pub fn resolve_password(&self, name: &SinkName) -> Result<Option<String>, ResolveError> {
72 let var = format!("{}_OPENSEARCH_PASSWORD", sink_var_prefix(name));
73 resolve_optional(self.password.as_ref(), &var)
74 }
75}
76
/// Which analyzer toolkit the sink wires its `flusso_*` analyzers onto.
///
/// Variants are (de)serialized in snake_case, i.e. as `builtin` and `icu`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TextAnalysis {
    /// Built-in OpenSearch components only — works on any cluster with no
    /// plugins. Accent/case folding via `asciifolding` + `lowercase`.
    Builtin,
    /// Use the `analysis-icu` plugin (`icu_tokenizer` / `icu_folding` /
    /// `icu_normalizer`) for stronger multilingual handling. Requires the plugin
    /// to be installed on every node, or index creation fails.
    Icu,
}
89
/// Per-backend configuration for a stdout destination, the counterpart to
/// [`OpensearchSink`].
///
/// NOTE(review): `pretty` has no `#[serde(default)]`, so it must be present
/// when deserializing; the documented default is presumably applied by the
/// layer that constructs this struct — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StdoutSink {
    /// Pretty-print JSON output. Default: false.
    pub pretty: bool,
}