1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use rusqlite::params;
6
7use crate::domain::location::LocationId;
8use crate::domain::transfer::Transfer;
9use crate::infra::error::InfraError;
10use crate::infra::transfer_store::{TransferStatRow, TransferStore};
11
12use super::mapping::{query_transfers, ts_to_string};
13use super::{map_call_err, SqliteSyncStore};
14
15#[async_trait]
16impl TransferStore for SqliteSyncStore {
17 async fn insert_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
18 let t = transfer.clone();
19 self.conn
20 .call(move |conn| {
21 let attempt_i64 = i64::from(t.attempt());
22 let created_at_str = ts_to_string(t.created_at());
23 let started_at_str = t.started_at().map(ts_to_string);
24 let finished_at_str = t.finished_at().map(ts_to_string);
25 let error_kind_str = t.error_kind().map(|k| k.to_string());
26 let depends_on_str = t.depends_on().map(|s| s.to_string());
27 conn.execute(
28 "INSERT INTO transfers (id, file_id, src, dest, kind, state, error, error_kind, attempt, created_at, started_at, finished_at, depends_on)
29 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
30 params![
31 t.id(),
32 t.file_id(),
33 t.src().as_str(),
34 t.dest().as_str(),
35 t.kind().as_str(),
36 t.state().as_str(),
37 t.error(),
38 error_kind_str,
39 attempt_i64,
40 created_at_str,
41 started_at_str,
42 finished_at_str,
43 depends_on_str,
44 ],
45 )
46 .map_err(|e| InfraError::Store { op: "sqlite", reason: format!("insert_transfer failed: {e}") })?;
47 Ok(())
48 })
49 .await
50 .map_err(map_call_err)
51 }
52
53 async fn update_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
54 let t = transfer.clone();
55 self.conn
56 .call(move |conn| {
57 let started_at_str = t.started_at().map(ts_to_string);
58 let finished_at_str = t.finished_at().map(ts_to_string);
59 let error_kind_str = t.error_kind().map(|k| k.to_string());
60 conn.execute(
61 "UPDATE transfers SET state = ?, error = ?, error_kind = ?, started_at = ?, finished_at = ?, attempt = ?
62 WHERE id = ?",
63 params![
64 t.state().as_str(),
65 t.error(),
66 error_kind_str,
67 started_at_str,
68 finished_at_str,
69 i64::from(t.attempt()),
70 t.id(),
71 ],
72 )
73 .map_err(|e| InfraError::Store { op: "sqlite", reason: format!("update_transfer failed: {e}") })?;
74 Ok(())
75 })
76 .await
77 .map_err(map_call_err)
78 }
79
80 async fn queued_transfers(&self, dest: &LocationId) -> Result<Vec<Transfer>, InfraError> {
81 let dest_str = dest.as_str().to_string();
82 self.conn
83 .call(move |conn| {
84 query_transfers(
85 conn,
86 "SELECT t.* FROM transfers t
87 WHERE t.dest = ? AND t.state = 'queued'
88 AND NOT EXISTS (
89 SELECT 1 FROM transfers t2
90 WHERE t2.file_id = t.file_id
91 AND t2.dest = t.dest
92 AND t2.ROWID > t.ROWID
93 )
94 ORDER BY t.created_at",
95 &[&dest_str as &dyn rusqlite::types::ToSql],
96 )
97 })
98 .await
99 .map_err(map_call_err)
100 }
101
102 async fn latest_transfers_by_file(&self, file_id: &str) -> Result<Vec<Transfer>, InfraError> {
103 let file_id = file_id.to_string();
104 self.conn
105 .call(move |conn| {
106 query_transfers(
107 conn,
108 "SELECT t.* FROM transfers t
109 WHERE t.file_id = ?
110 AND NOT EXISTS (
111 SELECT 1 FROM transfers t2
112 WHERE t2.file_id = t.file_id
113 AND t2.dest = t.dest
114 AND t2.ROWID > t.ROWID
115 )",
116 &[&file_id as &dyn rusqlite::types::ToSql],
117 )
118 })
119 .await
120 .map_err(map_call_err)
121 }
122
123 async fn failed_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
124 self.conn
125 .call(|conn| {
126 query_transfers(
127 conn,
128 "SELECT t.* FROM transfers t
129 WHERE t.state = 'failed'
130 AND NOT EXISTS (
131 SELECT 1 FROM transfers t2
132 WHERE t2.file_id = t.file_id
133 AND t2.dest = t.dest
134 AND t2.ROWID > t.ROWID
135 )
136 ORDER BY t.finished_at DESC",
137 &[],
138 )
139 })
140 .await
141 .map_err(map_call_err)
142 }
143
144 async fn all_pending_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
145 self.conn
146 .call(|conn| {
147 query_transfers(
148 conn,
149 "SELECT t.* FROM transfers t
150 WHERE t.state IN ('queued', 'blocked')
151 AND NOT EXISTS (
152 SELECT 1 FROM transfers t2
153 WHERE t2.file_id = t.file_id
154 AND t2.dest = t.dest
155 AND t2.ROWID > t.ROWID
156 )
157 ORDER BY t.created_at",
158 &[],
159 )
160 })
161 .await
162 .map_err(map_call_err)
163 }
164
165 async fn transfer_stats(&self) -> Result<Vec<TransferStatRow>, InfraError> {
166 self.conn
167 .call(|conn| {
168 let mut stmt = conn
170 .prepare(
171 "SELECT src, dest, state, error_kind, attempt, COUNT(DISTINCT file_id) as file_count
172 FROM (
173 SELECT t.src, t.dest, t.state, t.error_kind, t.attempt, t.file_id
174 FROM transfers t
175 WHERE NOT EXISTS (
176 SELECT 1 FROM transfers t2
177 WHERE t2.file_id = t.file_id
178 AND t2.dest = t.dest
179 AND t2.ROWID > t.ROWID
180 )
181 )
182 GROUP BY src, dest, state, error_kind, attempt",
183 )
184 .map_err(|e| InfraError::Store {
185 op: "sqlite",
186 reason: format!("transfer_stats prepare failed: {e}"),
187 })?;
188
189 let rows = stmt
190 .query_map([], |row| {
191 let src_str: String = row.get(0)?;
192 let dest_str: String = row.get(1)?;
193 let state: String = row.get(2)?;
194 let error_kind: Option<String> = row.get(3)?;
195 let attempt: u32 = row.get(4)?;
196 let file_count: usize = row.get(5)?;
197 Ok((src_str, dest_str, state, error_kind, attempt, file_count))
198 })
199 .map_err(|e| InfraError::Store {
200 op: "sqlite",
201 reason: format!("transfer_stats query failed: {e}"),
202 })?;
203
204 let mut result = Vec::new();
205 for row in rows {
206 let (src_str, dest_str, state_str, error_kind, attempt, file_count) =
207 row.map_err(|e| InfraError::Store {
208 op: "sqlite",
209 reason: format!("transfer_stats row failed: {e}"),
210 })?;
211 let src = LocationId::new(src_str).map_err(|e| InfraError::Store {
212 op: "sqlite",
213 reason: format!("invalid src location: {e}"),
214 })?;
215 let dest = LocationId::new(dest_str).map_err(|e| InfraError::Store {
216 op: "sqlite",
217 reason: format!("invalid dest location: {e}"),
218 })?;
219 let state: crate::domain::transfer::TransferState =
220 state_str.parse().map_err(|e| InfraError::Store {
221 op: "sqlite",
222 reason: format!("invalid transfer state: {e}"),
223 })?;
224 result.push(TransferStatRow {
225 src,
226 dest,
227 state,
228 error_kind,
229 attempt,
230 file_count,
231 });
232 }
233 Ok(result)
234 })
235 .await
236 .map_err(map_call_err)
237 }
238
239 async fn present_counts_by_location(
240 &self,
241 ) -> Result<std::collections::HashMap<LocationId, usize>, InfraError> {
242 self.conn
243 .call(|conn| {
244 let mut stmt = conn
246 .prepare(
247 "SELECT location_id, COUNT(DISTINCT file_id) as file_count
248 FROM location_files
249 WHERE state = 'active'
250 GROUP BY location_id",
251 )
252 .map_err(|e| InfraError::Store {
253 op: "sqlite",
254 reason: format!("present_counts_by_location prepare failed: {e}"),
255 })?;
256
257 let rows = stmt
258 .query_map([], |row| {
259 let loc: String = row.get(0)?;
260 let count: usize = row.get(1)?;
261 Ok((loc, count))
262 })
263 .map_err(|e| InfraError::Store {
264 op: "sqlite",
265 reason: format!("present_counts_by_location query failed: {e}"),
266 })?;
267
268 let mut result = std::collections::HashMap::new();
269 for row in rows {
270 let (loc_str, count) = row.map_err(|e| InfraError::Store {
271 op: "sqlite",
272 reason: format!("present_counts_by_location row failed: {e}"),
273 })?;
274 let loc = LocationId::new(loc_str).map_err(|e| InfraError::Store {
275 op: "sqlite",
276 reason: format!("invalid location: {e}"),
277 })?;
278 result.insert(loc, count);
279 }
280 Ok(result)
281 })
282 .await
283 .map_err(map_call_err)
284 }
285
286 async fn prune_completed(&self, before: DateTime<Utc>) -> Result<usize, InfraError> {
287 let before_str = ts_to_string(before);
288 self.conn
289 .call(move |conn| {
290 let deleted = conn
292 .execute(
293 "DELETE FROM transfers
294 WHERE state = 'completed'
295 AND finished_at < ?1
296 AND id NOT IN (
297 SELECT t.id FROM transfers t
298 INNER JOIN (
299 SELECT file_id, dest, MAX(created_at) as max_created
300 FROM transfers
301 GROUP BY file_id, dest
302 ) latest ON t.file_id = latest.file_id
303 AND t.dest = latest.dest
304 AND t.created_at = latest.max_created
305 )",
306 params![before_str],
307 )
308 .map_err(|e| InfraError::Store {
309 op: "sqlite",
310 reason: format!("prune_completed failed: {e}"),
311 })?;
312 Ok(deleted)
313 })
314 .await
315 .map_err(map_call_err)
316 }
317
318 async fn count_queued(&self) -> Result<usize, InfraError> {
319 self.conn
320 .call(|conn| {
321 let count: i64 = conn
322 .query_row(
323 "SELECT COUNT(*) FROM transfers WHERE state = 'queued'",
324 [],
325 |row| row.get(0),
326 )
327 .map_err(|e| InfraError::Store {
328 op: "sqlite",
329 reason: format!("count_queued failed: {e}"),
330 })?;
331 Ok(count as usize)
332 })
333 .await
334 .map_err(map_call_err)
335 }
336
337 async fn cancel_orphaned_inflight(&self) -> Result<usize, InfraError> {
338 self.conn
339 .call(|conn| {
340 let count = conn
341 .execute(
342 "UPDATE transfers SET state = 'cancelled', finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
343 WHERE state = 'in_flight'",
344 [],
345 )
346 .map_err(|e| InfraError::Store {
347 op: "sqlite",
348 reason: format!("cancel_orphaned_inflight failed: {e}"),
349 })?;
350 Ok(count)
351 })
352 .await
353 .map_err(map_call_err)
354 }
355
356 async fn unblock_dependents(&self, completed_transfer_id: &str) -> Result<usize, InfraError> {
357 let id = completed_transfer_id.to_string();
358 self.conn
359 .call(move |conn| {
360 let count = conn
361 .execute(
362 "UPDATE transfers SET state = 'queued' WHERE depends_on = ? AND state = 'blocked'",
363 params![id],
364 )
365 .map_err(|e| InfraError::Store {
366 op: "sqlite",
367 reason: format!("unblock_dependents failed: {e}"),
368 })?;
369 Ok(count)
370 })
371 .await
372 .map_err(map_call_err)
373 }
374}