1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Query, State};
5use axum::http::StatusCode;
6use chrono::{DateTime, Utc};
7use tracing::warn;
8
9use crate::state::ApiState;
10use crate::types::*;
11
12#[utoipa::path(
13 post,
14 path = "/sync/pause",
15 responses(
16 (status = 200, description = "Sync paused", body = MutationResponse)
17 )
18)]
19pub async fn pause_sync(State(state): State<Arc<ApiState>>) -> Json<MutationResponse> {
20 if let Some(ref lifecycle) = state.lifecycle {
21 lifecycle.pause().await;
22 }
23 Json(MutationResponse {
24 ok: true,
25 message: "sync paused".into(),
26 })
27}
28
29#[utoipa::path(
30 post,
31 path = "/sync/resume",
32 responses(
33 (status = 200, description = "Sync resumed", body = MutationResponse),
34 (status = 400, description = "Resume failed", body = ErrorResponse)
35 )
36)]
37pub async fn resume_sync(
38 State(state): State<Arc<ApiState>>,
39) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
40 if let Some(ref lifecycle) = state.lifecycle {
41 lifecycle.resume().await.map_err(|e| {
42 Json(ErrorResponse {
43 error: format!("resume: {e}"),
44 })
45 })?;
46 }
47 Ok(Json(MutationResponse {
48 ok: true,
49 message: "sync resumed".into(),
50 }))
51}
52
53#[utoipa::path(
54 get,
55 path = "/history",
56 responses(
57 (status = 200, description = "Cycle history", body = HistoryResponse),
58 (status = 400, description = "Database not configured", body = ErrorResponse)
59 )
60)]
61pub async fn get_history(
62 State(state): State<Arc<ApiState>>,
63) -> Result<Json<HistoryResponse>, Json<ErrorResponse>> {
64 let db = state.db_client.as_ref().ok_or_else(|| {
65 Json(ErrorResponse {
66 error: "database not configured".into(),
67 })
68 })?;
69
70 const SQL_TEMPLATE: &str = oversync_queries::delta::LIST_CYCLE_HISTORY;
71
72 let pipe_names: Vec<String> = state
74 .pipes
75 .read()
76 .await
77 .iter()
78 .map(|p| p.name.clone())
79 .collect();
80
81 let mut all_tables: Vec<oversync_core::TableNames> = pipe_names
82 .iter()
83 .map(|name| oversync_core::TableNames::for_source(name))
84 .collect();
85 all_tables.push(oversync_core::TableNames::default_shared());
86 all_tables.dedup_by(|a, b| a.cycle_log == b.cycle_log);
87
88 let mut set = tokio::task::JoinSet::new();
90 for tables in all_tables {
91 let sql = tables.resolve_sql(SQL_TEMPLATE);
92 let db = Arc::clone(db);
93 set.spawn(async move {
94 let mut resp = db.query(&sql).await.ok()?;
95 resp.take::<Vec<serde_json::Value>>(0).ok()
96 });
97 }
98
99 let mut all_rows: Vec<serde_json::Value> = Vec::new();
100 while let Some(result) = set.join_next().await {
101 if let Ok(Some(rows)) = result {
102 all_rows.extend(rows);
103 }
104 }
105
106 all_rows.sort_by(|a, b| {
108 let sa = a.get("started_at").and_then(|v| v.as_str()).unwrap_or("");
109 let sb = b.get("started_at").and_then(|v| v.as_str()).unwrap_or("");
110 sb.cmp(sa)
111 });
112 all_rows.truncate(100);
113
114 let cycles = all_rows
115 .iter()
116 .filter_map(|r| {
117 let cycle_id = r.get("cycle_id").and_then(|v| v.as_u64());
118 let status = r.get("status").and_then(|v| v.as_str());
119 if cycle_id.is_none() || status.is_none() {
120 warn!(?r, "skipping cycle_log row with missing cycle_id or status");
121 return None;
122 }
123 let started_at: DateTime<Utc> = r
124 .get("started_at")
125 .and_then(|v| v.as_str())
126 .and_then(|s| s.parse().ok())
127 .unwrap_or_default();
128 let finished_at: Option<DateTime<Utc>> = r
129 .get("finished_at")
130 .and_then(|v| v.as_str())
131 .and_then(|s| s.parse().ok());
132 let duration_ms =
133 finished_at.map(|f| (f - started_at).num_milliseconds().unsigned_abs());
134 Some(CycleInfo {
135 cycle_id: cycle_id?,
136 source: r
137 .get("origin_id")
138 .and_then(|v| v.as_str())
139 .unwrap_or("")
140 .to_string(),
141 query: r
142 .get("query_id")
143 .and_then(|v| v.as_str())
144 .unwrap_or("")
145 .to_string(),
146 status: status?.to_string(),
147 started_at,
148 finished_at,
149 rows_created: r.get("rows_created").and_then(|v| v.as_u64()).unwrap_or(0),
150 rows_updated: r.get("rows_updated").and_then(|v| v.as_u64()).unwrap_or(0),
151 rows_deleted: r.get("rows_deleted").and_then(|v| v.as_u64()).unwrap_or(0),
152 duration_ms,
153 error: r.get("error").and_then(|v| v.as_str()).map(String::from),
154 })
155 })
156 .collect();
157
158 Ok(Json(HistoryResponse { cycles }))
159}
160
161#[utoipa::path(
162 get,
163 path = "/sync/status",
164 responses(
165 (status = 200, description = "Sync status", body = StatusResponse)
166 )
167)]
168pub async fn sync_status(State(state): State<Arc<ApiState>>) -> Json<StatusResponse> {
169 let (running, paused) = match &state.lifecycle {
170 Some(lc) => (lc.is_running().await, lc.is_paused().await),
171 None => (false, false),
172 };
173 Json(StatusResponse { running, paused })
174}
175
176#[utoipa::path(
177 get,
178 path = "/config/export",
179 params(
180 ("format" = Option<ExportConfigFormat>, Query, description = "Export format: toml or json")
181 ),
182 responses(
183 (status = 200, description = "Current persisted config export", body = ExportConfigResponse),
184 (status = 400, description = "Config export unavailable", body = ErrorResponse)
185 )
186)]
187pub async fn export_config(
188 State(state): State<Arc<ApiState>>,
189 Query(query): Query<ExportConfigQuery>,
190) -> Result<Json<ExportConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
191 let db = state.db_client.as_ref().ok_or_else(|| {
192 (
193 StatusCode::BAD_REQUEST,
194 Json(ErrorResponse {
195 error: "database not configured".into(),
196 }),
197 )
198 })?;
199 let lifecycle = state.lifecycle.as_ref().ok_or_else(|| {
200 (
201 StatusCode::BAD_REQUEST,
202 Json(ErrorResponse {
203 error: "config export unavailable".into(),
204 }),
205 )
206 })?;
207
208 let format = query.format.unwrap_or(ExportConfigFormat::Toml);
209 let content = lifecycle.export_config(db, format).await.map_err(|e| {
210 (
211 StatusCode::BAD_REQUEST,
212 Json(ErrorResponse {
213 error: format!("config export: {e}"),
214 }),
215 )
216 })?;
217
218 Ok(Json(ExportConfigResponse { format, content }))
219}
220
221#[utoipa::path(
222 post,
223 path = "/config/import",
224 request_body = ImportConfigRequest,
225 responses(
226 (status = 200, description = "Config imported into the current control plane", body = ImportConfigResponse),
227 (status = 400, description = "Config import failed", body = ErrorResponse)
228 )
229)]
230pub async fn import_config(
231 State(state): State<Arc<ApiState>>,
232 Json(req): Json<ImportConfigRequest>,
233) -> Result<Json<ImportConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
234 let db = state.db_client.as_ref().ok_or_else(|| {
235 (
236 StatusCode::BAD_REQUEST,
237 Json(ErrorResponse {
238 error: "database not configured".into(),
239 }),
240 )
241 })?;
242 let lifecycle = state.lifecycle.as_ref().ok_or_else(|| {
243 (
244 StatusCode::BAD_REQUEST,
245 Json(ErrorResponse {
246 error: "config import unavailable".into(),
247 }),
248 )
249 })?;
250
251 let warnings = lifecycle
252 .import_config(db, req.format, &req.content)
253 .await
254 .map_err(|e| {
255 (
256 StatusCode::BAD_REQUEST,
257 Json(ErrorResponse {
258 error: format!("config import: {e}"),
259 }),
260 )
261 })?;
262
263 crate::mutations::refresh_read_cache(&state)
264 .await
265 .map_err(|e| {
266 (
267 StatusCode::BAD_REQUEST,
268 Json(ErrorResponse {
269 error: format!("refresh cache: {e}"),
270 }),
271 )
272 })?;
273
274 Ok(Json(ImportConfigResponse {
275 ok: true,
276 message: "config imported".into(),
277 warnings,
278 }))
279}