1mod location_file_store_impl;
10mod mapping;
11mod schema;
12mod task_store_impl;
13mod topology_file_store_impl;
14mod transfer_store_impl;
15
16use std::path::Path;
17
18use crate::infra::error::InfraError;
19
20pub struct SqliteSyncStore {
26 conn: tokio_rusqlite::Connection,
27}
28
29impl SqliteSyncStore {
30 pub async fn open(path: &Path) -> Result<Self, InfraError> {
32 let path = path.to_path_buf();
33 let conn =
34 tokio_rusqlite::Connection::open(&path)
35 .await
36 .map_err(|e| InfraError::Store {
37 op: "sqlite",
38 reason: format!("open failed: {e}"),
39 })?;
40 conn.call(schema::init_connection)
41 .await
42 .map_err(map_call_err)?;
43 Ok(Self { conn })
44 }
45
46 pub async fn open_in_memory() -> Result<Self, InfraError> {
48 let conn = tokio_rusqlite::Connection::open_in_memory()
49 .await
50 .map_err(|e| InfraError::Store {
51 op: "sqlite",
52 reason: format!("open_in_memory failed: {e}"),
53 })?;
54 conn.call(schema::init_connection)
55 .await
56 .map_err(map_call_err)?;
57 Ok(Self { conn })
58 }
59}
60
61fn map_call_err(e: tokio_rusqlite::Error<InfraError>) -> InfraError {
67 match e {
68 tokio_rusqlite::Error::Error(infra_err) => infra_err,
69 tokio_rusqlite::Error::ConnectionClosed => InfraError::Store {
70 op: "sqlite",
71 reason: "sqlite connection closed".into(),
72 },
73 tokio_rusqlite::Error::Close((_, e)) => InfraError::Store {
74 op: "sqlite",
75 reason: format!("sqlite close error: {e}"),
76 },
77 other => InfraError::Store {
78 op: "sqlite",
79 reason: format!("tokio-rusqlite: {other:?}"),
80 },
81 }
82}
83
#[cfg(test)]
mod tests {
    //! Tests that exercise the transfer-related store methods against an
    //! in-memory SQLite database.

    use super::*;

    use rusqlite::params;

    use crate::domain::file_type::FileType;
    use crate::domain::location::LocationId;
    use crate::domain::retry::TransferErrorKind;
    use crate::domain::topology_file::TopologyFile;
    use crate::domain::transfer::{Transfer, TransferKind, TransferState};
    use crate::infra::topology_file_store::TopologyFileStore;
    use crate::infra::transfer_store::TransferStore;

    /// Builds a `LocationId` from `s`, panicking if it is rejected.
    fn loc(s: &str) -> LocationId {
        LocationId::new(s).expect("valid test location")
    }

    /// Opens a fresh in-memory store, panicking if that fails.
    async fn fresh_store() -> SqliteSyncStore {
        SqliteSyncStore::open_in_memory().await.expect("open")
    }

    /// Creates an image-typed topology file for `path`, upserts it into
    /// `store`, and returns it so tests can refer to its id.
    async fn insert_test_topology_file(store: &SqliteSyncStore, path: &str) -> TopologyFile {
        let topology_file = TopologyFile::new(path.to_string(), FileType::Image)
            .expect("valid test topology file");
        TopologyFileStore::upsert(store, &topology_file)
            .await
            .expect("insert topology file");
        topology_file
    }

    /// Starts `transfer` and then fails it with a transient error carrying
    /// `message`.
    fn start_and_fail(transfer: &mut Transfer, message: &'static str) {
        transfer.start().expect("start");
        transfer
            .fail(message.into(), TransferErrorKind::Transient)
            .expect("fail");
    }

    /// A newly inserted transfer is returned by `queued_transfers` for its
    /// destination.
    #[tokio::test]
    async fn insert_and_query_transfer() {
        let store = fresh_store().await;
        let file = insert_test_topology_file(&store, "output/t.png").await;

        let transfer =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        store
            .insert_transfer(&transfer)
            .await
            .expect("insert transfer");

        let cloud = loc("cloud");
        let queued = store.queued_transfers(&cloud).await.expect("queued");
        assert_eq!(queued.len(), 1);

        let only = &queued[0];
        assert_eq!(only.file_id(), file.id());
        assert_eq!(only.dest(), &cloud);
    }

