1use chrono::{DateTime, Utc};
2use mxr_core::AccountId;
3use sqlx::Row;
4
5#[derive(Debug, Clone, PartialEq, Eq)]
6pub struct SyncRuntimeStatus {
7 pub account_id: AccountId,
8 pub last_attempt_at: Option<DateTime<Utc>>,
9 pub last_success_at: Option<DateTime<Utc>>,
10 pub last_error: Option<String>,
11 pub failure_class: Option<String>,
12 pub consecutive_failures: u32,
13 pub backoff_until: Option<DateTime<Utc>>,
14 pub sync_in_progress: bool,
15 pub current_cursor_summary: Option<String>,
16 pub last_synced_count: u32,
17 pub updated_at: DateTime<Utc>,
18}
19
20#[derive(Debug, Clone, Default)]
21pub struct SyncRuntimeStatusUpdate {
22 pub last_attempt_at: Option<DateTime<Utc>>,
23 pub last_success_at: Option<DateTime<Utc>>,
24 pub last_error: Option<Option<String>>,
25 pub failure_class: Option<Option<String>>,
26 pub consecutive_failures: Option<u32>,
27 pub backoff_until: Option<Option<DateTime<Utc>>>,
28 pub sync_in_progress: Option<bool>,
29 pub current_cursor_summary: Option<Option<String>>,
30 pub last_synced_count: Option<u32>,
31}
32
33fn ts_or_default(ts: Option<i64>) -> Option<DateTime<Utc>> {
34 ts.and_then(|value| DateTime::from_timestamp(value, 0))
35}
36
37fn row_to_sync_runtime_status(row: &sqlx::sqlite::SqliteRow) -> SyncRuntimeStatus {
38 SyncRuntimeStatus {
39 account_id: AccountId::from_uuid(
40 uuid::Uuid::parse_str(&row.get::<String, _>(0)).expect("valid account uuid"),
41 ),
42 last_attempt_at: ts_or_default(row.get(1)),
43 last_success_at: ts_or_default(row.get(2)),
44 last_error: row.get(3),
45 failure_class: row.get(4),
46 consecutive_failures: row.get::<i64, _>(5) as u32,
47 backoff_until: ts_or_default(row.get(6)),
48 sync_in_progress: row.get::<i64, _>(7) != 0,
49 current_cursor_summary: row.get(8),
50 last_synced_count: row.get::<i64, _>(9) as u32,
51 updated_at: DateTime::from_timestamp(row.get(10), 0).unwrap_or_default(),
52 }
53}
54
55impl super::Store {
56 pub async fn upsert_sync_runtime_status(
57 &self,
58 account_id: &AccountId,
59 update: &SyncRuntimeStatusUpdate,
60 ) -> Result<(), sqlx::Error> {
61 let existing = self.get_sync_runtime_status(account_id).await?;
62 let now = Utc::now();
63 let merged = SyncRuntimeStatus {
64 account_id: account_id.clone(),
65 last_attempt_at: update
66 .last_attempt_at
67 .or(existing.as_ref().and_then(|row| row.last_attempt_at)),
68 last_success_at: update
69 .last_success_at
70 .or(existing.as_ref().and_then(|row| row.last_success_at)),
71 last_error: update
72 .last_error
73 .clone()
74 .unwrap_or_else(|| existing.as_ref().and_then(|row| row.last_error.clone())),
75 failure_class: update
76 .failure_class
77 .clone()
78 .unwrap_or_else(|| existing.as_ref().and_then(|row| row.failure_class.clone())),
79 consecutive_failures: update.consecutive_failures.unwrap_or_else(|| {
80 existing
81 .as_ref()
82 .map(|row| row.consecutive_failures)
83 .unwrap_or(0)
84 }),
85 backoff_until: update
86 .backoff_until
87 .unwrap_or_else(|| existing.as_ref().and_then(|row| row.backoff_until)),
88 sync_in_progress: update.sync_in_progress.unwrap_or_else(|| {
89 existing
90 .as_ref()
91 .map(|row| row.sync_in_progress)
92 .unwrap_or(false)
93 }),
94 current_cursor_summary: update.current_cursor_summary.clone().unwrap_or_else(|| {
95 existing
96 .as_ref()
97 .and_then(|row| row.current_cursor_summary.clone())
98 }),
99 last_synced_count: update.last_synced_count.unwrap_or_else(|| {
100 existing
101 .as_ref()
102 .map(|row| row.last_synced_count)
103 .unwrap_or(0)
104 }),
105 updated_at: now,
106 };
107
108 sqlx::query(
109 r#"
110 INSERT INTO sync_runtime_status (
111 account_id,
112 last_attempt_at,
113 last_success_at,
114 last_error,
115 failure_class,
116 consecutive_failures,
117 backoff_until,
118 sync_in_progress,
119 current_cursor_summary,
120 last_synced_count,
121 updated_at
122 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
123 ON CONFLICT(account_id) DO UPDATE SET
124 last_attempt_at = excluded.last_attempt_at,
125 last_success_at = excluded.last_success_at,
126 last_error = excluded.last_error,
127 failure_class = excluded.failure_class,
128 consecutive_failures = excluded.consecutive_failures,
129 backoff_until = excluded.backoff_until,
130 sync_in_progress = excluded.sync_in_progress,
131 current_cursor_summary = excluded.current_cursor_summary,
132 last_synced_count = excluded.last_synced_count,
133 updated_at = excluded.updated_at
134 "#,
135 )
136 .bind(account_id.as_str())
137 .bind(merged.last_attempt_at.map(|dt| dt.timestamp()))
138 .bind(merged.last_success_at.map(|dt| dt.timestamp()))
139 .bind(merged.last_error)
140 .bind(merged.failure_class)
141 .bind(merged.consecutive_failures as i64)
142 .bind(merged.backoff_until.map(|dt| dt.timestamp()))
143 .bind(merged.sync_in_progress)
144 .bind(merged.current_cursor_summary)
145 .bind(merged.last_synced_count as i64)
146 .bind(merged.updated_at.timestamp())
147 .execute(self.writer())
148 .await?;
149
150 Ok(())
151 }
152
153 pub async fn get_sync_runtime_status(
154 &self,
155 account_id: &AccountId,
156 ) -> Result<Option<SyncRuntimeStatus>, sqlx::Error> {
157 let row = sqlx::query(
158 r#"
159 SELECT
160 account_id,
161 last_attempt_at,
162 last_success_at,
163 last_error,
164 failure_class,
165 consecutive_failures,
166 backoff_until,
167 sync_in_progress,
168 current_cursor_summary,
169 last_synced_count,
170 updated_at
171 FROM sync_runtime_status
172 WHERE account_id = ?
173 "#,
174 )
175 .bind(account_id.as_str())
176 .fetch_optional(self.reader())
177 .await?;
178
179 Ok(row.map(|row| row_to_sync_runtime_status(&row)))
180 }
181
182 pub async fn list_sync_runtime_statuses(&self) -> Result<Vec<SyncRuntimeStatus>, sqlx::Error> {
183 let rows = sqlx::query(
184 r#"
185 SELECT
186 account_id,
187 last_attempt_at,
188 last_success_at,
189 last_error,
190 failure_class,
191 consecutive_failures,
192 backoff_until,
193 sync_in_progress,
194 current_cursor_summary,
195 last_synced_count,
196 updated_at
197 FROM sync_runtime_status
198 ORDER BY updated_at DESC, account_id ASC
199 "#,
200 )
201 .fetch_all(self.reader())
202 .await?;
203
204 Ok(rows
205 .into_iter()
206 .map(|row| row_to_sync_runtime_status(&row))
207 .collect())
208 }
209}