triblespace-core 0.33.0

The triblespace core implementation.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
//! This module provides a high-level API for storing and retrieving data from repositories.
//! The design is inspired by Git, but with a focus on object/content-addressed storage.
//! It separates storage concerns from the data model, and reduces the mutable state of the repository,
//! to an absolute minimum, making it easier to reason about and allowing for different storage backends.
//!
//! Blob repositories are collections of blobs that can be content-addressed by their hash.
#![allow(clippy::type_complexity)]
//! This is typically a local `.pile` file, an S3 bucket, or a similar service.
//! On their own they have no notion of branches or commits, or other stateful constructs.
//! As such they also don't have a notion of time, order or history,
//! massively relaxing the constraints on storage.
//! This makes it possible to use a wide range of storage services, including those that don't support
//! atomic transactions or have other limitations.
//!
//! Branch repositories on the other hand are a stateful construct that can be used to represent a branch pointing to a specific commit.
//! They are stored in a separate repository, typically a local `.pile` file, a database or an S3 compatible service with a compare-and-swap operation,
//! and can be used to represent the state of a repository at a specific point in time.
//!
//! Technically, branches are just a mapping from a branch id to a blob hash,
//! but because TribleSets are themselves easily stored in a blob, and because
//! trible commit histories are an append-only chain of TribleSet metadata,
//! the hash of the head is sufficient to represent the entire history of a branch.
//!
//! ## Basic usage
//!
//! ```rust,ignore
//! use ed25519_dalek::SigningKey;
//! use rand::rngs::OsRng;
//! use triblespace::prelude::*;
//! use triblespace::prelude::valueschemas::{GenId, ShortString};
//! use triblespace::repo::{memoryrepo::MemoryRepo, Repository};
//!
//! let storage = MemoryRepo::default();
//! let mut repo = Repository::new(storage, SigningKey::generate(&mut OsRng), TribleSet::new()).unwrap();
//! let branch_id = repo.create_branch("main", None).expect("create branch");
//! let mut ws = repo.pull(*branch_id).expect("pull branch");
//!
//! attributes! {
//!     "8F180883F9FD5F787E9E0AF0DF5866B9" as pub author: GenId;
//!     "0DBB530B37B966D137C50B943700EDB2" as pub firstname: ShortString;
//!     "6BAA463FD4EAF45F6A103DB9433E4545" as pub lastname: ShortString;
//! }
//! let author = fucid();
//! ws.commit(
//!     entity!{ &author @
//!         literature::firstname: "Frank",
//!         literature::lastname: "Herbert",
//!      },
//!     "initial commit",
//! );
//!
//! // Single-attempt push: `try_push` uploads local blobs and attempts a
//! // single CAS update. On conflict it returns a workspace containing the
//! // new branch state which you should merge into before retrying.
//! match repo.try_push(&mut ws).expect("try_push") {
//!     None => {}
//!     Some(_) => panic!("unexpected conflict"),
//! }
//! ```
//!
//! `create_branch` registers a new branch and returns an [`ExclusiveId`](crate::id::ExclusiveId) guard.
//! `pull` creates a new workspace from an existing branch while
//! `branch_from` can be used to start a new branch from a specific commit
//! handle. See `examples/workspace.rs` for a more complete example.
//!
//! ## Handling conflicts
//!
//! The single-attempt primitive is [`Repository::try_push`]. It returns
//! `Ok(None)` on success or `Ok(Some(conflict_ws))` when the branch advanced
//! concurrently. Callers that want explicit conflict handling may use this
//! form:
//!
//! ```rust,ignore
//! while let Some(mut other) = repo.try_push(&mut ws)? {
//!     // Merge our staged changes into the incoming workspace and retry.
//!     other.merge(&mut ws)?;
//!     ws = other;
//! }
//! ```
//!
//! For convenience `Repository::push` is provided as a retrying wrapper that
//! performs the merge-and-retry loop for you. Call `push` when you prefer the
//! repository to handle conflicts automatically; call `try_push` when you need
//! to inspect or control the intermediate conflict workspace yourself.
//!
//! `push` performs a compare-and-swap (CAS) update on the branch metadata.
//! This optimistic concurrency control keeps branches consistent without
//! locking and can be emulated by many storage systems (for example by
//! using conditional writes on S3).
//!
//! ## Git parallels
//!
//! The API deliberately mirrors concepts from Git to make its usage familiar:
//!
//! - A [`Repository`] stores commits and branch metadata similar to a remote.
//! - [`Workspace`] is akin to a working directory combined with an index. It
//!   tracks changes against a branch head until you `push` them.
//! - `create_branch` and `branch_from` correspond to creating new branches from
//!   scratch or from a specific commit, respectively.
//! - `push` updates the repository atomically. If the branch advanced in the
//!   meantime, you receive a conflict workspace which can be merged before
//!   retrying the push.
//! - `pull` is similar to cloning a branch into a new workspace.
//!
//! `pull` uses the repository's default signing key for new commits. If you
//! need to work with a different identity, the `_with_key` variants allow providing
//! an explicit key when creating branches or pulling workspaces.
//!
//! These parallels should help readers leverage their Git knowledge when
//! working with trible repositories.
//!
/// Branch metadata construction and signature verification.
pub mod branch;
/// Commit metadata construction and signature verification.
pub mod commit;
/// Storage adapter that delegates blobs and branches to separate backends.
pub mod hybridstore;
/// Fully in-memory repository implementation for tests and ephemeral use.
pub mod memoryrepo;
#[cfg(feature = "object-store")]
/// Repository backed by an `object_store`-compatible remote (S3, local FS, etc.).
pub mod objectstore;
/// Local file-based pile storage backend.
pub mod pile;

/// Trait for storage backends that require explicit close/cleanup.
///
/// Not all storage backends need to implement this; implementations that have
/// nothing to do on close may return `Ok(())` or use `Infallible` as the error
/// type.
pub trait StorageClose {
    /// Error type returned by `close`.
    type Error: std::error::Error;

    /// Consume the storage and perform any necessary cleanup.
    ///
    /// Takes `self` by value, so the storage cannot be used after it has been
    /// closed.
    ///
    /// # Errors
    /// Returns `Self::Error` if the cleanup fails.
    fn close(self) -> Result<(), Self::Error>;
}

// Explicit shutdown support for repositories whose storage implements
// `StorageClose`.
impl<Storage> Repository<Storage>
where
    Storage: BlobStore<Blake3> + BranchStore<Blake3> + StorageClose,
{
    /// Consumes the repository and closes its underlying storage.
    ///
    /// This method only exists when the storage type implements
    /// [`StorageClose`]. The repository is taken by value and the call is
    /// forwarded to the storage's own `close`.
    ///
    /// # Errors
    /// Returns whatever error the storage's `close` implementation produces.
    pub fn close(self) -> Result<(), <Storage as StorageClose>::Error> {
        let storage = self.storage;
        storage.close()
    }
}

use crate::macros::pattern;
use std::collections::{HashSet, VecDeque};
use std::convert::Infallible;
use std::error::Error;
use std::fmt::Debug;
use std::fmt::{self};

use commit::commit_metadata;
use hifitime::Epoch;
use itertools::Itertools;

use crate::blob::schemas::simplearchive::UnarchiveError;
use crate::blob::schemas::UnknownBlob;
use crate::blob::Blob;
use crate::blob::BlobSchema;
use crate::blob::MemoryBlobStore;
use crate::blob::ToBlob;
use crate::blob::TryFromBlob;
use crate::find;
use crate::id::genid;
use crate::id::Id;
use crate::patch::Entry;
use crate::patch::IdentitySchema;
use crate::patch::PATCH;
use crate::prelude::valueschemas::GenId;
use crate::repo::branch::branch_metadata;
use crate::trible::TribleSet;
use crate::value::schemas::hash::Handle;
use crate::value::schemas::hash::HashProtocol;
use crate::value::Value;
use crate::value::ValueSchema;
use crate::value::VALUE_LEN;
use ed25519_dalek::SigningKey;

use crate::blob::schemas::longstring::LongString;
use crate::blob::schemas::simplearchive::SimpleArchive;
use crate::prelude::*;
use crate::value::schemas::ed25519 as ed;
use crate::value::schemas::hash::Blake3;
use crate::value::schemas::shortstring::ShortString;
use crate::value::schemas::time::NsTAIInterval;

// Attribute declarations for this module. Each entry pairs a 32-hex-digit id
// literal with a public name and the value schema stored under that name.
// NOTE(review): the `///` lines below are part of the macro input; whether the
// macro only forwards them as rustdoc or also records them elsewhere is not
// visible from here, so they are left untouched.
attributes! {
    /// The actual data of the commit.
    "4DD4DDD05CC31734B03ABB4E43188B1F" as pub content: Handle<Blake3, SimpleArchive>;
    /// Metadata describing the commit content.
    "88B59BD497540AC5AECDB7518E737C87" as pub metadata: Handle<Blake3, SimpleArchive>;
    /// A commit that this commit is based on.
    "317044B612C690000D798CA660ECFD2A" as pub parent: Handle<Blake3, SimpleArchive>;
    /// A (potentially long) message describing the commit.
    "B59D147839100B6ED4B165DF76EDF3BB" as pub message: Handle<Blake3, LongString>;
    /// A short message describing the commit.
    "12290C0BE0E9207E324F24DDE0D89300" as pub short_message: ShortString;
    /// The hash of the first commit in the commit chain of the branch.
    "272FBC56108F336C4D2E17289468C35F" as pub head: Handle<Blake3, SimpleArchive>;
    /// An id used to track the branch.
    "8694CC73AF96A5E1C7635C677D1B928A" as pub branch: GenId;
    /// Timestamp range when this commit was created (legacy, use `metadata::created_at`).
    "71FF566AB4E3119FC2C5E66A18979586" as pub timestamp: NsTAIInterval;
    /// The author of the signature identified by their ed25519 public key.
    "ADB4FFAD247C886848161297EFF5A05B" as pub signed_by: ed::ED25519PublicKey;
    /// The `r` part of a ed25519 signature.
    "9DF34F84959928F93A3C40AEB6E9E499" as pub signature_r: ed::ED25519RComponent;
    /// The `s` part of a ed25519 signature.
    "1ACE03BF70242B289FDF00E4327C3BC6" as pub signature_s: ed::ED25519SComponent;
}

/// The `BlobStoreList` trait is used to list all blobs in a repository.
pub trait BlobStoreList<H: HashProtocol> {
    /// Iterator over blob handles in the store.
    ///
    /// Each item is fallible: a listing step may yield `Self::Err` instead of
    /// a handle.
    type Iter<'a>: Iterator<Item = Result<Value<Handle<H, UnknownBlob>>, Self::Err>>
    where
        Self: 'a;
    /// Error type for listing operations.
    type Err: Error + Debug + Send + Sync + 'static;

    /// Lists all blobs in the repository.
    ///
    /// The returned iterator borrows the store for its whole lifetime.
    fn blobs<'a>(&'a self) -> Self::Iter<'a>;
}

/// Metadata about a blob in a repository.
///
/// This is the value returned by [`BlobStoreMeta::metadata`].
#[derive(Debug, Clone)]
pub struct BlobMetadata {
    /// Timestamp in milliseconds since UNIX epoch when the blob was created/stored.
    pub timestamp: u64,
    /// Length of the blob in bytes.
    pub length: u64,
}

