Skip to main content

oversync_api/
lib.rs

1pub mod auth;
2pub mod handlers;
3pub mod mutations;
4pub mod operations;
5pub mod state;
6pub mod types;
7
8use std::sync::Arc;
9
10use axum::Router;
11use axum::middleware;
12use axum::routing::{get, post, put};
13use utoipa::OpenApi;
14
15use crate::state::ApiState;
16
17#[derive(OpenApi)]
18#[openapi(
19	paths(
20		handlers::health,
21		handlers::list_sinks,
22		handlers::list_pipes,
23		handlers::list_pipe_presets,
24		handlers::get_pipe,
25		handlers::get_pipe_preset,
26		mutations::create_sink,
27		mutations::update_sink,
28		mutations::delete_sink,
29		mutations::create_pipe,
30		mutations::update_pipe,
31		mutations::delete_pipe,
32		mutations::create_pipe_preset,
33		mutations::update_pipe_preset,
34		mutations::delete_pipe_preset,
35		operations::pause_sync,
36		operations::resume_sync,
37		operations::export_config,
38		operations::import_config,
39		operations::get_history,
40		operations::sync_status,
41	),
42	components(schemas(
43		types::HealthResponse,
44		types::CycleInfo,
45		types::SinkListResponse,
46		types::SinkInfo,
47		types::PipeListResponse,
48		types::PipeInfo,
49		types::PipePresetListResponse,
50		types::PipePresetInfo,
51		types::PipePresetSpecInput,
52		types::PipePresetParameterInput,
53		types::PipeQueryInput,
54		types::ErrorResponse,
55		types::CreateSinkRequest,
56		types::UpdateSinkRequest,
57		types::CreatePipeRequest,
58		types::UpdatePipeRequest,
59		types::CreatePipePresetRequest,
60		types::UpdatePipePresetRequest,
61		types::MutationResponse,
62		types::HistoryResponse,
63		types::StatusResponse,
64		types::ExportConfigFormat,
65		types::ExportConfigQuery,
66		types::ExportConfigResponse,
67		types::ImportConfigRequest,
68		types::ImportConfigResponse,
69	)),
70	info(
71		title = "oversync API",
72		version = "0.1.0",
73		description = "HTTP API for managing oversync pipes, sinks, recipes, and sync status."
74	)
75)]
76pub struct ApiDoc;
77
78pub fn router(state: Arc<ApiState>) -> Router {
79	router_with_openapi(state, ApiDoc::openapi())
80}
81
82pub fn router_with_openapi(state: Arc<ApiState>, openapi: utoipa::openapi::OpenApi) -> Router {
83	let openapi = Arc::new(openapi);
84
85	let protected = Router::new()
86		.route(
87			"/sinks",
88			get(handlers::list_sinks).post(mutations::create_sink),
89		)
90		.route(
91			"/sinks/{name}",
92			put(mutations::update_sink).delete(mutations::delete_sink),
93		)
94		.route(
95			"/pipes",
96			get(handlers::list_pipes).post(mutations::create_pipe),
97		)
98		.route(
99			"/pipe-presets",
100			get(handlers::list_pipe_presets).post(mutations::create_pipe_preset),
101		)
102		.route(
103			"/pipes/{name}",
104			get(handlers::get_pipe)
105				.put(mutations::update_pipe)
106				.delete(mutations::delete_pipe),
107		)
108		.route(
109			"/pipe-presets/{name}",
110			get(handlers::get_pipe_preset)
111				.put(mutations::update_pipe_preset)
112				.delete(mutations::delete_pipe_preset),
113		)
114		.route("/sync/pause", post(operations::pause_sync))
115		.route("/sync/resume", post(operations::resume_sync))
116		.route("/sync/status", get(operations::sync_status))
117		.route("/config/import", post(operations::import_config))
118		.route("/config/export", get(operations::export_config))
119		.route("/history", get(operations::get_history))
120		.route_layer(middleware::from_fn_with_state(
121			state.clone(),
122			auth::require_api_key,
123		));
124
125	Router::new()
126		.route("/health", get(handlers::health))
127		.route(
128			"/openapi.json",
129			get({
130				let openapi = Arc::clone(&openapi);
131				move || {
132					let openapi = Arc::clone(&openapi);
133					async move { axum::Json(openapi.as_ref().clone()) }
134				}
135			}),
136		)
137		.merge(protected)
138		.with_state(state)
139}
140
#[cfg(test)]
mod tests {
	use super::*;
	use axum::body::Body;
	use axum::http::{Request, StatusCode};
	use http_body_util::BodyExt;
	use state::*;
	use tower::ServiceExt;

