Skip to main content

oversync_api/
lib.rs

1pub mod auth;
2pub mod handlers;
3pub mod mutations;
4pub mod operations;
5pub mod state;
6pub mod types;
7
8use std::sync::Arc;
9
10use axum::Router;
11use axum::middleware;
12use axum::routing::{get, post, put};
13use utoipa::OpenApi;
14
15use crate::state::ApiState;
16
17#[derive(OpenApi)]
18#[openapi(
19	paths(
20		handlers::health,
21		handlers::list_sinks,
22		handlers::list_pipes,
23		handlers::list_pipe_presets,
24		handlers::get_pipe,
25		handlers::get_pipe_preset,
26		mutations::create_sink,
27		mutations::update_sink,
28		mutations::delete_sink,
29		mutations::create_pipe,
30		mutations::update_pipe,
31		mutations::delete_pipe,
32		mutations::create_pipe_preset,
33		mutations::update_pipe_preset,
34		mutations::delete_pipe_preset,
35		operations::run_pipe,
36		operations::pause_sync,
37		operations::resume_sync,
38		operations::export_config,
39		operations::import_config,
40		operations::get_history,
41		operations::sync_status,
42	),
43	components(schemas(
44		types::HealthResponse,
45		types::CycleInfo,
46		types::SinkListResponse,
47		types::SinkInfo,
48		types::PipeListResponse,
49		types::PipeInfo,
50		types::PipePresetListResponse,
51		types::PipePresetInfo,
52		types::PipePresetSpecInput,
53		types::PipePresetParameterInput,
54		types::PipeQueryInput,
55		types::ErrorResponse,
56		types::CreateSinkRequest,
57		types::UpdateSinkRequest,
58		types::CreatePipeRequest,
59		types::UpdatePipeRequest,
60		types::CreatePipePresetRequest,
61		types::UpdatePipePresetRequest,
62		types::MutationResponse,
63		types::PipeRunQueryResult,
64		types::PipeRunResponse,
65		types::HistoryResponse,
66		types::StatusResponse,
67		types::ExportConfigFormat,
68		types::ExportConfigQuery,
69		types::ExportConfigResponse,
70		types::ImportConfigRequest,
71		types::ImportConfigResponse,
72	)),
73	info(
74		title = "oversync API",
75		version = env!("CARGO_PKG_VERSION"),
76		description = "HTTP API for managing oversync pipes, sinks, recipes, and sync status."
77	)
78)]
79pub struct ApiDoc;
80
81pub fn router(state: Arc<ApiState>) -> Router {
82	router_with_openapi(state, ApiDoc::openapi())
83}
84
85pub fn router_with_openapi(state: Arc<ApiState>, openapi: utoipa::openapi::OpenApi) -> Router {
86	let openapi = Arc::new(openapi);
87
88	let protected = Router::new()
89		.route(
90			"/sinks",
91			get(handlers::list_sinks).post(mutations::create_sink),
92		)
93		.route(
94			"/sinks/{name}",
95			put(mutations::update_sink).delete(mutations::delete_sink),
96		)
97		.route(
98			"/pipes",
99			get(handlers::list_pipes).post(mutations::create_pipe),
100		)
101		.route(
102			"/pipe-presets",
103			get(handlers::list_pipe_presets).post(mutations::create_pipe_preset),
104		)
105		.route(
106			"/pipes/{name}",
107			get(handlers::get_pipe)
108				.put(mutations::update_pipe)
109				.delete(mutations::delete_pipe),
110		)
111		.route("/pipes/{name}/run", post(operations::run_pipe))
112		.route(
113			"/pipe-presets/{name}",
114			get(handlers::get_pipe_preset)
115				.put(mutations::update_pipe_preset)
116				.delete(mutations::delete_pipe_preset),
117		)
118		.route("/sync/pause", post(operations::pause_sync))
119		.route("/sync/resume", post(operations::resume_sync))
120		.route("/sync/status", get(operations::sync_status))
121		.route("/config/import", post(operations::import_config))
122		.route("/config/export", get(operations::export_config))
123		.route("/history", get(operations::get_history))
124		.route_layer(middleware::from_fn_with_state(
125			state.clone(),
126			auth::require_api_key,
127		));
128
129	Router::new()
130		.route("/health", get(handlers::health))
131		.route(
132			"/openapi.json",
133			get({
134				let openapi = Arc::clone(&openapi);
135				move || {
136					let openapi = Arc::clone(&openapi);
137					async move { axum::Json(openapi.as_ref().clone()) }
138				}
139			}),
140		)
141		.merge(protected)
142		.with_state(state)
143}
144
145#[cfg(test)]
146mod tests {
147	use super::*;
148	use axum::body::Body;
149	use axum::http::{Request, StatusCode};
150	use http_body_util::BodyExt;
151	use state::*;
152	use tower::ServiceExt;
153
154	struct TestLifecycle {
155		run_results: Vec<types::PipeRunQueryResult>,
156		run_error: Option<String>,
157	}
158
159	#[async_trait::async_trait]
160	impl LifecycleControl for TestLifecycle {
161		async fn restart_with_config_json(
162			&self,
163			_db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
164		) -> Result<(), oversync_core::error::OversyncError> {
165			Ok(())
166		}
167
168		async fn runtime_cache_snapshot(
169			&self,
170		) -> Result<RuntimeCacheSnapshot, oversync_core::error::OversyncError> {
171			Ok(RuntimeCacheSnapshot::default())
172		}
173
174		async fn export_config(
175			&self,
176			_db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
177			_format: types::ExportConfigFormat,
178		) -> Result<String, oversync_core::error::OversyncError> {
179			Ok(String::new())
180		}
181
182		async fn import_config(
183			&self,
184			_db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
185			_format: types::ExportConfigFormat,
186			_content: &str,
187		) -> Result<Vec<String>, oversync_core::error::OversyncError> {
188			Ok(vec![])
189		}
190
191		async fn run_pipe_once(
192			&self,
193			_pipe_name: &str,
194		) -> Result<Vec<types::PipeRunQueryResult>, oversync_core::error::OversyncError> {
195			match &self.run_error {
196				Some(message) => Err(oversync_core::error::OversyncError::Config(message.clone())),
197				None => Ok(self.run_results.clone()),
198			}
199		}
200
201		async fn pause(&self) {}
202
203		async fn resume(&self) -> Result<(), oversync_core::error::OversyncError> {
204			Ok(())
205		}
206
207		async fn is_running(&self) -> bool {
208			false
209		}
210
211		async fn is_paused(&self) -> bool {
212			false
213		}
214	}
215
216	fn test_state() -> Arc<ApiState> {
217		Arc::new(ApiState {
218			sinks: Arc::new(tokio::sync::RwLock::new(vec![SinkConfig {
219				name: "stdout".into(),
220				sink_type: "stdout".into(),
221				config: None,
222			}])),
223			pipes: Arc::new(tokio::sync::RwLock::new(vec![state::PipeConfigCache {
224				name: "catalog-sync".into(),
225				origin_connector: "postgres".into(),
226				origin_dsn: "postgres://ro@pg1:5432/meta".into(),
227				targets: vec!["kafka-main".into()],
228				interval_secs: 60,
229				query_count: 2,
230				recipe: None,
231				enabled: true,
232			}])),
233			pipe_presets: Arc::new(tokio::sync::RwLock::new(vec![])),
234			db_client: None,
235			lifecycle: None,
236			api_key: None,
237		})
238	}
239
240	fn test_state_with_lifecycle(lifecycle: Arc<dyn LifecycleControl>) -> Arc<ApiState> {
241		Arc::new(ApiState {
242			sinks: Arc::new(tokio::sync::RwLock::new(vec![SinkConfig {
243				name: "stdout".into(),
244				sink_type: "stdout".into(),
245				config: None,
246			}])),
247			pipes: Arc::new(tokio::sync::RwLock::new(vec![state::PipeConfigCache {
248				name: "catalog-sync".into(),
249				origin_connector: "postgres".into(),
250				origin_dsn: "postgres://ro@pg1:5432/meta".into(),
251				targets: vec!["kafka-main".into()],
252				interval_secs: 60,
253				query_count: 2,
254				recipe: None,
255				enabled: true,
256			}])),
257			pipe_presets: Arc::new(tokio::sync::RwLock::new(vec![])),
258			db_client: None,
259			lifecycle: Some(lifecycle),
260			api_key: None,
261		})
262	}
263
264	async fn get_json(app: &Router, path: &str) -> (StatusCode, serde_json::Value) {
265		let req = Request::get(path).body(Body::empty()).unwrap();
266		let resp = app.clone().oneshot(req).await.unwrap();
267		let status = resp.status();
268		let body = resp.into_body().collect().await.unwrap().to_bytes();
269		let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
270		(status, json)
271	}
272
273	#[tokio::test]
274	async fn health_returns_ok() {
275		let app = router(test_state());
276		let (status, json) = get_json(&app, "/health").await;
277		assert_eq!(status, StatusCode::OK);
278		assert_eq!(json["status"], "ok");
279		assert!(json["version"].is_string());
280	}
281
282	#[tokio::test]
283	async fn list_sinks_returns_configured() {
284		let app = router(test_state());
285		let (status, json) = get_json(&app, "/sinks").await;
286		assert_eq!(status, StatusCode::OK);
287		let sinks = json["sinks"].as_array().unwrap();
288		assert_eq!(sinks.len(), 1);
289		assert_eq!(sinks[0]["name"], "stdout");
290		assert_eq!(sinks[0]["sink_type"], "stdout");
291	}
292
293	#[tokio::test]
294	async fn list_pipes_returns_configured() {
295		let app = router(test_state());
296		let (status, json) = get_json(&app, "/pipes").await;
297		assert_eq!(status, StatusCode::OK);
298		let pipes = json["pipes"].as_array().unwrap();
299		assert_eq!(pipes.len(), 1);
300		assert_eq!(pipes[0]["name"], "catalog-sync");
301		assert_eq!(pipes[0]["origin_connector"], "postgres");
302		assert_eq!(pipes[0]["targets"][0], "kafka-main");
303		assert_eq!(pipes[0]["interval_secs"], 60);
304		assert_eq!(pipes[0]["query_count"], 2);
305		assert!(pipes[0]["enabled"].as_bool().unwrap());
306	}
307
308	#[tokio::test]
309	async fn get_pipe_by_name() {
310		let app = router(test_state());
311		let (status, json) = get_json(&app, "/pipes/catalog-sync").await;
312		assert_eq!(status, StatusCode::OK);
313		assert_eq!(json["name"], "catalog-sync");
314		assert_eq!(json["origin_connector"], "postgres");
315	}
316
317	#[tokio::test]
318	async fn run_pipe_returns_manual_run_results() {
319		let app = router(test_state_with_lifecycle(Arc::new(TestLifecycle {
320			run_results: vec![types::PipeRunQueryResult {
321				query_id: "tables".into(),
322				created: 2,
323				updated: 1,
324				deleted: 0,
325			}],
326			run_error: None,
327		})));
328		let req = Request::post("/pipes/catalog-sync/run")
329			.body(Body::empty())
330			.unwrap();
331		let resp = app.clone().oneshot(req).await.unwrap();
332		assert_eq!(resp.status(), StatusCode::OK);
333		let body = resp.into_body().collect().await.unwrap().to_bytes();
334		let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
335		assert_eq!(json["ok"], true);
336		assert_eq!(json["results"][0]["query_id"], "tables");
337		assert_eq!(json["results"][0]["created"], 2);
338	}
339
340	#[tokio::test]
341	async fn run_pipe_surfaces_manual_run_errors() {
342		let app = router(test_state_with_lifecycle(Arc::new(TestLifecycle {
343			run_results: vec![],
344			run_error: Some("pipe is disabled".into()),
345		})));
346		let req = Request::post("/pipes/catalog-sync/run")
347			.body(Body::empty())
348			.unwrap();
349		let resp = app.clone().oneshot(req).await.unwrap();
350		assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
351		let body = resp.into_body().collect().await.unwrap().to_bytes();
352		let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
353		assert!(json["error"].as_str().unwrap().contains("pipe is disabled"));
354	}
355
356	#[tokio::test]
357	async fn get_pipe_not_found() {
358		let app = router(test_state());
359		let (status, json) = get_json(&app, "/pipes/nonexistent").await;
360		assert_eq!(status, StatusCode::NOT_FOUND);
361		assert!(json["error"].as_str().unwrap().contains("not found"));
362	}
363
364	#[tokio::test]
365	async fn openapi_spec_is_valid_json() {
366		let app = router(test_state());
367		let (status, json) = get_json(&app, "/openapi.json").await;
368		assert_eq!(status, StatusCode::OK);
369		assert_eq!(json["openapi"], "3.1.0");
370		assert_eq!(json["info"]["title"], "oversync API");
371		assert!(json["paths"]["/health"].is_object());
372		assert!(json["paths"]["/sinks"].is_object());
373		assert!(json["paths"]["/pipes"].is_object());
374		assert!(json["paths"]["/pipes/{name}/run"].is_object());
375		assert!(json["paths"]["/pipe-presets"].is_object());
376		assert!(json["paths"]["/config/import"].is_object());
377		assert!(json["paths"]["/config/export"].is_object());
378		assert!(json["paths"]["/history"].is_object());
379		assert!(json["paths"]["/sync/pause"].is_object());
380		assert!(json["paths"]["/sync/resume"].is_object());
381		assert!(json["paths"]["/sources"].is_null());
382	}
383
384	#[tokio::test]
385	async fn sync_status_returns_default() {
386		let app = router(test_state());
387		let (status, json) = get_json(&app, "/sync/status").await;
388		assert_eq!(status, StatusCode::OK);
389		assert_eq!(json["running"], false);
390	}
391}