Skip to main content

faucet_sink_snowflake/
config.rs

1//! Snowflake sink configuration.
2
3use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use std::time::Duration;
7
8// Re-export the shared auth types so end-user imports remain stable
9// (`use faucet_sink_snowflake::SnowflakeAuth;` keeps working).
10pub use faucet_common_snowflake::SnowflakeAuth;
11
12/// Configuration for the Snowflake sink.
13#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
14pub struct SnowflakeSinkConfig {
15    /// Snowflake account identifier (e.g. `"xy12345.us-east-1"`).
16    pub account: String,
17    /// Warehouse to use for the session.
18    pub warehouse: String,
19    /// Database name.
20    pub database: String,
21    /// Schema name.
22    pub schema: String,
23    /// Target table name.
24    pub table: String,
25    /// Authentication: either inline (`{ type, config }`) or a `{ ref: <name> }`
26    /// pointer to a shared provider in the CLI's top-level `auth:` catalog.
27    /// A shared provider must yield a `Bearer` or `Token` credential, which
28    /// maps onto [`SnowflakeAuth::OAuth`]; key-pair JWT is always inline.
29    pub auth: AuthSpec<SnowflakeAuth>,
30    /// Maximum number of records sent per Snowflake SQL REST API request.
31    /// Defaults to [`DEFAULT_BATCH_SIZE`] (1000), which matches the
32    /// documented sweet spot for the SQL REST API.
33    ///
34    /// When `write_batch` is handed a slice larger than `batch_size`, the
35    /// sink re-chunks it into `batch_size` slices and issues one INSERT per
36    /// chunk. `batch_size = 0` is the **"no batching" sentinel** — the
37    /// records slice is forwarded as a single INSERT, no matter how large,
38    /// so upstream `StreamPage` framing flows through untouched.
39    #[serde(default = "default_batch_size")]
40    pub batch_size: usize,
41    /// Maximum wall-clock time to wait for an asynchronously-executed
42    /// INSERT to finish. Snowflake's SQL REST API answers an accepted but
43    /// not-yet-finished statement with HTTP 202 and a `statementHandle`;
44    /// the sink polls `GET /api/v2/statements/{handle}` until the statement
45    /// reports success before counting the rows as written. Without this
46    /// the sink would report success the moment Snowflake *accepted* the
47    /// statement, losing durability. Defaults to 300 seconds. Set to `0`
48    /// to poll indefinitely.
49    #[serde(
50        default = "default_poll_timeout",
51        with = "faucet_core::config::duration_secs"
52    )]
53    #[schemars(with = "u64")]
54    pub poll_timeout: Duration,
55}
56
57fn default_batch_size() -> usize {
58    DEFAULT_BATCH_SIZE
59}
60
/// Serde fallback for `poll_timeout`: 300 seconds (five minutes).
fn default_poll_timeout() -> Duration {
    // Kept as a named constant so the unit is explicit at the call site.
    const DEFAULT_POLL_TIMEOUT_SECS: u64 = 300;
    Duration::from_secs(DEFAULT_POLL_TIMEOUT_SECS)
}
64
65impl SnowflakeSinkConfig {
66    /// Create a new config with required fields and sensible defaults.
67    pub fn new(
68        account: impl Into<String>,
69        warehouse: impl Into<String>,
70        database: impl Into<String>,
71        schema: impl Into<String>,
72        table: impl Into<String>,
73        auth: SnowflakeAuth,
74    ) -> Self {
75        Self {
76            account: account.into(),
77            warehouse: warehouse.into(),
78            database: database.into(),
79            schema: schema.into(),
80            table: table.into(),
81            auth: AuthSpec::Inline(auth),
82            batch_size: DEFAULT_BATCH_SIZE,
83            poll_timeout: default_poll_timeout(),
84        }
85    }
86
87    /// Set the maximum wall-clock time spent polling an asynchronously
88    /// executed INSERT for completion. Pass `Duration::ZERO` to poll forever.
89    pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
90        self.poll_timeout = timeout;
91        self
92    }
93
94    /// Set the maximum number of records per Snowflake SQL REST API request.
95    ///
96    /// Pass `0` to opt out of re-chunking — the entire records slice handed
97    /// to `write_batch` is sent in a single INSERT request, preserving
98    /// upstream `StreamPage` framing.
99    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
100        self.batch_size = batch_size;
101        self
102    }
103}
104
#[cfg(test)]
mod tests {
    use super::*;

