// faucet_stream/config.rs

1//! Stream configuration and builder.
2
3use crate::auth::Auth;
4use crate::pagination::PaginationStyle;
5use crate::replication::ReplicationMethod;
6use crate::transform::RecordTransform;
7use reqwest::{
8    Method,
9    header::{HeaderMap, HeaderName, HeaderValue},
10};
11use serde_json::Value;
12use std::collections::HashMap;
13use std::time::Duration;
14
/// Configuration for a RestStream.
///
/// Usually built with [`RestStreamConfig::new`] followed by the chained
/// builder methods on this type; every field is also public and may be set
/// directly. The starting value of each field comes from the [`Default`] impl.
#[derive(Debug, Clone)]
pub struct RestStreamConfig {
    // ── Core request ──────────────────────────────────────────────────────────
    /// Base URL of the API. [`RestStreamConfig::new`] strips trailing `/`
    /// characters from it; assigning this field directly stores it verbatim.
    pub base_url: String,
    /// URL path, relative to `base_url`. May contain `{key}` placeholders that
    /// are substituted per-partition (e.g. `"/orgs/{org_id}/users"`).
    pub path: String,
    /// HTTP method for requests (default: `GET`).
    pub method: Method,
    /// Authentication settings (default: `Auth::None`).
    pub auth: Auth,
    /// Additional HTTP headers (default: empty).
    pub headers: HeaderMap,
    /// Query-string parameters (default: empty). Because this is a `HashMap`,
    /// it holds a single value per key; repeated parameters such as
    /// `?id=1&id=2` cannot be expressed here.
    pub query_params: HashMap<String, String>,
    /// Optional JSON request body (default: `None`).
    pub body: Option<Value>,

    // ── Pagination ────────────────────────────────────────────────────────────
    /// Pagination strategy (default: `PaginationStyle::None`).
    pub pagination: PaginationStyle,
    /// Location of the records inside each response body (default: `None`).
    /// NOTE(review): the path syntax is interpreted by the stream code — confirm there.
    pub records_path: Option<String>,
    /// Page limit (default: `Some(100)`).
    /// NOTE(review): presumably `None` means "no limit" — confirm in the stream code.
    pub max_pages: Option<usize>,
    /// Optional delay between requests (default: `None`).
    pub request_delay: Option<Duration>,

    // ── Reliability ───────────────────────────────────────────────────────────
    /// Request timeout (default: `Some` of 30 seconds).
    pub timeout: Option<Duration>,
    /// Maximum number of retries (default: `3`).
    pub max_retries: u32,
    /// Retry backoff duration (default: 1 second).
    /// NOTE(review): whether this is a fixed delay or a base for growth is
    /// decided by the retry code — confirm there.
    pub retry_backoff: Duration,
    /// HTTP status codes that should **not** cause an error. Responses with
    /// these codes are treated as empty pages (no records, no further pages).
    pub tolerated_http_errors: Vec<u16>,

    // ── Replication ───────────────────────────────────────────────────────────
    /// Replication mode (default: `ReplicationMethod::FullTable`).
    pub replication_method: ReplicationMethod,
    /// Field name (not a JSONPath) used for incremental replication bookmarking.
    pub replication_key: Option<String>,
    /// Bookmark value: records where `record[replication_key] <= start_replication_value`
    /// are filtered out when `replication_method` is `Incremental`.
    pub start_replication_value: Option<Value>,

    // ── Singer / Meltano metadata ─────────────────────────────────────────────
    /// Human-readable stream name (used in logging and Singer SCHEMA messages).
    pub name: Option<String>,
    /// Field names that uniquely identify a record (Singer `key_properties`).
    pub primary_keys: Vec<String>,
    /// JSON Schema describing the structure of each record.
    pub schema: Option<Value>,
    /// Maximum number of records to sample when inferring the schema via
    /// [`crate::stream::RestStream::infer_schema`].  `0` means sample all
    /// available records (up to `max_pages`).  Defaults to `100`.
    pub schema_sample_size: usize,

    // ── Partitions ────────────────────────────────────────────────────────────
    /// Each entry is a context map whose values are substituted into `path`
    /// placeholders. The stream is executed once per partition and results are
    /// concatenated.  Empty means run once with no substitution.
    pub partitions: Vec<HashMap<String, Value>>,

