Skip to main content

oversync_api/
lib.rs

1pub mod auth;
2pub mod handlers;
3pub mod mutations;
4pub mod operations;
5pub mod queries;
6pub mod state;
7pub mod types;
8
9use std::sync::Arc;
10
11use axum::Router;
12use axum::middleware;
13use axum::routing::{get, post, put};
14use utoipa::OpenApi;
15
16use crate::state::ApiState;
17
18#[derive(OpenApi)]
19#[openapi(
20	paths(
21		handlers::health,
22		handlers::list_sources,
23		handlers::get_source,
24		handlers::list_sinks,
25		handlers::list_pipes,
26		handlers::get_pipe,
27		mutations::create_source,
28		mutations::update_source,
29		mutations::delete_source,
30		mutations::create_sink,
31		mutations::update_sink,
32		mutations::delete_sink,
33		mutations::create_pipe,
34		mutations::update_pipe,
35		mutations::delete_pipe,
36		operations::trigger_source,
37		operations::pause_sync,
38		operations::resume_sync,
39		operations::get_history,
40		operations::sync_status,
41		queries::list_queries,
42		queries::create_query,
43		queries::update_query,
44		queries::delete_query,
45	),
46	components(schemas(
47		types::HealthResponse,
48		types::SourceListResponse,
49		types::SourceInfo,
50		types::QueryInfo,
51		types::SourceStatus,
52		types::CycleInfo,
53		types::SinkListResponse,
54		types::SinkInfo,
55		types::PipeListResponse,
56		types::PipeInfo,
57		types::TriggerResponse,
58		types::ErrorResponse,
59		types::CreateSourceRequest,
60		types::UpdateSourceRequest,
61		types::CreateSinkRequest,
62		types::UpdateSinkRequest,
63		types::CreatePipeRequest,
64		types::UpdatePipeRequest,
65		types::MutationResponse,
66		types::HistoryResponse,
67		types::StatusResponse,
68		types::CreateQueryRequest,
69		types::UpdateQueryRequest,
70		types::QueryListResponse,
71		types::QueryDetail,
72	)),
73	info(
74		title = "oversync API",
75		version = "0.1.0",
76		description = "HTTP API for managing oversync sources, sinks, and sync status."
77	)
78)]
79pub struct ApiDoc;
80
81pub fn router(state: Arc<ApiState>) -> Router {
82	// Protected routes — require API key when configured
83	let protected = Router::new()
84		.route(
85			"/sources",
86			get(handlers::list_sources).post(mutations::create_source),
87		)
88		.route(
89			"/sources/{name}",
90			get(handlers::get_source)
91				.put(mutations::update_source)
92				.delete(mutations::delete_source),
93		)
94		.route("/sources/{name}/trigger", post(operations::trigger_source))
95		.route(
96			"/sources/{source}/queries",
97			get(queries::list_queries).post(queries::create_query),
98		)
99		.route(
100			"/sources/{source}/queries/{name}",
101			put(queries::update_query).delete(queries::delete_query),
102		)
103		.route(
104			"/sinks",
105			get(handlers::list_sinks).post(mutations::create_sink),
106		)
107		.route(
108			"/sinks/{name}",
109			put(mutations::update_sink).delete(mutations::delete_sink),
110		)
111		.route(
112			"/pipes",
113			get(handlers::list_pipes).post(mutations::create_pipe),
114		)
115		.route(
116			"/pipes/{name}",
117			get(handlers::get_pipe)
118				.put(mutations::update_pipe)
119				.delete(mutations::delete_pipe),
120		)
121		.route("/sync/pause", post(operations::pause_sync))
122		.route("/sync/resume", post(operations::resume_sync))
123		.route("/sync/status", get(operations::sync_status))
124		.route("/history", get(operations::get_history))
125		.route_layer(middleware::from_fn_with_state(
126			state.clone(),
127			auth::require_api_key,
128		));
129
130	// Public routes — no auth required
131	Router::new()
132		.route("/health", get(handlers::health))
133		.route(
134			"/openapi.json",
135			get(|| async { axum::Json(ApiDoc::openapi()) }),
136		)
137		.merge(protected)
138		.with_state(state)
139}
140
#[cfg(test)]
mod tests {
	use super::*;
	use axum::body::Body;
	use axum::http::{Request, StatusCode};
	use http_body_util::BodyExt;
	use state::*;
	use std::collections::HashMap;
	use tokio::sync::RwLock;
	use tower::ServiceExt;

