1use chrono::Utc;
8use rusqlite::params;
9use rusqlite::OptionalExtension;
10
11use crate::application::sdk::SyncReport;
12use crate::application::task::{TaskId, TaskStatus};
13use crate::infra::error::InfraError;
14
15use super::{map_call_err, SqliteSyncStore};
16
17impl SqliteSyncStore {
18 pub async fn insert_task(&self, id: &TaskId) -> Result<(), InfraError> {
20 let id_str = id.as_str().to_string();
21 let now = Utc::now().to_rfc3339();
22 self.conn
23 .call(move |conn| {
24 conn.execute(
25 "INSERT OR REPLACE INTO sync_tasks \
26 (task_id, status, phase, created_at, updated_at) \
27 VALUES (?1, 'pending', '', ?2, ?2)",
28 params![id_str, now],
29 )
30 .map(|_| ())
31 .map_err(|e| InfraError::Store {
32 op: "insert_task",
33 reason: format!("{e}"),
34 })
35 })
36 .await
37 .map_err(map_call_err)
38 }
39
40 pub async fn update_task_running(&self, id: &TaskId, phase: &str) -> Result<(), InfraError> {
42 let id_str = id.as_str().to_string();
43 let phase = phase.to_string();
44 let now = Utc::now().to_rfc3339();
45 self.conn
46 .call(move |conn| {
47 conn.execute(
48 "UPDATE sync_tasks SET status = 'running', phase = ?1, updated_at = ?2 \
49 WHERE task_id = ?3",
50 params![phase, now, id_str],
51 )
52 .map(|_| ())
53 .map_err(|e| InfraError::Store {
54 op: "update_task_running",
55 reason: format!("{e}"),
56 })
57 })
58 .await
59 .map_err(map_call_err)
60 }
61
62 pub async fn update_task_completed(
64 &self,
65 id: &TaskId,
66 report: &SyncReport,
67 ) -> Result<(), InfraError> {
68 let id_str = id.as_str().to_string();
69 let json = serde_json::to_string(report).map_err(|e| InfraError::Store {
70 op: "update_task_completed",
71 reason: format!("serialize SyncReport: {e}"),
72 })?;
73 let now = Utc::now().to_rfc3339();
74 self.conn
75 .call(move |conn| {
76 conn.execute(
77 "UPDATE sync_tasks SET status = 'completed', result_json = ?1, updated_at = ?2 \
78 WHERE task_id = ?3",
79 params![json, now, id_str],
80 )
81 .map(|_| ())
82 .map_err(|e| InfraError::Store {
83 op: "update_task_completed",
84 reason: format!("{e}"),
85 })
86 })
87 .await
88 .map_err(map_call_err)
89 }
90
91 pub async fn update_task_failed(&self, id: &TaskId, error: &str) -> Result<(), InfraError> {
93 let id_str = id.as_str().to_string();
94 let error = error.to_string();
95 let now = Utc::now().to_rfc3339();
96 self.conn
97 .call(move |conn| {
98 conn.execute(
99 "UPDATE sync_tasks SET status = 'failed', error = ?1, updated_at = ?2 \
100 WHERE task_id = ?3",
101 params![error, now, id_str],
102 )
103 .map(|_| ())
104 .map_err(|e| InfraError::Store {
105 op: "update_task_failed",
106 reason: format!("{e}"),
107 })
108 })
109 .await
110 .map_err(map_call_err)
111 }
112
113 pub async fn load_task(
115 &self,
116 id: &TaskId,
117 ) -> Result<Option<TaskStatus<SyncReport>>, InfraError> {
118 let id_str = id.as_str().to_string();
119 self.conn
120 .call(move |conn| {
121 let mut stmt = conn
122 .prepare(
123 "SELECT status, phase, result_json, error \
124 FROM sync_tasks WHERE task_id = ?1",
125 )
126 .map_err(|e| InfraError::Store {
127 op: "load_task",
128 reason: format!("{e}"),
129 })?;
130
131 let result = stmt
132 .query_row(params![id_str], |row| {
133 let status: String = row.get(0)?;
134 let phase: String = row.get(1)?;
135 let result_json: Option<String> = row.get(2)?;
136 let error: Option<String> = row.get(3)?;
137 Ok((status, phase, result_json, error))
138 })
139 .optional()
140 .map_err(|e| InfraError::Store {
141 op: "load_task",
142 reason: format!("{e}"),
143 })?;
144
145 match result {
146 None => Ok(None),
147 Some((status, phase, result_json, error)) => {
148 let task_status = match status.as_str() {
149 "pending" => TaskStatus::Pending,
150 "running" => TaskStatus::Running(phase),
151 "completed" => {
152 let report: SyncReport = result_json
153 .as_deref()
154 .map(serde_json::from_str)
155 .transpose()
156 .map_err(|e| InfraError::Store {
157 op: "load_task",
158 reason: format!("deserialize SyncReport: {e}"),
159 })?
160 .unwrap_or_default();
161 TaskStatus::Completed(report)
162 }
163 "failed" => TaskStatus::Failed(error.unwrap_or_default()),
164 other => {
165 return Err(InfraError::Store {
166 op: "load_task",
167 reason: format!("unknown status: {other}"),
168 });
169 }
170 };
171 Ok(Some(task_status))
172 }
173 }
174 })
175 .await
176 .map_err(map_call_err)
177 }
178
179 pub async fn recover_stale_running(&self) -> Result<usize, InfraError> {
184 let now = Utc::now().to_rfc3339();
185 self.conn
186 .call(move |conn| {
187 let count = conn
188 .execute(
189 "UPDATE sync_tasks SET status = 'failed', \
190 error = 'session terminated while task was running', \
191 updated_at = ?1 \
192 WHERE status = 'running'",
193 params![now],
194 )
195 .map_err(|e| InfraError::Store {
196 op: "recover_stale_running",
197 reason: format!("{e}"),
198 })?;
199 Ok(count)
200 })
201 .await
202 .map_err(map_call_err)
203 }
204}