Skip to main content

oversync_connectors/
http_source.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde::Deserialize;
6use tokio::sync::mpsc;
7use tracing::debug;
8
9use oversync_core::error::OversyncError;
10use oversync_core::model::RawRow;
11use oversync_core::traits::OriginConnector;
12
13pub use crate::http_common::AuthConfig;
14use crate::http_common::{extract_cursor, extract_items, items_to_rows};
15
16#[derive(Debug, Clone, Deserialize)]
17pub struct HttpSourceConfig {
18	#[serde(rename = "dsn")]
19	pub base_url: String,
20	#[serde(default)]
21	pub headers: HashMap<String, String>,
22	#[serde(default)]
23	pub auth: Option<AuthConfig>,
24	#[serde(default)]
25	pub pagination: Option<PaginationConfig>,
26	#[serde(default)]
27	pub response_path: Option<String>,
28	#[serde(default = "default_timeout")]
29	pub timeout_secs: u64,
30}
31
32fn default_timeout() -> u64 {
33	30
34}
35
36#[derive(Debug, Clone, Deserialize)]
37#[serde(tag = "type", rename_all = "snake_case")]
38pub enum PaginationConfig {
39	Offset {
40		page_size: usize,
41		#[serde(default = "default_limit_param")]
42		limit_param: String,
43		#[serde(default = "default_offset_param")]
44		offset_param: String,
45	},
46	Cursor {
47		page_size: usize,
48		#[serde(default = "default_cursor_param")]
49		cursor_param: String,
50		cursor_path: String,
51	},
52}
53
54fn default_limit_param() -> String {
55	"limit".into()
56}
57fn default_offset_param() -> String {
58	"offset".into()
59}
60fn default_cursor_param() -> String {
61	"cursor".into()
62}
63
64fn build_request_url(
65	base_url: &str,
66	path: &str,
67	extra_params: &[(String, String)],
68) -> Result<reqwest::Url, OversyncError> {
69	let mut url = reqwest::Url::parse(&format!("{}{}", base_url.trim_end_matches('/'), path))
70		.map_err(|e| OversyncError::Connector(format!("invalid http url: {e}")))?;
71	if !extra_params.is_empty() {
72		let mut pairs = url.query_pairs_mut();
73		for (key, value) in extra_params {
74			pairs.append_pair(key, value);
75		}
76		drop(pairs);
77	}
78	Ok(url)
79}
80
81pub struct HttpSource {
82	client: reqwest::Client,
83	config: HttpSourceConfig,
84	name: String,
85}
86
87impl HttpSource {
88	pub fn new(name: &str, config: HttpSourceConfig) -> Result<Self, OversyncError> {
89		let client = reqwest::Client::builder()
90			.timeout(Duration::from_secs(config.timeout_secs))
91			.build()
92			.map_err(|e| OversyncError::Connector(format!("http client: {e}")))?;
93
94		Ok(Self {
95			client,
96			config,
97			name: name.to_string(),
98		})
99	}
100
101	fn apply_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
102		crate::http_common::apply_auth(req, &self.config.auth)
103	}
104
105	fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
106		for (k, v) in &self.config.headers {
107			req = req.header(k, v);
108		}
109		req
110	}
111
112	async fn request_page(
113		&self,
114		path: &str,
115		extra_params: &[(String, String)],
116	) -> Result<serde_json::Value, OversyncError> {
117		let url = build_request_url(&self.config.base_url, path, extra_params)?;
118		let mut req = self.client.get(url.clone());
119		req = self.apply_headers(req);
120		req = self.apply_auth(req);
121
122		let resp = req
123			.send()
124			.await
125			.map_err(|e| OversyncError::Connector(format!("http: {e}")))?;
126
127		let status = resp.status();
128		if !status.is_success() {
129			return Err(OversyncError::Connector(format!("http {status}: {url}")));
130		}
131
132		resp.json()
133			.await
134			.map_err(|e| OversyncError::Connector(format!("json decode: {e}")))
135	}
136}
137
138#[async_trait]
139impl OriginConnector for HttpSource {
140	fn name(&self) -> &str {
141		&self.name
142	}
143
144	async fn fetch_all(&self, sql: &str, key_column: &str) -> Result<Vec<RawRow>, OversyncError> {
145		let pagination = self.config.pagination.clone();
146		match pagination {
147			None => {
148				let body = self.request_page(sql, &[]).await?;
149				let items = extract_items(&body, &self.config.response_path);
150				debug!(count = items.len(), "fetched items from http");
151				items_to_rows(&items, key_column)
152			}
153			Some(PaginationConfig::Offset {
154				page_size,
155				ref limit_param,
156				ref offset_param,
157			}) => {
158				let mut all_rows = Vec::new();
159				let mut offset = 0usize;
160				loop {
161					let params = vec![
162						(limit_param.clone(), page_size.to_string()),
163						(offset_param.clone(), offset.to_string()),
164					];
165					let body = self.request_page(sql, &params).await?;
166					let items = extract_items(&body, &self.config.response_path);
167					if items.is_empty() {
168						break;
169					}
170					let count = items.len();
171					all_rows.extend(items_to_rows(&items, key_column)?);
172					if count < page_size {
173						break;
174					}
175					offset += count;
176				}
177				debug!(
178					count = all_rows.len(),
179					"fetched all pages from http (offset)"
180				);
181				Ok(all_rows)
182			}
183			Some(PaginationConfig::Cursor {
184				page_size: _,
185				ref cursor_param,
186				ref cursor_path,
187			}) => {
188				let mut all_rows = Vec::new();
189				let mut cursor: Option<String> = None;
190				loop {
191					let mut params = Vec::new();
192					if let Some(ref c) = cursor {
193						params.push((cursor_param.clone(), c.clone()));
194					}
195					let body = self.request_page(sql, &params).await?;
196					let items = extract_items(&body, &self.config.response_path);
197					if items.is_empty() {
198						break;
199					}
200					all_rows.extend(items_to_rows(&items, key_column)?);
201					cursor = extract_cursor(&body, cursor_path);
202					if cursor.is_none() {
203						break;
204					}
205				}
206				debug!(
207					count = all_rows.len(),
208					"fetched all pages from http (cursor)"
209				);
210				Ok(all_rows)
211			}
212		}
213	}
214
215	async fn fetch_into(
216		&self,
217		sql: &str,
218		key_column: &str,
219		batch_size: usize,
220		tx: mpsc::Sender<Vec<RawRow>>,
221	) -> Result<usize, OversyncError> {
222		let pagination = self.config.pagination.clone();
223		match pagination {
224			None => {
225				let all = self.fetch_all(sql, key_column).await?;
226				let total = all.len();
227				for chunk in all.chunks(batch_size) {
228					tx.send(chunk.to_vec())
229						.await
230						.map_err(|_| OversyncError::Internal("channel closed".into()))?;
231				}
232				Ok(total)
233			}
234			Some(PaginationConfig::Offset {
235				page_size,
236				ref limit_param,
237				ref offset_param,
238			}) => {
239				let mut total = 0usize;
240				let mut offset = 0usize;
241				loop {
242					let params = vec![
243						(limit_param.clone(), page_size.to_string()),
244						(offset_param.clone(), offset.to_string()),
245					];
246					let body = self.request_page(sql, &params).await?;
247					let items = extract_items(&body, &self.config.response_path);
248					if items.is_empty() {
249						break;
250					}
251					let count = items.len();
252					let rows = items_to_rows(&items, key_column)?;
253					total += rows.len();
254					tx.send(rows)
255						.await
256						.map_err(|_| OversyncError::Internal("channel closed".into()))?;
257					if count < page_size {
258						break;
259					}
260					offset += count;
261				}
262				Ok(total)
263			}
264			Some(PaginationConfig::Cursor {
265				page_size: _,
266				ref cursor_param,
267				ref cursor_path,
268			}) => {
269				let mut total = 0usize;
270				let mut cursor: Option<String> = None;
271				loop {
272					let mut params = Vec::new();
273					if let Some(ref c) = cursor {
274						params.push((cursor_param.clone(), c.clone()));
275					}
276					let body = self.request_page(sql, &params).await?;
277					let items = extract_items(&body, &self.config.response_path);
278					if items.is_empty() {
279						break;
280					}
281					let rows = items_to_rows(&items, key_column)?;
282					total += rows.len();
283					tx.send(rows)
284						.await
285						.map_err(|_| OversyncError::Internal("channel closed".into()))?;
286					cursor = extract_cursor(&body, cursor_path);
287					if cursor.is_none() {
288						break;
289					}
290				}
291				Ok(total)
292			}
293		}
294	}
295
296	async fn test_connection(&self) -> Result<(), OversyncError> {
297		let mut req = self.client.get(&self.config.base_url);
298		req = self.apply_headers(req);
299		req = self.apply_auth(req);
300		let resp = req
301			.send()
302			.await
303			.map_err(|e| OversyncError::Connector(format!("http test: {e}")))?;
304		if !resp.status().is_success() {
305			return Err(OversyncError::Connector(format!(
306				"http test: status {}",
307				resp.status()
308			)));
309		}
310		Ok(())
311	}
312}
313
#[cfg(test)]
mod tests {
	//! Unit tests for config parsing, URL building and the shared JSON helpers used by
	//! the HTTP source.

