Skip to main content

schema_core/config/
sink.rs

1use serde::{Deserialize, Serialize};
2
3use crate::common::{HttpUrl, SinkName};
4
5use super::{ResolveError, Secret, http_url, resolve_optional, resolve_required, sink_var_prefix};
6
7/// Per-backend configuration for an OpenSearch destination. The `Sink` enum that
8/// selects between this and [`StdoutSink`] is a composition concern and lives in
9/// the `schema` crate; the backend sinks read these settings directly.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct OpensearchSink {
12    /// Cluster URL, resolved at runtime (`<NAME>_OPENSEARCH_URL` overrides).
13    pub url: Secret,
14    /// Basic-auth user, resolved at runtime (`<NAME>_OPENSEARCH_USERNAME`).
15    #[serde(default, skip_serializing_if = "Option::is_none")]
16    pub username: Option<Secret>,
17    /// Basic-auth password, resolved at runtime (`<NAME>_OPENSEARCH_PASSWORD`).
18    #[serde(default, skip_serializing_if = "Option::is_none")]
19    pub password: Option<Secret>,
20    /// Verify TLS certificates. Set false for local dev. Default: true.
21    pub tls_verify: bool,
22    /// Documents per bulk request. Default: 1000.
23    pub batch_size: u32,
24    /// Maximum serialized size of a single bulk request, in bytes. A flush is
25    /// split so no request exceeds this, independent of `batch_size`, keeping
26    /// requests under OpenSearch's `http.max_content_length` (100 MB default).
27    /// Default: 10 MiB. A single document larger than this is sent on its own.
28    pub max_bytes: u64,
29    /// Request timeout in seconds. Default: 30.
30    pub timeout_secs: u64,
31    /// Transient failure retries. Default: 3.
32    pub max_retries: u32,
33    /// Optional ingest pipeline applied on index.
34    #[serde(default, skip_serializing_if = "Option::is_none")]
35    pub pipeline: Option<String>,
36    /// Primary shards for each created index. Default: 1.
37    pub number_of_shards: u32,
38    /// Replica shards for each created index. Default: 1.
39    pub number_of_replicas: u32,
40    /// OpenSearch `refresh_interval` applied to each index once its backfill
41    /// completes — the steady-state visibility ceiling (e.g. `"10s"`, `"1s"`,
42    /// or `"-1"` to disable automatic refresh). Indexes are seeded with refresh
43    /// off (`-1`) and handed this value afterwards. flusso forces an immediate
44    /// refresh on any flush that catches the pipeline up, so this only bounds
45    /// search staleness while a backlog is draining. Default: `"10s"`.
46    pub refresh_interval: String,
47    /// Which analysis backend the built-in `flusso_*` analyzers use. Default:
48    /// [`Builtin`](TextAnalysis::Builtin).
49    pub text_analysis: TextAnalysis,
50    /// Whether the sink automatically enriches `text`/`keyword` fields with a
51    /// good analyzer and the `keyword` / `keyword_lowercase` / `text` subfields.
52    /// A field's explicit mapping always wins. Default: true.
53    pub auto_subfields: bool,
54}
55
56impl OpensearchSink {
57    /// Resolve the cluster URL in the current environment, applying the
58    /// `<NAME>_OPENSEARCH_URL` deployment override for the sink named `name`.
59    pub fn resolve_url(&self, name: &SinkName) -> Result<HttpUrl, ResolveError> {
60        let var = format!("{}_OPENSEARCH_URL", sink_var_prefix(name));
61        http_url(resolve_required(&self.url, &var)?)
62    }
63
64    /// Resolve the basic-auth username, applying `<NAME>_OPENSEARCH_USERNAME`.
65    pub fn resolve_username(&self, name: &SinkName) -> Result<Option<String>, ResolveError> {
66        let var = format!("{}_OPENSEARCH_USERNAME", sink_var_prefix(name));
67        resolve_optional(self.username.as_ref(), &var)
68    }
69
70    /// Resolve the basic-auth password, applying `<NAME>_OPENSEARCH_PASSWORD`.
71    pub fn resolve_password(&self, name: &SinkName) -> Result<Option<String>, ResolveError> {
72        let var = format!("{}_OPENSEARCH_PASSWORD", sink_var_prefix(name));
73        resolve_optional(self.password.as_ref(), &var)
74    }
75}
76
77/// Which analyzer toolkit the sink wires its `flusso_*` analyzers onto.
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum TextAnalysis {
81    /// Built-in OpenSearch components only — works on any cluster with no
82    /// plugins. Accent/case folding via `asciifolding` + `lowercase`.
83    Builtin,
84    /// Use the `analysis-icu` plugin (`icu_tokenizer` / `icu_folding` /
85    /// `icu_normalizer`) for stronger multilingual handling. Requires the plugin
86    /// to be installed on every node, or index creation fails.
87    Icu,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct StdoutSink {
92    /// Pretty-print JSON output. Default: false.
93    pub pretty: bool,
94}