/// Trait exposing metadata lookup for blobs available in a repository reader.
pub trait BlobStoreMeta<H: HashProtocol> {
    /// Error type returned by metadata calls.
    type MetaError: std::error::Error + Send + Sync + 'static;

    /// Returns metadata for the blob identified by `handle`, or `None` if
    /// the blob is not present.
    ///
    /// # Errors
    /// Returns `Self::MetaError` when the lookup itself fails; an absent blob
    /// is reported as `Ok(None)` rather than as an error.
    fn metadata<S>(
        &self,
        handle: Value<Handle<H, S>>,
    ) -> Result<Option<BlobMetadata>, Self::MetaError>
    where
        S: BlobSchema + 'static,
        Handle<H, S>: ValueSchema;
}

/// Trait exposing a monotonic "forget" operation.
///
/// Forget is idempotent and monotonic: it removes materialization from a
/// particular repository but does not semantically delete derived facts.
pub trait BlobStoreForget<H: HashProtocol> {
    /// Error type for forget operations.
    type ForgetError: std::error::Error + Send + Sync + 'static;

    /// Removes the materialized blob identified by `handle` from this store.
    ///
    /// # Errors
    /// Returns `Self::ForgetError` if the store fails to carry out the removal.
    fn forget<S>(&mut self, handle: Value<Handle<H, S>>) -> Result<(), Self::ForgetError>
    where
        S: BlobSchema + 'static,
        Handle<H, S>: ValueSchema;
}

/// The `BlobStoreGet` trait is used to retrieve blobs from a repository.
pub trait BlobStoreGet<H: HashProtocol> {
    /// Error type for get operations, parameterised by the deserialization error.
    type GetError<E: std::error::Error + Send + Sync + 'static>: Error + Send + Sync + 'static;

    /// Retrieves a blob from the repository by its handle.
    ///
    /// The handle identifies the blob to load. The result is produced as a
    /// `T`, converted from the blob of schema `S` through `T`'s
    /// [`TryFromBlob`] implementation; requesting a [`Blob`] presumably yields
    /// the raw blob without further decoding (confirm against the
    /// `TryFromBlob` impls).
    ///
    /// # Errors
    /// Returns an error if the blob could not be found in the repository.
    /// The error type is `Self::GetError`, parameterised by the conversion
    /// error of `T`, so conversion failures are presumably reported through
    /// it as well.
    fn get<T, S>(
        &self,
        handle: Value<Handle<H, S>>,
    ) -> Result<T, Self::GetError<<T as TryFromBlob<S>>::Error>>
    where
        S: BlobSchema + 'static,
        T: TryFromBlob<S>,
        Handle<H, S>: ValueSchema;
}

/// The `BlobStorePut` trait is used to store blobs in a repository.
pub trait BlobStorePut<H: HashProtocol> {
    /// Error type for put operations.
    type PutError: Error + Debug + Send + Sync + 'static;

    /// Serialises `item` as a blob, stores it, and returns its handle.
    ///
    /// # Errors
    /// Returns `Self::PutError` if the blob could not be stored.
    fn put<S, T>(&mut self, item: T) -> Result<Value<Handle<H, S>>, Self::PutError>
    where
        S: BlobSchema + 'static,
        T: ToBlob<S>,
        Handle<H, S>: ValueSchema;
}

/// Combined read/write blob storage.
///
/// Extends [`BlobStorePut`] with the ability to create a shareable
/// [`Reader`](BlobStore::Reader) snapshot for concurrent reads.
pub trait BlobStore<H: HashProtocol>: BlobStorePut<H> {
    /// A clonable reader handle for concurrent blob lookups.
    ///
    /// Readers support both fetching ([`BlobStoreGet`]) and listing
    /// ([`BlobStoreList`]) and can be compared for equality.
    type Reader: BlobStoreGet<H> + BlobStoreList<H> + Clone + Send + PartialEq + Eq + 'static;
    /// Error type for creating a reader.
    type ReaderError: Error + Debug + Send + Sync + 'static;
    /// Creates a shareable reader snapshot of the current store state.
    ///
    /// # Errors
    /// Returns `Self::ReaderError` if the reader could not be created.
    fn reader(&mut self) -> Result<Self::Reader, Self::ReaderError>;
}

/// Trait for blob stores that can retain a supplied set of handles.
pub trait BlobStoreKeep<H: HashProtocol> {
    /// Retain only the blobs identified by `handles`.
    ///
    /// NOTE(review): presumably every blob whose handle is not yielded by
    /// `handles` is dropped from the store — confirm against implementations.
    /// The method has no return value, so failures cannot be reported to the
    /// caller through it.
    fn keep<I>(&mut self, handles: I)
    where
        I: IntoIterator<Item = Value<Handle<H, UnknownBlob>>>;
}

/// Outcome of a compare-and-swap branch update.
///
/// Returned by [`BranchStore::update`].
#[derive(Debug)]
pub enum PushResult<H>
where
    H: HashProtocol,
{
    /// The CAS succeeded — the branch now points to the new commit.
    Success(),
    /// The CAS failed — the branch had advanced. Contains the current
    /// head, or `None` if the branch was deleted concurrently.
    Conflict(Option<Value<Handle<H, SimpleArchive>>>),
}

/// Storage backend for branch metadata (branch-id → commit-handle mapping).
///
/// This is the stateful counterpart to [`BlobStore`]: blob stores are
/// content-addressed and orderless, while branch stores track a single
/// mutable pointer per branch. The update operation uses compare-and-swap
/// semantics so multiple writers can coordinate without locks.
pub trait BranchStore<H: HashProtocol> {
    /// Error type for listing branches.
    type BranchesError: Error + Debug + Send + Sync + 'static;
    /// Error type for head lookups.
    type HeadError: Error + Debug + Send + Sync + 'static;
    /// Error type for CAS updates.
    type UpdateError: Error + Debug + Send + Sync + 'static;

    /// Iterator over branch IDs.
    type ListIter<'a>: Iterator<Item = Result<Id, Self::BranchesError>>
    where
        Self: 'a;

    /// Lists all branches in the repository.
    ///
    /// Returns an iterator of branch ids. Both creating the iterator and each
    /// individual item can fail with `Self::BranchesError`.
    fn branches<'a>(&'a mut self) -> Result<Self::ListIter<'a>, Self::BranchesError>;

    // NOTE: keep the API lean — callers may call `branches()` and handle the
    // fallible iterator directly; we avoid adding an extra helper here.

    /// Retrieves the current head of a branch by its id.
    ///
    /// # Parameters
    /// * `id` - The id of the branch to retrieve.
    ///
    /// # Returns
    /// * `Ok(Some(handle))` - the handle the branch currently points to.
    /// * `Ok(None)` - presumably no branch with this id exists (confirm
    ///   against implementations).
    ///
    /// # Errors
    /// Returns `Self::HeadError` if the lookup itself fails.
    fn head(&mut self, id: Id) -> Result<Option<Value<Handle<H, SimpleArchive>>>, Self::HeadError>;

    /// Puts a branch on the repository, creating or updating it.
    ///
    /// # Parameters
    /// * `id` - The id of the branch to update.
    /// * `old` - Expected current value of the branch (None if creating new)
    /// * `new` - Value to update the branch to (None deletes the branch)
    ///
    /// # Returns
    /// * `Success` - Push completed successfully
    /// * `Conflict(current)` - Failed because the branch's current value doesn't match `old`
    ///   (contains the actual current value for conflict resolution)
    ///
    /// # Errors
    /// Returns `Self::UpdateError` if the store could not perform the update;
    /// a lost race is reported as `Ok(PushResult::Conflict(..))`, not as an
    /// error.
    fn update(
        &mut self,
        id: Id,
        old: Option<Value<Handle<H, SimpleArchive>>>,
        new: Option<Value<Handle<H, SimpleArchive>>>,
    ) -> Result<PushResult<H>, Self::UpdateError>;
}

/// Error returned by [`transfer`] when copying blobs between stores.
///
/// Each variant wraps the underlying failure of one phase of the copy.
#[derive(Debug)]
pub enum TransferError<ListErr, LoadErr, StoreErr> {
    /// Listing handles from the source failed.
    List(ListErr),
    /// Loading a blob from the source failed.
    Load(LoadErr),
    /// Storing a blob in the target failed.
    Store(StoreErr),
}

impl<ListErr, LoadErr, StoreErr> fmt::Display for TransferError<ListErr, LoadErr, StoreErr> {
    /// Writes a fixed message that does not depend on the variant; the
    /// wrapped error is reachable through [`Error::source`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("failed to transfer blob")
    }
}

impl<ListErr, LoadErr, StoreErr> Error for TransferError<ListErr, LoadErr, StoreErr>
where
    ListErr: Debug + Error + 'static,
    LoadErr: Debug + Error + 'static,
    StoreErr: Debug + Error + 'static,
{
    /// Exposes the wrapped error of whichever variant is active.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let cause: &(dyn Error + 'static) = match self {
            Self::List(err) => err,
            Self::Load(err) => err,
            Self::Store(err) => err,
        };
        Some(cause)
    }
}

/// Copies the specified blob handles from `source` into `target`.
///
/// The returned iterator is lazy: each blob is loaded and stored only when the
/// corresponding item is pulled. Every successful item pairs the source
/// handle with the handle returned by the target's `put`.
///
/// # Errors
/// An item is `Err(TransferError::Load(..))` when reading from `source` fails
/// and `Err(TransferError::Store(..))` when writing to `target` fails. The
/// `List` variant is never produced here (its payload type is `Infallible`).
pub fn transfer<'a, BS, BT, HS, HT, Handles>(
    source: &'a BS,
    target: &'a mut BT,
    handles: Handles,
) -> impl Iterator<
    Item = Result<
        (
            Value<Handle<HS, UnknownBlob>>,
            Value<Handle<HT, UnknownBlob>>,
        ),
        TransferError<
            Infallible,
            <BS as BlobStoreGet<HS>>::GetError<Infallible>,
            <BT as BlobStorePut<HT>>::PutError,
        >,
    >,
> + 'a
where
    BS: BlobStoreGet<HS> + 'a,
    BT: BlobStorePut<HT> + 'a,
    HS: 'static + HashProtocol,
    HT: 'static + HashProtocol,
    Handles: IntoIterator<Item = Value<Handle<HS, UnknownBlob>>> + 'a,
    Handles::IntoIter: 'a,
{
    handles
        .into_iter()
        .map(move |src_handle: Value<Handle<HS, UnknownBlob>>| {
            // Load first; a failed read means nothing is written to the target.
            let blob: Blob<UnknownBlob> = source.get(src_handle).map_err(TransferError::Load)?;
            let dst_handle: Value<Handle<HT, UnknownBlob>> =
                target.put(blob).map_err(TransferError::Store)?;

            Ok((src_handle, dst_handle))
        })
}

/// Iterator that visits every blob handle reachable from a set of roots.
///
/// Handles are processed in FIFO order from `queue`; each handle is yielded at
/// most once, tracked through `visited`.
pub struct ReachableHandles<'a, BS, H>
where
    BS: BlobStoreGet<H>,
    H: 'static + HashProtocol,
{
    // Store the blobs are loaded from while walking.
    source: &'a BS,
    // Handles waiting to be visited (roots first, then discovered candidates).
    queue: VecDeque<Value<Handle<H, UnknownBlob>>>,
    // Raw bytes of every handle that has already been yielded.
    visited: HashSet<[u8; VALUE_LEN]>,
}

