Skip to main content

mxr_store/
sync_runtime_status.rs

1use chrono::{DateTime, Utc};
2use mxr_core::AccountId;
3use sqlx::Row;
4
5#[derive(Debug, Clone, PartialEq, Eq)]
6pub struct SyncRuntimeStatus {
7    pub account_id: AccountId,
8    pub last_attempt_at: Option<DateTime<Utc>>,
9    pub last_success_at: Option<DateTime<Utc>>,
10    pub last_error: Option<String>,
11    pub failure_class: Option<String>,
12    pub consecutive_failures: u32,
13    pub backoff_until: Option<DateTime<Utc>>,
14    pub sync_in_progress: bool,
15    pub current_cursor_summary: Option<String>,
16    pub last_synced_count: u32,
17    pub updated_at: DateTime<Utc>,
18}
19
20#[derive(Debug, Clone, Default)]
21pub struct SyncRuntimeStatusUpdate {
22    pub last_attempt_at: Option<DateTime<Utc>>,
23    pub last_success_at: Option<DateTime<Utc>>,
24    pub last_error: Option<Option<String>>,
25    pub failure_class: Option<Option<String>>,
26    pub consecutive_failures: Option<u32>,
27    pub backoff_until: Option<Option<DateTime<Utc>>>,
28    pub sync_in_progress: Option<bool>,
29    pub current_cursor_summary: Option<Option<String>>,
30    pub last_synced_count: Option<u32>,
31}
32
33fn ts_or_default(ts: Option<i64>) -> Option<DateTime<Utc>> {
34    ts.and_then(|value| DateTime::from_timestamp(value, 0))
35}
36
37impl super::Store {
38    pub async fn upsert_sync_runtime_status(
39        &self,
40        account_id: &AccountId,
41        update: &SyncRuntimeStatusUpdate,
42    ) -> Result<(), sqlx::Error> {
43        let existing = self.get_sync_runtime_status(account_id).await?;
44        let now = Utc::now();
45        let merged = SyncRuntimeStatus {
46            account_id: account_id.clone(),
47            last_attempt_at: update
48                .last_attempt_at
49                .or(existing.as_ref().and_then(|row| row.last_attempt_at)),
50            last_success_at: update
51                .last_success_at
52                .or(existing.as_ref().and_then(|row| row.last_success_at)),
53            last_error: update
54                .last_error
55                .clone()
56                .unwrap_or_else(|| existing.as_ref().and_then(|row| row.last_error.clone())),
57            failure_class: update
58                .failure_class
59                .clone()
60                .unwrap_or_else(|| existing.as_ref().and_then(|row| row.failure_class.clone())),
61            consecutive_failures: update.consecutive_failures.unwrap_or_else(|| {
62                existing
63                    .as_ref()
64                    .map(|row| row.consecutive_failures)
65                    .unwrap_or(0)
66            }),
67            backoff_until: update
68                .backoff_until
69                .unwrap_or_else(|| existing.as_ref().and_then(|row| row.backoff_until)),
70            sync_in_progress: update.sync_in_progress.unwrap_or_else(|| {
71                existing
72                    .as_ref()
73                    .map(|row| row.sync_in_progress)
74                    .unwrap_or(false)
75            }),
76            current_cursor_summary: update.current_cursor_summary.clone().unwrap_or_else(|| {
77                existing
78                    .as_ref()
79                    .and_then(|row| row.current_cursor_summary.clone())
80            }),
81            last_synced_count: update.last_synced_count.unwrap_or_else(|| {
82                existing
83                    .as_ref()
84                    .map(|row| row.last_synced_count)
85                    .unwrap_or(0)
86            }),
87            updated_at: now,
88        };
89
90        sqlx::query(
91            r#"
92            INSERT INTO sync_runtime_status (
93                account_id,
94                last_attempt_at,
95                last_success_at,
96                last_error,
97                failure_class,
98                consecutive_failures,
99                backoff_until,
100                sync_in_progress,
101                current_cursor_summary,
102                last_synced_count,
103                updated_at
104            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
105            ON CONFLICT(account_id) DO UPDATE SET
106                last_attempt_at = excluded.last_attempt_at,
107                last_success_at = excluded.last_success_at,
108                last_error = excluded.last_error,
109                failure_class = excluded.failure_class,
110                consecutive_failures = excluded.consecutive_failures,
111                backoff_until = excluded.backoff_until,
112                sync_in_progress = excluded.sync_in_progress,
113                current_cursor_summary = excluded.current_cursor_summary,
114                last_synced_count = excluded.last_synced_count,
115                updated_at = excluded.updated_at
116            "#,
117        )
118        .bind(account_id.as_str())
119        .bind(merged.last_attempt_at.map(|dt| dt.timestamp()))
120        .bind(merged.last_success_at.map(|dt| dt.timestamp()))
121        .bind(merged.last_error)
122        .bind(merged.failure_class)
123        .bind(merged.consecutive_failures as i64)
124        .bind(merged.backoff_until.map(|dt| dt.timestamp()))
125        .bind(merged.sync_in_progress)
126        .bind(merged.current_cursor_summary)
127        .bind(merged.last_synced_count as i64)
128        .bind(merged.updated_at.timestamp())
129        .execute(self.writer())
130        .await?;
131
132        Ok(())
133    }
134
135    pub async fn get_sync_runtime_status(
136        &self,
137        account_id: &AccountId,
138    ) -> Result<Option<SyncRuntimeStatus>, sqlx::Error> {
139        let row = sqlx::query(
140            r#"
141            SELECT
142                account_id as "account_id!",
143                last_attempt_at,
144                last_success_at,
145                last_error,
146                failure_class,
147                consecutive_failures as "consecutive_failures!",
148                backoff_until,
149                sync_in_progress as "sync_in_progress!: bool",
150                current_cursor_summary,
151                last_synced_count as "last_synced_count!",
152                updated_at as "updated_at!"
153            FROM sync_runtime_status
154            WHERE account_id = ?
155            "#,
156        )
157        .bind(account_id.as_str())
158        .fetch_optional(self.reader())
159        .await?;
160
161        Ok(row.map(|row| SyncRuntimeStatus {
162            account_id: AccountId::from_uuid(
163                uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
164            ),
165            last_attempt_at: ts_or_default(row.get("last_attempt_at")),
166            last_success_at: ts_or_default(row.get("last_success_at")),
167            last_error: row.get("last_error"),
168            failure_class: row.get("failure_class"),
169            consecutive_failures: row.get::<i64, _>("consecutive_failures") as u32,
170            backoff_until: ts_or_default(row.get("backoff_until")),
171            sync_in_progress: row.get("sync_in_progress"),
172            current_cursor_summary: row.get("current_cursor_summary"),
173            last_synced_count: row.get::<i64, _>("last_synced_count") as u32,
174            updated_at: DateTime::from_timestamp(row.get("updated_at"), 0).unwrap_or_default(),
175        }))
176    }
177
178    pub async fn list_sync_runtime_statuses(&self) -> Result<Vec<SyncRuntimeStatus>, sqlx::Error> {
179        let rows = sqlx::query(
180            r#"
181            SELECT
182                account_id as "account_id!",
183                last_attempt_at,
184                last_success_at,
185                last_error,
186                failure_class,
187                consecutive_failures as "consecutive_failures!",
188                backoff_until,
189                sync_in_progress as "sync_in_progress!: bool",
190                current_cursor_summary,
191                last_synced_count as "last_synced_count!",
192                updated_at as "updated_at!"
193            FROM sync_runtime_status
194            ORDER BY updated_at DESC, account_id ASC
195            "#,
196        )
197        .fetch_all(self.reader())
198        .await?;
199
200        Ok(rows
201            .into_iter()
202            .map(|row| SyncRuntimeStatus {
203                account_id: AccountId::from_uuid(
204                    uuid::Uuid::parse_str(&row.get::<String, _>("account_id")).unwrap(),
205                ),
206                last_attempt_at: ts_or_default(row.get("last_attempt_at")),
207                last_success_at: ts_or_default(row.get("last_success_at")),
208                last_error: row.get("last_error"),
209                failure_class: row.get("failure_class"),
210                consecutive_failures: row.get::<i64, _>("consecutive_failures") as u32,
211                backoff_until: ts_or_default(row.get("backoff_until")),
212                sync_in_progress: row.get("sync_in_progress"),
213                current_cursor_summary: row.get("current_cursor_summary"),
214                last_synced_count: row.get::<i64, _>("last_synced_count") as u32,
215                updated_at: DateTime::from_timestamp(row.get("updated_at"), 0).unwrap_or_default(),
216            })
217            .collect())
218    }
219}