autumn-web 0.4.0

An opinionated, convention-over-configuration web framework for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
//! Local-disk implementation of [`BlobStore`].
//!
//! Bytes land under a configurable `root` directory; URLs are
//! HMAC-SHA256-signed and time-bounded, served by an axum router mounted
//! by the framework on startup.
//!
//! Suitable for `dev`, single-replica deployments, and integration
//! tests. Multi-replica production should use the `S3` backend.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use bytes::Bytes;
use futures::StreamExt as _;
use http::StatusCode;
use sha2::{Digest, Sha256};

use super::blob::{Blob, BlobMeta};
use super::{BlobFuture, BlobStore, BlobStoreError, ByteStream, validate_key};

/// HMAC signing key used by the local backend.
///
/// In test and dev a random key is generated at startup; in production
/// callers are expected to set `[storage.local].signing_key` (or the
/// `AUTUMN_STORAGE__LOCAL__SIGNING_KEY` env var) so URLs survive process
/// restarts and replicas agree on the signature.
#[derive(Clone)]
pub struct SigningKey(Arc<Vec<u8>>);

impl std::fmt::Debug for SigningKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Don't leak key material into logs.
        f.debug_struct("SigningKey")
            .field("len", &self.0.len())
            .finish()
    }
}

impl SigningKey {
    /// Create a key from explicit bytes.
    #[must_use]
    pub fn new(bytes: Vec<u8>) -> Self {
        Self(Arc::new(bytes))
    }

    /// Generate a random 32-byte key.
    #[must_use]
    pub fn random() -> Self {
        let mut bytes = vec![0u8; 32];
        // Mix a UUID v4 (already cryptographic-quality random) into the key.
        let a = uuid::Uuid::new_v4();
        let b = uuid::Uuid::new_v4();
        bytes[..16].copy_from_slice(a.as_bytes());
        bytes[16..].copy_from_slice(b.as_bytes());
        Self::new(bytes)
    }

    fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

/// Local-disk blob store.
///
/// Construct via [`LocalBlobStore::new`]. The framework wires this up
/// from `[storage.local]` automatically when `storage.backend = "local"`.
#[derive(Clone, Debug)]
pub struct LocalBlobStore {
    inner: Arc<LocalInner>,
}

#[derive(Debug)]
struct LocalInner {
    provider_id: String,
    root: PathBuf,
    /// `root` after `std::fs::canonicalize` — i.e. the actual on-disk
    /// location after any symlinks have been followed. Stashed at
    /// construction time and used by `safe_path_for_key` to verify
    /// that user-supplied keys can't escape the configured root via a
    /// hostile or accidental symlink in the storage tree.
    canonical_root: PathBuf,
    mount_path: String,
    default_expiry: Duration,
    signing_key: SigningKey,
    /// Former signing keys accepted during a rotation grace window.
    previous_signing_keys: Vec<SigningKey>,
}

impl LocalBlobStore {
    /// Create a new local store rooted at `root`.
    ///
    /// `mount_path` must start with `/` — it's the prefix the framework
    /// uses to serve signed URLs (default `/_blobs`).
    ///
    /// # Errors
    ///
    /// Returns [`BlobStoreError::Io`] when the root directory cannot be
    /// created.
    pub fn new(
        provider_id: impl Into<String>,
        root: impl Into<PathBuf>,
        mount_path: impl Into<String>,
        default_expiry: Duration,
        signing_key: SigningKey,
        previous_signing_keys: Vec<SigningKey>,
    ) -> Result<Self, BlobStoreError> {
        let mount_path = mount_path.into();
        // axum panics with `Paths must start with a '/'` if we hand it a
        // mount path that doesn't lead with a slash. Catch that here as
        // a recoverable configuration error so a bad
        // `[storage.local].mount_path` (or the env-var equivalent)
        // surfaces a clean message instead of a router-build panic.
        if !mount_path.starts_with('/') {
            return Err(BlobStoreError::InvalidInput(format!(
                "storage.local.mount_path must start with '/' (got {mount_path:?})"
            )));
        }
        let root = root.into();
        std::fs::create_dir_all(&root).map_err(BlobStoreError::io)?;
        // Canonicalize once at construction — `safe_path_for_key`
        // compares each operation's resolved target against this so
        // a hostile symlink inside the storage tree (e.g.
        // `root/avatars -> /etc`) can't be used to escape the root.
        let canonical_root = std::fs::canonicalize(&root).map_err(BlobStoreError::io)?;
        Ok(Self {
            inner: Arc::new(LocalInner {
                provider_id: provider_id.into(),
                root,
                canonical_root,
                mount_path,
                default_expiry,
                signing_key,
                previous_signing_keys,
            }),
        })
    }

    /// Borrow the configured mount path.
    #[must_use]
    pub fn mount_path(&self) -> &str {
        &self.inner.mount_path
    }

    /// Borrow the configured signing key — used by the framework when
    /// wiring up the serving route.
    #[must_use]
    pub fn signing_key(&self) -> SigningKey {
        self.inner.signing_key.clone()
    }

    /// Borrow the on-disk root directory.
    #[must_use]
    pub fn root(&self) -> &Path {
        &self.inner.root
    }

    /// Resolve a user-supplied key to a `PathBuf` and verify the
    /// resolved path stays under the canonical storage root.
    ///
    /// Beyond the lexical checks in [`validate_key`], this walks the
    /// deepest existing prefix of the target, follows any symlinks
    /// along the way (`tokio::fs::canonicalize`), and asserts the
    /// result is still under `canonical_root`. That blocks the hostile-
    /// symlink case where `root/avatars -> /etc` would otherwise let a
    /// key like `avatars/passwd` read or write outside the blob
    /// directory.
    ///
    /// There's still a TOCTOU window between this check and the IO
    /// that follows; a co-located attacker who can win that race needs
    /// `openat`-style primitives to fully eliminate, which Rust's std
    /// doesn't expose. The check still removes the common "operator
    /// configured a symlink they didn't realize was unsafe" failure
    /// mode.
    async fn safe_path_for_key(&self, key: &str) -> Result<PathBuf, BlobStoreError> {
        validate_key(key)?;
        let target = self.inner.root.join(key);

        // `canonicalize` errors with NotFound when the target doesn't
        // exist yet (which is normal for `put`). Walk up to the deepest
        // ancestor that exists, canonicalize that, and check it.
        let mut probe = target.clone();
        let canon_existing = loop {
            match tokio::fs::canonicalize(&probe).await {
                Ok(p) => break p,
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                    if !probe.pop() {
                        return Err(BlobStoreError::io(
                            "storage root vanished while resolving blob key",
                        ));
                    }
                }
                Err(err) => return Err(BlobStoreError::io(err)),
            }
        };

        if !canon_existing.starts_with(&self.inner.canonical_root) {
            return Err(BlobStoreError::PermissionDenied(
                "blob key resolves outside storage root".into(),
            ));
        }
        Ok(target)
    }