impl<'a, BS, H> ReachableHandles<'a, BS, H>
where
    BS: BlobStoreGet<H>,
    H: 'static + HashProtocol,
{
    fn new(source: &'a BS, roots: impl IntoIterator<Item = Value<Handle<H, UnknownBlob>>>) -> Self {
        let mut queue = VecDeque::new();
        for handle in roots {
            queue.push_back(handle);
        }

        Self {
            source,
            queue,
            visited: HashSet::new(),
        }
    }

    fn enqueue_from_blob(&mut self, blob: &Blob<UnknownBlob>) {
        let bytes = blob.bytes.as_ref();
        let mut offset = 0usize;

        while offset + VALUE_LEN <= bytes.len() {
            let mut raw = [0u8; VALUE_LEN];
            raw.copy_from_slice(&bytes[offset..offset + VALUE_LEN]);

            if !self.visited.contains(&raw) {
                let candidate = Value::<Handle<H, UnknownBlob>>::new(raw);
                if self
                    .source
                    .get::<anybytes::Bytes, UnknownBlob>(candidate)
                    .is_ok()
                {
                    self.queue.push_back(candidate);
                }
            }

            offset += VALUE_LEN;
        }
    }
}

impl<'a, BS, H> Iterator for ReachableHandles<'a, BS, H>
where
    BS: BlobStoreGet<H>,
    H: 'static + HashProtocol,
{
    type Item = Value<Handle<H, UnknownBlob>>;

    /// Yields the next handle that has not been visited yet, or `None` once
    /// the queue is exhausted.
    ///
    /// A handle is yielded even when its blob cannot be loaded from the
    /// source; in that case it simply contributes no further candidates.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // An empty queue ends the traversal.
            let handle = self.queue.pop_front()?;

            // `insert` returns false for handles that were already yielded.
            if self.visited.insert(handle.raw) {
                if let Ok(blob) = self.source.get(handle) {
                    self.enqueue_from_blob(&blob);
                }

                return Some(handle);
            }
        }
    }
}

/// Create a breadth-first iterator over blob handles reachable from `roots`.
///
/// The roots are visited first, in the order given. Further handles are
/// discovered by scanning each loaded blob in `VALUE_LEN`-byte chunks and
/// keeping the chunks that resolve to a blob in `source`. Each handle is
/// yielded at most once; a root that cannot be loaded is still yielded.
pub fn reachable<'a, BS, H>(
    source: &'a BS,
    roots: impl IntoIterator<Item = Value<Handle<H, UnknownBlob>>>,
) -> ReachableHandles<'a, BS, H>
where
    BS: BlobStoreGet<H>,
    H: 'static + HashProtocol,
{
    ReachableHandles::new(source, roots)
}

/// Reinterpret the leading `VALUE_LEN` bytes of every entry in the set's
/// `vae` index as a `Handle<H, UnknownBlob>`.
///
/// The conversion is deliberately conservative: no check is made that a
/// candidate actually refers to a blob, so callers scanning metadata for
/// potential blob handles must expect false positives. The resulting iterator
/// can be fed into [`BlobStoreKeep::keep`] or other helpers that accept
/// collections of handles.
pub fn potential_handles<'a, H>(
    set: &'a TribleSet,
) -> impl Iterator<Item = Value<Handle<H, UnknownBlob>>> + 'a
where
    H: HashProtocol,
{
    set.vae.iter().map(|key| {
        let mut candidate = [0u8; VALUE_LEN];
        candidate.copy_from_slice(&key[..VALUE_LEN]);
        Value::<Handle<H, UnknownBlob>>::new(candidate)
    })
}

/// An error that can occur when creating a commit.
/// This error can be caused by a failure to store the content or metadata blobs.
///
/// `BlobErr` is the error type of the blob store being written to. Both
/// variants wrap it unchanged; the variant only records which write failed.
#[derive(Debug)]
pub enum CreateCommitError<BlobErr: Error + Debug + Send + Sync + 'static> {
    /// Failed to store the content blob.
    ContentStorageError(BlobErr),
    /// Failed to store the commit metadata blob.
    CommitStorageError(BlobErr),
}

impl<BlobErr: Error + Debug + Send + Sync + 'static> fmt::Display for CreateCommitError<BlobErr> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CreateCommitError::ContentStorageError(e) => write!(f, "Content storage failed: {e}"),
            CreateCommitError::CommitStorageError(e) => {
                write!(f, "Commit metadata storage failed: {e}")
            }
        }
    }
}

impl<BlobErr: Error + Debug + Send + Sync + 'static> Error for CreateCommitError<BlobErr> {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            CreateCommitError::ContentStorageError(e) => Some(e),
            CreateCommitError::CommitStorageError(e) => Some(e),
        }
    }
}

/// Error returned by [`Workspace::merge`].
///
/// Converts into [`PushError::MergeError`] through the `From` impl below, so
/// it can be propagated with `?` from push code.
#[derive(Debug)]
pub enum MergeError {
    /// The merge failed because the workspaces have different base repos.
    DifferentRepos(),
}

/// Error returned by [`Repository::push`] and [`Repository::try_push`].
///
/// Most variants wrap an associated error type of `Storage` unchanged; the
/// variant records which storage operation failed.
#[derive(Debug)]
pub enum PushError<Storage: BranchStore<Blake3> + BlobStore<Blake3>> {
    /// An error occurred while enumerating the branch storage branches.
    StorageBranches(Storage::BranchesError),
    /// An error occurred while creating a blob reader.
    StorageReader(<Storage as BlobStore<Blake3>>::ReaderError),
    /// An error occurred while reading metadata blobs.
    StorageGet(
        <<Storage as BlobStore<Blake3>>::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>,
    ),
    /// An error occurred while transferring blobs to the repository.
    StoragePut(<Storage as BlobStorePut<Blake3>>::PutError),
    /// An error occurred while updating the branch storage.
    BranchUpdate(Storage::UpdateError),
    /// Malformed branch metadata.
    ///
    /// `try_push` returns this when the base metadata does not contain exactly
    /// one name, when the workspace has no head commit to publish, when a
    /// conflict reports no current metadata, or when the conflicting metadata
    /// contains more than one head.
    BadBranchMetadata(),
    /// Merge failed while retrying a push.
    MergeError(MergeError),
}

// Lets `?` turn a `MergeError` into a `PushError` wherever `PushError` is the
// function's error type, so call sites do not need a manual
// `.map_err(|e| PushError::MergeError(e))?`.
impl<Storage> From<MergeError> for PushError<Storage>
where
    Storage: BranchStore<Blake3> + BlobStore<Blake3>,
{
    fn from(merge_failure: MergeError) -> Self {
        Self::MergeError(merge_failure)
    }
}

// Note: we intentionally avoid generic `From` impls for storage-associated
// error types because they can overlap with other blanket implementations
// and lead to coherence conflicts. Where a conversion is needed, call sites
// map explicitly through the enum variant constructors
// (e.g. `map_err(PushError::StoragePut)`), which keeps conversions explicit
// and stable.

/// Error returned by [`Repository::create_branch`] and related methods.
///
/// Most variants wrap an associated error type of `Storage` unchanged; the
/// variant records which storage operation failed.
#[derive(Debug)]
pub enum BranchError<Storage>
where
    Storage: BranchStore<Blake3> + BlobStore<Blake3>,
{
    /// An error occurred while creating a blob reader.
    StorageReader(<Storage as BlobStore<Blake3>>::ReaderError),
    /// An error occurred while reading metadata blobs.
    StorageGet(
        <<Storage as BlobStore<Blake3>>::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>,
    ),
    /// An error occurred while storing blobs.
    StoragePut(<Storage as BlobStorePut<Blake3>>::PutError),
    /// An error occurred while retrieving branch heads.
    BranchHead(Storage::HeadError),
    /// An error occurred while updating the branch storage.
    BranchUpdate(Storage::UpdateError),
    /// The branch already exists.
    ///
    /// Produced when the branch store reports a conflict while the new branch
    /// pointer is being created.
    AlreadyExists(),
    /// The referenced base branch does not exist.
    BranchNotFound(Id),
}

/// Error returned by [`Repository::lookup_branch`].
///
/// A lookup that finds no branch with the requested name is not an error; it
/// is reported as `Ok(None)`.
#[derive(Debug)]
pub enum LookupError<Storage>
where
    Storage: BranchStore<Blake3> + BlobStore<Blake3>,
{
    /// Failed to enumerate branches.
    StorageBranches(Storage::BranchesError),
    /// Failed to read a branch head.
    BranchHead(Storage::HeadError),
    /// Failed to create a blob reader.
    StorageReader(<Storage as BlobStore<Blake3>>::ReaderError),
    /// Failed to read a metadata blob.
    StorageGet(
        <<Storage as BlobStore<Blake3>>::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>,
    ),
    /// Multiple branches were found with the given name.
    ///
    /// Carries the ids of all branches whose name matched.
    NameConflict(Vec<Id>),
    /// Branch metadata is malformed.
    BadBranchMetadata(),
}

/// Error returned by [`Repository::ensure_branch`].
///
/// `ensure_branch` first looks the branch up and only creates it when the
/// lookup finds nothing; each variant wraps the error of one of those steps.
#[derive(Debug)]
pub enum EnsureBranchError<Storage>
where
    Storage: BranchStore<Blake3> + BlobStore<Blake3>,
{
    /// Failed to look up the branch.
    Lookup(LookupError<Storage>),
    /// Failed to create the branch.
    Create(BranchError<Storage>),
}

/// High-level wrapper combining a blob store and branch store into a usable
/// repository API.
///
/// The [`Repository`] type exposes convenience methods for creating branches,
/// committing data and pushing changes while delegating actual storage to the
/// given [`BlobStore`] and [`BranchStore`] implementations.
pub struct Repository<Storage: BlobStore<Blake3> + BranchStore<Blake3>> {
    /// Backend providing both blob and branch storage.
    storage: Storage,
    /// Default key handed to branches and workspaces created without an
    /// explicit key.
    signing_key: SigningKey,
    /// Handle of the metadata blob stored by `Repository::new`; copied into
    /// every workspace created by `pull`.
    commit_metadata: MetadataHandle,
}

/// Error returned by [`Repository::pull`].
///
/// The three type parameters are the error types of the branch-head lookup,
/// of blob reader creation, and of reading a blob, in that order.
pub enum PullError<BranchStorageErr, BlobReaderErr, BlobStorageErr>
where
    BranchStorageErr: Error,
    BlobReaderErr: Error,
    BlobStorageErr: Error,
{
    /// The branch does not exist in the repository.
    BranchNotFound(Id),
    /// An error occurred while accessing the branch storage.
    BranchStorage(BranchStorageErr),
    /// An error occurred while creating a blob reader.
    BlobReader(BlobReaderErr),
    /// An error occurred while accessing the blob storage.
    BlobStorage(BlobStorageErr),
    /// The branch metadata is malformed or does not contain the expected fields.
    ///
    /// `pull_with_key` returns this when the metadata holds more than one
    /// head; metadata without a head yields a workspace with no head instead.
    BadBranchMetadata(),
}

impl<B, R, C> fmt::Debug for PullError<B, R, C>
where
    B: Error + fmt::Debug,
    R: Error + fmt::Debug,
    C: Error + fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PullError::BranchNotFound(id) => f.debug_tuple("BranchNotFound").field(id).finish(),
            PullError::BranchStorage(e) => f.debug_tuple("BranchStorage").field(e).finish(),
            PullError::BlobReader(e) => f.debug_tuple("BlobReader").field(e).finish(),
            PullError::BlobStorage(e) => f.debug_tuple("BlobStorage").field(e).finish(),
            PullError::BadBranchMetadata() => f.debug_tuple("BadBranchMetadata").finish(),
        }
    }
}

