Skip to main content

oversync_api/
operations.rs

1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5use chrono::{DateTime, Utc};
6use tracing::warn;
7
8use crate::state::ApiState;
9use crate::types::*;
10
11#[utoipa::path(
12	post,
13	path = "/sources/{name}/trigger",
14	params(("name" = String, Path, description = "Source name to trigger")),
15	responses(
16		(status = 200, description = "Trigger initiated", body = TriggerResponse),
17		(status = 400, description = "Bad request", body = ErrorResponse)
18	)
19)]
20pub async fn trigger_source(
21	State(state): State<Arc<ApiState>>,
22	Path(name): Path<String>,
23) -> Result<Json<TriggerResponse>, Json<ErrorResponse>> {
24	let found = state.sources.read().await.iter().any(|s| s.name == name);
25	if !found {
26		return Err(Json(ErrorResponse {
27			error: format!("source not found: {name}"),
28		}));
29	}
30
31	// Trigger by restarting the lifecycle with current config
32	if let Some(ref lifecycle) = state.lifecycle
33		&& let Some(ref db) = state.db_client
34	{
35		lifecycle.restart_with_config_json(db).await.map_err(|e| {
36			Json(ErrorResponse {
37				error: format!("trigger: {e}"),
38			})
39		})?;
40	}
41
42	Ok(Json(TriggerResponse {
43		source: name,
44		message: "trigger initiated".into(),
45	}))
46}
47
48#[utoipa::path(
49	post,
50	path = "/sync/pause",
51	responses(
52		(status = 200, description = "Sync paused", body = MutationResponse)
53	)
54)]
55pub async fn pause_sync(State(state): State<Arc<ApiState>>) -> Json<MutationResponse> {
56	if let Some(ref lifecycle) = state.lifecycle {
57		lifecycle.pause().await;
58	}
59	Json(MutationResponse {
60		ok: true,
61		message: "sync paused".into(),
62	})
63}
64
65#[utoipa::path(
66	post,
67	path = "/sync/resume",
68	responses(
69		(status = 200, description = "Sync resumed", body = MutationResponse),
70		(status = 400, description = "Resume failed", body = ErrorResponse)
71	)
72)]
73pub async fn resume_sync(
74	State(state): State<Arc<ApiState>>,
75) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
76	if let Some(ref lifecycle) = state.lifecycle {
77		lifecycle.resume().await.map_err(|e| {
78			Json(ErrorResponse {
79				error: format!("resume: {e}"),
80			})
81		})?;
82	}
83	Ok(Json(MutationResponse {
84		ok: true,
85		message: "sync resumed".into(),
86	}))
87}
88
89#[utoipa::path(
90	get,
91	path = "/history",
92	responses(
93		(status = 200, description = "Cycle history", body = HistoryResponse),
94		(status = 400, description = "Database not configured", body = ErrorResponse)
95	)
96)]
97pub async fn get_history(
98	State(state): State<Arc<ApiState>>,
99) -> Result<Json<HistoryResponse>, Json<ErrorResponse>> {
100	let db = state.db_client.as_ref().ok_or_else(|| {
101		Json(ErrorResponse {
102			error: "database not configured".into(),
103		})
104	})?;
105
106	const SQL_TEMPLATE: &str = oversync_queries::delta::LIST_CYCLE_HISTORY;
107
108	// Collect per-pipe table names + legacy shared table
109	let pipe_names: Vec<String> = state
110		.pipes
111		.read()
112		.await
113		.iter()
114		.map(|p| p.name.clone())
115		.collect();
116	let source_names: Vec<String> = state
117		.sources
118		.read()
119		.await
120		.iter()
121		.map(|s| s.name.clone())
122		.collect();
123
124	let mut all_tables: Vec<oversync_core::TableNames> = pipe_names
125		.iter()
126		.chain(source_names.iter())
127		.map(|name| oversync_core::TableNames::for_source(name))
128		.collect();
129	all_tables.push(oversync_core::TableNames::default_shared());
130	all_tables.dedup_by(|a, b| a.cycle_log == b.cycle_log);
131
132	// Query all per-pipe cycle_log tables concurrently
133	let mut set = tokio::task::JoinSet::new();
134	for tables in all_tables {
135		let sql = tables.resolve_sql(SQL_TEMPLATE);
136		let db = Arc::clone(db);
137		set.spawn(async move {
138			let mut resp = db.query(&sql).await.ok()?;
139			resp.take::<Vec<serde_json::Value>>(0).ok()
140		});
141	}
142
143	let mut all_rows: Vec<serde_json::Value> = Vec::new();
144	while let Some(result) = set.join_next().await {
145		if let Ok(Some(rows)) = result {
146			all_rows.extend(rows);
147		}
148	}
149
150	// Sort by started_at DESC, take 100
151	all_rows.sort_by(|a, b| {
152		let sa = a.get("started_at").and_then(|v| v.as_str()).unwrap_or("");
153		let sb = b.get("started_at").and_then(|v| v.as_str()).unwrap_or("");
154		sb.cmp(sa)
155	});
156	all_rows.truncate(100);
157
158	let cycles = all_rows
159		.iter()
160		.filter_map(|r| {
161			let cycle_id = r.get("cycle_id").and_then(|v| v.as_u64());
162			let status = r.get("status").and_then(|v| v.as_str());
163			if cycle_id.is_none() || status.is_none() {
164				warn!(?r, "skipping cycle_log row with missing cycle_id or status");
165				return None;
166			}
167			let started_at: DateTime<Utc> = r
168				.get("started_at")
169				.and_then(|v| v.as_str())
170				.and_then(|s| s.parse().ok())
171				.unwrap_or_default();
172			let finished_at: Option<DateTime<Utc>> = r
173				.get("finished_at")
174				.and_then(|v| v.as_str())
175				.and_then(|s| s.parse().ok());
176			let duration_ms =
177				finished_at.map(|f| (f - started_at).num_milliseconds().unsigned_abs());
178			Some(CycleInfo {
179				cycle_id: cycle_id?,
180				source: r
181					.get("origin_id")
182					.and_then(|v| v.as_str())
183					.unwrap_or("")
184					.to_string(),
185				query: r
186					.get("query_id")
187					.and_then(|v| v.as_str())
188					.unwrap_or("")
189					.to_string(),
190				status: status?.to_string(),
191				started_at,
192				finished_at,
193				rows_created: r.get("rows_created").and_then(|v| v.as_u64()).unwrap_or(0),
194				rows_updated: r.get("rows_updated").and_then(|v| v.as_u64()).unwrap_or(0),
195				rows_deleted: r.get("rows_deleted").and_then(|v| v.as_u64()).unwrap_or(0),
196				duration_ms,
197				error: r.get("error").and_then(|v| v.as_str()).map(String::from),
198			})
199		})
200		.collect();
201
202	Ok(Json(HistoryResponse { cycles }))
203}
204
205#[utoipa::path(
206	get,
207	path = "/sync/status",
208	responses(
209		(status = 200, description = "Sync status", body = StatusResponse)
210	)
211)]
212pub async fn sync_status(State(state): State<Arc<ApiState>>) -> Json<StatusResponse> {
213	let (running, paused) = match &state.lifecycle {
214		Some(lc) => (lc.is_running().await, lc.is_paused().await),
215		None => (false, false),
216	};
217	Json(StatusResponse { running, paused })
218}