    /// Serve-side helper: read the bytes plus the persisted metadata
    /// for a blob. Used by [`serve_router`] so locally-served URLs
    /// reflect the original `content_type` instead of defaulting to
    /// `application/octet-stream`. Returns `None` for the metadata
    /// when the sidecar is missing (older blobs, or backends that
    /// were filled by something other than `put`/`put_stream`).
    pub(crate) async fn get_with_meta(
        &self,
        key: &str,
    ) -> Result<(Bytes, Option<StoredBlobMeta>), BlobStoreError> {
        let path = self.safe_path_for_key(key).await?;
        let bytes = match tokio::fs::read(&path).await {
            Ok(bytes) => Bytes::from(bytes),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                return Err(BlobStoreError::NotFound(key.to_owned()));
            }
            Err(err) => return Err(BlobStoreError::io(err)),
        };
        let meta = read_meta_sidecar(&path).await;
        Ok((bytes, meta))
    }
}

impl BlobStore for LocalBlobStore {
    fn provider_id(&self) -> &str {
        &self.inner.provider_id
    }

    fn put<'a>(
        &'a self,
        key: &'a str,
        content_type: &'a str,
        bytes: Bytes,
    ) -> BlobFuture<'a, Blob> {
        Box::pin(async move {
            let path = self.safe_path_for_key(key).await?;
            if let Some(parent) = path.parent() {
                tokio::fs::create_dir_all(parent)
                    .await
                    .map_err(BlobStoreError::io)?;
            }
            let etag = sha256_hex(&bytes);

            // Write to a temp file in the same directory, then rename
            // into place. This means a partial write (disk full,
            // crash, killed process) never leaves a truncated blob at
            // `path` for a future `get` to serve.
            let tmp_path = temp_sibling_path(&path);
            if let Err(err) = tokio::fs::write(&tmp_path, &bytes).await {
                let _ = tokio::fs::remove_file(&tmp_path).await;
                return Err(BlobStoreError::io(err));
            }
            if let Err(err) = atomic_replace(&tmp_path, &path).await {
                let _ = tokio::fs::remove_file(&tmp_path).await;
                return Err(BlobStoreError::io(err));
            }
            // Persist the content_type + etag in a sibling sidecar so
            // `head`, `get_with_meta`, and the serving route can return
            // the right MIME instead of `application/octet-stream`. If
            // the sidecar write fails on an overwrite, clear any
            // pre-existing sidecar so future `head`/serve calls fall
            // back to `application/octet-stream` rather than reporting
            // stale `content_type` from the previous put. The bytes
            // themselves are committed and correct; we'd rather serve
            // "unknown MIME" than misrepresent the MIME.
            if write_meta_sidecar(
                &path,
                &StoredBlobMeta {
                    content_type: content_type.to_owned(),
                    etag: Some(etag.clone()),
                },
            )
            .await
            .is_err()
            {
                drop_stale_sidecar(&path).await;
            }
            Ok(Blob {
                provider_id: self.inner.provider_id.clone(),
                key: key.to_owned(),
                content_type: content_type.to_owned(),
                byte_size: bytes.len() as u64,
                etag: Some(etag),
            })
        })
    }

    fn put_stream<'a>(
        &'a self,
        key: &'a str,
        content_type: &'a str,
        mut data: ByteStream<'a>,
    ) -> BlobFuture<'a, Blob> {
        Box::pin(async move {
            use tokio::io::AsyncWriteExt as _;

            let path = self.safe_path_for_key(key).await?;
            if let Some(parent) = path.parent() {
                tokio::fs::create_dir_all(parent)
                    .await
                    .map_err(BlobStoreError::io)?;
            }

            // Stream into a sibling temp file and rename into place
            // only on a clean finish. A client disconnect, mid-stream
            // error, or transient I/O failure leaves the temp file
            // (which we unlink) and never touches `path`, so future
            // `get` calls don't serve a corrupted blob.
            let tmp_path = temp_sibling_path(&path);
            let result = async {
                let mut file = tokio::fs::File::create(&tmp_path)
                    .await
                    .map_err(BlobStoreError::io)?;
                let mut hasher = Sha256::new();
                let mut byte_size: u64 = 0;
                while let Some(chunk) = data.next().await {
                    let chunk = chunk?;
                    hasher.update(&chunk);
                    byte_size = byte_size.saturating_add(chunk.len() as u64);
                    file.write_all(&chunk).await.map_err(BlobStoreError::io)?;
                }
                file.flush().await.map_err(BlobStoreError::io)?;
                Ok::<(u64, String), BlobStoreError>((byte_size, hex(hasher.finalize())))
            }
            .await;

            match result {
                Ok((byte_size, etag)) => {
                    if let Err(err) = atomic_replace(&tmp_path, &path).await {
                        let _ = tokio::fs::remove_file(&tmp_path).await;
                        return Err(BlobStoreError::io(err));
                    }
                    if write_meta_sidecar(
                        &path,
                        &StoredBlobMeta {
                            content_type: content_type.to_owned(),
                            etag: Some(etag.clone()),
                        },
                    )
                    .await
                    .is_err()
                    {
                        // Same rationale as `put`: clear any
                        // pre-existing sidecar so future `head`/serve
                        // requests don't report stale MIME for the
                        // freshly committed bytes.
                        drop_stale_sidecar(&path).await;
                    }
                    Ok(Blob {
                        provider_id: self.inner.provider_id.clone(),
                        key: key.to_owned(),
                        content_type: content_type.to_owned(),
                        byte_size,
                        etag: Some(etag),
                    })
                }
                Err(err) => {
                    let _ = tokio::fs::remove_file(&tmp_path).await;
                    Err(err)
                }
            }
        })
    }

    fn get<'a>(&'a self, key: &'a str) -> BlobFuture<'a, Bytes> {
        Box::pin(async move {
            let path = self.safe_path_for_key(key).await?;
            match tokio::fs::read(&path).await {
                Ok(bytes) => Ok(Bytes::from(bytes)),
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                    Err(BlobStoreError::NotFound(key.to_owned()))
                }
                Err(err) => Err(BlobStoreError::io(err)),
            }
        })
    }

    fn delete<'a>(&'a self, key: &'a str) -> BlobFuture<'a, ()> {
        Box::pin(async move {
            let path = self.safe_path_for_key(key).await?;
            // Delete the blob bytes first, then the sidecar. If the
            // blob delete fails (permissions, transient I/O, …) the
            // sidecar stays in place, so a failed delete is
            // side-effect-free as far as `head`/serve are concerned.
            // If the blob delete succeeds but the sidecar delete fails,
            // the orphan sidecar is harmless: a future `head` on the
            // (now-missing) key returns `None` from the metadata-stat
            // call before the sidecar is even read, and a future `put`
            // overwrites the sidecar atomically.
            match tokio::fs::remove_file(&path).await {
                Ok(()) => {}
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                Err(err) => return Err(BlobStoreError::io(err)),
            }
            let _ = tokio::fs::remove_file(meta_sidecar_path(&path)).await;
            Ok(())
        })
    }

    fn head<'a>(&'a self, key: &'a str) -> BlobFuture<'a, Option<BlobMeta>> {
        Box::pin(async move {
            let path = self.safe_path_for_key(key).await?;
            match tokio::fs::metadata(&path).await {
                Ok(fs_meta) => {
                    // Prefer the persisted sidecar metadata
                    // (content_type + etag) over the filesystem
                    // defaults. Fall back to `application/octet-stream`
                    // for blobs written by something other than
                    // `put`/`put_stream` (older deployments, manual
                    // file drops, …).
                    let sidecar = read_meta_sidecar(&path).await;
                    Ok(Some(BlobMeta {
                        key: key.to_owned(),
                        content_type: sidecar.as_ref().map_or_else(
                            || "application/octet-stream".to_owned(),
                            |m| m.content_type.clone(),
                        ),
                        byte_size: fs_meta.len(),
                        etag: sidecar.and_then(|m| m.etag),
                    }))
                }
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
                Err(err) => Err(BlobStoreError::io(err)),
            }
        })
    }

    fn presigned_url<'a>(&'a self, key: &'a str, expires_in: Duration) -> BlobFuture<'a, String> {
        Box::pin(async move {
            validate_key(key)?;
            let expires_in = if expires_in.is_zero() {
                self.inner.default_expiry
            } else {
                expires_in
            };
            let exp_at = SystemTime::now()
                .checked_add(expires_in)
                .unwrap_or(UNIX_EPOCH)
                .duration_since(UNIX_EPOCH)
                .map_or(0, |d| d.as_secs());

            // Sign the canonical (unencoded) key — the serving route
            // decodes path segments before re-signing for verification.
            let signature = sign(self.inner.signing_key.as_bytes(), key, exp_at);
            // Percent-encode each segment so keys containing reserved
            // URL characters (`?`, `#`, `%`, spaces, …) round-trip
            // correctly through the path. `/` survives as the segment
            // separator.
            let encoded_key = encode_key_path(key);
            let url = format!(
                "{base}/{encoded_key}?exp={exp_at}&sig={signature}",
                base = self.inner.mount_path.trim_end_matches('/'),
            );
            Ok(url)
        })
    }
}