impl<Storage> Repository<Storage>
where
    Storage: BlobStore<Blake3> + BranchStore<Blake3>,
{
    /// Creates a repository on top of `storage`.
    ///
    /// `commit_metadata` is written to `storage` as a blob first. The
    /// resulting handle is kept by the repository and passed on to every
    /// workspace it creates, so that it can be attached to commits made
    /// through those workspaces.
    ///
    /// # Errors
    /// Returns the storage's put error if the metadata blob cannot be stored.
    pub fn new(
        mut storage: Storage,
        signing_key: SigningKey,
        commit_metadata: TribleSet,
    ) -> Result<Self, <Storage as BlobStorePut<Blake3>>::PutError> {
        let metadata_handle = storage.put(commit_metadata)?;
        let repository = Self {
            storage,
            signing_key,
            commit_metadata: metadata_handle,
        };
        Ok(repository)
    }

    /// Consume the repository and return the underlying storage backend.
    ///
    /// This is useful for callers that need to take ownership of the storage
    /// (for example to call `close()` on a [`Pile`]) instead of letting the
    /// repository drop it implicitly.
    ///
    /// The signing key and commit metadata handle are discarded.
    pub fn into_storage(self) -> Storage {
        self.storage
    }

    /// Borrow the underlying storage backend.
    ///
    /// Use [`Repository::storage_mut`] when mutable access is required.
    pub fn storage(&self) -> &Storage {
        &self.storage
    }

    /// Borrow the underlying storage backend mutably.
    ///
    /// This gives direct access to the backend, bypassing the repository's
    /// own methods.
    pub fn storage_mut(&mut self) -> &mut Storage {
        &mut self.storage
    }

    /// Replace the repository signing key.
    ///
    /// The new key is used by later calls that fall back to the repository
    /// key, such as `create_branch` and `pull`. Workspaces that were already
    /// pulled hold their own clone of the previous key.
    pub fn set_signing_key(&mut self, signing_key: SigningKey) {
        self.signing_key = signing_key;
    }

    /// Returns the repository commit metadata handle.
    ///
    /// This is the handle of the metadata blob stored by [`Repository::new`],
    /// returned by value.
    pub fn commit_metadata(&self) -> MetadataHandle {
        self.commit_metadata
    }

    /// Creates a new branch in the repository.
    ///
    /// Branches are the only mutable state in the repository: each one is a
    /// pointer to a specific commit and represents the state of a commit
    /// chain at a point in time. The new branch is created with the given
    /// name and points to the optionally given commit.
    ///
    /// The repository's own signing key is used; see
    /// [`Repository::create_branch_with_key`] to supply a different one.
    ///
    /// # Parameters
    /// * `branch_name` - Name of the new branch.
    /// * `commit` - Commit to initialize the branch from, if any.
    pub fn create_branch(
        &mut self,
        branch_name: &str,
        commit: Option<CommitHandle>,
    ) -> Result<ExclusiveId, BranchError<Storage>> {
        let repository_key = self.signing_key.clone();
        self.create_branch_with_key(branch_name, commit, repository_key)
    }

    /// Same as [`Repository::create_branch`] but uses the provided signing key.
    ///
    /// A fresh branch id is generated, the branch name and the branch
    /// metadata are stored as blobs, and the branch pointer is then created
    /// with an expected previous value of `None`.
    ///
    /// When `commit` is `Some`, the commit is read back from storage and the
    /// metadata is built with `signing_key`. When it is `None`, the metadata
    /// is built with `branch::branch_unsigned` and `signing_key` is not used.
    ///
    /// # Errors
    /// * [`BranchError::StoragePut`] if the name or metadata blob cannot be stored.
    /// * [`BranchError::StorageReader`] / [`BranchError::StorageGet`] if the
    ///   given commit cannot be read.
    /// * [`BranchError::BranchUpdate`] if the branch store update fails.
    /// * [`BranchError::AlreadyExists`] if the branch store reports a conflict.
    pub fn create_branch_with_key(
        &mut self,
        branch_name: &str,
        commit: Option<CommitHandle>,
        signing_key: SigningKey,
    ) -> Result<ExclusiveId, BranchError<Storage>> {
        let branch_id = genid();
        let name_blob = branch_name.to_owned().to_blob();
        // The handle is derived from the blob before the blob is moved into
        // `put`.
        let name_handle = name_blob.get_handle::<Blake3>();
        self.storage
            .put(name_blob)
            .map_err(BranchError::StoragePut)?;

        let branch_set = if let Some(commit) = commit {
            let reader = self
                .storage
                .reader()
                .map_err(BranchError::StorageReader)?;
            let set: TribleSet = reader.get(commit).map_err(BranchError::StorageGet)?;

            branch::branch_metadata(&signing_key, *branch_id, name_handle, Some(set.to_blob()))
        } else {
            branch::branch_unsigned(*branch_id, name_handle, None)
        };

        let branch_blob = branch_set.to_blob();
        let branch_handle = self
            .storage
            .put(branch_blob)
            .map_err(BranchError::StoragePut)?;
        // Expecting `None` as the previous value makes the update fail with a
        // conflict if the id is already in use.
        let push_result = self
            .storage
            .update(*branch_id, None, Some(branch_handle))
            .map_err(BranchError::BranchUpdate)?;

        match push_result {
            PushResult::Success() => Ok(branch_id),
            PushResult::Conflict(_) => Err(BranchError::AlreadyExists()),
        }
    }

    /// Find the branch called `name`.
    ///
    /// Every branch known to the branch store is inspected: its metadata is
    /// read, the name blob referenced there is fetched, and the result is
    /// compared with `name`. Branches without a head, without exactly one
    /// name in their metadata, or whose name blob cannot be read are skipped.
    ///
    /// Returns `Ok(None)` when no branch matches and the branch id when
    /// exactly one does.
    ///
    /// # Errors
    /// `LookupError::NameConflict` if several branches share the name;
    /// otherwise the storage error that interrupted the scan.
    pub fn lookup_branch(
        &mut self,
        name: &str,
    ) -> Result<Option<Id>, LookupError<Storage>> {
        // Collect the ids up front; the loop below needs `self.storage` again.
        let listing = self
            .storage
            .branches()
            .map_err(LookupError::StorageBranches)?;
        let branch_ids: Vec<Id> = listing
            .collect::<Result<Vec<_>, _>>()
            .map_err(LookupError::StorageBranches)?;

        let mut matches = Vec::new();

        for branch_id in branch_ids {
            let current_head = self
                .storage
                .head(branch_id)
                .map_err(LookupError::BranchHead)?;
            let Some(meta_handle) = current_head else {
                continue;
            };

            let reader = self
                .storage
                .reader()
                .map_err(LookupError::StorageReader)?;
            let meta_set: TribleSet =
                reader.get(meta_handle).map_err(LookupError::StorageGet)?;

            let Ok((name_handle,)) = find!(
                (n: Value<Handle<Blake3, LongString>>),
                pattern!(&meta_set, [{ crate::metadata::name: ?n }])
            )
            .exactly_one()
            else {
                continue;
            };

            let Ok(branch_name): Result<anybytes::View<str>, _> = reader.get(name_handle) else {
                continue;
            };

            if branch_name.as_ref() == name {
                matches.push(branch_id);
            }
        }

        if matches.len() > 1 {
            return Err(LookupError::NameConflict(matches));
        }
        // At most one element is left: `pop` yields it, or `None` when empty.
        Ok(matches.pop())
    }

    /// Return the id of the branch called `name`, creating the branch first
    /// if it does not exist yet.
    ///
    /// `commit` is only used when a new branch has to be created; an existing
    /// branch is returned as it is.
    ///
    /// # Errors
    /// * `EnsureBranchError::Lookup` if the lookup fails, including the
    ///   ambiguous case where several branches share the name.
    /// * `EnsureBranchError::Create` if creating the branch fails.
    pub fn ensure_branch(
        &mut self,
        name: &str,
        commit: Option<CommitHandle>,
    ) -> Result<Id, EnsureBranchError<Storage>> {
        let existing = self
            .lookup_branch(name)
            .map_err(EnsureBranchError::Lookup)?;
        if let Some(id) = existing {
            return Ok(id);
        }

        let created = self
            .create_branch(name, commit)
            .map_err(EnsureBranchError::Create)?;
        Ok(*created)
    }

    /// Pulls an existing branch into a new workspace, using a clone of the
    /// repository's signing key.
    ///
    /// The returned workspace carries the repository's commit metadata
    /// handle. See [`Repository::pull_with_key`] for the error conditions.
    pub fn pull(
        &mut self,
        branch_id: Id,
    ) -> Result<
        Workspace<Storage>,
        PullError<
            Storage::HeadError,
            Storage::ReaderError,
            <Storage::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>,
        >,
    > {
        let repository_key = self.signing_key.clone();
        self.pull_with_key(branch_id, repository_key)
    }

    /// Same as [`Repository::pull`] but overrides the signing key.
    ///
    /// The workspace starts with an empty local blob store. Its `head` and
    /// `base_head` are both set to the head recorded in the branch metadata,
    /// or to `None` when the metadata has no head.
    ///
    /// # Errors
    /// * `PullError::BranchNotFound` if the branch store has no head for
    ///   `branch_id`.
    /// * `PullError::BranchStorage`, `PullError::BlobReader` and
    ///   `PullError::BlobStorage` for the corresponding storage failures.
    /// * `PullError::BadBranchMetadata` if the metadata holds more than one
    ///   head.
    pub fn pull_with_key(
        &mut self,
        branch_id: Id,
        signing_key: SigningKey,
    ) -> Result<
        Workspace<Storage>,
        PullError<
            Storage::HeadError,
            Storage::ReaderError,
            <Storage::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>,
        >,
    > {
        // Step 1: ask the branch store for the current branch metadata handle.
        let base_branch_meta_handle = self
            .storage
            .head(branch_id)
            .map_err(PullError::BranchStorage)?
            .ok_or_else(|| PullError::BranchNotFound(branch_id))?;

        // Step 2: load the metadata and extract the commit it points to.
        let reader = self.storage.reader().map_err(PullError::BlobReader)?;
        let base_branch_meta: TribleSet = reader
            .get(base_branch_meta_handle)
            .map_err(PullError::BlobStorage)?;

        let head_ = match find!(
            (head_: Value<_>),
            pattern!(&base_branch_meta, [{ head: ?head_ }])
        )
        .at_most_one()
        {
            Ok(Some((h,))) => Some(h),
            Ok(None) => None,
            Err(_) => return Err(PullError::BadBranchMetadata()),
        };

        // Step 3: assemble the workspace around a newly created base reader.
        let base_blobs = self.storage.reader().map_err(PullError::BlobReader)?;
        let workspace = Workspace {
            base_blobs,
            local_blobs: MemoryBlobStore::new(),
            head: head_,
            base_head: head_,
            base_branch_id: branch_id,
            base_branch_meta: base_branch_meta_handle,
            signing_key,
            commit_metadata: self.commit_metadata,
        };
        Ok(workspace)
    }

    /// Pushes the workspace's new blobs and commit to the persistent
    /// repository, retrying until the branch update goes through.
    ///
    /// This is the push-merge-retry loop built on [`Repository::try_push`]:
    /// whenever an attempt reports a conflict, the caller's workspace is
    /// merged into the returned conflict workspace, the merged result
    /// replaces `*workspace`, and the push is attempted again.
    ///
    /// # Errors
    /// Any error from `try_push`, or a merge failure converted into
    /// `PushError::MergeError`.
    pub fn push(&mut self, workspace: &mut Workspace<Storage>) -> Result<(), PushError<Storage>> {
        loop {
            // `None` means the attempt succeeded and nothing is left to do.
            let Some(mut conflict_ws) = self.try_push(workspace)? else {
                return Ok(());
            };

            // Merge direction matters: the caller's staged changes go into
            // the incoming conflict workspace, which preserves the ordering
            // of parents used in the merge commit.
            conflict_ws.merge(workspace)?;

            // Hand the merged workspace back to the caller so the next
            // attempt runs against the fresh branch state. Plain assignment
            // drops the previous contents instead of keeping them around.
            *workspace = conflict_ws;
        }
    }

    /// Single-attempt push: upload local blobs and try to update the branch
    /// head once. Returns `Ok(None)` on success, or `Ok(Some(conflict_ws))`
    /// when the branch was updated concurrently and the caller should merge.
    ///
    /// `Ok(None)` is also returned, without touching the branch store, when
    /// the workspace head is unchanged since it was last synchronized. In
    /// that case the staged local blobs have still been uploaded.
    ///
    /// The returned conflict workspace is based on the branch state that won
    /// the race: it has an empty local blob store and reuses the caller
    /// workspace's branch id, signing key and commit metadata handle.
    ///
    /// # Errors
    /// * [`PushError::StoragePut`] if a staged blob or the new branch
    ///   metadata cannot be stored.
    /// * [`PushError::StorageReader`] / [`PushError::StorageGet`] if
    ///   repository blobs cannot be read.
    /// * [`PushError::BranchUpdate`] if the branch store update fails.
    /// * [`PushError::BadBranchMetadata`] if the base metadata does not hold
    ///   exactly one name, the workspace has no head, the conflict carries no
    ///   metadata, or the conflicting metadata holds more than one head.
    ///
    /// # Panics
    /// Panics if reading from the workspace's in-memory blob store fails;
    /// those operations are treated as infallible here.
    pub fn try_push(
        &mut self,
        workspace: &mut Workspace<Storage>,
    ) -> Result<Option<Workspace<Storage>>, PushError<Storage>> {
        // 1. Sync `workspace.local_blobs` to repository's BlobStore.
        let workspace_reader = workspace.local_blobs.reader().unwrap();
        for handle in workspace_reader.blobs() {
            let handle = handle.expect("infallible blob enumeration");
            let blob: Blob<UnknownBlob> =
                workspace_reader.get(handle).expect("infallible blob read");
            self.storage.put(blob).map_err(PushError::StoragePut)?;
        }

        // 1.5 If the workspace's head did not change since the workspace was
        // created, there's no commit to reference and therefore no branch
        // metadata update is required. This avoids touching the branch store
        // in the common case where only blobs were staged or nothing changed.
        if workspace.base_head == workspace.head {
            return Ok(None);
        }

        // 2. Create a new branch meta blob referencing the new workspace head.
        let repo_reader = self.storage.reader().map_err(PushError::StorageReader)?;
        let base_branch_meta: TribleSet = repo_reader
            .get(workspace.base_branch_meta)
            .map_err(PushError::StorageGet)?;

        // The branch name is carried over from the base metadata.
        let Ok((branch_name,)) = find!(
            (name: Value<Handle<Blake3, LongString>>),
            pattern!(base_branch_meta, [{ crate::metadata::name: ?name }])
        )
        .exactly_one() else {
            return Err(PushError::BadBranchMetadata());
        };

        // The head commit is read through the repository reader, which was
        // created after the staged blobs were uploaded in step 1.
        let head_handle = workspace.head.ok_or(PushError::BadBranchMetadata())?;
        let head_: TribleSet = repo_reader
            .get(head_handle)
            .map_err(PushError::StorageGet)?;

        let branch_meta = branch_metadata(
            &workspace.signing_key,
            workspace.base_branch_id,
            branch_name,
            Some(head_.to_blob()),
        );

        let branch_meta_handle = self
            .storage
            .put(branch_meta)
            .map_err(PushError::StoragePut)?;

        // 3. Use CAS (comparing against workspace.base_branch_meta) to update the branch pointer.
        let result = self
            .storage
            .update(
                workspace.base_branch_id,
                Some(workspace.base_branch_meta),
                Some(branch_meta_handle),
            )
            .map_err(PushError::BranchUpdate)?;

        match result {
            PushResult::Success() => {
                // Update workspace base pointers so subsequent pushes can detect
                // that the workspace is already synchronized and avoid re-upload.
                workspace.base_branch_meta = branch_meta_handle;
                workspace.base_head = workspace.head;
                // Refresh the workspace base blob reader to ensure newly
                // uploaded blobs are visible to subsequent checkout operations.
                workspace.base_blobs = self.storage.reader().map_err(PushError::StorageReader)?;
                // Clear staged local blobs now that they have been uploaded and
                // the branch metadata updated. This frees memory and prevents
                // repeated uploads of the same staged blobs on subsequent pushes.
                workspace.local_blobs = MemoryBlobStore::new();
                Ok(None)
            }
            PushResult::Conflict(conflicting_meta) => {
                // A conflict without current metadata cannot be merged against.
                let conflicting_meta = conflicting_meta.ok_or(PushError::BadBranchMetadata())?;

                let repo_reader = self.storage.reader().map_err(PushError::StorageReader)?;
                let branch_meta: TribleSet = repo_reader
                    .get(conflicting_meta)
                    .map_err(PushError::StorageGet)?;

                // Zero or one head is accepted; more than one is malformed.
                let head_ = match find!((head_: Value<_>),
                    pattern!(&branch_meta, [{ head: ?head_ }])
                )
                .at_most_one()
                {
                    Ok(Some((h,))) => Some(h),
                    Ok(None) => None,
                    Err(_) => return Err(PushError::BadBranchMetadata()),
                };

                let conflict_ws = Workspace {
                    base_blobs: self.storage.reader().map_err(PushError::StorageReader)?,
                    local_blobs: MemoryBlobStore::new(),
                    head: head_,
                    base_head: head_,
                    base_branch_id: workspace.base_branch_id,
                    base_branch_meta: conflicting_meta,
                    signing_key: workspace.signing_key.clone(),
                    commit_metadata: workspace.commit_metadata,
                };

                Ok(Some(conflict_ws))
            }
        }
    }
}