    // ── Record transforms ─────────────────────────────────────────────────────
    /// Transformations applied to every record in order.
    /// See [`RecordTransform`] for available options.
    pub transforms: Vec<RecordTransform>,
}
74
75impl Default for RestStreamConfig {
76    fn default() -> Self {
77        Self {
78            base_url: String::new(),
79            path: String::new(),
80            method: Method::GET,
81            auth: Auth::None,
82            headers: HeaderMap::new(),
83            query_params: HashMap::new(),
84            body: None,
85            pagination: PaginationStyle::None,
86            records_path: None,
87            max_pages: Some(100),
88            request_delay: None,
89            timeout: Some(Duration::from_secs(30)),
90            max_retries: 3,
91            retry_backoff: Duration::from_secs(1),
92            tolerated_http_errors: Vec::new(),
93            replication_method: ReplicationMethod::FullTable,
94            replication_key: None,
95            start_replication_value: None,
96            name: None,
97            primary_keys: Vec::new(),
98            schema: None,
99            schema_sample_size: 100,
100            partitions: Vec::new(),
101            transforms: Vec::new(),
102        }
103    }
104}
105
106impl RestStreamConfig {
107    pub fn new(base_url: &str, path: &str) -> Self {
108        Self {
109            base_url: base_url.trim_end_matches('/').to_string(),
110            path: path.to_string(),
111            ..Default::default()
112        }
113    }
114
115    // ── Core request ──────────────────────────────────────────────────────────
116
117    pub fn method(mut self, m: Method) -> Self {
118        self.method = m;
119        self
120    }
121
122    pub fn auth(mut self, a: Auth) -> Self {
123        self.auth = a;
124        self
125    }
126
127    pub fn header(mut self, k: &str, v: &str) -> Self {
128        self.headers.insert(
129            HeaderName::from_bytes(k.as_bytes()).expect("invalid header name"),
130            HeaderValue::from_str(v).expect("invalid header value"),
131        );
132        self
133    }
134
135    pub fn query(mut self, k: &str, v: &str) -> Self {
136        self.query_params.insert(k.into(), v.into());
137        self
138    }
139
140    pub fn body(mut self, b: Value) -> Self {
141        self.body = Some(b);
142        self
143    }
144
145    // ── Pagination ────────────────────────────────────────────────────────────
146
147    pub fn pagination(mut self, p: PaginationStyle) -> Self {
148        self.pagination = p;
149        self
150    }
151
152    pub fn records_path(mut self, p: &str) -> Self {
153        self.records_path = Some(p.into());
154        self
155    }
156
157    pub fn max_pages(mut self, n: usize) -> Self {
158        self.max_pages = Some(n);
159        self
160    }
161
162    pub fn request_delay(mut self, d: Duration) -> Self {
163        self.request_delay = Some(d);
164        self
165    }
166
167    // ── Reliability ───────────────────────────────────────────────────────────
168
169    pub fn timeout(mut self, d: Duration) -> Self {
170        self.timeout = Some(d);
171        self
172    }
173
174    pub fn max_retries(mut self, n: u32) -> Self {
175        self.max_retries = n;
176        self
177    }
178
179    pub fn retry_backoff(mut self, d: Duration) -> Self {
180        self.retry_backoff = d;
181        self
182    }
183
184    /// HTTP status codes that should be silently ignored (treated as empty pages).
185    pub fn tolerate_http_error(mut self, status: u16) -> Self {
186        self.tolerated_http_errors.push(status);
187        self
188    }
189
190    // ── Replication ───────────────────────────────────────────────────────────
191
192    pub fn replication_method(mut self, m: ReplicationMethod) -> Self {
193        self.replication_method = m;
194        self
195    }
196
197    /// Field name (not JSONPath) used as the incremental replication bookmark.
198    pub fn replication_key(mut self, key: &str) -> Self {
199        self.replication_key = Some(key.into());
200        self
201    }
202
203    /// Bookmark start value: records at or before this value are filtered out
204    /// when using `ReplicationMethod::Incremental`.
205    pub fn start_replication_value(mut self, v: Value) -> Self {
206        self.start_replication_value = Some(v);
207        self
208    }
209
210    // ── Singer / Meltano metadata ─────────────────────────────────────────────
211
212    /// Human-readable stream name.
213    pub fn name(mut self, n: &str) -> Self {
214        self.name = Some(n.into());
215        self
216    }
217
218    /// Field names that uniquely identify a record (Singer `key_properties`).
219    pub fn primary_keys(mut self, keys: Vec<String>) -> Self {
220        self.primary_keys = keys;
221        self
222    }
223
224    /// JSON Schema for the stream's records.
225    pub fn schema(mut self, s: Value) -> Self {
226        self.schema = Some(s);
227        self
228    }
229
230    /// Maximum records to sample for schema inference (`0` = unlimited).
231    pub fn schema_sample_size(mut self, n: usize) -> Self {
232        self.schema_sample_size = n;
233        self
234    }
235
236    // ── Partitions ────────────────────────────────────────────────────────────
237
238    /// Add a partition context. The stream will execute once for each partition,
239    /// substituting `{key}` placeholders in `path` with values from the context.
240    pub fn add_partition(mut self, ctx: HashMap<String, Value>) -> Self {
241        self.partitions.push(ctx);
242        self
243    }
244
245    // ── Record transforms ─────────────────────────────────────────────────────
246
247    /// Append a [`RecordTransform`] to the pipeline.
248    ///
249    /// Transforms are applied in the order they are added.
250    pub fn add_transform(mut self, t: RecordTransform) -> Self {
251        self.transforms.push(t);
252        self
253    }
254}