    /// Smallest JSON document that deserializes: every required field is
    /// present and both optional tunables are omitted.
    const MINIMAL_JSON: &str = r#"{
            "account": "xy12345",
            "warehouse": "COMPUTE_WH",
            "database": "MY_DB",
            "schema": "PUBLIC",
            "table": "events",
            "auth": {"type": "oauth", "config": {"token": "tok"}}
        }"#;

    /// OAuth credential with a placeholder token, enough to build a config.
    fn sample_auth() -> SnowflakeAuth {
        SnowflakeAuth::OAuth {
            token: "tok".into(),
        }
    }

    /// Config built through `new` with fixed sample values and defaults.
    fn sample_config() -> SnowflakeSinkConfig {
        SnowflakeSinkConfig::new(
            "xy12345",
            "COMPUTE_WH",
            "MY_DB",
            "PUBLIC",
            "events",
            sample_auth(),
        )
    }

    #[test]
    fn default_config() {
        let config = sample_config();
        assert_eq!(config.account, "xy12345");
        assert_eq!(config.warehouse, "COMPUTE_WH");
        assert_eq!(config.database, "MY_DB");
        assert_eq!(config.schema, "PUBLIC");
        assert_eq!(config.table, "events");
        assert_eq!(config.poll_timeout, Duration::from_secs(300));
    }

    #[test]
    fn new_wraps_auth_inline() {
        let config = sample_config();
        assert!(matches!(config.auth, AuthSpec::Inline(_)));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = sample_config();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = sample_config().with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = sample_config().with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = sample_config().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn with_poll_timeout_overrides_default() {
        let config = sample_config().with_poll_timeout(Duration::from_secs(5));
        assert_eq!(config.poll_timeout, Duration::from_secs(5));
    }

    #[test]
    fn poll_timeout_zero_is_accepted_as_poll_forever_sentinel() {
        let config = sample_config().with_poll_timeout(Duration::ZERO);
        assert_eq!(config.poll_timeout, Duration::ZERO);
    }

    #[test]
    fn builders_do_not_clobber_each_other() {
        let config = sample_config()
            .with_batch_size(250)
            .with_poll_timeout(Duration::from_secs(5));
        assert_eq!(config.batch_size, 250);
        assert_eq!(config.poll_timeout, Duration::from_secs(5));
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "account": "xy12345",
            "warehouse": "COMPUTE_WH",
            "database": "MY_DB",
            "schema": "PUBLIC",
            "table": "events",
            "auth": {"type": "oauth", "config": {"token": "tok"}},
            "batch_size": 250
        }"#;
        let config: SnowflakeSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_from_json() {
        let config: SnowflakeSinkConfig = serde_json::from_str(MINIMAL_JSON).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn poll_timeout_defaults_when_absent_from_json() {
        let config: SnowflakeSinkConfig = serde_json::from_str(MINIMAL_JSON).unwrap();
        assert_eq!(config.poll_timeout, Duration::from_secs(300));
    }

    #[test]
    fn deserialized_minimal_config_matches_constructor_defaults() {
        // `new` and the serde defaults are meant to agree on both tunables.
        let from_json: SnowflakeSinkConfig = serde_json::from_str(MINIMAL_JSON).unwrap();
        let from_new = sample_config();
        assert_eq!(from_json.batch_size, from_new.batch_size);
        assert_eq!(from_json.poll_timeout, from_new.poll_timeout);
    }
}