/// A handle to a commit blob in the repository.
pub type CommitHandle = Value<Handle<Blake3, SimpleArchive>>;
/// Handle to the commit metadata blob a repository stores on creation.
///
/// Same underlying type as [`CommitHandle`]; the alias only names the role.
type MetadataHandle = Value<Handle<Blake3, SimpleArchive>>;
/// A set of commit handles, used by [`CommitSelector`] and [`Checkout`].
pub type CommitSet = PATCH<VALUE_LEN, IdentitySchema, ()>;
/// Handle to a branch metadata blob, as used for compare-and-swap branch
/// updates.
///
/// Same underlying type as [`CommitHandle`]; the alias only names the role.
type BranchMetaHandle = Value<Handle<Blake3, SimpleArchive>>;

/// The result of a [`Workspace::checkout`] operation: a [`TribleSet`] paired
/// with the set of commits that produced it. Pass the commit set as the start
/// of a range selector to obtain incremental deltas on the next checkout.
///
/// [`Checkout`] dereferences to [`TribleSet`], so it can be used directly with
/// `find!`, `pattern!`, and `pattern_changes!`.
///
/// # Example: incremental updates
///
/// ```rust,ignore
/// let mut changed = repo.pull(branch_id)?.checkout(..)?;
/// let mut full = changed.facts().clone();
///
/// loop {
///     // full already includes changed
///     for result in pattern_changes!(&full, &changed, [{ ... }]) {
///         // process new results
///     }
///
///     // Advance — exclude exactly the commits we already processed.
///     changed = repo.pull(branch_id)?.checkout(changed.commits()..)?;
///     full += &changed;
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Checkout {
    /// The checked-out tribles; also the target of the `Deref` impl.
    facts: TribleSet,
    /// The commits that contributed to `facts`.
    commits: CommitSet,
}

impl PartialEq<TribleSet> for Checkout {
    /// Compares only the checked-out facts; the commit set is ignored.
    fn eq(&self, other: &TribleSet) -> bool {
        <TribleSet as PartialEq>::eq(&self.facts, other)
    }
}

impl PartialEq<Checkout> for TribleSet {
    /// Compares this set with the checkout's facts; the commit set is ignored.
    fn eq(&self, other: &Checkout) -> bool {
        <TribleSet as PartialEq>::eq(self, &other.facts)
    }
}

impl Checkout {
    /// The checked-out tribles.
    ///
    /// Returns a borrow; clone it, or use [`Checkout::into_facts`], to obtain
    /// an owned set.
    pub fn facts(&self) -> &TribleSet {
        &self.facts
    }

    /// The set of commits that produced this checkout. Use as the start of a
    /// range selector (`checkout.commits()..`) to exclude these commits
    /// on the next checkout and obtain only new data.
    ///
    /// Returns a clone of the stored set.
    pub fn commits(&self) -> CommitSet {
        self.commits.clone()
    }

    /// Consume the checkout and return the inner TribleSet.
    ///
    /// The commit set is discarded.
    pub fn into_facts(self) -> TribleSet {
        self.facts
    }
}

// Lets a `Checkout` be used wherever a `&TribleSet` is expected.
impl std::ops::Deref for Checkout {
    type Target = TribleSet;
    /// Borrows the checked-out facts.
    fn deref(&self) -> &TribleSet {
        &self.facts
    }
}

impl std::ops::AddAssign<&Checkout> for Checkout {
    fn add_assign(&mut self, rhs: &Checkout) {
        self.facts += rhs.facts.clone();
        self.commits.union(rhs.commits.clone());
    }
}

impl std::ops::Add for Checkout {
    type Output = Self;
    fn add(mut self, rhs: Self) -> Self {
        self.facts += rhs.facts;
        self.commits.union(rhs.commits);
        self
    }
}

impl std::ops::Add<&Checkout> for Checkout {
    type Output = Self;
    /// Combines with a borrowed checkout by delegating to the
    /// `AddAssign<&Checkout>` impl.
    fn add(mut self, rhs: &Checkout) -> Self {
        std::ops::AddAssign::add_assign(&mut self, rhs);
        self
    }
}

/// The Workspace represents the mutable working area or "staging" state.
/// It was formerly known as `Head`. It is sent to worker threads,
/// modified (via commits, merges, etc.), and then merged back into the Repository.
pub struct Workspace<Blobs: BlobStore<Blake3>> {
    /// A local BlobStore that holds any new blobs (commits, trees, deltas) before they are synced.
    local_blobs: MemoryBlobStore<Blake3>,
    /// The blob storage base for the workspace.
    base_blobs: Blobs::Reader,
    /// The id of the branch this workspace is tracking.
    ///
    /// NOTE(review): an earlier version of this comment mentioned `None` for
    /// a detached workspace, but the field is a plain `Id`, not an `Option`.
    base_branch_id: Id,
    /// The meta-handle corresponding to the base branch state used for CAS.
    base_branch_meta: BranchMetaHandle,
    /// Handle to the current commit in the working branch. `None` for an empty branch.
    head: Option<CommitHandle>,
    /// The branch head snapshot when this workspace was created (pull time).
    ///
    /// This allows `try_push` to cheaply detect whether the commit head has
    /// advanced since the workspace was created without querying the remote
    /// branch store. A successful `try_push` resets it to the current `head`.
    base_head: Option<CommitHandle>,
    /// Signing key used for commit/branch signing.
    signing_key: SigningKey,
    /// Metadata handle for commits created in this workspace.
    commit_metadata: MetadataHandle,
}

