Skip to main content

oversync_api/
operations.rs

1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5use tracing::warn;
6
7use crate::state::ApiState;
8use crate::types::*;
9
10#[utoipa::path(
11	post,
12	path = "/sources/{name}/trigger",
13	params(("name" = String, Path, description = "Source name to trigger")),
14	responses(
15		(status = 200, description = "Trigger initiated", body = TriggerResponse),
16		(status = 400, description = "Bad request", body = ErrorResponse)
17	)
18)]
19pub async fn trigger_source(
20	State(state): State<Arc<ApiState>>,
21	Path(name): Path<String>,
22) -> Result<Json<TriggerResponse>, Json<ErrorResponse>> {
23	let found = state.sources.read().await.iter().any(|s| s.name == name);
24	if !found {
25		return Err(Json(ErrorResponse {
26			error: format!("source not found: {name}"),
27		}));
28	}
29
30	// Trigger by restarting the lifecycle with current config
31	if let Some(ref lifecycle) = state.lifecycle
32		&& let Some(ref db) = state.db_client
33	{
34		lifecycle.restart_with_config_json(db).await.map_err(|e| {
35			Json(ErrorResponse {
36				error: format!("trigger: {e}"),
37			})
38		})?;
39	}
40
41	Ok(Json(TriggerResponse {
42		source: name,
43		message: "trigger initiated".into(),
44	}))
45}
46
47#[utoipa::path(
48	post,
49	path = "/sync/pause",
50	responses(
51		(status = 200, description = "Sync paused", body = MutationResponse)
52	)
53)]
54pub async fn pause_sync(State(state): State<Arc<ApiState>>) -> Json<MutationResponse> {
55	if let Some(ref lifecycle) = state.lifecycle {
56		lifecycle.pause().await;
57	}
58	Json(MutationResponse {
59		ok: true,
60		message: "sync paused".into(),
61	})
62}
63
64#[utoipa::path(
65	post,
66	path = "/sync/resume",
67	responses(
68		(status = 200, description = "Sync resumed", body = MutationResponse),
69		(status = 400, description = "Resume failed", body = ErrorResponse)
70	)
71)]
72pub async fn resume_sync(
73	State(state): State<Arc<ApiState>>,
74) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
75	if let Some(ref lifecycle) = state.lifecycle {
76		lifecycle.resume().await.map_err(|e| {
77			Json(ErrorResponse {
78				error: format!("resume: {e}"),
79			})
80		})?;
81	}
82	Ok(Json(MutationResponse {
83		ok: true,
84		message: "sync resumed".into(),
85	}))
86}
87
88#[utoipa::path(
89	get,
90	path = "/history",
91	responses(
92		(status = 200, description = "Cycle history", body = HistoryResponse),
93		(status = 400, description = "Database not configured", body = ErrorResponse)
94	)
95)]
96pub async fn get_history(
97	State(state): State<Arc<ApiState>>,
98) -> Result<Json<HistoryResponse>, Json<ErrorResponse>> {
99	let db = state.db_client.as_ref().ok_or_else(|| {
100		Json(ErrorResponse {
101			error: "database not configured".into(),
102		})
103	})?;
104
105	const SQL_CYCLE_HISTORY: &str = oversync_queries::delta::LIST_CYCLE_HISTORY;
106
107	let mut response = db.query(SQL_CYCLE_HISTORY).await.map_err(|e| {
108		Json(ErrorResponse {
109			error: format!("db: {e}"),
110		})
111	})?;
112
113	let rows: Vec<serde_json::Value> = response.take(0).map_err(|e| {
114		Json(ErrorResponse {
115			error: format!("db take: {e}"),
116		})
117	})?;
118
119	let cycles = rows
120		.iter()
121		.filter_map(|r| {
122			let cycle_id = r.get("cycle_id").and_then(|v| v.as_u64());
123			let status = r.get("status").and_then(|v| v.as_str());
124			if cycle_id.is_none() || status.is_none() {
125				warn!(?r, "skipping cycle_log row with missing cycle_id or status");
126				return None;
127			}
128			Some(CycleInfo {
129				cycle_id: cycle_id?,
130				status: status?.to_string(),
131				started_at: r
132					.get("started_at")
133					.and_then(|v| v.as_str())
134					.and_then(|s| s.parse().ok())
135					.unwrap_or_default(),
136				finished_at: r
137					.get("finished_at")
138					.and_then(|v| v.as_str())
139					.and_then(|s| s.parse().ok()),
140				rows_created: r.get("rows_created").and_then(|v| v.as_u64()).unwrap_or(0),
141				rows_updated: r.get("rows_updated").and_then(|v| v.as_u64()).unwrap_or(0),
142				rows_deleted: r.get("rows_deleted").and_then(|v| v.as_u64()).unwrap_or(0),
143			})
144		})
145		.collect();
146
147	Ok(Json(HistoryResponse { cycles }))
148}
149
150#[utoipa::path(
151	get,
152	path = "/sync/status",
153	responses(
154		(status = 200, description = "Sync status", body = StatusResponse)
155	)
156)]
157pub async fn sync_status(State(state): State<Arc<ApiState>>) -> Json<StatusResponse> {
158	let (running, paused) = match &state.lifecycle {
159		Some(lc) => (lc.is_running().await, lc.is_paused().await),
160		None => (false, false),
161	};
162	Json(StatusResponse { running, paused })
163}