Skip to main content

oversync_connectors/
http_source.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde::Deserialize;
6use tokio::sync::mpsc;
7use tracing::debug;
8
9use oversync_core::error::OversyncError;
10use oversync_core::model::RawRow;
11use oversync_core::traits::OriginConnector;
12
13pub use crate::http_common::AuthConfig;
14use crate::http_common::{extract_cursor, extract_items, items_to_rows};
15
16#[derive(Debug, Clone, Deserialize)]
17pub struct HttpSourceConfig {
18	#[serde(rename = "dsn")]
19	pub base_url: String,
20	#[serde(default)]
21	pub headers: HashMap<String, String>,
22	#[serde(default)]
23	pub auth: Option<AuthConfig>,
24	#[serde(default)]
25	pub pagination: Option<PaginationConfig>,
26	#[serde(default)]
27	pub response_path: Option<String>,
28	#[serde(default = "default_timeout")]
29	pub timeout_secs: u64,
30}
31
/// Serde default for `HttpSourceConfig::timeout_secs`, in seconds.
fn default_timeout() -> u64 {
	const DEFAULT_TIMEOUT_SECS: u64 = 30;
	DEFAULT_TIMEOUT_SECS
}
35
36#[derive(Debug, Clone, Deserialize)]
37#[serde(tag = "type", rename_all = "snake_case")]
38pub enum PaginationConfig {
39	Offset {
40		page_size: usize,
41		#[serde(default = "default_limit_param")]
42		limit_param: String,
43		#[serde(default = "default_offset_param")]
44		offset_param: String,
45	},
46	Cursor {
47		page_size: usize,
48		#[serde(default = "default_cursor_param")]
49		cursor_param: String,
50		cursor_path: String,
51	},
52}
53
/// Serde default for the name of the page-size query parameter.
fn default_limit_param() -> String {
	String::from("limit")
}
/// Serde default for the name of the offset query parameter.
fn default_offset_param() -> String {
	String::from("offset")
}
/// Serde default for the name of the cursor query parameter.
fn default_cursor_param() -> String {
	String::from("cursor")
}
63
64pub struct HttpSource {
65	client: reqwest::Client,
66	config: HttpSourceConfig,
67	name: String,
68}
69
70impl HttpSource {
71	pub fn new(name: &str, config: HttpSourceConfig) -> Result<Self, OversyncError> {
72		let client = reqwest::Client::builder()
73			.timeout(Duration::from_secs(config.timeout_secs))
74			.build()
75			.map_err(|e| OversyncError::Connector(format!("http client: {e}")))?;
76
77		Ok(Self {
78			client,
79			config,
80			name: name.to_string(),
81		})
82	}
83
84	fn build_url(&self, path: &str) -> String {
85		format!("{}{}", self.config.base_url.trim_end_matches('/'), path)
86	}
87
88	fn apply_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
89		crate::http_common::apply_auth(req, &self.config.auth)
90	}
91
92	fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
93		for (k, v) in &self.config.headers {
94			req = req.header(k, v);
95		}
96		req
97	}
98
99	async fn request_page(
100		&self,
101		path: &str,
102		extra_params: &[(String, String)],
103	) -> Result<serde_json::Value, OversyncError> {
104		let url = self.build_url(path);
105		let mut req = self.client.get(&url);
106		req = self.apply_headers(req);
107		req = self.apply_auth(req);
108		if !extra_params.is_empty() {
109			req = req.query(extra_params);
110		}
111
112		let resp = req
113			.send()
114			.await
115			.map_err(|e| OversyncError::Connector(format!("http: {e}")))?;
116
117		let status = resp.status();
118		if !status.is_success() {
119			return Err(OversyncError::Connector(format!("http {status}: {url}")));
120		}
121
122		resp.json()
123			.await
124			.map_err(|e| OversyncError::Connector(format!("json decode: {e}")))
125	}
126}
127
128#[async_trait]
129impl OriginConnector for HttpSource {
130	fn name(&self) -> &str {
131		&self.name
132	}
133
134	async fn fetch_all(&self, sql: &str, key_column: &str) -> Result<Vec<RawRow>, OversyncError> {
135		let pagination = self.config.pagination.clone();
136		match pagination {
137			None => {
138				let body = self.request_page(sql, &[]).await?;
139				let items = extract_items(&body, &self.config.response_path);
140				debug!(count = items.len(), "fetched items from http");
141				items_to_rows(&items, key_column)
142			}
143			Some(PaginationConfig::Offset {
144				page_size,
145				ref limit_param,
146				ref offset_param,
147			}) => {
148				let mut all_rows = Vec::new();
149				let mut offset = 0usize;
150				loop {
151					let params = vec![
152						(limit_param.clone(), page_size.to_string()),
153						(offset_param.clone(), offset.to_string()),
154					];
155					let body = self.request_page(sql, &params).await?;
156					let items = extract_items(&body, &self.config.response_path);
157					if items.is_empty() {
158						break;
159					}
160					let count = items.len();
161					all_rows.extend(items_to_rows(&items, key_column)?);
162					if count < page_size {
163						break;
164					}
165					offset += count;
166				}
167				debug!(
168					count = all_rows.len(),
169					"fetched all pages from http (offset)"
170				);
171				Ok(all_rows)
172			}
173			Some(PaginationConfig::Cursor {
174				page_size: _,
175				ref cursor_param,
176				ref cursor_path,
177			}) => {
178				let mut all_rows = Vec::new();
179				let mut cursor: Option<String> = None;
180				loop {
181					let mut params = Vec::new();
182					if let Some(ref c) = cursor {
183						params.push((cursor_param.clone(), c.clone()));
184					}
185					let body = self.request_page(sql, &params).await?;
186					let items = extract_items(&body, &self.config.response_path);
187					if items.is_empty() {
188						break;
189					}
190					all_rows.extend(items_to_rows(&items, key_column)?);
191					cursor = extract_cursor(&body, cursor_path);
192					if cursor.is_none() {
193						break;
194					}
195				}
196				debug!(
197					count = all_rows.len(),
198					"fetched all pages from http (cursor)"
199				);
200				Ok(all_rows)
201			}
202		}
203	}
204
205	async fn fetch_into(
206		&self,
207		sql: &str,
208		key_column: &str,
209		batch_size: usize,
210		tx: mpsc::Sender<Vec<RawRow>>,
211	) -> Result<usize, OversyncError> {
212		let pagination = self.config.pagination.clone();
213		match pagination {
214			None => {
215				let all = self.fetch_all(sql, key_column).await?;
216				let total = all.len();
217				for chunk in all.chunks(batch_size) {
218					tx.send(chunk.to_vec())
219						.await
220						.map_err(|_| OversyncError::Internal("channel closed".into()))?;
221				}
222				Ok(total)
223			}
224			Some(PaginationConfig::Offset {
225				page_size,
226				ref limit_param,
227				ref offset_param,
228			}) => {
229				let mut total = 0usize;
230				let mut offset = 0usize;
231				loop {
232					let params = vec![
233						(limit_param.clone(), page_size.to_string()),
234						(offset_param.clone(), offset.to_string()),
235					];
236					let body = self.request_page(sql, &params).await?;
237					let items = extract_items(&body, &self.config.response_path);
238					if items.is_empty() {
239						break;
240					}
241					let count = items.len();
242					let rows = items_to_rows(&items, key_column)?;
243					total += rows.len();
244					tx.send(rows)
245						.await
246						.map_err(|_| OversyncError::Internal("channel closed".into()))?;
247					if count < page_size {
248						break;
249					}
250					offset += count;
251				}
252				Ok(total)
253			}
254			Some(PaginationConfig::Cursor {
255				page_size: _,
256				ref cursor_param,
257				ref cursor_path,
258			}) => {
259				let mut total = 0usize;
260				let mut cursor: Option<String> = None;
261				loop {
262					let mut params = Vec::new();
263					if let Some(ref c) = cursor {
264						params.push((cursor_param.clone(), c.clone()));
265					}
266					let body = self.request_page(sql, &params).await?;
267					let items = extract_items(&body, &self.config.response_path);
268					if items.is_empty() {
269						break;
270					}
271					let rows = items_to_rows(&items, key_column)?;
272					total += rows.len();
273					tx.send(rows)
274						.await
275						.map_err(|_| OversyncError::Internal("channel closed".into()))?;
276					cursor = extract_cursor(&body, cursor_path);
277					if cursor.is_none() {
278						break;
279					}
280				}
281				Ok(total)
282			}
283		}
284	}
285
286	async fn test_connection(&self) -> Result<(), OversyncError> {
287		let mut req = self.client.get(&self.config.base_url);
288		req = self.apply_headers(req);
289		req = self.apply_auth(req);
290		let resp = req
291			.send()
292			.await
293			.map_err(|e| OversyncError::Connector(format!("http test: {e}")))?;
294		if !resp.status().is_success() {
295			return Err(OversyncError::Connector(format!(
296				"http test: status {}",
297				resp.status()
298			)));
299		}
300		Ok(())
301	}
302}
303
#[cfg(test)]
mod tests {
	use super::*;
	use serde_json::json;