impl<Blobs> fmt::Debug for Workspace<Blobs>
where
    Blobs: BlobStore<Blake3>,
    Blobs::Reader: fmt::Debug,
{
    /// Formats the workspace as a struct. The `signing_key` field is not part
    /// of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Workspace");
        out.field("local_blobs", &self.local_blobs);
        out.field("base_blobs", &self.base_blobs);
        out.field("base_branch_id", &self.base_branch_id);
        out.field("base_branch_meta", &self.base_branch_meta);
        out.field("base_head", &self.base_head);
        out.field("head", &self.head);
        out.field("commit_metadata", &self.commit_metadata);
        out.finish()
    }
}

/// Helper trait for [`Workspace::checkout`] specifying commit handles or ranges.
pub trait CommitSelector<Blobs: BlobStore<Blake3>> {
    /// Resolve this selector into the set of commits it denotes.
    ///
    /// The selector is consumed. `ws` is the workspace the selector is
    /// evaluated against; selectors that already are explicit handles or sets
    /// do not need to consult it.
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    >;
}

/// Selector that returns every commit reachable from a starting selector.
///
/// The starting selector is the tuple struct's only field.
pub struct Ancestors<S>(pub S);

/// Convenience function to create an [`Ancestors`] selector.
///
/// Equivalent to writing `Ancestors(selector)`.
pub fn ancestors<S>(selector: S) -> Ancestors<S> {
    Ancestors(selector)
}

/// Selector that walks every commit in the input set back N parent steps,
/// following all parent links (including merge parents). Returns the set
/// of all commits found at exactly depth N from the starting set.
///
/// This is a wavefront expansion: at each step, every commit in the current
/// frontier is replaced by all of its parents. After N steps the frontier
/// is the result.
///
/// Field `.0` is the starting selector and field `.1` is the step count N.
pub struct NthAncestors<S>(pub S, pub usize);

/// Walk `selector` back `n` parent steps through all parent links.
///
/// Equivalent to writing `NthAncestors(selector, n)`.
pub fn nth_ancestors<S>(selector: S, n: usize) -> NthAncestors<S> {
    NthAncestors(selector, n)
}

/// Selector that returns the direct parents of commits from a starting selector.
///
/// The starting selector is the tuple struct's only field.
pub struct Parents<S>(pub S);

/// Convenience function to create a [`Parents`] selector.
///
/// Equivalent to writing `Parents(selector)`.
pub fn parents<S>(selector: S) -> Parents<S> {
    Parents(selector)
}

/// Selector that returns commits reachable from either of two selectors but
/// not both.
///
/// The two operand selectors are stored as fields `.0` and `.1`.
pub struct SymmetricDiff<A, B>(pub A, pub B);

/// Convenience function to create a [`SymmetricDiff`] selector.
///
/// Equivalent to writing `SymmetricDiff(a, b)`.
pub fn symmetric_diff<A, B>(a: A, b: B) -> SymmetricDiff<A, B> {
    SymmetricDiff(a, b)
}

/// Selector that returns the union of commits returned by two selectors.
///
/// The fields are private; use [`union`] to construct one from outside this
/// module.
pub struct Union<A, B> {
    /// First operand selector.
    left: A,
    /// Second operand selector.
    right: B,
}

/// Convenience function to create a [`Union`] selector.
///
/// `left` and `right` are stored unchanged as the two operands.
pub fn union<A, B>(left: A, right: B) -> Union<A, B> {
    Union { left, right }
}

/// Selector that returns the intersection of commits returned by two selectors.
///
/// The fields are private; use [`intersect`] to construct one from outside
/// this module.
pub struct Intersect<A, B> {
    /// First operand selector.
    left: A,
    /// Second operand selector.
    right: B,
}

/// Convenience function to create an [`Intersect`] selector.
///
/// `left` and `right` are stored unchanged as the two operands.
pub fn intersect<A, B>(left: A, right: B) -> Intersect<A, B> {
    Intersect { left, right }
}

/// Selector that returns commits from the left selector that are not also
/// returned by the right selector.
///
/// The fields are private; use [`difference`] to construct one from outside
/// this module.
pub struct Difference<A, B> {
    /// Selector whose commits are kept.
    left: A,
    /// Selector whose commits are removed from the result.
    right: B,
}

/// Convenience function to create a [`Difference`] selector.
///
/// Operand order matters: the result is `left` minus `right`.
pub fn difference<A, B>(left: A, right: B) -> Difference<A, B> {
    Difference { left, right }
}

/// Selector that returns commits with timestamps in the given inclusive range.
///
/// Field `.0` is the start of the range and field `.1` is its end, matching
/// the argument order of [`time_range`].
pub struct TimeRange(pub Epoch, pub Epoch);

/// Convenience function to create a [`TimeRange`] selector.
///
/// Equivalent to writing `TimeRange(start, end)`.
pub fn time_range(start: Epoch, end: Epoch) -> TimeRange {
    TimeRange(start, end)
}

/// Selector that filters commits returned by another selector.
pub struct Filter<S, F> {
    selector: S,
    filter: F,
}

/// Convenience function to create a [`Filter`] selector.
///
/// `selector` yields the candidate commits and `filter` is the predicate
/// applied to each candidate's metadata and payload.
pub fn filter<S, F>(selector: S, filter: F) -> Filter<S, F> {
    Filter { selector, filter }
}

/// A single commit handle selects exactly that commit.
impl<Blobs> CommitSelector<Blobs> for CommitHandle
where
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        _ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // The workspace is never consulted: the handle is wrapped verbatim.
        let entry = Entry::new(&self.raw);
        let mut selected = CommitSet::new();
        selected.insert(&entry);
        Ok(selected)
    }
}

/// An already materialised [`CommitSet`] selects exactly its own members.
impl<Blobs> CommitSelector<Blobs> for CommitSet
where
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        _ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // Nothing to resolve: the set is returned as-is and the workspace is
        // not touched.
        Ok(self)
    }
}

/// A vector of handles selects exactly the listed commits.
impl<Blobs> CommitSelector<Blobs> for Vec<CommitHandle>
where
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        _ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // Every handle is inserted into the set; the workspace is not read.
        let mut selected = CommitSet::new();
        self.into_iter().for_each(|handle| {
            selected.insert(&Entry::new(&handle.raw));
        });
        Ok(selected)
    }
}

/// A slice of handles selects exactly the listed commits.
impl<Blobs> CommitSelector<Blobs> for &[CommitHandle]
where
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        _ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // Every handle is inserted into the set; the workspace is not read.
        let mut selected = CommitSet::new();
        self.iter().for_each(|handle| {
            selected.insert(&Entry::new(&handle.raw));
        });
        Ok(selected)
    }
}

/// An optional handle selects that commit when present and nothing otherwise.
impl<Blobs> CommitSelector<Blobs> for Option<CommitHandle>
where
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        _ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // `None` yields the empty selection.
        let Some(handle) = self else {
            return Ok(CommitSet::new());
        };
        let mut selected = CommitSet::new();
        selected.insert(&Entry::new(&handle.raw));
        Ok(selected)
    }
}

/// Selects every commit reachable through parent links from the commits
/// produced by the wrapped selector (the seeds themselves included).
impl<S, Blobs> CommitSelector<Blobs> for Ancestors<S>
where
    S: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // Resolve the starting commits first, then expand them to their
        // full reachable history.
        let roots = self.0.select(ws)?;
        let reachable = collect_reachable_from_patch(ws, roots)?;
        Ok(reachable)
    }
}

/// Selects the commits found exactly `n` parent steps behind the commits
/// produced by the wrapped selector, following every parent link.
impl<Blobs, S> CommitSelector<Blobs> for NthAncestors<S>
where
    Blobs: BlobStore<Blake3>,
    S: CommitSelector<Blobs>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        let steps = self.1;
        let mut frontier = self.0.select(ws)?;

        for _ in 0..steps {
            // Once the frontier runs dry there is nothing left to step from.
            if frontier.is_empty() {
                break;
            }

            // Replace the frontier with the parents of all its members.
            let mut parents_of_frontier = CommitSet::new();
            for raw in frontier.iter() {
                let handle = CommitHandle::new(*raw);
                let meta: TribleSet =
                    ws.get(handle).map_err(WorkspaceCheckoutError::Storage)?;
                for (p,) in find!((p: Value<_>), pattern!(&meta, [{ parent: ?p }])) {
                    parents_of_frontier.insert(&Entry::new(&p.raw));
                }
            }
            frontier = parents_of_frontier;
        }

        Ok(frontier)
    }
}

impl<S, Blobs> CommitSelector<Blobs> for Parents<S>
where
    S: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        let seeds = self.0.select(ws)?;
        let mut result = CommitSet::new();
        for raw in seeds.iter() {
            let handle = Value::new(*raw);
            let meta: TribleSet = ws.get(handle).map_err(WorkspaceCheckoutError::Storage)?;
            for (p,) in find!((p: Value<_>), pattern!(&meta, [{ parent: ?p }])) {
                result.insert(&Entry::new(&p.raw));
            }
        }
        Ok(result)
    }
}

/// Selects the commits that are reachable from exactly one of the two
/// wrapped selectors.
impl<A, B, Blobs> CommitSelector<Blobs> for SymmetricDiff<A, B>
where
    A: CommitSelector<Blobs>,
    B: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // Resolve both sides, then expand each to its reachable history.
        let roots_a = self.0.select(ws)?;
        let roots_b = self.1.select(ws)?;
        let history_a = collect_reachable_from_patch(ws, roots_a)?;
        let history_b = collect_reachable_from_patch(ws, roots_b)?;

        // (A \ B) ∪ (B \ A) is the same set as (A ∪ B) \ (A ∩ B).
        let mut exclusive = history_a.difference(&history_b);
        let only_in_b = history_b.difference(&history_a);
        exclusive.union(only_in_b);
        Ok(exclusive)
    }
}

/// Selects every commit produced by either wrapped selector.
impl<A, B, Blobs> CommitSelector<Blobs> for Union<A, B>
where
    A: CommitSelector<Blobs>,
    B: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // The left side is evaluated first and then absorbs the right side.
        let mut combined = self.left.select(ws)?;
        let additional = self.right.select(ws)?;
        combined.union(additional);
        Ok(combined)
    }
}

/// Selects the commits produced by both wrapped selectors.
impl<A, B, Blobs> CommitSelector<Blobs> for Intersect<A, B>
where
    A: CommitSelector<Blobs>,
    B: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // Left is evaluated before right; only the shared commits survive.
        let lhs = self.left.select(ws)?;
        let rhs = self.right.select(ws)?;
        let shared = lhs.intersect(&rhs);
        Ok(shared)
    }
}

