1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5
6use crate::state::ApiState;
7use crate::types::*;
8
9use oversync_queries::query_config;
10
// Local aliases for the statements defined in `oversync_queries::query_config`,
// named after the handler operation that executes them.
/// Executed by `list_queries`; bound with `source`.
const SQL_LIST_QUERIES: &str = query_config::LIST_BY_SOURCE;
/// Executed by `delete_query` and, before the insert, by `create_query`;
/// bound with `source` and `name`.
const SQL_DELETE_QUERY: &str = query_config::DELETE_ONE;
/// Executed by `update_query` when `query` is supplied; bound with `source`,
/// `name` and `query`.
const SQL_UPDATE_QUERY_SQL: &str = query_config::UPDATE_QUERY;
/// Executed by `update_query` when `key_column` is supplied; bound with
/// `source`, `name` and `key_column`.
const SQL_UPDATE_QUERY_KEY: &str = query_config::UPDATE_KEY_COLUMN;
/// Executed by `update_query` when `sinks` is supplied; bound with `source`,
/// `name` and `sinks`.
const SQL_UPDATE_QUERY_SINKS: &str = query_config::UPDATE_SINKS;
/// Executed by `update_query` when `enabled` is supplied; bound with `source`,
/// `name` and `enabled`.
const SQL_UPDATE_QUERY_ENABLED: &str = query_config::UPDATE_ENABLED;
17
18fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
19 Json(ErrorResponse {
20 error: format!("db: {e}"),
21 })
22}
23
24fn require_db(
25 state: &ApiState,
26) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
27 state.db_client.as_deref().ok_or_else(|| {
28 Json(ErrorResponse {
29 error: "database not configured".into(),
30 })
31 })
32}
33
34#[utoipa::path(
35 get,
36 path = "/sources/{source}/queries",
37 params(("source" = String, Path, description = "Source name")),
38 responses(
39 (status = 200, description = "List queries for source", body = QueryListResponse),
40 (status = 400, description = "Bad request", body = ErrorResponse)
41 )
42)]
43pub async fn list_queries(
44 State(state): State<Arc<ApiState>>,
45 Path(source): Path<String>,
46) -> Result<Json<QueryListResponse>, Json<ErrorResponse>> {
47 let db = require_db(&state)?;
48
49 let mut resp = db
50 .query(SQL_LIST_QUERIES)
51 .bind(("source", source))
52 .await
53 .map_err(db_err)?;
54
55 let rows: Vec<serde_json::Value> = resp.take(0).map_err(db_err)?;
56
57 let queries = rows
58 .iter()
59 .filter_map(|r| {
60 Some(QueryDetail {
61 name: r.get("name")?.as_str()?.to_string(),
62 query: r.get("query")?.as_str()?.to_string(),
63 key_column: r.get("key_column")?.as_str()?.to_string(),
64 sinks: r.get("sinks").and_then(|v| v.as_array()).map(|arr| {
65 arr.iter()
66 .filter_map(|v| v.as_str().map(String::from))
67 .collect()
68 }),
69 enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
70 })
71 })
72 .collect();
73
74 Ok(Json(QueryListResponse { queries }))
75}
76
77#[utoipa::path(
78 post,
79 path = "/sources/{source}/queries",
80 params(("source" = String, Path, description = "Source name")),
81 request_body = CreateQueryRequest,
82 responses(
83 (status = 200, description = "Query created", body = MutationResponse),
84 (status = 400, description = "Bad request", body = ErrorResponse)
85 )
86)]
87pub async fn create_query(
88 State(state): State<Arc<ApiState>>,
89 Path(source): Path<String>,
90 Json(req): Json<CreateQueryRequest>,
91) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
92 let db = require_db(&state)?;
93
94 db.query(SQL_DELETE_QUERY)
95 .bind(("source", source.clone()))
96 .bind(("name", req.name.clone()))
97 .await
98 .map_err(db_err)?;
99
100 let mut query_str = "CREATE query_config SET origin_id = $source, name = $name, query = $query, key_column = $key_column, enabled = true".to_string();
101
102 let sinks_val = req
103 .sinks
104 .map(|s| serde_json::Value::Array(s.into_iter().map(serde_json::Value::String).collect()));
105
106 if sinks_val.is_some() {
107 query_str.push_str(", sinks = $sinks");
108 }
109
110 let mut q = db
111 .query(&query_str)
112 .bind(("source", source.clone()))
113 .bind(("name", req.name.clone()))
114 .bind(("query", req.query))
115 .bind(("key_column", req.key_column));
116
117 if let Some(sv) = sinks_val {
118 q = q.bind(("sinks", sv));
119 }
120
121 q.await.map_err(db_err)?;
122
123 super::mutations::reload_config_pub(&state).await?;
124
125 Ok(Json(MutationResponse {
126 ok: true,
127 message: format!("query '{}' created for source '{source}'", req.name),
128 }))
129}
130
131#[utoipa::path(
132 put,
133 path = "/sources/{source}/queries/{name}",
134 params(
135 ("source" = String, Path, description = "Source name"),
136 ("name" = String, Path, description = "Query name"),
137 ),
138 request_body = UpdateQueryRequest,
139 responses(
140 (status = 200, description = "Query updated", body = MutationResponse),
141 (status = 400, description = "Bad request", body = ErrorResponse)
142 )
143)]
144pub async fn update_query(
145 State(state): State<Arc<ApiState>>,
146 Path((source, name)): Path<(String, String)>,
147 Json(req): Json<UpdateQueryRequest>,
148) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
149 let db = require_db(&state)?;
150
151 if let Some(query) = req.query {
152 db.query(SQL_UPDATE_QUERY_SQL)
153 .bind(("source", source.clone()))
154 .bind(("name", name.clone()))
155 .bind(("query", query))
156 .await
157 .map_err(db_err)?;
158 }
159
160 if let Some(key_column) = req.key_column {
161 db.query(SQL_UPDATE_QUERY_KEY)
162 .bind(("source", source.clone()))
163 .bind(("name", name.clone()))
164 .bind(("key_column", key_column))
165 .await
166 .map_err(db_err)?;
167 }
168
169 if let Some(sinks) = req.sinks {
170 let sinks_val =
171 serde_json::Value::Array(sinks.into_iter().map(serde_json::Value::String).collect());
172 db.query(SQL_UPDATE_QUERY_SINKS)
173 .bind(("source", source.clone()))
174 .bind(("name", name.clone()))
175 .bind(("sinks", sinks_val))
176 .await
177 .map_err(db_err)?;
178 }
179
180 if let Some(enabled) = req.enabled {
181 db.query(SQL_UPDATE_QUERY_ENABLED)
182 .bind(("source", source.clone()))
183 .bind(("name", name.clone()))
184 .bind(("enabled", enabled))
185 .await
186 .map_err(db_err)?;
187 }
188
189 super::mutations::reload_config_pub(&state).await?;
190
191 Ok(Json(MutationResponse {
192 ok: true,
193 message: format!("query '{name}' updated for source '{source}'"),
194 }))
195}
196
197#[utoipa::path(
198 delete,
199 path = "/sources/{source}/queries/{name}",
200 params(
201 ("source" = String, Path, description = "Source name"),
202 ("name" = String, Path, description = "Query name"),
203 ),
204 responses(
205 (status = 200, description = "Query deleted", body = MutationResponse),
206 (status = 400, description = "Bad request", body = ErrorResponse)
207 )
208)]
209pub async fn delete_query(
210 State(state): State<Arc<ApiState>>,
211 Path((source, name)): Path<(String, String)>,
212) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
213 let db = require_db(&state)?;
214
215 db.query(SQL_DELETE_QUERY)
216 .bind(("source", source.clone()))
217 .bind(("name", name.clone()))
218 .await
219 .map_err(db_err)?;
220
221 super::mutations::reload_config_pub(&state).await?;
222
223 Ok(Json(MutationResponse {
224 ok: true,
225 message: format!("query '{name}' deleted from source '{source}'"),
226 }))
227}