	/// Minimal config whose only key is the base URL.
	fn make_config(base_url: &str) -> HttpSourceConfig {
		serde_json::from_value(json!({ "dsn": base_url })).unwrap()
	}

	/// Source named `test` built from a minimal config.
	fn make_source(base_url: &str) -> HttpSource {
		HttpSource::new("test", make_config(base_url)).unwrap()
	}

	// --- config parsing -------------------------------------------------

	#[test]
	fn parse_config_minimal() {
		let raw = json!({
			"dsn": "https://api.example.com"
		});
		let parsed: HttpSourceConfig = serde_json::from_value(raw).unwrap();

		assert_eq!(parsed.base_url, "https://api.example.com");
		assert!(parsed.headers.is_empty());
		assert!(parsed.auth.is_none());
		assert!(parsed.pagination.is_none());
		assert!(parsed.response_path.is_none());
		assert_eq!(parsed.timeout_secs, 30);
	}

	#[test]
	fn parse_config_full() {
		let raw = json!({
			"dsn": "https://api.example.com",
			"headers": {"Accept": "application/json", "X-Custom": "val"},
			"auth": {"type": "bearer", "token": "sk-123"},
			"pagination": {"type": "offset", "page_size": 50, "limit_param": "per_page", "offset_param": "page"},
			"response_path": "data.items",
			"timeout_secs": 60
		});
		let parsed: HttpSourceConfig = serde_json::from_value(raw).unwrap();

		assert_eq!(parsed.base_url, "https://api.example.com");
		assert_eq!(parsed.headers.len(), 2);
		let bearer_matches =
			matches!(&parsed.auth, Some(AuthConfig::Bearer { token }) if token == "sk-123");
		assert!(bearer_matches);
		assert_eq!(parsed.timeout_secs, 60);
		assert_eq!(parsed.response_path.as_deref(), Some("data.items"));
	}