/// Selects the commits of the left selector that the right selector does not
/// produce.
impl<A, B, Blobs> CommitSelector<Blobs> for Difference<A, B>
where
    A: CommitSelector<Blobs>,
    B: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // Left is evaluated before right; right's commits are then removed.
        let candidates = self.left.select(ws)?;
        let excluded = self.right.select(ws)?;
        let remaining = candidates.difference(&excluded);
        Ok(remaining)
    }
}

impl<S, F, Blobs> CommitSelector<Blobs> for Filter<S, F>
where
    Blobs: BlobStore<Blake3>,
    S: CommitSelector<Blobs>,
    F: for<'x, 'y> Fn(&'x TribleSet, &'y TribleSet) -> bool,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        let patch = self.selector.select(ws)?;
        let mut result = CommitSet::new();
        let filter = self.filter;
        for raw in patch.iter() {
            let handle = Value::new(*raw);
            let meta: TribleSet = ws.get(handle).map_err(WorkspaceCheckoutError::Storage)?;

            let Ok((content_handle,)) = find!(
                (c: Value<_>),
                pattern!(&meta, [{ content: ?c }])
            )
            .exactly_one() else {
                return Err(WorkspaceCheckoutError::BadCommitMetadata());
            };

            let payload: TribleSet = ws
                .get(content_handle)
                .map_err(WorkspaceCheckoutError::Storage)?;

            if filter(&meta, &payload) {
                result.insert(&Entry::new(raw));
            }
        }
        Ok(result)
    }
}

/// Selector that yields commits touching a specific entity.
///
/// A commit qualifies when its content payload contains at least one trible
/// whose entity is the wrapped [`Id`]. Only commits reachable from the
/// workspace head are considered.
pub struct HistoryOf(pub Id);

/// Convenience function to create a [`HistoryOf`] selector.
///
/// `entity` is the id whose touching commits should be selected.
pub fn history_of(entity: Id) -> HistoryOf {
    HistoryOf(entity)
}

impl<Blobs> CommitSelector<Blobs> for HistoryOf
where
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        let Some(head_) = ws.head else {
            return Ok(CommitSet::new());
        };
        let entity = self.0;
        filter(
            ancestors(head_),
            move |_: &TribleSet, payload: &TribleSet| payload.iter().any(|t| t.e() == &entity),
        )
        .select(ws)
    }
}

// Generic range selectors: allow any selector type to be used as a range
// endpoint. We still walk the history reachable from the end selector but now
// stop descending a branch as soon as we encounter a commit produced by the
// start selector. This keeps the mechanics explicit—`start..end` literally
// walks from `end` until it hits `start`—while continuing to support selectors
// such as `Ancestors(...)` at either boundary.

fn collect_reachable_from_patch<Blobs: BlobStore<Blake3>>(
    ws: &mut Workspace<Blobs>,
    patch: CommitSet,
) -> Result<
    CommitSet,
    WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
> {
    let mut result = CommitSet::new();
    for raw in patch.iter() {
        let handle = Value::new(*raw);
        let reach = collect_reachable(ws, handle)?;
        result.union(reach);
    }
    Ok(result)
}

/// Walks parent links starting from `seeds` and collects the visited commits,
/// refusing to enter (or descend past) any commit contained in `stop`.
///
/// Commits in `stop` are never part of the result, not even when they are
/// seeds themselves.
fn collect_reachable_from_patch_until<Blobs: BlobStore<Blake3>>(
    ws: &mut Workspace<Blobs>,
    seeds: CommitSet,
    stop: &CommitSet,
) -> Result<
    CommitSet,
    WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
> {
    let mut seen = HashSet::new();
    let mut pending: Vec<CommitHandle> = seeds.iter().map(|raw| Value::new(*raw)).collect();
    let mut collected = CommitSet::new();

    while let Some(current) = pending.pop() {
        // Skip commits handled before as well as boundary commits; the
        // boundary check only runs for commits seen for the first time.
        if !seen.insert(current) || stop.get(&current.raw).is_some() {
            continue;
        }

        collected.insert(&Entry::new(&current.raw));

        // Local blobs take precedence, the base store is the fallback.
        let meta: TribleSet = ws.get(current).map_err(WorkspaceCheckoutError::Storage)?;

        for (p,) in find!((p: Value<_>,), pattern!(&meta, [{ parent: ?p }])) {
            pending.push(p);
        }
    }

    Ok(collected)
}

/// `start..end`: walks history from the commits selected by `end` and stops
/// at the commits selected by `start`, which are excluded from the result.
impl<T, Blobs> CommitSelector<Blobs> for std::ops::Range<T>
where
    T: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        let std::ops::Range { start, end } = self;
        // The end selector is resolved before the start selector.
        let walk_from = end.select(ws)?;
        let boundary = start.select(ws)?;

        collect_reachable_from_patch_until(ws, walk_from, &boundary)
    }
}

/// `start..`: walks history from the workspace head and stops at the commits
/// selected by `start`, which are excluded from the result.
impl<T, Blobs> CommitSelector<Blobs> for std::ops::RangeFrom<T>
where
    T: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        // Without a head there is nothing to walk; the start selector is
        // not evaluated in that case.
        let tip = match ws.head {
            Some(tip) => tip,
            None => return Ok(CommitSet::new()),
        };
        let boundary = self.start.select(ws)?;

        let mut walk_from = CommitSet::new();
        walk_from.insert(&Entry::new(&tip.raw));

        collect_reachable_from_patch_until(ws, walk_from, &boundary)
    }
}

/// `..end`: selects everything reachable from the commits selected by `end`,
/// those commits included.
impl<T, Blobs> CommitSelector<Blobs> for std::ops::RangeTo<T>
where
    T: CommitSelector<Blobs>,
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        let walk_from = self.end.select(ws)?;
        let reachable = collect_reachable_from_patch(ws, walk_from)?;
        Ok(reachable)
    }
}

/// `..`: selects the complete history reachable from the workspace head, or
/// nothing when the workspace has no head yet.
impl<Blobs> CommitSelector<Blobs> for std::ops::RangeFull
where
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        match ws.head {
            Some(tip) => collect_reachable(ws, tip),
            None => Ok(CommitSet::new()),
        }
    }
}

impl<Blobs> CommitSelector<Blobs> for TimeRange
where
    Blobs: BlobStore<Blake3>,
{
    fn select(
        self,
        ws: &mut Workspace<Blobs>,
    ) -> Result<
        CommitSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    > {
        let Some(head_) = ws.head else {
            return Ok(CommitSet::new());
        };
        let start = self.0;
        let end = self.1;
        filter(
            ancestors(head_),
            move |meta: &TribleSet, _payload: &TribleSet| {
                if let Ok(Some(((ts_start, ts_end),))) =
                    find!((t: (Epoch, Epoch)), pattern!(meta, [{ crate::metadata::created_at: ?t }])).at_most_one()
                {
                    ts_start <= end && ts_end >= start
                } else {
                    false
                }
            },
        )
        .select(ws)
    }
}

impl<Blobs: BlobStore<Blake3>> Workspace<Blobs> {
    /// Returns the branch id associated with this workspace.
    ///
    /// This is the stored `base_branch_id` of the workspace.
    pub fn branch_id(&self) -> Id {
        self.base_branch_id
    }

    /// Returns the current commit handle if one exists.
    ///
    /// `None` means the workspace does not point at any commit yet.
    pub fn head(&self) -> Option<CommitHandle> {
        self.head
    }

    /// Returns the workspace metadata handle.
    ///
    /// This is the handle that [`commit`](Self::commit) attaches to new
    /// commits.
    pub fn metadata(&self) -> MetadataHandle {
        self.commit_metadata
    }

    /// Adds a blob to the workspace's local blob store.
    /// Mirrors [`BlobStorePut::put`](crate::repo::BlobStorePut) for ease of use.
    ///
    /// Returns the handle of the stored blob.
    ///
    /// # Panics
    ///
    /// Panics if the local blob store reports an error while storing the
    /// blob; the code treats that put as infallible.
    pub fn put<S, T>(&mut self, item: T) -> Value<Handle<Blake3, S>>
    where
        S: BlobSchema + 'static,
        T: ToBlob<S>,
        Handle<Blake3, S>: ValueSchema,
    {
        self.local_blobs.put(item).expect("infallible blob put")
    }

    /// Looks up a blob by handle and converts it into `T`.
    ///
    /// The workspace's local blob store is consulted first. If that lookup
    /// fails for any reason, the base blob store is tried and its result
    /// (success or error) is returned.
    ///
    /// # Panics
    ///
    /// Panics if a reader for the local blob store cannot be created.
    pub fn get<T, S>(
        &mut self,
        handle: Value<Handle<Blake3, S>>,
    ) -> Result<T, <Blobs::Reader as BlobStoreGet<Blake3>>::GetError<<T as TryFromBlob<S>>::Error>>
    where
        S: BlobSchema + 'static,
        T: TryFromBlob<S>,
        Handle<Blake3, S>: ValueSchema,
    {
        let local_reader = self.local_blobs.reader().unwrap();
        match local_reader.get(handle) {
            Ok(found) => Ok(found),
            // The local error is discarded; the base store decides the outcome.
            Err(_) => self.base_blobs.get(handle),
        }
    }

    /// Records `content_` as a new commit on top of the current head.
    ///
    /// The commit blob is stored in the local blob store, carries the
    /// workspace's default metadata handle and `message_`, and becomes the
    /// new head of the workspace.
    pub fn commit(
        &mut self,
        content_: impl Into<TribleSet>,
        message_: &str,
    ) {
        let default_metadata = self.commit_metadata;
        self.commit_internal(content_.into(), Some(default_metadata), Some(message_));
    }

    /// Same as [`commit`](Self::commit), except that `metadata_` is attached
    /// to the new commit in place of the workspace's default metadata handle.
    pub fn commit_with_metadata(
        &mut self,
        content_: impl Into<TribleSet>,
        metadata_: MetadataHandle,
        message_: &str,
    ) {
        self.commit_internal(content_.into(), Some(metadata_), Some(message_));
    }

    /// Shared implementation behind the public commit methods.
    ///
    /// Builds a commit whose only parent is the current head (if any),
    /// stores the message, content and commit blobs locally and moves the
    /// head to the new commit.
    ///
    /// # Panics
    ///
    /// Panics if storing any of the blobs in the local blob store fails.
    fn commit_internal(
        &mut self,
        content_: TribleSet,
        metadata_handle: Option<MetadataHandle>,
        message_: Option<&str>,
    ) {
        // Serialise the payload up front; the commit metadata refers to it.
        let payload_blob = content_.to_blob();
        // An optional message is stored as its own blob and referenced by handle.
        let note_handle = message_.map(|text| self.put(text.to_string()));
        let current_parents = self.head.iter().copied();

        let commit_record = crate::repo::commit::commit_metadata(
            &self.signing_key,
            current_parents,
            note_handle,
            Some(payload_blob.clone()),
            metadata_handle,
        );

        // Persist the payload first, then the commit that points at it.
        let _ = self
            .local_blobs
            .put(payload_blob)
            .expect("failed to put content blob");
        let new_head = self
            .local_blobs
            .put(commit_record)
            .expect("failed to put commit blob");

        // The freshly written commit is the new tip of the workspace.
        self.head = Some(new_head);
    }

