1use crate::auth::Auth;
4use crate::pagination::PaginationStyle;
5use faucet_core::AuthSpec;
6use faucet_core::ReplicationMethod;
7use reqwest::{
8 Method,
9 header::{HeaderMap, HeaderName, HeaderValue},
10};
11use schemars::JsonSchema;
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::time::Duration;
16
17#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
19pub struct RestStreamConfig {
20 pub base_url: String,
22 pub path: String,
25 #[serde(with = "crate::serde_helpers::http_method")]
26 #[schemars(with = "String")]
27 pub method: Method,
28 pub auth: AuthSpec<Auth>,
31 #[serde(skip, default)]
32 pub headers: HeaderMap,
33 pub query_params: HashMap<String, String>,
34 pub body: Option<Value>,
35
36 pub pagination: PaginationStyle,
38 pub records_path: Option<String>,
39 pub max_pages: Option<usize>,
40 #[serde(with = "faucet_core::config::duration_secs_option", default)]
41 #[schemars(with = "Option<u64>")]
42 pub request_delay: Option<Duration>,
43
44 #[serde(with = "faucet_core::config::duration_secs_option", default)]
46 #[schemars(with = "Option<u64>")]
47 pub timeout: Option<Duration>,
48 pub max_retries: u32,
49 #[serde(with = "faucet_core::config::duration_secs")]
50 #[schemars(with = "u64")]
51 pub retry_backoff: Duration,
52 pub tolerated_http_errors: Vec<u16>,
55
56 pub replication_method: ReplicationMethod,
58 pub replication_key: Option<String>,
60 pub start_replication_value: Option<Value>,
63 pub state_key: Option<String>,
70
71 pub name: Option<String>,
74 pub primary_keys: Vec<String>,
76 pub schema: Option<Value>,
78 pub schema_sample_size: usize,
82
83 pub partitions: Vec<HashMap<String, Value>>,
88 pub partition_concurrency: Option<usize>,
91}
92
93impl Default for RestStreamConfig {
94 fn default() -> Self {
95 Self {
96 base_url: String::new(),
97 path: String::new(),
98 method: Method::GET,
99 auth: AuthSpec::Inline(Auth::None),
100 headers: HeaderMap::new(),
101 query_params: HashMap::new(),
102 body: None,
103 pagination: PaginationStyle::None,
104 records_path: None,
105 max_pages: Some(100),
106 request_delay: None,
107 timeout: Some(Duration::from_secs(30)),
108 max_retries: 3,
109 retry_backoff: Duration::from_secs(1),
110 tolerated_http_errors: Vec::new(),
111 replication_method: ReplicationMethod::FullTable,
112 replication_key: None,
113 start_replication_value: None,
114 state_key: None,
115 name: None,
116 primary_keys: Vec::new(),
117 schema: None,
118 schema_sample_size: 100,
119 partitions: Vec::new(),
120 partition_concurrency: None,
121 }
122 }
123}
124
125impl RestStreamConfig {
126 pub fn new(base_url: &str, path: &str) -> Self {
127 Self {
128 base_url: base_url.trim_end_matches('/').to_string(),
129 path: path.to_string(),
130 ..Default::default()
131 }
132 }
133
134 pub fn method(mut self, m: Method) -> Self {
137 self.method = m;
138 self
139 }
140
141 pub fn auth(mut self, a: Auth) -> Self {
142 self.auth = AuthSpec::Inline(a);
143 self
144 }
145
146 pub fn header(mut self, k: &str, v: &str) -> Self {
147 self.headers.insert(
148 HeaderName::from_bytes(k.as_bytes()).expect("invalid header name"),
149 HeaderValue::from_str(v).expect("invalid header value"),
150 );
151 self
152 }
153
154 pub fn query(mut self, k: &str, v: &str) -> Self {
155 self.query_params.insert(k.into(), v.into());
156 self
157 }
158
159 pub fn body(mut self, b: Value) -> Self {
160 self.body = Some(b);
161 self
162 }
163
164 pub fn pagination(mut self, p: PaginationStyle) -> Self {
167 self.pagination = p;
168 self
169 }
170
171 pub fn records_path(mut self, p: &str) -> Self {
172 self.records_path = Some(p.into());
173 self
174 }
175
176 pub fn max_pages(mut self, n: usize) -> Self {
177 self.max_pages = Some(n);
178 self
179 }
180
181 pub fn request_delay(mut self, d: Duration) -> Self {
182 self.request_delay = Some(d);
183 self
184 }
185
186 pub fn timeout(mut self, d: Duration) -> Self {
189 self.timeout = Some(d);
190 self
191 }
192
193 pub fn max_retries(mut self, n: u32) -> Self {
194 self.max_retries = n;
195 self
196 }
197
198 pub fn retry_backoff(mut self, d: Duration) -> Self {
199 self.retry_backoff = d;
200 self
201 }
202
203 pub fn tolerate_http_error(mut self, status: u16) -> Self {
205 self.tolerated_http_errors.push(status);
206 self
207 }
208
209 pub fn replication_method(mut self, m: ReplicationMethod) -> Self {
212 self.replication_method = m;
213 self
214 }
215
216 pub fn replication_key(mut self, key: &str) -> Self {
218 self.replication_key = Some(key.into());
219 self
220 }
221
222 pub fn start_replication_value(mut self, v: Value) -> Self {
225 self.start_replication_value = Some(v);
226 self
227 }
228
229 pub fn state_key(mut self, key: &str) -> Self {
234 self.state_key = Some(key.into());
235 self
236 }
237
238 pub fn name(mut self, n: &str) -> Self {
242 self.name = Some(n.into());
243 self
244 }
245
246 pub fn primary_keys(mut self, keys: Vec<String>) -> Self {
248 self.primary_keys = keys;
249 self
250 }
251
252 pub fn schema(mut self, s: Value) -> Self {
254 self.schema = Some(s);
255 self
256 }
257
258 pub fn schema_sample_size(mut self, n: usize) -> Self {
260 self.schema_sample_size = n;
261 self
262 }
263
264 pub fn add_partition(mut self, ctx: HashMap<String, Value>) -> Self {
269 self.partitions.push(ctx);
270 self
271 }
272
273 pub fn partition_concurrency(mut self, concurrency: Option<usize>) -> Self {
276 self.partition_concurrency = concurrency;
277 self
278 }
279}