	#[test]
	fn parse_auth_basic() {
		let raw = json!({"type": "basic", "username": "user", "password": "pass"});
		let parsed: AuthConfig = serde_json::from_value(raw).unwrap();
		let is_expected = matches!(
			&parsed,
			AuthConfig::Basic { username, password } if username == "user" && password == "pass"
		);
		assert!(is_expected);
	}

	#[test]
	fn parse_auth_header() {
		let raw = json!({"type": "header", "name": "X-API-Key", "value": "key123"});
		let parsed: AuthConfig = serde_json::from_value(raw).unwrap();
		let is_expected = matches!(
			&parsed,
			AuthConfig::Header { name, value } if name == "X-API-Key" && value == "key123"
		);
		assert!(is_expected);
	}

	#[test]
	fn parse_pagination_offset_defaults() {
		let raw = json!({"type": "offset", "page_size": 100});
		let parsed: PaginationConfig = serde_json::from_value(raw).unwrap();

		let PaginationConfig::Offset {
			page_size,
			limit_param,
			offset_param,
		} = parsed
		else {
			panic!("expected Offset");
		};
		assert_eq!(page_size, 100);
		assert_eq!(limit_param, "limit");
		assert_eq!(offset_param, "offset");
	}

	#[test]
	fn parse_pagination_cursor() {
		let raw = json!({
			"type": "cursor",
			"page_size": 50,
			"cursor_param": "after",
			"cursor_path": "meta.next_cursor"
		});
		let parsed: PaginationConfig = serde_json::from_value(raw).unwrap();

		let PaginationConfig::Cursor {
			page_size,
			cursor_param,
			cursor_path,
		} = parsed
		else {
			panic!("expected Cursor");
		};
		assert_eq!(page_size, 50);
		assert_eq!(cursor_param, "after");
		assert_eq!(cursor_path, "meta.next_cursor");
	}

