Skip to main content

oversync_api/
mutations.rs

1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5
6use crate::state::ApiState;
7use crate::types::*;
8
9use oversync_queries::mutations;
10
11const SQL_DELETE_SOURCE: &str = mutations::DELETE_SOURCE;
12const SQL_CREATE_SOURCE: &str = mutations::CREATE_SOURCE;
13const SQL_UPDATE_SOURCE_CONNECTOR: &str = mutations::UPDATE_SOURCE_CONNECTOR;
14const SQL_UPDATE_SOURCE_ENABLED: &str = mutations::UPDATE_SOURCE_ENABLED;
15const SQL_UPDATE_SOURCE_CONFIG: &str = mutations::UPDATE_SOURCE_CONFIG;
16const SQL_DELETE_SOURCE_QUERIES: &str = mutations::DELETE_SOURCE_QUERIES;
17
18const SQL_DELETE_SINK: &str = mutations::DELETE_SINK;
19const SQL_CREATE_SINK: &str = mutations::CREATE_SINK;
20const SQL_UPDATE_SINK_TYPE: &str = mutations::UPDATE_SINK_TYPE;
21const SQL_UPDATE_SINK_ENABLED: &str = mutations::UPDATE_SINK_ENABLED;
22const SQL_UPDATE_SINK_CONFIG: &str = mutations::UPDATE_SINK_CONFIG;
23
24const SQL_DELETE_PIPE: &str = mutations::DELETE_PIPE;
25const SQL_CREATE_PIPE: &str = mutations::CREATE_PIPE;
26const SQL_DELETE_PIPE_QUERIES: &str = mutations::DELETE_PIPE_QUERIES;
27const SQL_UPDATE_PIPE_ORIGIN_CONNECTOR: &str = mutations::UPDATE_PIPE_ORIGIN_CONNECTOR;
28const SQL_UPDATE_PIPE_ORIGIN_DSN: &str = mutations::UPDATE_PIPE_ORIGIN_DSN;
29const SQL_UPDATE_PIPE_ORIGIN_CONFIG: &str = mutations::UPDATE_PIPE_ORIGIN_CONFIG;
30const SQL_UPDATE_PIPE_TARGETS: &str = mutations::UPDATE_PIPE_TARGETS;
31const SQL_UPDATE_PIPE_SCHEDULE: &str = mutations::UPDATE_PIPE_SCHEDULE;
32const SQL_UPDATE_PIPE_DELTA: &str = mutations::UPDATE_PIPE_DELTA;
33const SQL_UPDATE_PIPE_RETRY: &str = mutations::UPDATE_PIPE_RETRY;
34const SQL_UPDATE_PIPE_ENABLED: &str = mutations::UPDATE_PIPE_ENABLED;
35
36#[utoipa::path(
37	post,
38	path = "/sources",
39	request_body = CreateSourceRequest,
40	responses(
41		(status = 200, description = "Source created", body = MutationResponse),
42		(status = 400, description = "Bad request", body = ErrorResponse)
43	)
44)]
45pub async fn create_source(
46	State(state): State<Arc<ApiState>>,
47	Json(req): Json<CreateSourceRequest>,
48) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
49	let db = require_db(&state)?;
50
51	let config_json = if req.config.is_null() {
52		serde_json::json!({})
53	} else {
54		req.config
55	};
56
57	db.query(SQL_DELETE_SOURCE)
58		.bind(("name", req.name.clone()))
59		.await
60		.map_err(db_err)?;
61
62	db.query(SQL_CREATE_SOURCE)
63		.bind(("name", req.name.clone()))
64		.bind(("connector", req.connector))
65		.bind(("config", config_json))
66		.await
67		.map_err(db_err)?;
68
69	reload_config(&state).await?;
70
71	Ok(Json(MutationResponse {
72		ok: true,
73		message: format!("source '{}' created", req.name),
74	}))
75}
76
77#[utoipa::path(
78	put,
79	path = "/sources/{name}",
80	params(("name" = String, Path, description = "Source name")),
81	request_body = UpdateSourceRequest,
82	responses(
83		(status = 200, description = "Source updated", body = MutationResponse),
84		(status = 400, description = "Bad request", body = ErrorResponse)
85	)
86)]
87pub async fn update_source(
88	State(state): State<Arc<ApiState>>,
89	Path(name): Path<String>,
90	Json(req): Json<UpdateSourceRequest>,
91) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
92	let db = require_db(&state)?;
93
94	if let Some(connector) = req.connector {
95		db.query(SQL_UPDATE_SOURCE_CONNECTOR)
96			.bind(("name", name.clone()))
97			.bind(("connector", connector))
98			.await
99			.map_err(db_err)?;
100	}
101
102	if let Some(enabled) = req.enabled {
103		db.query(SQL_UPDATE_SOURCE_ENABLED)
104			.bind(("name", name.clone()))
105			.bind(("enabled", enabled))
106			.await
107			.map_err(db_err)?;
108	}
109
110	if let Some(config) = req.config {
111		db.query(SQL_UPDATE_SOURCE_CONFIG)
112			.bind(("name", name.clone()))
113			.bind(("config", config))
114			.await
115			.map_err(db_err)?;
116	}
117
118	reload_config(&state).await?;
119
120	Ok(Json(MutationResponse {
121		ok: true,
122		message: format!("source '{name}' updated"),
123	}))
124}
125
126#[utoipa::path(
127	delete,
128	path = "/sources/{name}",
129	params(("name" = String, Path, description = "Source name")),
130	responses(
131		(status = 200, description = "Source deleted", body = MutationResponse),
132		(status = 400, description = "Bad request", body = ErrorResponse)
133	)
134)]
135pub async fn delete_source(
136	State(state): State<Arc<ApiState>>,
137	Path(name): Path<String>,
138) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
139	let db = require_db(&state)?;
140
141	db.query(SQL_DELETE_SOURCE)
142		.bind(("name", name.clone()))
143		.await
144		.map_err(db_err)?;
145
146	db.query(SQL_DELETE_SOURCE_QUERIES)
147		.bind(("name", name.clone()))
148		.await
149		.map_err(db_err)?;
150
151	reload_config(&state).await?;
152
153	Ok(Json(MutationResponse {
154		ok: true,
155		message: format!("source '{name}' deleted"),
156	}))
157}
158
159#[utoipa::path(
160	post,
161	path = "/sinks",
162	request_body = CreateSinkRequest,
163	responses(
164		(status = 200, description = "Sink created", body = MutationResponse),
165		(status = 400, description = "Bad request", body = ErrorResponse)
166	)
167)]
168pub async fn create_sink(
169	State(state): State<Arc<ApiState>>,
170	Json(req): Json<CreateSinkRequest>,
171) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
172	let db = require_db(&state)?;
173
174	let config_json = if req.config.is_null() {
175		serde_json::json!({})
176	} else {
177		req.config
178	};
179
180	db.query(SQL_DELETE_SINK)
181		.bind(("name", req.name.clone()))
182		.await
183		.map_err(db_err)?;
184
185	db.query(SQL_CREATE_SINK)
186		.bind(("name", req.name.clone()))
187		.bind(("sink_type", req.sink_type))
188		.bind(("config", config_json))
189		.await
190		.map_err(db_err)?;
191
192	reload_config(&state).await?;
193
194	Ok(Json(MutationResponse {
195		ok: true,
196		message: format!("sink '{}' created", req.name),
197	}))
198}
199
200#[utoipa::path(
201	put,
202	path = "/sinks/{name}",
203	params(("name" = String, Path, description = "Sink name")),
204	request_body = UpdateSinkRequest,
205	responses(
206		(status = 200, description = "Sink updated", body = MutationResponse),
207		(status = 400, description = "Bad request", body = ErrorResponse)
208	)
209)]
210pub async fn update_sink(
211	State(state): State<Arc<ApiState>>,
212	Path(name): Path<String>,
213	Json(req): Json<UpdateSinkRequest>,
214) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
215	let db = require_db(&state)?;
216
217	if let Some(sink_type) = req.sink_type {
218		db.query(SQL_UPDATE_SINK_TYPE)
219			.bind(("name", name.clone()))
220			.bind(("sink_type", sink_type))
221			.await
222			.map_err(db_err)?;
223	}
224
225	if let Some(enabled) = req.enabled {
226		db.query(SQL_UPDATE_SINK_ENABLED)
227			.bind(("name", name.clone()))
228			.bind(("enabled", enabled))
229			.await
230			.map_err(db_err)?;
231	}
232
233	if let Some(config) = req.config {
234		db.query(SQL_UPDATE_SINK_CONFIG)
235			.bind(("name", name.clone()))
236			.bind(("config", config))
237			.await
238			.map_err(db_err)?;
239	}
240
241	reload_config(&state).await?;
242
243	Ok(Json(MutationResponse {
244		ok: true,
245		message: format!("sink '{name}' updated"),
246	}))
247}
248
249#[utoipa::path(
250	delete,
251	path = "/sinks/{name}",
252	params(("name" = String, Path, description = "Sink name")),
253	responses(
254		(status = 200, description = "Sink deleted", body = MutationResponse),
255		(status = 400, description = "Bad request", body = ErrorResponse)
256	)
257)]
258pub async fn delete_sink(
259	State(state): State<Arc<ApiState>>,
260	Path(name): Path<String>,
261) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
262	let db = require_db(&state)?;
263
264	db.query(SQL_DELETE_SINK)
265		.bind(("name", name.clone()))
266		.await
267		.map_err(db_err)?;
268
269	reload_config(&state).await?;
270
271	Ok(Json(MutationResponse {
272		ok: true,
273		message: format!("sink '{name}' deleted"),
274	}))
275}
276
277// ── Pipe CRUD ───────────────────────────────────────────────
278
279#[utoipa::path(
280	post,
281	path = "/pipes",
282	request_body = CreatePipeRequest,
283	responses(
284		(status = 200, description = "Pipe created", body = MutationResponse),
285		(status = 400, description = "Bad request", body = ErrorResponse)
286	)
287)]
288pub async fn create_pipe(
289	State(state): State<Arc<ApiState>>,
290	Json(req): Json<CreatePipeRequest>,
291) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
292	let db = require_db(&state)?;
293
294	let origin_config = if req.origin_config.is_null() {
295		serde_json::json!({})
296	} else {
297		req.origin_config
298	};
299	let schedule = if req.schedule.is_null() {
300		serde_json::json!({"interval_secs": 300, "missed_tick_policy": "skip"})
301	} else {
302		req.schedule
303	};
304	let delta = if req.delta.is_null() {
305		serde_json::json!({"diff_mode": "db", "fail_safe_threshold": 30.0})
306	} else {
307		req.delta
308	};
309	let retry = if req.retry.is_null() {
310		serde_json::json!({"max_retries": 3, "retry_base_delay_secs": 5})
311	} else {
312		req.retry
313	};
314
315	db.query(SQL_DELETE_PIPE)
316		.bind(("name", req.name.clone()))
317		.await
318		.map_err(db_err)?;
319
320	db.query(SQL_CREATE_PIPE)
321		.bind(("name", req.name.clone()))
322		.bind(("origin_connector", req.origin_connector))
323		.bind(("origin_dsn", req.origin_dsn))
324		.bind(("origin_config", origin_config))
325		.bind(("targets", req.targets))
326		.bind(("schedule", schedule))
327		.bind(("delta", delta))
328		.bind(("retry", retry))
329		.await
330		.map_err(db_err)?;
331
332	reload_config(&state).await?;
333
334	Ok(Json(MutationResponse {
335		ok: true,
336		message: format!("pipe '{}' created", req.name),
337	}))
338}
339
340#[utoipa::path(
341	put,
342	path = "/pipes/{name}",
343	params(("name" = String, Path, description = "Pipe name")),
344	request_body = UpdatePipeRequest,
345	responses(
346		(status = 200, description = "Pipe updated", body = MutationResponse),
347		(status = 400, description = "Bad request", body = ErrorResponse)
348	)
349)]
350pub async fn update_pipe(
351	State(state): State<Arc<ApiState>>,
352	Path(name): Path<String>,
353	Json(req): Json<UpdatePipeRequest>,
354) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
355	let db = require_db(&state)?;
356
357	if let Some(connector) = req.origin_connector {
358		db.query(SQL_UPDATE_PIPE_ORIGIN_CONNECTOR)
359			.bind(("name", name.clone()))
360			.bind(("v", connector))
361			.await
362			.map_err(db_err)?;
363	}
364
365	if let Some(dsn) = req.origin_dsn {
366		db.query(SQL_UPDATE_PIPE_ORIGIN_DSN)
367			.bind(("name", name.clone()))
368			.bind(("v", dsn))
369			.await
370			.map_err(db_err)?;
371	}
372
373	if let Some(config) = req.origin_config {
374		db.query(SQL_UPDATE_PIPE_ORIGIN_CONFIG)
375			.bind(("name", name.clone()))
376			.bind(("v", config))
377			.await
378			.map_err(db_err)?;
379	}
380
381	if let Some(targets) = req.targets {
382		db.query(SQL_UPDATE_PIPE_TARGETS)
383			.bind(("name", name.clone()))
384			.bind(("v", targets))
385			.await
386			.map_err(db_err)?;
387	}
388
389	if let Some(schedule) = req.schedule {
390		db.query(SQL_UPDATE_PIPE_SCHEDULE)
391			.bind(("name", name.clone()))
392			.bind(("v", schedule))
393			.await
394			.map_err(db_err)?;
395	}
396
397	if let Some(delta) = req.delta {
398		db.query(SQL_UPDATE_PIPE_DELTA)
399			.bind(("name", name.clone()))
400			.bind(("v", delta))
401			.await
402			.map_err(db_err)?;
403	}
404
405	if let Some(retry) = req.retry {
406		db.query(SQL_UPDATE_PIPE_RETRY)
407			.bind(("name", name.clone()))
408			.bind(("v", retry))
409			.await
410			.map_err(db_err)?;
411	}
412
413	if let Some(enabled) = req.enabled {
414		db.query(SQL_UPDATE_PIPE_ENABLED)
415			.bind(("name", name.clone()))
416			.bind(("v", enabled))
417			.await
418			.map_err(db_err)?;
419	}
420
421	reload_config(&state).await?;
422
423	Ok(Json(MutationResponse {
424		ok: true,
425		message: format!("pipe '{name}' updated"),
426	}))
427}
428
429#[utoipa::path(
430	delete,
431	path = "/pipes/{name}",
432	params(("name" = String, Path, description = "Pipe name")),
433	responses(
434		(status = 200, description = "Pipe deleted", body = MutationResponse),
435		(status = 400, description = "Bad request", body = ErrorResponse)
436	)
437)]
438pub async fn delete_pipe(
439	State(state): State<Arc<ApiState>>,
440	Path(name): Path<String>,
441) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
442	let db = require_db(&state)?;
443
444	db.query(SQL_DELETE_PIPE)
445		.bind(("name", name.clone()))
446		.await
447		.map_err(db_err)?;
448
449	db.query(SQL_DELETE_PIPE_QUERIES)
450		.bind(("name", name.clone()))
451		.await
452		.map_err(db_err)?;
453
454	reload_config(&state).await?;
455
456	Ok(Json(MutationResponse {
457		ok: true,
458		message: format!("pipe '{name}' deleted"),
459	}))
460}
461
462fn require_db(
463	state: &ApiState,
464) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
465	state.db_client.as_deref().ok_or_else(|| {
466		Json(ErrorResponse {
467			error: "database not configured".into(),
468		})
469	})
470}
471
472fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
473	Json(ErrorResponse {
474		error: format!("db: {e}"),
475	})
476}
477
478pub(crate) async fn reload_config(state: &ApiState) -> Result<(), Json<ErrorResponse>> {
479	if let (Some(lifecycle), Some(db)) = (&state.lifecycle, &state.db_client) {
480		lifecycle.restart_with_config_json(db).await.map_err(|e| {
481			Json(ErrorResponse {
482				error: format!("reload: {e}"),
483			})
484		})?;
485	}
486	refresh_read_cache(state).await;
487	Ok(())
488}
489
490pub async fn refresh_read_cache(state: &ApiState) {
491	let Some(db) = &state.db_client else { return };
492
493	const SQL_READ_SOURCES_CACHE: &str = oversync_queries::config::READ_SOURCES_CACHE;
494	const SQL_READ_SINKS_CACHE: &str = oversync_queries::config::READ_SINKS_CACHE;
495	const SQL_READ_PIPES_CACHE: &str = oversync_queries::config::READ_PIPES_CACHE;
496
497	if let Ok(mut resp) = db.query(SQL_READ_SOURCES_CACHE).await
498		&& let Ok(rows) = resp.take::<Vec<serde_json::Value>>(0)
499	{
500		let configs: Vec<crate::state::SourceConfig> = rows
501			.iter()
502			.filter_map(|r| {
503				Some(crate::state::SourceConfig {
504					name: r.get("name")?.as_str()?.to_string(),
505					connector: r.get("connector")?.as_str()?.to_string(),
506					interval_secs: r
507						.get("interval_secs")
508						.and_then(|v| v.as_u64())
509						.unwrap_or(300),
510					queries: vec![],
511				})
512			})
513			.collect();
514		*state.sources.write().await = configs;
515	}
516
517	if let Ok(mut resp) = db.query(SQL_READ_SINKS_CACHE).await
518		&& let Ok(rows) = resp.take::<Vec<serde_json::Value>>(0)
519	{
520		let configs: Vec<crate::state::SinkConfig> = rows
521			.iter()
522			.filter_map(|r| {
523				Some(crate::state::SinkConfig {
524					name: r.get("name")?.as_str()?.to_string(),
525					sink_type: r.get("sink_type")?.as_str()?.to_string(),
526					config: r.get("config").cloned(),
527				})
528			})
529			.collect();
530		*state.sinks.write().await = configs;
531	}
532
533	if let Ok(mut resp) = db.query(SQL_READ_PIPES_CACHE).await
534		&& let Ok(rows) = resp.take::<Vec<serde_json::Value>>(0)
535	{
536		let configs: Vec<crate::state::PipeConfigCache> = rows
537			.iter()
538			.filter_map(|r| {
539				Some(crate::state::PipeConfigCache {
540					name: r.get("name")?.as_str()?.to_string(),
541					origin_connector: r.get("origin_connector")?.as_str()?.to_string(),
542					origin_dsn: r.get("origin_dsn")?.as_str()?.to_string(),
543					targets: r
544						.get("targets")
545						.and_then(|v| v.as_array())
546						.map(|arr| {
547							arr.iter()
548								.filter_map(|v| v.as_str().map(String::from))
549								.collect()
550						})
551						.unwrap_or_default(),
552					interval_secs: r
553						.get("interval_secs")
554						.and_then(|v| v.as_u64())
555						.unwrap_or(300),
556					enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
557				})
558			})
559			.collect();
560		*state.pipes.write().await = configs;
561	}
562}