1pub mod auth;
2pub mod handlers;
3pub mod mutations;
4pub mod operations;
5pub mod queries;
6pub mod state;
7pub mod types;
8
9use std::sync::Arc;
10
11use axum::Router;
12use axum::middleware;
13use axum::routing::{get, post, put};
14use utoipa::OpenApi;
15
16use crate::state::ApiState;
17
18#[derive(OpenApi)]
19#[openapi(
20 paths(
21 handlers::health,
22 handlers::list_sources,
23 handlers::get_source,
24 handlers::list_sinks,
25 handlers::list_pipes,
26 handlers::get_pipe,
27 mutations::create_source,
28 mutations::update_source,
29 mutations::delete_source,
30 mutations::create_sink,
31 mutations::update_sink,
32 mutations::delete_sink,
33 mutations::create_pipe,
34 mutations::update_pipe,
35 mutations::delete_pipe,
36 operations::trigger_source,
37 operations::pause_sync,
38 operations::resume_sync,
39 operations::get_history,
40 operations::sync_status,
41 queries::list_queries,
42 queries::create_query,
43 queries::update_query,
44 queries::delete_query,
45 ),
46 components(schemas(
47 types::HealthResponse,
48 types::SourceListResponse,
49 types::SourceInfo,
50 types::QueryInfo,
51 types::SourceStatus,
52 types::CycleInfo,
53 types::SinkListResponse,
54 types::SinkInfo,
55 types::PipeListResponse,
56 types::PipeInfo,
57 types::TriggerResponse,
58 types::ErrorResponse,
59 types::CreateSourceRequest,
60 types::UpdateSourceRequest,
61 types::CreateSinkRequest,
62 types::UpdateSinkRequest,
63 types::CreatePipeRequest,
64 types::UpdatePipeRequest,
65 types::MutationResponse,
66 types::HistoryResponse,
67 types::StatusResponse,
68 types::CreateQueryRequest,
69 types::UpdateQueryRequest,
70 types::QueryListResponse,
71 types::QueryDetail,
72 )),
73 info(
74 title = "oversync API",
75 version = "0.1.0",
76 description = "HTTP API for managing oversync sources, sinks, and sync status."
77 )
78)]
79pub struct ApiDoc;
80
81pub fn router(state: Arc<ApiState>) -> Router {
82 let protected = Router::new()
84 .route(
85 "/sources",
86 get(handlers::list_sources).post(mutations::create_source),
87 )
88 .route(
89 "/sources/{name}",
90 get(handlers::get_source)
91 .put(mutations::update_source)
92 .delete(mutations::delete_source),
93 )
94 .route("/sources/{name}/trigger", post(operations::trigger_source))
95 .route(
96 "/sources/{source}/queries",
97 get(queries::list_queries).post(queries::create_query),
98 )
99 .route(
100 "/sources/{source}/queries/{name}",
101 put(queries::update_query).delete(queries::delete_query),
102 )
103 .route(
104 "/sinks",
105 get(handlers::list_sinks).post(mutations::create_sink),
106 )
107 .route(
108 "/sinks/{name}",
109 put(mutations::update_sink).delete(mutations::delete_sink),
110 )
111 .route(
112 "/pipes",
113 get(handlers::list_pipes).post(mutations::create_pipe),
114 )
115 .route(
116 "/pipes/{name}",
117 get(handlers::get_pipe)
118 .put(mutations::update_pipe)
119 .delete(mutations::delete_pipe),
120 )
121 .route("/sync/pause", post(operations::pause_sync))
122 .route("/sync/resume", post(operations::resume_sync))
123 .route("/sync/status", get(operations::sync_status))
124 .route("/history", get(operations::get_history))
125 .route_layer(middleware::from_fn_with_state(
126 state.clone(),
127 auth::require_api_key,
128 ));
129
130 Router::new()
132 .route("/health", get(handlers::health))
133 .route(
134 "/openapi.json",
135 get(|| async { axum::Json(ApiDoc::openapi()) }),
136 )
137 .merge(protected)
138 .with_state(state)
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use http_body_util::BodyExt;
    use state::*;
    use std::collections::HashMap;
    use tokio::sync::RwLock;
    use tower::ServiceExt;

    /// Builds the shared fixture: one source (`pg-prod`) with one query, one
    /// sink (`stdout`), one pipe (`catalog-sync`), an empty cycle-status map,
    /// and no database client, lifecycle handle or API key.
    ///
    /// With `api_key` set to `None`, the tests below call guarded routes
    /// without credentials and expect them to succeed.
    fn test_state() -> Arc<ApiState> {
        let source = SourceConfig {
            name: "pg-prod".into(),
            connector: "postgres".into(),
            interval_secs: 300,
            queries: vec![QueryConfig {
                id: "users".into(),
                key_column: "id".into(),
            }],
        };
        let sink = SinkConfig {
            name: "stdout".into(),
            sink_type: "stdout".into(),
            config: None,
        };
        let pipe = state::PipeConfigCache {
            name: "catalog-sync".into(),
            origin_connector: "postgres".into(),
            origin_dsn: "postgres://ro@pg1:5432/meta".into(),
            targets: vec!["kafka-main".into()],
            interval_secs: 60,
            enabled: true,
        };

        Arc::new(ApiState {
            sources: Arc::new(RwLock::new(vec![source])),
            sinks: Arc::new(RwLock::new(vec![sink])),
            pipes: Arc::new(RwLock::new(vec![pipe])),
            cycle_status: Arc::new(RwLock::new(HashMap::new())),
            db_client: None,
            lifecycle: None,
            api_key: None,
        })
    }

    /// Sends a body-less `GET path` to a clone of `app` and returns the
    /// response status together with the body parsed as JSON.
    ///
    /// Panics if the request fails or the body is not valid JSON.
    async fn get_json(app: &Router, path: &str) -> (StatusCode, serde_json::Value) {
        let request = Request::builder()
            .method("GET")
            .uri(path)
            .body(Body::empty())
            .unwrap();
        let response = app.clone().oneshot(request).await.unwrap();
        let status = response.status();
        let bytes = response.into_body().collect().await.unwrap().to_bytes();
        (status, serde_json::from_slice(&bytes).unwrap())
    }

    #[tokio::test]
    async fn health_returns_ok() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/health").await;

        assert_eq!(code, StatusCode::OK);
        assert_eq!(body["status"], "ok");
        assert!(body["version"].is_string());
    }

    #[tokio::test]
    async fn list_sources_returns_configured() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/sources").await;

        assert_eq!(code, StatusCode::OK);
        let listed = body["sources"].as_array().unwrap();
        assert_eq!(listed.len(), 1);

        let first = &listed[0];
        assert_eq!(first["name"], "pg-prod");
        assert_eq!(first["connector"], "postgres");
        assert_eq!(first["queries"][0]["id"], "users");
    }

    #[tokio::test]
    async fn get_source_by_name() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/sources/pg-prod").await;

        assert_eq!(code, StatusCode::OK);
        assert_eq!(body["name"], "pg-prod");
    }

    #[tokio::test]
    async fn get_source_not_found() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/sources/nonexistent").await;

        assert_eq!(code, StatusCode::NOT_FOUND);
        let message = body["error"].as_str().unwrap();
        assert!(message.contains("not found"));
    }

    #[tokio::test]
    async fn list_sinks_returns_configured() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/sinks").await;

        assert_eq!(code, StatusCode::OK);
        let listed = body["sinks"].as_array().unwrap();
        assert_eq!(listed.len(), 1);

        let first = &listed[0];
        assert_eq!(first["name"], "stdout");
        assert_eq!(first["sink_type"], "stdout");
    }

    #[tokio::test]
    async fn openapi_spec_is_valid_json() {
        let api = router(test_state());
        let (code, spec) = get_json(&api, "/openapi.json").await;

        assert_eq!(code, StatusCode::OK);
        assert_eq!(spec["openapi"], "3.1.0");
        assert_eq!(spec["info"]["title"], "oversync API");

        // Each of these paths must be present in the generated document.
        let expected_paths = [
            "/health",
            "/sources",
            "/sinks",
            "/pipes",
            "/history",
            "/sync/pause",
            "/sync/resume",
        ];
        for path in expected_paths {
            assert!(spec["paths"][path].is_object());
        }
    }

    #[tokio::test]
    async fn sync_status_returns_default() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/sync/status").await;

        assert_eq!(code, StatusCode::OK);
        assert_eq!(body["running"], false);
    }

    #[tokio::test]
    async fn list_pipes_returns_configured() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/pipes").await;

        assert_eq!(code, StatusCode::OK);
        let listed = body["pipes"].as_array().unwrap();
        assert_eq!(listed.len(), 1);

        let first = &listed[0];
        assert_eq!(first["name"], "catalog-sync");
        assert_eq!(first["origin_connector"], "postgres");
        assert_eq!(first["targets"][0], "kafka-main");
        assert_eq!(first["interval_secs"], 60);
        let enabled = first["enabled"].as_bool().unwrap();
        assert!(enabled);
    }

    #[tokio::test]
    async fn get_pipe_by_name() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/pipes/catalog-sync").await;

        assert_eq!(code, StatusCode::OK);
        assert_eq!(body["name"], "catalog-sync");
        assert_eq!(body["origin_connector"], "postgres");
    }

    #[tokio::test]
    async fn get_pipe_not_found() {
        let api = router(test_state());
        let (code, body) = get_json(&api, "/pipes/nonexistent").await;

        assert_eq!(code, StatusCode::NOT_FOUND);
        let message = body["error"].as_str().unwrap();
        assert!(message.contains("not found"));
    }
}