/// Compute the canonical signature for `(key, expiry)`.
///
/// # Panics
///
/// Never; `Hmac::new_from_slice` accepts any key length.
#[must_use]
pub fn sign(key_bytes: &[u8], blob_key: &str, expires_at: u64) -> String {
    use hmac::{Hmac, Mac};
    let mut mac =
        <Hmac<Sha256> as Mac>::new_from_slice(key_bytes).expect("HMAC accepts any key length");
    mac.update(blob_key.as_bytes());
    mac.update(b":");
    mac.update(expires_at.to_string().as_bytes());
    hex(mac.finalize().into_bytes())
}

/// Verify a `(key, expiry, signature)` triple.
///
/// Returns `Ok(())` when the signature matches and `expires_at` is
/// still in the future.
///
/// # Errors
///
/// Returns [`BlobStoreError::Signature`] for malformed, expired, or
/// mismatched signatures.
pub fn verify(
    signing_key: &[u8],
    blob_key: &str,
    expires_at: u64,
    signature: &str,
) -> Result<(), BlobStoreError> {
    let expected = sign(signing_key, blob_key, expires_at);
    if !constant_time_eq(expected.as_bytes(), signature.as_bytes()) {
        return Err(BlobStoreError::Signature("signature mismatch".into()));
    }
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |d| d.as_secs());
    if expires_at < now {
        return Err(BlobStoreError::Signature("signed url expired".into()));
    }
    Ok(())
}

fn sha256_hex(bytes: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    hex(hasher.finalize())
}

/// Build a same-directory temp path for atomic write-then-rename. The
/// suffix carries a UUID v4 so concurrent writers to the same key
/// don't collide. Same-directory placement is what makes the `rename`
/// atomic on POSIX (cross-device renames would silently degrade to
/// copy-and-delete).
/// Replace `dst` with `src` atomically across platforms.
///
/// On POSIX, `rename` overwrites an existing destination atomically.
/// Replace `dst` with `src` atomically across platforms.
///
/// On POSIX, `rename` overwrites an existing destination atomically,
/// so the first attempt is the fast path. On Windows, `MoveFileEx`
/// without `MOVEFILE_REPLACE_EXISTING` errors with `AlreadyExists`
/// when the destination exists; the fallback path moves the existing
/// `dst` aside to a sibling backup, renames `src` into place, and
/// removes the backup. If the second rename fails for any reason
/// (transient I/O error, permissions, etc.), we rename the backup
/// back into `dst` so a failed overwrite never destroys the
/// previously committed blob — the caller still gets the rename
/// error and cleans up `src`, but the original blob stays intact.
///
/// There's still a tiny non-atomic window on the Windows fallback
/// path between the move-aside and the rename-into-place. Eliminating
/// it requires `MoveFileExW(MOVEFILE_REPLACE_EXISTING)` via the
/// `windows` crate, which we'd rather not pull in for one syscall.
async fn atomic_replace(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> {
    match tokio::fs::rename(src, dst).await {
        Ok(()) => Ok(()),
        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
            // Move the existing dst aside under a unique sibling name
            // so we can restore it if the rename-into-place fails.
            let backup = backup_sibling_path(dst);
            tokio::fs::rename(dst, &backup).await?;
            match tokio::fs::rename(src, dst).await {
                Ok(()) => {
                    let _ = tokio::fs::remove_file(&backup).await;
                    Ok(())
                }
                Err(rename_err) => {
                    // Restore the original dst from backup. If even
                    // this fails the system is in a state we can't
                    // automatically recover, but it's still better
                    // than the unconditional-delete alternative.
                    let _ = tokio::fs::rename(&backup, dst).await;
                    Err(rename_err)
                }
            }
        }
        Err(err) => Err(err),
    }
}

fn backup_sibling_path(path: &std::path::Path) -> std::path::PathBuf {
    let id = uuid::Uuid::new_v4().simple().to_string();
    let mut name = path.file_name().map_or_else(
        || std::ffi::OsString::from("blob"),
        std::ffi::OsStr::to_owned,
    );
    name.push(".bak.");
    name.push(&id);
    path.with_file_name(name)
}

fn temp_sibling_path(path: &std::path::Path) -> std::path::PathBuf {
    let id = uuid::Uuid::new_v4().simple().to_string();
    let mut name = path.file_name().map_or_else(
        || std::ffi::OsString::from("blob"),
        std::ffi::OsStr::to_owned,
    );
    name.push(".tmp.");
    name.push(&id);
    path.with_file_name(name)
}

/// Persisted metadata that travels alongside each blob's bytes.
///
/// Written by `put`/`put_stream` to a `<path>.meta` sibling so the
/// serving route can return the original `Content-Type` instead of
/// the default `application/octet-stream`.
///
/// **Caveat**: a blob whose key happens to end in `.meta` and aliases
/// the sidecar of a sibling key would collide. Document the constraint
/// in [`docs/guide/storage.md`](../../../docs/guide/storage.md) — in
/// practice keys come from app code (`avatars/{user_id}.png`,
/// `attachments/{uuid}.pdf`) so the chance is vanishing. The S3
/// backend has no equivalent issue because S3 stores `Content-Type` as
/// part of the object's metadata.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct StoredBlobMeta {
    pub content_type: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub etag: Option<String>,
}

