Skip to main content

vdsl_sync/infra/sqlite/
transfer_store_impl.rs

1//! TransferStore trait implementation for SqliteSyncStore.
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use rusqlite::params;
6
7use crate::domain::location::LocationId;
8use crate::domain::transfer::Transfer;
9use crate::infra::error::InfraError;
10use crate::infra::transfer_store::{TransferStatRow, TransferStore};
11
12use super::mapping::{query_transfers, ts_to_string};
13use super::{map_call_err, SqliteSyncStore};
14
15#[async_trait]
16impl TransferStore for SqliteSyncStore {
17    async fn insert_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
18        let t = transfer.clone();
19        self.conn
20            .call(move |conn| {
21                let attempt_i64 = i64::from(t.attempt());
22                let created_at_str = ts_to_string(t.created_at());
23                let started_at_str = t.started_at().map(ts_to_string);
24                let finished_at_str = t.finished_at().map(ts_to_string);
25                let error_kind_str = t.error_kind().map(|k| k.to_string());
26                let depends_on_str = t.depends_on().map(|s| s.to_string());
27                conn.execute(
28                    "INSERT INTO transfers (id, file_id, src, dest, kind, state, error, error_kind, attempt, created_at, started_at, finished_at, depends_on)
29                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
30                    params![
31                        t.id(),
32                        t.file_id(),
33                        t.src().as_str(),
34                        t.dest().as_str(),
35                        t.kind().as_str(),
36                        t.state().as_str(),
37                        t.error(),
38                        error_kind_str,
39                        attempt_i64,
40                        created_at_str,
41                        started_at_str,
42                        finished_at_str,
43                        depends_on_str,
44                    ],
45                )
46                .map_err(|e| InfraError::Store { op: "sqlite", reason: format!("insert_transfer failed: {e}") })?;
47                Ok(())
48            })
49            .await
50            .map_err(map_call_err)
51    }
52
53    async fn update_transfer(&self, transfer: &Transfer) -> Result<(), InfraError> {
54        let t = transfer.clone();
55        self.conn
56            .call(move |conn| {
57                let started_at_str = t.started_at().map(ts_to_string);
58                let finished_at_str = t.finished_at().map(ts_to_string);
59                let error_kind_str = t.error_kind().map(|k| k.to_string());
60                conn.execute(
61                    "UPDATE transfers SET state = ?, error = ?, error_kind = ?, started_at = ?, finished_at = ?, attempt = ?
62                     WHERE id = ?",
63                    params![
64                        t.state().as_str(),
65                        t.error(),
66                        error_kind_str,
67                        started_at_str,
68                        finished_at_str,
69                        i64::from(t.attempt()),
70                        t.id(),
71                    ],
72                )
73                .map_err(|e| InfraError::Store { op: "sqlite", reason: format!("update_transfer failed: {e}") })?;
74                Ok(())
75            })
76            .await
77            .map_err(map_call_err)
78    }
79
80    async fn queued_transfers(&self, dest: &LocationId) -> Result<Vec<Transfer>, InfraError> {
81        let dest_str = dest.as_str().to_string();
82        self.conn
83            .call(move |conn| {
84                query_transfers(
85                    conn,
86                    "SELECT t.* FROM transfers t
87                     WHERE t.dest = ? AND t.state = 'queued'
88                       AND NOT EXISTS (
89                           SELECT 1 FROM transfers t2
90                           WHERE t2.file_id = t.file_id
91                             AND t2.dest = t.dest
92                             AND t2.ROWID > t.ROWID
93                       )
94                     ORDER BY t.created_at",
95                    &[&dest_str as &dyn rusqlite::types::ToSql],
96                )
97            })
98            .await
99            .map_err(map_call_err)
100    }
101
102    async fn latest_transfers_by_file(&self, file_id: &str) -> Result<Vec<Transfer>, InfraError> {
103        let file_id = file_id.to_string();
104        self.conn
105            .call(move |conn| {
106                query_transfers(
107                    conn,
108                    "SELECT t.* FROM transfers t
109                     WHERE t.file_id = ?
110                       AND NOT EXISTS (
111                           SELECT 1 FROM transfers t2
112                           WHERE t2.file_id = t.file_id
113                             AND t2.dest = t.dest
114                             AND t2.ROWID > t.ROWID
115                       )",
116                    &[&file_id as &dyn rusqlite::types::ToSql],
117                )
118            })
119            .await
120            .map_err(map_call_err)
121    }
122
123    async fn failed_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
124        self.conn
125            .call(|conn| {
126                query_transfers(
127                    conn,
128                    "SELECT t.* FROM transfers t
129                     WHERE t.state = 'failed'
130                       AND NOT EXISTS (
131                           SELECT 1 FROM transfers t2
132                           WHERE t2.file_id = t.file_id
133                             AND t2.dest = t.dest
134                             AND t2.ROWID > t.ROWID
135                       )
136                     ORDER BY t.finished_at DESC",
137                    &[],
138                )
139            })
140            .await
141            .map_err(map_call_err)
142    }
143
144    async fn all_pending_transfers(&self) -> Result<Vec<Transfer>, InfraError> {
145        self.conn
146            .call(|conn| {
147                query_transfers(
148                    conn,
149                    "SELECT t.* FROM transfers t
150                     WHERE t.state IN ('queued', 'blocked')
151                       AND NOT EXISTS (
152                           SELECT 1 FROM transfers t2
153                           WHERE t2.file_id = t.file_id
154                             AND t2.dest = t.dest
155                             AND t2.ROWID > t.ROWID
156                       )
157                     ORDER BY t.created_at",
158                    &[],
159                )
160            })
161            .await
162            .map_err(map_call_err)
163    }
164
165    async fn transfer_stats(&self) -> Result<Vec<TransferStatRow>, InfraError> {
166        self.conn
167            .call(|conn| {
168                // 最新Transfer(file_id×dest別)をGROUP BYして集約
169                let mut stmt = conn
170                    .prepare(
171                        "SELECT src, dest, state, error_kind, attempt, COUNT(DISTINCT file_id) as file_count
172                         FROM (
173                             SELECT t.src, t.dest, t.state, t.error_kind, t.attempt, t.file_id
174                             FROM transfers t
175                             WHERE NOT EXISTS (
176                                 SELECT 1 FROM transfers t2
177                                 WHERE t2.file_id = t.file_id
178                                   AND t2.dest = t.dest
179                                   AND t2.ROWID > t.ROWID
180                             )
181                         )
182                         GROUP BY src, dest, state, error_kind, attempt",
183                    )
184                    .map_err(|e| InfraError::Store {
185                        op: "sqlite",
186                        reason: format!("transfer_stats prepare failed: {e}"),
187                    })?;
188
189                let rows = stmt
190                    .query_map([], |row| {
191                        let src_str: String = row.get(0)?;
192                        let dest_str: String = row.get(1)?;
193                        let state: String = row.get(2)?;
194                        let error_kind: Option<String> = row.get(3)?;
195                        let attempt: u32 = row.get(4)?;
196                        let file_count: usize = row.get(5)?;
197                        Ok((src_str, dest_str, state, error_kind, attempt, file_count))
198                    })
199                    .map_err(|e| InfraError::Store {
200                        op: "sqlite",
201                        reason: format!("transfer_stats query failed: {e}"),
202                    })?;
203
204                let mut result = Vec::new();
205                for row in rows {
206                    let (src_str, dest_str, state_str, error_kind, attempt, file_count) =
207                        row.map_err(|e| InfraError::Store {
208                            op: "sqlite",
209                            reason: format!("transfer_stats row failed: {e}"),
210                        })?;
211                    let src = LocationId::new(src_str).map_err(|e| InfraError::Store {
212                        op: "sqlite",
213                        reason: format!("invalid src location: {e}"),
214                    })?;
215                    let dest = LocationId::new(dest_str).map_err(|e| InfraError::Store {
216                        op: "sqlite",
217                        reason: format!("invalid dest location: {e}"),
218                    })?;
219                    let state: crate::domain::transfer::TransferState =
220                        state_str.parse().map_err(|e| InfraError::Store {
221                            op: "sqlite",
222                            reason: format!("invalid transfer state: {e}"),
223                        })?;
224                    result.push(TransferStatRow {
225                        src,
226                        dest,
227                        state,
228                        error_kind,
229                        attempt,
230                        file_count,
231                    });
232                }
233                Ok(result)
234            })
235            .await
236            .map_err(map_call_err)
237    }
238
239    async fn present_counts_by_location(
240        &self,
241    ) -> Result<std::collections::HashMap<LocationId, usize>, InfraError> {
242        self.conn
243            .call(|conn| {
244                // location_filesのactive件数をlocation別にカウント
245                let mut stmt = conn
246                    .prepare(
247                        "SELECT location_id, COUNT(DISTINCT file_id) as file_count
248                         FROM location_files
249                         WHERE state = 'active'
250                         GROUP BY location_id",
251                    )
252                    .map_err(|e| InfraError::Store {
253                        op: "sqlite",
254                        reason: format!("present_counts_by_location prepare failed: {e}"),
255                    })?;
256
257                let rows = stmt
258                    .query_map([], |row| {
259                        let loc: String = row.get(0)?;
260                        let count: usize = row.get(1)?;
261                        Ok((loc, count))
262                    })
263                    .map_err(|e| InfraError::Store {
264                        op: "sqlite",
265                        reason: format!("present_counts_by_location query failed: {e}"),
266                    })?;
267
268                let mut result = std::collections::HashMap::new();
269                for row in rows {
270                    let (loc_str, count) = row.map_err(|e| InfraError::Store {
271                        op: "sqlite",
272                        reason: format!("present_counts_by_location row failed: {e}"),
273                    })?;
274                    let loc = LocationId::new(loc_str).map_err(|e| InfraError::Store {
275                        op: "sqlite",
276                        reason: format!("invalid location: {e}"),
277                    })?;
278                    result.insert(loc, count);
279                }
280                Ok(result)
281            })
282            .await
283            .map_err(map_call_err)
284    }
285
286    async fn prune_completed(&self, before: DateTime<Utc>) -> Result<usize, InfraError> {
287        let before_str = ts_to_string(before);
288        self.conn
289            .call(move |conn| {
290                // 各 file_id × dest の最新Transferは保持し、それより古い completed を削除
291                let deleted = conn
292                    .execute(
293                        "DELETE FROM transfers
294                         WHERE state = 'completed'
295                           AND finished_at < ?1
296                           AND id NOT IN (
297                               SELECT t.id FROM transfers t
298                               INNER JOIN (
299                                   SELECT file_id, dest, MAX(created_at) as max_created
300                                   FROM transfers
301                                   GROUP BY file_id, dest
302                               ) latest ON t.file_id = latest.file_id
303                                           AND t.dest = latest.dest
304                                           AND t.created_at = latest.max_created
305                           )",
306                        params![before_str],
307                    )
308                    .map_err(|e| InfraError::Store {
309                        op: "sqlite",
310                        reason: format!("prune_completed failed: {e}"),
311                    })?;
312                Ok(deleted)
313            })
314            .await
315            .map_err(map_call_err)
316    }
317
318    async fn count_queued(&self) -> Result<usize, InfraError> {
319        self.conn
320            .call(|conn| {
321                let count: i64 = conn
322                    .query_row(
323                        "SELECT COUNT(*) FROM transfers WHERE state = 'queued'",
324                        [],
325                        |row| row.get(0),
326                    )
327                    .map_err(|e| InfraError::Store {
328                        op: "sqlite",
329                        reason: format!("count_queued failed: {e}"),
330                    })?;
331                Ok(count as usize)
332            })
333            .await
334            .map_err(map_call_err)
335    }
336
337    async fn cancel_orphaned_inflight(&self) -> Result<usize, InfraError> {
338        self.conn
339            .call(|conn| {
340                let count = conn
341                    .execute(
342                        "UPDATE transfers SET state = 'cancelled', finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') \
343                         WHERE state = 'in_flight'",
344                        [],
345                    )
346                    .map_err(|e| InfraError::Store {
347                        op: "sqlite",
348                        reason: format!("cancel_orphaned_inflight failed: {e}"),
349                    })?;
350                Ok(count)
351            })
352            .await
353            .map_err(map_call_err)
354    }
355
356    async fn unblock_dependents(&self, completed_transfer_id: &str) -> Result<usize, InfraError> {
357        let id = completed_transfer_id.to_string();
358        self.conn
359            .call(move |conn| {
360                let count = conn
361                    .execute(
362                        "UPDATE transfers SET state = 'queued' WHERE depends_on = ? AND state = 'blocked'",
363                        params![id],
364                    )
365                    .map_err(|e| InfraError::Store {
366                        op: "sqlite",
367                        reason: format!("unblock_dependents failed: {e}"),
368                    })?;
369                Ok(count)
370            })
371            .await
372            .map_err(map_call_err)
373    }
374}