1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5use tracing::warn;
6
7use crate::state::ApiState;
8use crate::types::*;
9
10use oversync_queries::mutations;
11
// SurrealDB statements from the shared `oversync_queries::mutations` module,
// aliased as local consts so the handler bodies below read cleanly.

// Sink mutations.
const SQL_DELETE_SINK: &str = mutations::DELETE_SINK;
const SQL_CREATE_SINK: &str = mutations::CREATE_SINK;
const SQL_UPDATE_SINK_TYPE: &str = mutations::UPDATE_SINK_TYPE;
const SQL_UPDATE_SINK_ENABLED: &str = mutations::UPDATE_SINK_ENABLED;
const SQL_UPDATE_SINK_CONFIG: &str = mutations::UPDATE_SINK_CONFIG;

// Pipe, pipe-preset, and pipe-query mutations.
const SQL_DELETE_PIPE: &str = mutations::DELETE_PIPE;
const SQL_DELETE_PIPE_PRESET: &str = mutations::DELETE_PIPE_PRESET;
const SQL_CREATE_PIPE: &str = mutations::CREATE_PIPE;
const SQL_CREATE_PIPE_PRESET: &str = mutations::CREATE_PIPE_PRESET;
const SQL_DELETE_PIPE_QUERIES: &str = mutations::DELETE_PIPE_QUERIES;
const SQL_CREATE_PIPE_QUERY: &str = mutations::CREATE_QUERY;
const SQL_CREATE_PIPE_QUERY_WITH_SINKS: &str = mutations::CREATE_QUERY_WITH_SINKS;
const SQL_UPDATE_PIPE_ORIGIN_CONNECTOR: &str = mutations::UPDATE_PIPE_ORIGIN_CONNECTOR;
const SQL_UPDATE_PIPE_ORIGIN_DSN: &str = mutations::UPDATE_PIPE_ORIGIN_DSN;
const SQL_UPDATE_PIPE_ORIGIN_CREDENTIAL: &str = mutations::UPDATE_PIPE_ORIGIN_CREDENTIAL;
const SQL_UPDATE_PIPE_TRINO_URL: &str = mutations::UPDATE_PIPE_TRINO_URL;
const SQL_UPDATE_PIPE_ORIGIN_CONFIG: &str = mutations::UPDATE_PIPE_ORIGIN_CONFIG;
const SQL_UPDATE_PIPE_TARGETS: &str = mutations::UPDATE_PIPE_TARGETS;
const SQL_UPDATE_PIPE_SCHEDULE: &str = mutations::UPDATE_PIPE_SCHEDULE;
const SQL_UPDATE_PIPE_DELTA: &str = mutations::UPDATE_PIPE_DELTA;
const SQL_UPDATE_PIPE_RETRY: &str = mutations::UPDATE_PIPE_RETRY;
const SQL_UPDATE_PIPE_RECIPE: &str = mutations::UPDATE_PIPE_RECIPE;
const SQL_UPDATE_PIPE_RECIPE_NONE: &str = mutations::UPDATE_PIPE_RECIPE_NONE;
const SQL_UPDATE_PIPE_PRESET: &str = mutations::UPDATE_PIPE_PRESET;
const SQL_UPDATE_PIPE_FILTERS: &str = mutations::UPDATE_PIPE_FILTERS;
const SQL_UPDATE_PIPE_TRANSFORMS: &str = mutations::UPDATE_PIPE_TRANSFORMS;
const SQL_UPDATE_PIPE_LINKS: &str = mutations::UPDATE_PIPE_LINKS;
const SQL_UPDATE_PIPE_ENABLED: &str = mutations::UPDATE_PIPE_ENABLED;
41
42#[utoipa::path(
43 post,
44 path = "/sinks",
45 request_body = CreateSinkRequest,
46 responses(
47 (status = 200, description = "Sink created", body = MutationResponse),
48 (status = 400, description = "Bad request", body = ErrorResponse)
49 )
50)]
51pub async fn create_sink(
52 State(state): State<Arc<ApiState>>,
53 Json(req): Json<CreateSinkRequest>,
54) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
55 let db = require_db(&state)?;
56
57 let config_json = if req.config.is_null() {
58 serde_json::json!({})
59 } else {
60 req.config
61 };
62
63 db.query(SQL_DELETE_SINK)
64 .bind(("name", req.name.clone()))
65 .await
66 .and_then(|response| response.check())
67 .map_err(db_err)?;
68
69 db.query(SQL_CREATE_SINK)
70 .bind(("name", req.name.clone()))
71 .bind(("sink_type", req.sink_type))
72 .bind(("config", config_json))
73 .await
74 .and_then(|response| response.check())
75 .map_err(db_err)?;
76
77 reload_config(&state).await?;
78
79 Ok(Json(MutationResponse {
80 ok: true,
81 message: format!("sink '{}' created", req.name),
82 }))
83}
84
85#[utoipa::path(
86 put,
87 path = "/sinks/{name}",
88 params(("name" = String, Path, description = "Sink name")),
89 request_body = UpdateSinkRequest,
90 responses(
91 (status = 200, description = "Sink updated", body = MutationResponse),
92 (status = 400, description = "Bad request", body = ErrorResponse)
93 )
94)]
95pub async fn update_sink(
96 State(state): State<Arc<ApiState>>,
97 Path(name): Path<String>,
98 Json(req): Json<UpdateSinkRequest>,
99) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
100 let db = require_db(&state)?;
101
102 if let Some(sink_type) = req.sink_type {
103 db.query(SQL_UPDATE_SINK_TYPE)
104 .bind(("name", name.clone()))
105 .bind(("sink_type", sink_type))
106 .await
107 .and_then(|response| response.check())
108 .map_err(db_err)?;
109 }
110
111 if let Some(enabled) = req.enabled {
112 db.query(SQL_UPDATE_SINK_ENABLED)
113 .bind(("name", name.clone()))
114 .bind(("enabled", enabled))
115 .await
116 .and_then(|response| response.check())
117 .map_err(db_err)?;
118 }
119
120 if let Some(config) = req.config {
121 db.query(SQL_UPDATE_SINK_CONFIG)
122 .bind(("name", name.clone()))
123 .bind(("config", config))
124 .await
125 .and_then(|response| response.check())
126 .map_err(db_err)?;
127 }
128
129 reload_config(&state).await?;
130
131 Ok(Json(MutationResponse {
132 ok: true,
133 message: format!("sink '{name}' updated"),
134 }))
135}
136
137#[utoipa::path(
138 delete,
139 path = "/sinks/{name}",
140 params(("name" = String, Path, description = "Sink name")),
141 responses(
142 (status = 200, description = "Sink deleted", body = MutationResponse),
143 (status = 400, description = "Bad request", body = ErrorResponse)
144 )
145)]
146pub async fn delete_sink(
147 State(state): State<Arc<ApiState>>,
148 Path(name): Path<String>,
149) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
150 let db = require_db(&state)?;
151
152 db.query(SQL_DELETE_SINK)
153 .bind(("name", name.clone()))
154 .await
155 .and_then(|response| response.check())
156 .map_err(db_err)?;
157
158 reload_config(&state).await?;
159
160 Ok(Json(MutationResponse {
161 ok: true,
162 message: format!("sink '{name}' deleted"),
163 }))
164}
165
166#[utoipa::path(
169 post,
170 path = "/pipes",
171 request_body = CreatePipeRequest,
172 responses(
173 (status = 200, description = "Pipe created", body = MutationResponse),
174 (status = 400, description = "Bad request", body = ErrorResponse)
175 )
176)]
177pub async fn create_pipe(
178 State(state): State<Arc<ApiState>>,
179 Json(req): Json<CreatePipeRequest>,
180) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
181 let db = require_db(&state)?;
182 let queries = req.queries;
183
184 let origin_config = if req.origin_config.is_null() {
185 serde_json::json!({})
186 } else {
187 req.origin_config
188 };
189 let schedule = if req.schedule.is_null() {
190 serde_json::json!({"interval_secs": 300, "missed_tick_policy": "skip"})
191 } else {
192 req.schedule
193 };
194 let delta = if req.delta.is_null() {
195 serde_json::json!({"diff_mode": "db", "fail_safe_threshold": 30.0})
196 } else {
197 req.delta
198 };
199 let retry = if req.retry.is_null() {
200 serde_json::json!({"max_retries": 3, "retry_base_delay_secs": 5})
201 } else {
202 req.retry
203 };
204
205 db.query(SQL_DELETE_PIPE)
206 .bind(("name", req.name.clone()))
207 .await
208 .and_then(|response| response.check())
209 .map_err(db_err)?;
210
211 db.query(SQL_CREATE_PIPE)
212 .bind(("name", req.name.clone()))
213 .bind(("origin_connector", req.origin_connector))
214 .bind(("origin_dsn", req.origin_dsn))
215 .bind(("origin_credential", req.origin_credential))
216 .bind(("trino_url", req.trino_url))
217 .bind(("origin_config", origin_config))
218 .bind(("targets", req.targets))
219 .bind(("schedule", schedule))
220 .bind(("delta", delta))
221 .bind(("retry", retry))
222 .bind(("recipe", req.recipe))
223 .bind(("filters", req.filters))
224 .bind(("transforms", req.transforms))
225 .bind(("links", req.links))
226 .await
227 .and_then(|response| response.check())
228 .map_err(db_err)?;
229
230 for query in &queries {
231 create_pipe_query_record(db, &req.name, query)
232 .await
233 .map_err(config_err)?;
234 }
235
236 reload_config(&state).await?;
237
238 Ok(Json(MutationResponse {
239 ok: true,
240 message: format!("pipe '{}' created", req.name),
241 }))
242}
243
244#[utoipa::path(
245 put,
246 path = "/pipes/{name}",
247 params(("name" = String, Path, description = "Pipe name")),
248 request_body = UpdatePipeRequest,
249 responses(
250 (status = 200, description = "Pipe updated", body = MutationResponse),
251 (status = 400, description = "Bad request", body = ErrorResponse)
252 )
253)]
254pub async fn update_pipe(
255 State(state): State<Arc<ApiState>>,
256 Path(name): Path<String>,
257 Json(req): Json<UpdatePipeRequest>,
258) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
259 let db = require_db(&state)?;
260
261 if let Some(connector) = req.origin_connector {
262 db.query(SQL_UPDATE_PIPE_ORIGIN_CONNECTOR)
263 .bind(("name", name.clone()))
264 .bind(("v", connector))
265 .await
266 .and_then(|response| response.check())
267 .map_err(db_err)?;
268 }
269
270 if let Some(dsn) = req.origin_dsn {
271 db.query(SQL_UPDATE_PIPE_ORIGIN_DSN)
272 .bind(("name", name.clone()))
273 .bind(("v", dsn))
274 .await
275 .and_then(|response| response.check())
276 .map_err(db_err)?;
277 }
278
279 if let Some(credential) = req.origin_credential {
280 db.query(SQL_UPDATE_PIPE_ORIGIN_CREDENTIAL)
281 .bind(("name", name.clone()))
282 .bind(("v", credential))
283 .await
284 .and_then(|response| response.check())
285 .map_err(db_err)?;
286 }
287
288 if let Some(trino_url) = req.trino_url {
289 db.query(SQL_UPDATE_PIPE_TRINO_URL)
290 .bind(("name", name.clone()))
291 .bind(("v", trino_url))
292 .await
293 .and_then(|response| response.check())
294 .map_err(db_err)?;
295 }
296
297 if let Some(config) = req.origin_config {
298 db.query(SQL_UPDATE_PIPE_ORIGIN_CONFIG)
299 .bind(("name", name.clone()))
300 .bind(("v", config))
301 .await
302 .and_then(|response| response.check())
303 .map_err(db_err)?;
304 }
305
306 if let Some(targets) = req.targets {
307 db.query(SQL_UPDATE_PIPE_TARGETS)
308 .bind(("name", name.clone()))
309 .bind(("v", targets))
310 .await
311 .and_then(|response| response.check())
312 .map_err(db_err)?;
313 }
314
315 if let Some(schedule) = req.schedule {
316 db.query(SQL_UPDATE_PIPE_SCHEDULE)
317 .bind(("name", name.clone()))
318 .bind(("v", schedule))
319 .await
320 .and_then(|response| response.check())
321 .map_err(db_err)?;
322 }
323
324 if let Some(delta) = req.delta {
325 db.query(SQL_UPDATE_PIPE_DELTA)
326 .bind(("name", name.clone()))
327 .bind(("v", delta))
328 .await
329 .and_then(|response| response.check())
330 .map_err(db_err)?;
331 }
332
333 if let Some(retry) = req.retry {
334 db.query(SQL_UPDATE_PIPE_RETRY)
335 .bind(("name", name.clone()))
336 .bind(("v", retry))
337 .await
338 .and_then(|response| response.check())
339 .map_err(db_err)?;
340 }
341
342 if let Some(recipe) = req.recipe {
343 match recipe {
344 Some(recipe) => {
345 db.query(SQL_UPDATE_PIPE_RECIPE)
346 .bind(("name", name.clone()))
347 .bind(("v", recipe))
348 .await
349 .and_then(|response| response.check())
350 .map_err(db_err)?;
351 }
352 None => {
353 db.query(SQL_UPDATE_PIPE_RECIPE_NONE)
354 .bind(("name", name.clone()))
355 .await
356 .and_then(|response| response.check())
357 .map_err(db_err)?;
358 }
359 }
360 }
361
362 if let Some(filters) = req.filters {
363 db.query(SQL_UPDATE_PIPE_FILTERS)
364 .bind(("name", name.clone()))
365 .bind(("v", filters))
366 .await
367 .and_then(|response| response.check())
368 .map_err(db_err)?;
369 }
370
371 if let Some(transforms) = req.transforms {
372 db.query(SQL_UPDATE_PIPE_TRANSFORMS)
373 .bind(("name", name.clone()))
374 .bind(("v", transforms))
375 .await
376 .and_then(|response| response.check())
377 .map_err(db_err)?;
378 }
379
380 if let Some(links) = req.links {
381 db.query(SQL_UPDATE_PIPE_LINKS)
382 .bind(("name", name.clone()))
383 .bind(("v", links))
384 .await
385 .and_then(|response| response.check())
386 .map_err(db_err)?;
387 }
388
389 if let Some(enabled) = req.enabled {
390 db.query(SQL_UPDATE_PIPE_ENABLED)
391 .bind(("name", name.clone()))
392 .bind(("v", enabled))
393 .await
394 .and_then(|response| response.check())
395 .map_err(db_err)?;
396 }
397
398 if let Some(queries) = req.queries {
399 db.query(SQL_DELETE_PIPE_QUERIES)
400 .bind(("name", name.clone()))
401 .await
402 .and_then(|response| response.check())
403 .map_err(db_err)?;
404
405 for query in &queries {
406 create_pipe_query_record(db, &name, query)
407 .await
408 .map_err(config_err)?;
409 }
410 }
411
412 reload_config(&state).await?;
413
414 Ok(Json(MutationResponse {
415 ok: true,
416 message: format!("pipe '{name}' updated"),
417 }))
418}
419
420#[utoipa::path(
421 delete,
422 path = "/pipes/{name}",
423 params(("name" = String, Path, description = "Pipe name")),
424 responses(
425 (status = 200, description = "Pipe deleted", body = MutationResponse),
426 (status = 400, description = "Bad request", body = ErrorResponse)
427 )
428)]
429pub async fn delete_pipe(
430 State(state): State<Arc<ApiState>>,
431 Path(name): Path<String>,
432) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
433 let db = require_db(&state)?;
434
435 db.query(SQL_DELETE_PIPE)
436 .bind(("name", name.clone()))
437 .await
438 .and_then(|response| response.check())
439 .map_err(db_err)?;
440
441 db.query(SQL_DELETE_PIPE_QUERIES)
442 .bind(("name", name.clone()))
443 .await
444 .and_then(|response| response.check())
445 .map_err(db_err)?;
446
447 reload_config(&state).await?;
448
449 Ok(Json(MutationResponse {
450 ok: true,
451 message: format!("pipe '{name}' deleted"),
452 }))
453}
454
455#[utoipa::path(
456 post,
457 path = "/pipe-presets",
458 request_body = CreatePipePresetRequest,
459 responses(
460 (status = 200, description = "Pipe preset created", body = MutationResponse),
461 (status = 400, description = "Bad request", body = ErrorResponse)
462 )
463)]
464pub async fn create_pipe_preset(
465 State(state): State<Arc<ApiState>>,
466 Json(req): Json<CreatePipePresetRequest>,
467) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
468 let db = require_db(&state)?;
469 let spec = storage_pipe_preset_spec_value(req.spec).map_err(json_serde_err)?;
470
471 db.query(SQL_DELETE_PIPE_PRESET)
472 .bind(("name", req.name.clone()))
473 .await
474 .and_then(|response| response.check())
475 .map_err(db_err)?;
476
477 db.query(SQL_CREATE_PIPE_PRESET)
478 .bind(("name", req.name.clone()))
479 .bind(("description", req.description.clone()))
480 .bind(("spec", spec))
481 .await
482 .and_then(|response| response.check())
483 .map_err(db_err)?;
484
485 refresh_read_cache(state.as_ref())
486 .await
487 .map_err(config_err)?;
488
489 Ok(Json(MutationResponse {
490 ok: true,
491 message: format!("pipe preset '{}' created", req.name),
492 }))
493}
494
495#[utoipa::path(
496 put,
497 path = "/pipe-presets/{name}",
498 params(("name" = String, Path, description = "Pipe preset name")),
499 request_body = UpdatePipePresetRequest,
500 responses(
501 (status = 200, description = "Pipe preset updated", body = MutationResponse),
502 (status = 400, description = "Bad request", body = ErrorResponse)
503 )
504)]
505pub async fn update_pipe_preset(
506 State(state): State<Arc<ApiState>>,
507 Path(name): Path<String>,
508 Json(req): Json<UpdatePipePresetRequest>,
509) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
510 let db = require_db(&state)?;
511 let current = state
512 .pipe_presets_info()
513 .into_iter()
514 .find(|preset| preset.name == name)
515 .ok_or_else(|| {
516 Json(ErrorResponse {
517 error: format!("pipe preset not found: {name}"),
518 })
519 })?;
520
521 let spec = match req.spec {
522 Some(spec) => storage_pipe_preset_spec_value(spec).map_err(json_serde_err)?,
523 None => storage_pipe_preset_spec_value(
524 serde_json::from_value(current.spec).map_err(json_serde_err)?,
525 )
526 .map_err(json_serde_err)?,
527 };
528
529 db.query(SQL_UPDATE_PIPE_PRESET)
530 .bind(("name", name.clone()))
531 .bind(("description", req.description))
532 .bind(("spec", spec))
533 .await
534 .and_then(|response| response.check())
535 .map_err(db_err)?;
536
537 refresh_read_cache(state.as_ref())
538 .await
539 .map_err(config_err)?;
540
541 Ok(Json(MutationResponse {
542 ok: true,
543 message: format!("pipe preset '{name}' updated"),
544 }))
545}
546
547#[utoipa::path(
548 delete,
549 path = "/pipe-presets/{name}",
550 params(("name" = String, Path, description = "Pipe preset name")),
551 responses(
552 (status = 200, description = "Pipe preset deleted", body = MutationResponse),
553 (status = 400, description = "Bad request", body = ErrorResponse)
554 )
555)]
556pub async fn delete_pipe_preset(
557 State(state): State<Arc<ApiState>>,
558 Path(name): Path<String>,
559) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
560 let db = require_db(&state)?;
561
562 db.query(SQL_DELETE_PIPE_PRESET)
563 .bind(("name", name.clone()))
564 .await
565 .and_then(|response| response.check())
566 .map_err(db_err)?;
567
568 refresh_read_cache(state.as_ref())
569 .await
570 .map_err(config_err)?;
571
572 Ok(Json(MutationResponse {
573 ok: true,
574 message: format!("pipe preset '{name}' deleted"),
575 }))
576}
577
578fn require_db(
579 state: &ApiState,
580) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
581 state.db_client.as_deref().ok_or_else(|| {
582 Json(ErrorResponse {
583 error: "database not configured".into(),
584 })
585 })
586}
587
588fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
589 Json(ErrorResponse {
590 error: format!("db: {e}"),
591 })
592}
593
594fn config_err(e: oversync_core::error::OversyncError) -> Json<ErrorResponse> {
595 Json(ErrorResponse {
596 error: e.to_string(),
597 })
598}
599
600fn json_serde_err(e: serde_json::Error) -> Json<ErrorResponse> {
601 Json(ErrorResponse {
602 error: format!("json: {e}"),
603 })
604}
605
606fn storage_pipe_preset_spec_value(
607 spec: PipePresetSpecInput,
608) -> Result<serde_json::Value, serde_json::Error> {
609 let origin_config = if spec.origin_config.is_null() {
610 serde_json::json!({})
611 } else {
612 spec.origin_config
613 };
614 let schedule = if spec.schedule.is_null() {
615 serde_json::json!({})
616 } else {
617 spec.schedule
618 };
619 let delta = if spec.delta.is_null() {
620 serde_json::json!({})
621 } else {
622 spec.delta
623 };
624 let retry = if spec.retry.is_null() {
625 serde_json::json!({})
626 } else {
627 spec.retry
628 };
629
630 serde_json::to_value(serde_json::json!({
631 "origin": {
632 "connector": spec.origin_connector,
633 "dsn": spec.origin_dsn,
634 "credential": spec.origin_credential,
635 "trino_url": spec.trino_url,
636 "config": origin_config,
637 },
638 "parameters": spec.parameters,
639 "targets": spec.targets,
640 "queries": spec.queries,
641 "schedule": schedule,
642 "delta": delta,
643 "retry": retry,
644 "recipe": spec.recipe,
645 "filters": spec.filters,
646 "transforms": spec.transforms,
647 "links": spec.links,
648 }))
649}
650
651fn api_pipe_preset_spec_value(spec: &serde_json::Value) -> Option<serde_json::Value> {
652 if spec.get("origin_connector").is_some() {
653 return Some(serde_json::json!({
654 "origin_connector": spec.get("origin_connector").cloned().unwrap_or(serde_json::Value::Null),
655 "origin_dsn": spec.get("origin_dsn").cloned().unwrap_or(serde_json::Value::Null),
656 "origin_credential": spec.get("origin_credential").cloned().unwrap_or(serde_json::Value::Null),
657 "trino_url": spec.get("trino_url").cloned().unwrap_or(serde_json::Value::Null),
658 "origin_config": spec.get("origin_config").cloned().unwrap_or_else(|| serde_json::json!({})),
659 "parameters": spec.get("parameters").cloned().unwrap_or_else(|| serde_json::json!([])),
660 "targets": spec.get("targets").cloned().unwrap_or_else(|| serde_json::json!([])),
661 "queries": spec.get("queries").cloned().unwrap_or_else(|| serde_json::json!([])),
662 "schedule": spec.get("schedule").cloned().unwrap_or_else(|| serde_json::json!({})),
663 "delta": spec.get("delta").cloned().unwrap_or_else(|| serde_json::json!({})),
664 "retry": spec.get("retry").cloned().unwrap_or_else(|| serde_json::json!({})),
665 "recipe": spec.get("recipe").cloned().unwrap_or(serde_json::Value::Null),
666 "filters": spec.get("filters").cloned().unwrap_or_else(|| serde_json::json!([])),
667 "transforms": spec.get("transforms").cloned().unwrap_or_else(|| serde_json::json!([])),
668 "links": spec.get("links").cloned().unwrap_or_else(|| serde_json::json!([])),
669 }));
670 }
671
672 let origin = spec.get("origin")?;
673 Some(serde_json::json!({
674 "origin_connector": origin.get("connector").cloned().unwrap_or(serde_json::Value::Null),
675 "origin_dsn": origin.get("dsn").cloned().unwrap_or(serde_json::Value::Null),
676 "origin_credential": origin.get("credential").cloned().unwrap_or(serde_json::Value::Null),
677 "trino_url": origin.get("trino_url").cloned().unwrap_or(serde_json::Value::Null),
678 "origin_config": match origin.get("config") {
679 Some(value) if !value.is_null() => value.clone(),
680 _ => serde_json::json!({}),
681 },
682 "parameters": spec.get("parameters").cloned().unwrap_or_else(|| serde_json::json!([])),
683 "targets": spec.get("targets").cloned().unwrap_or_else(|| serde_json::json!([])),
684 "queries": spec.get("queries").cloned().unwrap_or_else(|| serde_json::json!([])),
685 "schedule": match spec.get("schedule") {
686 Some(value) if !value.is_null() => value.clone(),
687 _ => serde_json::json!({}),
688 },
689 "delta": match spec.get("delta") {
690 Some(value) if !value.is_null() => value.clone(),
691 _ => serde_json::json!({}),
692 },
693 "retry": match spec.get("retry") {
694 Some(value) if !value.is_null() => value.clone(),
695 _ => serde_json::json!({}),
696 },
697 "recipe": spec.get("recipe").cloned().unwrap_or(serde_json::Value::Null),
698 "filters": spec.get("filters").cloned().unwrap_or_else(|| serde_json::json!([])),
699 "transforms": spec.get("transforms").cloned().unwrap_or_else(|| serde_json::json!([])),
700 "links": spec.get("links").cloned().unwrap_or_else(|| serde_json::json!([])),
701 }))
702}
703
704async fn create_pipe_query_record(
705 db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
706 origin_id: &str,
707 query: &PipeQueryInput,
708) -> Result<(), oversync_core::error::OversyncError> {
709 let sql = if query.sinks.is_some() {
710 SQL_CREATE_PIPE_QUERY_WITH_SINKS
711 } else {
712 SQL_CREATE_PIPE_QUERY
713 };
714
715 db.query(sql)
716 .bind(("source", origin_id.to_string()))
717 .bind(("name", query.id.clone()))
718 .bind(("query", query.sql.clone()))
719 .bind(("key_column", query.key_column.clone()))
720 .bind(("sinks", query.sinks.clone()))
721 .bind(("transform", query.transform.clone()))
722 .await
723 .and_then(|response| response.check())
724 .map_err(|e| {
725 oversync_core::error::OversyncError::SurrealDb(format!(
726 "create query '{}' for pipe '{}': {e}",
727 query.id, origin_id
728 ))
729 })?;
730
731 Ok(())
732}
733
734pub(crate) async fn reload_config(state: &ApiState) -> Result<(), Json<ErrorResponse>> {
735 if let (Some(lifecycle), Some(db)) = (&state.lifecycle, &state.db_client) {
736 lifecycle.restart_with_config_json(db).await.map_err(|e| {
737 Json(ErrorResponse {
738 error: format!("reload: {e}"),
739 })
740 })?;
741 }
742 refresh_read_cache(state).await.map_err(|e| {
743 Json(ErrorResponse {
744 error: format!("refresh cache: {e}"),
745 })
746 })?;
747 Ok(())
748}
749
/// Rebuilds the in-memory read models (`state.sinks`, `state.pipes`,
/// `state.pipe_presets`) from the database cache tables.
///
/// Missing cache tables are tolerated (`read_cache_rows_or_empty` maps them
/// to empty row sets), so this is safe against a fresh database. When a
/// lifecycle handle is available, a runtime cache snapshot is merged in to
/// fill fields the DB rows left empty; snapshot failures are logged and
/// otherwise ignored so a DB-only view is still published.
pub async fn refresh_read_cache(
    state: &ApiState,
) -> Result<(), oversync_core::error::OversyncError> {
    // No DB client configured: nothing to refresh, not an error.
    let Some(db) = &state.db_client else {
        return Ok(());
    };

    const SQL_LOAD_QUERIES: &str = oversync_queries::config::LOAD_QUERIES;
    const SQL_READ_SINKS_CACHE: &str = oversync_queries::config::READ_SINKS_CACHE;
    const SQL_READ_PIPES_CACHE: &str = oversync_queries::config::READ_PIPES_CACHE;
    const SQL_READ_PIPE_PRESETS_CACHE: &str = oversync_queries::config::READ_PIPE_PRESETS_CACHE;

    // Count queries per origin pipe so each pipe row can report query_count.
    // Rows without a string `origin_id` are skipped.
    let query_rows =
        read_cache_rows_or_empty(db, SQL_LOAD_QUERIES, "refresh queries cache").await?;
    let mut query_counts_by_origin: std::collections::HashMap<String, usize> =
        std::collections::HashMap::new();
    for row in &query_rows {
        let Some(origin_id) = row.get("origin_id").and_then(|v| v.as_str()) else {
            continue;
        };
        *query_counts_by_origin
            .entry(origin_id.to_string())
            .or_default() += 1;
    }

    // Sink rows missing a string `name` or `sink_type` are dropped silently
    // (filter_map); `config` is optional.
    let sink_rows =
        read_cache_rows_or_empty(db, SQL_READ_SINKS_CACHE, "refresh sinks cache").await?;
    let mut sink_configs: Vec<crate::state::SinkConfig> = sink_rows
        .iter()
        .filter_map(|r| {
            Some(crate::state::SinkConfig {
                name: r.get("name")?.as_str()?.to_string(),
                sink_type: r.get("sink_type")?.as_str()?.to_string(),
                config: r.get("config").cloned(),
            })
        })
        .collect();

    // Pipe rows missing name/origin_connector/origin_dsn are dropped; the
    // remaining fields fall back to defaults (empty targets, 300s interval,
    // enabled=true, recipe only when present and non-null).
    let pipe_rows =
        read_cache_rows_or_empty(db, SQL_READ_PIPES_CACHE, "refresh pipes cache").await?;
    let mut pipe_configs: Vec<crate::state::PipeConfigCache> = pipe_rows
        .iter()
        .filter_map(|r| {
            let name = r.get("name")?.as_str()?.to_string();
            Some(crate::state::PipeConfigCache {
                name: name.clone(),
                origin_connector: r.get("origin_connector")?.as_str()?.to_string(),
                origin_dsn: r.get("origin_dsn")?.as_str()?.to_string(),
                targets: r
                    .get("targets")
                    .and_then(|v| v.as_array())
                    .map(|arr| {
                        arr.iter()
                            .filter_map(|v| v.as_str().map(String::from))
                            .collect()
                    })
                    .unwrap_or_default(),
                interval_secs: r
                    .get("interval_secs")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(300),
                query_count: *query_counts_by_origin.get(name.as_str()).unwrap_or(&0),
                recipe: r.get("recipe").cloned().filter(|v| !v.is_null()),
                enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
            })
        })
        .collect();

    // Best-effort overlay of runtime state; a failed snapshot only warns.
    if let Some(lifecycle) = &state.lifecycle {
        match lifecycle.runtime_cache_snapshot().await {
            Ok(runtime) => merge_runtime_cache(&mut sink_configs, &mut pipe_configs, runtime),
            Err(error) => {
                warn!(error = %error, "failed to load runtime cache snapshot for API read model");
            }
        }
    }

    // Publish sinks and pipes before reading presets so they are visible even
    // if the preset read below fails.
    *state.sinks.write().await = sink_configs;
    *state.pipes.write().await = pipe_configs;

    // Preset rows are converted to the flat API shape; rows whose spec maps
    // to None (unrecognized shape) are dropped.
    let pipe_preset_rows = read_cache_rows_or_empty(
        db,
        SQL_READ_PIPE_PRESETS_CACHE,
        "refresh pipe presets cache",
    )
    .await?;
    let pipe_preset_configs: Vec<crate::state::PipePresetCache> = pipe_preset_rows
        .iter()
        .filter_map(|r| {
            Some(crate::state::PipePresetCache {
                name: r.get("name")?.as_str()?.to_string(),
                description: r
                    .get("description")
                    .and_then(|v| v.as_str())
                    .map(String::from),
                spec: api_pipe_preset_spec_value(r.get("spec")?)?,
            })
        })
        .collect();
    *state.pipe_presets.write().await = pipe_preset_configs;

    Ok(())
}
853
854fn merge_runtime_cache(
855 sinks: &mut Vec<crate::state::SinkConfig>,
856 pipes: &mut Vec<crate::state::PipeConfigCache>,
857 runtime: crate::state::RuntimeCacheSnapshot,
858) {
859 for runtime_sink in runtime.sinks {
860 match sinks.iter_mut().find(|sink| sink.name == runtime_sink.name) {
861 Some(existing) => {
862 if existing.config.is_none() && runtime_sink.config.is_some() {
863 existing.config = runtime_sink.config;
864 }
865 if existing.sink_type.is_empty() {
866 existing.sink_type = runtime_sink.sink_type;
867 }
868 }
869 None => sinks.push(runtime_sink),
870 }
871 }
872
873 for runtime_pipe in runtime.pipes {
874 match pipes.iter_mut().find(|pipe| pipe.name == runtime_pipe.name) {
875 Some(existing) => {
876 if existing.query_count == 0 && runtime_pipe.query_count > 0 {
877 existing.query_count = runtime_pipe.query_count;
878 }
879 if existing.recipe.is_none() && runtime_pipe.recipe.is_some() {
880 existing.recipe = runtime_pipe.recipe;
881 }
882 if existing.targets.is_empty() && !runtime_pipe.targets.is_empty() {
883 existing.targets = runtime_pipe.targets;
884 }
885 if existing.origin_connector.is_empty() {
886 existing.origin_connector = runtime_pipe.origin_connector;
887 }
888 if existing.origin_dsn.is_empty() {
889 existing.origin_dsn = runtime_pipe.origin_dsn;
890 }
891 }
892 None => pipes.push(runtime_pipe),
893 }
894 }
895}
896
/// Heuristically detects SurrealDB "table does not exist" errors from their
/// display text, so cache reads against a fresh database can be treated as
/// empty result sets instead of hard failures.
///
/// Matching is now case-insensitive so a casing change in the upstream
/// message (e.g. "Table ... does not exist") does not silently break the
/// detection; the previous implementation required lowercase "table".
fn is_missing_table_error(error: &dyn std::fmt::Display) -> bool {
    let message = error.to_string().to_lowercase();
    message.contains("does not exist") && message.contains("table")
}
901
902async fn read_cache_rows_or_empty(
903 db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
904 sql: &str,
905 context: &str,
906) -> Result<Vec<serde_json::Value>, oversync_core::error::OversyncError> {
907 let mut response = match db.query(sql).await {
908 Ok(response) => response,
909 Err(error) if is_missing_table_error(&error) => return Ok(Vec::new()),
910 Err(error) => {
911 return Err(oversync_core::error::OversyncError::SurrealDb(format!(
912 "{context}: {error}"
913 )));
914 }
915 };
916
917 match response.take(0) {
918 Ok(rows) => Ok(rows),
919 Err(error) if is_missing_table_error(&error) => Ok(Vec::new()),
920 Err(error) => Err(oversync_core::error::OversyncError::SurrealDb(format!(
921 "{context} take: {error}"
922 ))),
923 }
924}
925
#[cfg(test)]
mod tests {
    use super::{api_pipe_preset_spec_value, is_missing_table_error};

