Skip to main content

oversync_api/
mutations.rs

1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5use tracing::warn;
6
7use crate::state::ApiState;
8use crate::types::*;
9
10use oversync_queries::mutations;
11
12const SQL_DELETE_SINK: &str = mutations::DELETE_SINK;
13const SQL_CREATE_SINK: &str = mutations::CREATE_SINK;
14const SQL_UPDATE_SINK_TYPE: &str = mutations::UPDATE_SINK_TYPE;
15const SQL_UPDATE_SINK_ENABLED: &str = mutations::UPDATE_SINK_ENABLED;
16const SQL_UPDATE_SINK_CONFIG: &str = mutations::UPDATE_SINK_CONFIG;
17
18const SQL_DELETE_PIPE: &str = mutations::DELETE_PIPE;
19const SQL_DELETE_PIPE_PRESET: &str = mutations::DELETE_PIPE_PRESET;
20const SQL_CREATE_PIPE: &str = mutations::CREATE_PIPE;
21const SQL_CREATE_PIPE_PRESET: &str = mutations::CREATE_PIPE_PRESET;
22const SQL_DELETE_PIPE_QUERIES: &str = mutations::DELETE_PIPE_QUERIES;
23const SQL_CREATE_PIPE_QUERY: &str = mutations::CREATE_QUERY;
24const SQL_CREATE_PIPE_QUERY_WITH_SINKS: &str = mutations::CREATE_QUERY_WITH_SINKS;
25const SQL_UPDATE_PIPE_ORIGIN_CONNECTOR: &str = mutations::UPDATE_PIPE_ORIGIN_CONNECTOR;
26const SQL_UPDATE_PIPE_ORIGIN_DSN: &str = mutations::UPDATE_PIPE_ORIGIN_DSN;
27const SQL_UPDATE_PIPE_ORIGIN_CREDENTIAL: &str = mutations::UPDATE_PIPE_ORIGIN_CREDENTIAL;
28const SQL_UPDATE_PIPE_TRINO_URL: &str = mutations::UPDATE_PIPE_TRINO_URL;
29const SQL_UPDATE_PIPE_ORIGIN_CONFIG: &str = mutations::UPDATE_PIPE_ORIGIN_CONFIG;
30const SQL_UPDATE_PIPE_TARGETS: &str = mutations::UPDATE_PIPE_TARGETS;
31const SQL_UPDATE_PIPE_SCHEDULE: &str = mutations::UPDATE_PIPE_SCHEDULE;
32const SQL_UPDATE_PIPE_DELTA: &str = mutations::UPDATE_PIPE_DELTA;
33const SQL_UPDATE_PIPE_RETRY: &str = mutations::UPDATE_PIPE_RETRY;
34const SQL_UPDATE_PIPE_RECIPE: &str = mutations::UPDATE_PIPE_RECIPE;
35const SQL_UPDATE_PIPE_RECIPE_NONE: &str = mutations::UPDATE_PIPE_RECIPE_NONE;
36const SQL_UPDATE_PIPE_PRESET: &str = mutations::UPDATE_PIPE_PRESET;
37const SQL_UPDATE_PIPE_FILTERS: &str = mutations::UPDATE_PIPE_FILTERS;
38const SQL_UPDATE_PIPE_TRANSFORMS: &str = mutations::UPDATE_PIPE_TRANSFORMS;
39const SQL_UPDATE_PIPE_LINKS: &str = mutations::UPDATE_PIPE_LINKS;
40const SQL_UPDATE_PIPE_ENABLED: &str = mutations::UPDATE_PIPE_ENABLED;
41
42#[utoipa::path(
43	post,
44	path = "/sinks",
45	request_body = CreateSinkRequest,
46	responses(
47		(status = 200, description = "Sink created", body = MutationResponse),
48		(status = 400, description = "Bad request", body = ErrorResponse)
49	)
50)]
51pub async fn create_sink(
52	State(state): State<Arc<ApiState>>,
53	Json(req): Json<CreateSinkRequest>,
54) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
55	let db = require_db(&state)?;
56
57	let config_json = if req.config.is_null() {
58		serde_json::json!({})
59	} else {
60		req.config
61	};
62
63	db.query(SQL_DELETE_SINK)
64		.bind(("name", req.name.clone()))
65		.await
66		.and_then(|response| response.check())
67		.map_err(db_err)?;
68
69	db.query(SQL_CREATE_SINK)
70		.bind(("name", req.name.clone()))
71		.bind(("sink_type", req.sink_type))
72		.bind(("config", config_json))
73		.await
74		.and_then(|response| response.check())
75		.map_err(db_err)?;
76
77	reload_config(&state).await?;
78
79	Ok(Json(MutationResponse {
80		ok: true,
81		message: format!("sink '{}' created", req.name),
82	}))
83}
84
85#[utoipa::path(
86	put,
87	path = "/sinks/{name}",
88	params(("name" = String, Path, description = "Sink name")),
89	request_body = UpdateSinkRequest,
90	responses(
91		(status = 200, description = "Sink updated", body = MutationResponse),
92		(status = 400, description = "Bad request", body = ErrorResponse)
93	)
94)]
95pub async fn update_sink(
96	State(state): State<Arc<ApiState>>,
97	Path(name): Path<String>,
98	Json(req): Json<UpdateSinkRequest>,
99) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
100	let db = require_db(&state)?;
101
102	if let Some(sink_type) = req.sink_type {
103		db.query(SQL_UPDATE_SINK_TYPE)
104			.bind(("name", name.clone()))
105			.bind(("sink_type", sink_type))
106			.await
107			.and_then(|response| response.check())
108			.map_err(db_err)?;
109	}
110
111	if let Some(enabled) = req.enabled {
112		db.query(SQL_UPDATE_SINK_ENABLED)
113			.bind(("name", name.clone()))
114			.bind(("enabled", enabled))
115			.await
116			.and_then(|response| response.check())
117			.map_err(db_err)?;
118	}
119
120	if let Some(config) = req.config {
121		db.query(SQL_UPDATE_SINK_CONFIG)
122			.bind(("name", name.clone()))
123			.bind(("config", config))
124			.await
125			.and_then(|response| response.check())
126			.map_err(db_err)?;
127	}
128
129	reload_config(&state).await?;
130
131	Ok(Json(MutationResponse {
132		ok: true,
133		message: format!("sink '{name}' updated"),
134	}))
135}
136
137#[utoipa::path(
138	delete,
139	path = "/sinks/{name}",
140	params(("name" = String, Path, description = "Sink name")),
141	responses(
142		(status = 200, description = "Sink deleted", body = MutationResponse),
143		(status = 400, description = "Bad request", body = ErrorResponse)
144	)
145)]
146pub async fn delete_sink(
147	State(state): State<Arc<ApiState>>,
148	Path(name): Path<String>,
149) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
150	let db = require_db(&state)?;
151
152	db.query(SQL_DELETE_SINK)
153		.bind(("name", name.clone()))
154		.await
155		.and_then(|response| response.check())
156		.map_err(db_err)?;
157
158	reload_config(&state).await?;
159
160	Ok(Json(MutationResponse {
161		ok: true,
162		message: format!("sink '{name}' deleted"),
163	}))
164}
165
166// ── Pipe CRUD ───────────────────────────────────────────────
167
168#[utoipa::path(
169	post,
170	path = "/pipes",
171	request_body = CreatePipeRequest,
172	responses(
173		(status = 200, description = "Pipe created", body = MutationResponse),
174		(status = 400, description = "Bad request", body = ErrorResponse)
175	)
176)]
177pub async fn create_pipe(
178	State(state): State<Arc<ApiState>>,
179	Json(req): Json<CreatePipeRequest>,
180) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
181	let db = require_db(&state)?;
182	let queries = req.queries;
183
184	let origin_config = if req.origin_config.is_null() {
185		serde_json::json!({})
186	} else {
187		req.origin_config
188	};
189	let schedule = if req.schedule.is_null() {
190		serde_json::json!({"interval_secs": 300, "missed_tick_policy": "skip"})
191	} else {
192		req.schedule
193	};
194	let delta = if req.delta.is_null() {
195		serde_json::json!({"diff_mode": "db", "fail_safe_threshold": 30.0})
196	} else {
197		req.delta
198	};
199	let retry = if req.retry.is_null() {
200		serde_json::json!({"max_retries": 3, "retry_base_delay_secs": 5})
201	} else {
202		req.retry
203	};
204
205	db.query(SQL_DELETE_PIPE)
206		.bind(("name", req.name.clone()))
207		.await
208		.and_then(|response| response.check())
209		.map_err(db_err)?;
210
211	db.query(SQL_CREATE_PIPE)
212		.bind(("name", req.name.clone()))
213		.bind(("origin_connector", req.origin_connector))
214		.bind(("origin_dsn", req.origin_dsn))
215		.bind(("origin_credential", req.origin_credential))
216		.bind(("trino_url", req.trino_url))
217		.bind(("origin_config", origin_config))
218		.bind(("targets", req.targets))
219		.bind(("schedule", schedule))
220		.bind(("delta", delta))
221		.bind(("retry", retry))
222		.bind(("recipe", req.recipe))
223		.bind(("filters", req.filters))
224		.bind(("transforms", req.transforms))
225		.bind(("links", req.links))
226		.await
227		.and_then(|response| response.check())
228		.map_err(db_err)?;
229
230	for query in &queries {
231		create_pipe_query_record(db, &req.name, query)
232			.await
233			.map_err(config_err)?;
234	}
235
236	reload_config(&state).await?;
237
238	Ok(Json(MutationResponse {
239		ok: true,
240		message: format!("pipe '{}' created", req.name),
241	}))
242}
243
244#[utoipa::path(
245	put,
246	path = "/pipes/{name}",
247	params(("name" = String, Path, description = "Pipe name")),
248	request_body = UpdatePipeRequest,
249	responses(
250		(status = 200, description = "Pipe updated", body = MutationResponse),
251		(status = 400, description = "Bad request", body = ErrorResponse)
252	)
253)]
254pub async fn update_pipe(
255	State(state): State<Arc<ApiState>>,
256	Path(name): Path<String>,
257	Json(req): Json<UpdatePipeRequest>,
258) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
259	let db = require_db(&state)?;
260
261	if let Some(connector) = req.origin_connector {
262		db.query(SQL_UPDATE_PIPE_ORIGIN_CONNECTOR)
263			.bind(("name", name.clone()))
264			.bind(("v", connector))
265			.await
266			.and_then(|response| response.check())
267			.map_err(db_err)?;
268	}
269
270	if let Some(dsn) = req.origin_dsn {
271		db.query(SQL_UPDATE_PIPE_ORIGIN_DSN)
272			.bind(("name", name.clone()))
273			.bind(("v", dsn))
274			.await
275			.and_then(|response| response.check())
276			.map_err(db_err)?;
277	}
278
279	if let Some(credential) = req.origin_credential {
280		db.query(SQL_UPDATE_PIPE_ORIGIN_CREDENTIAL)
281			.bind(("name", name.clone()))
282			.bind(("v", credential))
283			.await
284			.and_then(|response| response.check())
285			.map_err(db_err)?;
286	}
287
288	if let Some(trino_url) = req.trino_url {
289		db.query(SQL_UPDATE_PIPE_TRINO_URL)
290			.bind(("name", name.clone()))
291			.bind(("v", trino_url))
292			.await
293			.and_then(|response| response.check())
294			.map_err(db_err)?;
295	}
296
297	if let Some(config) = req.origin_config {
298		db.query(SQL_UPDATE_PIPE_ORIGIN_CONFIG)
299			.bind(("name", name.clone()))
300			.bind(("v", config))
301			.await
302			.and_then(|response| response.check())
303			.map_err(db_err)?;
304	}
305
306	if let Some(targets) = req.targets {
307		db.query(SQL_UPDATE_PIPE_TARGETS)
308			.bind(("name", name.clone()))
309			.bind(("v", targets))
310			.await
311			.and_then(|response| response.check())
312			.map_err(db_err)?;
313	}
314
315	if let Some(schedule) = req.schedule {
316		db.query(SQL_UPDATE_PIPE_SCHEDULE)
317			.bind(("name", name.clone()))
318			.bind(("v", schedule))
319			.await
320			.and_then(|response| response.check())
321			.map_err(db_err)?;
322	}
323
324	if let Some(delta) = req.delta {
325		db.query(SQL_UPDATE_PIPE_DELTA)
326			.bind(("name", name.clone()))
327			.bind(("v", delta))
328			.await
329			.and_then(|response| response.check())
330			.map_err(db_err)?;
331	}
332
333	if let Some(retry) = req.retry {
334		db.query(SQL_UPDATE_PIPE_RETRY)
335			.bind(("name", name.clone()))
336			.bind(("v", retry))
337			.await
338			.and_then(|response| response.check())
339			.map_err(db_err)?;
340	}
341
342	if let Some(recipe) = req.recipe {
343		match recipe {
344			Some(recipe) => {
345				db.query(SQL_UPDATE_PIPE_RECIPE)
346					.bind(("name", name.clone()))
347					.bind(("v", recipe))
348					.await
349					.and_then(|response| response.check())
350					.map_err(db_err)?;
351			}
352			None => {
353				db.query(SQL_UPDATE_PIPE_RECIPE_NONE)
354					.bind(("name", name.clone()))
355					.await
356					.and_then(|response| response.check())
357					.map_err(db_err)?;
358			}
359		}
360	}
361
362	if let Some(filters) = req.filters {
363		db.query(SQL_UPDATE_PIPE_FILTERS)
364			.bind(("name", name.clone()))
365			.bind(("v", filters))
366			.await
367			.and_then(|response| response.check())
368			.map_err(db_err)?;
369	}
370
371	if let Some(transforms) = req.transforms {
372		db.query(SQL_UPDATE_PIPE_TRANSFORMS)
373			.bind(("name", name.clone()))
374			.bind(("v", transforms))
375			.await
376			.and_then(|response| response.check())
377			.map_err(db_err)?;
378	}
379
380	if let Some(links) = req.links {
381		db.query(SQL_UPDATE_PIPE_LINKS)
382			.bind(("name", name.clone()))
383			.bind(("v", links))
384			.await
385			.and_then(|response| response.check())
386			.map_err(db_err)?;
387	}
388
389	if let Some(enabled) = req.enabled {
390		db.query(SQL_UPDATE_PIPE_ENABLED)
391			.bind(("name", name.clone()))
392			.bind(("v", enabled))
393			.await
394			.and_then(|response| response.check())
395			.map_err(db_err)?;
396	}
397
398	if let Some(queries) = req.queries {
399		db.query(SQL_DELETE_PIPE_QUERIES)
400			.bind(("name", name.clone()))
401			.await
402			.and_then(|response| response.check())
403			.map_err(db_err)?;
404
405		for query in &queries {
406			create_pipe_query_record(db, &name, query)
407				.await
408				.map_err(config_err)?;
409		}
410	}
411
412	reload_config(&state).await?;
413
414	Ok(Json(MutationResponse {
415		ok: true,
416		message: format!("pipe '{name}' updated"),
417	}))
418}
419
420#[utoipa::path(
421	delete,
422	path = "/pipes/{name}",
423	params(("name" = String, Path, description = "Pipe name")),
424	responses(
425		(status = 200, description = "Pipe deleted", body = MutationResponse),
426		(status = 400, description = "Bad request", body = ErrorResponse)
427	)
428)]
429pub async fn delete_pipe(
430	State(state): State<Arc<ApiState>>,
431	Path(name): Path<String>,
432) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
433	let db = require_db(&state)?;
434
435	db.query(SQL_DELETE_PIPE)
436		.bind(("name", name.clone()))
437		.await
438		.and_then(|response| response.check())
439		.map_err(db_err)?;
440
441	db.query(SQL_DELETE_PIPE_QUERIES)
442		.bind(("name", name.clone()))
443		.await
444		.and_then(|response| response.check())
445		.map_err(db_err)?;
446
447	reload_config(&state).await?;
448
449	Ok(Json(MutationResponse {
450		ok: true,
451		message: format!("pipe '{name}' deleted"),
452	}))
453}
454
455#[utoipa::path(
456	post,
457	path = "/pipe-presets",
458	request_body = CreatePipePresetRequest,
459	responses(
460		(status = 200, description = "Pipe preset created", body = MutationResponse),
461		(status = 400, description = "Bad request", body = ErrorResponse)
462	)
463)]
464pub async fn create_pipe_preset(
465	State(state): State<Arc<ApiState>>,
466	Json(req): Json<CreatePipePresetRequest>,
467) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
468	let db = require_db(&state)?;
469	let spec = storage_pipe_preset_spec_value(req.spec).map_err(json_serde_err)?;
470
471	db.query(SQL_DELETE_PIPE_PRESET)
472		.bind(("name", req.name.clone()))
473		.await
474		.and_then(|response| response.check())
475		.map_err(db_err)?;
476
477	db.query(SQL_CREATE_PIPE_PRESET)
478		.bind(("name", req.name.clone()))
479		.bind(("description", req.description.clone()))
480		.bind(("spec", spec))
481		.await
482		.and_then(|response| response.check())
483		.map_err(db_err)?;
484
485	refresh_read_cache(state.as_ref())
486		.await
487		.map_err(config_err)?;
488
489	Ok(Json(MutationResponse {
490		ok: true,
491		message: format!("pipe preset '{}' created", req.name),
492	}))
493}
494
495#[utoipa::path(
496	put,
497	path = "/pipe-presets/{name}",
498	params(("name" = String, Path, description = "Pipe preset name")),
499	request_body = UpdatePipePresetRequest,
500	responses(
501		(status = 200, description = "Pipe preset updated", body = MutationResponse),
502		(status = 400, description = "Bad request", body = ErrorResponse)
503	)
504)]
505pub async fn update_pipe_preset(
506	State(state): State<Arc<ApiState>>,
507	Path(name): Path<String>,
508	Json(req): Json<UpdatePipePresetRequest>,
509) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
510	let db = require_db(&state)?;
511	let current = state
512		.pipe_presets_info()
513		.into_iter()
514		.find(|preset| preset.name == name)
515		.ok_or_else(|| {
516			Json(ErrorResponse {
517				error: format!("pipe preset not found: {name}"),
518			})
519		})?;
520
521	let spec = match req.spec {
522		Some(spec) => storage_pipe_preset_spec_value(spec).map_err(json_serde_err)?,
523		None => storage_pipe_preset_spec_value(
524			serde_json::from_value(current.spec).map_err(json_serde_err)?,
525		)
526		.map_err(json_serde_err)?,
527	};
528
529	db.query(SQL_UPDATE_PIPE_PRESET)
530		.bind(("name", name.clone()))
531		.bind(("description", req.description))
532		.bind(("spec", spec))
533		.await
534		.and_then(|response| response.check())
535		.map_err(db_err)?;
536
537	refresh_read_cache(state.as_ref())
538		.await
539		.map_err(config_err)?;
540
541	Ok(Json(MutationResponse {
542		ok: true,
543		message: format!("pipe preset '{name}' updated"),
544	}))
545}
546
547#[utoipa::path(
548	delete,
549	path = "/pipe-presets/{name}",
550	params(("name" = String, Path, description = "Pipe preset name")),
551	responses(
552		(status = 200, description = "Pipe preset deleted", body = MutationResponse),
553		(status = 400, description = "Bad request", body = ErrorResponse)
554	)
555)]
556pub async fn delete_pipe_preset(
557	State(state): State<Arc<ApiState>>,
558	Path(name): Path<String>,
559) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
560	let db = require_db(&state)?;
561
562	db.query(SQL_DELETE_PIPE_PRESET)
563		.bind(("name", name.clone()))
564		.await
565		.and_then(|response| response.check())
566		.map_err(db_err)?;
567
568	refresh_read_cache(state.as_ref())
569		.await
570		.map_err(config_err)?;
571
572	Ok(Json(MutationResponse {
573		ok: true,
574		message: format!("pipe preset '{name}' deleted"),
575	}))
576}
577
578fn require_db(
579	state: &ApiState,
580) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
581	state.db_client.as_deref().ok_or_else(|| {
582		Json(ErrorResponse {
583			error: "database not configured".into(),
584		})
585	})
586}
587
588fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
589	Json(ErrorResponse {
590		error: format!("db: {e}"),
591	})
592}
593
594fn config_err(e: oversync_core::error::OversyncError) -> Json<ErrorResponse> {
595	Json(ErrorResponse {
596		error: e.to_string(),
597	})
598}
599
600fn json_serde_err(e: serde_json::Error) -> Json<ErrorResponse> {
601	Json(ErrorResponse {
602		error: format!("json: {e}"),
603	})
604}
605
606fn storage_pipe_preset_spec_value(
607	spec: PipePresetSpecInput,
608) -> Result<serde_json::Value, serde_json::Error> {
609	let origin_config = if spec.origin_config.is_null() {
610		serde_json::json!({})
611	} else {
612		spec.origin_config
613	};
614	let schedule = if spec.schedule.is_null() {
615		serde_json::json!({})
616	} else {
617		spec.schedule
618	};
619	let delta = if spec.delta.is_null() {
620		serde_json::json!({})
621	} else {
622		spec.delta
623	};
624	let retry = if spec.retry.is_null() {
625		serde_json::json!({})
626	} else {
627		spec.retry
628	};
629
630	serde_json::to_value(serde_json::json!({
631		"origin": {
632			"connector": spec.origin_connector,
633			"dsn": spec.origin_dsn,
634			"credential": spec.origin_credential,
635			"trino_url": spec.trino_url,
636			"config": origin_config,
637		},
638		"parameters": spec.parameters,
639		"targets": spec.targets,
640		"queries": spec.queries,
641		"schedule": schedule,
642		"delta": delta,
643		"retry": retry,
644		"recipe": spec.recipe,
645		"filters": spec.filters,
646		"transforms": spec.transforms,
647		"links": spec.links,
648	}))
649}
650
651fn api_pipe_preset_spec_value(spec: &serde_json::Value) -> Option<serde_json::Value> {
652	if spec.get("origin_connector").is_some() {
653		return Some(serde_json::json!({
654			"origin_connector": spec.get("origin_connector").cloned().unwrap_or(serde_json::Value::Null),
655			"origin_dsn": spec.get("origin_dsn").cloned().unwrap_or(serde_json::Value::Null),
656			"origin_credential": spec.get("origin_credential").cloned().unwrap_or(serde_json::Value::Null),
657			"trino_url": spec.get("trino_url").cloned().unwrap_or(serde_json::Value::Null),
658			"origin_config": spec.get("origin_config").cloned().unwrap_or_else(|| serde_json::json!({})),
659			"parameters": spec.get("parameters").cloned().unwrap_or_else(|| serde_json::json!([])),
660			"targets": spec.get("targets").cloned().unwrap_or_else(|| serde_json::json!([])),
661			"queries": spec.get("queries").cloned().unwrap_or_else(|| serde_json::json!([])),
662			"schedule": spec.get("schedule").cloned().unwrap_or_else(|| serde_json::json!({})),
663			"delta": spec.get("delta").cloned().unwrap_or_else(|| serde_json::json!({})),
664			"retry": spec.get("retry").cloned().unwrap_or_else(|| serde_json::json!({})),
665			"recipe": spec.get("recipe").cloned().unwrap_or(serde_json::Value::Null),
666			"filters": spec.get("filters").cloned().unwrap_or_else(|| serde_json::json!([])),
667			"transforms": spec.get("transforms").cloned().unwrap_or_else(|| serde_json::json!([])),
668			"links": spec.get("links").cloned().unwrap_or_else(|| serde_json::json!([])),
669		}));
670	}
671
672	let origin = spec.get("origin")?;
673	Some(serde_json::json!({
674		"origin_connector": origin.get("connector").cloned().unwrap_or(serde_json::Value::Null),
675		"origin_dsn": origin.get("dsn").cloned().unwrap_or(serde_json::Value::Null),
676		"origin_credential": origin.get("credential").cloned().unwrap_or(serde_json::Value::Null),
677		"trino_url": origin.get("trino_url").cloned().unwrap_or(serde_json::Value::Null),
678		"origin_config": match origin.get("config") {
679			Some(value) if !value.is_null() => value.clone(),
680			_ => serde_json::json!({}),
681		},
682		"parameters": spec.get("parameters").cloned().unwrap_or_else(|| serde_json::json!([])),
683		"targets": spec.get("targets").cloned().unwrap_or_else(|| serde_json::json!([])),
684		"queries": spec.get("queries").cloned().unwrap_or_else(|| serde_json::json!([])),
685		"schedule": match spec.get("schedule") {
686			Some(value) if !value.is_null() => value.clone(),
687			_ => serde_json::json!({}),
688		},
689		"delta": match spec.get("delta") {
690			Some(value) if !value.is_null() => value.clone(),
691			_ => serde_json::json!({}),
692		},
693		"retry": match spec.get("retry") {
694			Some(value) if !value.is_null() => value.clone(),
695			_ => serde_json::json!({}),
696		},
697		"recipe": spec.get("recipe").cloned().unwrap_or(serde_json::Value::Null),
698		"filters": spec.get("filters").cloned().unwrap_or_else(|| serde_json::json!([])),
699		"transforms": spec.get("transforms").cloned().unwrap_or_else(|| serde_json::json!([])),
700		"links": spec.get("links").cloned().unwrap_or_else(|| serde_json::json!([])),
701	}))
702}
703
704async fn create_pipe_query_record(
705	db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
706	origin_id: &str,
707	query: &PipeQueryInput,
708) -> Result<(), oversync_core::error::OversyncError> {
709	let sql = if query.sinks.is_some() {
710		SQL_CREATE_PIPE_QUERY_WITH_SINKS
711	} else {
712		SQL_CREATE_PIPE_QUERY
713	};
714
715	db.query(sql)
716		.bind(("source", origin_id.to_string()))
717		.bind(("name", query.id.clone()))
718		.bind(("query", query.sql.clone()))
719		.bind(("key_column", query.key_column.clone()))
720		.bind(("sinks", query.sinks.clone()))
721		.bind(("transform", query.transform.clone()))
722		.await
723		.and_then(|response| response.check())
724		.map_err(|e| {
725			oversync_core::error::OversyncError::SurrealDb(format!(
726				"create query '{}' for pipe '{}': {e}",
727				query.id, origin_id
728			))
729		})?;
730
731	Ok(())
732}
733
734pub(crate) async fn reload_config(state: &ApiState) -> Result<(), Json<ErrorResponse>> {
735	if let (Some(lifecycle), Some(db)) = (&state.lifecycle, &state.db_client) {
736		lifecycle.restart_with_config_json(db).await.map_err(|e| {
737			Json(ErrorResponse {
738				error: format!("reload: {e}"),
739			})
740		})?;
741	}
742	refresh_read_cache(state).await.map_err(|e| {
743		Json(ErrorResponse {
744			error: format!("refresh cache: {e}"),
745		})
746	})?;
747	Ok(())
748}
749
750pub async fn refresh_read_cache(
751	state: &ApiState,
752) -> Result<(), oversync_core::error::OversyncError> {
753	let Some(db) = &state.db_client else {
754		return Ok(());
755	};
756
757	const SQL_LOAD_QUERIES: &str = oversync_queries::config::LOAD_QUERIES;
758	const SQL_READ_SINKS_CACHE: &str = oversync_queries::config::READ_SINKS_CACHE;
759	const SQL_READ_PIPES_CACHE: &str = oversync_queries::config::READ_PIPES_CACHE;
760	const SQL_READ_PIPE_PRESETS_CACHE: &str = oversync_queries::config::READ_PIPE_PRESETS_CACHE;
761
762	let query_rows =
763		read_cache_rows_or_empty(db, SQL_LOAD_QUERIES, "refresh queries cache").await?;
764	let mut query_counts_by_origin: std::collections::HashMap<String, usize> =
765		std::collections::HashMap::new();
766	for row in &query_rows {
767		let Some(origin_id) = row.get("origin_id").and_then(|v| v.as_str()) else {
768			continue;
769		};
770		*query_counts_by_origin
771			.entry(origin_id.to_string())
772			.or_default() += 1;
773	}
774
775	let sink_rows =
776		read_cache_rows_or_empty(db, SQL_READ_SINKS_CACHE, "refresh sinks cache").await?;
777	let mut sink_configs: Vec<crate::state::SinkConfig> = sink_rows
778		.iter()
779		.filter_map(|r| {
780			Some(crate::state::SinkConfig {
781				name: r.get("name")?.as_str()?.to_string(),
782				sink_type: r.get("sink_type")?.as_str()?.to_string(),
783				config: r.get("config").cloned(),
784			})
785		})
786		.collect();
787
788	let pipe_rows =
789		read_cache_rows_or_empty(db, SQL_READ_PIPES_CACHE, "refresh pipes cache").await?;
790	let mut pipe_configs: Vec<crate::state::PipeConfigCache> = pipe_rows
791		.iter()
792		.filter_map(|r| {
793			let name = r.get("name")?.as_str()?.to_string();
794			Some(crate::state::PipeConfigCache {
795				name: name.clone(),
796				origin_connector: r.get("origin_connector")?.as_str()?.to_string(),
797				origin_dsn: r.get("origin_dsn")?.as_str()?.to_string(),
798				targets: r
799					.get("targets")
800					.and_then(|v| v.as_array())
801					.map(|arr| {
802						arr.iter()
803							.filter_map(|v| v.as_str().map(String::from))
804							.collect()
805					})
806					.unwrap_or_default(),
807				interval_secs: r
808					.get("interval_secs")
809					.and_then(|v| v.as_u64())
810					.unwrap_or(300),
811				query_count: *query_counts_by_origin.get(name.as_str()).unwrap_or(&0),
812				recipe: r.get("recipe").cloned().filter(|v| !v.is_null()),
813				enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
814			})
815		})
816		.collect();
817
818	if let Some(lifecycle) = &state.lifecycle {
819		match lifecycle.runtime_cache_snapshot().await {
820			Ok(runtime) => merge_runtime_cache(&mut sink_configs, &mut pipe_configs, runtime),
821			Err(error) => {
822				warn!(error = %error, "failed to load runtime cache snapshot for API read model");
823			}
824		}
825	}
826
827	*state.sinks.write().await = sink_configs;
828	*state.pipes.write().await = pipe_configs;
829
830	let pipe_preset_rows = read_cache_rows_or_empty(
831		db,
832		SQL_READ_PIPE_PRESETS_CACHE,
833		"refresh pipe presets cache",
834	)
835	.await?;
836	let pipe_preset_configs: Vec<crate::state::PipePresetCache> = pipe_preset_rows
837		.iter()
838		.filter_map(|r| {
839			Some(crate::state::PipePresetCache {
840				name: r.get("name")?.as_str()?.to_string(),
841				description: r
842					.get("description")
843					.and_then(|v| v.as_str())
844					.map(String::from),
845				spec: api_pipe_preset_spec_value(r.get("spec")?)?,
846			})
847		})
848		.collect();
849	*state.pipe_presets.write().await = pipe_preset_configs;
850
851	Ok(())
852}
853
854fn merge_runtime_cache(
855	sinks: &mut Vec<crate::state::SinkConfig>,
856	pipes: &mut Vec<crate::state::PipeConfigCache>,
857	runtime: crate::state::RuntimeCacheSnapshot,
858) {
859	for runtime_sink in runtime.sinks {
860		match sinks.iter_mut().find(|sink| sink.name == runtime_sink.name) {
861			Some(existing) => {
862				if existing.config.is_none() && runtime_sink.config.is_some() {
863					existing.config = runtime_sink.config;
864				}
865				if existing.sink_type.is_empty() {
866					existing.sink_type = runtime_sink.sink_type;
867				}
868			}
869			None => sinks.push(runtime_sink),
870		}
871	}
872
873	for runtime_pipe in runtime.pipes {
874		match pipes.iter_mut().find(|pipe| pipe.name == runtime_pipe.name) {
875			Some(existing) => {
876				if existing.query_count == 0 && runtime_pipe.query_count > 0 {
877					existing.query_count = runtime_pipe.query_count;
878				}
879				if existing.recipe.is_none() && runtime_pipe.recipe.is_some() {
880					existing.recipe = runtime_pipe.recipe;
881				}
882				if existing.targets.is_empty() && !runtime_pipe.targets.is_empty() {
883					existing.targets = runtime_pipe.targets;
884				}
885				if existing.origin_connector.is_empty() {
886					existing.origin_connector = runtime_pipe.origin_connector;
887				}
888				if existing.origin_dsn.is_empty() {
889					existing.origin_dsn = runtime_pipe.origin_dsn;
890				}
891			}
892			None => pipes.push(runtime_pipe),
893		}
894	}
895}
896
/// Reports whether an error's display text looks like a "table does not exist"
/// failure.
///
/// The check is purely textual and case-sensitive: the rendered message must
/// contain both `does not exist` and `table`.
fn is_missing_table_error(error: &dyn std::fmt::Display) -> bool {
	let rendered = format!("{error}");
	["does not exist", "table"]
		.iter()
		.all(|needle| rendered.contains(needle))
}
901
902async fn read_cache_rows_or_empty(
903	db: &surrealdb::Surreal<surrealdb::engine::any::Any>,
904	sql: &str,
905	context: &str,
906) -> Result<Vec<serde_json::Value>, oversync_core::error::OversyncError> {
907	let mut response = match db.query(sql).await {
908		Ok(response) => response,
909		Err(error) if is_missing_table_error(&error) => return Ok(Vec::new()),
910		Err(error) => {
911			return Err(oversync_core::error::OversyncError::SurrealDb(format!(
912				"{context}: {error}"
913			)));
914		}
915	};
916
917	match response.take(0) {
918		Ok(rows) => Ok(rows),
919		Err(error) if is_missing_table_error(&error) => Ok(Vec::new()),
920		Err(error) => Err(oversync_core::error::OversyncError::SurrealDb(format!(
921			"{context} take: {error}"
922		))),
923	}
924}
925
#[cfg(test)]
mod tests {
	use super::{api_pipe_preset_spec_value, is_missing_table_error};

