1pub mod auth;
2pub mod handlers;
3pub mod mutations;
4pub mod operations;
5pub mod state;
6pub mod types;
7
8use std::sync::Arc;
9
10use axum::Router;
11use axum::middleware;
12use axum::routing::{get, post, put};
13use utoipa::OpenApi;
14
15use crate::state::ApiState;
16
/// OpenAPI description of the HTTP API, generated by `utoipa`.
///
/// The attribute below lists the handler, mutation, and operation functions to
/// document, the schema types to include as components, and the document's
/// `info` block. The version string is taken from `CARGO_PKG_VERSION` at
/// compile time. [`router`] serves the generated document at `/openapi.json`.
#[derive(OpenApi)]
#[openapi(
    paths(
        handlers::health,
        handlers::list_sinks,
        handlers::list_pipes,
        handlers::list_pipe_presets,
        handlers::get_pipe,
        handlers::get_pipe_preset,
        mutations::create_sink,
        mutations::update_sink,
        mutations::delete_sink,
        mutations::create_pipe,
        mutations::update_pipe,
        mutations::delete_pipe,
        mutations::create_pipe_preset,
        mutations::update_pipe_preset,
        mutations::delete_pipe_preset,
        operations::run_pipe,
        operations::pause_sync,
        operations::resume_sync,
        operations::export_config,
        operations::import_config,
        operations::get_history,
        operations::sync_status,
    ),
    components(schemas(
        types::HealthResponse,
        types::CycleInfo,
        types::SinkListResponse,
        types::SinkInfo,
        types::PipeListResponse,
        types::PipeInfo,
        types::PipePresetListResponse,
        types::PipePresetInfo,
        types::PipePresetSpecInput,
        types::PipePresetParameterInput,
        types::PipeQueryInput,
        types::ErrorResponse,
        types::CreateSinkRequest,
        types::UpdateSinkRequest,
        types::CreatePipeRequest,
        types::UpdatePipeRequest,
        types::CreatePipePresetRequest,
        types::UpdatePipePresetRequest,
        types::MutationResponse,
        types::PipeRunQueryResult,
        types::PipeRunResponse,
        types::HistoryResponse,
        types::StatusResponse,
        types::ExportConfigFormat,
        types::ExportConfigQuery,
        types::ExportConfigResponse,
        types::ImportConfigRequest,
        types::ImportConfigResponse,
    )),
    info(
        title = "oversync API",
        version = env!("CARGO_PKG_VERSION"),
        // NOTE(review): the description mentions "recipes", but the paths listed
        // above expose pipe presets and no recipe routes — confirm the wording
        // is still current.
        description = "HTTP API for managing oversync pipes, sinks, recipes, and sync status."
    )
)]
pub struct ApiDoc;
80
81pub fn router(state: Arc<ApiState>) -> Router {
82 router_with_openapi(state, ApiDoc::openapi())
83}
84
85pub fn router_with_openapi(state: Arc<ApiState>, openapi: utoipa::openapi::OpenApi) -> Router {
86 let openapi = Arc::new(openapi);
87
88 let protected = Router::new()
89 .route(
90 "/sinks",
91 get(handlers::list_sinks).post(mutations::create_sink),
92 )
93 .route(
94 "/sinks/{name}",
95 put(mutations::update_sink).delete(mutations::delete_sink),
96 )
97 .route(
98 "/pipes",
99 get(handlers::list_pipes).post(mutations::create_pipe),
100 )
101 .route(
102 "/pipe-presets",
103 get(handlers::list_pipe_presets).post(mutations::create_pipe_preset),
104 )
105 .route(
106 "/pipes/{name}",
107 get(handlers::get_pipe)
108 .put(mutations::update_pipe)
109 .delete(mutations::delete_pipe),
110 )
111 .route("/pipes/{name}/run", post(operations::run_pipe))
112 .route(
113 "/pipe-presets/{name}",
114 get(handlers::get_pipe_preset)
115 .put(mutations::update_pipe_preset)
116 .delete(mutations::delete_pipe_preset),
117 )
118 .route("/sync/pause", post(operations::pause_sync))
119 .route("/sync/resume", post(operations::resume_sync))
120 .route("/sync/status", get(operations::sync_status))
121 .route("/config/import", post(operations::import_config))
122 .route("/config/export", get(operations::export_config))
123 .route("/history", get(operations::get_history))
124 .route_layer(middleware::from_fn_with_state(
125 state.clone(),
126 auth::require_api_key,
127 ));
128
129 Router::new()
130 .route("/health", get(handlers::health))
131 .route(
132 "/openapi.json",
133 get({
134 let openapi = Arc::clone(&openapi);
135 move || {
136 let openapi = Arc::clone(&openapi);
137 async move { axum::Json(openapi.as_ref().clone()) }
138 }
139 }),
140 )
141 .merge(protected)
142 .with_state(state)
143}
144
#[cfg(test)]
mod tests {
    //! Router-level tests: each test builds the router over an in-memory
    //! [`ApiState`] fixture and drives it with a single request.

    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use http_body_util::BodyExt;
    use oversync_core::error::OversyncError;
    use state::*;
    use tower::ServiceExt;