    /// Merges the state of `other` into this workspace.
    ///
    /// Semantics
    /// - Every blob staged in `other` (the contents of `other.local_blobs`)
    ///   is copied into `self.local_blobs`. Afterwards a merge commit is
    ///   created whose parents are `self.head` and `other.head`, and it
    ///   becomes the new head of this workspace.
    /// - Base history reachable from `other`'s head is *not* imported
    ///   automatically. When the incoming parent commits reference blobs
    ///   that are absent from this repository's storage, reading those
    ///   commits later fails until the missing blobs are imported explicitly
    ///   (for example via `repo::transfer(reachable(...))`).
    /// - Merging is deliberately permissive; importing blobs across
    ///   repositories is left as an explicit user action.
    ///
    /// # Panics
    ///
    /// Panics if the local blob stores cannot be read from or written to.
    pub fn merge(&mut self, other: &mut Workspace<Blobs>) -> Result<CommitHandle, MergeError> {
        // Step 1: copy the blobs staged in `other` into our local store.
        let incoming = other.local_blobs.reader().unwrap();
        for listed in incoming.blobs() {
            let handle = listed.expect("infallible blob enumeration");
            let blob: Blob<UnknownBlob> = incoming.get(handle).expect("infallible blob read");
            self.local_blobs.put(blob).expect("infallible blob put");
        }

        // Step 2: build a merge commit with both heads as parents. It has
        // no message, no content blob and no metadata blob.
        let both_heads = self.head.into_iter().chain(other.head);
        let merge_record = commit_metadata(&self.signing_key, both_heads, None, None, None);

        // Step 3: store the merge commit locally and advance the head to it.
        let merged_head = self
            .local_blobs
            .put(merge_record)
            .expect("failed to put merge commit blob");
        self.head = Some(merged_head);

        Ok(merged_head)
    }

    /// Creates a merge commit joining this workspace's current head with
    /// `other`, a commit that is expected to exist in the underlying blob
    /// store already. No second [`Workspace`] instance is needed.
    ///
    /// The merge commit carries no content, message or metadata and becomes
    /// the new head.
    ///
    /// # Panics
    ///
    /// Panics if the merge commit cannot be stored in the local blob store.
    pub fn merge_commit(
        &mut self,
        other: Value<Handle<Blake3, SimpleArchive>>,
    ) -> Result<CommitHandle, MergeError> {
        // `other` is intentionally not validated here. If it cannot be
        // loaded, the failure surfaces later when history is read; callers
        // are expected to import the reachable blobs beforehand (e.g. by
        // combining `reachable` with `transfer`).
        let both_parents = self.head.into_iter().chain(Some(other));
        let merge_record = commit_metadata(&self.signing_key, both_parents, None, None, None);

        let merged_head = self
            .local_blobs
            .put(merge_record)
            .expect("failed to put merge commit blob");
        self.head = Some(merged_head);

        Ok(merged_head)
    }

    /// Loads the content of every listed commit and unions it into a single
    /// [`TribleSet`].
    ///
    /// Commit and content blobs are looked up in the local blob store first
    /// and in the base store second. Commits without a content blob
    /// contribute nothing.
    ///
    /// # Errors
    ///
    /// Fails with [`WorkspaceCheckoutError::Storage`] when a blob cannot be
    /// loaded and with [`WorkspaceCheckoutError::BadCommitMetadata`] when a
    /// commit references more than one content blob.
    fn checkout_commits<I>(
        &mut self,
        commits: I,
    ) -> Result<
        TribleSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    >
    where
        I: IntoIterator<Item = CommitHandle>,
    {
        let local = self.local_blobs.reader().unwrap();
        let mut facts = TribleSet::new();
        for commit in commits {
            let meta: TribleSet = local
                .get(commit)
                .or_else(|_| self.base_blobs.get(commit))
                .map_err(WorkspaceCheckoutError::Storage)?;

            match find!((c: Value<_>), pattern!(&meta, [{ content: ?c }])).at_most_one() {
                Ok(Some((c,))) => {
                    let set: TribleSet = local
                        .get(c)
                        .or_else(|_| self.base_blobs.get(c))
                        .map_err(WorkspaceCheckoutError::Storage)?;
                    facts += set;
                }
                // Merge commits and the like carry no content on purpose;
                // skipping them lets ancestor ranges be checked out without
                // tripping over a merge.
                Ok(None) => {}
                Err(_) => return Err(WorkspaceCheckoutError::BadCommitMetadata()),
            }
        }
        Ok(facts)
    }

    /// Loads the metadata set referenced by every listed commit and unions
    /// the sets into a single [`TribleSet`].
    ///
    /// Blobs are looked up in the local blob store first and in the base
    /// store second. Commits without a `metadata` handle contribute nothing.
    ///
    /// # Errors
    ///
    /// Fails with [`WorkspaceCheckoutError::Storage`] when a blob cannot be
    /// loaded and with [`WorkspaceCheckoutError::BadCommitMetadata`] when a
    /// commit references more than one metadata blob.
    fn checkout_commits_metadata<I>(
        &mut self,
        commits: I,
    ) -> Result<
        TribleSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    >
    where
        I: IntoIterator<Item = CommitHandle>,
    {
        let local = self.local_blobs.reader().unwrap();
        let mut combined = TribleSet::new();
        for commit in commits {
            let meta: TribleSet = local
                .get(commit)
                .or_else(|_| self.base_blobs.get(commit))
                .map_err(WorkspaceCheckoutError::Storage)?;

            match find!((c: Value<_>), pattern!(&meta, [{ metadata: ?c }])).at_most_one() {
                Ok(Some((c,))) => {
                    let set: TribleSet = local
                        .get(c)
                        .or_else(|_| self.base_blobs.get(c))
                        .map_err(WorkspaceCheckoutError::Storage)?;
                    combined += set;
                }
                // No metadata handle on this commit: nothing to add.
                Ok(None) => {}
                Err(_) => return Err(WorkspaceCheckoutError::BadCommitMetadata()),
            }
        }
        Ok(combined)
    }

    /// Loads both the content and the metadata of every listed commit.
    ///
    /// Returns `(data, metadata)`: the union of all content sets and the
    /// union of all metadata sets. Blobs are looked up in the local blob
    /// store first and in the base store second; commits lacking a content
    /// or metadata handle simply contribute nothing to the respective set.
    ///
    /// # Errors
    ///
    /// Fails with [`WorkspaceCheckoutError::Storage`] when a blob cannot be
    /// loaded and with [`WorkspaceCheckoutError::BadCommitMetadata`] when a
    /// commit references more than one content or metadata blob.
    fn checkout_commits_with_metadata<I>(
        &mut self,
        commits: I,
    ) -> Result<
        (TribleSet, TribleSet),
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    >
    where
        I: IntoIterator<Item = CommitHandle>,
    {
        let local = self.local_blobs.reader().unwrap();
        let mut facts = TribleSet::new();
        let mut annotations = TribleSet::new();
        for commit in commits {
            let meta: TribleSet = local
                .get(commit)
                .or_else(|_| self.base_blobs.get(commit))
                .map_err(WorkspaceCheckoutError::Storage)?;

            // Content comes first, exactly as for a plain checkout.
            match find!((c: Value<_>), pattern!(&meta, [{ content: ?c }])).at_most_one() {
                Ok(Some((c,))) => {
                    let set: TribleSet = local
                        .get(c)
                        .or_else(|_| self.base_blobs.get(c))
                        .map_err(WorkspaceCheckoutError::Storage)?;
                    facts += set;
                }
                Ok(None) => {}
                Err(_) => return Err(WorkspaceCheckoutError::BadCommitMetadata()),
            }

            // Then the optional metadata blob of the same commit.
            match find!((c: Value<_>), pattern!(&meta, [{ metadata: ?c }])).at_most_one() {
                Ok(Some((c,))) => {
                    let set: TribleSet = local
                        .get(c)
                        .or_else(|_| self.base_blobs.get(c))
                        .map_err(WorkspaceCheckoutError::Storage)?;
                    annotations += set;
                }
                Ok(None) => {}
                Err(_) => return Err(WorkspaceCheckoutError::BadCommitMetadata()),
            }
        }
        Ok((facts, annotations))
    }

    /// Returns the combined [`TribleSet`] for the specified commits or commit
    /// ranges. `spec` can be a single [`CommitHandle`], an iterator of handles
    /// or any of the standard range types over [`CommitHandle`].
    pub fn checkout<R>(
        &mut self,
        spec: R,
    ) -> Result<
        Checkout,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    >
    where
        R: CommitSelector<Blobs>,
    {
        let commits = spec.select(self)?;
        let facts = self.checkout_commits(commits.iter().map(|raw| Value::new(*raw)))?;
        Ok(Checkout { facts, commits })
    }

    /// Returns the combined metadata [`TribleSet`] for the specified commits.
    /// Commits without metadata handles contribute an empty set.
    pub fn checkout_metadata<R>(
        &mut self,
        spec: R,
    ) -> Result<
        TribleSet,
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    >
    where
        R: CommitSelector<Blobs>,
    {
        let patch = spec.select(self)?;
        let commits = patch.iter().map(|raw| Value::new(*raw));
        self.checkout_commits_metadata(commits)
    }

    /// Returns the combined data and metadata [`TribleSet`] for the specified commits.
    /// Metadata is loaded from each commit's `metadata` handle, when present.
    pub fn checkout_with_metadata<R>(
        &mut self,
        spec: R,
    ) -> Result<
        (TribleSet, TribleSet),
        WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
    >
    where
        R: CommitSelector<Blobs>,
    {
        let patch = spec.select(self)?;
        let commits = patch.iter().map(|raw| Value::new(*raw));
        self.checkout_commits_with_metadata(commits)
    }
}

/// Error returned when selecting commits or checking out their contents from
/// a workspace.
///
/// `GetErr` is the error type produced by the underlying blob store lookup.
#[derive(Debug)]
pub enum WorkspaceCheckoutError<GetErr: Error> {
    /// Error retrieving blobs from storage.
    Storage(GetErr),
    /// Commit metadata is malformed or ambiguous.
    BadCommitMetadata(),
}

impl<E: Error + fmt::Debug> fmt::Display for WorkspaceCheckoutError<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WorkspaceCheckoutError::Storage(e) => write!(f, "storage error: {e}"),
            WorkspaceCheckoutError::BadCommitMetadata() => {
                write!(f, "commit metadata malformed")
            }
        }
    }
}

// Marker impl relying on the trait's default methods; the wrapped storage
// error is only exposed through the `Display` output, not via `source()`.
impl<E: Error + fmt::Debug> Error for WorkspaceCheckoutError<E> {}

/// Collects `from` and every commit reachable from it through parent links.
///
/// Each commit's metadata is loaded from the local blob store, falling back
/// to the base store; a commit that cannot be loaded aborts the walk with
/// [`WorkspaceCheckoutError::Storage`].
fn collect_reachable<Blobs: BlobStore<Blake3>>(
    ws: &mut Workspace<Blobs>,
    from: CommitHandle,
) -> Result<
    CommitSet,
    WorkspaceCheckoutError<<Blobs::Reader as BlobStoreGet<Blake3>>::GetError<UnarchiveError>>,
> {
    let mut seen = HashSet::new();
    let mut pending = vec![from];
    let mut reachable = CommitSet::new();

    while let Some(current) = pending.pop() {
        // Only commits encountered for the first time are expanded.
        if seen.insert(current) {
            reachable.insert(&Entry::new(&current.raw));

            let meta: TribleSet = ws.get(current).map_err(WorkspaceCheckoutError::Storage)?;

            for (p,) in find!((p: Value<_>,), pattern!(&meta, [{ parent: ?p }])) {
                pending.push(p);
            }
        }
    }

    Ok(reachable)
}