	use super::*;

	#[test]
	fn parse_config_minimal() {
		let json = serde_json::json!({
			"dsn": "https://api.example.com"
		});
		let config: HttpSourceConfig = serde_json::from_value(json).unwrap();
		assert_eq!(config.base_url, "https://api.example.com");
		assert!(config.headers.is_empty());
		assert!(config.auth.is_none());
		assert!(config.pagination.is_none());
		assert!(config.response_path.is_none());
		assert_eq!(config.timeout_secs, 30);
	}

	#[test]
	fn parse_config_full() {
		let json = serde_json::json!({
			"dsn": "https://api.example.com",
			"headers": {"Accept": "application/json", "X-Custom": "val"},
			"auth": {"type": "bearer", "token": "sk-123"},
			"pagination": {"type": "offset", "page_size": 50, "limit_param": "per_page", "offset_param": "page"},
			"response_path": "data.items",
			"timeout_secs": 60
		});
		let config: HttpSourceConfig = serde_json::from_value(json).unwrap();
		assert_eq!(config.base_url, "https://api.example.com");
		assert_eq!(config.headers.len(), 2);
		assert!(matches!(config.auth, Some(AuthConfig::Bearer { ref token }) if token == "sk-123"));
		assert_eq!(config.timeout_secs, 60);
		assert_eq!(config.response_path.as_deref(), Some("data.items"));
		// The pagination block was parsed above but previously never checked.
		match &config.pagination {
			Some(PaginationConfig::Offset {
				page_size,
				limit_param,
				offset_param,
			}) => {
				assert_eq!(*page_size, 50);
				assert_eq!(limit_param, "per_page");
				assert_eq!(offset_param, "page");
			}
			other => panic!("expected Offset pagination, got {other:?}"),
		}
	}

