1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5
6use crate::state::ApiState;
7use crate::types::*;
8
9use oversync_queries::mutations;
10
// Sink statements, aliased from `oversync_queries::mutations` under local names.
// The handlers in this file run each of them with a `name` binding; the create and
// update variants additionally bind `sink_type`, `enabled` or `config`.
const SQL_DELETE_SINK: &str = mutations::DELETE_SINK;
const SQL_CREATE_SINK: &str = mutations::CREATE_SINK;
const SQL_UPDATE_SINK_TYPE: &str = mutations::UPDATE_SINK_TYPE;
const SQL_UPDATE_SINK_ENABLED: &str = mutations::UPDATE_SINK_ENABLED;
const SQL_UPDATE_SINK_CONFIG: &str = mutations::UPDATE_SINK_CONFIG;

// Pipe, pipe-preset and pipe-query statements, aliased the same way.
// The per-field `SQL_UPDATE_PIPE_*` statements are run by `update_pipe` with the
// pipe `name` and the new value bound as `v`.
const SQL_DELETE_PIPE: &str = mutations::DELETE_PIPE;
const SQL_DELETE_PIPE_PRESET: &str = mutations::DELETE_PIPE_PRESET;
const SQL_CREATE_PIPE: &str = mutations::CREATE_PIPE;
const SQL_CREATE_PIPE_PRESET: &str = mutations::CREATE_PIPE_PRESET;
const SQL_DELETE_PIPE_QUERIES: &str = mutations::DELETE_PIPE_QUERIES;
// `create_pipe_query_record` picks between these two depending on whether the
// query input carries a `sinks` value.
const SQL_CREATE_PIPE_QUERY: &str = mutations::CREATE_QUERY;
const SQL_CREATE_PIPE_QUERY_WITH_SINKS: &str = mutations::CREATE_QUERY_WITH_SINKS;
const SQL_UPDATE_PIPE_ORIGIN_CONNECTOR: &str = mutations::UPDATE_PIPE_ORIGIN_CONNECTOR;
const SQL_UPDATE_PIPE_ORIGIN_DSN: &str = mutations::UPDATE_PIPE_ORIGIN_DSN;
const SQL_UPDATE_PIPE_ORIGIN_CREDENTIAL: &str = mutations::UPDATE_PIPE_ORIGIN_CREDENTIAL;
const SQL_UPDATE_PIPE_TRINO_URL: &str = mutations::UPDATE_PIPE_TRINO_URL;
const SQL_UPDATE_PIPE_ORIGIN_CONFIG: &str = mutations::UPDATE_PIPE_ORIGIN_CONFIG;
const SQL_UPDATE_PIPE_TARGETS: &str = mutations::UPDATE_PIPE_TARGETS;
const SQL_UPDATE_PIPE_SCHEDULE: &str = mutations::UPDATE_PIPE_SCHEDULE;
const SQL_UPDATE_PIPE_DELTA: &str = mutations::UPDATE_PIPE_DELTA;
const SQL_UPDATE_PIPE_RETRY: &str = mutations::UPDATE_PIPE_RETRY;
const SQL_UPDATE_PIPE_RECIPE: &str = mutations::UPDATE_PIPE_RECIPE;
// Used by `update_pipe` when the request explicitly sets the recipe to `None`;
// it is run with the pipe `name` only.
const SQL_UPDATE_PIPE_RECIPE_NONE: &str = mutations::UPDATE_PIPE_RECIPE_NONE;
// Run by `update_pipe_preset` with `name`, `description` and `spec` bindings.
const SQL_UPDATE_PIPE_PRESET: &str = mutations::UPDATE_PIPE_PRESET;
const SQL_UPDATE_PIPE_FILTERS: &str = mutations::UPDATE_PIPE_FILTERS;
const SQL_UPDATE_PIPE_TRANSFORMS: &str = mutations::UPDATE_PIPE_TRANSFORMS;
const SQL_UPDATE_PIPE_LINKS: &str = mutations::UPDATE_PIPE_LINKS;
const SQL_UPDATE_PIPE_ENABLED: &str = mutations::UPDATE_PIPE_ENABLED;
40
41#[utoipa::path(
42 post,
43 path = "/sinks",
44 request_body = CreateSinkRequest,
45 responses(
46 (status = 200, description = "Sink created", body = MutationResponse),
47 (status = 400, description = "Bad request", body = ErrorResponse)
48 )
49)]
50pub async fn create_sink(
51 State(state): State<Arc<ApiState>>,
52 Json(req): Json<CreateSinkRequest>,
53) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
54 let db = require_db(&state)?;
55
56 let config_json = if req.config.is_null() {
57 serde_json::json!({})
58 } else {
59 req.config
60 };
61
62 db.query(SQL_DELETE_SINK)
63 .bind(("name", req.name.clone()))
64 .await
65 .and_then(|response| response.check())
66 .map_err(db_err)?;
67
68 db.query(SQL_CREATE_SINK)
69 .bind(("name", req.name.clone()))
70 .bind(("sink_type", req.sink_type))
71 .bind(("config", config_json))
72 .await
73 .and_then(|response| response.check())
74 .map_err(db_err)?;
75
76 reload_config(&state).await?;
77
78 Ok(Json(MutationResponse {
79 ok: true,
80 message: format!("sink '{}' created", req.name),
81 }))
82}
83
84#[utoipa::path(
85 put,
86 path = "/sinks/{name}",
87 params(("name" = String, Path, description = "Sink name")),
88 request_body = UpdateSinkRequest,
89 responses(
90 (status = 200, description = "Sink updated", body = MutationResponse),
91 (status = 400, description = "Bad request", body = ErrorResponse)
92 )
93)]
94pub async fn update_sink(
95 State(state): State<Arc<ApiState>>,
96 Path(name): Path<String>,
97 Json(req): Json<UpdateSinkRequest>,
98) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
99 let db = require_db(&state)?;
100
101 if let Some(sink_type) = req.sink_type {
102 db.query(SQL_UPDATE_SINK_TYPE)
103 .bind(("name", name.clone()))
104 .bind(("sink_type", sink_type))
105 .await
106 .and_then(|response| response.check())
107 .map_err(db_err)?;
108 }
109
110 if let Some(enabled) = req.enabled {
111 db.query(SQL_UPDATE_SINK_ENABLED)
112 .bind(("name", name.clone()))
113 .bind(("enabled", enabled))
114 .await
115 .and_then(|response| response.check())
116 .map_err(db_err)?;
117 }
118
119 if let Some(config) = req.config {
120 db.query(SQL_UPDATE_SINK_CONFIG)
121 .bind(("name", name.clone()))
122 .bind(("config", config))
123 .await
124 .and_then(|response| response.check())
125 .map_err(db_err)?;
126 }
127
128 reload_config(&state).await?;
129
130 Ok(Json(MutationResponse {
131 ok: true,
132 message: format!("sink '{name}' updated"),
133 }))
134}
135
136#[utoipa::path(
137 delete,
138 path = "/sinks/{name}",
139 params(("name" = String, Path, description = "Sink name")),
140 responses(
141 (status = 200, description = "Sink deleted", body = MutationResponse),
142 (status = 400, description = "Bad request", body = ErrorResponse)
143 )
144)]
145pub async fn delete_sink(
146 State(state): State<Arc<ApiState>>,
147 Path(name): Path<String>,
148) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
149 let db = require_db(&state)?;
150
151 db.query(SQL_DELETE_SINK)
152 .bind(("name", name.clone()))
153 .await
154 .and_then(|response| response.check())
155 .map_err(db_err)?;
156
157 reload_config(&state).await?;
158
159 Ok(Json(MutationResponse {
160 ok: true,
161 message: format!("sink '{name}' deleted"),
162 }))
163}
164
165#[utoipa::path(
168 post,
169 path = "/pipes",
170 request_body = CreatePipeRequest,
171 responses(
172 (status = 200, description = "Pipe created", body = MutationResponse),
173 (status = 400, description = "Bad request", body = ErrorResponse)
174 )
175)]
176pub async fn create_pipe(
177 State(state): State<Arc<ApiState>>,
178 Json(req): Json<CreatePipeRequest>,
179) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
180 let db = require_db(&state)?;
181 let queries = req.queries;
182
183 let origin_config = if req.origin_config.is_null() {
184 serde_json::json!({})
185 } else {
186 req.origin_config
187 };
188 let schedule = if req.schedule.is_null() {
189 serde_json::json!({"interval_secs": 300, "missed_tick_policy": "skip"})
190 } else {
191 req.schedule
192 };
193 let delta = if req.delta.is_null() {
194 serde_json::json!({"diff_mode": "db", "fail_safe_threshold": 30.0})
195 } else {
196 req.delta
197 };
198 let retry = if req.retry.is_null() {
199 serde_json::json!({"max_retries": 3, "retry_base_delay_secs": 5})
200 } else {
201 req.retry
202 };
203
204 db.query(SQL_DELETE_PIPE)
205 .bind(("name", req.name.clone()))
206 .await
207 .and_then(|response| response.check())
208 .map_err(db_err)?;
209
210 db.query(SQL_CREATE_PIPE)
211 .bind(("name", req.name.clone()))
212 .bind(("origin_connector", req.origin_connector))
213 .bind(("origin_dsn", req.origin_dsn))
214 .bind(("origin_credential", req.origin_credential))
215 .bind(("trino_url", req.trino_url))
216 .bind(("origin_config", origin_config))
217 .bind(("targets", req.targets))
218 .bind(("schedule", schedule))
219 .bind(("delta", delta))
220 .bind(("retry", retry))
221 .bind(("recipe", req.recipe))
222 .bind(("filters", req.filters))
223 .bind(("transforms", req.transforms))
224 .bind(("links", req.links))
225 .await
226 .and_then(|response| response.check())
227 .map_err(db_err)?;
228
229 for query in &queries {
230 create_pipe_query_record(db, &req.name, query)
231 .await
232 .map_err(config_err)?;
233 }
234
235 reload_config(&state).await?;
236
237 Ok(Json(MutationResponse {
238 ok: true,
239 message: format!("pipe '{}' created", req.name),
240 }))
241}
242
243#[utoipa::path(
244 put,
245 path = "/pipes/{name}",
246 params(("name" = String, Path, description = "Pipe name")),
247 request_body = UpdatePipeRequest,
248 responses(
249 (status = 200, description = "Pipe updated", body = MutationResponse),
250 (status = 400, description = "Bad request", body = ErrorResponse)
251 )
252)]
253pub async fn update_pipe(
254 State(state): State<Arc<ApiState>>,
255 Path(name): Path<String>,
256 Json(req): Json<UpdatePipeRequest>,
257) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
258 let db = require_db(&state)?;
259
260 if let Some(connector) = req.origin_connector {
261 db.query(SQL_UPDATE_PIPE_ORIGIN_CONNECTOR)
262 .bind(("name", name.clone()))
263 .bind(("v", connector))
264 .await
265 .and_then(|response| response.check())
266 .map_err(db_err)?;
267 }
268
269 if let Some(dsn) = req.origin_dsn {
270 db.query(SQL_UPDATE_PIPE_ORIGIN_DSN)
271 .bind(("name", name.clone()))
272 .bind(("v", dsn))
273 .await
274 .and_then(|response| response.check())
275 .map_err(db_err)?;
276 }
277
278 if let Some(credential) = req.origin_credential {
279 db.query(SQL_UPDATE_PIPE_ORIGIN_CREDENTIAL)
280 .bind(("name", name.clone()))
281 .bind(("v", credential))
282 .await
283 .and_then(|response| response.check())
284 .map_err(db_err)?;
285 }
286
287 if let Some(trino_url) = req.trino_url {
288 db.query(SQL_UPDATE_PIPE_TRINO_URL)
289 .bind(("name", name.clone()))
290 .bind(("v", trino_url))
291 .await
292 .and_then(|response| response.check())
293 .map_err(db_err)?;
294 }
295
296 if let Some(config) = req.origin_config {
297 db.query(SQL_UPDATE_PIPE_ORIGIN_CONFIG)
298 .bind(("name", name.clone()))
299 .bind(("v", config))
300 .await
301 .and_then(|response| response.check())
302 .map_err(db_err)?;
303 }
304
305 if let Some(targets) = req.targets {
306 db.query(SQL_UPDATE_PIPE_TARGETS)
307 .bind(("name", name.clone()))
308 .bind(("v", targets))
309 .await
310 .and_then(|response| response.check())
311 .map_err(db_err)?;
312 }
313
314 if let Some(schedule) = req.schedule {
315 db.query(SQL_UPDATE_PIPE_SCHEDULE)
316 .bind(("name", name.clone()))
317 .bind(("v", schedule))
318 .await
319 .and_then(|response| response.check())
320 .map_err(db_err)?;
321 }
322
323 if let Some(delta) = req.delta {
324 db.query(SQL_UPDATE_PIPE_DELTA)
325 .bind(("name", name.clone()))
326 .bind(("v", delta))
327 .await
328 .and_then(|response| response.check())
329 .map_err(db_err)?;
330 }
331
332 if let Some(retry) = req.retry {
333 db.query(SQL_UPDATE_PIPE_RETRY)
334 .bind(("name", name.clone()))
335 .bind(("v", retry))
336 .await
337 .and_then(|response| response.check())
338 .map_err(db_err)?;
339 }
340
341 if let Some(recipe) = req.recipe {
342 match recipe {
343 Some(recipe) => {
344 db.query(SQL_UPDATE_PIPE_RECIPE)
345 .bind(("name", name.clone()))
346 .bind(("v", recipe))
347 .await
348 .and_then(|response| response.check())
349 .map_err(db_err)?;
350 }
351 None => {
352 db.query(SQL_UPDATE_PIPE_RECIPE_NONE)
353 .bind(("name", name.clone()))
354 .await
355 .and_then(|response| response.check())
356 .map_err(db_err)?;
357 }
358 }
359 }
360
361 if let Some(filters) = req.filters {
362 db.query(SQL_UPDATE_PIPE_FILTERS)
363 .bind(("name", name.clone()))
364 .bind(("v", filters))
365 .await
366 .and_then(|response| response.check())
367 .map_err(db_err)?;
368 }
369
370 if let Some(transforms) = req.transforms {
371 db.query(SQL_UPDATE_PIPE_TRANSFORMS)
372 .bind(("name", name.clone()))
373 .bind(("v", transforms))
374 .await
375 .and_then(|response| response.check())
376 .map_err(db_err)?;
377 }
378
379 if let Some(links) = req.links {
380 db.query(SQL_UPDATE_PIPE_LINKS)
381 .bind(("name", name.clone()))
382 .bind(("v", links))
383 .await
384 .and_then(|response| response.check())
385 .map_err(db_err)?;
386 }
387
388 if let Some(enabled) = req.enabled {
389 db.query(SQL_UPDATE_PIPE_ENABLED)
390 .bind(("name", name.clone()))
391 .bind(("v", enabled))
392 .await
393 .and_then(|response| response.check())
394 .map_err(db_err)?;
395 }
396
397 if let Some(queries) = req.queries {
398 db.query(SQL_DELETE_PIPE_QUERIES)
399 .bind(("name", name.clone()))
400 .await
401 .and_then(|response| response.check())
402 .map_err(db_err)?;
403
404 for query in &queries {
405 create_pipe_query_record(db, &name, query)
406 .await
407 .map_err(config_err)?;
408 }
409 }
410
411 reload_config(&state).await?;
412
413 Ok(Json(MutationResponse {
414 ok: true,
415 message: format!("pipe '{name}' updated"),
416 }))
417}
418
419#[utoipa::path(
420 delete,
421 path = "/pipes/{name}",
422 params(("name" = String, Path, description = "Pipe name")),
423 responses(
424 (status = 200, description = "Pipe deleted", body = MutationResponse),
425 (status = 400, description = "Bad request", body = ErrorResponse)
426 )
427)]
428pub async fn delete_pipe(
429 State(state): State<Arc<ApiState>>,
430 Path(name): Path<String>,
431) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
432 let db = require_db(&state)?;
433
434 db.query(SQL_DELETE_PIPE)
435 .bind(("name", name.clone()))
436 .await
437 .and_then(|response| response.check())
438 .map_err(db_err)?;
439
440 db.query(SQL_DELETE_PIPE_QUERIES)
441 .bind(("name", name.clone()))
442 .await
443 .and_then(|response| response.check())
444 .map_err(db_err)?;
445
446 reload_config(&state).await?;
447
448 Ok(Json(MutationResponse {
449 ok: true,
450 message: format!("pipe '{name}' deleted"),
451 }))
452}
453
454#[utoipa::path(
455 post,
456 path = "/pipe-presets",
457 request_body = CreatePipePresetRequest,
458 responses(
459 (status = 200, description = "Pipe preset created", body = MutationResponse),
460 (status = 400, description = "Bad request", body = ErrorResponse)
461 )
462)]
463pub async fn create_pipe_preset(
464 State(state): State<Arc<ApiState>>,
465 Json(req): Json<CreatePipePresetRequest>,
466) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
467 let db = require_db(&state)?;
468 let spec = storage_pipe_preset_spec_value(req.spec).map_err(json_serde_err)?;
469
470 db.query(SQL_DELETE_PIPE_PRESET)
471 .bind(("name", req.name.clone()))
472 .await
473 .and_then(|response| response.check())
474 .map_err(db_err)?;
475
476 db.query(SQL_CREATE_PIPE_PRESET)
477 .bind(("name", req.name.clone()))
478 .bind(("description", req.description.clone()))
479 .bind(("spec", spec))
480 .await
481 .and_then(|response| response.check())
482 .map_err(db_err)?;
483
484 refresh_read_cache(state.as_ref())
485 .await
486 .map_err(config_err)?;
487
488 Ok(Json(MutationResponse {
489 ok: true,
490 message: format!("pipe preset '{}' created", req.name),
491 }))
492}
493
494#[utoipa::path(
495 put,
496 path = "/pipe-presets/{name}",
497 params(("name" = String, Path, description = "Pipe preset name")),
498 request_body = UpdatePipePresetRequest,
499 responses(
500 (status = 200, description = "Pipe preset updated", body = MutationResponse),
501 (status = 400, description = "Bad request", body = ErrorResponse)
502 )
503)]
504pub async fn update_pipe_preset(
505 State(state): State<Arc<ApiState>>,
506 Path(name): Path<String>,
507 Json(req): Json<UpdatePipePresetRequest>,
508) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
509 let db = require_db(&state)?;
510 let current = state
511 .pipe_presets_info()
512 .into_iter()
513 .find(|preset| preset.name == name)
514 .ok_or_else(|| {
515 Json(ErrorResponse {
516 error: format!("pipe preset not found: {name}"),
517 })
518 })?;
519
520 let spec = match req.spec {
521 Some(spec) => storage_pipe_preset_spec_value(spec).map_err(json_serde_err)?,
522 None => storage_pipe_preset_spec_value(
523 serde_json::from_value(current.spec).map_err(json_serde_err)?,
524 )
525 .map_err(json_serde_err)?,
526 };
527
528 db.query(SQL_UPDATE_PIPE_PRESET)
529 .bind(("name", name.clone()))
530 .bind(("description", req.description))
531 .bind(("spec", spec))
532 .await
533 .and_then(|response| response.check())
534 .map_err(db_err)?;
535
536 refresh_read_cache(state.as_ref())
537 .await
538 .map_err(config_err)?;
539
540 Ok(Json(MutationResponse {
541 ok: true,
542 message: format!("pipe preset '{name}' updated"),
543 }))
544}
545
546#[utoipa::path(
547 delete,
548 path = "/pipe-presets/{name}",
549 params(("name" = String, Path, description = "Pipe preset name")),
550 responses(
551 (status = 200, description = "Pipe preset deleted", body = MutationResponse),
552 (status = 400, description = "Bad request", body = ErrorResponse)
553 )
554)]
555pub async fn delete_pipe_preset(
556 State(state): State<Arc<ApiState>>,
557 Path(name): Path<String>,
558) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
559 let db = require_db(&state)?;
560
561 db.query(SQL_DELETE_PIPE_PRESET)
562 .bind(("name", name.clone()))
563 .await
564 .and_then(|response| response.check())
565 .map_err(db_err)?;
566
567 refresh_read_cache(state.as_ref())
568 .await
569 .map_err(config_err)?;
570
571 Ok(Json(MutationResponse {
572 ok: true,
573 message: format!("pipe preset '{name}' deleted"),
574 }))
575}
576
577fn require_db(
578 state: &ApiState,
579) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
580 state.db_client.as_deref().ok_or_else(|| {
581 Json(ErrorResponse {
582 error: "database not configured".into(),
583 })
584 })
585}
586
587fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
588 Json(ErrorResponse {
589 error: format!("db: {e}"),
590 })
591}
592
593fn config_err(e: oversync_core::error::OversyncError) -> Json<ErrorResponse> {
594 Json(ErrorResponse {
595 error: e.to_string(),
596 })
597}
598
599fn json_serde_err(e: serde_json::Error) -> Json<ErrorResponse> {
600 Json(ErrorResponse {
601 error: format!("json: {e}"),
602 })
603}
604
605fn storage_pipe_preset_spec_value(
606 spec: PipePresetSpecInput,
607) -> Result<serde_json::Value, serde_json::Error> {
608 let origin_config = if spec.origin_config.is_null() {
609 serde_json::json!({})
610 } else {
611 spec.origin_config
612 };
613 let schedule = if spec.schedule.is_null() {
614 serde_json::json!({})
615 } else {
616 spec.schedule
617 };
618 let delta = if spec.delta.is_null() {
619 serde_json::json!({})
620 } else {
621 spec.delta
622 };
623 let retry = if spec.retry.is_null() {
624 serde_json::json!({})
625 } else {
626 spec.retry
627 };
628
629 serde_json::to_value(serde_json::json!({
630 "origin": {
631 "connector": spec.origin_connector,
632 "dsn": spec.origin_dsn,
633 "credential": spec.origin_credential,
634 "trino_url": spec.trino_url,
635 "config": origin_config,
636 },
637 "parameters": spec.parameters,
638 "targets": spec.targets,
639 "queries": spec.queries,
640 "schedule": schedule,
641 "delta": delta,
642 "retry": retry,
643 "recipe": spec.recipe,
644 "filters": spec.filters,
645 "transforms": spec.transforms,
646 "links": spec.links,
647 }))
648}
649
650fn api_pipe_preset_spec_value(spec: &serde_json::Value) -> Option<serde_json::Value> {
651 if spec.get("origin_connector").is_some() {
652 return Some(serde_json::json!({
653 "origin_connector": spec.get("origin_connector").cloned().unwrap_or(serde_json::Value::Null),
654 "origin_dsn": spec.get("origin_dsn").cloned().unwrap_or(serde_json::Value::Null),
655 "origin_credential": spec.get("origin_credential").cloned().unwrap_or(serde_json::Value::Null),
656 "trino_url": spec.get("trino_url").cloned().unwrap_or(serde_json::Value::Null),
657 "origin_config": spec.get("origin_config").cloned().unwrap_or_else(|| serde_json::json!({})),
658 "parameters": spec.get("parameters").cloned().unwrap_or_else(|| serde_json::json!([])),
659 "targets": spec.get("targets").cloned().unwrap_or_else(|| serde_json::json!([])),
660 "queries": spec.get("queries").cloned().unwrap_or_else(|| serde_json::json!([])),
661 "schedule": spec.get("schedule").cloned().unwrap_or_else(|| serde_json::json!({})),
662 "delta": spec.get("delta").cloned().unwrap_or_else(|| serde_json::json!({})),
663 "retry": spec.get("retry").cloned().unwrap_or_else(|| serde_json::json!({})),
664 "recipe": spec.get("recipe").cloned().unwrap_or(serde_json::Value::Null),
665 "filters": spec.get("filters").cloned().unwrap_or_else(|| serde_json::json!([])),
666 "transforms": spec.get("transforms").cloned().unwrap_or_else(|| serde_json::json!([])),
667 "links": spec.get("links").cloned().unwrap_or_else(|| serde_json::json!([])),
668 }));
669 }
670
671 let origin = spec.get("origin")?;
672 Some(serde_json::json!({
673 "origin_connector": origin.get("connector").cloned().unwrap_or(serde_json::Value::Null),
674 "origin_dsn": origin.get("dsn").cloned().unwrap_or(serde_json::Value::Null),
675 "origin_credential": origin.get("credential").cloned().unwrap_or(serde_json::Value::Null),
676 "trino_url": origin.get("trino_url").cloned().unwrap_or(serde_json::Value::Null),
677 "origin_config": match origin.get("config") {
678 Some(value) if !value.is_null() => value.clone(),
679 _ => serde_json::json!({}),
680 },
681 "parameters": spec.get("parameters").cloned().unwrap_or_else(|| serde_json::json!([])),
682 "targets": spec.get("targets").cloned().unwrap_or_else(|| serde_json::json!([])),
683 "queries": spec.get("queries").cloned().unwrap_or_else(|| serde_json::json!([])),
684 "schedule": match spec.get("schedule") {
685 Some(value) if !value.is_null() => value.clone(),
686 _ => serde_json::json!({}),
687 },
688 "delta": match spec.get("delta") {
689 Some(value) if !value.is_null() => value.clone(),
690 _ => serde_json::json!({}),
691 },
692 "retry": match spec.get("retry") {
693 Some(value) if !value.is_null() => value.clone(),
694 _ => serde_json::json!({}),
695 },
696 "recipe": spec.get("recipe").cloned().unwrap_or(serde_json::Value::Null),
697 "filters": spec.get("filters").cloned().unwrap_or_else(|| serde_json::json!([])),
698 "transforms": spec.get("transforms").cloned().unwrap_or_else(|| serde_json::json!([])),
699 "links": spec.get("links").cloned().unwrap_or_else(|| serde_json::json!([])),
700 }))
701}
702
703async fn create_pipe_query_record(
704 db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
705 origin_id: &str,
706 query: &PipeQueryInput,
707) -> Result<(), oversync_core::error::OversyncError> {
708 let sql = if query.sinks.is_some() {
709 SQL_CREATE_PIPE_QUERY_WITH_SINKS
710 } else {
711 SQL_CREATE_PIPE_QUERY
712 };
713
714 db.query(sql)
715 .bind(("source", origin_id.to_string()))
716 .bind(("name", query.id.clone()))
717 .bind(("query", query.sql.clone()))
718 .bind(("key_column", query.key_column.clone()))
719 .bind(("sinks", query.sinks.clone()))
720 .bind(("transform", query.transform.clone()))
721 .await
722 .and_then(|response| response.check())
723 .map_err(|e| {
724 oversync_core::error::OversyncError::SurrealDb(format!(
725 "create query '{}' for pipe '{}': {e}",
726 query.id, origin_id
727 ))
728 })?;
729
730 Ok(())
731}
732
733pub(crate) async fn reload_config(state: &ApiState) -> Result<(), Json<ErrorResponse>> {
734 if let (Some(lifecycle), Some(db)) = (&state.lifecycle, &state.db_client) {
735 lifecycle.restart_with_config_json(db).await.map_err(|e| {
736 Json(ErrorResponse {
737 error: format!("reload: {e}"),
738 })
739 })?;
740 }
741 refresh_read_cache(state).await.map_err(|e| {
742 Json(ErrorResponse {
743 error: format!("refresh cache: {e}"),
744 })
745 })?;
746 Ok(())
747}
748
749pub async fn refresh_read_cache(
750 state: &ApiState,
751) -> Result<(), oversync_core::error::OversyncError> {
752 let Some(db) = &state.db_client else {
753 return Ok(());
754 };
755
756 const SQL_LOAD_QUERIES: &str = oversync_queries::config::LOAD_QUERIES;
757 const SQL_READ_SINKS_CACHE: &str = oversync_queries::config::READ_SINKS_CACHE;
758 const SQL_READ_PIPES_CACHE: &str = oversync_queries::config::READ_PIPES_CACHE;
759 const SQL_READ_PIPE_PRESETS_CACHE: &str = oversync_queries::config::READ_PIPE_PRESETS_CACHE;
760
761 let query_rows =
762 read_cache_rows_or_empty(db, SQL_LOAD_QUERIES, "refresh queries cache").await?;
763 let mut query_counts_by_origin: std::collections::HashMap<String, usize> =
764 std::collections::HashMap::new();
765 for row in &query_rows {
766 let Some(origin_id) = row.get("origin_id").and_then(|v| v.as_str()) else {
767 continue;
768 };
769 *query_counts_by_origin
770 .entry(origin_id.to_string())
771 .or_default() += 1;
772 }
773
774 let sink_rows =
775 read_cache_rows_or_empty(db, SQL_READ_SINKS_CACHE, "refresh sinks cache").await?;
776 let sink_configs: Vec<crate::state::SinkConfig> = sink_rows
777 .iter()
778 .filter_map(|r| {
779 Some(crate::state::SinkConfig {
780 name: r.get("name")?.as_str()?.to_string(),
781 sink_type: r.get("sink_type")?.as_str()?.to_string(),
782 config: r.get("config").cloned(),
783 })
784 })
785 .collect();
786 *state.sinks.write().await = sink_configs;
787
788 let pipe_rows =
789 read_cache_rows_or_empty(db, SQL_READ_PIPES_CACHE, "refresh pipes cache").await?;
790 let pipe_configs: Vec<crate::state::PipeConfigCache> = pipe_rows
791 .iter()
792 .filter_map(|r| {
793 let name = r.get("name")?.as_str()?.to_string();
794 Some(crate::state::PipeConfigCache {
795 name: name.clone(),
796 origin_connector: r.get("origin_connector")?.as_str()?.to_string(),
797 origin_dsn: r.get("origin_dsn")?.as_str()?.to_string(),
798 targets: r
799 .get("targets")
800 .and_then(|v| v.as_array())
801 .map(|arr| {
802 arr.iter()
803 .filter_map(|v| v.as_str().map(String::from))
804 .collect()
805 })
806 .unwrap_or_default(),
807 interval_secs: r
808 .get("interval_secs")
809 .and_then(|v| v.as_u64())
810 .unwrap_or(300),
811 query_count: *query_counts_by_origin.get(name.as_str()).unwrap_or(&0),
812 recipe: r.get("recipe").cloned().filter(|v| !v.is_null()),
813 enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
814 })
815 })
816 .collect();
817 *state.pipes.write().await = pipe_configs;
818
819 let pipe_preset_rows = read_cache_rows_or_empty(
820 db,
821 SQL_READ_PIPE_PRESETS_CACHE,
822 "refresh pipe presets cache",
823 )
824 .await?;
825 let pipe_preset_configs: Vec<crate::state::PipePresetCache> = pipe_preset_rows
826 .iter()
827 .filter_map(|r| {
828 Some(crate::state::PipePresetCache {
829 name: r.get("name")?.as_str()?.to_string(),
830 description: r
831 .get("description")
832 .and_then(|v| v.as_str())
833 .map(String::from),
834 spec: api_pipe_preset_spec_value(r.get("spec")?)?,
835 })
836 })
837 .collect();
838 *state.pipe_presets.write().await = pipe_preset_configs;
839
840 Ok(())
841}
842
/// Reports whether the error's display text looks like a "table does not exist"
/// failure, i.e. contains both `"does not exist"` and `"table"`.
///
/// This is a plain substring match on the rendered message.
fn is_missing_table_error(error: &dyn std::fmt::Display) -> bool {
    const REQUIRED_FRAGMENTS: [&str; 2] = ["does not exist", "table"];

    let rendered = error.to_string();
    REQUIRED_FRAGMENTS
        .iter()
        .all(|fragment| rendered.contains(*fragment))
}
847
848async fn read_cache_rows_or_empty(
849 db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
850 sql: &str,
851 context: &str,
852) -> Result<Vec<serde_json::Value>, oversync_core::error::OversyncError> {
853 let mut response = match db.query(sql).await {
854 Ok(response) => response,
855 Err(error) if is_missing_table_error(&error) => return Ok(Vec::new()),
856 Err(error) => {
857 return Err(oversync_core::error::OversyncError::SurrealDb(format!(
858 "{context}: {error}"
859 )));
860 }
861 };
862
863 match response.take(0) {
864 Ok(rows) => Ok(rows),
865 Err(error) if is_missing_table_error(&error) => Ok(Vec::new()),
866 Err(error) => Err(oversync_core::error::OversyncError::SurrealDb(format!(
867 "{context} take: {error}"
868 ))),
869 }
870}
871
#[cfg(test)]
mod tests {
    use super::{api_pipe_preset_spec_value, is_missing_table_error};

