Skip to main content

oversync_api/
mutations.rs

1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5
6use crate::state::ApiState;
7use crate::types::*;
8
9use oversync_queries::mutations;
10
// Local aliases for the sink mutation statements defined in
// `oversync_queries::mutations`.
const SQL_DELETE_SINK: &str = mutations::DELETE_SINK;
const SQL_CREATE_SINK: &str = mutations::CREATE_SINK;
const SQL_UPDATE_SINK_TYPE: &str = mutations::UPDATE_SINK_TYPE;
const SQL_UPDATE_SINK_ENABLED: &str = mutations::UPDATE_SINK_ENABLED;
const SQL_UPDATE_SINK_CONFIG: &str = mutations::UPDATE_SINK_CONFIG;

// Local aliases for the pipe, pipe-preset and pipe-query mutation statements.
// Each `SQL_UPDATE_PIPE_*` statement is used by `update_pipe` to change one
// attribute of a pipe.
const SQL_DELETE_PIPE: &str = mutations::DELETE_PIPE;
const SQL_DELETE_PIPE_PRESET: &str = mutations::DELETE_PIPE_PRESET;
const SQL_CREATE_PIPE: &str = mutations::CREATE_PIPE;
const SQL_CREATE_PIPE_PRESET: &str = mutations::CREATE_PIPE_PRESET;
const SQL_DELETE_PIPE_QUERIES: &str = mutations::DELETE_PIPE_QUERIES;
const SQL_CREATE_PIPE_QUERY: &str = mutations::CREATE_QUERY;
const SQL_CREATE_PIPE_QUERY_WITH_SINKS: &str = mutations::CREATE_QUERY_WITH_SINKS;
const SQL_UPDATE_PIPE_ORIGIN_CONNECTOR: &str = mutations::UPDATE_PIPE_ORIGIN_CONNECTOR;
const SQL_UPDATE_PIPE_ORIGIN_DSN: &str = mutations::UPDATE_PIPE_ORIGIN_DSN;
const SQL_UPDATE_PIPE_ORIGIN_CREDENTIAL: &str = mutations::UPDATE_PIPE_ORIGIN_CREDENTIAL;
const SQL_UPDATE_PIPE_TRINO_URL: &str = mutations::UPDATE_PIPE_TRINO_URL;
const SQL_UPDATE_PIPE_ORIGIN_CONFIG: &str = mutations::UPDATE_PIPE_ORIGIN_CONFIG;
const SQL_UPDATE_PIPE_TARGETS: &str = mutations::UPDATE_PIPE_TARGETS;
const SQL_UPDATE_PIPE_SCHEDULE: &str = mutations::UPDATE_PIPE_SCHEDULE;
const SQL_UPDATE_PIPE_DELTA: &str = mutations::UPDATE_PIPE_DELTA;
const SQL_UPDATE_PIPE_RETRY: &str = mutations::UPDATE_PIPE_RETRY;
const SQL_UPDATE_PIPE_RECIPE: &str = mutations::UPDATE_PIPE_RECIPE;
const SQL_UPDATE_PIPE_RECIPE_NONE: &str = mutations::UPDATE_PIPE_RECIPE_NONE;
const SQL_UPDATE_PIPE_PRESET: &str = mutations::UPDATE_PIPE_PRESET;
const SQL_UPDATE_PIPE_FILTERS: &str = mutations::UPDATE_PIPE_FILTERS;
const SQL_UPDATE_PIPE_TRANSFORMS: &str = mutations::UPDATE_PIPE_TRANSFORMS;
const SQL_UPDATE_PIPE_LINKS: &str = mutations::UPDATE_PIPE_LINKS;
const SQL_UPDATE_PIPE_ENABLED: &str = mutations::UPDATE_PIPE_ENABLED;
40
41#[utoipa::path(
42	post,
43	path = "/sinks",
44	request_body = CreateSinkRequest,
45	responses(
46		(status = 200, description = "Sink created", body = MutationResponse),
47		(status = 400, description = "Bad request", body = ErrorResponse)
48	)
49)]
50pub async fn create_sink(
51	State(state): State<Arc<ApiState>>,
52	Json(req): Json<CreateSinkRequest>,
53) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
54	let db = require_db(&state)?;
55
56	let config_json = if req.config.is_null() {
57		serde_json::json!({})
58	} else {
59		req.config
60	};
61
62	db.query(SQL_DELETE_SINK)
63		.bind(("name", req.name.clone()))
64		.await
65		.and_then(|response| response.check())
66		.map_err(db_err)?;
67
68	db.query(SQL_CREATE_SINK)
69		.bind(("name", req.name.clone()))
70		.bind(("sink_type", req.sink_type))
71		.bind(("config", config_json))
72		.await
73		.and_then(|response| response.check())
74		.map_err(db_err)?;
75
76	reload_config(&state).await?;
77
78	Ok(Json(MutationResponse {
79		ok: true,
80		message: format!("sink '{}' created", req.name),
81	}))
82}
83
84#[utoipa::path(
85	put,
86	path = "/sinks/{name}",
87	params(("name" = String, Path, description = "Sink name")),
88	request_body = UpdateSinkRequest,
89	responses(
90		(status = 200, description = "Sink updated", body = MutationResponse),
91		(status = 400, description = "Bad request", body = ErrorResponse)
92	)
93)]
94pub async fn update_sink(
95	State(state): State<Arc<ApiState>>,
96	Path(name): Path<String>,
97	Json(req): Json<UpdateSinkRequest>,
98) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
99	let db = require_db(&state)?;
100
101	if let Some(sink_type) = req.sink_type {
102		db.query(SQL_UPDATE_SINK_TYPE)
103			.bind(("name", name.clone()))
104			.bind(("sink_type", sink_type))
105			.await
106			.and_then(|response| response.check())
107			.map_err(db_err)?;
108	}
109
110	if let Some(enabled) = req.enabled {
111		db.query(SQL_UPDATE_SINK_ENABLED)
112			.bind(("name", name.clone()))
113			.bind(("enabled", enabled))
114			.await
115			.and_then(|response| response.check())
116			.map_err(db_err)?;
117	}
118
119	if let Some(config) = req.config {
120		db.query(SQL_UPDATE_SINK_CONFIG)
121			.bind(("name", name.clone()))
122			.bind(("config", config))
123			.await
124			.and_then(|response| response.check())
125			.map_err(db_err)?;
126	}
127
128	reload_config(&state).await?;
129
130	Ok(Json(MutationResponse {
131		ok: true,
132		message: format!("sink '{name}' updated"),
133	}))
134}
135
136#[utoipa::path(
137	delete,
138	path = "/sinks/{name}",
139	params(("name" = String, Path, description = "Sink name")),
140	responses(
141		(status = 200, description = "Sink deleted", body = MutationResponse),
142		(status = 400, description = "Bad request", body = ErrorResponse)
143	)
144)]
145pub async fn delete_sink(
146	State(state): State<Arc<ApiState>>,
147	Path(name): Path<String>,
148) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
149	let db = require_db(&state)?;
150
151	db.query(SQL_DELETE_SINK)
152		.bind(("name", name.clone()))
153		.await
154		.and_then(|response| response.check())
155		.map_err(db_err)?;
156
157	reload_config(&state).await?;
158
159	Ok(Json(MutationResponse {
160		ok: true,
161		message: format!("sink '{name}' deleted"),
162	}))
163}
164
165// ── Pipe CRUD ───────────────────────────────────────────────
166
167#[utoipa::path(
168	post,
169	path = "/pipes",
170	request_body = CreatePipeRequest,
171	responses(
172		(status = 200, description = "Pipe created", body = MutationResponse),
173		(status = 400, description = "Bad request", body = ErrorResponse)
174	)
175)]
176pub async fn create_pipe(
177	State(state): State<Arc<ApiState>>,
178	Json(req): Json<CreatePipeRequest>,
179) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
180	let db = require_db(&state)?;
181	let queries = req.queries;
182
183	let origin_config = if req.origin_config.is_null() {
184		serde_json::json!({})
185	} else {
186		req.origin_config
187	};
188	let schedule = if req.schedule.is_null() {
189		serde_json::json!({"interval_secs": 300, "missed_tick_policy": "skip"})
190	} else {
191		req.schedule
192	};
193	let delta = if req.delta.is_null() {
194		serde_json::json!({"diff_mode": "db", "fail_safe_threshold": 30.0})
195	} else {
196		req.delta
197	};
198	let retry = if req.retry.is_null() {
199		serde_json::json!({"max_retries": 3, "retry_base_delay_secs": 5})
200	} else {
201		req.retry
202	};
203
204	db.query(SQL_DELETE_PIPE)
205		.bind(("name", req.name.clone()))
206		.await
207		.and_then(|response| response.check())
208		.map_err(db_err)?;
209
210	db.query(SQL_CREATE_PIPE)
211		.bind(("name", req.name.clone()))
212		.bind(("origin_connector", req.origin_connector))
213		.bind(("origin_dsn", req.origin_dsn))
214		.bind(("origin_credential", req.origin_credential))
215		.bind(("trino_url", req.trino_url))
216		.bind(("origin_config", origin_config))
217		.bind(("targets", req.targets))
218		.bind(("schedule", schedule))
219		.bind(("delta", delta))
220		.bind(("retry", retry))
221		.bind(("recipe", req.recipe))
222		.bind(("filters", req.filters))
223		.bind(("transforms", req.transforms))
224		.bind(("links", req.links))
225		.await
226		.and_then(|response| response.check())
227		.map_err(db_err)?;
228
229	for query in &queries {
230		create_pipe_query_record(db, &req.name, query)
231			.await
232			.map_err(config_err)?;
233	}
234
235	reload_config(&state).await?;
236
237	Ok(Json(MutationResponse {
238		ok: true,
239		message: format!("pipe '{}' created", req.name),
240	}))
241}
242
243#[utoipa::path(
244	put,
245	path = "/pipes/{name}",
246	params(("name" = String, Path, description = "Pipe name")),
247	request_body = UpdatePipeRequest,
248	responses(
249		(status = 200, description = "Pipe updated", body = MutationResponse),
250		(status = 400, description = "Bad request", body = ErrorResponse)
251	)
252)]
253pub async fn update_pipe(
254	State(state): State<Arc<ApiState>>,
255	Path(name): Path<String>,
256	Json(req): Json<UpdatePipeRequest>,
257) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
258	let db = require_db(&state)?;
259
260	if let Some(connector) = req.origin_connector {
261		db.query(SQL_UPDATE_PIPE_ORIGIN_CONNECTOR)
262			.bind(("name", name.clone()))
263			.bind(("v", connector))
264			.await
265			.and_then(|response| response.check())
266			.map_err(db_err)?;
267	}
268
269	if let Some(dsn) = req.origin_dsn {
270		db.query(SQL_UPDATE_PIPE_ORIGIN_DSN)
271			.bind(("name", name.clone()))
272			.bind(("v", dsn))
273			.await
274			.and_then(|response| response.check())
275			.map_err(db_err)?;
276	}
277
278	if let Some(credential) = req.origin_credential {
279		db.query(SQL_UPDATE_PIPE_ORIGIN_CREDENTIAL)
280			.bind(("name", name.clone()))
281			.bind(("v", credential))
282			.await
283			.and_then(|response| response.check())
284			.map_err(db_err)?;
285	}
286
287	if let Some(trino_url) = req.trino_url {
288		db.query(SQL_UPDATE_PIPE_TRINO_URL)
289			.bind(("name", name.clone()))
290			.bind(("v", trino_url))
291			.await
292			.and_then(|response| response.check())
293			.map_err(db_err)?;
294	}
295
296	if let Some(config) = req.origin_config {
297		db.query(SQL_UPDATE_PIPE_ORIGIN_CONFIG)
298			.bind(("name", name.clone()))
299			.bind(("v", config))
300			.await
301			.and_then(|response| response.check())
302			.map_err(db_err)?;
303	}
304
305	if let Some(targets) = req.targets {
306		db.query(SQL_UPDATE_PIPE_TARGETS)
307			.bind(("name", name.clone()))
308			.bind(("v", targets))
309			.await
310			.and_then(|response| response.check())
311			.map_err(db_err)?;
312	}
313
314	if let Some(schedule) = req.schedule {
315		db.query(SQL_UPDATE_PIPE_SCHEDULE)
316			.bind(("name", name.clone()))
317			.bind(("v", schedule))
318			.await
319			.and_then(|response| response.check())
320			.map_err(db_err)?;
321	}
322
323	if let Some(delta) = req.delta {
324		db.query(SQL_UPDATE_PIPE_DELTA)
325			.bind(("name", name.clone()))
326			.bind(("v", delta))
327			.await
328			.and_then(|response| response.check())
329			.map_err(db_err)?;
330	}
331
332	if let Some(retry) = req.retry {
333		db.query(SQL_UPDATE_PIPE_RETRY)
334			.bind(("name", name.clone()))
335			.bind(("v", retry))
336			.await
337			.and_then(|response| response.check())
338			.map_err(db_err)?;
339	}
340
341	if let Some(recipe) = req.recipe {
342		match recipe {
343			Some(recipe) => {
344				db.query(SQL_UPDATE_PIPE_RECIPE)
345					.bind(("name", name.clone()))
346					.bind(("v", recipe))
347					.await
348					.and_then(|response| response.check())
349					.map_err(db_err)?;
350			}
351			None => {
352				db.query(SQL_UPDATE_PIPE_RECIPE_NONE)
353					.bind(("name", name.clone()))
354					.await
355					.and_then(|response| response.check())
356					.map_err(db_err)?;
357			}
358		}
359	}
360
361	if let Some(filters) = req.filters {
362		db.query(SQL_UPDATE_PIPE_FILTERS)
363			.bind(("name", name.clone()))
364			.bind(("v", filters))
365			.await
366			.and_then(|response| response.check())
367			.map_err(db_err)?;
368	}
369
370	if let Some(transforms) = req.transforms {
371		db.query(SQL_UPDATE_PIPE_TRANSFORMS)
372			.bind(("name", name.clone()))
373			.bind(("v", transforms))
374			.await
375			.and_then(|response| response.check())
376			.map_err(db_err)?;
377	}
378
379	if let Some(links) = req.links {
380		db.query(SQL_UPDATE_PIPE_LINKS)
381			.bind(("name", name.clone()))
382			.bind(("v", links))
383			.await
384			.and_then(|response| response.check())
385			.map_err(db_err)?;
386	}
387
388	if let Some(enabled) = req.enabled {
389		db.query(SQL_UPDATE_PIPE_ENABLED)
390			.bind(("name", name.clone()))
391			.bind(("v", enabled))
392			.await
393			.and_then(|response| response.check())
394			.map_err(db_err)?;
395	}
396
397	if let Some(queries) = req.queries {
398		db.query(SQL_DELETE_PIPE_QUERIES)
399			.bind(("name", name.clone()))
400			.await
401			.and_then(|response| response.check())
402			.map_err(db_err)?;
403
404		for query in &queries {
405			create_pipe_query_record(db, &name, query)
406				.await
407				.map_err(config_err)?;
408		}
409	}
410
411	reload_config(&state).await?;
412
413	Ok(Json(MutationResponse {
414		ok: true,
415		message: format!("pipe '{name}' updated"),
416	}))
417}
418
419#[utoipa::path(
420	delete,
421	path = "/pipes/{name}",
422	params(("name" = String, Path, description = "Pipe name")),
423	responses(
424		(status = 200, description = "Pipe deleted", body = MutationResponse),
425		(status = 400, description = "Bad request", body = ErrorResponse)
426	)
427)]
428pub async fn delete_pipe(
429	State(state): State<Arc<ApiState>>,
430	Path(name): Path<String>,
431) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
432	let db = require_db(&state)?;
433
434	db.query(SQL_DELETE_PIPE)
435		.bind(("name", name.clone()))
436		.await
437		.and_then(|response| response.check())
438		.map_err(db_err)?;
439
440	db.query(SQL_DELETE_PIPE_QUERIES)
441		.bind(("name", name.clone()))
442		.await
443		.and_then(|response| response.check())
444		.map_err(db_err)?;
445
446	reload_config(&state).await?;
447
448	Ok(Json(MutationResponse {
449		ok: true,
450		message: format!("pipe '{name}' deleted"),
451	}))
452}
453
454#[utoipa::path(
455	post,
456	path = "/pipe-presets",
457	request_body = CreatePipePresetRequest,
458	responses(
459		(status = 200, description = "Pipe preset created", body = MutationResponse),
460		(status = 400, description = "Bad request", body = ErrorResponse)
461	)
462)]
463pub async fn create_pipe_preset(
464	State(state): State<Arc<ApiState>>,
465	Json(req): Json<CreatePipePresetRequest>,
466) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
467	let db = require_db(&state)?;
468	let spec = storage_pipe_preset_spec_value(req.spec).map_err(json_serde_err)?;
469
470	db.query(SQL_DELETE_PIPE_PRESET)
471		.bind(("name", req.name.clone()))
472		.await
473		.and_then(|response| response.check())
474		.map_err(db_err)?;
475
476	db.query(SQL_CREATE_PIPE_PRESET)
477		.bind(("name", req.name.clone()))
478		.bind(("description", req.description.clone()))
479		.bind(("spec", spec))
480		.await
481		.and_then(|response| response.check())
482		.map_err(db_err)?;
483
484	refresh_read_cache(state.as_ref())
485		.await
486		.map_err(config_err)?;
487
488	Ok(Json(MutationResponse {
489		ok: true,
490		message: format!("pipe preset '{}' created", req.name),
491	}))
492}
493
494#[utoipa::path(
495	put,
496	path = "/pipe-presets/{name}",
497	params(("name" = String, Path, description = "Pipe preset name")),
498	request_body = UpdatePipePresetRequest,
499	responses(
500		(status = 200, description = "Pipe preset updated", body = MutationResponse),
501		(status = 400, description = "Bad request", body = ErrorResponse)
502	)
503)]
504pub async fn update_pipe_preset(
505	State(state): State<Arc<ApiState>>,
506	Path(name): Path<String>,
507	Json(req): Json<UpdatePipePresetRequest>,
508) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
509	let db = require_db(&state)?;
510	let current = state
511		.pipe_presets_info()
512		.into_iter()
513		.find(|preset| preset.name == name)
514		.ok_or_else(|| {
515			Json(ErrorResponse {
516				error: format!("pipe preset not found: {name}"),
517			})
518		})?;
519
520	let spec = match req.spec {
521		Some(spec) => storage_pipe_preset_spec_value(spec).map_err(json_serde_err)?,
522		None => storage_pipe_preset_spec_value(
523			serde_json::from_value(current.spec).map_err(json_serde_err)?,
524		)
525		.map_err(json_serde_err)?,
526	};
527
528	db.query(SQL_UPDATE_PIPE_PRESET)
529		.bind(("name", name.clone()))
530		.bind(("description", req.description))
531		.bind(("spec", spec))
532		.await
533		.and_then(|response| response.check())
534		.map_err(db_err)?;
535
536	refresh_read_cache(state.as_ref())
537		.await
538		.map_err(config_err)?;
539
540	Ok(Json(MutationResponse {
541		ok: true,
542		message: format!("pipe preset '{name}' updated"),
543	}))
544}
545
546#[utoipa::path(
547	delete,
548	path = "/pipe-presets/{name}",
549	params(("name" = String, Path, description = "Pipe preset name")),
550	responses(
551		(status = 200, description = "Pipe preset deleted", body = MutationResponse),
552		(status = 400, description = "Bad request", body = ErrorResponse)
553	)
554)]
555pub async fn delete_pipe_preset(
556	State(state): State<Arc<ApiState>>,
557	Path(name): Path<String>,
558) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
559	let db = require_db(&state)?;
560
561	db.query(SQL_DELETE_PIPE_PRESET)
562		.bind(("name", name.clone()))
563		.await
564		.and_then(|response| response.check())
565		.map_err(db_err)?;
566
567	refresh_read_cache(state.as_ref())
568		.await
569		.map_err(config_err)?;
570
571	Ok(Json(MutationResponse {
572		ok: true,
573		message: format!("pipe preset '{name}' deleted"),
574	}))
575}
576
577fn require_db(
578	state: &ApiState,
579) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
580	state.db_client.as_deref().ok_or_else(|| {
581		Json(ErrorResponse {
582			error: "database not configured".into(),
583		})
584	})
585}
586
587fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
588	Json(ErrorResponse {
589		error: format!("db: {e}"),
590	})
591}
592
593fn config_err(e: oversync_core::error::OversyncError) -> Json<ErrorResponse> {
594	Json(ErrorResponse {
595		error: e.to_string(),
596	})
597}
598
599fn json_serde_err(e: serde_json::Error) -> Json<ErrorResponse> {
600	Json(ErrorResponse {
601		error: format!("json: {e}"),
602	})
603}
604
605fn storage_pipe_preset_spec_value(
606	spec: PipePresetSpecInput,
607) -> Result<serde_json::Value, serde_json::Error> {
608	let origin_config = if spec.origin_config.is_null() {
609		serde_json::json!({})
610	} else {
611		spec.origin_config
612	};
613	let schedule = if spec.schedule.is_null() {
614		serde_json::json!({})
615	} else {
616		spec.schedule
617	};
618	let delta = if spec.delta.is_null() {
619		serde_json::json!({})
620	} else {
621		spec.delta
622	};
623	let retry = if spec.retry.is_null() {
624		serde_json::json!({})
625	} else {
626		spec.retry
627	};
628
629	serde_json::to_value(serde_json::json!({
630		"origin": {
631			"connector": spec.origin_connector,
632			"dsn": spec.origin_dsn,
633			"credential": spec.origin_credential,
634			"trino_url": spec.trino_url,
635			"config": origin_config,
636		},
637		"parameters": spec.parameters,
638		"targets": spec.targets,
639		"queries": spec.queries,
640		"schedule": schedule,
641		"delta": delta,
642		"retry": retry,
643		"recipe": spec.recipe,
644		"filters": spec.filters,
645		"transforms": spec.transforms,
646		"links": spec.links,
647	}))
648}
649
650fn api_pipe_preset_spec_value(spec: &serde_json::Value) -> Option<serde_json::Value> {
651	if spec.get("origin_connector").is_some() {
652		return Some(serde_json::json!({
653			"origin_connector": spec.get("origin_connector").cloned().unwrap_or(serde_json::Value::Null),
654			"origin_dsn": spec.get("origin_dsn").cloned().unwrap_or(serde_json::Value::Null),
655			"origin_credential": spec.get("origin_credential").cloned().unwrap_or(serde_json::Value::Null),
656			"trino_url": spec.get("trino_url").cloned().unwrap_or(serde_json::Value::Null),
657			"origin_config": spec.get("origin_config").cloned().unwrap_or_else(|| serde_json::json!({})),
658			"parameters": spec.get("parameters").cloned().unwrap_or_else(|| serde_json::json!([])),
659			"targets": spec.get("targets").cloned().unwrap_or_else(|| serde_json::json!([])),
660			"queries": spec.get("queries").cloned().unwrap_or_else(|| serde_json::json!([])),
661			"schedule": spec.get("schedule").cloned().unwrap_or_else(|| serde_json::json!({})),
662			"delta": spec.get("delta").cloned().unwrap_or_else(|| serde_json::json!({})),
663			"retry": spec.get("retry").cloned().unwrap_or_else(|| serde_json::json!({})),
664			"recipe": spec.get("recipe").cloned().unwrap_or(serde_json::Value::Null),
665			"filters": spec.get("filters").cloned().unwrap_or_else(|| serde_json::json!([])),
666			"transforms": spec.get("transforms").cloned().unwrap_or_else(|| serde_json::json!([])),
667			"links": spec.get("links").cloned().unwrap_or_else(|| serde_json::json!([])),
668		}));
669	}
670
671	let origin = spec.get("origin")?;
672	Some(serde_json::json!({
673		"origin_connector": origin.get("connector").cloned().unwrap_or(serde_json::Value::Null),
674		"origin_dsn": origin.get("dsn").cloned().unwrap_or(serde_json::Value::Null),
675		"origin_credential": origin.get("credential").cloned().unwrap_or(serde_json::Value::Null),
676		"trino_url": origin.get("trino_url").cloned().unwrap_or(serde_json::Value::Null),
677		"origin_config": match origin.get("config") {
678			Some(value) if !value.is_null() => value.clone(),
679			_ => serde_json::json!({}),
680		},
681		"parameters": spec.get("parameters").cloned().unwrap_or_else(|| serde_json::json!([])),
682		"targets": spec.get("targets").cloned().unwrap_or_else(|| serde_json::json!([])),
683		"queries": spec.get("queries").cloned().unwrap_or_else(|| serde_json::json!([])),
684		"schedule": match spec.get("schedule") {
685			Some(value) if !value.is_null() => value.clone(),
686			_ => serde_json::json!({}),
687		},
688		"delta": match spec.get("delta") {
689			Some(value) if !value.is_null() => value.clone(),
690			_ => serde_json::json!({}),
691		},
692		"retry": match spec.get("retry") {
693			Some(value) if !value.is_null() => value.clone(),
694			_ => serde_json::json!({}),
695		},
696		"recipe": spec.get("recipe").cloned().unwrap_or(serde_json::Value::Null),
697		"filters": spec.get("filters").cloned().unwrap_or_else(|| serde_json::json!([])),
698		"transforms": spec.get("transforms").cloned().unwrap_or_else(|| serde_json::json!([])),
699		"links": spec.get("links").cloned().unwrap_or_else(|| serde_json::json!([])),
700	}))
701}
702
703async fn create_pipe_query_record(
704	db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
705	origin_id: &str,
706	query: &PipeQueryInput,
707) -> Result<(), oversync_core::error::OversyncError> {
708	let sql = if query.sinks.is_some() {
709		SQL_CREATE_PIPE_QUERY_WITH_SINKS
710	} else {
711		SQL_CREATE_PIPE_QUERY
712	};
713
714	db.query(sql)
715		.bind(("source", origin_id.to_string()))
716		.bind(("name", query.id.clone()))
717		.bind(("query", query.sql.clone()))
718		.bind(("key_column", query.key_column.clone()))
719		.bind(("sinks", query.sinks.clone()))
720		.bind(("transform", query.transform.clone()))
721		.await
722		.and_then(|response| response.check())
723		.map_err(|e| {
724			oversync_core::error::OversyncError::SurrealDb(format!(
725				"create query '{}' for pipe '{}': {e}",
726				query.id, origin_id
727			))
728		})?;
729
730	Ok(())
731}
732
733pub(crate) async fn reload_config(state: &ApiState) -> Result<(), Json<ErrorResponse>> {
734	if let (Some(lifecycle), Some(db)) = (&state.lifecycle, &state.db_client) {
735		lifecycle.restart_with_config_json(db).await.map_err(|e| {
736			Json(ErrorResponse {
737				error: format!("reload: {e}"),
738			})
739		})?;
740	}
741	refresh_read_cache(state).await.map_err(|e| {
742		Json(ErrorResponse {
743			error: format!("refresh cache: {e}"),
744		})
745	})?;
746	Ok(())
747}
748
749pub async fn refresh_read_cache(
750	state: &ApiState,
751) -> Result<(), oversync_core::error::OversyncError> {
752	let Some(db) = &state.db_client else {
753		return Ok(());
754	};
755
756	const SQL_LOAD_QUERIES: &str = oversync_queries::config::LOAD_QUERIES;
757	const SQL_READ_SINKS_CACHE: &str = oversync_queries::config::READ_SINKS_CACHE;
758	const SQL_READ_PIPES_CACHE: &str = oversync_queries::config::READ_PIPES_CACHE;
759	const SQL_READ_PIPE_PRESETS_CACHE: &str = oversync_queries::config::READ_PIPE_PRESETS_CACHE;
760
761	let query_rows =
762		read_cache_rows_or_empty(db, SQL_LOAD_QUERIES, "refresh queries cache").await?;
763	let mut query_counts_by_origin: std::collections::HashMap<String, usize> =
764		std::collections::HashMap::new();
765	for row in &query_rows {
766		let Some(origin_id) = row.get("origin_id").and_then(|v| v.as_str()) else {
767			continue;
768		};
769		*query_counts_by_origin
770			.entry(origin_id.to_string())
771			.or_default() += 1;
772	}
773
774	let sink_rows =
775		read_cache_rows_or_empty(db, SQL_READ_SINKS_CACHE, "refresh sinks cache").await?;
776	let sink_configs: Vec<crate::state::SinkConfig> = sink_rows
777		.iter()
778		.filter_map(|r| {
779			Some(crate::state::SinkConfig {
780				name: r.get("name")?.as_str()?.to_string(),
781				sink_type: r.get("sink_type")?.as_str()?.to_string(),
782				config: r.get("config").cloned(),
783			})
784		})
785		.collect();
786	*state.sinks.write().await = sink_configs;
787
788	let pipe_rows =
789		read_cache_rows_or_empty(db, SQL_READ_PIPES_CACHE, "refresh pipes cache").await?;
790	let pipe_configs: Vec<crate::state::PipeConfigCache> = pipe_rows
791		.iter()
792		.filter_map(|r| {
793			let name = r.get("name")?.as_str()?.to_string();
794			Some(crate::state::PipeConfigCache {
795				name: name.clone(),
796				origin_connector: r.get("origin_connector")?.as_str()?.to_string(),
797				origin_dsn: r.get("origin_dsn")?.as_str()?.to_string(),
798				targets: r
799					.get("targets")
800					.and_then(|v| v.as_array())
801					.map(|arr| {
802						arr.iter()
803							.filter_map(|v| v.as_str().map(String::from))
804							.collect()
805					})
806					.unwrap_or_default(),
807				interval_secs: r
808					.get("interval_secs")
809					.and_then(|v| v.as_u64())
810					.unwrap_or(300),
811				query_count: *query_counts_by_origin.get(name.as_str()).unwrap_or(&0),
812				recipe: r.get("recipe").cloned().filter(|v| !v.is_null()),
813				enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
814			})
815		})
816		.collect();
817	*state.pipes.write().await = pipe_configs;
818
819	let pipe_preset_rows = read_cache_rows_or_empty(
820		db,
821		SQL_READ_PIPE_PRESETS_CACHE,
822		"refresh pipe presets cache",
823	)
824	.await?;
825	let pipe_preset_configs: Vec<crate::state::PipePresetCache> = pipe_preset_rows
826		.iter()
827		.filter_map(|r| {
828			Some(crate::state::PipePresetCache {
829				name: r.get("name")?.as_str()?.to_string(),
830				description: r
831					.get("description")
832					.and_then(|v| v.as_str())
833					.map(String::from),
834				spec: api_pipe_preset_spec_value(r.get("spec")?)?,
835			})
836		})
837		.collect();
838	*state.pipe_presets.write().await = pipe_preset_configs;
839
840	Ok(())
841}
842
/// Reports whether an error's display text looks like a "table does not
/// exist" failure: it must contain both `does not exist` and `table`
/// (case-sensitive, in any order).
fn is_missing_table_error(error: &dyn std::fmt::Display) -> bool {
	let rendered = format!("{error}");
	["does not exist", "table"]
		.iter()
		.all(|needle| rendered.contains(*needle))
}
847
848async fn read_cache_rows_or_empty(
849	db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
850	sql: &str,
851	context: &str,
852) -> Result<Vec<serde_json::Value>, oversync_core::error::OversyncError> {
853	let mut response = match db.query(sql).await {
854		Ok(response) => response,
855		Err(error) if is_missing_table_error(&error) => return Ok(Vec::new()),
856		Err(error) => {
857			return Err(oversync_core::error::OversyncError::SurrealDb(format!(
858				"{context}: {error}"
859			)));
860		}
861	};
862
863	match response.take(0) {
864		Ok(rows) => Ok(rows),
865		Err(error) if is_missing_table_error(&error) => Ok(Vec::new()),
866		Err(error) => Err(oversync_core::error::OversyncError::SurrealDb(format!(
867			"{context} take: {error}"
868		))),
869	}
870}
871
#[cfg(test)]
mod tests {
	use super::{api_pipe_preset_spec_value, is_missing_table_error};

