1use std::collections::HashMap;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde::Deserialize;
6use tokio::sync::mpsc;
7use tracing::debug;
8
9use oversync_core::error::OversyncError;
10use oversync_core::model::RawRow;
11use oversync_core::traits::OriginConnector;
12
13pub use crate::http_common::AuthConfig;
14use crate::http_common::{extract_cursor, extract_items, items_to_rows};
15
16#[derive(Debug, Clone, Deserialize)]
17pub struct HttpSourceConfig {
18 #[serde(rename = "dsn")]
19 pub base_url: String,
20 #[serde(default)]
21 pub headers: HashMap<String, String>,
22 #[serde(default)]
23 pub auth: Option<AuthConfig>,
24 #[serde(default)]
25 pub pagination: Option<PaginationConfig>,
26 #[serde(default)]
27 pub response_path: Option<String>,
28 #[serde(default = "default_timeout")]
29 pub timeout_secs: u64,
30}
31
/// Serde default for `HttpSourceConfig::timeout_secs`: thirty seconds.
fn default_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 30;
    DEFAULT_TIMEOUT_SECS
}
35
36#[derive(Debug, Clone, Deserialize)]
37#[serde(tag = "type", rename_all = "snake_case")]
38pub enum PaginationConfig {
39 Offset {
40 page_size: usize,
41 #[serde(default = "default_limit_param")]
42 limit_param: String,
43 #[serde(default = "default_offset_param")]
44 offset_param: String,
45 },
46 Cursor {
47 page_size: usize,
48 #[serde(default = "default_cursor_param")]
49 cursor_param: String,
50 cursor_path: String,
51 },
52}
53
/// Serde default for the offset-pagination page-size query parameter name.
fn default_limit_param() -> String {
    String::from("limit")
}
/// Serde default for the offset-pagination offset query parameter name.
fn default_offset_param() -> String {
    String::from("offset")
}
/// Serde default for the cursor-pagination cursor query parameter name.
fn default_cursor_param() -> String {
    String::from("cursor")
}
63
64fn build_request_url(
65 base_url: &str,
66 path: &str,
67 extra_params: &[(String, String)],
68) -> Result<reqwest::Url, OversyncError> {
69 let mut url = reqwest::Url::parse(&format!("{}{}", base_url.trim_end_matches('/'), path))
70 .map_err(|e| OversyncError::Connector(format!("invalid http url: {e}")))?;
71 if !extra_params.is_empty() {
72 let mut pairs = url.query_pairs_mut();
73 for (key, value) in extra_params {
74 pairs.append_pair(key, value);
75 }
76 drop(pairs);
77 }
78 Ok(url)
79}
80
81pub struct HttpSource {
82 client: reqwest::Client,
83 config: HttpSourceConfig,
84 name: String,
85}
86
87impl HttpSource {
88 pub fn new(name: &str, config: HttpSourceConfig) -> Result<Self, OversyncError> {
89 let client = reqwest::Client::builder()
90 .timeout(Duration::from_secs(config.timeout_secs))
91 .build()
92 .map_err(|e| OversyncError::Connector(format!("http client: {e}")))?;
93
94 Ok(Self {
95 client,
96 config,
97 name: name.to_string(),
98 })
99 }
100
101 fn apply_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
102 crate::http_common::apply_auth(req, &self.config.auth)
103 }
104
105 fn apply_headers(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
106 for (k, v) in &self.config.headers {
107 req = req.header(k, v);
108 }
109 req
110 }
111
112 async fn request_page(
113 &self,
114 path: &str,
115 extra_params: &[(String, String)],
116 ) -> Result<serde_json::Value, OversyncError> {
117 let url = build_request_url(&self.config.base_url, path, extra_params)?;
118 let mut req = self.client.get(url.clone());
119 req = self.apply_headers(req);
120 req = self.apply_auth(req);
121
122 let resp = req
123 .send()
124 .await
125 .map_err(|e| OversyncError::Connector(format!("http: {e}")))?;
126
127 let status = resp.status();
128 if !status.is_success() {
129 return Err(OversyncError::Connector(format!("http {status}: {url}")));
130 }
131
132 resp.json()
133 .await
134 .map_err(|e| OversyncError::Connector(format!("json decode: {e}")))
135 }
136}
137
138#[async_trait]
139impl OriginConnector for HttpSource {
140 fn name(&self) -> &str {
141 &self.name
142 }
143
144 async fn fetch_all(&self, sql: &str, key_column: &str) -> Result<Vec<RawRow>, OversyncError> {
145 let pagination = self.config.pagination.clone();
146 match pagination {
147 None => {
148 let body = self.request_page(sql, &[]).await?;
149 let items = extract_items(&body, &self.config.response_path);
150 debug!(count = items.len(), "fetched items from http");
151 items_to_rows(&items, key_column)
152 }
153 Some(PaginationConfig::Offset {
154 page_size,
155 ref limit_param,
156 ref offset_param,
157 }) => {
158 let mut all_rows = Vec::new();
159 let mut offset = 0usize;
160 loop {
161 let params = vec![
162 (limit_param.clone(), page_size.to_string()),
163 (offset_param.clone(), offset.to_string()),
164 ];
165 let body = self.request_page(sql, ¶ms).await?;
166 let items = extract_items(&body, &self.config.response_path);
167 if items.is_empty() {
168 break;
169 }
170 let count = items.len();
171 all_rows.extend(items_to_rows(&items, key_column)?);
172 if count < page_size {
173 break;
174 }
175 offset += count;
176 }
177 debug!(
178 count = all_rows.len(),
179 "fetched all pages from http (offset)"
180 );
181 Ok(all_rows)
182 }
183 Some(PaginationConfig::Cursor {
184 page_size: _,
185 ref cursor_param,
186 ref cursor_path,
187 }) => {
188 let mut all_rows = Vec::new();
189 let mut cursor: Option<String> = None;
190 loop {
191 let mut params = Vec::new();
192 if let Some(ref c) = cursor {
193 params.push((cursor_param.clone(), c.clone()));
194 }
195 let body = self.request_page(sql, ¶ms).await?;
196 let items = extract_items(&body, &self.config.response_path);
197 if items.is_empty() {
198 break;
199 }
200 all_rows.extend(items_to_rows(&items, key_column)?);
201 cursor = extract_cursor(&body, cursor_path);
202 if cursor.is_none() {
203 break;
204 }
205 }
206 debug!(
207 count = all_rows.len(),
208 "fetched all pages from http (cursor)"
209 );
210 Ok(all_rows)
211 }
212 }
213 }
214
215 async fn fetch_into(
216 &self,
217 sql: &str,
218 key_column: &str,
219 batch_size: usize,
220 tx: mpsc::Sender<Vec<RawRow>>,
221 ) -> Result<usize, OversyncError> {
222 let pagination = self.config.pagination.clone();
223 match pagination {
224 None => {
225 let all = self.fetch_all(sql, key_column).await?;
226 let total = all.len();
227 for chunk in all.chunks(batch_size) {
228 tx.send(chunk.to_vec())
229 .await
230 .map_err(|_| OversyncError::Internal("channel closed".into()))?;
231 }
232 Ok(total)
233 }
234 Some(PaginationConfig::Offset {
235 page_size,
236 ref limit_param,
237 ref offset_param,
238 }) => {
239 let mut total = 0usize;
240 let mut offset = 0usize;
241 loop {
242 let params = vec![
243 (limit_param.clone(), page_size.to_string()),
244 (offset_param.clone(), offset.to_string()),
245 ];
246 let body = self.request_page(sql, ¶ms).await?;
247 let items = extract_items(&body, &self.config.response_path);
248 if items.is_empty() {
249 break;
250 }
251 let count = items.len();
252 let rows = items_to_rows(&items, key_column)?;
253 total += rows.len();
254 tx.send(rows)
255 .await
256 .map_err(|_| OversyncError::Internal("channel closed".into()))?;
257 if count < page_size {
258 break;
259 }
260 offset += count;
261 }
262 Ok(total)
263 }
264 Some(PaginationConfig::Cursor {
265 page_size: _,
266 ref cursor_param,
267 ref cursor_path,
268 }) => {
269 let mut total = 0usize;
270 let mut cursor: Option<String> = None;
271 loop {
272 let mut params = Vec::new();
273 if let Some(ref c) = cursor {
274 params.push((cursor_param.clone(), c.clone()));
275 }
276 let body = self.request_page(sql, ¶ms).await?;
277 let items = extract_items(&body, &self.config.response_path);
278 if items.is_empty() {
279 break;
280 }
281 let rows = items_to_rows(&items, key_column)?;
282 total += rows.len();
283 tx.send(rows)
284 .await
285 .map_err(|_| OversyncError::Internal("channel closed".into()))?;
286 cursor = extract_cursor(&body, cursor_path);
287 if cursor.is_none() {
288 break;
289 }
290 }
291 Ok(total)
292 }
293 }
294 }
295
296 async fn test_connection(&self) -> Result<(), OversyncError> {
297 let mut req = self.client.get(&self.config.base_url);
298 req = self.apply_headers(req);
299 req = self.apply_auth(req);
300 let resp = req
301 .send()
302 .await
303 .map_err(|e| OversyncError::Connector(format!("http test: {e}")))?;
304 if !resp.status().is_success() {
305 return Err(OversyncError::Connector(format!(
306 "http test: status {}",
307 resp.status()
308 )));
309 }
310 Ok(())
311 }
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // ---- configuration parsing -------------------------------------------------

    /// Only `dsn` is required; everything else falls back to its default.
    #[test]
    fn parse_config_minimal() {
        let raw = json!({ "dsn": "https://api.example.com" });
        let cfg: HttpSourceConfig = serde_json::from_value(raw).unwrap();

        assert_eq!(cfg.base_url, "https://api.example.com");
        assert_eq!(cfg.headers.len(), 0);
        assert!(matches!(cfg.auth, None));
        assert!(matches!(cfg.pagination, None));
        assert!(matches!(cfg.response_path, None));
        assert_eq!(cfg.timeout_secs, 30);
    }

    /// Every optional key is honoured when present.
    #[test]
    fn parse_config_full() {
        let raw = json!({
            "dsn": "https://api.example.com",
            "headers": {"Accept": "application/json", "X-Custom": "val"},
            "auth": {"type": "bearer", "token": "sk-123"},
            "pagination": {"type": "offset", "page_size": 50, "limit_param": "per_page", "offset_param": "page"},
            "response_path": "data.items",
            "timeout_secs": 60
        });
        let cfg: HttpSourceConfig = serde_json::from_value(raw).unwrap();

        assert_eq!(cfg.base_url, "https://api.example.com");
        assert_eq!(cfg.headers.len(), 2);
        let is_expected_bearer =
            matches!(&cfg.auth, Some(AuthConfig::Bearer { token }) if token == "sk-123");
        assert!(is_expected_bearer);
        assert_eq!(cfg.timeout_secs, 60);
        assert_eq!(cfg.response_path.as_deref(), Some("data.items"));
    }

    #[test]
    fn parse_auth_basic() {
        let raw = json!({"type": "basic", "username": "user", "password": "pass"});
        let auth: AuthConfig = serde_json::from_value(raw).unwrap();

        let is_expected_basic = matches!(
            &auth,
            AuthConfig::Basic { username, password } if username == "user" && password == "pass"
        );
        assert!(is_expected_basic);
    }

    #[test]
    fn parse_auth_header() {
        let raw = json!({"type": "header", "name": "X-API-Key", "value": "key123"});
        let auth: AuthConfig = serde_json::from_value(raw).unwrap();

        let is_expected_header = matches!(
            &auth,
            AuthConfig::Header { name, value } if name == "X-API-Key" && value == "key123"
        );
        assert!(is_expected_header);
    }

    /// Parameter names fall back to `limit` / `offset` when not configured.
    #[test]
    fn parse_pagination_offset_defaults() {
        let raw = json!({"type": "offset", "page_size": 100});
        let pagination: PaginationConfig = serde_json::from_value(raw).unwrap();

        if let PaginationConfig::Offset {
            page_size,
            limit_param,
            offset_param,
        } = pagination
        {
            assert_eq!(page_size, 100);
            assert_eq!(limit_param, "limit");
            assert_eq!(offset_param, "offset");
        } else {
            panic!("expected Offset");
        }
    }

    #[test]
    fn parse_pagination_cursor() {
        let raw = json!({
            "type": "cursor",
            "page_size": 50,
            "cursor_param": "after",
            "cursor_path": "meta.next_cursor"
        });
        let pagination: PaginationConfig = serde_json::from_value(raw).unwrap();

        if let PaginationConfig::Cursor {
            page_size,
            cursor_param,
            cursor_path,
        } = pagination
        {
            assert_eq!(page_size, 50);
            assert_eq!(cursor_param, "after");
            assert_eq!(cursor_path, "meta.next_cursor");
        } else {
            panic!("expected Cursor");
        }
    }

    // ---- URL building ----------------------------------------------------------

    #[test]
    fn build_url_joins_path() {
        let built = build_request_url("https://api.example.com", "/v1/items", &[]);
        let url = built.expect("url should build");
        assert_eq!(url.as_str(), "https://api.example.com/v1/items");
    }

    #[test]
    fn build_url_appends_query_pairs() {
        let params = [
            ("limit".to_string(), "50".to_string()),
            ("offset".to_string(), "100".to_string()),
        ];
        let built = build_request_url("http://example.test/api", "/items", &params);
        let url = built.expect("url should build");

        assert_eq!(
            url.as_str(),
            "http://example.test/api/items?limit=50&offset=100"
        );
    }

    #[test]
    fn build_url_strips_trailing_slash() {
        let built = build_request_url("https://api.example.com/", "/v1/items", &[]);
        let url = built.expect("url should build");
        assert_eq!(url.as_str(), "https://api.example.com/v1/items");
    }

    // ---- item extraction -------------------------------------------------------

    #[test]
    fn extract_items_top_level_array() {
        let body = json!([{"id": 1}, {"id": 2}]);
        assert_eq!(extract_items(&body, &None).len(), 2);
    }

    #[test]
    fn extract_items_nested_path() {
        let body = json!({"data": {"items": [{"id": 1}, {"id": 2}, {"id": 3}]}});
        let path = Some("data.items".into());
        assert_eq!(extract_items(&body, &path).len(), 3);
    }

    #[test]
    fn extract_items_single_level_path() {
        let body = json!({"results": [{"id": "a"}]});
        let path = Some("results".into());
        assert_eq!(extract_items(&body, &path).len(), 1);
    }

    #[test]
    fn extract_items_empty_path_is_top_level() {
        let body = json!([{"id": 1}]);
        let path = Some("".into());
        assert_eq!(extract_items(&body, &path).len(), 1);
    }

    #[test]
    fn extract_items_invalid_path_returns_empty() {
        let body = json!({"data": [{"id": 1}]});
        let path = Some("nonexistent.path".into());
        assert!(extract_items(&body, &path).is_empty());
    }

    #[test]
    fn extract_items_non_array_returns_empty() {
        let body = json!({"data": "not an array"});
        let path = Some("data".into());
        assert!(extract_items(&body, &path).is_empty());
    }

    // ---- row conversion --------------------------------------------------------

    #[test]
    fn items_to_rows_string_key() {
        let items = vec![
            json!({"id": "abc", "name": "Alpha"}),
            json!({"id": "def", "name": "Beta"}),
        ];
        let rows = items_to_rows(&items, "id").unwrap();

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].row_key, "abc");
        assert_eq!(rows[1].row_key, "def");
    }

    #[test]
    fn items_to_rows_numeric_key() {
        let items = vec![json!({"id": 42, "val": "x"})];
        let rows = items_to_rows(&items, "id").unwrap();
        assert_eq!(rows[0].row_key, "42");
    }

    #[test]
    fn items_to_rows_missing_key_errors() {
        let items = vec![json!({"name": "no id field"})];
        let outcome = items_to_rows(&items, "id");

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("missing key field"));
    }

    #[test]
    fn items_to_rows_preserves_full_data() {
        let item = json!({"id": "k1", "a": 1, "b": "two"});
        let rows = items_to_rows(std::slice::from_ref(&item), "id").unwrap();
        assert_eq!(rows[0].row_data, item);
    }

    // ---- cursor extraction -----------------------------------------------------

    #[test]
    fn extract_cursor_string() {
        let body = json!({"meta": {"next": "cursor_abc"}});
        let cursor = extract_cursor(&body, "meta.next");
        assert_eq!(cursor, Some("cursor_abc".into()));
    }

    #[test]
    fn extract_cursor_number() {
        let body = json!({"next_page": 5});
        let cursor = extract_cursor(&body, "next_page");
        assert_eq!(cursor, Some("5".into()));
    }

    #[test]
    fn extract_cursor_null_returns_none() {
        let body = json!({"meta": {"next": null}});
        let cursor = extract_cursor(&body, "meta.next");
        assert_eq!(cursor, None);
    }

    #[test]
    fn extract_cursor_missing_returns_none() {
        let body = json!({"data": []});
        let cursor = extract_cursor(&body, "meta.next");
        assert_eq!(cursor, None);
    }
}