//! Axum handlers for sync lifecycle control (pause/resume/status),
//! cycle history, and config import/export — oversync_api/operations.rs.
use std::collections::HashSet;
use std::sync::Arc;

use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Utc};
use tracing::warn;

use crate::state::ApiState;
use crate::types::*;

12#[utoipa::path(
13	post,
14	path = "/sync/pause",
15	responses(
16		(status = 200, description = "Sync paused", body = MutationResponse)
17	)
18)]
19pub async fn pause_sync(State(state): State<Arc<ApiState>>) -> Json<MutationResponse> {
20	if let Some(ref lifecycle) = state.lifecycle {
21		lifecycle.pause().await;
22	}
23	Json(MutationResponse {
24		ok: true,
25		message: "sync paused".into(),
26	})
27}
28
29#[utoipa::path(
30	post,
31	path = "/sync/resume",
32	responses(
33		(status = 200, description = "Sync resumed", body = MutationResponse),
34		(status = 400, description = "Resume failed", body = ErrorResponse)
35	)
36)]
37pub async fn resume_sync(
38	State(state): State<Arc<ApiState>>,
39) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
40	if let Some(ref lifecycle) = state.lifecycle {
41		lifecycle.resume().await.map_err(|e| {
42			Json(ErrorResponse {
43				error: format!("resume: {e}"),
44			})
45		})?;
46	}
47	Ok(Json(MutationResponse {
48		ok: true,
49		message: "sync resumed".into(),
50	}))
51}
52
53#[utoipa::path(
54	get,
55	path = "/history",
56	responses(
57		(status = 200, description = "Cycle history", body = HistoryResponse),
58		(status = 400, description = "Database not configured", body = ErrorResponse)
59	)
60)]
61pub async fn get_history(
62	State(state): State<Arc<ApiState>>,
63) -> Result<Json<HistoryResponse>, Json<ErrorResponse>> {
64	let db = state.db_client.as_ref().ok_or_else(|| {
65		Json(ErrorResponse {
66			error: "database not configured".into(),
67		})
68	})?;
69
70	const SQL_TEMPLATE: &str = oversync_queries::delta::LIST_CYCLE_HISTORY;
71
72	// Collect per-pipe table names + legacy shared table
73	let pipe_names: Vec<String> = state
74		.pipes
75		.read()
76		.await
77		.iter()
78		.map(|p| p.name.clone())
79		.collect();
80
81	let mut all_tables: Vec<oversync_core::TableNames> = pipe_names
82		.iter()
83		.map(|name| oversync_core::TableNames::for_source(name))
84		.collect();
85	all_tables.push(oversync_core::TableNames::default_shared());
86	all_tables.dedup_by(|a, b| a.cycle_log == b.cycle_log);
87
88	// Query all per-pipe cycle_log tables concurrently
89	let mut set = tokio::task::JoinSet::new();
90	for tables in all_tables {
91		let sql = tables.resolve_sql(SQL_TEMPLATE);
92		let db = Arc::clone(db);
93		set.spawn(async move {
94			let mut resp = db.query(&sql).await.ok()?;
95			resp.take::<Vec<serde_json::Value>>(0).ok()
96		});
97	}
98
99	let mut all_rows: Vec<serde_json::Value> = Vec::new();
100	while let Some(result) = set.join_next().await {
101		if let Ok(Some(rows)) = result {
102			all_rows.extend(rows);
103		}
104	}
105
106	// Sort by started_at DESC, take 100
107	all_rows.sort_by(|a, b| {
108		let sa = a.get("started_at").and_then(|v| v.as_str()).unwrap_or("");
109		let sb = b.get("started_at").and_then(|v| v.as_str()).unwrap_or("");
110		sb.cmp(sa)
111	});
112	all_rows.truncate(100);
113
114	let cycles = all_rows
115		.iter()
116		.filter_map(|r| {
117			let cycle_id = r.get("cycle_id").and_then(|v| v.as_u64());
118			let status = r.get("status").and_then(|v| v.as_str());
119			if cycle_id.is_none() || status.is_none() {
120				warn!(?r, "skipping cycle_log row with missing cycle_id or status");
121				return None;
122			}
123			let started_at: DateTime<Utc> = r
124				.get("started_at")
125				.and_then(|v| v.as_str())
126				.and_then(|s| s.parse().ok())
127				.unwrap_or_default();
128			let finished_at: Option<DateTime<Utc>> = r
129				.get("finished_at")
130				.and_then(|v| v.as_str())
131				.and_then(|s| s.parse().ok());
132			let duration_ms =
133				finished_at.map(|f| (f - started_at).num_milliseconds().unsigned_abs());
134			Some(CycleInfo {
135				cycle_id: cycle_id?,
136				source: r
137					.get("origin_id")
138					.and_then(|v| v.as_str())
139					.unwrap_or("")
140					.to_string(),
141				query: r
142					.get("query_id")
143					.and_then(|v| v.as_str())
144					.unwrap_or("")
145					.to_string(),
146				status: status?.to_string(),
147				started_at,
148				finished_at,
149				rows_created: r.get("rows_created").and_then(|v| v.as_u64()).unwrap_or(0),
150				rows_updated: r.get("rows_updated").and_then(|v| v.as_u64()).unwrap_or(0),
151				rows_deleted: r.get("rows_deleted").and_then(|v| v.as_u64()).unwrap_or(0),
152				duration_ms,
153				error: r.get("error").and_then(|v| v.as_str()).map(String::from),
154			})
155		})
156		.collect();
157
158	Ok(Json(HistoryResponse { cycles }))
159}
160
161#[utoipa::path(
162	get,
163	path = "/sync/status",
164	responses(
165		(status = 200, description = "Sync status", body = StatusResponse)
166	)
167)]
168pub async fn sync_status(State(state): State<Arc<ApiState>>) -> Json<StatusResponse> {
169	let (running, paused) = match &state.lifecycle {
170		Some(lc) => (lc.is_running().await, lc.is_paused().await),
171		None => (false, false),
172	};
173	Json(StatusResponse { running, paused })
174}
175
176#[utoipa::path(
177	get,
178	path = "/config/export",
179	params(
180		("format" = Option<ExportConfigFormat>, Query, description = "Export format: toml or json")
181	),
182	responses(
183		(status = 200, description = "Current persisted config export", body = ExportConfigResponse),
184		(status = 400, description = "Config export unavailable", body = ErrorResponse)
185	)
186)]
187pub async fn export_config(
188	State(state): State<Arc<ApiState>>,
189	Query(query): Query<ExportConfigQuery>,
190) -> Result<Json<ExportConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
191	let db = state.db_client.as_ref().ok_or_else(|| {
192		(
193			StatusCode::BAD_REQUEST,
194			Json(ErrorResponse {
195				error: "database not configured".into(),
196			}),
197		)
198	})?;
199	let lifecycle = state.lifecycle.as_ref().ok_or_else(|| {
200		(
201			StatusCode::BAD_REQUEST,
202			Json(ErrorResponse {
203				error: "config export unavailable".into(),
204			}),
205		)
206	})?;
207
208	let format = query.format.unwrap_or(ExportConfigFormat::Toml);
209	let content = lifecycle.export_config(db, format).await.map_err(|e| {
210		(
211			StatusCode::BAD_REQUEST,
212			Json(ErrorResponse {
213				error: format!("config export: {e}"),
214			}),
215		)
216	})?;
217
218	Ok(Json(ExportConfigResponse { format, content }))
219}
220
221#[utoipa::path(
222	post,
223	path = "/config/import",
224	request_body = ImportConfigRequest,
225	responses(
226		(status = 200, description = "Config imported into the current control plane", body = ImportConfigResponse),
227		(status = 400, description = "Config import failed", body = ErrorResponse)
228	)
229)]
230pub async fn import_config(
231	State(state): State<Arc<ApiState>>,
232	Json(req): Json<ImportConfigRequest>,
233) -> Result<Json<ImportConfigResponse>, (StatusCode, Json<ErrorResponse>)> {
234	let db = state.db_client.as_ref().ok_or_else(|| {
235		(
236			StatusCode::BAD_REQUEST,
237			Json(ErrorResponse {
238				error: "database not configured".into(),
239			}),
240		)
241	})?;
242	let lifecycle = state.lifecycle.as_ref().ok_or_else(|| {
243		(
244			StatusCode::BAD_REQUEST,
245			Json(ErrorResponse {
246				error: "config import unavailable".into(),
247			}),
248		)
249	})?;
250
251	let warnings = lifecycle
252		.import_config(db, req.format, &req.content)
253		.await
254		.map_err(|e| {
255			(
256				StatusCode::BAD_REQUEST,
257				Json(ErrorResponse {
258					error: format!("config import: {e}"),
259				}),
260			)
261		})?;
262
263	crate::mutations::refresh_read_cache(&state)
264		.await
265		.map_err(|e| {
266			(
267				StatusCode::BAD_REQUEST,
268				Json(ErrorResponse {
269					error: format!("refresh cache: {e}"),
270				}),
271			)
272		})?;
273
274	Ok(Json(ImportConfigResponse {
275		ok: true,
276		message: "config imported".into(),
277		warnings,
278	}))
279}