faucet_sink_snowflake/
config.rs1use faucet_core::{AuthSpec, DEFAULT_BATCH_SIZE};
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use std::time::Duration;
7
8pub use faucet_common_snowflake::SnowflakeAuth;
11
12#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
14pub struct SnowflakeSinkConfig {
15 pub account: String,
17 pub warehouse: String,
19 pub database: String,
21 pub schema: String,
23 pub table: String,
25 pub auth: AuthSpec<SnowflakeAuth>,
30 #[serde(default = "default_batch_size")]
40 pub batch_size: usize,
41 #[serde(
50 default = "default_poll_timeout",
51 with = "faucet_core::config::duration_secs"
52 )]
53 #[schemars(with = "u64")]
54 pub poll_timeout: Duration,
55}
56
57fn default_batch_size() -> usize {
58 DEFAULT_BATCH_SIZE
59}
60
/// Serde fallback for `SnowflakeSinkConfig::poll_timeout`: 300 seconds.
///
/// Also used by `SnowflakeSinkConfig::new`, so programmatic and deserialized
/// configurations start from the same value.
fn default_poll_timeout() -> Duration {
    // Five minutes, expressed in seconds.
    const DEFAULT_POLL_TIMEOUT_SECS: u64 = 300;
    Duration::from_secs(DEFAULT_POLL_TIMEOUT_SECS)
}
64
65impl SnowflakeSinkConfig {
66 pub fn new(
68 account: impl Into<String>,
69 warehouse: impl Into<String>,
70 database: impl Into<String>,
71 schema: impl Into<String>,
72 table: impl Into<String>,
73 auth: SnowflakeAuth,
74 ) -> Self {
75 Self {
76 account: account.into(),
77 warehouse: warehouse.into(),
78 database: database.into(),
79 schema: schema.into(),
80 table: table.into(),
81 auth: AuthSpec::Inline(auth),
82 batch_size: DEFAULT_BATCH_SIZE,
83 poll_timeout: default_poll_timeout(),
84 }
85 }
86
87 pub fn with_poll_timeout(mut self, timeout: Duration) -> Self {
90 self.poll_timeout = timeout;
91 self
92 }
93
94 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
100 self.batch_size = batch_size;
101 self
102 }
103}
104
#[cfg(test)]
mod tests {
    use super::*;

    /// OAuth credentials carrying a placeholder token.
    fn sample_auth() -> SnowflakeAuth {
        SnowflakeAuth::OAuth {
            token: "tok".into(),
        }
    }

    /// Configuration built through `new` with fixed identifiers and default tuning.
    fn sample_config() -> SnowflakeSinkConfig {
        let auth = sample_auth();
        SnowflakeSinkConfig::new("xy12345", "COMPUTE_WH", "MY_DB", "PUBLIC", "events", auth)
    }

    /// Deserializes a config from JSON, panicking if the document is rejected.
    fn parse_config(json: &str) -> SnowflakeSinkConfig {
        serde_json::from_str(json).unwrap()
    }

    #[test]
    fn default_config() {
        let SnowflakeSinkConfig {
            account,
            warehouse,
            database,
            schema,
            table,
            poll_timeout,
            ..
        } = sample_config();

        assert_eq!(account, "xy12345");
        assert_eq!(warehouse, "COMPUTE_WH");
        assert_eq!(database, "MY_DB");
        assert_eq!(schema, "PUBLIC");
        assert_eq!(table, "events");
        assert_eq!(poll_timeout, Duration::from_secs(300));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let built = sample_config();
        assert_eq!(built.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let overridden = sample_config().with_batch_size(250);
        assert_eq!(overridden.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let unbatched = sample_config().with_batch_size(0);
        assert_eq!(unbatched.batch_size, 0);
        assert!(faucet_core::validate_batch_size(unbatched.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let too_large = faucet_core::MAX_BATCH_SIZE + 1;
        let oversized = sample_config().with_batch_size(too_large);
        assert!(faucet_core::validate_batch_size(oversized.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let parsed = parse_config(
            r#"{
            "account": "xy12345",
            "warehouse": "COMPUTE_WH",
            "database": "MY_DB",
            "schema": "PUBLIC",
            "table": "events",
            "auth": {"type": "oauth", "config": {"token": "tok"}},
            "batch_size": 250
        }"#,
        );
        assert_eq!(parsed.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_from_json() {
        let parsed = parse_config(
            r#"{
            "account": "xy12345",
            "warehouse": "COMPUTE_WH",
            "database": "MY_DB",
            "schema": "PUBLIC",
            "table": "events",
            "auth": {"type": "oauth", "config": {"token": "tok"}}
        }"#,
        );
        assert_eq!(parsed.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}