	#[test]
	fn parse_config_missing_dsn_errors() {
		let json = serde_json::json!({"headers": {"Accept": "application/json"}});
		assert!(serde_json::from_value::<HttpSourceConfig>(json).is_err());
	}

	#[test]
	fn parse_auth_basic() {
		let json = serde_json::json!({"type": "basic", "username": "user", "password": "pass"});
		let auth: AuthConfig = serde_json::from_value(json).unwrap();
		assert!(
			matches!(auth, AuthConfig::Basic { ref username, ref password } if username == "user" && password == "pass")
		);
	}

	#[test]
	fn parse_auth_header() {
		let json = serde_json::json!({"type": "header", "name": "X-API-Key", "value": "key123"});
		let auth: AuthConfig = serde_json::from_value(json).unwrap();
		assert!(
			matches!(auth, AuthConfig::Header { ref name, ref value } if name == "X-API-Key" && value == "key123")
		);
	}

	#[test]
	fn parse_pagination_offset_defaults() {
		let json = serde_json::json!({"type": "offset", "page_size": 100});
		let pg: PaginationConfig = serde_json::from_value(json).unwrap();
		match pg {
			PaginationConfig::Offset {
				page_size,
				limit_param,
				offset_param,
			} => {
				assert_eq!(page_size, 100);
				assert_eq!(limit_param, "limit");
				assert_eq!(offset_param, "offset");
			}
			_ => panic!("expected Offset"),
		}
	}