	/// A spec stored in the flat layout (top-level `origin_*` keys) must be
	/// passed through to the API shape with its values intact.
	#[test]
	fn api_pipe_preset_spec_value_accepts_legacy_flat_shape() {
		let stored_spec = serde_json::json!({
			"origin_connector": "postgres",
			"origin_dsn": "postgres://localhost/db",
			"origin_credential": null,
			"trino_url": null,
			"origin_config": { "sslmode": "require" },
			"targets": ["stdout"],
			"queries": [{
				"id": "aspect-columns",
				"sql": "SELECT id, payload FROM columns",
				"key_column": "id"
			}],
			"schedule": { "interval_secs": 900, "missed_tick_policy": "skip" },
			"delta": { "diff_mode": "db", "fail_safe_threshold": 30 },
			"retry": { "max_retries": 3, "retry_base_delay_secs": 5 },
			"recipe": null,
			"filters": [],
			"transforms": [],
			"links": []
		});

		let converted =
			api_pipe_preset_spec_value(&stored_spec).expect("legacy flat preset should map");

		assert_eq!(converted["origin_connector"], "postgres");
		assert_eq!(converted["origin_config"]["sslmode"], "require");
		assert_eq!(converted["queries"][0]["id"], "aspect-columns");
	}

	/// The textual detector accepts a missing-table message and rejects an
	/// unrelated one.
	#[test]
	fn missing_table_error_detection_matches_surreal_message_shape() {
		let missing_table = "The table 'pipe_config' does not exist";
		let unrelated = "permission denied";

		assert!(is_missing_table_error(&missing_table));
		assert!(!is_missing_table_error(&unrelated));
	}
}
966}