    // A spec stored in the flat layout (origin fields at the top level) must
    // still be mapped, with its values carried over.
    #[test]
    fn api_pipe_preset_spec_value_accepts_legacy_flat_shape() {
        let legacy_spec = serde_json::json!({
            "origin_connector": "postgres",
            "origin_dsn": "postgres://localhost/db",
            "origin_credential": null,
            "trino_url": null,
            "origin_config": { "sslmode": "require" },
            "targets": ["stdout"],
            "queries": [{
                "id": "aspect-columns",
                "sql": "SELECT id, payload FROM columns",
                "key_column": "id"
            }],
            "schedule": { "interval_secs": 900, "missed_tick_policy": "skip" },
            "delta": { "diff_mode": "db", "fail_safe_threshold": 30 },
            "retry": { "max_retries": 3, "retry_base_delay_secs": 5 },
            "recipe": null,
            "filters": [],
            "transforms": [],
            "links": []
        });

        let mapped =
            api_pipe_preset_spec_value(&legacy_spec).expect("legacy flat preset should map");

        // (JSON pointer into the mapped spec, expected string at that location)
        let expectations = [
            ("/origin_connector", "postgres"),
            ("/origin_config/sslmode", "require"),
            ("/queries/0/id", "aspect-columns"),
        ];
        for (pointer, expected) in expectations {
            assert_eq!(
                mapped.pointer(pointer).and_then(|value| value.as_str()),
                Some(expected),
                "unexpected value at {pointer}"
            );
        }
    }

    // The detector keys off the wording of the message text only.
    #[test]
    fn missing_table_error_detection_matches_surreal_message_shape() {
        let cases: [(&str, bool); 2] = [
            ("The table 'pipe_config' does not exist", true),
            ("permission denied", false),
        ];
        for (message, expected) in cases {
            assert_eq!(
                is_missing_table_error(&message),
                expected,
                "message: {message}"
            );
        }
    }
}