Skip to main content

faucet_source_rest/
config.rs

1//! Stream configuration and builder.
2
3use crate::auth::Auth;
4use crate::pagination::PaginationStyle;
5use faucet_core::AuthSpec;
6use faucet_core::ReplicationMethod;
7use reqwest::{
8    Method,
9    header::{HeaderMap, HeaderName, HeaderValue},
10};
11use schemars::JsonSchema;
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::time::Duration;
16
/// Configuration for a [`RestStream`](crate::stream::RestStream).
///
/// Can be built programmatically via [`RestStreamConfig::new`] and the
/// chained builder methods, or deserialized from a config file. When
/// deserializing, note the per-field serde attributes below: some fields
/// fall back to the *field type's* default rather than to
/// [`RestStreamConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct RestStreamConfig {
    // ── Core request ──────────────────────────────────────────────────────────
    /// Base URL of the API. `path` is interpreted relative to it.
    /// [`RestStreamConfig::new`] strips any trailing `/` characters.
    pub base_url: String,
    /// URL path, relative to `base_url`. May contain `{key}` placeholders that
    /// are substituted per-partition (e.g. `"/orgs/{org_id}/users"`).
    pub path: String,
    /// HTTP method. In config files it is written as a string; conversion is
    /// handled by `crate::serde_helpers::http_method`.
    #[serde(with = "crate::serde_helpers::http_method")]
    #[schemars(with = "String")]
    pub method: Method,
    /// Authentication: either inline (`{ type, config }`) or a `{ ref: <name> }`
    /// pointer to a shared provider in the CLI's top-level `auth:` catalog.
    pub auth: AuthSpec<Auth>,
    /// Extra request headers.
    ///
    /// `#[serde(skip, default)]`: headers are neither serialized nor read from
    /// config. A deserialized config always starts with an empty map, so
    /// headers can only be set programmatically (e.g. via the `header`
    /// builder method).
    #[serde(skip, default)]
    pub headers: HeaderMap,
    /// Query-string parameters, keyed by parameter name.
    pub query_params: HashMap<String, String>,
    /// Optional JSON request body.
    pub body: Option<Value>,

    // ── Pagination ────────────────────────────────────────────────────────────
    /// Pagination strategy. [`RestStreamConfig::default`] uses
    /// `PaginationStyle::None`.
    pub pagination: PaginationStyle,
    /// Location of the records inside each response body.
    // NOTE(review): the expression syntax and the meaning of `None` are
    // defined by the stream implementation — confirm there.
    pub records_path: Option<String>,
    /// Page cap. [`RestStreamConfig::default`] sets `Some(100)`.
    pub max_pages: Option<usize>,
    /// Optional delay between requests, expressed in config as a number of
    /// seconds. The field-level `default` makes a missing entry `None`.
    #[serde(with = "faucet_core::config::duration_secs_option", default)]
    #[schemars(with = "Option<u64>")]
    pub request_delay: Option<Duration>,

    // ── Reliability ───────────────────────────────────────────────────────────
    /// Request timeout, expressed in config as a number of seconds.
    ///
    /// Note the asymmetry: [`RestStreamConfig::default`] uses `Some(30s)`, but
    /// the field-level serde `default` means a config file that omits
    /// `timeout` deserializes to `None`.
    // NOTE(review): presumably `None` means "no client timeout" — confirm in
    // the stream implementation.
    #[serde(with = "faucet_core::config::duration_secs_option", default)]
    #[schemars(with = "Option<u64>")]
    pub timeout: Option<Duration>,
    /// Maximum number of retries for a failed request
    /// ([`RestStreamConfig::default`] uses 3).
    pub max_retries: u32,
    /// Backoff between retries, expressed in config as a number of seconds.
    /// It carries no serde `default`, so it must be present when deserializing.
    #[serde(with = "faucet_core::config::duration_secs")]
    #[schemars(with = "u64")]
    pub retry_backoff: Duration,
    /// HTTP status codes that should **not** cause an error. Responses with
    /// these codes are treated as empty pages (no records, no further pages).
    pub tolerated_http_errors: Vec<u16>,

    // ── Replication ───────────────────────────────────────────────────────────
    /// Replication mode ([`RestStreamConfig::default`] uses `FullTable`).
    pub replication_method: ReplicationMethod,
    /// Field name (not a JSONPath) used for incremental replication bookmarking.
    pub replication_key: Option<String>,
    /// Bookmark value: records where `record[replication_key] <= start_replication_value`
    /// are filtered out when `replication_method` is `Incremental`.
    pub start_replication_value: Option<Value>,
    /// Opt-in identifier used by [`Pipeline::with_state_store`](faucet_core::Pipeline::with_state_store)
    /// to persist this stream's bookmark across runs. When set, the pipeline
    /// will load any previously-stored bookmark before fetching and write the
    /// new bookmark only after the sink confirms the batch.
    ///
    /// Keys must satisfy [`faucet_core::state::validate_state_key`].
    pub state_key: Option<String>,

    // ── Singer / Meltano metadata ─────────────────────────────────────────────
    /// Human-readable stream name (used in logging and Singer SCHEMA messages).
    pub name: Option<String>,
    /// Field names that uniquely identify a record (Singer `key_properties`).
    pub primary_keys: Vec<String>,
    /// JSON Schema describing the structure of each record.
    pub schema: Option<Value>,
    /// Maximum number of records to sample when inferring the schema via
    /// [`crate::stream::RestStream::infer_schema`].  `0` means sample all
    /// available records (up to `max_pages`).  Defaults to `100`.
    pub schema_sample_size: usize,

    // ── Partitions ────────────────────────────────────────────────────────────
    /// Each entry is a context map whose values are substituted into `path`
    /// placeholders. The stream is executed once per partition and results are
    /// concatenated.  Empty means run once with no substitution.
    pub partitions: Vec<HashMap<String, Value>>,
    /// Maximum number of partitions to fetch concurrently.
    /// `None` means sequential processing (backward compatible default).
    pub partition_concurrency: Option<usize>,
}
92
93impl Default for RestStreamConfig {
94    fn default() -> Self {
95        Self {
96            base_url: String::new(),
97            path: String::new(),
98            method: Method::GET,
99            auth: AuthSpec::Inline(Auth::None),
100            headers: HeaderMap::new(),
101            query_params: HashMap::new(),
102            body: None,
103            pagination: PaginationStyle::None,
104            records_path: None,
105            max_pages: Some(100),
106            request_delay: None,
107            timeout: Some(Duration::from_secs(30)),
108            max_retries: 3,
109            retry_backoff: Duration::from_secs(1),
110            tolerated_http_errors: Vec::new(),
111            replication_method: ReplicationMethod::FullTable,
112            replication_key: None,
113            start_replication_value: None,
114            state_key: None,
115            name: None,
116            primary_keys: Vec::new(),
117            schema: None,
118            schema_sample_size: 100,
119            partitions: Vec::new(),
120            partition_concurrency: None,
121        }
122    }
123}
124
125impl RestStreamConfig {
126    pub fn new(base_url: &str, path: &str) -> Self {
127        Self {
128            base_url: base_url.trim_end_matches('/').to_string(),
129            path: path.to_string(),
130            ..Default::default()
131        }
132    }
133
134    // ── Core request ──────────────────────────────────────────────────────────
135
136    pub fn method(mut self, m: Method) -> Self {
137        self.method = m;
138        self
139    }
140
141    pub fn auth(mut self, a: Auth) -> Self {
142        self.auth = AuthSpec::Inline(a);
143        self
144    }
145
146    pub fn header(mut self, k: &str, v: &str) -> Self {
147        self.headers.insert(
148            HeaderName::from_bytes(k.as_bytes()).expect("invalid header name"),
149            HeaderValue::from_str(v).expect("invalid header value"),
150        );
151        self
152    }
153
154    pub fn query(mut self, k: &str, v: &str) -> Self {
155        self.query_params.insert(k.into(), v.into());
156        self
157    }
158
159    pub fn body(mut self, b: Value) -> Self {
160        self.body = Some(b);
161        self
162    }
163
164    // ── Pagination ────────────────────────────────────────────────────────────
165
166    pub fn pagination(mut self, p: PaginationStyle) -> Self {
167        self.pagination = p;
168        self
169    }
170
171    pub fn records_path(mut self, p: &str) -> Self {
172        self.records_path = Some(p.into());
173        self
174    }
175
176    pub fn max_pages(mut self, n: usize) -> Self {
177        self.max_pages = Some(n);
178        self
179    }
180
181    pub fn request_delay(mut self, d: Duration) -> Self {
182        self.request_delay = Some(d);
183        self
184    }
185
186    // ── Reliability ───────────────────────────────────────────────────────────
187
188    pub fn timeout(mut self, d: Duration) -> Self {
189        self.timeout = Some(d);
190        self
191    }
192
193    pub fn max_retries(mut self, n: u32) -> Self {
194        self.max_retries = n;
195        self
196    }
197
198    pub fn retry_backoff(mut self, d: Duration) -> Self {
199        self.retry_backoff = d;
200        self
201    }
202
203    /// HTTP status codes that should be silently ignored (treated as empty pages).
204    pub fn tolerate_http_error(mut self, status: u16) -> Self {
205        self.tolerated_http_errors.push(status);
206        self
207    }
208
209    // ── Replication ───────────────────────────────────────────────────────────
210
211    pub fn replication_method(mut self, m: ReplicationMethod) -> Self {
212        self.replication_method = m;
213        self
214    }
215
216    /// Field name (not JSONPath) used as the incremental replication bookmark.
217    pub fn replication_key(mut self, key: &str) -> Self {
218        self.replication_key = Some(key.into());
219        self
220    }
221
222    /// Bookmark start value: records at or before this value are filtered out
223    /// when using `ReplicationMethod::Incremental`.
224    pub fn start_replication_value(mut self, v: Value) -> Self {
225        self.start_replication_value = Some(v);
226        self
227    }
228
229    /// Opt the stream into resumable runs by giving it a stable state key.
230    /// When this is set and the [`Pipeline`](faucet_core::Pipeline) is
231    /// configured with a state store, the previously persisted bookmark is
232    /// applied to the stream before fetching.
233    pub fn state_key(mut self, key: &str) -> Self {
234        self.state_key = Some(key.into());
235        self
236    }
237
238    // ── Singer / Meltano metadata ─────────────────────────────────────────────
239
240    /// Human-readable stream name.
241    pub fn name(mut self, n: &str) -> Self {
242        self.name = Some(n.into());
243        self
244    }
245
246    /// Field names that uniquely identify a record (Singer `key_properties`).
247    pub fn primary_keys(mut self, keys: Vec<String>) -> Self {
248        self.primary_keys = keys;
249        self
250    }
251
252    /// JSON Schema for the stream's records.
253    pub fn schema(mut self, s: Value) -> Self {
254        self.schema = Some(s);
255        self
256    }
257
258    /// Maximum records to sample for schema inference (`0` = unlimited).
259    pub fn schema_sample_size(mut self, n: usize) -> Self {
260        self.schema_sample_size = n;
261        self
262    }
263
264    // ── Partitions ────────────────────────────────────────────────────────────
265
266    /// Add a partition context. The stream will execute once for each partition,
267    /// substituting `{key}` placeholders in `path` with values from the context.
268    pub fn add_partition(mut self, ctx: HashMap<String, Value>) -> Self {
269        self.partitions.push(ctx);
270        self
271    }
272
273    /// Set the maximum number of partitions to fetch concurrently.
274    /// `None` (default) means sequential processing.
275    pub fn partition_concurrency(mut self, concurrency: Option<usize>) -> Self {
276        self.partition_concurrency = concurrency;
277        self
278    }
279}