1use crate::auth::Auth;
4use crate::pagination::PaginationStyle;
5use faucet_core::RecordTransform;
6use faucet_core::ReplicationMethod;
7use reqwest::{
8 Method,
9 header::{HeaderMap, HeaderName, HeaderValue},
10};
11use schemars::JsonSchema;
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::time::Duration;
16
17#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
19pub struct RestStreamConfig {
20 pub base_url: String,
22 pub path: String,
25 #[serde(with = "crate::serde_helpers::http_method")]
26 #[schemars(with = "String")]
27 pub method: Method,
28 pub auth: Auth,
29 #[serde(skip, default)]
30 pub headers: HeaderMap,
31 pub query_params: HashMap<String, String>,
32 pub body: Option<Value>,
33
34 pub pagination: PaginationStyle,
36 pub records_path: Option<String>,
37 pub max_pages: Option<usize>,
38 #[serde(with = "faucet_core::config::duration_secs_option", default)]
39 #[schemars(with = "Option<u64>")]
40 pub request_delay: Option<Duration>,
41
42 #[serde(with = "faucet_core::config::duration_secs_option", default)]
44 #[schemars(with = "Option<u64>")]
45 pub timeout: Option<Duration>,
46 pub max_retries: u32,
47 #[serde(with = "faucet_core::config::duration_secs")]
48 #[schemars(with = "u64")]
49 pub retry_backoff: Duration,
50 pub tolerated_http_errors: Vec<u16>,
53
54 pub replication_method: ReplicationMethod,
56 pub replication_key: Option<String>,
58 pub start_replication_value: Option<Value>,
61
62 pub name: Option<String>,
65 pub primary_keys: Vec<String>,
67 pub schema: Option<Value>,
69 pub schema_sample_size: usize,
73
74 pub partitions: Vec<HashMap<String, Value>>,
79 pub partition_concurrency: Option<usize>,
82
83 #[serde(skip, default)]
87 pub transforms: Vec<RecordTransform>,
88}
89
90impl Default for RestStreamConfig {
91 fn default() -> Self {
92 Self {
93 base_url: String::new(),
94 path: String::new(),
95 method: Method::GET,
96 auth: Auth::None,
97 headers: HeaderMap::new(),
98 query_params: HashMap::new(),
99 body: None,
100 pagination: PaginationStyle::None,
101 records_path: None,
102 max_pages: Some(100),
103 request_delay: None,
104 timeout: Some(Duration::from_secs(30)),
105 max_retries: 3,
106 retry_backoff: Duration::from_secs(1),
107 tolerated_http_errors: Vec::new(),
108 replication_method: ReplicationMethod::FullTable,
109 replication_key: None,
110 start_replication_value: None,
111 name: None,
112 primary_keys: Vec::new(),
113 schema: None,
114 schema_sample_size: 100,
115 partitions: Vec::new(),
116 partition_concurrency: None,
117 transforms: Vec::new(),
118 }
119 }
120}
121
122impl RestStreamConfig {
123 pub fn new(base_url: &str, path: &str) -> Self {
124 Self {
125 base_url: base_url.trim_end_matches('/').to_string(),
126 path: path.to_string(),
127 ..Default::default()
128 }
129 }
130
131 pub fn method(mut self, m: Method) -> Self {
134 self.method = m;
135 self
136 }
137
138 pub fn auth(mut self, a: Auth) -> Self {
139 self.auth = a;
140 self
141 }
142
143 pub fn header(mut self, k: &str, v: &str) -> Self {
144 self.headers.insert(
145 HeaderName::from_bytes(k.as_bytes()).expect("invalid header name"),
146 HeaderValue::from_str(v).expect("invalid header value"),
147 );
148 self
149 }
150
151 pub fn query(mut self, k: &str, v: &str) -> Self {
152 self.query_params.insert(k.into(), v.into());
153 self
154 }
155
156 pub fn body(mut self, b: Value) -> Self {
157 self.body = Some(b);
158 self
159 }
160
161 pub fn pagination(mut self, p: PaginationStyle) -> Self {
164 self.pagination = p;
165 self
166 }
167
168 pub fn records_path(mut self, p: &str) -> Self {
169 self.records_path = Some(p.into());
170 self
171 }
172
173 pub fn max_pages(mut self, n: usize) -> Self {
174 self.max_pages = Some(n);
175 self
176 }
177
178 pub fn request_delay(mut self, d: Duration) -> Self {
179 self.request_delay = Some(d);
180 self
181 }
182
183 pub fn timeout(mut self, d: Duration) -> Self {
186 self.timeout = Some(d);
187 self
188 }
189
190 pub fn max_retries(mut self, n: u32) -> Self {
191 self.max_retries = n;
192 self
193 }
194
195 pub fn retry_backoff(mut self, d: Duration) -> Self {
196 self.retry_backoff = d;
197 self
198 }
199
200 pub fn tolerate_http_error(mut self, status: u16) -> Self {
202 self.tolerated_http_errors.push(status);
203 self
204 }
205
206 pub fn replication_method(mut self, m: ReplicationMethod) -> Self {
209 self.replication_method = m;
210 self
211 }
212
213 pub fn replication_key(mut self, key: &str) -> Self {
215 self.replication_key = Some(key.into());
216 self
217 }
218
219 pub fn start_replication_value(mut self, v: Value) -> Self {
222 self.start_replication_value = Some(v);
223 self
224 }
225
226 pub fn name(mut self, n: &str) -> Self {
230 self.name = Some(n.into());
231 self
232 }
233
234 pub fn primary_keys(mut self, keys: Vec<String>) -> Self {
236 self.primary_keys = keys;
237 self
238 }
239
240 pub fn schema(mut self, s: Value) -> Self {
242 self.schema = Some(s);
243 self
244 }
245
246 pub fn schema_sample_size(mut self, n: usize) -> Self {
248 self.schema_sample_size = n;
249 self
250 }
251
252 pub fn add_partition(mut self, ctx: HashMap<String, Value>) -> Self {
257 self.partitions.push(ctx);
258 self
259 }
260
261 pub fn partition_concurrency(mut self, concurrency: Option<usize>) -> Self {
264 self.partition_concurrency = concurrency;
265 self
266 }
267
268 pub fn add_transform(mut self, t: RecordTransform) -> Self {
274 self.transforms.push(t);
275 self
276 }
277}