1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5
6use crate::state::ApiState;
7use crate::types::*;
8
9use oversync_queries::mutations;
10
11const SQL_DELETE_SOURCE: &str = mutations::DELETE_SOURCE;
12const SQL_CREATE_SOURCE: &str = mutations::CREATE_SOURCE;
13const SQL_UPDATE_SOURCE_CONNECTOR: &str = mutations::UPDATE_SOURCE_CONNECTOR;
14const SQL_UPDATE_SOURCE_ENABLED: &str = mutations::UPDATE_SOURCE_ENABLED;
15const SQL_UPDATE_SOURCE_CONFIG: &str = mutations::UPDATE_SOURCE_CONFIG;
16const SQL_DELETE_SOURCE_QUERIES: &str = mutations::DELETE_SOURCE_QUERIES;
17
18const SQL_DELETE_SINK: &str = mutations::DELETE_SINK;
19const SQL_CREATE_SINK: &str = mutations::CREATE_SINK;
20const SQL_UPDATE_SINK_TYPE: &str = mutations::UPDATE_SINK_TYPE;
21const SQL_UPDATE_SINK_ENABLED: &str = mutations::UPDATE_SINK_ENABLED;
22const SQL_UPDATE_SINK_CONFIG: &str = mutations::UPDATE_SINK_CONFIG;
23
24const SQL_DELETE_PIPE: &str = mutations::DELETE_PIPE;
25const SQL_CREATE_PIPE: &str = mutations::CREATE_PIPE;
26const SQL_DELETE_PIPE_QUERIES: &str = mutations::DELETE_PIPE_QUERIES;
27const SQL_UPDATE_PIPE_ORIGIN_CONNECTOR: &str = mutations::UPDATE_PIPE_ORIGIN_CONNECTOR;
28const SQL_UPDATE_PIPE_ORIGIN_DSN: &str = mutations::UPDATE_PIPE_ORIGIN_DSN;
29const SQL_UPDATE_PIPE_ORIGIN_CONFIG: &str = mutations::UPDATE_PIPE_ORIGIN_CONFIG;
30const SQL_UPDATE_PIPE_TARGETS: &str = mutations::UPDATE_PIPE_TARGETS;
31const SQL_UPDATE_PIPE_SCHEDULE: &str = mutations::UPDATE_PIPE_SCHEDULE;
32const SQL_UPDATE_PIPE_DELTA: &str = mutations::UPDATE_PIPE_DELTA;
33const SQL_UPDATE_PIPE_RETRY: &str = mutations::UPDATE_PIPE_RETRY;
34const SQL_UPDATE_PIPE_ENABLED: &str = mutations::UPDATE_PIPE_ENABLED;
35
36#[utoipa::path(
37 post,
38 path = "/sources",
39 request_body = CreateSourceRequest,
40 responses(
41 (status = 200, description = "Source created", body = MutationResponse),
42 (status = 400, description = "Bad request", body = ErrorResponse)
43 )
44)]
45pub async fn create_source(
46 State(state): State<Arc<ApiState>>,
47 Json(req): Json<CreateSourceRequest>,
48) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
49 let db = require_db(&state)?;
50
51 let config_json = if req.config.is_null() {
52 serde_json::json!({})
53 } else {
54 req.config
55 };
56
57 db.query(SQL_DELETE_SOURCE)
58 .bind(("name", req.name.clone()))
59 .await
60 .map_err(db_err)?;
61
62 db.query(SQL_CREATE_SOURCE)
63 .bind(("name", req.name.clone()))
64 .bind(("connector", req.connector))
65 .bind(("config", config_json))
66 .await
67 .map_err(db_err)?;
68
69 reload_config(&state).await?;
70
71 Ok(Json(MutationResponse {
72 ok: true,
73 message: format!("source '{}' created", req.name),
74 }))
75}
76
77#[utoipa::path(
78 put,
79 path = "/sources/{name}",
80 params(("name" = String, Path, description = "Source name")),
81 request_body = UpdateSourceRequest,
82 responses(
83 (status = 200, description = "Source updated", body = MutationResponse),
84 (status = 400, description = "Bad request", body = ErrorResponse)
85 )
86)]
87pub async fn update_source(
88 State(state): State<Arc<ApiState>>,
89 Path(name): Path<String>,
90 Json(req): Json<UpdateSourceRequest>,
91) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
92 let db = require_db(&state)?;
93
94 if let Some(connector) = req.connector {
95 db.query(SQL_UPDATE_SOURCE_CONNECTOR)
96 .bind(("name", name.clone()))
97 .bind(("connector", connector))
98 .await
99 .map_err(db_err)?;
100 }
101
102 if let Some(enabled) = req.enabled {
103 db.query(SQL_UPDATE_SOURCE_ENABLED)
104 .bind(("name", name.clone()))
105 .bind(("enabled", enabled))
106 .await
107 .map_err(db_err)?;
108 }
109
110 if let Some(config) = req.config {
111 db.query(SQL_UPDATE_SOURCE_CONFIG)
112 .bind(("name", name.clone()))
113 .bind(("config", config))
114 .await
115 .map_err(db_err)?;
116 }
117
118 reload_config(&state).await?;
119
120 Ok(Json(MutationResponse {
121 ok: true,
122 message: format!("source '{name}' updated"),
123 }))
124}
125
126#[utoipa::path(
127 delete,
128 path = "/sources/{name}",
129 params(("name" = String, Path, description = "Source name")),
130 responses(
131 (status = 200, description = "Source deleted", body = MutationResponse),
132 (status = 400, description = "Bad request", body = ErrorResponse)
133 )
134)]
135pub async fn delete_source(
136 State(state): State<Arc<ApiState>>,
137 Path(name): Path<String>,
138) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
139 let db = require_db(&state)?;
140
141 db.query(SQL_DELETE_SOURCE)
142 .bind(("name", name.clone()))
143 .await
144 .map_err(db_err)?;
145
146 db.query(SQL_DELETE_SOURCE_QUERIES)
147 .bind(("name", name.clone()))
148 .await
149 .map_err(db_err)?;
150
151 reload_config(&state).await?;
152
153 Ok(Json(MutationResponse {
154 ok: true,
155 message: format!("source '{name}' deleted"),
156 }))
157}
158
159#[utoipa::path(
160 post,
161 path = "/sinks",
162 request_body = CreateSinkRequest,
163 responses(
164 (status = 200, description = "Sink created", body = MutationResponse),
165 (status = 400, description = "Bad request", body = ErrorResponse)
166 )
167)]
168pub async fn create_sink(
169 State(state): State<Arc<ApiState>>,
170 Json(req): Json<CreateSinkRequest>,
171) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
172 let db = require_db(&state)?;
173
174 let config_json = if req.config.is_null() {
175 serde_json::json!({})
176 } else {
177 req.config
178 };
179
180 db.query(SQL_DELETE_SINK)
181 .bind(("name", req.name.clone()))
182 .await
183 .map_err(db_err)?;
184
185 db.query(SQL_CREATE_SINK)
186 .bind(("name", req.name.clone()))
187 .bind(("sink_type", req.sink_type))
188 .bind(("config", config_json))
189 .await
190 .map_err(db_err)?;
191
192 reload_config(&state).await?;
193
194 Ok(Json(MutationResponse {
195 ok: true,
196 message: format!("sink '{}' created", req.name),
197 }))
198}
199
200#[utoipa::path(
201 put,
202 path = "/sinks/{name}",
203 params(("name" = String, Path, description = "Sink name")),
204 request_body = UpdateSinkRequest,
205 responses(
206 (status = 200, description = "Sink updated", body = MutationResponse),
207 (status = 400, description = "Bad request", body = ErrorResponse)
208 )
209)]
210pub async fn update_sink(
211 State(state): State<Arc<ApiState>>,
212 Path(name): Path<String>,
213 Json(req): Json<UpdateSinkRequest>,
214) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
215 let db = require_db(&state)?;
216
217 if let Some(sink_type) = req.sink_type {
218 db.query(SQL_UPDATE_SINK_TYPE)
219 .bind(("name", name.clone()))
220 .bind(("sink_type", sink_type))
221 .await
222 .map_err(db_err)?;
223 }
224
225 if let Some(enabled) = req.enabled {
226 db.query(SQL_UPDATE_SINK_ENABLED)
227 .bind(("name", name.clone()))
228 .bind(("enabled", enabled))
229 .await
230 .map_err(db_err)?;
231 }
232
233 if let Some(config) = req.config {
234 db.query(SQL_UPDATE_SINK_CONFIG)
235 .bind(("name", name.clone()))
236 .bind(("config", config))
237 .await
238 .map_err(db_err)?;
239 }
240
241 reload_config(&state).await?;
242
243 Ok(Json(MutationResponse {
244 ok: true,
245 message: format!("sink '{name}' updated"),
246 }))
247}
248
249#[utoipa::path(
250 delete,
251 path = "/sinks/{name}",
252 params(("name" = String, Path, description = "Sink name")),
253 responses(
254 (status = 200, description = "Sink deleted", body = MutationResponse),
255 (status = 400, description = "Bad request", body = ErrorResponse)
256 )
257)]
258pub async fn delete_sink(
259 State(state): State<Arc<ApiState>>,
260 Path(name): Path<String>,
261) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
262 let db = require_db(&state)?;
263
264 db.query(SQL_DELETE_SINK)
265 .bind(("name", name.clone()))
266 .await
267 .map_err(db_err)?;
268
269 reload_config(&state).await?;
270
271 Ok(Json(MutationResponse {
272 ok: true,
273 message: format!("sink '{name}' deleted"),
274 }))
275}
276
277#[utoipa::path(
280 post,
281 path = "/pipes",
282 request_body = CreatePipeRequest,
283 responses(
284 (status = 200, description = "Pipe created", body = MutationResponse),
285 (status = 400, description = "Bad request", body = ErrorResponse)
286 )
287)]
288pub async fn create_pipe(
289 State(state): State<Arc<ApiState>>,
290 Json(req): Json<CreatePipeRequest>,
291) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
292 let db = require_db(&state)?;
293
294 let origin_config = if req.origin_config.is_null() {
295 serde_json::json!({})
296 } else {
297 req.origin_config
298 };
299 let schedule = if req.schedule.is_null() {
300 serde_json::json!({"interval_secs": 300, "missed_tick_policy": "skip"})
301 } else {
302 req.schedule
303 };
304 let delta = if req.delta.is_null() {
305 serde_json::json!({"diff_mode": "db", "fail_safe_threshold": 30.0})
306 } else {
307 req.delta
308 };
309 let retry = if req.retry.is_null() {
310 serde_json::json!({"max_retries": 3, "retry_base_delay_secs": 5})
311 } else {
312 req.retry
313 };
314
315 db.query(SQL_DELETE_PIPE)
316 .bind(("name", req.name.clone()))
317 .await
318 .map_err(db_err)?;
319
320 db.query(SQL_CREATE_PIPE)
321 .bind(("name", req.name.clone()))
322 .bind(("origin_connector", req.origin_connector))
323 .bind(("origin_dsn", req.origin_dsn))
324 .bind(("origin_config", origin_config))
325 .bind(("targets", req.targets))
326 .bind(("schedule", schedule))
327 .bind(("delta", delta))
328 .bind(("retry", retry))
329 .await
330 .map_err(db_err)?;
331
332 reload_config(&state).await?;
333
334 Ok(Json(MutationResponse {
335 ok: true,
336 message: format!("pipe '{}' created", req.name),
337 }))
338}
339
340#[utoipa::path(
341 put,
342 path = "/pipes/{name}",
343 params(("name" = String, Path, description = "Pipe name")),
344 request_body = UpdatePipeRequest,
345 responses(
346 (status = 200, description = "Pipe updated", body = MutationResponse),
347 (status = 400, description = "Bad request", body = ErrorResponse)
348 )
349)]
350pub async fn update_pipe(
351 State(state): State<Arc<ApiState>>,
352 Path(name): Path<String>,
353 Json(req): Json<UpdatePipeRequest>,
354) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
355 let db = require_db(&state)?;
356
357 if let Some(connector) = req.origin_connector {
358 db.query(SQL_UPDATE_PIPE_ORIGIN_CONNECTOR)
359 .bind(("name", name.clone()))
360 .bind(("v", connector))
361 .await
362 .map_err(db_err)?;
363 }
364
365 if let Some(dsn) = req.origin_dsn {
366 db.query(SQL_UPDATE_PIPE_ORIGIN_DSN)
367 .bind(("name", name.clone()))
368 .bind(("v", dsn))
369 .await
370 .map_err(db_err)?;
371 }
372
373 if let Some(config) = req.origin_config {
374 db.query(SQL_UPDATE_PIPE_ORIGIN_CONFIG)
375 .bind(("name", name.clone()))
376 .bind(("v", config))
377 .await
378 .map_err(db_err)?;
379 }
380
381 if let Some(targets) = req.targets {
382 db.query(SQL_UPDATE_PIPE_TARGETS)
383 .bind(("name", name.clone()))
384 .bind(("v", targets))
385 .await
386 .map_err(db_err)?;
387 }
388
389 if let Some(schedule) = req.schedule {
390 db.query(SQL_UPDATE_PIPE_SCHEDULE)
391 .bind(("name", name.clone()))
392 .bind(("v", schedule))
393 .await
394 .map_err(db_err)?;
395 }
396
397 if let Some(delta) = req.delta {
398 db.query(SQL_UPDATE_PIPE_DELTA)
399 .bind(("name", name.clone()))
400 .bind(("v", delta))
401 .await
402 .map_err(db_err)?;
403 }
404
405 if let Some(retry) = req.retry {
406 db.query(SQL_UPDATE_PIPE_RETRY)
407 .bind(("name", name.clone()))
408 .bind(("v", retry))
409 .await
410 .map_err(db_err)?;
411 }
412
413 if let Some(enabled) = req.enabled {
414 db.query(SQL_UPDATE_PIPE_ENABLED)
415 .bind(("name", name.clone()))
416 .bind(("v", enabled))
417 .await
418 .map_err(db_err)?;
419 }
420
421 reload_config(&state).await?;
422
423 Ok(Json(MutationResponse {
424 ok: true,
425 message: format!("pipe '{name}' updated"),
426 }))
427}
428
429#[utoipa::path(
430 delete,
431 path = "/pipes/{name}",
432 params(("name" = String, Path, description = "Pipe name")),
433 responses(
434 (status = 200, description = "Pipe deleted", body = MutationResponse),
435 (status = 400, description = "Bad request", body = ErrorResponse)
436 )
437)]
438pub async fn delete_pipe(
439 State(state): State<Arc<ApiState>>,
440 Path(name): Path<String>,
441) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
442 let db = require_db(&state)?;
443
444 db.query(SQL_DELETE_PIPE)
445 .bind(("name", name.clone()))
446 .await
447 .map_err(db_err)?;
448
449 db.query(SQL_DELETE_PIPE_QUERIES)
450 .bind(("name", name.clone()))
451 .await
452 .map_err(db_err)?;
453
454 reload_config(&state).await?;
455
456 Ok(Json(MutationResponse {
457 ok: true,
458 message: format!("pipe '{name}' deleted"),
459 }))
460}
461
462fn require_db(
463 state: &ApiState,
464) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
465 state.db_client.as_deref().ok_or_else(|| {
466 Json(ErrorResponse {
467 error: "database not configured".into(),
468 })
469 })
470}
471
472fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
473 Json(ErrorResponse {
474 error: format!("db: {e}"),
475 })
476}
477
478pub(crate) async fn reload_config(state: &ApiState) -> Result<(), Json<ErrorResponse>> {
479 if let (Some(lifecycle), Some(db)) = (&state.lifecycle, &state.db_client) {
480 lifecycle.restart_with_config_json(db).await.map_err(|e| {
481 Json(ErrorResponse {
482 error: format!("reload: {e}"),
483 })
484 })?;
485 }
486 refresh_read_cache(state).await;
487 Ok(())
488}
489
490pub async fn refresh_read_cache(state: &ApiState) {
491 let Some(db) = &state.db_client else { return };
492
493 const SQL_READ_SOURCES_CACHE: &str = oversync_queries::config::READ_SOURCES_CACHE;
494 const SQL_READ_SINKS_CACHE: &str = oversync_queries::config::READ_SINKS_CACHE;
495 const SQL_READ_PIPES_CACHE: &str = oversync_queries::config::READ_PIPES_CACHE;
496
497 if let Ok(mut resp) = db.query(SQL_READ_SOURCES_CACHE).await
498 && let Ok(rows) = resp.take::<Vec<serde_json::Value>>(0)
499 {
500 let configs: Vec<crate::state::SourceConfig> = rows
501 .iter()
502 .filter_map(|r| {
503 Some(crate::state::SourceConfig {
504 name: r.get("name")?.as_str()?.to_string(),
505 connector: r.get("connector")?.as_str()?.to_string(),
506 interval_secs: r
507 .get("interval_secs")
508 .and_then(|v| v.as_u64())
509 .unwrap_or(300),
510 queries: vec![],
511 })
512 })
513 .collect();
514 *state.sources.write().await = configs;
515 }
516
517 if let Ok(mut resp) = db.query(SQL_READ_SINKS_CACHE).await
518 && let Ok(rows) = resp.take::<Vec<serde_json::Value>>(0)
519 {
520 let configs: Vec<crate::state::SinkConfig> = rows
521 .iter()
522 .filter_map(|r| {
523 Some(crate::state::SinkConfig {
524 name: r.get("name")?.as_str()?.to_string(),
525 sink_type: r.get("sink_type")?.as_str()?.to_string(),
526 config: r.get("config").cloned(),
527 })
528 })
529 .collect();
530 *state.sinks.write().await = configs;
531 }
532
533 if let Ok(mut resp) = db.query(SQL_READ_PIPES_CACHE).await
534 && let Ok(rows) = resp.take::<Vec<serde_json::Value>>(0)
535 {
536 let configs: Vec<crate::state::PipeConfigCache> = rows
537 .iter()
538 .filter_map(|r| {
539 Some(crate::state::PipeConfigCache {
540 name: r.get("name")?.as_str()?.to_string(),
541 origin_connector: r.get("origin_connector")?.as_str()?.to_string(),
542 origin_dsn: r.get("origin_dsn")?.as_str()?.to_string(),
543 targets: r
544 .get("targets")
545 .and_then(|v| v.as_array())
546 .map(|arr| {
547 arr.iter()
548 .filter_map(|v| v.as_str().map(String::from))
549 .collect()
550 })
551 .unwrap_or_default(),
552 interval_secs: r
553 .get("interval_secs")
554 .and_then(|v| v.as_u64())
555 .unwrap_or(300),
556 enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
557 })
558 })
559 .collect();
560 *state.pipes.write().await = configs;
561 }
562}