Skip to main content

mxr_store/
sync_runtime_status.rs

1use chrono::{DateTime, Utc};
2use mxr_core::AccountId;
3use sqlx::Row;
4
5#[derive(Debug, Clone, PartialEq, Eq)]
6pub struct SyncRuntimeStatus {
7    pub account_id: AccountId,
8    pub last_attempt_at: Option<DateTime<Utc>>,
9    pub last_success_at: Option<DateTime<Utc>>,
10    pub last_error: Option<String>,
11    pub failure_class: Option<String>,
12    pub consecutive_failures: u32,
13    pub backoff_until: Option<DateTime<Utc>>,
14    pub sync_in_progress: bool,
15    pub current_cursor_summary: Option<String>,
16    pub last_synced_count: u32,
17    pub updated_at: DateTime<Utc>,
18}
19
20#[derive(Debug, Clone, Default)]
21pub struct SyncRuntimeStatusUpdate {
22    pub last_attempt_at: Option<DateTime<Utc>>,
23    pub last_success_at: Option<DateTime<Utc>>,
24    pub last_error: Option<Option<String>>,
25    pub failure_class: Option<Option<String>>,
26    pub consecutive_failures: Option<u32>,
27    pub backoff_until: Option<Option<DateTime<Utc>>>,
28    pub sync_in_progress: Option<bool>,
29    pub current_cursor_summary: Option<Option<String>>,
30    pub last_synced_count: Option<u32>,
31}
32
33fn ts_or_default(ts: Option<i64>) -> Option<DateTime<Utc>> {
34    ts.and_then(|value| DateTime::from_timestamp(value, 0))
35}
36
37fn row_to_sync_runtime_status(row: &sqlx::sqlite::SqliteRow) -> SyncRuntimeStatus {
38    SyncRuntimeStatus {
39        account_id: AccountId::from_uuid(
40            uuid::Uuid::parse_str(&row.get::<String, _>(0)).expect("valid account uuid"),
41        ),
42        last_attempt_at: ts_or_default(row.get(1)),
43        last_success_at: ts_or_default(row.get(2)),
44        last_error: row.get(3),
45        failure_class: row.get(4),
46        consecutive_failures: row.get::<i64, _>(5) as u32,
47        backoff_until: ts_or_default(row.get(6)),
48        sync_in_progress: row.get::<i64, _>(7) != 0,
49        current_cursor_summary: row.get(8),
50        last_synced_count: row.get::<i64, _>(9) as u32,
51        updated_at: DateTime::from_timestamp(row.get(10), 0).unwrap_or_default(),
52    }
53}
54
55impl super::Store {
56    pub async fn upsert_sync_runtime_status(
57        &self,
58        account_id: &AccountId,
59        update: &SyncRuntimeStatusUpdate,
60    ) -> Result<(), sqlx::Error> {
61        let existing = self.get_sync_runtime_status(account_id).await?;
62        let now = Utc::now();
63        let merged = SyncRuntimeStatus {
64            account_id: account_id.clone(),
65            last_attempt_at: update
66                .last_attempt_at
67                .or(existing.as_ref().and_then(|row| row.last_attempt_at)),
68            last_success_at: update
69                .last_success_at
70                .or(existing.as_ref().and_then(|row| row.last_success_at)),
71            last_error: update
72                .last_error
73                .clone()
74                .unwrap_or_else(|| existing.as_ref().and_then(|row| row.last_error.clone())),
75            failure_class: update
76                .failure_class
77                .clone()
78                .unwrap_or_else(|| existing.as_ref().and_then(|row| row.failure_class.clone())),
79            consecutive_failures: update.consecutive_failures.unwrap_or_else(|| {
80                existing
81                    .as_ref()
82                    .map(|row| row.consecutive_failures)
83                    .unwrap_or(0)
84            }),
85            backoff_until: update
86                .backoff_until
87                .unwrap_or_else(|| existing.as_ref().and_then(|row| row.backoff_until)),
88            sync_in_progress: update.sync_in_progress.unwrap_or_else(|| {
89                existing
90                    .as_ref()
91                    .map(|row| row.sync_in_progress)
92                    .unwrap_or(false)
93            }),
94            current_cursor_summary: update.current_cursor_summary.clone().unwrap_or_else(|| {
95                existing
96                    .as_ref()
97                    .and_then(|row| row.current_cursor_summary.clone())
98            }),
99            last_synced_count: update.last_synced_count.unwrap_or_else(|| {
100                existing
101                    .as_ref()
102                    .map(|row| row.last_synced_count)
103                    .unwrap_or(0)
104            }),
105            updated_at: now,
106        };
107
108        sqlx::query(
109            r#"
110            INSERT INTO sync_runtime_status (
111                account_id,
112                last_attempt_at,
113                last_success_at,
114                last_error,
115                failure_class,
116                consecutive_failures,
117                backoff_until,
118                sync_in_progress,
119                current_cursor_summary,
120                last_synced_count,
121                updated_at
122            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
123            ON CONFLICT(account_id) DO UPDATE SET
124                last_attempt_at = excluded.last_attempt_at,
125                last_success_at = excluded.last_success_at,
126                last_error = excluded.last_error,
127                failure_class = excluded.failure_class,
128                consecutive_failures = excluded.consecutive_failures,
129                backoff_until = excluded.backoff_until,
130                sync_in_progress = excluded.sync_in_progress,
131                current_cursor_summary = excluded.current_cursor_summary,
132                last_synced_count = excluded.last_synced_count,
133                updated_at = excluded.updated_at
134            "#,
135        )
136        .bind(account_id.as_str())
137        .bind(merged.last_attempt_at.map(|dt| dt.timestamp()))
138        .bind(merged.last_success_at.map(|dt| dt.timestamp()))
139        .bind(merged.last_error)
140        .bind(merged.failure_class)
141        .bind(merged.consecutive_failures as i64)
142        .bind(merged.backoff_until.map(|dt| dt.timestamp()))
143        .bind(merged.sync_in_progress)
144        .bind(merged.current_cursor_summary)
145        .bind(merged.last_synced_count as i64)
146        .bind(merged.updated_at.timestamp())
147        .execute(self.writer())
148        .await?;
149
150        Ok(())
151    }
152
153    pub async fn get_sync_runtime_status(
154        &self,
155        account_id: &AccountId,
156    ) -> Result<Option<SyncRuntimeStatus>, sqlx::Error> {
157        let row = sqlx::query(
158            r#"
159            SELECT
160                account_id,
161                last_attempt_at,
162                last_success_at,
163                last_error,
164                failure_class,
165                consecutive_failures,
166                backoff_until,
167                sync_in_progress,
168                current_cursor_summary,
169                last_synced_count,
170                updated_at
171            FROM sync_runtime_status
172            WHERE account_id = ?
173            "#,
174        )
175        .bind(account_id.as_str())
176        .fetch_optional(self.reader())
177        .await?;
178
179        Ok(row.map(|row| row_to_sync_runtime_status(&row)))
180    }
181
182    pub async fn list_sync_runtime_statuses(&self) -> Result<Vec<SyncRuntimeStatus>, sqlx::Error> {
183        let rows = sqlx::query(
184            r#"
185            SELECT
186                account_id,
187                last_attempt_at,
188                last_success_at,
189                last_error,
190                failure_class,
191                consecutive_failures,
192                backoff_until,
193                sync_in_progress,
194                current_cursor_summary,
195                last_synced_count,
196                updated_at
197            FROM sync_runtime_status
198            ORDER BY updated_at DESC, account_id ASC
199            "#,
200        )
201        .fetch_all(self.reader())
202        .await?;
203
204        Ok(rows
205            .into_iter()
206            .map(|row| row_to_sync_runtime_status(&row))
207            .collect())
208    }
209}