	/// Builds an in-memory `ApiState` holding one source, one sink and one
	/// pipe, with `db_client`, `lifecycle` and `api_key` all left as `None`.
	fn test_state() -> Arc<ApiState> {
		let sources = vec![SourceConfig {
			name: "pg-prod".into(),
			connector: "postgres".into(),
			interval_secs: 300,
			queries: vec![QueryConfig {
				id: "users".into(),
				key_column: "id".into(),
			}],
		}];
		let sinks = vec![SinkConfig {
			name: "stdout".into(),
			sink_type: "stdout".into(),
		}];
		let pipes = vec![state::PipeConfigCache {
			name: "catalog-sync".into(),
			origin_connector: "postgres".into(),
			origin_dsn: "postgres://ro@pg1:5432/meta".into(),
			targets: vec!["kafka-main".into()],
			interval_secs: 60,
			enabled: true,
		}];

		Arc::new(ApiState {
			sources: Arc::new(RwLock::new(sources)),
			sinks: Arc::new(RwLock::new(sinks)),
			pipes: Arc::new(RwLock::new(pipes)),
			cycle_status: Arc::new(RwLock::new(HashMap::new())),
			db_client: None,
			lifecycle: None,
			api_key: None,
		})
	}

	/// Sends a bodiless GET for `path` through a clone of `app` and returns
	/// the response status together with the body parsed as JSON.
	async fn get_json(app: &Router, path: &str) -> (StatusCode, serde_json::Value) {
		let request = Request::get(path).body(Body::empty()).unwrap();
		let response = app.clone().oneshot(request).await.unwrap();
		let code = response.status();
		let bytes = response.into_body().collect().await.unwrap().to_bytes();
		(code, serde_json::from_slice(&bytes).unwrap())
	}

	/// GETs `path` from a fresh router built on top of `test_state()`.
	async fn fetch(path: &str) -> (StatusCode, serde_json::Value) {
		get_json(&router(test_state()), path).await
	}

	#[tokio::test]
	async fn health_returns_ok() {
		let (code, body) = fetch("/health").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(body["status"], "ok");
		assert!(body["version"].is_string());
	}

	#[tokio::test]
	async fn list_sources_returns_configured() {
		let (code, body) = fetch("/sources").await;
		assert_eq!(code, StatusCode::OK);

		let listed = body["sources"].as_array().unwrap();
		assert_eq!(listed.len(), 1);

		let source = &listed[0];
		assert_eq!(source["name"], "pg-prod");
		assert_eq!(source["connector"], "postgres");
		assert_eq!(source["queries"][0]["id"], "users");
	}

	#[tokio::test]
	async fn get_source_by_name() {
		let (code, body) = fetch("/sources/pg-prod").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(body["name"], "pg-prod");
	}

	#[tokio::test]
	async fn get_source_not_found() {
		let (code, body) = fetch("/sources/nonexistent").await;
		assert_eq!(code, StatusCode::OK); // TODO: proper 404 with IntoResponse
		let message = body["error"].as_str().unwrap();
		assert!(message.contains("not found"));
	}

	#[tokio::test]
	async fn list_sinks_returns_configured() {
		let (code, body) = fetch("/sinks").await;
		assert_eq!(code, StatusCode::OK);

		let listed = body["sinks"].as_array().unwrap();
		assert_eq!(listed.len(), 1);

		let sink = &listed[0];
		assert_eq!(sink["name"], "stdout");
		assert_eq!(sink["sink_type"], "stdout");
	}

	#[tokio::test]
	async fn openapi_spec_is_valid_json() {
		let (code, body) = fetch("/openapi.json").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(body["openapi"], "3.1.0");
		assert_eq!(body["info"]["title"], "oversync API");

		// Each of these paths must be present as an object under `paths`.
		let expected_paths = [
			"/health",
			"/sources",
			"/sinks",
			"/pipes",
			"/history",
			"/sync/pause",
			"/sync/resume",
		];
		for path in expected_paths {
			assert!(
				body["paths"][path].is_object(),
				"missing OpenAPI path {path}"
			);
		}
	}

	#[tokio::test]
	async fn sync_status_returns_default() {
		let (code, body) = fetch("/sync/status").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(body["running"], false);
	}

	#[tokio::test]
	async fn list_pipes_returns_configured() {
		let (code, body) = fetch("/pipes").await;
		assert_eq!(code, StatusCode::OK);

		let listed = body["pipes"].as_array().unwrap();
		assert_eq!(listed.len(), 1);

		let pipe = &listed[0];
		assert_eq!(pipe["name"], "catalog-sync");
		assert_eq!(pipe["origin_connector"], "postgres");
		assert_eq!(pipe["targets"][0], "kafka-main");
		assert_eq!(pipe["interval_secs"], 60);
		assert_eq!(pipe["enabled"], true);
	}

	#[tokio::test]
	async fn get_pipe_by_name() {
		let (code, body) = fetch("/pipes/catalog-sync").await;
		assert_eq!(code, StatusCode::OK);
		assert_eq!(body["name"], "catalog-sync");
		assert_eq!(body["origin_connector"], "postgres");
	}

	#[tokio::test]
	async fn get_pipe_not_found() {
		// The status asserted here is 200, with the failure reported in the JSON `error` field.
		let (code, body) = fetch("/pipes/nonexistent").await;
		assert_eq!(code, StatusCode::OK);
		let message = body["error"].as_str().unwrap();
		assert!(message.contains("not found"));
	}
}