    /// Database handle type used in the [`LifecycleControl`] signatures.
    type Db = surrealdb::Surreal<surrealdb::engine::any::Any>;

    /// [`LifecycleControl`] double whose `run_pipe_once` outcome is scripted:
    /// it fails with `run_error` when set, otherwise returns `run_results`.
    /// Every other method is a no-op returning an empty or `false` value.
    struct TestLifecycle {
        run_results: Vec<types::PipeRunQueryResult>,
        run_error: Option<String>,
    }

    #[async_trait::async_trait]
    impl LifecycleControl for TestLifecycle {
        async fn restart_with_config_json(&self, _db: &Db) -> Result<(), OversyncError> {
            Ok(())
        }

        async fn runtime_cache_snapshot(&self) -> Result<RuntimeCacheSnapshot, OversyncError> {
            Ok(RuntimeCacheSnapshot::default())
        }

        async fn export_config(
            &self,
            _db: &Db,
            _format: types::ExportConfigFormat,
        ) -> Result<String, OversyncError> {
            Ok(String::new())
        }

        async fn import_config(
            &self,
            _db: &Db,
            _format: types::ExportConfigFormat,
            _content: &str,
        ) -> Result<Vec<String>, OversyncError> {
            Ok(vec![])
        }

        async fn run_pipe_once(
            &self,
            _pipe_name: &str,
        ) -> Result<Vec<types::PipeRunQueryResult>, OversyncError> {
            match &self.run_error {
                Some(message) => Err(OversyncError::Config(message.clone())),
                None => Ok(self.run_results.clone()),
            }
        }

        async fn pause(&self) {}

        async fn resume(&self) -> Result<(), OversyncError> {
            Ok(())
        }

        async fn is_running(&self) -> bool {
            false
        }

        async fn is_paused(&self) -> bool {
            false
        }
    }

    /// Builds the shared fixture: one `stdout` sink, one `catalog-sync` pipe, no
    /// presets, no database client, and no API key. Only `lifecycle` varies
    /// between tests, so it is the single parameter.
    fn state_with(lifecycle: Option<Arc<dyn LifecycleControl>>) -> Arc<ApiState> {
        Arc::new(ApiState {
            sinks: Arc::new(tokio::sync::RwLock::new(vec![SinkConfig {
                name: "stdout".into(),
                sink_type: "stdout".into(),
                config: None,
            }])),
            pipes: Arc::new(tokio::sync::RwLock::new(vec![state::PipeConfigCache {
                name: "catalog-sync".into(),
                origin_connector: "postgres".into(),
                origin_dsn: "postgres://ro@pg1:5432/meta".into(),
                targets: vec!["kafka-main".into()],
                interval_secs: 60,
                query_count: 2,
                recipe: None,
                enabled: true,
            }])),
            pipe_presets: Arc::new(tokio::sync::RwLock::new(vec![])),
            db_client: None,
            lifecycle,
            api_key: None,
        })
    }

    /// Fixture state without a lifecycle controller.
    fn test_state() -> Arc<ApiState> {
        state_with(None)
    }

    /// Fixture state wired to the given lifecycle controller.
    fn test_state_with_lifecycle(lifecycle: Arc<dyn LifecycleControl>) -> Arc<ApiState> {
        state_with(Some(lifecycle))
    }

    /// Sends `req` through a clone of `app` and decodes the response body as JSON.
    ///
    /// Panics with the status code and raw body when the body is not valid JSON,
    /// so a failing test shows what the server actually returned.
    async fn send_json(app: &Router, req: Request<Body>) -> (StatusCode, serde_json::Value) {
        let resp = app
            .clone()
            .oneshot(req)
            .await
            .expect("router should produce a response");
        let status = resp.status();
        let body = resp
            .into_body()
            .collect()
            .await
            .expect("response body should be readable")
            .to_bytes();
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap_or_else(|err| {
            panic!(
                "{status} response body is not JSON ({err}): {}",
                String::from_utf8_lossy(&body)
            )
        });
        (status, json)
    }

