1use crate::auth::Auth;
4use crate::pagination::PaginationStyle;
5use crate::replication::ReplicationMethod;
6use crate::transform::RecordTransform;
7use reqwest::{
8 Method,
9 header::{HeaderMap, HeaderName, HeaderValue},
10};
11use serde_json::Value;
12use std::collections::HashMap;
13use std::time::Duration;
14
15#[derive(Debug, Clone)]
17pub struct RestStreamConfig {
18 pub base_url: String,
20 pub path: String,
23 pub method: Method,
24 pub auth: Auth,
25 pub headers: HeaderMap,
26 pub query_params: HashMap<String, String>,
27 pub body: Option<Value>,
28
29 pub pagination: PaginationStyle,
31 pub records_path: Option<String>,
32 pub max_pages: Option<usize>,
33 pub request_delay: Option<Duration>,
34
35 pub timeout: Option<Duration>,
37 pub max_retries: u32,
38 pub retry_backoff: Duration,
39 pub tolerated_http_errors: Vec<u16>,
42
43 pub replication_method: ReplicationMethod,
45 pub replication_key: Option<String>,
47 pub start_replication_value: Option<Value>,
50
51 pub name: Option<String>,
54 pub primary_keys: Vec<String>,
56 pub schema: Option<Value>,
58 pub schema_sample_size: usize,
62
63 pub partitions: Vec<HashMap<String, Value>>,
68
69 pub transforms: Vec<RecordTransform>,
73}
74
75impl Default for RestStreamConfig {
76 fn default() -> Self {
77 Self {
78 base_url: String::new(),
79 path: String::new(),
80 method: Method::GET,
81 auth: Auth::None,
82 headers: HeaderMap::new(),
83 query_params: HashMap::new(),
84 body: None,
85 pagination: PaginationStyle::None,
86 records_path: None,
87 max_pages: Some(100),
88 request_delay: None,
89 timeout: Some(Duration::from_secs(30)),
90 max_retries: 3,
91 retry_backoff: Duration::from_secs(1),
92 tolerated_http_errors: Vec::new(),
93 replication_method: ReplicationMethod::FullTable,
94 replication_key: None,
95 start_replication_value: None,
96 name: None,
97 primary_keys: Vec::new(),
98 schema: None,
99 schema_sample_size: 100,
100 partitions: Vec::new(),
101 transforms: Vec::new(),
102 }
103 }
104}
105
106impl RestStreamConfig {
107 pub fn new(base_url: &str, path: &str) -> Self {
108 Self {
109 base_url: base_url.trim_end_matches('/').to_string(),
110 path: path.to_string(),
111 ..Default::default()
112 }
113 }
114
115 pub fn method(mut self, m: Method) -> Self {
118 self.method = m;
119 self
120 }
121
122 pub fn auth(mut self, a: Auth) -> Self {
123 self.auth = a;
124 self
125 }
126
127 pub fn header(mut self, k: &str, v: &str) -> Self {
128 self.headers.insert(
129 HeaderName::from_bytes(k.as_bytes()).expect("invalid header name"),
130 HeaderValue::from_str(v).expect("invalid header value"),
131 );
132 self
133 }
134
135 pub fn query(mut self, k: &str, v: &str) -> Self {
136 self.query_params.insert(k.into(), v.into());
137 self
138 }
139
140 pub fn body(mut self, b: Value) -> Self {
141 self.body = Some(b);
142 self
143 }
144
145 pub fn pagination(mut self, p: PaginationStyle) -> Self {
148 self.pagination = p;
149 self
150 }
151
152 pub fn records_path(mut self, p: &str) -> Self {
153 self.records_path = Some(p.into());
154 self
155 }
156
157 pub fn max_pages(mut self, n: usize) -> Self {
158 self.max_pages = Some(n);
159 self
160 }
161
162 pub fn request_delay(mut self, d: Duration) -> Self {
163 self.request_delay = Some(d);
164 self
165 }
166
167 pub fn timeout(mut self, d: Duration) -> Self {
170 self.timeout = Some(d);
171 self
172 }
173
174 pub fn max_retries(mut self, n: u32) -> Self {
175 self.max_retries = n;
176 self
177 }
178
179 pub fn retry_backoff(mut self, d: Duration) -> Self {
180 self.retry_backoff = d;
181 self
182 }
183
184 pub fn tolerate_http_error(mut self, status: u16) -> Self {
186 self.tolerated_http_errors.push(status);
187 self
188 }
189
190 pub fn replication_method(mut self, m: ReplicationMethod) -> Self {
193 self.replication_method = m;
194 self
195 }
196
197 pub fn replication_key(mut self, key: &str) -> Self {
199 self.replication_key = Some(key.into());
200 self
201 }
202
203 pub fn start_replication_value(mut self, v: Value) -> Self {
206 self.start_replication_value = Some(v);
207 self
208 }
209
210 pub fn name(mut self, n: &str) -> Self {
214 self.name = Some(n.into());
215 self
216 }
217
218 pub fn primary_keys(mut self, keys: Vec<String>) -> Self {
220 self.primary_keys = keys;
221 self
222 }
223
224 pub fn schema(mut self, s: Value) -> Self {
226 self.schema = Some(s);
227 self
228 }
229
230 pub fn schema_sample_size(mut self, n: usize) -> Self {
232 self.schema_sample_size = n;
233 self
234 }
235
236 pub fn add_partition(mut self, ctx: HashMap<String, Value>) -> Self {
241 self.partitions.push(ctx);
242 self
243 }
244
245 pub fn add_transform(mut self, t: RecordTransform) -> Self {
251 self.transforms.push(t);
252 self
253 }
254}