	// --- URL building ---------------------------------------------------

	#[test]
	fn build_url_joins_path() {
		let url = make_source("https://api.example.com").build_url("/v1/items");
		assert_eq!(url, "https://api.example.com/v1/items");
	}

	#[test]
	fn build_url_strips_trailing_slash() {
		let url = make_source("https://api.example.com/").build_url("/v1/items");
		assert_eq!(url, "https://api.example.com/v1/items");
	}

	// --- extract_items --------------------------------------------------

	#[test]
	fn extract_items_top_level_array() {
		let body = json!([{"id": 1}, {"id": 2}]);
		assert_eq!(extract_items(&body, &None).len(), 2);
	}

	#[test]
	fn extract_items_nested_path() {
		let body = json!({"data": {"items": [{"id": 1}, {"id": 2}, {"id": 3}]}});
		let path = Some("data.items".to_string());
		assert_eq!(extract_items(&body, &path).len(), 3);
	}

	#[test]
	fn extract_items_single_level_path() {
		let body = json!({"results": [{"id": "a"}]});
		let path = Some("results".to_string());
		assert_eq!(extract_items(&body, &path).len(), 1);
	}

	#[test]
	fn extract_items_empty_path_is_top_level() {
		let body = json!([{"id": 1}]);
		let path = Some(String::new());
		assert_eq!(extract_items(&body, &path).len(), 1);
	}

	#[test]
	fn extract_items_invalid_path_returns_empty() {
		let body = json!({"data": [{"id": 1}]});
		let path = Some("nonexistent.path".to_string());
		assert!(extract_items(&body, &path).is_empty());
	}

	#[test]
	fn extract_items_non_array_returns_empty() {
		let body = json!({"data": "not an array"});
		let path = Some("data".to_string());
		assert!(extract_items(&body, &path).is_empty());
	}

	// --- items_to_rows --------------------------------------------------

	#[test]
	fn items_to_rows_string_key() {
		let items = vec![
			json!({"id": "abc", "name": "Alpha"}),
			json!({"id": "def", "name": "Beta"}),
		];
		let rows = items_to_rows(&items, "id").unwrap();

		assert_eq!(rows.len(), 2);
		for (row, expected_key) in rows.iter().zip(["abc", "def"]) {
			assert_eq!(row.row_key, expected_key);
		}
	}

	#[test]
	fn items_to_rows_numeric_key() {
		let items = vec![json!({"id": 42, "val": "x"})];
		let rows = items_to_rows(&items, "id").unwrap();
		assert_eq!(rows[0].row_key, "42");
	}

	#[test]
	fn items_to_rows_missing_key_errors() {
		let items = vec![json!({"name": "no id field"})];
		let outcome = items_to_rows(&items, "id");
		assert!(outcome.is_err());

		let message = outcome.unwrap_err().to_string();
		assert!(message.contains("missing key field"));
	}

	#[test]
	fn items_to_rows_preserves_full_data() {
		let item = json!({"id": "k1", "a": 1, "b": "two"});
		let rows = items_to_rows(std::slice::from_ref(&item), "id").unwrap();
		assert_eq!(rows[0].row_data, item);
	}

	// --- extract_cursor -------------------------------------------------

	#[test]
	fn extract_cursor_string() {
		let body = json!({"meta": {"next": "cursor_abc"}});
		let cursor = extract_cursor(&body, "meta.next");
		assert_eq!(cursor.as_deref(), Some("cursor_abc"));
	}

	#[test]
	fn extract_cursor_number() {
		let body = json!({"next_page": 5});
		let cursor = extract_cursor(&body, "next_page");
		assert_eq!(cursor.as_deref(), Some("5"));
	}

	#[test]
	fn extract_cursor_null_returns_none() {
		let body = json!({"meta": {"next": null}});
		assert!(extract_cursor(&body, "meta.next").is_none());
	}

	#[test]
	fn extract_cursor_missing_returns_none() {
		let body = json!({"data": []});
		assert!(extract_cursor(&body, "meta.next").is_none());
	}
}