1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5
6use crate::state::ApiState;
7use crate::types::*;
8
9use oversync_queries::{mutations, query_config};
10
11const SQL_LIST_QUERIES: &str = query_config::LIST_BY_SOURCE;
12const SQL_DELETE_QUERY: &str = query_config::DELETE_ONE;
13const SQL_CREATE_QUERY: &str = mutations::CREATE_QUERY;
14const SQL_CREATE_QUERY_WITH_SINKS: &str = mutations::CREATE_QUERY_WITH_SINKS;
15const SQL_UPDATE_QUERY_SQL: &str = query_config::UPDATE_QUERY;
16const SQL_UPDATE_QUERY_KEY: &str = query_config::UPDATE_KEY_COLUMN;
17const SQL_UPDATE_QUERY_SINKS: &str = query_config::UPDATE_SINKS;
18const SQL_UPDATE_QUERY_ENABLED: &str = query_config::UPDATE_ENABLED;
19
20fn db_err(e: surrealdb::Error) -> Json<ErrorResponse> {
21 Json(ErrorResponse {
22 error: format!("db: {e}"),
23 })
24}
25
26fn require_db(
27 state: &ApiState,
28) -> Result<&surrealdb::Surreal<surrealdb::engine::any::Any>, Json<ErrorResponse>> {
29 state.db_client.as_deref().ok_or_else(|| {
30 Json(ErrorResponse {
31 error: "database not configured".into(),
32 })
33 })
34}
35
36#[utoipa::path(
37 get,
38 path = "/sources/{source}/queries",
39 params(("source" = String, Path, description = "Source name")),
40 responses(
41 (status = 200, description = "List queries for source", body = QueryListResponse),
42 (status = 400, description = "Bad request", body = ErrorResponse)
43 )
44)]
45pub async fn list_queries(
46 State(state): State<Arc<ApiState>>,
47 Path(source): Path<String>,
48) -> Result<Json<QueryListResponse>, Json<ErrorResponse>> {
49 let db = require_db(&state)?;
50
51 let mut resp = db
52 .query(SQL_LIST_QUERIES)
53 .bind(("source", source))
54 .await
55 .map_err(db_err)?;
56
57 let rows: Vec<serde_json::Value> = resp.take(0).map_err(db_err)?;
58
59 let queries = rows
60 .iter()
61 .filter_map(|r| {
62 Some(QueryDetail {
63 name: r.get("name")?.as_str()?.to_string(),
64 query: r.get("query")?.as_str()?.to_string(),
65 key_column: r.get("key_column")?.as_str()?.to_string(),
66 sinks: r.get("sinks").and_then(|v| v.as_array()).map(|arr| {
67 arr.iter()
68 .filter_map(|v| v.as_str().map(String::from))
69 .collect()
70 }),
71 enabled: r.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true),
72 })
73 })
74 .collect();
75
76 Ok(Json(QueryListResponse { queries }))
77}
78
79#[utoipa::path(
80 post,
81 path = "/sources/{source}/queries",
82 params(("source" = String, Path, description = "Source name")),
83 request_body = CreateQueryRequest,
84 responses(
85 (status = 200, description = "Query created", body = MutationResponse),
86 (status = 400, description = "Bad request", body = ErrorResponse)
87 )
88)]
89pub async fn create_query(
90 State(state): State<Arc<ApiState>>,
91 Path(source): Path<String>,
92 Json(req): Json<CreateQueryRequest>,
93) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
94 let db = require_db(&state)?;
95
96 db.query(SQL_DELETE_QUERY)
97 .bind(("source", source.clone()))
98 .bind(("name", req.name.clone()))
99 .await
100 .map_err(db_err)?;
101
102 let sinks_val = req
103 .sinks
104 .map(|s| serde_json::Value::Array(s.into_iter().map(serde_json::Value::String).collect()));
105
106 if let Some(sv) = sinks_val {
107 db.query(SQL_CREATE_QUERY_WITH_SINKS)
108 .bind(("source", source.clone()))
109 .bind(("name", req.name.clone()))
110 .bind(("query", req.query))
111 .bind(("key_column", req.key_column))
112 .bind(("sinks", sv))
113 .await
114 .map_err(db_err)?;
115 } else {
116 db.query(SQL_CREATE_QUERY)
117 .bind(("source", source.clone()))
118 .bind(("name", req.name.clone()))
119 .bind(("query", req.query))
120 .bind(("key_column", req.key_column))
121 .await
122 .map_err(db_err)?;
123 }
124
125 super::mutations::reload_config_pub(&state).await?;
126
127 Ok(Json(MutationResponse {
128 ok: true,
129 message: format!("query '{}' created for source '{source}'", req.name),
130 }))
131}
132
133#[utoipa::path(
134 put,
135 path = "/sources/{source}/queries/{name}",
136 params(
137 ("source" = String, Path, description = "Source name"),
138 ("name" = String, Path, description = "Query name"),
139 ),
140 request_body = UpdateQueryRequest,
141 responses(
142 (status = 200, description = "Query updated", body = MutationResponse),
143 (status = 400, description = "Bad request", body = ErrorResponse)
144 )
145)]
146pub async fn update_query(
147 State(state): State<Arc<ApiState>>,
148 Path((source, name)): Path<(String, String)>,
149 Json(req): Json<UpdateQueryRequest>,
150) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
151 let db = require_db(&state)?;
152
153 if let Some(query) = req.query {
154 db.query(SQL_UPDATE_QUERY_SQL)
155 .bind(("source", source.clone()))
156 .bind(("name", name.clone()))
157 .bind(("query", query))
158 .await
159 .map_err(db_err)?;
160 }
161
162 if let Some(key_column) = req.key_column {
163 db.query(SQL_UPDATE_QUERY_KEY)
164 .bind(("source", source.clone()))
165 .bind(("name", name.clone()))
166 .bind(("key_column", key_column))
167 .await
168 .map_err(db_err)?;
169 }
170
171 if let Some(sinks) = req.sinks {
172 let sinks_val =
173 serde_json::Value::Array(sinks.into_iter().map(serde_json::Value::String).collect());
174 db.query(SQL_UPDATE_QUERY_SINKS)
175 .bind(("source", source.clone()))
176 .bind(("name", name.clone()))
177 .bind(("sinks", sinks_val))
178 .await
179 .map_err(db_err)?;
180 }
181
182 if let Some(enabled) = req.enabled {
183 db.query(SQL_UPDATE_QUERY_ENABLED)
184 .bind(("source", source.clone()))
185 .bind(("name", name.clone()))
186 .bind(("enabled", enabled))
187 .await
188 .map_err(db_err)?;
189 }
190
191 super::mutations::reload_config_pub(&state).await?;
192
193 Ok(Json(MutationResponse {
194 ok: true,
195 message: format!("query '{name}' updated for source '{source}'"),
196 }))
197}
198
199#[utoipa::path(
200 delete,
201 path = "/sources/{source}/queries/{name}",
202 params(
203 ("source" = String, Path, description = "Source name"),
204 ("name" = String, Path, description = "Query name"),
205 ),
206 responses(
207 (status = 200, description = "Query deleted", body = MutationResponse),
208 (status = 400, description = "Bad request", body = ErrorResponse)
209 )
210)]
211pub async fn delete_query(
212 State(state): State<Arc<ApiState>>,
213 Path((source, name)): Path<(String, String)>,
214) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
215 let db = require_db(&state)?;
216
217 db.query(SQL_DELETE_QUERY)
218 .bind(("source", source.clone()))
219 .bind(("name", name.clone()))
220 .await
221 .map_err(db_err)?;
222
223 super::mutations::reload_config_pub(&state).await?;
224
225 Ok(Json(MutationResponse {
226 ok: true,
227 message: format!("query '{name}' deleted from source '{source}'"),
228 }))
229}