fn meta_sidecar_path(path: &std::path::Path) -> std::path::PathBuf {
    let mut name = path.file_name().map_or_else(
        || std::ffi::OsString::from("blob"),
        std::ffi::OsStr::to_owned,
    );
    name.push(".meta");
    path.with_file_name(name)
}

/// Write the sidecar metadata after the bytes have committed.
///
/// Goes through the same temp-file + atomic-rename pattern as blob
/// bytes so a hostile or accidental symlink at the sidecar path can't
/// be followed: `tokio::fs::write` would otherwise dereference a
/// symlink and clobber arbitrary files reachable through it. The
/// temp file uses `create_new(true)` so even on the temp path an
/// existing file or symlink errors out (the uuid suffix means an
/// attacker can't predict the path), and the final rename replaces
/// the dirent atomically without following whatever was at the
/// destination.
///
/// Returns `Ok(())` on success and `Err(())` on any logged failure
/// (already-logged inside, callers don't need to log again). On
/// failure callers should delete any pre-existing sidecar so
/// `head` / serving don't return stale `content_type` for the freshly
/// committed bytes.
#[allow(clippy::cognitive_complexity)]
async fn write_meta_sidecar(blob_path: &std::path::Path, meta: &StoredBlobMeta) -> Result<(), ()> {
    use tokio::io::AsyncWriteExt as _;

    let path = meta_sidecar_path(blob_path);
    let bytes = match serde_json::to_vec(meta) {
        Ok(b) => b,
        Err(err) => {
            tracing::warn!(error = %err, "failed to serialize blob metadata sidecar");
            return Err(());
        }
    };

    let tmp = temp_sibling_path(&path);
    let mut file = match tokio::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(&tmp)
        .await
    {
        Ok(f) => f,
        Err(err) => {
            tracing::warn!(
                error = %err,
                tmp = %tmp.display(),
                "failed to create blob metadata sidecar temp file"
            );
            return Err(());
        }
    };
    if let Err(err) = file.write_all(&bytes).await {
        let _ = tokio::fs::remove_file(&tmp).await;
        tracing::warn!(error = %err, "failed to write blob metadata sidecar bytes");
        return Err(());
    }
    if let Err(err) = file.flush().await {
        let _ = tokio::fs::remove_file(&tmp).await;
        tracing::warn!(error = %err, "failed to flush blob metadata sidecar");
        return Err(());
    }
    drop(file);
    if let Err(err) = atomic_replace(&tmp, &path).await {
        let _ = tokio::fs::remove_file(&tmp).await;
        tracing::warn!(
            error = %err,
            sidecar = %path.display(),
            "failed to commit blob metadata sidecar"
        );
        return Err(());
    }
    Ok(())
}

/// On a failed sidecar write, remove any pre-existing sidecar so a
/// future `head`/serve request returns the `application/octet-stream`
/// fallback rather than stale `content_type` from the previous put.
/// The bytes are already committed; we'd rather serve "I don't know"
/// than misrepresent the MIME.
async fn drop_stale_sidecar(blob_path: &std::path::Path) {
    let path = meta_sidecar_path(blob_path);
    if let Err(err) = tokio::fs::remove_file(&path).await
        && err.kind() != std::io::ErrorKind::NotFound
    {
        tracing::warn!(
            error = %err,
            sidecar = %path.display(),
            "failed to clear stale blob metadata sidecar after sidecar-write failure"
        );
    }
}

/// Read the sidecar metadata for a blob. Returns `None` for a missing
/// or unparseable sidecar so the serving / `head` paths can fall back
/// gracefully.
async fn read_meta_sidecar(blob_path: &std::path::Path) -> Option<StoredBlobMeta> {
    let path = meta_sidecar_path(blob_path);
    let bytes = tokio::fs::read(&path).await.ok()?;
    serde_json::from_slice(&bytes).ok()
}

/// Percent-encode each `/`-separated segment of `key` for use in a URL
/// path. Segment separators stay raw so the path tree survives.
fn encode_key_path(key: &str) -> String {
    use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
    // RFC 3986 path segment: encode controls, path-reserved, and
    // anything that would alias another segment or query/fragment
    // delimiter.
    const PATH_SEGMENT: &AsciiSet = &CONTROLS
        .add(b' ')
        .add(b'"')
        .add(b'#')
        .add(b'%')
        .add(b'/')
        .add(b'<')
        .add(b'>')
        .add(b'?')
        .add(b'`')
        .add(b'{')
        .add(b'}')
        .add(b'\\');

    let mut result = String::with_capacity(key.len() + 16);
    let mut first = true;
    for segment in key.split('/') {
        if !first {
            result.push('/');
        }
        first = false;
        result.extend(utf8_percent_encode(segment, PATH_SEGMENT));
    }
    result
}

fn hex<B: AsRef<[u8]>>(bytes: B) -> String {
    let mut s = String::with_capacity(bytes.as_ref().len() * 2);
    for b in bytes.as_ref() {
        use std::fmt::Write as _;
        let _ = write!(s, "{b:02x}");
    }
    s
}

fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    use subtle::ConstantTimeEq;
    a.ct_eq(b).into()
}

/// Verify a signed blob URL against `current` and each `previous` key.
///
/// Expiry is checked first (same for all keys). The signature is then compared
/// against every key using constant-time comparison; the first match wins.
/// This enables a rotation grace window: sign new URLs with `current` while
/// URLs that were signed with an old key continue to serve until their expiry.
pub(crate) fn verify_with_rotation(
    current: &SigningKey,
    previous: &[SigningKey],
    blob_key: &str,
    expires_at: u64,
    signature: &str,
) -> Result<(), BlobStoreError> {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |d| d.as_secs());
    if expires_at < now {
        return Err(BlobStoreError::Signature("signed url expired".into()));
    }
    let expected_current = sign(current.as_bytes(), blob_key, expires_at);
    if constant_time_eq(expected_current.as_bytes(), signature.as_bytes()) {
        return Ok(());
    }
    for prev in previous {
        let expected = sign(prev.as_bytes(), blob_key, expires_at);
        if constant_time_eq(expected.as_bytes(), signature.as_bytes()) {
            return Ok(());
        }
    }
    Err(BlobStoreError::Signature("signature mismatch".into()))
}

// ── Serving route ──────────────────────────────────────────────