	#[test]
	fn parse_pagination_cursor() {
		let json = serde_json::json!({
			"type": "cursor",
			"page_size": 50,
			"cursor_param": "after",
			"cursor_path": "meta.next_cursor"
		});
		let pg: PaginationConfig = serde_json::from_value(json).unwrap();
		match pg {
			PaginationConfig::Cursor {
				page_size,
				cursor_param,
				cursor_path,
			} => {
				assert_eq!(page_size, 50);
				assert_eq!(cursor_param, "after");
				assert_eq!(cursor_path, "meta.next_cursor");
			}
			_ => panic!("expected Cursor"),
		}
	}

	#[test]
	fn parse_pagination_cursor_defaults() {
		let json = serde_json::json!({"type": "cursor", "page_size": 25, "cursor_path": "next"});
		let pg: PaginationConfig = serde_json::from_value(json).unwrap();
		match pg {
			PaginationConfig::Cursor {
				page_size,
				cursor_param,
				cursor_path,
			} => {
				assert_eq!(page_size, 25);
				assert_eq!(cursor_param, "cursor");
				assert_eq!(cursor_path, "next");
			}
			_ => panic!("expected Cursor"),
		}
	}

	#[test]
	fn parse_pagination_cursor_requires_path() {
		let json = serde_json::json!({"type": "cursor", "page_size": 10});
		assert!(serde_json::from_value::<PaginationConfig>(json).is_err());
	}

	#[test]
	fn parse_pagination_unknown_type_errors() {
		let json = serde_json::json!({"type": "page_number", "page_size": 10});
		assert!(serde_json::from_value::<PaginationConfig>(json).is_err());
	}

	#[test]
	fn build_url_joins_path() {
		let url = build_request_url("https://api.example.com", "/v1/items", &[])
			.expect("url should build");
		assert_eq!(url.as_str(), "https://api.example.com/v1/items");
	}

	#[test]
	fn build_url_appends_query_pairs() {
		let url = build_request_url(
			"http://example.test/api",
			"/items",
			&[
				("limit".to_string(), "50".to_string()),
				("offset".to_string(), "100".to_string()),
			],
		)
		.expect("url should build");

		assert_eq!(
			url.as_str(),
			"http://example.test/api/items?limit=50&offset=100"
		);
	}

	#[test]
	fn build_url_strips_trailing_slash() {
		let url = build_request_url("https://api.example.com/", "/v1/items", &[])
			.expect("url should build");
		assert_eq!(url.as_str(), "https://api.example.com/v1/items");
	}

	#[test]
	fn build_url_preserves_existing_query() {
		let url = build_request_url(
			"https://api.example.com",
			"/items?status=open",
			&[("limit".to_string(), "10".to_string())],
		)
		.expect("url should build");
		assert_eq!(
			url.as_str(),
			"https://api.example.com/items?status=open&limit=10"
		);
	}

