Skip to main content

faucet_source_rest/
config.rs

1//! Stream configuration and builder.
2
3use crate::auth::Auth;
4use crate::pagination::PaginationStyle;
5use faucet_core::RecordTransform;
6use faucet_core::ReplicationMethod;
7use reqwest::{
8    Method,
9    header::{HeaderMap, HeaderName, HeaderValue},
10};
11use schemars::JsonSchema;
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::time::Duration;
16
/// Configuration for a [`RestStream`](crate::stream::RestStream).
///
/// Build one with [`RestStreamConfig::new`] plus the chained builder methods,
/// or deserialize it with serde.
///
/// Defaults quoted on individual fields come from the [`Default`] impl, which
/// `new` also uses. The serde derive has no container-level
/// `#[serde(default)]`. So when deserializing:
/// - non-`Option` fields that lack a field-level `default` attribute must be
///   present;
/// - omitted `Option` fields become `None`, even where the `Default` impl
///   uses `Some(..)`.
///
/// `headers` and `transforms` are `#[serde(skip)]`. They are never read from
/// or written to a serialized config and must be set in code.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct RestStreamConfig {
    // ── Core request ──────────────────────────────────────────────────────────
    /// Base URL of the API.
    ///
    /// [`RestStreamConfig::new`] strips trailing `/` characters. Values
    /// assigned directly or deserialized are stored exactly as given.
    pub base_url: String,
    /// URL path, relative to `base_url`. May contain `{key}` placeholders that
    /// are substituted per-partition (e.g. `"/orgs/{org_id}/users"`).
    pub path: String,
    /// HTTP method, (de)serialized as a string through
    /// `crate::serde_helpers::http_method`. Defaults to `GET`.
    #[serde(with = "crate::serde_helpers::http_method")]
    #[schemars(with = "String")]
    pub method: Method,
    /// Authentication settings. Defaults to `Auth::None`.
    pub auth: Auth,
    /// Extra request headers.
    ///
    /// Skipped by serde, so this is always empty after deserialization.
    /// Populate it with [`RestStreamConfig::header`].
    #[serde(skip, default)]
    pub headers: HeaderMap,
    /// Query-string parameters keyed by name. Because this is a `HashMap`,
    /// iteration order is unspecified.
    pub query_params: HashMap<String, String>,
    /// Optional JSON request body. Defaults to `None`.
    pub body: Option<Value>,

    // ── Pagination ────────────────────────────────────────────────────────────
    /// Pagination strategy. Defaults to `PaginationStyle::None`.
    pub pagination: PaginationStyle,
    /// Location of the records within each response body.
    ///
    /// NOTE(review): the path syntax is defined by the extraction code; confirm
    /// whether this is JSONPath.
    pub records_path: Option<String>,
    /// Maximum number of pages to fetch.
    ///
    /// The `Default` impl sets `Some(100)`. A deserialized config that omits
    /// this field gets `None` instead.
    /// NOTE(review): `None` presumably means "no page limit"; confirm in the
    /// pagination code.
    pub max_pages: Option<usize>,
    /// Optional delay between requests (presumably between consecutive pages;
    /// confirm).
    ///
    /// (De)serialized through `faucet_core::config::duration_secs_option`; the
    /// JSON Schema type is `u64`. Defaults to `None`.
    #[serde(with = "faucet_core::config::duration_secs_option", default)]
    #[schemars(with = "Option<u64>")]
    pub request_delay: Option<Duration>,

    // ── Reliability ───────────────────────────────────────────────────────────
    /// Request timeout.
    ///
    /// The `Default` impl sets `Some(30 s)`. A deserialized config that omits
    /// this field gets `None` through the field-level serde `default`.
    #[serde(with = "faucet_core::config::duration_secs_option", default)]
    #[schemars(with = "Option<u64>")]
    pub timeout: Option<Duration>,
    /// Maximum number of retry attempts. Defaults to `3`.
    pub max_retries: u32,
    /// Backoff duration used for retries. Defaults to 1 s.
    ///
    /// (De)serialized through `faucet_core::config::duration_secs` (JSON
    /// Schema type `u64`).
    #[serde(with = "faucet_core::config::duration_secs")]
    #[schemars(with = "u64")]
    pub retry_backoff: Duration,
    /// HTTP status codes that should **not** cause an error. Responses with
    /// these codes are treated as empty pages (no records, no further pages).
    pub tolerated_http_errors: Vec<u16>,

    // ── Replication ───────────────────────────────────────────────────────────
    /// Replication mode. Defaults to `ReplicationMethod::FullTable`.
    pub replication_method: ReplicationMethod,
    /// Field name (not a JSONPath) used for incremental replication bookmarking.
    pub replication_key: Option<String>,
    /// Bookmark value: records where `record[replication_key] <= start_replication_value`
    /// are filtered out when `replication_method` is `Incremental`.
    pub start_replication_value: Option<Value>,

    // ── Singer / Meltano metadata ─────────────────────────────────────────────
    /// Human-readable stream name (used in logging and Singer SCHEMA messages).
    pub name: Option<String>,
    /// Field names that uniquely identify a record (Singer `key_properties`).
    pub primary_keys: Vec<String>,
    /// JSON Schema describing the structure of each record.
    pub schema: Option<Value>,
    /// Maximum number of records to sample when inferring the schema via
    /// [`crate::stream::RestStream::infer_schema`].
    ///
    /// `0` means sample all available records (up to `max_pages`). The
    /// `Default` impl sets `100`; the field is required when deserializing.
    pub schema_sample_size: usize,

    // ── Partitions ────────────────────────────────────────────────────────────
    /// Each entry is a context map whose values are substituted into `path`
    /// placeholders. The stream is executed once per partition and results are
    /// concatenated.  Empty means run once with no substitution.
    pub partitions: Vec<HashMap<String, Value>>,
    /// Maximum number of partitions to fetch concurrently.
    /// `None` means sequential processing (backward compatible default).
    /// NOTE(review): the handling of `Some(0)` is not visible here; confirm it
    /// is rejected or treated as sequential rather than stalling.
    pub partition_concurrency: Option<usize>,

    // ── Record transforms ─────────────────────────────────────────────────────
    /// Transformations applied to every record in order.
    /// See [`RecordTransform`] for available options.
    ///
    /// Skipped by serde (always empty after deserialization). Add entries with
    /// [`RestStreamConfig::add_transform`].
    #[serde(skip, default)]
    pub transforms: Vec<RecordTransform>,
}
89
90impl Default for RestStreamConfig {
91    fn default() -> Self {
92        Self {
93            base_url: String::new(),
94            path: String::new(),
95            method: Method::GET,
96            auth: Auth::None,
97            headers: HeaderMap::new(),
98            query_params: HashMap::new(),
99            body: None,
100            pagination: PaginationStyle::None,
101            records_path: None,
102            max_pages: Some(100),
103            request_delay: None,
104            timeout: Some(Duration::from_secs(30)),
105            max_retries: 3,
106            retry_backoff: Duration::from_secs(1),
107            tolerated_http_errors: Vec::new(),
108            replication_method: ReplicationMethod::FullTable,
109            replication_key: None,
110            start_replication_value: None,
111            name: None,
112            primary_keys: Vec::new(),
113            schema: None,
114            schema_sample_size: 100,
115            partitions: Vec::new(),
116            partition_concurrency: None,
117            transforms: Vec::new(),
118        }
119    }
120}
121
122impl RestStreamConfig {
123    pub fn new(base_url: &str, path: &str) -> Self {
124        Self {
125            base_url: base_url.trim_end_matches('/').to_string(),
126            path: path.to_string(),
127            ..Default::default()
128        }
129    }
130
131    // ── Core request ──────────────────────────────────────────────────────────
132
133    pub fn method(mut self, m: Method) -> Self {
134        self.method = m;
135        self
136    }
137
138    pub fn auth(mut self, a: Auth) -> Self {
139        self.auth = a;
140        self
141    }
142
143    pub fn header(mut self, k: &str, v: &str) -> Self {
144        self.headers.insert(
145            HeaderName::from_bytes(k.as_bytes()).expect("invalid header name"),
146            HeaderValue::from_str(v).expect("invalid header value"),
147        );
148        self
149    }
150
151    pub fn query(mut self, k: &str, v: &str) -> Self {
152        self.query_params.insert(k.into(), v.into());
153        self
154    }
155
156    pub fn body(mut self, b: Value) -> Self {
157        self.body = Some(b);
158        self
159    }
160
161    // ── Pagination ────────────────────────────────────────────────────────────
162
163    pub fn pagination(mut self, p: PaginationStyle) -> Self {
164        self.pagination = p;
165        self
166    }
167
168    pub fn records_path(mut self, p: &str) -> Self {
169        self.records_path = Some(p.into());
170        self
171    }
172
173    pub fn max_pages(mut self, n: usize) -> Self {
174        self.max_pages = Some(n);
175        self
176    }
177
178    pub fn request_delay(mut self, d: Duration) -> Self {
179        self.request_delay = Some(d);
180        self
181    }
182
183    // ── Reliability ───────────────────────────────────────────────────────────
184
185    pub fn timeout(mut self, d: Duration) -> Self {
186        self.timeout = Some(d);
187        self
188    }
189
190    pub fn max_retries(mut self, n: u32) -> Self {
191        self.max_retries = n;
192        self
193    }
194
195    pub fn retry_backoff(mut self, d: Duration) -> Self {
196        self.retry_backoff = d;
197        self
198    }
199
200    /// HTTP status codes that should be silently ignored (treated as empty pages).
201    pub fn tolerate_http_error(mut self, status: u16) -> Self {
202        self.tolerated_http_errors.push(status);
203        self
204    }
205
206    // ── Replication ───────────────────────────────────────────────────────────
207
208    pub fn replication_method(mut self, m: ReplicationMethod) -> Self {
209        self.replication_method = m;
210        self
211    }
212
213    /// Field name (not JSONPath) used as the incremental replication bookmark.
214    pub fn replication_key(mut self, key: &str) -> Self {
215        self.replication_key = Some(key.into());
216        self
217    }
218
219    /// Bookmark start value: records at or before this value are filtered out
220    /// when using `ReplicationMethod::Incremental`.
221    pub fn start_replication_value(mut self, v: Value) -> Self {
222        self.start_replication_value = Some(v);
223        self
224    }
225
226    // ── Singer / Meltano metadata ─────────────────────────────────────────────
227
228    /// Human-readable stream name.
229    pub fn name(mut self, n: &str) -> Self {
230        self.name = Some(n.into());
231        self
232    }
233
234    /// Field names that uniquely identify a record (Singer `key_properties`).
235    pub fn primary_keys(mut self, keys: Vec<String>) -> Self {
236        self.primary_keys = keys;
237        self
238    }
239
240    /// JSON Schema for the stream's records.
241    pub fn schema(mut self, s: Value) -> Self {
242        self.schema = Some(s);
243        self
244    }
245
246    /// Maximum records to sample for schema inference (`0` = unlimited).
247    pub fn schema_sample_size(mut self, n: usize) -> Self {
248        self.schema_sample_size = n;
249        self
250    }
251
252    // ── Partitions ────────────────────────────────────────────────────────────
253
254    /// Add a partition context. The stream will execute once for each partition,
255    /// substituting `{key}` placeholders in `path` with values from the context.
256    pub fn add_partition(mut self, ctx: HashMap<String, Value>) -> Self {
257        self.partitions.push(ctx);
258        self
259    }
260
261    /// Set the maximum number of partitions to fetch concurrently.
262    /// `None` (default) means sequential processing.
263    pub fn partition_concurrency(mut self, concurrency: Option<usize>) -> Self {
264        self.partition_concurrency = concurrency;
265        self
266    }
267
268    // ── Record transforms ─────────────────────────────────────────────────────
269
270    /// Append a [`RecordTransform`] to the pipeline.
271    ///
272    /// Transforms are applied in the order they are added.
273    pub fn add_transform(mut self, t: RecordTransform) -> Self {
274        self.transforms.push(t);
275        self
276    }
277}