oversync_api/
operations.rs1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5use chrono::{DateTime, Utc};
6use tracing::warn;
7
8use crate::state::ApiState;
9use crate::types::*;
10
11#[utoipa::path(
12 post,
13 path = "/sources/{name}/trigger",
14 params(("name" = String, Path, description = "Source name to trigger")),
15 responses(
16 (status = 200, description = "Trigger initiated", body = TriggerResponse),
17 (status = 400, description = "Bad request", body = ErrorResponse)
18 )
19)]
20pub async fn trigger_source(
21 State(state): State<Arc<ApiState>>,
22 Path(name): Path<String>,
23) -> Result<Json<TriggerResponse>, Json<ErrorResponse>> {
24 let found = state.sources.read().await.iter().any(|s| s.name == name);
25 if !found {
26 return Err(Json(ErrorResponse {
27 error: format!("source not found: {name}"),
28 }));
29 }
30
31 if let Some(ref lifecycle) = state.lifecycle
33 && let Some(ref db) = state.db_client
34 {
35 lifecycle.restart_with_config_json(db).await.map_err(|e| {
36 Json(ErrorResponse {
37 error: format!("trigger: {e}"),
38 })
39 })?;
40 }
41
42 Ok(Json(TriggerResponse {
43 source: name,
44 message: "trigger initiated".into(),
45 }))
46}
47
48#[utoipa::path(
49 post,
50 path = "/sync/pause",
51 responses(
52 (status = 200, description = "Sync paused", body = MutationResponse)
53 )
54)]
55pub async fn pause_sync(State(state): State<Arc<ApiState>>) -> Json<MutationResponse> {
56 if let Some(ref lifecycle) = state.lifecycle {
57 lifecycle.pause().await;
58 }
59 Json(MutationResponse {
60 ok: true,
61 message: "sync paused".into(),
62 })
63}
64
65#[utoipa::path(
66 post,
67 path = "/sync/resume",
68 responses(
69 (status = 200, description = "Sync resumed", body = MutationResponse),
70 (status = 400, description = "Resume failed", body = ErrorResponse)
71 )
72)]
73pub async fn resume_sync(
74 State(state): State<Arc<ApiState>>,
75) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
76 if let Some(ref lifecycle) = state.lifecycle {
77 lifecycle.resume().await.map_err(|e| {
78 Json(ErrorResponse {
79 error: format!("resume: {e}"),
80 })
81 })?;
82 }
83 Ok(Json(MutationResponse {
84 ok: true,
85 message: "sync resumed".into(),
86 }))
87}
88
89#[utoipa::path(
90 get,
91 path = "/history",
92 responses(
93 (status = 200, description = "Cycle history", body = HistoryResponse),
94 (status = 400, description = "Database not configured", body = ErrorResponse)
95 )
96)]
97pub async fn get_history(
98 State(state): State<Arc<ApiState>>,
99) -> Result<Json<HistoryResponse>, Json<ErrorResponse>> {
100 let db = state.db_client.as_ref().ok_or_else(|| {
101 Json(ErrorResponse {
102 error: "database not configured".into(),
103 })
104 })?;
105
106 const SQL_TEMPLATE: &str = oversync_queries::delta::LIST_CYCLE_HISTORY;
107
108 let pipe_names: Vec<String> = state
110 .pipes
111 .read()
112 .await
113 .iter()
114 .map(|p| p.name.clone())
115 .collect();
116 let source_names: Vec<String> = state
117 .sources
118 .read()
119 .await
120 .iter()
121 .map(|s| s.name.clone())
122 .collect();
123
124 let mut all_tables: Vec<oversync_core::TableNames> = pipe_names
125 .iter()
126 .chain(source_names.iter())
127 .map(|name| oversync_core::TableNames::for_source(name))
128 .collect();
129 all_tables.push(oversync_core::TableNames::default_shared());
130 all_tables.dedup_by(|a, b| a.cycle_log == b.cycle_log);
131
132 let mut set = tokio::task::JoinSet::new();
134 for tables in all_tables {
135 let sql = tables.resolve_sql(SQL_TEMPLATE);
136 let db = Arc::clone(db);
137 set.spawn(async move {
138 let mut resp = db.query(&sql).await.ok()?;
139 resp.take::<Vec<serde_json::Value>>(0).ok()
140 });
141 }
142
143 let mut all_rows: Vec<serde_json::Value> = Vec::new();
144 while let Some(result) = set.join_next().await {
145 if let Ok(Some(rows)) = result {
146 all_rows.extend(rows);
147 }
148 }
149
150 all_rows.sort_by(|a, b| {
152 let sa = a.get("started_at").and_then(|v| v.as_str()).unwrap_or("");
153 let sb = b.get("started_at").and_then(|v| v.as_str()).unwrap_or("");
154 sb.cmp(sa)
155 });
156 all_rows.truncate(100);
157
158 let cycles = all_rows
159 .iter()
160 .filter_map(|r| {
161 let cycle_id = r.get("cycle_id").and_then(|v| v.as_u64());
162 let status = r.get("status").and_then(|v| v.as_str());
163 if cycle_id.is_none() || status.is_none() {
164 warn!(?r, "skipping cycle_log row with missing cycle_id or status");
165 return None;
166 }
167 let started_at: DateTime<Utc> = r
168 .get("started_at")
169 .and_then(|v| v.as_str())
170 .and_then(|s| s.parse().ok())
171 .unwrap_or_default();
172 let finished_at: Option<DateTime<Utc>> = r
173 .get("finished_at")
174 .and_then(|v| v.as_str())
175 .and_then(|s| s.parse().ok());
176 let duration_ms =
177 finished_at.map(|f| (f - started_at).num_milliseconds().unsigned_abs());
178 Some(CycleInfo {
179 cycle_id: cycle_id?,
180 source: r
181 .get("origin_id")
182 .and_then(|v| v.as_str())
183 .unwrap_or("")
184 .to_string(),
185 query: r
186 .get("query_id")
187 .and_then(|v| v.as_str())
188 .unwrap_or("")
189 .to_string(),
190 status: status?.to_string(),
191 started_at,
192 finished_at,
193 rows_created: r.get("rows_created").and_then(|v| v.as_u64()).unwrap_or(0),
194 rows_updated: r.get("rows_updated").and_then(|v| v.as_u64()).unwrap_or(0),
195 rows_deleted: r.get("rows_deleted").and_then(|v| v.as_u64()).unwrap_or(0),
196 duration_ms,
197 error: r.get("error").and_then(|v| v.as_str()).map(String::from),
198 })
199 })
200 .collect();
201
202 Ok(Json(HistoryResponse { cycles }))
203}
204
205#[utoipa::path(
206 get,
207 path = "/sync/status",
208 responses(
209 (status = 200, description = "Sync status", body = StatusResponse)
210 )
211)]
212pub async fn sync_status(State(state): State<Arc<ApiState>>) -> Json<StatusResponse> {
213 let (running, paused) = match &state.lifecycle {
214 Some(lc) => (lc.is_running().await, lc.is_paused().await),
215 None => (false, false),
216 };
217 Json(StatusResponse { running, paused })
218}