Skip to main content

vdsl_sync/infra/sqlite/
task_store_impl.rs

//! SQLite persistence for sync task status.
//!
//! Persists `TaskStatus<SyncReport>` so that `poll()` survives
//! MCP server session restarts. On startup, `recover_stale_running()`
//! marks any `running` tasks as `failed` (the process crashed).
6
7use chrono::Utc;
8use rusqlite::params;
9use rusqlite::OptionalExtension;
10
11use crate::application::sdk::SyncReport;
12use crate::application::task::{TaskId, TaskStatus};
13use crate::infra::error::InfraError;
14
15use super::{map_call_err, SqliteSyncStore};
16
17impl SqliteSyncStore {
18    /// Insert a new task as Pending.
19    pub async fn insert_task(&self, id: &TaskId) -> Result<(), InfraError> {
20        let id_str = id.as_str().to_string();
21        let now = Utc::now().to_rfc3339();
22        self.conn
23            .call(move |conn| {
24                conn.execute(
25                    "INSERT OR REPLACE INTO sync_tasks \
26                     (task_id, status, phase, created_at, updated_at) \
27                     VALUES (?1, 'pending', '', ?2, ?2)",
28                    params![id_str, now],
29                )
30                .map(|_| ())
31                .map_err(|e| InfraError::Store {
32                    op: "insert_task",
33                    reason: format!("{e}"),
34                })
35            })
36            .await
37            .map_err(map_call_err)
38    }
39
40    /// Update task status to Running with a phase description.
41    pub async fn update_task_running(&self, id: &TaskId, phase: &str) -> Result<(), InfraError> {
42        let id_str = id.as_str().to_string();
43        let phase = phase.to_string();
44        let now = Utc::now().to_rfc3339();
45        self.conn
46            .call(move |conn| {
47                conn.execute(
48                    "UPDATE sync_tasks SET status = 'running', phase = ?1, updated_at = ?2 \
49                     WHERE task_id = ?3",
50                    params![phase, now, id_str],
51                )
52                .map(|_| ())
53                .map_err(|e| InfraError::Store {
54                    op: "update_task_running",
55                    reason: format!("{e}"),
56                })
57            })
58            .await
59            .map_err(map_call_err)
60    }
61
62    /// Update task status to Completed with a serialized SyncReport.
63    pub async fn update_task_completed(
64        &self,
65        id: &TaskId,
66        report: &SyncReport,
67    ) -> Result<(), InfraError> {
68        let id_str = id.as_str().to_string();
69        let json = serde_json::to_string(report).map_err(|e| InfraError::Store {
70            op: "update_task_completed",
71            reason: format!("serialize SyncReport: {e}"),
72        })?;
73        let now = Utc::now().to_rfc3339();
74        self.conn
75            .call(move |conn| {
76                conn.execute(
77                    "UPDATE sync_tasks SET status = 'completed', result_json = ?1, updated_at = ?2 \
78                     WHERE task_id = ?3",
79                    params![json, now, id_str],
80                )
81                .map(|_| ())
82                .map_err(|e| InfraError::Store {
83                    op: "update_task_completed",
84                    reason: format!("{e}"),
85                })
86            })
87            .await
88            .map_err(map_call_err)
89    }
90
91    /// Update task status to Failed with an error message.
92    pub async fn update_task_failed(&self, id: &TaskId, error: &str) -> Result<(), InfraError> {
93        let id_str = id.as_str().to_string();
94        let error = error.to_string();
95        let now = Utc::now().to_rfc3339();
96        self.conn
97            .call(move |conn| {
98                conn.execute(
99                    "UPDATE sync_tasks SET status = 'failed', error = ?1, updated_at = ?2 \
100                     WHERE task_id = ?3",
101                    params![error, now, id_str],
102                )
103                .map(|_| ())
104                .map_err(|e| InfraError::Store {
105                    op: "update_task_failed",
106                    reason: format!("{e}"),
107                })
108            })
109            .await
110            .map_err(map_call_err)
111    }
112
113    /// Load task status from DB. Returns None if task_id is unknown.
114    pub async fn load_task(
115        &self,
116        id: &TaskId,
117    ) -> Result<Option<TaskStatus<SyncReport>>, InfraError> {
118        let id_str = id.as_str().to_string();
119        self.conn
120            .call(move |conn| {
121                let mut stmt = conn
122                    .prepare(
123                        "SELECT status, phase, result_json, error \
124                         FROM sync_tasks WHERE task_id = ?1",
125                    )
126                    .map_err(|e| InfraError::Store {
127                        op: "load_task",
128                        reason: format!("{e}"),
129                    })?;
130
131                let result = stmt
132                    .query_row(params![id_str], |row| {
133                        let status: String = row.get(0)?;
134                        let phase: String = row.get(1)?;
135                        let result_json: Option<String> = row.get(2)?;
136                        let error: Option<String> = row.get(3)?;
137                        Ok((status, phase, result_json, error))
138                    })
139                    .optional()
140                    .map_err(|e| InfraError::Store {
141                        op: "load_task",
142                        reason: format!("{e}"),
143                    })?;
144
145                match result {
146                    None => Ok(None),
147                    Some((status, phase, result_json, error)) => {
148                        let task_status = match status.as_str() {
149                            "pending" => TaskStatus::Pending,
150                            "running" => TaskStatus::Running(phase),
151                            "completed" => {
152                                let report: SyncReport = result_json
153                                    .as_deref()
154                                    .map(serde_json::from_str)
155                                    .transpose()
156                                    .map_err(|e| InfraError::Store {
157                                        op: "load_task",
158                                        reason: format!("deserialize SyncReport: {e}"),
159                                    })?
160                                    .unwrap_or_default();
161                                TaskStatus::Completed(report)
162                            }
163                            "failed" => TaskStatus::Failed(error.unwrap_or_default()),
164                            other => {
165                                return Err(InfraError::Store {
166                                    op: "load_task",
167                                    reason: format!("unknown status: {other}"),
168                                });
169                            }
170                        };
171                        Ok(Some(task_status))
172                    }
173                }
174            })
175            .await
176            .map_err(map_call_err)
177    }
178
179    /// On startup, mark all `running` tasks as `failed`.
180    ///
181    /// If a task was `running` when the process terminated, it will never
182    /// reach Completed/Failed. This recovers those zombie tasks.
183    pub async fn recover_stale_running(&self) -> Result<usize, InfraError> {
184        let now = Utc::now().to_rfc3339();
185        self.conn
186            .call(move |conn| {
187                let count = conn
188                    .execute(
189                        "UPDATE sync_tasks SET status = 'failed', \
190                         error = 'session terminated while task was running', \
191                         updated_at = ?1 \
192                         WHERE status = 'running'",
193                        params![now],
194                    )
195                    .map_err(|e| InfraError::Store {
196                        op: "recover_stale_running",
197                        reason: format!("{e}"),
198                    })?;
199                Ok(count)
200            })
201            .await
202            .map_err(map_call_err)
203    }
204}