1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde::Deserialize;
6use tokio::sync::mpsc;
7use tracing::debug;
8
9use oversync_core::error::OversyncError;
10use oversync_core::model::RawRow;
11use oversync_core::traits::OriginConnector;
12
13pub use crate::http_common::AuthConfig;
14use crate::http_common::{extract_cursor, extract_items, items_to_rows};
15
/// Connection settings for an HTTP origin, deserialized from configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct HttpSourceConfig {
    /// Base URL that request paths are appended to; read from the `dsn` key.
    #[serde(rename = "dsn")]
    pub base_url: String,
    /// Extra headers added to every request; empty when omitted.
    #[serde(default)]
    pub headers: HashMap<String, String>,
    /// Optional authentication applied to every request.
    #[serde(default)]
    pub auth: Option<AuthConfig>,
    /// Pagination strategy; when absent a fetch issues a single request.
    #[serde(default)]
    pub pagination: Option<PaginationConfig>,
    /// Path handed to `extract_items` to locate the item array in a response
    /// body (the tests use dotted paths such as `data.items`).
    #[serde(default)]
    pub response_path: Option<String>,
    /// Request timeout in seconds; falls back to `default_timeout()` when omitted.
    #[serde(default = "default_timeout")]
    pub timeout_secs: u64,
}
31
/// Serde fallback for `timeout_secs`: the request timeout, in seconds, used
/// when the configuration does not set one.
fn default_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 30;
    DEFAULT_TIMEOUT_SECS
}
35
/// How successive pages are requested from the remote API.
///
/// Deserialized as an internally tagged enum: the `type` key selects the
/// variant by its snake_case name (`"offset"` or `"cursor"`).
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PaginationConfig {
    /// Limit/offset paging: every request carries a page size and a running offset.
    Offset {
        /// Number of items requested per page; a shorter page ends the fetch.
        page_size: usize,
        /// Query parameter that carries the page size (defaults to `limit`).
        #[serde(default = "default_limit_param")]
        limit_param: String,
        /// Query parameter that carries the offset (defaults to `offset`).
        #[serde(default = "default_offset_param")]
        offset_param: String,
    },
    /// Cursor paging: each response supplies the cursor for the next request.
    Cursor {
        /// Required in configuration, but `HttpSource` does not send it with
        /// cursor requests.
        page_size: usize,
        /// Query parameter that carries the cursor (defaults to `cursor`).
        #[serde(default = "default_cursor_param")]
        cursor_param: String,
        /// Path passed to `extract_cursor` to find the next cursor in a
        /// response body.
        cursor_path: String,
    },
}
53
/// Serde fallback for the query parameter that carries the page size.
fn default_limit_param() -> String {
    String::from("limit")
}
/// Serde fallback for the query parameter that carries the page offset.
fn default_offset_param() -> String {
    String::from("offset")
}
/// Serde fallback for the query parameter that carries the pagination cursor.
fn default_cursor_param() -> String {
    String::from("cursor")
}
63
/// HTTP-backed origin that reads rows from a JSON API.
pub struct HttpSource {
    /// Client used for every request, built with the configured timeout.
    client: reqwest::Client,
    /// Settings this source was created with.
    config: HttpSourceConfig,
    /// Name reported through `OriginConnector::name`.
    name: String,
}
69
70impl HttpSource {
71 pub fn new(name: &str, config: HttpSourceConfig) -> Result<Self, OversyncError> {
72 let client = reqwest::Client::builder()
73 .timeout(Duration::from_secs(config.timeout_secs))
74 .build()
75 .map_err(|e| OversyncError::Connector(format!("http client: {e}")))?;
76
77 Ok(Self {
78 client,
79 config,
80 name: name.to_string(),
81 })
82 }
83
84 fn build_url(&self, path: &str) -> String {
85 format!("{}{}", self.config.base_url.trim_end_matches('/'), path)
86 }
87
88 fn apply_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
89 crate::http_common::apply_auth(req, &self.config.auth)
90 }
91
92 fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
93 for (k, v) in &self.config.headers {
94 req = req.header(k, v);
95 }
96 req
97 }
98
99 async fn request_page(
100 &self,
101 path: &str,
102 extra_params: &[(String, String)],
103 ) -> Result<serde_json::Value, OversyncError> {
104 let url = self.build_url(path);
105 let mut req = self.client.get(&url);
106 req = self.apply_headers(req);
107 req = self.apply_auth(req);
108 if !extra_params.is_empty() {
109 req = req.query(extra_params);
110 }
111
112 let resp = req
113 .send()
114 .await
115 .map_err(|e| OversyncError::Connector(format!("http: {e}")))?;
116
117 let status = resp.status();
118 if !status.is_success() {
119 return Err(OversyncError::Connector(format!("http {status}: {url}")));
120 }
121
122 resp.json()
123 .await
124 .map_err(|e| OversyncError::Connector(format!("json decode: {e}")))
125 }
126}
127
128#[async_trait]
129impl OriginConnector for HttpSource {
130 fn name(&self) -> &str {
131 &self.name
132 }
133
134 async fn fetch_all(&self, sql: &str, key_column: &str) -> Result<Vec<RawRow>, OversyncError> {
135 let pagination = self.config.pagination.clone();
136 match pagination {
137 None => {
138 let body = self.request_page(sql, &[]).await?;
139 let items = extract_items(&body, &self.config.response_path);
140 debug!(count = items.len(), "fetched items from http");
141 items_to_rows(&items, key_column)
142 }
143 Some(PaginationConfig::Offset {
144 page_size,
145 ref limit_param,
146 ref offset_param,
147 }) => {
148 let mut all_rows = Vec::new();
149 let mut offset = 0usize;
150 loop {
151 let params = vec![
152 (limit_param.clone(), page_size.to_string()),
153 (offset_param.clone(), offset.to_string()),
154 ];
155 let body = self.request_page(sql, ¶ms).await?;
156 let items = extract_items(&body, &self.config.response_path);
157 if items.is_empty() {
158 break;
159 }
160 let count = items.len();
161 all_rows.extend(items_to_rows(&items, key_column)?);
162 if count < page_size {
163 break;
164 }
165 offset += count;
166 }
167 debug!(
168 count = all_rows.len(),
169 "fetched all pages from http (offset)"
170 );
171 Ok(all_rows)
172 }
173 Some(PaginationConfig::Cursor {
174 page_size: _,
175 ref cursor_param,
176 ref cursor_path,
177 }) => {
178 let mut all_rows = Vec::new();
179 let mut cursor: Option<String> = None;
180 loop {
181 let mut params = Vec::new();
182 if let Some(ref c) = cursor {
183 params.push((cursor_param.clone(), c.clone()));
184 }
185 let body = self.request_page(sql, ¶ms).await?;
186 let items = extract_items(&body, &self.config.response_path);
187 if items.is_empty() {
188 break;
189 }
190 all_rows.extend(items_to_rows(&items, key_column)?);
191 cursor = extract_cursor(&body, cursor_path);
192 if cursor.is_none() {
193 break;
194 }
195 }
196 debug!(
197 count = all_rows.len(),
198 "fetched all pages from http (cursor)"
199 );
200 Ok(all_rows)
201 }
202 }
203 }
204
205 async fn fetch_into(
206 &self,
207 sql: &str,
208 key_column: &str,
209 batch_size: usize,
210 tx: mpsc::Sender<Vec<RawRow>>,
211 ) -> Result<usize, OversyncError> {
212 let pagination = self.config.pagination.clone();
213 match pagination {
214 None => {
215 let all = self.fetch_all(sql, key_column).await?;
216 let total = all.len();
217 for chunk in all.chunks(batch_size) {
218 tx.send(chunk.to_vec())
219 .await
220 .map_err(|_| OversyncError::Internal("channel closed".into()))?;
221 }
222 Ok(total)
223 }
224 Some(PaginationConfig::Offset {
225 page_size,
226 ref limit_param,
227 ref offset_param,
228 }) => {
229 let mut total = 0usize;
230 let mut offset = 0usize;
231 loop {
232 let params = vec![
233 (limit_param.clone(), page_size.to_string()),
234 (offset_param.clone(), offset.to_string()),
235 ];
236 let body = self.request_page(sql, ¶ms).await?;
237 let items = extract_items(&body, &self.config.response_path);
238 if items.is_empty() {
239 break;
240 }
241 let count = items.len();
242 let rows = items_to_rows(&items, key_column)?;
243 total += rows.len();
244 tx.send(rows)
245 .await
246 .map_err(|_| OversyncError::Internal("channel closed".into()))?;
247 if count < page_size {
248 break;
249 }
250 offset += count;
251 }
252 Ok(total)
253 }
254 Some(PaginationConfig::Cursor {
255 page_size: _,
256 ref cursor_param,
257 ref cursor_path,
258 }) => {
259 let mut total = 0usize;
260 let mut cursor: Option<String> = None;
261 loop {
262 let mut params = Vec::new();
263 if let Some(ref c) = cursor {
264 params.push((cursor_param.clone(), c.clone()));
265 }
266 let body = self.request_page(sql, ¶ms).await?;
267 let items = extract_items(&body, &self.config.response_path);
268 if items.is_empty() {
269 break;
270 }
271 let rows = items_to_rows(&items, key_column)?;
272 total += rows.len();
273 tx.send(rows)
274 .await
275 .map_err(|_| OversyncError::Internal("channel closed".into()))?;
276 cursor = extract_cursor(&body, cursor_path);
277 if cursor.is_none() {
278 break;
279 }
280 }
281 Ok(total)
282 }
283 }
284 }
285
286 async fn test_connection(&self) -> Result<(), OversyncError> {
287 let mut req = self.client.get(&self.config.base_url);
288 req = self.apply_headers(req);
289 req = self.apply_auth(req);
290 let resp = req
291 .send()
292 .await
293 .map_err(|e| OversyncError::Connector(format!("http test: {e}")))?;
294 if !resp.status().is_success() {
295 return Err(OversyncError::Connector(format!(
296 "http test: status {}",
297 resp.status()
298 )));
299 }
300 Ok(())
301 }
302}
303
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config that sets only the base URL and leaves every other
    /// field at its default.
    fn make_config(base_url: &str) -> HttpSourceConfig {
        let raw = serde_json::json!({"dsn": base_url});
        serde_json::from_value(raw).unwrap()
    }

    // --- configuration parsing ---

    #[test]
    fn parse_config_minimal() {
        let raw = serde_json::json!({
            "dsn": "https://api.example.com"
        });
        let parsed = serde_json::from_value::<HttpSourceConfig>(raw).unwrap();

        assert_eq!(parsed.base_url, "https://api.example.com");
        assert!(parsed.headers.is_empty());
        assert!(parsed.auth.is_none());
        assert!(parsed.pagination.is_none());
        assert!(parsed.response_path.is_none());
        assert_eq!(parsed.timeout_secs, 30);
    }

    #[test]
    fn parse_config_full() {
        let raw = serde_json::json!({
            "dsn": "https://api.example.com",
            "headers": {"Accept": "application/json", "X-Custom": "val"},
            "auth": {"type": "bearer", "token": "sk-123"},
            "pagination": {"type": "offset", "page_size": 50, "limit_param": "per_page", "offset_param": "page"},
            "response_path": "data.items",
            "timeout_secs": 60
        });
        let parsed = serde_json::from_value::<HttpSourceConfig>(raw).unwrap();

        assert_eq!(parsed.base_url, "https://api.example.com");
        assert_eq!(parsed.headers.len(), 2);
        let bearer_matches =
            matches!(parsed.auth, Some(AuthConfig::Bearer { ref token }) if token == "sk-123");
        assert!(bearer_matches);
        assert_eq!(parsed.timeout_secs, 60);
        assert_eq!(parsed.response_path.as_deref(), Some("data.items"));
    }

    #[test]
    fn parse_auth_basic() {
        let raw = serde_json::json!({"type": "basic", "username": "user", "password": "pass"});
        let parsed = serde_json::from_value::<AuthConfig>(raw).unwrap();
        let basic_matches = matches!(
            parsed,
            AuthConfig::Basic { ref username, ref password }
                if username == "user" && password == "pass"
        );
        assert!(basic_matches);
    }

    #[test]
    fn parse_auth_header() {
        let raw = serde_json::json!({"type": "header", "name": "X-API-Key", "value": "key123"});
        let parsed = serde_json::from_value::<AuthConfig>(raw).unwrap();
        let header_matches = matches!(
            parsed,
            AuthConfig::Header { ref name, ref value }
                if name == "X-API-Key" && value == "key123"
        );
        assert!(header_matches);
    }

    #[test]
    fn parse_pagination_offset_defaults() {
        let raw = serde_json::json!({"type": "offset", "page_size": 100});
        let parsed = serde_json::from_value::<PaginationConfig>(raw).unwrap();

        if let PaginationConfig::Offset {
            page_size,
            limit_param,
            offset_param,
        } = parsed
        {
            assert_eq!(page_size, 100);
            assert_eq!(limit_param, "limit");
            assert_eq!(offset_param, "offset");
        } else {
            panic!("expected Offset");
        }
    }

    #[test]
    fn parse_pagination_cursor() {
        let raw = serde_json::json!({
            "type": "cursor",
            "page_size": 50,
            "cursor_param": "after",
            "cursor_path": "meta.next_cursor"
        });
        let parsed = serde_json::from_value::<PaginationConfig>(raw).unwrap();

        if let PaginationConfig::Cursor {
            page_size,
            cursor_param,
            cursor_path,
        } = parsed
        {
            assert_eq!(page_size, 50);
            assert_eq!(cursor_param, "after");
            assert_eq!(cursor_path, "meta.next_cursor");
        } else {
            panic!("expected Cursor");
        }
    }

    // --- URL building ---

    #[test]
    fn build_url_joins_path() {
        let source = HttpSource::new("test", make_config("https://api.example.com")).unwrap();
        let url = source.build_url("/v1/items");
        assert_eq!(url, "https://api.example.com/v1/items");
    }

    #[test]
    fn build_url_strips_trailing_slash() {
        let source = HttpSource::new("test", make_config("https://api.example.com/")).unwrap();
        let url = source.build_url("/v1/items");
        assert_eq!(url, "https://api.example.com/v1/items");
    }

    // --- item extraction ---

    #[test]
    fn extract_items_top_level_array() {
        let body = serde_json::json!([{"id": 1}, {"id": 2}]);
        assert_eq!(extract_items(&body, &None).len(), 2);
    }

    #[test]
    fn extract_items_nested_path() {
        let body = serde_json::json!({"data": {"items": [{"id": 1}, {"id": 2}, {"id": 3}]}});
        let path = Some(String::from("data.items"));
        assert_eq!(extract_items(&body, &path).len(), 3);
    }

    #[test]
    fn extract_items_single_level_path() {
        let body = serde_json::json!({"results": [{"id": "a"}]});
        let path = Some(String::from("results"));
        assert_eq!(extract_items(&body, &path).len(), 1);
    }

    #[test]
    fn extract_items_empty_path_is_top_level() {
        let body = serde_json::json!([{"id": 1}]);
        let path = Some(String::new());
        assert_eq!(extract_items(&body, &path).len(), 1);
    }

    #[test]
    fn extract_items_invalid_path_returns_empty() {
        let body = serde_json::json!({"data": [{"id": 1}]});
        let path = Some(String::from("nonexistent.path"));
        assert!(extract_items(&body, &path).is_empty());
    }

    #[test]
    fn extract_items_non_array_returns_empty() {
        let body = serde_json::json!({"data": "not an array"});
        let path = Some(String::from("data"));
        assert!(extract_items(&body, &path).is_empty());
    }

    // --- row conversion ---

    #[test]
    fn items_to_rows_string_key() {
        let items = vec![
            serde_json::json!({"id": "abc", "name": "Alpha"}),
            serde_json::json!({"id": "def", "name": "Beta"}),
        ];
        let rows = items_to_rows(&items, "id").unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].row_key, "abc");
        assert_eq!(rows[1].row_key, "def");
    }

    #[test]
    fn items_to_rows_numeric_key() {
        let items = vec![serde_json::json!({"id": 42, "val": "x"})];
        let rows = items_to_rows(&items, "id").unwrap();
        assert_eq!(rows[0].row_key, "42");
    }

    #[test]
    fn items_to_rows_missing_key_errors() {
        let items = vec![serde_json::json!({"name": "no id field"})];
        let outcome = items_to_rows(&items, "id");
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("missing key field"));
    }

    #[test]
    fn items_to_rows_preserves_full_data() {
        let item = serde_json::json!({"id": "k1", "a": 1, "b": "two"});
        let rows = items_to_rows(std::slice::from_ref(&item), "id").unwrap();
        assert_eq!(rows[0].row_data, item);
    }

    // --- cursor extraction ---

    #[test]
    fn extract_cursor_string() {
        let body = serde_json::json!({"meta": {"next": "cursor_abc"}});
        let found = extract_cursor(&body, "meta.next");
        assert_eq!(found, Some(String::from("cursor_abc")));
    }

    #[test]
    fn extract_cursor_number() {
        let body = serde_json::json!({"next_page": 5});
        let found = extract_cursor(&body, "next_page");
        assert_eq!(found, Some(String::from("5")));
    }

    #[test]
    fn extract_cursor_null_returns_none() {
        let body = serde_json::json!({"meta": {"next": null}});
        let found = extract_cursor(&body, "meta.next");
        assert_eq!(found, None);
    }

    #[test]
    fn extract_cursor_missing_returns_none() {
        let body = serde_json::json!({"data": []});
        let found = extract_cursor(&body, "meta.next");
        assert_eq!(found, None);
    }
}