Skip to main content

oversync_api/
queries.rs

1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5
6use crate::state::ApiState;
7use crate::types::*;
8
9use oversync_queries::query_config;
10
11const SQL_LIST_QUERIES: &str = query_config::LIST_BY_SOURCE;
12const SQL_DELETE_QUERY: &str = query_config::DELETE_ONE;
13const SQL_UPDATE_QUERY_SQL: &str = query_config::UPDATE_QUERY;
14const SQL_UPDATE_QUERY_KEY: &str = query_config::UPDATE_KEY_COLUMN;
15const SQL_UPDATE_QUERY_SINKS: &str = query_config::UPDATE_SINKS;
16const SQL_UPDATE_QUERY_ENABLED: &str = query_config::UPDATE_ENABLED;
17
18fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
19	Json(ErrorResponse {
20		error: format!("db: {e}"),
21	})
22}
23
24fn require_db(
25	state: &ApiState,
26) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
27	state.db_client.as_deref().ok_or_else(|| {
28		Json(ErrorResponse {
29			error: "database not configured".into(),
30		})
31	})
32}
33
34#[utoipa::path(
35	get,
36	path = "/sources/{source}/queries",
37	params(("source" = String, Path, description = "Source name")),
38	responses(
39		(status = 200, description = "List queries for source", body = QueryListResponse),
40		(status = 400, description = "Bad request", body = ErrorResponse)
41	)
42)]
43pub async fn list_queries(
44	State(state): State<Arc<ApiState>>,
45	Path(source): Path<String>,
46) -> Result<Json<QueryListResponse>, Json<ErrorResponse>> {
47	let db = require_db(&state)?;
48
49	let mut resp = db
50		.query(SQL_LIST_QUERIES)
51		.bind(("source", source))
52		.await
53		.map_err(db_err)?;
54
55	let rows: Vec<serde_json::Value> = resp.take(0).map_err(db_err)?;
56
57	let queries = rows
58		.iter()
59		.filter_map(|r| {
60			Some(QueryDetail {
61				name: r.get("name")?.as_str()?.to_string(),
62				query: r.get("query")?.as_str()?.to_string(),
63				key_column: r.get("key_column")?.as_str()?.to_string(),
64				sinks: r.get("sinks").and_then(|v| v.as_array()).map(|arr| {
65					arr.iter()
66						.filter_map(|v| v.as_str().map(String::from))
67						.collect()
68				}),
69				enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
70			})
71		})
72		.collect();
73
74	Ok(Json(QueryListResponse { queries }))
75}
76
77#[utoipa::path(
78	post,
79	path = "/sources/{source}/queries",
80	params(("source" = String, Path, description = "Source name")),
81	request_body = CreateQueryRequest,
82	responses(
83		(status = 200, description = "Query created", body = MutationResponse),
84		(status = 400, description = "Bad request", body = ErrorResponse)
85	)
86)]
87pub async fn create_query(
88	State(state): State<Arc<ApiState>>,
89	Path(source): Path<String>,
90	Json(req): Json<CreateQueryRequest>,
91) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
92	let db = require_db(&state)?;
93
94	db.query(SQL_DELETE_QUERY)
95		.bind(("source", source.clone()))
96		.bind(("name", req.name.clone()))
97		.await
98		.map_err(db_err)?;
99
100	let mut query_str = "CREATE query_config SET origin_id = $source, name = $name, query = $query, key_column = $key_column, enabled = true".to_string();
101
102	let sinks_val = req
103		.sinks
104		.map(|s| serde_json::Value::Array(s.into_iter().map(serde_json::Value::String).collect()));
105
106	if sinks_val.is_some() {
107		query_str.push_str(", sinks = $sinks");
108	}
109
110	let mut q = db
111		.query(&query_str)
112		.bind(("source", source.clone()))
113		.bind(("name", req.name.clone()))
114		.bind(("query", req.query))
115		.bind(("key_column", req.key_column));
116
117	if let Some(sv) = sinks_val {
118		q = q.bind(("sinks", sv));
119	}
120
121	q.await.map_err(db_err)?;
122
123	super::mutations::reload_config_pub(&state).await?;
124
125	Ok(Json(MutationResponse {
126		ok: true,
127		message: format!("query '{}' created for source '{source}'", req.name),
128	}))
129}
130
131#[utoipa::path(
132	put,
133	path = "/sources/{source}/queries/{name}",
134	params(
135		("source" = String, Path, description = "Source name"),
136		("name" = String, Path, description = "Query name"),
137	),
138	request_body = UpdateQueryRequest,
139	responses(
140		(status = 200, description = "Query updated", body = MutationResponse),
141		(status = 400, description = "Bad request", body = ErrorResponse)
142	)
143)]
144pub async fn update_query(
145	State(state): State<Arc<ApiState>>,
146	Path((source, name)): Path<(String, String)>,
147	Json(req): Json<UpdateQueryRequest>,
148) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
149	let db = require_db(&state)?;
150
151	if let Some(query) = req.query {
152		db.query(SQL_UPDATE_QUERY_SQL)
153			.bind(("source", source.clone()))
154			.bind(("name", name.clone()))
155			.bind(("query", query))
156			.await
157			.map_err(db_err)?;
158	}
159
160	if let Some(key_column) = req.key_column {
161		db.query(SQL_UPDATE_QUERY_KEY)
162			.bind(("source", source.clone()))
163			.bind(("name", name.clone()))
164			.bind(("key_column", key_column))
165			.await
166			.map_err(db_err)?;
167	}
168
169	if let Some(sinks) = req.sinks {
170		let sinks_val =
171			serde_json::Value::Array(sinks.into_iter().map(serde_json::Value::String).collect());
172		db.query(SQL_UPDATE_QUERY_SINKS)
173			.bind(("source", source.clone()))
174			.bind(("name", name.clone()))
175			.bind(("sinks", sinks_val))
176			.await
177			.map_err(db_err)?;
178	}
179
180	if let Some(enabled) = req.enabled {
181		db.query(SQL_UPDATE_QUERY_ENABLED)
182			.bind(("source", source.clone()))
183			.bind(("name", name.clone()))
184			.bind(("enabled", enabled))
185			.await
186			.map_err(db_err)?;
187	}
188
189	super::mutations::reload_config_pub(&state).await?;
190
191	Ok(Json(MutationResponse {
192		ok: true,
193		message: format!("query '{name}' updated for source '{source}'"),
194	}))
195}
196
197#[utoipa::path(
198	delete,
199	path = "/sources/{source}/queries/{name}",
200	params(
201		("source" = String, Path, description = "Source name"),
202		("name" = String, Path, description = "Query name"),
203	),
204	responses(
205		(status = 200, description = "Query deleted", body = MutationResponse),
206		(status = 400, description = "Bad request", body = ErrorResponse)
207	)
208)]
209pub async fn delete_query(
210	State(state): State<Arc<ApiState>>,
211	Path((source, name)): Path<(String, String)>,
212) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
213	let db = require_db(&state)?;
214
215	db.query(SQL_DELETE_QUERY)
216		.bind(("source", source.clone()))
217		.bind(("name", name.clone()))
218		.await
219		.map_err(db_err)?;
220
221	super::mutations::reload_config_pub(&state).await?;
222
223	Ok(Json(MutationResponse {
224		ok: true,
225		message: format!("query '{name}' deleted from source '{source}'"),
226	}))
227}