/// Build the axum router that serves signed local-blob URLs.
///
/// This is mounted by the framework at the configured `mount_path`.
pub fn serve_router(store: &LocalBlobStore) -> axum::Router<crate::AppState> {
    use axum::extract::{Path, Query};
    use axum::response::IntoResponse;

    #[derive(Debug, serde::Deserialize)]
    struct SignedQuery {
        exp: u64,
        sig: String,
    }

    let store_for_route = store.clone();
    let mount = format!("{}/{{*key}}", store.mount_path().trim_end_matches('/'));

    let handler = move |Path(blob_key): Path<String>, Query(q): Query<SignedQuery>| {
        let store = store_for_route.clone();
        async move {
            if let Err(err) = verify_with_rotation(
                &store.inner.signing_key,
                &store.inner.previous_signing_keys,
                &blob_key,
                q.exp,
                &q.sig,
            ) {
                return (StatusCode::FORBIDDEN, err.to_string()).into_response();
            }
            match store.get_with_meta(&blob_key).await {
                Ok((bytes, meta)) => {
                    let content_type = meta
                        .map_or_else(|| "application/octet-stream".to_owned(), |m| m.content_type);
                    ([(http::header::CONTENT_TYPE, content_type)], bytes).into_response()
                }
                Err(BlobStoreError::NotFound(_)) => {
                    (StatusCode::NOT_FOUND, "not found").into_response()
                }
                Err(err) => err.into_autumn_error().into_response(),
            }
        }
    };

    axum::Router::new().route(&mount, axum::routing::get(handler))
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;

    fn temp_root() -> tempfile::TempDir {
        tempfile::tempdir().unwrap()
    }

    fn store(root: &Path) -> LocalBlobStore {
        LocalBlobStore::new(
            "test",
            root.to_path_buf(),
            "/_blobs",
            Duration::from_secs(60),
            SigningKey::new(b"test-key".to_vec()),
            vec![],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn put_get_round_trip() {
        let dir = temp_root();
        let s = store(dir.path());
        let blob = s
            .put("a/b.png", "image/png", Bytes::from_static(b"abc"))
            .await
            .unwrap();
        assert_eq!(blob.byte_size, 3);
        assert!(blob.etag.is_some());
        let bytes = s.get("a/b.png").await.unwrap();
        assert_eq!(&bytes[..], b"abc");
    }

    #[tokio::test]
    async fn put_stream_round_trip() {
        let dir = temp_root();
        let s = store(dir.path());
        let chunks: Vec<Result<Bytes, BlobStoreError>> = vec![
            Ok(Bytes::from_static(b"hello, ")),
            Ok(Bytes::from_static(b"world")),
        ];
        let stream: ByteStream<'static> = Box::pin(stream::iter(chunks));
        let blob = s
            .put_stream("greet.txt", "text/plain", stream)
            .await
            .unwrap();
        assert_eq!(blob.byte_size, 12);
        let bytes = s.get("greet.txt").await.unwrap();
        assert_eq!(&bytes[..], b"hello, world");
    }

    #[tokio::test]
    async fn get_missing_returns_not_found() {
        let dir = temp_root();
        let s = store(dir.path());
        let err = s.get("missing.txt").await.unwrap_err();
        assert!(matches!(err, BlobStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn delete_idempotent() {
        let dir = temp_root();
        let s = store(dir.path());
        s.delete("nope").await.unwrap();
        let _ = s
            .put("k.txt", "text/plain", Bytes::from_static(b"x"))
            .await
            .unwrap();
        s.delete("k.txt").await.unwrap();
        assert!(matches!(
            s.get("k.txt").await.unwrap_err(),
            BlobStoreError::NotFound(_)
        ));
    }

    #[tokio::test]
    async fn rejects_traversal_keys() {
        let dir = temp_root();
        let s = store(dir.path());
        let err = s
            .put("../escape.txt", "text/plain", Bytes::from_static(b"x"))
            .await
            .unwrap_err();
        assert!(matches!(err, BlobStoreError::InvalidInput(_)));
    }

    #[test]
    fn new_rejects_mount_path_without_leading_slash() {
        let dir = temp_root();
        let err = LocalBlobStore::new(
            "test",
            dir.path().to_path_buf(),
            "_blobs", // missing leading slash
            Duration::from_secs(60),
            SigningKey::new(b"k".to_vec()),
            vec![],
        )
        .unwrap_err();
        assert!(matches!(err, BlobStoreError::InvalidInput(_)));
    }

    /// A hostile or accidental symlink inside the storage tree can
    /// turn a legitimate-looking key into a path-escape. Pin that we
    /// catch the canonical-path mismatch before any IO happens.
    #[cfg(unix)]
    #[tokio::test]
    async fn rejects_keys_traversing_root_escaping_symlinks() {
        use std::os::unix::fs::symlink;

        let outside = tempfile::tempdir().unwrap();
        // Create a sensitive file outside the storage root.
        std::fs::write(outside.path().join("secret"), b"do not read").unwrap();

        let dir = temp_root();
        // root/escape -> outside_dir
        symlink(outside.path(), dir.path().join("escape")).unwrap();

        let s = store(dir.path());
        let err = s.get("escape/secret").await.unwrap_err();
        assert!(
            matches!(err, BlobStoreError::PermissionDenied(_)),
            "expected PermissionDenied, got {err:?}"
        );

        // And the same for writes — `put` to a key that resolves
        // outside the root must refuse before any bytes hit disk.
        let err = s
            .put(
                "escape/leaked.txt",
                "text/plain",
                Bytes::from_static(b"oops"),
            )
            .await
            .unwrap_err();
        assert!(matches!(err, BlobStoreError::PermissionDenied(_)));
        // Outside dir is untouched.
        assert!(!outside.path().join("leaked.txt").exists());
    }

    /// A hostile symlink planted at the *sidecar* path is the same
    /// threat as one planted at the blob path: the naïve
    /// `tokio::fs::write` would follow it and clobber arbitrary
    /// targets. The temp-file + atomic-rename pattern in
    /// `write_meta_sidecar` replaces the dirent atomically without
    /// following whatever was there.
    #[cfg(unix)]
    #[tokio::test]
    async fn sidecar_write_does_not_follow_hostile_symlink() {
        use std::os::unix::fs::symlink;

        let outside = tempfile::tempdir().unwrap();
        let target = outside.path().join("untouchable");
        std::fs::write(&target, b"original-contents").unwrap();

        let dir = temp_root();
        let s = store(dir.path());

        // Plant a symlink at the *sidecar* path before the put runs.
        // The sidecar for key `victim.bin` is `victim.bin.meta`.
        let sidecar_path = dir.path().join("victim.bin.meta");
        symlink(&target, &sidecar_path).unwrap();

        // The put succeeds (sidecar errors are logged, not surfaced) —
        // the important invariant is the symlink target.
        s.put("victim.bin", "image/png", Bytes::from_static(b"pixels"))
            .await
            .unwrap();

        // The original symlink target must still hold its original
        // bytes, *not* the sidecar JSON.
        assert_eq!(std::fs::read(&target).unwrap(), b"original-contents");
    }

    #[test]
    fn signature_round_trip() {
        let key = b"k";
        let sig = sign(key, "blob/1.png", 99);
        verify(key, "blob/1.png", u64::MAX / 2, &sig).unwrap_err();
        // Use a now-future expiry for verification.
        let exp = SystemTime::now()
            .checked_add(Duration::from_secs(60))
            .unwrap()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let sig = sign(key, "blob/1.png", exp);
        verify(key, "blob/1.png", exp, &sig).unwrap();
    }

    #[test]
    fn signature_rejects_wrong_key() {
        let exp = SystemTime::now()
            .checked_add(Duration::from_secs(60))
            .unwrap()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let sig = sign(b"alpha", "blob/1.png", exp);
        let err = verify(b"beta", "blob/1.png", exp, &sig).unwrap_err();
        assert!(matches!(err, BlobStoreError::Signature(_)));
    }

    #[test]
    fn signature_rejects_expired() {
        let exp = 1; // long ago
        let sig = sign(b"k", "blob/1.png", exp);
        let err = verify(b"k", "blob/1.png", exp, &sig).unwrap_err();
        assert!(matches!(err, BlobStoreError::Signature(_)));
    }

    #[tokio::test]
    async fn presigned_url_includes_signature_and_exp() {
        let dir = temp_root();
        let s = store(dir.path());
        let url = s
            .presigned_url("a/b.png", Duration::from_secs(120))
            .await
            .unwrap();
        assert!(url.starts_with("/_blobs/a/b.png?exp="));
        assert!(url.contains("&sig="));
    }

    #[tokio::test]
    async fn presigned_url_percent_encodes_reserved_chars() {
        let dir = temp_root();
        let s = store(dir.path());
        // Space and `#` pass `validate_key` (the Windows-reserved
        // rejection set is `< > : " | ? *` + control bytes only) but
        // still need percent-encoding inside the URL path.
        let url = s
            .presigned_url("user 1/note#1.png", Duration::from_secs(120))
            .await
            .unwrap();
        // `/` stays raw as a segment separator.
        assert!(
            url.starts_with("/_blobs/user%201/note%231.png?exp="),
            "unexpected URL: {url}"
        );
        assert!(url.contains("&sig="));
    }

    #[tokio::test]
    async fn put_stream_cleans_up_partial_file_on_error() {
        use futures::stream;

        let dir = temp_root();
        let s = store(dir.path());

        // First chunk succeeds, second yields an error to short-circuit
        // the write.
        let chunks: Vec<Result<Bytes, BlobStoreError>> = vec![
            Ok(Bytes::from_static(b"first")),
            Err(BlobStoreError::Backend("boom".into())),
        ];
        let stream: ByteStream<'static> = Box::pin(stream::iter(chunks));
        let err = s
            .put_stream("interrupted.bin", "application/octet-stream", stream)
            .await
            .unwrap_err();
        assert!(matches!(err, BlobStoreError::Backend(_)));

        // The partial file must not be left on disk.
        let path = dir.path().join("interrupted.bin");
        assert!(!path.exists(), "partial blob was not cleaned up");
        assert!(matches!(
            s.get("interrupted.bin").await.unwrap_err(),
            BlobStoreError::NotFound(_)
        ));
    }

    #[test]
    fn encode_key_path_passes_segments_separately() {
        assert_eq!(encode_key_path("foo"), "foo");
        assert_eq!(encode_key_path("a/b/c"), "a/b/c");
        assert_eq!(encode_key_path("a b/c?d"), "a%20b/c%3Fd");
        assert_eq!(encode_key_path("hash#frag/q"), "hash%23frag/q");
        assert_eq!(encode_key_path(""), "");
        assert_eq!(encode_key_path("a/"), "a/");
        assert_eq!(encode_key_path("/b"), "/b");
        assert_eq!(encode_key_path("🚀/path"), "%F0%9F%9A%80/path");
    }

    #[tokio::test]
    async fn put_replaces_atomically() {
        // Successive `put` calls to the same key replace via temp-file +
        // rename. Concrete fault injection for a mid-write IO error is
        // exercised through `put_stream_cleans_up_partial_file_on_error`;
        // here we just confirm the happy path of atomic replacement.
        let dir = temp_root();
        let s = store(dir.path());
        s.put(
            "k.bin",
            "application/octet-stream",
            Bytes::from_static(b"first"),
        )
        .await
        .unwrap();
        s.put(
            "k.bin",
            "application/octet-stream",
            Bytes::from_static(b"second"),
        )
        .await
        .unwrap();
        assert_eq!(&s.get("k.bin").await.unwrap()[..], b"second");

        // No leftover temp files in the storage root.
        let entries: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .map(|e| e.file_name().to_string_lossy().into_owned())
            .collect();
        assert!(
            !entries.iter().any(|n| n.contains(".tmp.")),
            "temp file leaked: {entries:?}"
        );
    }

    #[tokio::test]
    async fn atomic_replace_overwrites_existing_destination() {
        let dir = temp_root();
        let dst = dir.path().join("target.bin");
        tokio::fs::write(&dst, b"old").await.unwrap();
        let tmp = dir.path().join("staging.tmp");
        tokio::fs::write(&tmp, b"new").await.unwrap();

        atomic_replace(&tmp, &dst).await.unwrap();

        assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"new");
        assert!(!tmp.exists(), "temp file should be consumed by rename");
    }

    #[tokio::test]
    async fn atomic_replace_creates_new_destination() {
        let dir = temp_root();
        let dst = dir.path().join("fresh.bin");
        let tmp = dir.path().join("staging.tmp");
        tokio::fs::write(&tmp, b"hello").await.unwrap();

        atomic_replace(&tmp, &dst).await.unwrap();

        assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"hello");
    }

    #[tokio::test]
    async fn atomic_replace_propagates_io_errors() {
        let dir = temp_root();
        let tmp = dir.path().join("missing.tmp"); // never created
        let dst = dir.path().join("target.bin");
        let err = atomic_replace(&tmp, &dst).await.unwrap_err();
        assert!(
            matches!(err.kind(), std::io::ErrorKind::NotFound),
            "expected NotFound, got {:?}",
            err.kind()
        );
    }

    #[test]
    fn temp_sibling_path_keeps_parent_directory() {
        let original = std::path::Path::new("/var/lib/blobs/avatars/me.png");
        let tmp = temp_sibling_path(original);
        assert_eq!(tmp.parent(), original.parent());
        let name = tmp.file_name().unwrap().to_string_lossy();
        assert!(name.starts_with("me.png.tmp."));
    }

    #[test]
    fn backup_sibling_path_keeps_parent_directory() {
        let original = std::path::Path::new("/var/lib/blobs/avatars/me.png");
        let backup = backup_sibling_path(original);
        assert_eq!(backup.parent(), original.parent());
        let name = backup.file_name().unwrap().to_string_lossy();
        assert!(name.starts_with("me.png.bak."));
    }

    #[test]
    fn meta_sidecar_path_appends_meta_suffix() {
        let blob = std::path::Path::new("/var/lib/blobs/avatars/me.png");
        let sidecar = meta_sidecar_path(blob);
        assert_eq!(sidecar.parent(), blob.parent());
        assert_eq!(sidecar.file_name().unwrap(), "me.png.meta");
    }

    #[tokio::test]
    async fn put_persists_content_type_for_head_and_serve() {
        let dir = temp_root();
        let s = store(dir.path());
        let blob = s
            .put("a/b.png", "image/png", Bytes::from_static(b"abc"))
            .await
            .unwrap();
        assert_eq!(blob.content_type, "image/png");

        let meta = s.head("a/b.png").await.unwrap().expect("blob exists");
        assert_eq!(meta.content_type, "image/png");
        assert!(meta.etag.is_some(), "etag should round-trip via sidecar");
    }

    #[tokio::test]
    async fn delete_cleans_up_meta_sidecar() {
        let dir = temp_root();
        let s = store(dir.path());
        s.put("k.png", "image/png", Bytes::from_static(b"x"))
            .await
            .unwrap();
        let resolved = s.safe_path_for_key("k.png").await.unwrap();
        assert!(meta_sidecar_path(&resolved).exists());

        s.delete("k.png").await.unwrap();
        assert!(!meta_sidecar_path(&resolved).exists());
    }

    /// When the blob removal fails, the sidecar must stay in place so
    /// the failed delete is side-effect-free as far as `head` / serve
    /// are concerned. Force the failure by making the blob path itself
    /// a directory (POSIX `unlink` errors on directories with
    /// `IsADirectory` / `EISDIR`); permissions-based forcings don't
    /// work uniformly when tests run as root.
    #[tokio::test]
    async fn delete_keeps_sidecar_when_blob_remove_fails() {
        let dir = temp_root();
        let s = store(dir.path());

        // Put `pinned.bin` as a *directory* so `remove_file` errors,
        // and pre-stage a sidecar so we can verify it survives.
        let blob_path = dir.path().join("pinned.bin");
        tokio::fs::create_dir(&blob_path).await.unwrap();
        let sidecar = meta_sidecar_path(&blob_path);
        tokio::fs::write(&sidecar, br#"{"content_type":"image/png"}"#)
            .await
            .unwrap();
        assert!(blob_path.is_dir());
        assert!(sidecar.is_file());

        let result = s.delete("pinned.bin").await;
        assert!(
            result.is_err(),
            "expected error: blob path is a directory, remove_file should fail"
        );
        // The new ordering propagates the blob-delete error before
        // touching the sidecar, so the sidecar stays behind.
        assert!(
            sidecar.exists(),
            "sidecar must survive a failed blob delete"
        );
        // And the (directory) blob is also still there.
        assert!(blob_path.is_dir());
    }

    #[tokio::test]
    async fn head_falls_back_to_octet_stream_without_sidecar() {
        // Simulate an older blob written without a sidecar.
        let dir = temp_root();
        let s = store(dir.path());
        let path = dir.path().join("legacy.bin");
        tokio::fs::write(&path, b"raw").await.unwrap();
        let meta = s.head("legacy.bin").await.unwrap().expect("blob exists");
        assert_eq!(meta.content_type, "application/octet-stream");
        assert_eq!(meta.byte_size, 3);
        assert!(meta.etag.is_none());
    }

    #[tokio::test]
    async fn drop_stale_sidecar_removes_existing_metadata() {
        // The recovery path used by `put` / `put_stream` when a sidecar
        // write fails after the bytes commit: we delete the old
        // sidecar so future `head`/serve calls fall back to
        // octet-stream rather than reporting stale MIME for the new
        // bytes.
        let dir = temp_root();
        let blob = dir.path().join("victim.bin");
        let sidecar = meta_sidecar_path(&blob);
        tokio::fs::write(&sidecar, br#"{"content_type":"image/png"}"#)
            .await
            .unwrap();
        assert!(sidecar.exists());

        drop_stale_sidecar(&blob).await;
        assert!(!sidecar.exists());

        // Idempotent — calling again on a missing sidecar is a no-op,
        // not an error.
        drop_stale_sidecar(&blob).await;
    }

    #[tokio::test]
    async fn read_meta_sidecar_handles_missing_and_malformed() {
        let dir = temp_root();
        // Missing sidecar → None (file at the blob path doesn't matter
        // for this helper; we're only testing sidecar read behavior).
        let blob_path = dir.path().join("absent.bin");
        assert!(read_meta_sidecar(&blob_path).await.is_none());

        // Malformed JSON in the sidecar → None (graceful degradation;
        // the serving route falls back to octet-stream).
        let blob_path = dir.path().join("malformed.bin");
        tokio::fs::write(meta_sidecar_path(&blob_path), b"not json")
            .await
            .unwrap();
        assert!(read_meta_sidecar(&blob_path).await.is_none());
    }

    #[tokio::test]
    async fn get_with_meta_returns_bytes_plus_sidecar_metadata() {
        let dir = temp_root();
        let s = store(dir.path());
        s.put(
            "doc.pdf",
            "application/pdf",
            Bytes::from_static(b"%PDF-1.4"),
        )
        .await
        .unwrap();
        let (bytes, meta) = s.get_with_meta("doc.pdf").await.unwrap();
        assert_eq!(&bytes[..], b"%PDF-1.4");
        let m = meta.expect("sidecar should be present");
        assert_eq!(m.content_type, "application/pdf");
        assert!(m.etag.is_some());
    }

    /// Simulates the Windows fallback's "second rename fails" branch
    /// directly (POSIX `rename` always replaces, so the production
    /// path on Linux never enters the `AlreadyExists` arm). The
    /// invariant we care about is: if the recovery-rename succeeds,
    /// the original `dst` content is preserved when the second
    /// rename fails.
    #[tokio::test]
    async fn atomic_replace_recovery_restores_dst_on_failure() {
        let dir = temp_root();
        let dst = dir.path().join("target.bin");
        tokio::fs::write(&dst, b"old-blob").await.unwrap();

        // Manually drive the recovery sequence. Move dst aside …
        let backup = backup_sibling_path(&dst);
        tokio::fs::rename(&dst, &backup).await.unwrap();
        // … attempt the second rename with a non-existent source so it
        // fails (this is the "transient failure on the new write"
        // branch the production code's `match` arm catches) …
        let err = tokio::fs::rename(dir.path().join("never.tmp"), &dst)
            .await
            .unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
        // … and restore the backup, which is exactly what the
        // production path does on rename failure.
        tokio::fs::rename(&backup, &dst).await.unwrap();

        // The original blob bytes are intact.
        assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"old-blob");
    }

    #[test]
    fn encode_key_path_does_not_skip_leading_slash() {
        let key = "/some/key";
        let encoded = encode_key_path(key);
        // The first segment before the split '/' is empty
        assert_eq!(encoded, "/some/key");
    }

    #[test]
    fn sha256_hex_computes_correct_hash() {
        // e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 is empty string
        assert_eq!(
            sha256_hex(b""),
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
    }

    #[test]
    fn hex_computes_correct_hex() {
        assert_eq!(hex(b"xyz"), "78797a");
    }

    #[test]
    fn verify_rejects_empty_signature() {
        let key = b"secret";
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let expires = now + 3600;
        let result = verify(key, "blob", expires, "");
        assert!(matches!(result, Err(BlobStoreError::Signature(_))));
    }

    #[tokio::test]
    async fn safe_path_for_key_rejects_missing_directory() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_path_buf();
        let s2 = store(&path);
        dir.close().unwrap();

        let err = s2.safe_path_for_key("some_blob").await.unwrap_err();
        assert!(
            matches!(
                err,
                BlobStoreError::Io(_) | BlobStoreError::PermissionDenied(_)
            ),
            "Unexpected error: {err:?}"
        );
    }

    #[tokio::test]
    async fn get_with_meta_propagates_io_errors() {
        let dir = temp_root();
        let s = store(dir.path());

        let path = dir.path().join("some_blob");
        // create a directory so that get_with_meta reading file fails
        tokio::fs::create_dir(&path).await.unwrap();
        let Err(err) = s.get_with_meta("some_blob").await else {
            panic!("Expected error");
        };
        assert!(matches!(err, BlobStoreError::Io(_)));
    }

    #[tokio::test]
    async fn drop_stale_sidecar_is_idempotent() {
        let dir = temp_root();
        let blob = dir.path().join("victim.bin");
        let sidecar = meta_sidecar_path(&blob);

        drop_stale_sidecar(&blob).await;
        drop_stale_sidecar(&blob).await;
        // Make sure doing it when file doesn't exist doesn't panic
        assert!(!sidecar.exists());
    }

    #[tokio::test]
    async fn drop_stale_sidecar_ignores_and_survives_non_not_found_errors() {
        // Create a directory where the sidecar should be so that remove_file returns EISDIR
        let dir = temp_root();
        let blob = dir.path().join("victim.bin");
        let sidecar = meta_sidecar_path(&blob);
        tokio::fs::create_dir(&sidecar).await.unwrap();

        // The function drop_stale_sidecar ignores all errors internally but prints a warning.
        // It's `async fn drop_stale_sidecar`, no return value.
        // We will just run it and ensure it doesn't panic on a non-not-found error.
        drop_stale_sidecar(&blob).await;
        assert!(sidecar.exists()); // Because it couldn't remove a dir
    }

    #[tokio::test]
    async fn get_with_meta_when_meta_missing_returns_none() {
        let dir = temp_root();
        let s = store(dir.path());

        // Write the blob but no sidecar
        let path = dir.path().join("blob.bin");
        tokio::fs::write(&path, b"data").await.unwrap();

        let (bytes, meta) = s.get_with_meta("blob.bin").await.unwrap();
        assert_eq!(&bytes[..], b"data");
        assert!(meta.is_none());
    }

    #[tokio::test]
    async fn atomic_replace_handles_already_exists_on_backup_rename() {
        let dir = temp_root();
        let dst = dir.path().join("target.bin");
        tokio::fs::write(&dst, b"old").await.unwrap();

        let tmp = dir.path().join("staging.tmp");
        tokio::fs::write(&tmp, b"new").await.unwrap();

        // Create a backup file manually to trigger AlreadyExists
        let backup = backup_sibling_path(&dst);
        tokio::fs::write(&backup, b"interloper").await.unwrap();

        // Atomic replace will encounter AlreadyExists when renaming `dst` to `backup`
        // It will then retry. Before it loops, it does nothing if there is an interloper? No, the code says:
        // if err.kind() == std::io::ErrorKind::AlreadyExists ... Wait, rename can overwrite. But on Windows `rename` fails if dst exists.
        // It's possible `AlreadyExists` or `PermissionDenied` depending on OS. The code is:
        // match tokio::fs::rename(dst, backup_path).await {
        //   Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
        //       let _ = tokio::fs::remove_file(backup_path).await;
        //       continue;
        //   } ...
        // So we can simulate this by mocking, or relying on `rename` failing if exists (not true on Unix).
        // Since we are on Unix, rename just replaces. To trigger this we can't easily do it.
        // Let's at least test we can still replace if backup exists.
        atomic_replace(&tmp, &dst).await.unwrap();

        assert_eq!(tokio::fs::read(&dst).await.unwrap(), b"new");
    }

    #[tokio::test]
    async fn put_rejects_missing_directory_to_trigger_io_error() {
        let missing = tempfile::tempdir().unwrap().path().to_path_buf();
        let s2 = store(&missing);
        std::fs::remove_dir_all(&missing).unwrap();

        let err = s2
            .put("some_blob", "text/plain", bytes::Bytes::from_static(b"xyz"))
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            BlobStoreError::Io(_) | BlobStoreError::PermissionDenied(_)
        ));
    }

    #[tokio::test]
    async fn delete_rejects_missing_directory_to_trigger_io_error() {
        let missing = tempfile::tempdir().unwrap().path().to_path_buf();
        let s2 = store(&missing);
        std::fs::remove_dir_all(&missing).unwrap();

        let err = s2.delete("some_blob").await.unwrap_err();
        assert!(matches!(
            err,
            BlobStoreError::Io(_) | BlobStoreError::PermissionDenied(_)
        ));
    }

    #[tokio::test]
    async fn head_rejects_missing_directory_to_trigger_io_error() {
        let missing = tempfile::tempdir().unwrap().path().to_path_buf();
        let s2 = store(&missing);
        std::fs::remove_dir_all(&missing).unwrap();

        let err = s2.head("some_blob").await.unwrap_err();
        assert!(matches!(
            err,
            BlobStoreError::Io(_) | BlobStoreError::PermissionDenied(_)
        ));
    }

    #[test]
    fn provider_id_is_local() {
        let dir = temp_root();
        let s = LocalBlobStore::new(
            "local_test_id",
            dir.path().to_path_buf(),
            "/mnt",
            std::time::Duration::from_secs(3600),
            SigningKey::random(),
            vec![],
        )
        .unwrap();
        assert_eq!(s.provider_id(), "local_test_id");
    }

    #[test]
    fn signing_key_debug_does_not_leak_material() {
        let key = SigningKey::new(b"super-secret".to_vec());
        let dbg = format!("{key:?}");
        assert!(!dbg.contains("super-secret"));
        assert!(dbg.contains("len"));
    }

    // ── Previous-key rotation (RED phase) ──────────────────────────────────

    #[tokio::test]
    async fn blob_url_signed_with_previous_key_still_verifies() {
        let dir = temp_root();
        let old_key = SigningKey::new(b"old-key-32-bytes-xxxxxxxxxxxxxxx".to_vec());
        let new_key = SigningKey::new(b"new-key-32-bytes-xxxxxxxxxxxxxxx".to_vec());

        // Store is built with new key + old key as previous
        let store = LocalBlobStore::new(
            "test",
            dir.path(),
            "/_blobs",
            Duration::from_secs(60),
            new_key,
            vec![old_key.clone()],
        )
        .unwrap();

        store
            .put("a/b.txt", "text/plain", bytes::Bytes::from_static(b"hi"))
            .await
            .unwrap();

        // Sign a URL with the old key directly
        let exp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
            + 3600;
        let old_sig = sign(old_key.as_bytes(), "a/b.txt", exp);

        // The store should accept it via its previous-key list
        assert!(
            verify_with_rotation(
                &store.inner.signing_key,
                &store.inner.previous_signing_keys,
                "a/b.txt",
                exp,
                &old_sig
            )
            .is_ok(),
            "old-key signed URL must verify during grace window"
        );
    }

    #[test]
    fn blob_url_expired_with_previous_key_still_rejects() {
        let old_key = SigningKey::new(b"old-key-32-bytes-xxxxxxxxxxxxxxx".to_vec());
        let new_key = SigningKey::new(b"new-key-32-bytes-xxxxxxxxxxxxxxx".to_vec());
        let expired_exp = 1u64; // Unix epoch + 1s — already expired
        let old_sig = sign(old_key.as_bytes(), "a/b.txt", expired_exp);
        let result = verify_with_rotation(&new_key, &[old_key], "a/b.txt", expired_exp, &old_sig);
        assert!(
            result.is_err(),
            "expired URL must be rejected even with valid previous key"
        );
    }
}