1pub mod auth;
2pub mod handlers;
3pub mod mutations;
4pub mod operations;
5pub mod state;
6pub mod types;
7
8use std::sync::Arc;
9
10use axum::Router;
11use axum::middleware;
12use axum::routing::{get, post, put};
13use utoipa::OpenApi;
14
15use crate::state::ApiState;
16
17#[derive(OpenApi)]
18#[openapi(
19 paths(
20 handlers::health,
21 handlers::list_sinks,
22 handlers::list_pipes,
23 handlers::list_pipe_presets,
24 handlers::get_pipe,
25 handlers::get_pipe_preset,
26 mutations::create_sink,
27 mutations::update_sink,
28 mutations::delete_sink,
29 mutations::create_pipe,
30 mutations::update_pipe,
31 mutations::delete_pipe,
32 mutations::create_pipe_preset,
33 mutations::update_pipe_preset,
34 mutations::delete_pipe_preset,
35 operations::pause_sync,
36 operations::resume_sync,
37 operations::export_config,
38 operations::import_config,
39 operations::get_history,
40 operations::sync_status,
41 ),
42 components(schemas(
43 types::HealthResponse,
44 types::CycleInfo,
45 types::SinkListResponse,
46 types::SinkInfo,
47 types::PipeListResponse,
48 types::PipeInfo,
49 types::PipePresetListResponse,
50 types::PipePresetInfo,
51 types::PipePresetSpecInput,
52 types::PipePresetParameterInput,
53 types::PipeQueryInput,
54 types::ErrorResponse,
55 types::CreateSinkRequest,
56 types::UpdateSinkRequest,
57 types::CreatePipeRequest,
58 types::UpdatePipeRequest,
59 types::CreatePipePresetRequest,
60 types::UpdatePipePresetRequest,
61 types::MutationResponse,
62 types::HistoryResponse,
63 types::StatusResponse,
64 types::ExportConfigFormat,
65 types::ExportConfigQuery,
66 types::ExportConfigResponse,
67 types::ImportConfigRequest,
68 types::ImportConfigResponse,
69 )),
70 info(
71 title = "oversync API",
72 version = "0.1.0",
73 description = "HTTP API for managing oversync pipes, sinks, recipes, and sync status."
74 )
75)]
76pub struct ApiDoc;
77
78pub fn router(state: Arc<ApiState>) -> Router {
79 router_with_openapi(state, ApiDoc::openapi())
80}
81
82pub fn router_with_openapi(state: Arc<ApiState>, openapi: utoipa::openapi::OpenApi) -> Router {
83 let openapi = Arc::new(openapi);
84
85 let protected = Router::new()
86 .route(
87 "/sinks",
88 get(handlers::list_sinks).post(mutations::create_sink),
89 )
90 .route(
91 "/sinks/{name}",
92 put(mutations::update_sink).delete(mutations::delete_sink),
93 )
94 .route(
95 "/pipes",
96 get(handlers::list_pipes).post(mutations::create_pipe),
97 )
98 .route(
99 "/pipe-presets",
100 get(handlers::list_pipe_presets).post(mutations::create_pipe_preset),
101 )
102 .route(
103 "/pipes/{name}",
104 get(handlers::get_pipe)
105 .put(mutations::update_pipe)
106 .delete(mutations::delete_pipe),
107 )
108 .route(
109 "/pipe-presets/{name}",
110 get(handlers::get_pipe_preset)
111 .put(mutations::update_pipe_preset)
112 .delete(mutations::delete_pipe_preset),
113 )
114 .route("/sync/pause", post(operations::pause_sync))
115 .route("/sync/resume", post(operations::resume_sync))
116 .route("/sync/status", get(operations::sync_status))
117 .route("/config/import", post(operations::import_config))
118 .route("/config/export", get(operations::export_config))
119 .route("/history", get(operations::get_history))
120 .route_layer(middleware::from_fn_with_state(
121 state.clone(),
122 auth::require_api_key,
123 ));
124
125 Router::new()
126 .route("/health", get(handlers::health))
127 .route(
128 "/openapi.json",
129 get({
130 let openapi = Arc::clone(&openapi);
131 move || {
132 let openapi = Arc::clone(&openapi);
133 async move { axum::Json(openapi.as_ref().clone()) }
134 }
135 }),
136 )
137 .merge(protected)
138 .with_state(state)
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144 use axum::body::Body;
145 use axum::http::{Request, StatusCode};
146 use http_body_util::BodyExt;
147 use state::*;
148 use tower::ServiceExt;
149
150 fn test_state() -> Arc<ApiState> {
151 Arc::new(ApiState {
152 sinks: Arc::new(tokio::sync::RwLock::new(vec![SinkConfig {
153 name: "stdout".into(),
154 sink_type: "stdout".into(),
155 config: None,
156 }])),
157 pipes: Arc::new(tokio::sync::RwLock::new(vec![state::PipeConfigCache {
158 name: "catalog-sync".into(),
159 origin_connector: "postgres".into(),
160 origin_dsn: "postgres://ro@pg1:5432/meta".into(),
161 targets: vec!["kafka-main".into()],
162 interval_secs: 60,
163 query_count: 2,
164 recipe: None,
165 enabled: true,
166 }])),
167 pipe_presets: Arc::new(tokio::sync::RwLock::new(vec![])),
168 db_client: None,
169 lifecycle: None,
170 api_key: None,
171 })
172 }
173
174 async fn get_json(app: &Router, path: &str) -> (StatusCode, serde_json::Value) {
175 let req = Request::get(path).body(Body::empty()).unwrap();
176 let resp = app.clone().oneshot(req).await.unwrap();
177 let status = resp.status();
178 let body = resp.into_body().collect().await.unwrap().to_bytes();
179 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
180 (status, json)
181 }
182
183 #[tokio::test]
184 async fn health_returns_ok() {
185 let app = router(test_state());
186 let (status, json) = get_json(&app, "/health").await;
187 assert_eq!(status, StatusCode::OK);
188 assert_eq!(json["status"], "ok");
189 assert!(json["version"].is_string());
190 }
191
192 #[tokio::test]
193 async fn list_sinks_returns_configured() {
194 let app = router(test_state());
195 let (status, json) = get_json(&app, "/sinks").await;
196 assert_eq!(status, StatusCode::OK);
197 let sinks = json["sinks"].as_array().unwrap();
198 assert_eq!(sinks.len(), 1);
199 assert_eq!(sinks[0]["name"], "stdout");
200 assert_eq!(sinks[0]["sink_type"], "stdout");
201 }
202
203 #[tokio::test]
204 async fn list_pipes_returns_configured() {
205 let app = router(test_state());
206 let (status, json) = get_json(&app, "/pipes").await;
207 assert_eq!(status, StatusCode::OK);
208 let pipes = json["pipes"].as_array().unwrap();
209 assert_eq!(pipes.len(), 1);
210 assert_eq!(pipes[0]["name"], "catalog-sync");
211 assert_eq!(pipes[0]["origin_connector"], "postgres");
212 assert_eq!(pipes[0]["targets"][0], "kafka-main");
213 assert_eq!(pipes[0]["interval_secs"], 60);
214 assert_eq!(pipes[0]["query_count"], 2);
215 assert!(pipes[0]["enabled"].as_bool().unwrap());
216 }
217
218 #[tokio::test]
219 async fn get_pipe_by_name() {
220 let app = router(test_state());
221 let (status, json) = get_json(&app, "/pipes/catalog-sync").await;
222 assert_eq!(status, StatusCode::OK);
223 assert_eq!(json["name"], "catalog-sync");
224 assert_eq!(json["origin_connector"], "postgres");
225 }
226
227 #[tokio::test]
228 async fn get_pipe_not_found() {
229 let app = router(test_state());
230 let (status, json) = get_json(&app, "/pipes/nonexistent").await;
231 assert_eq!(status, StatusCode::NOT_FOUND);
232 assert!(json["error"].as_str().unwrap().contains("not found"));
233 }
234
235 #[tokio::test]
236 async fn openapi_spec_is_valid_json() {
237 let app = router(test_state());
238 let (status, json) = get_json(&app, "/openapi.json").await;
239 assert_eq!(status, StatusCode::OK);
240 assert_eq!(json["openapi"], "3.1.0");
241 assert_eq!(json["info"]["title"], "oversync API");
242 assert!(json["paths"]["/health"].is_object());
243 assert!(json["paths"]["/sinks"].is_object());
244 assert!(json["paths"]["/pipes"].is_object());
245 assert!(json["paths"]["/pipe-presets"].is_object());
246 assert!(json["paths"]["/config/import"].is_object());
247 assert!(json["paths"]["/config/export"].is_object());
248 assert!(json["paths"]["/history"].is_object());
249 assert!(json["paths"]["/sync/pause"].is_object());
250 assert!(json["paths"]["/sync/resume"].is_object());
251 assert!(json["paths"]["/sources"].is_null());
252 }
253
254 #[tokio::test]
255 async fn sync_status_returns_default() {
256 let app = router(test_state());
257 let (status, json) = get_json(&app, "/sync/status").await;
258 assert_eq!(status, StatusCode::OK);
259 assert_eq!(json["running"], false);
260 }
261}