1use chrono::{DateTime, Utc};
2use mxr_core::AccountId;
3use sqlx::Row;
4
5#[derive(Debug, Clone, PartialEq, Eq)]
6pub struct SyncRuntimeStatus {
7 pub account_id: AccountId,
8 pub last_attempt_at: Option<DateTime<Utc>>,
9 pub last_success_at: Option<DateTime<Utc>>,
10 pub last_error: Option<String>,
11 pub failure_class: Option<String>,
12 pub consecutive_failures: u32,
13 pub backoff_until: Option<DateTime<Utc>>,
14 pub sync_in_progress: bool,
15 pub current_cursor_summary: Option<String>,
16 pub last_synced_count: u32,
17 pub updated_at: DateTime<Utc>,
18}
19
20#[derive(Debug, Clone, Default)]
21pub struct SyncRuntimeStatusUpdate {
22 pub last_attempt_at: Option<DateTime<Utc>>,
23 pub last_success_at: Option<DateTime<Utc>>,
24 pub last_error: Option<Option<String>>,
25 pub failure_class: Option<Option<String>>,
26 pub consecutive_failures: Option<u32>,
27 pub backoff_until: Option<Option<DateTime<Utc>>>,
28 pub sync_in_progress: Option<bool>,
29 pub current_cursor_summary: Option<Option<String>>,
30 pub last_synced_count: Option<u32>,
31}
32
33fn ts_or_default(ts: Option<i64>) -> Option<DateTime<Utc>> {
34 ts.and_then(|value| DateTime::from_timestamp(value, 0))
35}
36
37impl super::Store {
38 pub async fn upsert_sync_runtime_status(
39 &self,
40 account_id: &AccountId,
41 update: &SyncRuntimeStatusUpdate,
42 ) -> Result<(), sqlx::Error> {
43 let existing = self.get_sync_runtime_status(account_id).await?;
44 let now = Utc::now();
45 let merged = SyncRuntimeStatus {
46 account_id: account_id.clone(),
47 last_attempt_at: update
48 .last_attempt_at
49 .or(existing.as_ref().and_then(|row| row.last_attempt_at)),
50 last_success_at: update
51 .last_success_at
52 .or(existing.as_ref().and_then(|row| row.last_success_at)),
53 last_error: update
54 .last_error
55 .clone()
56 .unwrap_or_else(|| existing.as_ref().and_then(|row| row.last_error.clone())),
57 failure_class: update
58 .failure_class
59 .clone()
60 .unwrap_or_else(|| existing.as_ref().and_then(|row| row.failure_class.clone())),
61 consecutive_failures: update.consecutive_failures.unwrap_or_else(|| {
62 existing
63 .as_ref()
64 .map(|row| row.consecutive_failures)
65 .unwrap_or(0)
66 }),
67 backoff_until: update
68 .backoff_until
69 .unwrap_or_else(|| existing.as_ref().and_then(|row| row.backoff_until)),
70 sync_in_progress: update.sync_in_progress.unwrap_or_else(|| {
71 existing
72 .as_ref()
73 .map(|row| row.sync_in_progress)
74 .unwrap_or(false)
75 }),
76 current_cursor_summary: update.current_cursor_summary.clone().unwrap_or_else(|| {
77 existing
78 .as_ref()
79 .and_then(|row| row.current_cursor_summary.clone())
80 }),
81 last_synced_count: update.last_synced_count.unwrap_or_else(|| {
82 existing
83 .as_ref()
84 .map(|row| row.last_synced_count)
85 .unwrap_or(0)
86 }),
87 updated_at: now,
88 };
89
90 sqlx::query(
91 r#"
92 INSERT INTO sync_runtime_status (
93 account_id,
94 last_attempt_at,
95 last_success_at,
96 last_error,
97 failure_class,
98 consecutive_failures,
99 backoff_until,
100 sync_in_progress,
101 current_cursor_summary,
102 last_synced_count,
103 updated_at
104 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
105 ON CONFLICT(account_id) DO UPDATE SET
106 last_attempt_at = excluded.last_attempt_at,
107 last_success_at = excluded.last_success_at,
108 last_error = excluded.last_error,
109 failure_class = excluded.failure_class,
110 consecutive_failures = excluded.consecutive_failures,
111 backoff_until = excluded.backoff_until,
112 sync_in_progress = excluded.sync_in_progress,
113 current_cursor_summary = excluded.current_cursor_summary,
114 last_synced_count = excluded.last_synced_count,
115 updated_at = excluded.updated_at
116 "#,
117 )
118 .bind(account_id.as_str())
119 .bind(merged.last_attempt_at.map(|dt| dt.timestamp()))
120 .bind(merged.last_success_at.map(|dt| dt.timestamp()))
121 .bind(merged.last_error)
122 .bind(merged.failure_class)
123 .bind(merged.consecutive_failures as i64)
124 .bind(merged.backoff_until.map(|dt| dt.timestamp()))
125 .bind(merged.sync_in_progress)
126 .bind(merged.current_cursor_summary)
127 .bind(merged.last_synced_count as i64)
128 .bind(merged.updated_at.timestamp())
129 .execute(self.writer())
130 .await?;
131
132 Ok(())
133 }
134
135 pub async fn get_sync_runtime_status(
136 &self,
137 account_id: &AccountId,
138 ) -> Result<Option<SyncRuntimeStatus>, sqlx::Error> {
139 let row = sqlx::query(
140 r#"
141 SELECT
142 account_id as "account_id!",
143 last_attempt_at,
144 last_success_at,
145 last_error,
146 failure_class,
147 consecutive_failures as "consecutive_failures!",
148 backoff_until,
149 sync_in_progress as "sync_in_progress!: bool",
150 current_cursor_summary,
151 last_synced_count as "last_synced_count!",
152 updated_at as "updated_at!"
153 FROM sync_runtime_status
154 WHERE account_id = ?
155 "#,
156 )
157 .bind(account_id.as_str())
158 .fetch_optional(self.reader())
159 .await?;
160
161 Ok(row.map(|row| SyncRuntimeStatus {
162 account_id: AccountId::from_uuid(
163 uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
164 ),
165 last_attempt_at: ts_or_default(row.get("last_attempt_at")),
166 last_success_at: ts_or_default(row.get("last_success_at")),
167 last_error: row.get("last_error"),
168 failure_class: row.get("failure_class"),
169 consecutive_failures: row.get::<i64, _>("consecutive_failures") as u32,
170 backoff_until: ts_or_default(row.get("backoff_until")),
171 sync_in_progress: row.get("sync_in_progress"),
172 current_cursor_summary: row.get("current_cursor_summary"),
173 last_synced_count: row.get::<i64, _>("last_synced_count") as u32,
174 updated_at: DateTime::from_timestamp(row.get("updated_at"), 0).unwrap_or_default(),
175 }))
176 }
177
178 pub async fn list_sync_runtime_statuses(&self) -> Result<Vec<SyncRuntimeStatus>, sqlx::Error> {
179 let rows = sqlx::query(
180 r#"
181 SELECT
182 account_id as "account_id!",
183 last_attempt_at,
184 last_success_at,
185 last_error,
186 failure_class,
187 consecutive_failures as "consecutive_failures!",
188 backoff_until,
189 sync_in_progress as "sync_in_progress!: bool",
190 current_cursor_summary,
191 last_synced_count as "last_synced_count!",
192 updated_at as "updated_at!"
193 FROM sync_runtime_status
194 ORDER BY updated_at DESC, account_id ASC
195 "#,
196 )
197 .fetch_all(self.reader())
198 .await?;
199
200 Ok(rows
201 .into_iter()
202 .map(|row| SyncRuntimeStatus {
203 account_id: AccountId::from_uuid(
204 uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
205 ),
206 last_attempt_at: ts_or_default(row.get("last_attempt_at")),
207 last_success_at: ts_or_default(row.get("last_success_at")),
208 last_error: row.get("last_error"),
209 failure_class: row.get("failure_class"),
210 consecutive_failures: row.get::<i64, _>("consecutive_failures") as u32,
211 backoff_until: ts_or_default(row.get("backoff_until")),
212 sync_in_progress: row.get("sync_in_progress"),
213 current_cursor_summary: row.get("current_cursor_summary"),
214 last_synced_count: row.get::<i64, _>("last_synced_count") as u32,
215 updated_at: DateTime::from_timestamp(row.get("updated_at"), 0).unwrap_or_default(),
216 })
217 .collect())
218 }
219}