oversync_api/
operations.rs1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, State};
5use tracing::warn;
6
7use crate::state::ApiState;
8use crate::types::*;
9
10#[utoipa::path(
11 post,
12 path = "/sources/{name}/trigger",
13 params(("name" = String, Path, description = "Source name to trigger")),
14 responses(
15 (status = 200, description = "Trigger initiated", body = TriggerResponse),
16 (status = 400, description = "Bad request", body = ErrorResponse)
17 )
18)]
19pub async fn trigger_source(
20 State(state): State<Arc<ApiState>>,
21 Path(name): Path<String>,
22) -> Result<Json<TriggerResponse>, Json<ErrorResponse>> {
23 let found = state.sources.read().await.iter().any(|s| s.name == name);
24 if !found {
25 return Err(Json(ErrorResponse {
26 error: format!("source not found: {name}"),
27 }));
28 }
29
30 if let Some(ref lifecycle) = state.lifecycle
32 && let Some(ref db) = state.db_client
33 {
34 lifecycle.restart_with_config_json(db).await.map_err(|e| {
35 Json(ErrorResponse {
36 error: format!("trigger: {e}"),
37 })
38 })?;
39 }
40
41 Ok(Json(TriggerResponse {
42 source: name,
43 message: "trigger initiated".into(),
44 }))
45}
46
47#[utoipa::path(
48 post,
49 path = "/sync/pause",
50 responses(
51 (status = 200, description = "Sync paused", body = MutationResponse)
52 )
53)]
54pub async fn pause_sync(State(state): State<Arc<ApiState>>) -> Json<MutationResponse> {
55 if let Some(ref lifecycle) = state.lifecycle {
56 lifecycle.pause().await;
57 }
58 Json(MutationResponse {
59 ok: true,
60 message: "sync paused".into(),
61 })
62}
63
64#[utoipa::path(
65 post,
66 path = "/sync/resume",
67 responses(
68 (status = 200, description = "Sync resumed", body = MutationResponse),
69 (status = 400, description = "Resume failed", body = ErrorResponse)
70 )
71)]
72pub async fn resume_sync(
73 State(state): State<Arc<ApiState>>,
74) -> Result<Json<MutationResponse>, Json<ErrorResponse>> {
75 if let Some(ref lifecycle) = state.lifecycle {
76 lifecycle.resume().await.map_err(|e| {
77 Json(ErrorResponse {
78 error: format!("resume: {e}"),
79 })
80 })?;
81 }
82 Ok(Json(MutationResponse {
83 ok: true,
84 message: "sync resumed".into(),
85 }))
86}
87
88#[utoipa::path(
89 get,
90 path = "/history",
91 responses(
92 (status = 200, description = "Cycle history", body = HistoryResponse),
93 (status = 400, description = "Database not configured", body = ErrorResponse)
94 )
95)]
96pub async fn get_history(
97 State(state): State<Arc<ApiState>>,
98) -> Result<Json<HistoryResponse>, Json<ErrorResponse>> {
99 let db = state.db_client.as_ref().ok_or_else(|| {
100 Json(ErrorResponse {
101 error: "database not configured".into(),
102 })
103 })?;
104
105 const SQL_CYCLE_HISTORY: &str = oversync_queries::delta::LIST_CYCLE_HISTORY;
106
107 let mut response = db.query(SQL_CYCLE_HISTORY).await.map_err(|e| {
108 Json(ErrorResponse {
109 error: format!("db: {e}"),
110 })
111 })?;
112
113 let rows: Vec<serde_json::Value> = response.take(0).map_err(|e| {
114 Json(ErrorResponse {
115 error: format!("db take: {e}"),
116 })
117 })?;
118
119 let cycles = rows
120 .iter()
121 .filter_map(|r| {
122 let cycle_id = r.get("cycle_id").and_then(|v| v.as_u64());
123 let status = r.get("status").and_then(|v| v.as_str());
124 if cycle_id.is_none() || status.is_none() {
125 warn!(?r, "skipping cycle_log row with missing cycle_id or status");
126 return None;
127 }
128 Some(CycleInfo {
129 cycle_id: cycle_id?,
130 status: status?.to_string(),
131 started_at: r
132 .get("started_at")
133 .and_then(|v| v.as_str())
134 .and_then(|s| s.parse().ok())
135 .unwrap_or_default(),
136 finished_at: r
137 .get("finished_at")
138 .and_then(|v| v.as_str())
139 .and_then(|s| s.parse().ok()),
140 rows_created: r.get("rows_created").and_then(|v| v.as_u64()).unwrap_or(0),
141 rows_updated: r.get("rows_updated").and_then(|v| v.as_u64()).unwrap_or(0),
142 rows_deleted: r.get("rows_deleted").and_then(|v| v.as_u64()).unwrap_or(0),
143 })
144 })
145 .collect();
146
147 Ok(Json(HistoryResponse { cycles }))
148}
149
150#[utoipa::path(
151 get,
152 path = "/sync/status",
153 responses(
154 (status = 200, description = "Sync status", body = StatusResponse)
155 )
156)]
157pub async fn sync_status(State(state): State<Arc<ApiState>>) -> Json<StatusResponse> {
158 let (running, paused) = match &state.lifecycle {
159 Some(lc) => (lc.is_running().await, lc.is_paused().await),
160 None => (false, false),
161 };
162 Json(StatusResponse { running, paused })
163}