    // Specs stored in the flat API shape (top-level `origin_connector`
    // instead of a nested `origin` object) must still map through
    // `api_pipe_preset_spec_value`; the test module treats this as the
    // legacy storage shape.
    #[test]
    fn api_pipe_preset_spec_value_accepts_legacy_flat_shape() {
        let flat = serde_json::json!({
            "origin_connector": "postgres",
            "origin_dsn": "postgres://localhost/db",
            "origin_credential": null,
            "trino_url": null,
            "origin_config": { "sslmode": "require" },
            "targets": ["stdout"],
            "queries": [{
                "id": "aspect-columns",
                "sql": "SELECT id, payload FROM columns",
                "key_column": "id"
            }],
            "schedule": { "interval_secs": 900, "missed_tick_policy": "skip" },
            "delta": { "diff_mode": "db", "fail_safe_threshold": 30 },
            "retry": { "max_retries": 3, "retry_base_delay_secs": 5 },
            "recipe": null,
            "filters": [],
            "transforms": [],
            "links": []
        });

        let api = api_pipe_preset_spec_value(&flat).expect("legacy flat preset should map");
        assert_eq!(api["origin_connector"], "postgres");
        assert_eq!(api["origin_config"]["sslmode"], "require");
        assert_eq!(api["queries"][0]["id"], "aspect-columns");
    }

    // The missing-table heuristic keys off SurrealDB's error text; pin the
    // expected message shape so an upstream wording change surfaces here.
    #[test]
    fn missing_table_error_detection_matches_surreal_message_shape() {
        assert!(is_missing_table_error(
            &"The table 'pipe_config' does not exist"
        ));
        assert!(!is_missing_table_error(&"permission denied"));
    }
}
966}