Skip to main content

oversync_api/
queries.rs

1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5
6use crate::state::ApiState;
7use crate::types::*;
8
9use oversync_queries::{mutations, query_config};
10
// Statements passed to `db.query(..)` by the handlers in this module. They are
// aliases for constants defined in `oversync_queries`, named after the
// operation each handler performs.

/// Run by `list_queries`; bound with `source`.
const SQL_LIST_QUERIES: &str = query_config::LIST_BY_SOURCE;
/// Run by `delete_query` and, before inserting, by `create_query`; bound with
/// `source` and `name`.
const SQL_DELETE_QUERY: &str = query_config::DELETE_ONE;
/// Run by `create_query` when the request carries no sinks; bound with
/// `source`, `name`, `query` and `key_column`.
const SQL_CREATE_QUERY: &str = mutations::CREATE_QUERY;
/// Run by `create_query` when the request carries sinks; same bindings as
/// `SQL_CREATE_QUERY` plus `sinks`.
const SQL_CREATE_QUERY_WITH_SINKS: &str = mutations::CREATE_QUERY_WITH_SINKS;
/// Run by `update_query` when the request sets `query`.
const SQL_UPDATE_QUERY_SQL: &str = query_config::UPDATE_QUERY;
/// Run by `update_query` when the request sets `key_column`.
const SQL_UPDATE_QUERY_KEY: &str = query_config::UPDATE_KEY_COLUMN;
/// Run by `update_query` when the request sets `sinks`.
const SQL_UPDATE_QUERY_SINKS: &str = query_config::UPDATE_SINKS;
/// Run by `update_query` when the request sets `enabled`.
const SQL_UPDATE_QUERY_ENABLED: &str = query_config::UPDATE_ENABLED;
19
20fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
21	Json(ErrorResponse {
22		error: format!("db: {e}"),
23	})
24}
25
26fn require_db(
27	state: &ApiState,
28) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
29	state.db_client.as_deref().ok_or_else(|| {
30		Json(ErrorResponse {
31			error: "database not configured".into(),
32		})
33	})
34}
35
36#[utoipa::path(
37	get,
38	path = "/sources/{source}/queries",
39	params(("source" = String, Path, description = "Source name")),
40	responses(
41		(status = 200, description = "List queries for source", body = QueryListResponse),
42		(status = 400, description = "Bad request", body = ErrorResponse)
43	)
44)]
45pub async fn list_queries(
46	State(state): State<Arc<ApiState>>,
47	Path(source): Path<String>,
48) -> Result<Json<QueryListResponse>, Json<ErrorResponse>> {
49	let db = require_db(&state)?;
50
51	let mut resp = db
52		.query(SQL_LIST_QUERIES)
53		.bind(("source", source))
54		.await
55		.map_err(db_err)?;
56
57	let rows: Vec<serde_json::Value> = resp.take(0).map_err(db_err)?;
58
59	let queries = rows
60		.iter()
61		.filter_map(|r| {
62			Some(QueryDetail {
63				name: r.get("name")?.as_str()?.to_string(),
64				query: r.get("query")?.as_str()?.to_string(),
65				key_column: r.get("key_column")?.as_str()?.to_string(),
66				sinks: r.get("sinks").and_then(|v| v.as_array()).map(|arr| {
67					arr.iter()
68						.filter_map(|v| v.as_str().map(String::from))
69						.collect()
70				}),
71				enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
72			})
73		})
74		.collect();
75
76	Ok(Json(QueryListResponse { queries }))
77}
78
79#[utoipa::path(
80	post,
81	path = "/sources/{source}/queries",
82	params(("source" = String, Path, description = "Source name")),
83	request_body = CreateQueryRequest,
84	responses(
85		(status = 200, description = "Query created", body = MutationResponse),
86		(status = 400, description = "Bad request", body = ErrorResponse)
87	)
88)]
89pub async fn create_query(
90	State(state): State<Arc<ApiState>>,
91	Path(source): Path<String>,
92	Json(req): Json<CreateQueryRequest>,
93) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
94	let db = require_db(&state)?;
95
96	db.query(SQL_DELETE_QUERY)
97		.bind(("source", source.clone()))
98		.bind(("name", req.name.clone()))
99		.await
100		.map_err(db_err)?;
101
102	let sinks_val = req
103		.sinks
104		.map(|s| serde_json::Value::Array(s.into_iter().map(serde_json::Value::String).collect()));
105
106	if let Some(sv) = sinks_val {
107		db.query(SQL_CREATE_QUERY_WITH_SINKS)
108			.bind(("source", source.clone()))
109			.bind(("name", req.name.clone()))
110			.bind(("query", req.query))
111			.bind(("key_column", req.key_column))
112			.bind(("sinks", sv))
113			.await
114			.map_err(db_err)?;
115	} else {
116		db.query(SQL_CREATE_QUERY)
117			.bind(("source", source.clone()))
118			.bind(("name", req.name.clone()))
119			.bind(("query", req.query))
120			.bind(("key_column", req.key_column))
121			.await
122			.map_err(db_err)?;
123	}
124
125	super::mutations::reload_config(&state).await?;
126
127	Ok(Json(MutationResponse {
128		ok: true,
129		message: format!("query '{}' created for source '{source}'", req.name),
130	}))
131}
132
133#[utoipa::path(
134	put,
135	path = "/sources/{source}/queries/{name}",
136	params(
137		("source" = String, Path, description = "Source name"),
138		("name" = String, Path, description = "Query name"),
139	),
140	request_body = UpdateQueryRequest,
141	responses(
142		(status = 200, description = "Query updated", body = MutationResponse),
143		(status = 400, description = "Bad request", body = ErrorResponse)
144	)
145)]
146pub async fn update_query(
147	State(state): State<Arc<ApiState>>,
148	Path((source, name)): Path<(String, String)>,
149	Json(req): Json<UpdateQueryRequest>,
150) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
151	let db = require_db(&state)?;
152
153	if let Some(query) = req.query {
154		db.query(SQL_UPDATE_QUERY_SQL)
155			.bind(("source", source.clone()))
156			.bind(("name", name.clone()))
157			.bind(("query", query))
158			.await
159			.map_err(db_err)?;
160	}
161
162	if let Some(key_column) = req.key_column {
163		db.query(SQL_UPDATE_QUERY_KEY)
164			.bind(("source", source.clone()))
165			.bind(("name", name.clone()))
166			.bind(("key_column", key_column))
167			.await
168			.map_err(db_err)?;
169	}
170
171	if let Some(sinks) = req.sinks {
172		let sinks_val =
173			serde_json::Value::Array(sinks.into_iter().map(serde_json::Value::String).collect());
174		db.query(SQL_UPDATE_QUERY_SINKS)
175			.bind(("source", source.clone()))
176			.bind(("name", name.clone()))
177			.bind(("sinks", sinks_val))
178			.await
179			.map_err(db_err)?;
180	}
181
182	if let Some(enabled) = req.enabled {
183		db.query(SQL_UPDATE_QUERY_ENABLED)
184			.bind(("source", source.clone()))
185			.bind(("name", name.clone()))
186			.bind(("enabled", enabled))
187			.await
188			.map_err(db_err)?;
189	}
190
191	super::mutations::reload_config(&state).await?;
192
193	Ok(Json(MutationResponse {
194		ok: true,
195		message: format!("query '{name}' updated for source '{source}'"),
196	}))
197}
198
199#[utoipa::path(
200	delete,
201	path = "/sources/{source}/queries/{name}",
202	params(
203		("source" = String, Path, description = "Source name"),
204		("name" = String, Path, description = "Query name"),
205	),
206	responses(
207		(status = 200, description = "Query deleted", body = MutationResponse),
208		(status = 400, description = "Bad request", body = ErrorResponse)
209	)
210)]
211pub async fn delete_query(
212	State(state): State<Arc<ApiState>>,
213	Path((source, name)): Path<(String, String)>,
214) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
215	let db = require_db(&state)?;
216
217	db.query(SQL_DELETE_QUERY)
218		.bind(("source", source.clone()))
219		.bind(("name", name.clone()))
220		.await
221		.map_err(db_err)?;
222
223	super::mutations::reload_config(&state).await?;
224
225	Ok(Json(MutationResponse {
226		ok: true,
227		message: format!("query '{name}' deleted from source '{source}'"),
228	}))
229}