	#[test]
	fn build_url_encodes_param_values() {
		let url = build_request_url(
			"http://example.test/api",
			"/items",
			&[("q".to_string(), "a b&c".to_string())],
		)
		.expect("url should build");
		assert_eq!(url.as_str(), "http://example.test/api/items?q=a+b%26c");
	}

	#[test]
	fn build_url_rejects_invalid_base() {
		assert!(build_request_url("not a url", "/items", &[]).is_err());
	}

	#[test]
	fn extract_items_top_level_array() {
		let body = serde_json::json!([{"id": 1}, {"id": 2}]);
		let items = extract_items(&body, &None);
		assert_eq!(items.len(), 2);
	}

	#[test]
	fn extract_items_nested_path() {
		let body = serde_json::json!({"data": {"items": [{"id": 1}, {"id": 2}, {"id": 3}]}});
		let items = extract_items(&body, &Some("data.items".into()));
		assert_eq!(items.len(), 3);
	}

	#[test]
	fn extract_items_single_level_path() {
		let body = serde_json::json!({"results": [{"id": "a"}]});
		let items = extract_items(&body, &Some("results".into()));
		assert_eq!(items.len(), 1);
	}

	#[test]
	fn extract_items_empty_path_is_top_level() {
		let body = serde_json::json!([{"id": 1}]);
		let items = extract_items(&body, &Some("".into()));
		assert_eq!(items.len(), 1);
	}

	#[test]
	fn extract_items_invalid_path_returns_empty() {
		let body = serde_json::json!({"data": [{"id": 1}]});
		let items = extract_items(&body, &Some("nonexistent.path".into()));
		assert!(items.is_empty());
	}

	#[test]
	fn extract_items_non_array_returns_empty() {
		let body = serde_json::json!({"data": "not an array"});
		let items = extract_items(&body, &Some("data".into()));
		assert!(items.is_empty());
	}

	#[test]
	fn items_to_rows_string_key() {
		let items = vec![
			serde_json::json!({"id": "abc", "name": "Alpha"}),
			serde_json::json!({"id": "def", "name": "Beta"}),
		];
		let rows = items_to_rows(&items, "id").unwrap();
		assert_eq!(rows.len(), 2);
		assert_eq!(rows[0].row_key, "abc");
		assert_eq!(rows[1].row_key, "def");
	}

	#[test]
	fn items_to_rows_numeric_key() {
		let items = vec![serde_json::json!({"id": 42, "val": "x"})];
		let rows = items_to_rows(&items, "id").unwrap();
		assert_eq!(rows[0].row_key, "42");
	}

	#[test]
	fn items_to_rows_missing_key_errors() {
		let items = vec![serde_json::json!({"name": "no id field"})];
		let result = items_to_rows(&items, "id");
		assert!(result.is_err());
		assert!(
			result
				.unwrap_err()
				.to_string()
				.contains("missing key field")
		);
	}

	#[test]
	fn items_to_rows_preserves_full_data() {
		let item = serde_json::json!({"id": "k1", "a": 1, "b": "two"});
		let rows = items_to_rows(std::slice::from_ref(&item), "id").unwrap();
		assert_eq!(rows[0].row_data, item);
	}

	#[test]
	fn extract_cursor_string() {
		let body = serde_json::json!({"meta": {"next": "cursor_abc"}});
		assert_eq!(
			extract_cursor(&body, "meta.next"),
			Some("cursor_abc".into())
		);
	}

	#[test]
	fn extract_cursor_number() {
		let body = serde_json::json!({"next_page": 5});
		assert_eq!(extract_cursor(&body, "next_page"), Some("5".into()));
	}

	#[test]
	fn extract_cursor_null_returns_none() {
		let body = serde_json::json!({"meta": {"next": null}});
		assert_eq!(extract_cursor(&body, "meta.next"), None);
	}

	#[test]
	fn extract_cursor_missing_returns_none() {
		let body = serde_json::json!({"data": []});
		assert_eq!(extract_cursor(&body, "meta.next"), None);
	}
}