    /// Persisting state changes removes a started transfer from the queue and
    /// later reports it as completed.
    #[tokio::test]
    async fn update_transfer_state() {
        let store = fresh_store().await;
        let file = insert_test_topology_file(&store, "output/s.png").await;

        let mut transfer =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        store
            .insert_transfer(&transfer)
            .await
            .expect("insert transfer");

        // Once started and persisted, the transfer is no longer queued.
        transfer.start().expect("start");
        store
            .update_transfer(&transfer)
            .await
            .expect("update transfer");

        let still_queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
        assert_eq!(still_queued.len(), 0);

        // After completion the latest row for the file reflects the new state.
        transfer.complete().expect("complete");
        store
            .update_transfer(&transfer)
            .await
            .expect("update transfer");

        let latest = store
            .latest_transfers_by_file(file.id())
            .await
            .expect("latest");
        assert_eq!(latest.len(), 1);
        assert_eq!(latest[0].state(), TransferState::Completed);
    }

    /// A failed transfer is returned by `failed_transfers` together with its
    /// error message and error kind.
    #[tokio::test]
    async fn failed_transfers_query() {
        let store = fresh_store().await;
        let file = insert_test_topology_file(&store, "output/f.png").await;

        let mut transfer =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        start_and_fail(&mut transfer, "timeout");
        store
            .insert_transfer(&transfer)
            .await
            .expect("insert transfer");

        let failed = store.failed_transfers().await.expect("failed");
        assert_eq!(failed.len(), 1);

        let only = &failed[0];
        assert_eq!(only.error(), Some("timeout"));
        assert_eq!(only.error_kind(), Some(TransferErrorKind::Transient));
    }

    /// When a failed transfer has been retried and failed again, only the
    /// retry (attempt 2) is returned by `failed_transfers`.
    #[tokio::test]
    async fn failed_transfers_excludes_retried() {
        let store = fresh_store().await;
        let file = insert_test_topology_file(&store, "output/retry.png").await;

        let mut first_attempt =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        start_and_fail(&mut first_attempt, "net error");
        store.insert_transfer(&first_attempt).await.expect("insert t1");

        let mut second_attempt = first_attempt.retry().expect("retry");
        start_and_fail(&mut second_attempt, "net error again");
        store
            .insert_transfer(&second_attempt)
            .await
            .expect("insert t2");

        let failed = store.failed_transfers().await.expect("failed");
        assert_eq!(
            failed.len(),
            1,
            "should return only the latest failed transfer"
        );

        let only = &failed[0];
        assert_eq!(only.error(), Some("net error again"));
        assert_eq!(only.attempt(), 2);
    }

    /// `latest_transfers_by_file` yields one entry per destination, each being
    /// the most recent attempt for that destination.
    #[tokio::test]
    async fn latest_transfers_by_file_returns_latest_per_dest() {
        let store = fresh_store().await;
        let file = insert_test_topology_file(&store, "output/r.png").await;

        // Destination "cloud": a failed first attempt followed by a retry.
        let mut cloud_first =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        start_and_fail(&mut cloud_first, "err");
        store.insert_transfer(&cloud_first).await.expect("insert t1");

        let cloud_retry = cloud_first.retry().expect("retry");
        store.insert_transfer(&cloud_retry).await.expect("insert t2");

        // Destination "pod": a single transfer that ran to completion.
        let mut pod_transfer =
            Transfer::new(file.id().to_string(), loc("local"), loc("pod")).expect("valid");
        pod_transfer.start().expect("start");
        pod_transfer.complete().expect("complete");
        store.insert_transfer(&pod_transfer).await.expect("insert t3");

        let latest = store
            .latest_transfers_by_file(file.id())
            .await
            .expect("latest");
        assert_eq!(latest.len(), 2);

        let cloud_id = loc("cloud");
        let cloud = latest
            .iter()
            .find(|t| t.dest() == &cloud_id)
            .expect("cloud");
        assert_eq!(cloud.state(), TransferState::Queued);
        assert_eq!(cloud.attempt(), 2);

        let pod_id = loc("pod");
        let pod = latest.iter().find(|t| t.dest() == &pod_id).expect("pod");
        assert_eq!(pod.state(), TransferState::Completed);
    }

    /// `queued_transfers` returns only the retry when an earlier attempt for
    /// the same file and destination has failed.
    #[tokio::test]
    async fn queued_returns_only_latest_per_file_dest() {
        let store = fresh_store().await;
        let file = insert_test_topology_file(&store, "output/q.png").await;

        let mut first_attempt =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid");
        start_and_fail(&mut first_attempt, "err");
        store.insert_transfer(&first_attempt).await.expect("insert t1");

        let retried = first_attempt.retry().expect("retry");
        store.insert_transfer(&retried).await.expect("insert t2");

        let queued = store.queued_transfers(&loc("cloud")).await.expect("queued");
        assert_eq!(queued.len(), 1);
        assert_eq!(queued[0].attempt(), 2);
    }