	/// Specs stored in the old flat layout are still readable.
	#[test]
	fn api_pipe_preset_spec_value_accepts_legacy_flat_shape() {
		let flat = serde_json::json!({
			"origin_connector": "postgres",
			"origin_dsn": "postgres://localhost/db",
			"origin_credential": null,
			"trino_url": null,
			"origin_config": { "sslmode": "require" },
			"targets": ["stdout"],
			"queries": [{
				"id": "aspect-columns",
				"sql": "SELECT id, payload FROM columns",
				"key_column": "id"
			}],
			"schedule": { "interval_secs": 900, "missed_tick_policy": "skip" },
			"delta": { "diff_mode": "db", "fail_safe_threshold": 30 },
			"retry": { "max_retries": 3, "retry_base_delay_secs": 5 },
			"recipe": null,
			"filters": [],
			"transforms": [],
			"links": []
		});

		let api = api_pipe_preset_spec_value(&flat).expect("legacy flat preset should map");
		assert_eq!(api["origin_connector"], "postgres");
		assert_eq!(api["origin_config"]["sslmode"], "require");
		assert_eq!(api["queries"][0]["id"], "aspect-columns");
	}

	/// The nested storage layout is flattened, with missing or null object
	/// fields replaced by `{}` and missing list fields by `[]`.
	#[test]
	fn api_pipe_preset_spec_value_flattens_nested_storage_shape() {
		let nested = serde_json::json!({
			"origin": {
				"connector": "postgres",
				"dsn": "postgres://localhost/db",
				"config": null
			},
			"targets": ["stdout"]
		});

		let api = api_pipe_preset_spec_value(&nested).expect("nested preset should map");
		assert_eq!(api["origin_connector"], "postgres");
		assert_eq!(api["origin_dsn"], "postgres://localhost/db");
		assert!(api["origin_credential"].is_null());
		assert_eq!(api["origin_config"], serde_json::json!({}));
		assert_eq!(api["schedule"], serde_json::json!({}));
		assert_eq!(api["targets"][0], "stdout");
		assert_eq!(api["queries"], serde_json::json!([]));
	}

	/// A document with neither layout's marker key cannot be mapped.
	#[test]
	fn api_pipe_preset_spec_value_rejects_unrecognised_shape() {
		let unknown = serde_json::json!({ "targets": [] });
		assert!(api_pipe_preset_spec_value(&unknown).is_none());
	}

	#[test]
	fn missing_table_error_detection_matches_surreal_message_shape() {
		assert!(is_missing_table_error(
			&"The table 'pipe_config' does not exist"
		));
		assert!(!is_missing_table_error(&"permission denied"));
		// "does not exist" alone is not enough; the text must mention a table.
		assert!(!is_missing_table_error(&"record does not exist"));
	}
}