    /// Issues a body-less `GET` to `path` and returns the status and JSON body.
    async fn get_json(app: &Router, path: &str) -> (StatusCode, serde_json::Value) {
        let req = Request::get(path).body(Body::empty()).unwrap();
        send_json(app, req).await
    }

    /// Issues a body-less `POST` to `path` and returns the status and JSON body.
    async fn post_json(app: &Router, path: &str) -> (StatusCode, serde_json::Value) {
        let req = Request::post(path).body(Body::empty()).unwrap();
        send_json(app, req).await
    }

    #[tokio::test]
    async fn health_returns_ok() {
        let app = router(test_state());
        let (status, json) = get_json(&app, "/health").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(json["status"], "ok");
        assert!(json["version"].is_string());
    }

    #[tokio::test]
    async fn list_sinks_returns_configured() {
        let app = router(test_state());
        let (status, json) = get_json(&app, "/sinks").await;
        assert_eq!(status, StatusCode::OK);
        let sinks = json["sinks"].as_array().unwrap();
        assert_eq!(sinks.len(), 1);
        assert_eq!(sinks[0]["name"], "stdout");
        assert_eq!(sinks[0]["sink_type"], "stdout");
    }

    #[tokio::test]
    async fn list_pipes_returns_configured() {
        let app = router(test_state());
        let (status, json) = get_json(&app, "/pipes").await;
        assert_eq!(status, StatusCode::OK);
        let pipes = json["pipes"].as_array().unwrap();
        assert_eq!(pipes.len(), 1);
        assert_eq!(pipes[0]["name"], "catalog-sync");
        assert_eq!(pipes[0]["origin_connector"], "postgres");
        assert_eq!(pipes[0]["targets"][0], "kafka-main");
        assert_eq!(pipes[0]["interval_secs"], 60);
        assert_eq!(pipes[0]["query_count"], 2);
        assert!(pipes[0]["enabled"].as_bool().unwrap());
    }

    #[tokio::test]
    async fn get_pipe_by_name() {
        let app = router(test_state());
        let (status, json) = get_json(&app, "/pipes/catalog-sync").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(json["name"], "catalog-sync");
        assert_eq!(json["origin_connector"], "postgres");
    }

    #[tokio::test]
    async fn run_pipe_returns_manual_run_results() {
        let app = router(test_state_with_lifecycle(Arc::new(TestLifecycle {
            run_results: vec![types::PipeRunQueryResult {
                query_id: "tables".into(),
                created: 2,
                updated: 1,
                deleted: 0,
            }],
            run_error: None,
        })));
        let (status, json) = post_json(&app, "/pipes/catalog-sync/run").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(json["ok"], true);
        assert_eq!(json["results"][0]["query_id"], "tables");
        assert_eq!(json["results"][0]["created"], 2);
    }

    #[tokio::test]
    async fn run_pipe_surfaces_manual_run_errors() {
        let app = router(test_state_with_lifecycle(Arc::new(TestLifecycle {
            run_results: vec![],
            run_error: Some("pipe is disabled".into()),
        })));
        let (status, json) = post_json(&app, "/pipes/catalog-sync/run").await;
        assert_eq!(status, StatusCode::BAD_REQUEST);
        assert!(json["error"].as_str().unwrap().contains("pipe is disabled"));
    }

    #[tokio::test]
    async fn get_pipe_not_found() {
        let app = router(test_state());
        let (status, json) = get_json(&app, "/pipes/nonexistent").await;
        assert_eq!(status, StatusCode::NOT_FOUND);
        assert!(json["error"].as_str().unwrap().contains("not found"));
    }

    #[tokio::test]
    async fn openapi_spec_is_valid_json() {
        let app = router(test_state());
        let (status, json) = get_json(&app, "/openapi.json").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(json["openapi"], "3.1.0");
        assert_eq!(json["info"]["title"], "oversync API");
        assert!(json["paths"]["/health"].is_object());
        assert!(json["paths"]["/sinks"].is_object());
        assert!(json["paths"]["/pipes"].is_object());
        assert!(json["paths"]["/pipes/{name}/run"].is_object());
        assert!(json["paths"]["/pipe-presets"].is_object());
        assert!(json["paths"]["/config/import"].is_object());
        assert!(json["paths"]["/config/export"].is_object());
        assert!(json["paths"]["/history"].is_object());
        assert!(json["paths"]["/sync/pause"].is_object());
        assert!(json["paths"]["/sync/resume"].is_object());
        // No `/sources` path is expected in the generated document.
        assert!(json["paths"]["/sources"].is_null());
    }

    #[tokio::test]
    async fn sync_status_returns_default() {
        let app = router(test_state());
        let (status, json) = get_json(&app, "/sync/status").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(json["running"], false);
    }
}