    /// A transfer created with a dependency is not queued until
    /// `unblock_dependents` is called for the transfer it depends on.
    #[tokio::test]
    async fn unblock_dependents_transitions_blocked_to_queued() {
        let store = fresh_store().await;
        let file = insert_test_topology_file(&store, "output/chain.png").await;

        let upstream =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
        let upstream_id = upstream.id().to_string();
        store.insert_transfer(&upstream).await.expect("insert t1");

        let dependent = Transfer::with_dependency(
            file.id().to_string(),
            loc("cloud"),
            loc("pod"),
            TransferKind::Sync,
            upstream_id.clone(),
        )
        .expect("valid t2");
        let dependent_id = dependent.id().to_string();
        store.insert_transfer(&dependent).await.expect("insert t2");

        let pod = loc("pod");

        let queued_before = store.queued_transfers(&pod).await.expect("queued");
        assert_eq!(
            queued_before.len(),
            0,
            "blocked transfer must not appear in queued"
        );

        let unblocked = store
            .unblock_dependents(&upstream_id)
            .await
            .expect("unblock");
        assert_eq!(unblocked, 1, "exactly one transfer should be unblocked");

        let queued_after = store.queued_transfers(&pod).await.expect("queued");
        assert_eq!(
            queued_after.len(),
            1,
            "unblocked transfer must appear in queued"
        );
        assert_eq!(queued_after[0].id(), dependent_id);
    }

    /// `unblock_dependents` leaves a dependent alone when its stored state has
    /// been changed to `in_flight`.
    #[tokio::test]
    async fn unblock_dependents_ignores_non_blocked_state() {
        let store = fresh_store().await;
        let file = insert_test_topology_file(&store, "output/nonblock.png").await;

        let upstream =
            Transfer::new(file.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
        let upstream_id = upstream.id().to_string();
        store.insert_transfer(&upstream).await.expect("insert t1");

        let dependent = Transfer::with_dependency(
            file.id().to_string(),
            loc("cloud"),
            loc("pod"),
            TransferKind::Sync,
            upstream_id.clone(),
        )
        .expect("valid t2");
        store.insert_transfer(&dependent).await.expect("insert t2");

        // Rewrite the dependent's state directly in the table, bypassing the
        // store API, so that it is `in_flight` when unblocking is attempted.
        let in_flight_id = dependent.id().to_string();
        store
            .conn
            .call(move |conn| {
                conn.execute(
                    "UPDATE transfers SET state = 'in_flight' WHERE id = ?",
                    params![in_flight_id],
                )
                .map_err(|e| InfraError::Store {
                    op: "sqlite",
                    reason: e.to_string(),
                })
            })
            .await
            .expect("manual update");

        let unblocked = store
            .unblock_dependents(&upstream_id)
            .await
            .expect("unblock");
        assert_eq!(unblocked, 0, "in_flight transfer must not be unblocked");
    }

    /// Every transfer depending on the same upstream transfer is unblocked by
    /// a single `unblock_dependents` call.
    #[tokio::test]
    async fn unblock_dependents_multiple_dependents() {
        let store = fresh_store().await;
        let file_a = insert_test_topology_file(&store, "output/multi_a.png").await;
        let file_b = insert_test_topology_file(&store, "output/multi_b.png").await;

        let upstream =
            Transfer::new(file_a.id().to_string(), loc("local"), loc("cloud")).expect("valid t1");
        let upstream_id = upstream.id().to_string();
        store.insert_transfer(&upstream).await.expect("insert t1");

        let to_pod = Transfer::with_dependency(
            file_a.id().to_string(),
            loc("cloud"),
            loc("pod"),
            TransferKind::Sync,
            upstream_id.clone(),
        )
        .expect("valid t2");
        store.insert_transfer(&to_pod).await.expect("insert t2");

        let to_nas = Transfer::with_dependency(
            file_b.id().to_string(),
            loc("cloud"),
            loc("nas"),
            TransferKind::Sync,
            upstream_id.clone(),
        )
        .expect("valid t3");
        store.insert_transfer(&to_nas).await.expect("insert t3");

        let unblocked = store
            .unblock_dependents(&upstream_id)
            .await
            .expect("unblock");
        assert_eq!(unblocked, 2, "both blocked transfers should be unblocked");

        let pod_queued = store.queued_transfers(&loc("pod")).await.expect("pod");
        assert_eq!(pod_queued.len(), 1);
        let nas_queued = store.queued_transfers(&loc("nas")).await.expect("nas");
        assert_eq!(nas_queued.len(), 1);
    }

    /// Unblocking for an id that no transfer depends on reports zero changes.
    #[tokio::test]
    async fn unblock_dependents_no_dependents_returns_zero() {
        let store = fresh_store().await;

        let unblocked = store
            .unblock_dependents("nonexistent-id")
            .await
            .expect("unblock");
        assert_eq!(unblocked, 0);
    }
}