1pub mod auth;
2pub mod handlers;
3pub mod mutations;
4pub mod operations;
5pub mod queries;
6pub mod state;
7pub mod types;
8
9use std::sync::Arc;
10
11use axum::Router;
12use axum::middleware;
13use axum::routing::{get, post, put};
14use utoipa::OpenApi;
15
16use crate::state::ApiState;
17
18#[derive(OpenApi)]
19#[openapi(
20 paths(
21 handlers::health,
22 handlers::list_sources,
23 handlers::get_source,
24 handlers::list_sinks,
25 handlers::list_pipes,
26 handlers::get_pipe,
27 mutations::create_source,
28 mutations::update_source,
29 mutations::delete_source,
30 mutations::create_sink,
31 mutations::update_sink,
32 mutations::delete_sink,
33 mutations::create_pipe,
34 mutations::update_pipe,
35 mutations::delete_pipe,
36 operations::trigger_source,
37 operations::pause_sync,
38 operations::resume_sync,
39 operations::get_history,
40 operations::sync_status,
41 queries::list_queries,
42 queries::create_query,
43 queries::update_query,
44 queries::delete_query,
45 ),
46 components(schemas(
47 types::HealthResponse,
48 types::SourceListResponse,
49 types::SourceInfo,
50 types::QueryInfo,
51 types::SourceStatus,
52 types::CycleInfo,
53 types::SinkListResponse,
54 types::SinkInfo,
55 types::PipeListResponse,
56 types::PipeInfo,
57 types::TriggerResponse,
58 types::ErrorResponse,
59 types::CreateSourceRequest,
60 types::UpdateSourceRequest,
61 types::CreateSinkRequest,
62 types::UpdateSinkRequest,
63 types::CreatePipeRequest,
64 types::UpdatePipeRequest,
65 types::MutationResponse,
66 types::HistoryResponse,
67 types::StatusResponse,
68 types::CreateQueryRequest,
69 types::UpdateQueryRequest,
70 types::QueryListResponse,
71 types::QueryDetail,
72 )),
73 info(
74 title = "oversync API",
75 version = "0.1.0",
76 description = "HTTP API for managing oversync sources, sinks, and sync status."
77 )
78)]
79pub struct ApiDoc;
80
81pub fn router(state: Arc<ApiState>) -> Router {
82 let protected = Router::new()
84 .route(
85 "/sources",
86 get(handlers::list_sources).post(mutations::create_source),
87 )
88 .route(
89 "/sources/{name}",
90 get(handlers::get_source)
91 .put(mutations::update_source)
92 .delete(mutations::delete_source),
93 )
94 .route("/sources/{name}/trigger", post(operations::trigger_source))
95 .route(
96 "/sources/{source}/queries",
97 get(queries::list_queries).post(queries::create_query),
98 )
99 .route(
100 "/sources/{source}/queries/{name}",
101 put(queries::update_query).delete(queries::delete_query),
102 )
103 .route(
104 "/sinks",
105 get(handlers::list_sinks).post(mutations::create_sink),
106 )
107 .route(
108 "/sinks/{name}",
109 put(mutations::update_sink).delete(mutations::delete_sink),
110 )
111 .route(
112 "/pipes",
113 get(handlers::list_pipes).post(mutations::create_pipe),
114 )
115 .route(
116 "/pipes/{name}",
117 get(handlers::get_pipe)
118 .put(mutations::update_pipe)
119 .delete(mutations::delete_pipe),
120 )
121 .route("/sync/pause", post(operations::pause_sync))
122 .route("/sync/resume", post(operations::resume_sync))
123 .route("/sync/status", get(operations::sync_status))
124 .route("/history", get(operations::get_history))
125 .route_layer(middleware::from_fn_with_state(
126 state.clone(),
127 auth::require_api_key,
128 ));
129
130 Router::new()
132 .route("/health", get(handlers::health))
133 .route(
134 "/openapi.json",
135 get(|| async { axum::Json(ApiDoc::openapi()) }),
136 )
137 .merge(protected)
138 .with_state(state)
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144 use axum::body::Body;
145 use axum::http::{Request, StatusCode};
146 use http_body_util::BodyExt;
147 use state::*;
148 use std::collections::HashMap;
149 use tokio::sync::RwLock;
150 use tower::ServiceExt;
151
152 fn test_state() -> Arc<ApiState> {
153 Arc::new(ApiState {
154 sources: Arc::new(RwLock::new(vec![SourceConfig {
155 name: "pg-prod".into(),
156 connector: "postgres".into(),
157 interval_secs: 300,
158 queries: vec![QueryConfig {
159 id: "users".into(),
160 key_column: "id".into(),
161 }],
162 }])),
163 sinks: Arc::new(RwLock::new(vec![SinkConfig {
164 name: "stdout".into(),
165 sink_type: "stdout".into(),
166 }])),
167 pipes: Arc::new(RwLock::new(vec![state::PipeConfigCache {
168 name: "catalog-sync".into(),
169 origin_connector: "postgres".into(),
170 origin_dsn: "postgres://ro@pg1:5432/meta".into(),
171 targets: vec!["kafka-main".into()],
172 interval_secs: 60,
173 enabled: true,
174 }])),
175 cycle_status: Arc::new(RwLock::new(HashMap::new())),
176 db_client: None,
177 lifecycle: None,
178 api_key: None,
179 })
180 }
181
182 async fn get_json(app: &Router, path: &str) -> (StatusCode, serde_json::Value) {
183 let req = Request::get(path).body(Body::empty()).unwrap();
184 let resp = app.clone().oneshot(req).await.unwrap();
185 let status = resp.status();
186 let body = resp.into_body().collect().await.unwrap().to_bytes();
187 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
188 (status, json)
189 }
190
191 #[tokio::test]
192 async fn health_returns_ok() {
193 let app = router(test_state());
194 let (status, json) = get_json(&app, "/health").await;
195 assert_eq!(status, StatusCode::OK);
196 assert_eq!(json["status"], "ok");
197 assert!(json["version"].is_string());
198 }
199
200 #[tokio::test]
201 async fn list_sources_returns_configured() {
202 let app = router(test_state());
203 let (status, json) = get_json(&app, "/sources").await;
204 assert_eq!(status, StatusCode::OK);
205 let sources = json["sources"].as_array().unwrap();
206 assert_eq!(sources.len(), 1);
207 assert_eq!(sources[0]["name"], "pg-prod");
208 assert_eq!(sources[0]["connector"], "postgres");
209 assert_eq!(sources[0]["queries"][0]["id"], "users");
210 }
211
212 #[tokio::test]
213 async fn get_source_by_name() {
214 let app = router(test_state());
215 let (status, json) = get_json(&app, "/sources/pg-prod").await;
216 assert_eq!(status, StatusCode::OK);
217 assert_eq!(json["name"], "pg-prod");
218 }
219
220 #[tokio::test]
221 async fn get_source_not_found() {
222 let app = router(test_state());
223 let (status, json) = get_json(&app, "/sources/nonexistent").await;
224 assert_eq!(status, StatusCode::OK); assert!(json["error"].as_str().unwrap().contains("not found"));
226 }
227
228 #[tokio::test]
229 async fn list_sinks_returns_configured() {
230 let app = router(test_state());
231 let (status, json) = get_json(&app, "/sinks").await;
232 assert_eq!(status, StatusCode::OK);
233 let sinks = json["sinks"].as_array().unwrap();
234 assert_eq!(sinks.len(), 1);
235 assert_eq!(sinks[0]["name"], "stdout");
236 assert_eq!(sinks[0]["sink_type"], "stdout");
237 }
238
239 #[tokio::test]
240 async fn openapi_spec_is_valid_json() {
241 let app = router(test_state());
242 let (status, json) = get_json(&app, "/openapi.json").await;
243 assert_eq!(status, StatusCode::OK);
244 assert_eq!(json["openapi"], "3.1.0");
245 assert_eq!(json["info"]["title"], "oversync API");
246 assert!(json["paths"]["/health"].is_object());
247 assert!(json["paths"]["/sources"].is_object());
248 assert!(json["paths"]["/sinks"].is_object());
249 assert!(json["paths"]["/pipes"].is_object());
250 assert!(json["paths"]["/history"].is_object());
251 assert!(json["paths"]["/sync/pause"].is_object());
252 assert!(json["paths"]["/sync/resume"].is_object());
253 }
254
255 #[tokio::test]
256 async fn sync_status_returns_default() {
257 let app = router(test_state());
258 let (status, json) = get_json(&app, "/sync/status").await;
259 assert_eq!(status, StatusCode::OK);
260 assert_eq!(json["running"], false);
261 }
262
263 #[tokio::test]
264 async fn list_pipes_returns_configured() {
265 let app = router(test_state());
266 let (status, json) = get_json(&app, "/pipes").await;
267 assert_eq!(status, StatusCode::OK);
268 let pipes = json["pipes"].as_array().unwrap();
269 assert_eq!(pipes.len(), 1);
270 assert_eq!(pipes[0]["name"], "catalog-sync");
271 assert_eq!(pipes[0]["origin_connector"], "postgres");
272 assert_eq!(pipes[0]["targets"][0], "kafka-main");
273 assert_eq!(pipes[0]["interval_secs"], 60);
274 assert!(pipes[0]["enabled"].as_bool().unwrap());
275 }
276
277 #[tokio::test]
278 async fn get_pipe_by_name() {
279 let app = router(test_state());
280 let (status, json) = get_json(&app, "/pipes/catalog-sync").await;
281 assert_eq!(status, StatusCode::OK);
282 assert_eq!(json["name"], "catalog-sync");
283 assert_eq!(json["origin_connector"], "postgres");
284 }
285
286 #[tokio::test]
287 async fn get_pipe_not_found() {
288 let app = router(test_state());
289 let (status, json) = get_json(&app, "/pipes/nonexistent").await;
290 assert_eq!(status, StatusCode::OK);
291 assert!(json["error"].as_str().unwrap().contains("not found"));
292 }
293}