	/// Fixture state: one `stdout` sink, one `catalog-sync` pipe, no pipe
	/// presets, and `db_client`, `lifecycle`, and `api_key` all left unset.
	fn test_state() -> Arc<ApiState> {
		let sinks = vec![SinkConfig {
			name: "stdout".into(),
			sink_type: "stdout".into(),
			config: None,
		}];
		let pipes = vec![state::PipeConfigCache {
			name: "catalog-sync".into(),
			origin_connector: "postgres".into(),
			origin_dsn: "postgres://ro@pg1:5432/meta".into(),
			targets: vec!["kafka-main".into()],
			interval_secs: 60,
			query_count: 2,
			recipe: None,
			enabled: true,
		}];

		Arc::new(ApiState {
			sinks: Arc::new(tokio::sync::RwLock::new(sinks)),
			pipes: Arc::new(tokio::sync::RwLock::new(pipes)),
			pipe_presets: Arc::new(tokio::sync::RwLock::new(vec![])),
			db_client: None,
			lifecycle: None,
			api_key: None,
		})
	}

	/// Sends a body-less GET for `path` to a clone of `app` and returns the
	/// response status together with the body parsed as JSON.
	///
	/// Panics if the request fails or the body is not valid JSON.
	async fn get_json(app: &Router, path: &str) -> (StatusCode, serde_json::Value) {
		let request = Request::get(path).body(Body::empty()).unwrap();
		let response = app.clone().oneshot(request).await.unwrap();
		let status = response.status();
		let bytes = response.into_body().collect().await.unwrap().to_bytes();
		(status, serde_json::from_slice(&bytes).unwrap())
	}

	#[tokio::test]
	async fn health_returns_ok() {
		let app = router(test_state());
		let (code, body) = get_json(&app, "/health").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(body["status"], "ok");
		assert!(body["version"].is_string());
	}

	#[tokio::test]
	async fn list_sinks_returns_configured() {
		let app = router(test_state());
		let (code, body) = get_json(&app, "/sinks").await;
		assert_eq!(code, StatusCode::OK);

		let sinks = body["sinks"].as_array().unwrap();
		assert_eq!(sinks.len(), 1);
		let sink = &sinks[0];
		assert_eq!(sink["name"], "stdout");
		assert_eq!(sink["sink_type"], "stdout");
	}

	#[tokio::test]
	async fn list_pipes_returns_configured() {
		let app = router(test_state());
		let (code, body) = get_json(&app, "/pipes").await;
		assert_eq!(code, StatusCode::OK);

		let pipes = body["pipes"].as_array().unwrap();
		assert_eq!(pipes.len(), 1);
		let pipe = &pipes[0];
		assert_eq!(pipe["name"], "catalog-sync");
		assert_eq!(pipe["origin_connector"], "postgres");
		assert_eq!(pipe["targets"][0], "kafka-main");
		assert_eq!(pipe["interval_secs"], 60);
		assert_eq!(pipe["query_count"], 2);
		assert!(pipe["enabled"].as_bool().unwrap());
	}

	#[tokio::test]
	async fn get_pipe_by_name() {
		let app = router(test_state());
		let (code, body) = get_json(&app, "/pipes/catalog-sync").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(body["name"], "catalog-sync");
		assert_eq!(body["origin_connector"], "postgres");
	}

	#[tokio::test]
	async fn get_pipe_not_found() {
		let app = router(test_state());
		let (code, body) = get_json(&app, "/pipes/nonexistent").await;
		assert_eq!(code, StatusCode::NOT_FOUND);
		let message = body["error"].as_str().unwrap();
		assert!(message.contains("not found"));
	}

	#[tokio::test]
	async fn openapi_spec_is_valid_json() {
		let app = router(test_state());
		let (code, spec) = get_json(&app, "/openapi.json").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(spec["openapi"], "3.1.0");
		assert_eq!(spec["info"]["title"], "oversync API");

		// Every one of these paths must be documented in the spec.
		let documented = [
			"/health",
			"/sinks",
			"/pipes",
			"/pipe-presets",
			"/config/import",
			"/config/export",
			"/history",
			"/sync/pause",
			"/sync/resume",
		];
		for path in documented {
			assert!(spec["paths"][path].is_object());
		}

		// `/sources` must not appear in the spec.
		assert!(spec["paths"]["/sources"].is_null());
	}

	#[tokio::test]
	async fn sync_status_returns_default() {
		let app = router(test_state());
		let (code, body) = get_json(&app, "/sync/status").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(body["running"], false);
	}
}