dbsp 0.287.0

Continuous streaming analytics engine
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
//! # Traces
//!
//! A "trace" describes how a collection of key-value pairs changes over time.
//! A "batch" is a mostly immutable trace.  This module provides traits and
//! structures for expressing traces in DBSP as collections of `(key, val, time,
//! diff)` tuples.
//!
//! The base trait for a trace is [`BatchReader`], which allows a trace to be
//! read in sorted order by key and value.  `BatchReader` provides [`Cursor`] to
//! step through a batch's tuples without modifying them.
//!
//! The [`Batch`] trait extends [`BatchReader`] with types and methods for
//! creating new traces from ordered tuples ([`Batch::Builder`]) or unordered
//! tuples ([`Batch::Batcher`]), or by merging traces of like types.
//!
//! The [`Trace`] trait, which also extends [`BatchReader`], adds methods to
//! append new batches.  New tuples must not have times earlier than any of the
//! tuples already in the trace.
//!
//! # Time within traces
//!
//! See the [time](crate::time) module documentation for a description of
//! logical times.
//!
//! Traces are sorted by key and value.  They are not sorted with respect to
//! time: reading a trace might obtain out of order and duplicate times among
//! the `(time, diff)` pairs associated with a key and value.

use crate::circuit::metadata::OperatorMeta;
use crate::dynamic::{ClonableTrait, DynDataTyped, DynUnit, Weight};
use crate::storage::buffer_cache::CacheStats;
use crate::storage::file::SerializerInner;
pub use crate::storage::file::{DbspSerializer, Deserializable, Deserializer, Rkyv};
use crate::trace::cursor::{
    DefaultPushCursor, FilteredMergeCursor, FilteredMergeCursorWithSnapshot, PushCursor,
    UnfilteredMergeCursor,
};
use crate::utils::IsNone;
use crate::{dynamic::ArchivedDBData, storage::buffer_cache::FBuf};
use cursor::CursorFactory;
use enum_map::Enum;
use feldera_storage::fbuf::FBufSerializer;
use feldera_storage::{FileCommitter, FileReader, StoragePath};
use rand::{Rng, thread_rng};
use rkyv::ser::Serializer as _;
use size_of::SizeOf;
use std::any::TypeId;
use std::sync::Arc;
use std::{fmt::Debug, hash::Hash};

pub mod cursor;
pub mod filter;
pub mod layers;
pub mod ord;
pub mod spine_async;
pub use spine_async::{BatchReaderWithSnapshot, ListMerger, Spine, SpineSnapshot, WithSnapshot};

#[cfg(test)]
pub mod test;

pub use ord::{
    FallbackIndexedWSet, FallbackIndexedWSetBuilder, FallbackIndexedWSetFactories,
    FallbackKeyBatch, FallbackKeyBatchFactories, FallbackValBatch, FallbackValBatchFactories,
    FallbackWSet, FallbackWSetBuilder, FallbackWSetFactories, FileIndexedWSet,
    FileIndexedWSetFactories, FileKeyBatch, FileKeyBatchFactories, FileValBatch,
    FileValBatchFactories, FileWSet, FileWSetFactories, OrdIndexedWSet, OrdIndexedWSetBuilder,
    OrdIndexedWSetFactories, OrdKeyBatch, OrdKeyBatchFactories, OrdValBatch, OrdValBatchFactories,
    OrdWSet, OrdWSetBuilder, OrdWSetFactories, VecIndexedWSet, VecIndexedWSetFactories,
    VecKeyBatch, VecKeyBatchFactories, VecValBatch, VecValBatchFactories, VecWSet,
    VecWSetFactories,
};

use rkyv::{Deserialize, archived_root};

use crate::{
    Error, NumEntries, Timestamp,
    algebra::MonoidValue,
    dynamic::{DataTrait, DynPair, DynVec, DynWeightedPairs, Erase, Factory, WeightTrait},
    storage::file::reader::Error as ReaderError,
    storage::filter_stats::FilterStats,
};
pub use cursor::{Cursor, MergeCursor};
pub use filter::{Filter, GroupFilter};
pub use layers::Trie;

/// Trait for data stored in batches.
///
/// This trait is used as a bound on `BatchReader::Key` and `BatchReader::Val`
/// associated types (see [`trait BatchReader`]).  Hence when writing code that
/// must be generic over any relational data, it is sufficient to impose
/// `DBData` as a trait bound on types.  Conversely, a trait bound of the form
/// `B: BatchReader` implies `B::Key: DBData` and `B::Val: DBData`.
///
/// `DBData` declares no methods of its own; it only bundles the bounds that
/// batch data must satisfy.  A blanket impl (below) implements it
/// automatically for every type meeting those bounds, so user code never
/// implements this trait by hand.
pub trait DBData:
    Default
    + Clone
    + Eq
    + Ord
    + Hash
    + SizeOf
    + Send
    + Sync
    + Debug
    + ArchivedDBData
    + IsNone<Inner: ArchivedDBData>
    + 'static
{
}

/// Automatically implements `DBData` for everything that satisfies the
/// bounds.
impl<T> DBData for T where
    T: Default
        + Clone
        + Eq
        + Ord
        + Hash
        + SizeOf
        + Send
        + Sync
        + Debug
        + ArchivedDBData
        + IsNone<Inner: ArchivedDBData>
        + 'static
{
}

/// A spine that is serialized to a file.
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)]
pub(crate) struct CommittedSpine {
    // Names of the files holding the spine's batches.
    // NOTE(review): presumably storage paths relative to the commit base —
    // confirm against the spine save/restore code.
    pub batches: Vec<String>,
    // Pairs of batch names recorded for merges.  Exact semantics (e.g.,
    // source -> destination) are not evident from this file — confirm.
    pub merged: Vec<(String, String)>,
    // Accumulated merge effort (see `Trace::exert`).
    pub effort: u64,
    // Value of the trace's dirty flag at commit time (see `Trace::dirty`).
    pub dirty: bool,
}

/// Deserializes `bytes` as type `T` using `rkyv`, tolerating `bytes` being
/// misaligned.
///
/// Works by copying the input into a freshly allocated, properly aligned
/// buffer and then delegating to [`aligned_deserialize`].
pub fn unaligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
    let mut aligned = FBuf::new();
    aligned.extend_from_slice(bytes);
    aligned_deserialize(&aligned)
}

/// Deserializes `bytes` as type `T` using `rkyv`.  `bytes` must be properly
/// aligned.
///
/// # Panics
///
/// Panics if deserialization fails.
pub fn aligned_deserialize<T: Deserializable>(bytes: &[u8]) -> T {
    // SAFETY: `archived_root` requires `bytes` to be properly aligned for the
    // archived representation of `T` and to contain a valid archived `T` at
    // its root; the caller is responsible for both (see the doc comment
    // above).
    unsafe { archived_root::<T>(bytes) }
        .deserialize(&mut Deserializer::default())
        .unwrap()
}

/// Trait for data types used as weights.
///
/// A type used for weights in a batch (i.e., as `BatchReader::R`) must behave
/// as a monoid, i.e., a set with an associative `+` operation and a neutral
/// element (zero).
///
/// Some applications use a weight as a ring, that is, require it to support
/// multiplication too.
///
/// Finally, some applications require it to have `<` and `>` operations, in
/// particular to distinguish whether something is an insertion or deletion.
///
/// Signed integer types such as `i32` and `i64` are suitable as weights,
/// although if there is overflow then the results will be wrong.
///
/// When writing code generic over any weight type, it is sufficient to impose
/// `DBWeight` as a trait bound on types.  Conversely, a trait bound of the form
/// `B: BatchReader` implies `B::R: DBWeight`.
pub trait DBWeight: DBData + MonoidValue {}
// Blanket impl: any `DBData` that is also a `MonoidValue` is a valid weight.
impl<T> DBWeight for T where T: DBData + MonoidValue {}

/// Factory objects for the component types of a [`BatchReader`] with key type
/// `K`, value type `V`, timestamp type `T`, and weight type `R`.
///
/// Factories let generic code allocate keys, values, and weights behind the
/// `dyn`-based trait objects used throughout the trace machinery.
pub trait BatchReaderFactories<
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    T,
    R: WeightTrait + ?Sized,
>: Clone + Send + Sync
{
    // type BatchItemVTable: BatchItemTypeDescr<Key = K, Val = V, Item = I, R = R>;

    /// Creates a factory set from the concrete types that erase to `K`, `V`,
    /// and `R`.
    fn new<KType, VType, RType>() -> Self
    where
        KType: DBData + Erase<K>,
        VType: DBData + Erase<V>,
        RType: DBWeight + Erase<R>;

    /// Factory for individual keys.
    fn key_factory(&self) -> &'static dyn Factory<K>;
    /// Factory for vectors of keys.
    fn keys_factory(&self) -> &'static dyn Factory<DynVec<K>>;
    /// Factory for individual values.
    fn val_factory(&self) -> &'static dyn Factory<V>;
    /// Factory for individual weights.
    fn weight_factory(&self) -> &'static dyn Factory<R>;
}

// TODO: use Tuple3 instead
/// A `((key, value), weight)` triple, represented as nested pairs.
pub type WeightedItem<K, V, R> = DynPair<DynPair<K, V>, R>;

/// Extends [`BatchReaderFactories`] with factories for the composite item
/// types used when assembling batches.
pub trait BatchFactories<K: DataTrait + ?Sized, V: DataTrait + ?Sized, T, R: WeightTrait + ?Sized>:
    BatchReaderFactories<K, V, T, R>
{
    /// Factory for `(key, value)` pairs.
    fn item_factory(&self) -> &'static dyn Factory<DynPair<K, V>>;

    /// Factory for vectors of weighted `(key, value)` pairs.
    fn weighted_items_factory(&self) -> &'static dyn Factory<DynWeightedPairs<DynPair<K, V>, R>>;
    /// Factory for vectors of weighted values.
    fn weighted_vals_factory(&self) -> &'static dyn Factory<DynWeightedPairs<V, R>>;
    /// Factory for a single weighted `(key, value)` pair.
    fn weighted_item_factory(&self) -> &'static dyn Factory<WeightedItem<K, V, R>>;

    /// Factory for a vector of (T, R) or `None` if `T` is `()`.
    fn time_diffs_factory(
        &self,
    ) -> Option<&'static dyn Factory<DynWeightedPairs<DynDataTyped<T>, R>>>;
}

/// A set of `(key, val, time, diff)` tuples that can be read and extended.
///
/// `Trace` extends [`BatchReader`], most notably with [`insert`][Self::insert]
/// for adding new batches of tuples.
///
/// See [crate documentation](crate::trace) for more information on batches and
/// traces.
pub trait Trace: BatchReader {
    /// The type of an immutable collection of updates.
    type Batch: Batch<
            Key = Self::Key,
            Val = Self::Val,
            Time = Self::Time,
            R = Self::R,
            Factories = Self::Factories,
        >;

    /// Allocates a new empty trace.
    fn new(factories: &Self::Factories) -> Self;

    /// Sets a compaction frontier, i.e., a timestamp such that timestamps
    /// below the frontier are indistinguishable to DBSP, therefore any `ts`
    /// in the trace can be safely replaced with `ts.join(frontier)` without
    /// affecting the output of the circuit.  By applying this replacement,
    /// updates to the same (key, value) pairs applied during different steps
    /// can be merged or discarded.
    ///
    /// The compaction is performed lazily at merge time.
    fn set_frontier(&mut self, frontier: &Self::Time);

    /// Exert merge effort, even without updates.
    fn exert(&mut self, effort: &mut isize);

    /// Merge all updates in a trace into a single batch.
    fn consolidate(self) -> Option<Self::Batch>;

    /// Introduces a batch of updates to the trace.
    fn insert(&mut self, batch: Self::Batch);

    /// Introduces a batch of updates to the trace. More efficient than cloning
    /// a batch and calling `insert`.
    fn insert_arc(&mut self, batch: Arc<Self::Batch>);

    /// Clears the value of the "dirty" flag to `false`.
    ///
    /// The "dirty" flag is used to efficiently track changes to the trace,
    /// e.g., as part of checking whether a circuit has reached a fixed point.
    /// Pushing a non-empty batch to the trace sets the flag to `true`. The
    /// [`Self::dirty`] method returns true iff the trace has changed since the
    /// last call to `clear_dirty_flag`.
    fn clear_dirty_flag(&mut self);

    /// Returns the value of the dirty flag.
    fn dirty(&self) -> bool;

    /// Informs the trace that keys that don't pass the filter are no longer
    /// used and can be removed from the trace.
    ///
    /// The implementation is not required to remove truncated keys instantly
    /// or at all.  This method is just a hint that keys that don't pass the
    /// filter are no longer of interest to the consumer of the trace and
    /// can be garbage collected.
    ///
    /// # Rationale
    ///
    /// This API is similar to the old API `BatchReader::truncate_keys_below`,
    /// but in [Trace] instead of [BatchReader].  The difference is that a batch
    /// can truncate its keys instantly by simply moving an internal pointer to
    /// the first remaining key.  However, there is no similar way to retain
    /// keys based on arbitrary predicates, this can only be done efficiently as
    /// part of trace maintenance when either merging or compacting batches.
    fn retain_keys(&mut self, filter: Filter<Self::Key>);

    /// Informs the trace that values that don't pass the filter are no longer
    /// used and can be removed from the trace.
    ///
    /// The implementation is not required to remove truncated values instantly
    /// or at all.  This method is just a hint that values that don't pass the
    /// filter are no longer of interest to the consumer of the trace and
    /// can be garbage collected.
    fn retain_values(&mut self, filter: GroupFilter<Self::Val>);

    /// The key filter currently in effect (see [`Self::retain_keys`]), if any.
    fn key_filter(&self) -> &Option<Filter<Self::Key>>;
    /// The value filter currently in effect (see [`Self::retain_values`]), if
    /// any.
    fn value_filter(&self) -> &Option<GroupFilter<Self::Val>>;

    /// Writes this trace to storage beneath `base`, using `pid` as a file name
    /// prefix.  Adds the files that were written to `files` so that they can be
    /// committed later.
    fn save(
        &mut self,
        base: &StoragePath,
        pid: &str,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error>;

    /// Reads this trace back from storage under `base` with `pid` as the
    /// prefix.
    fn restore(&mut self, base: &StoragePath, pid: &str) -> Result<(), Error>;

    /// Allows the trace to report additional metadata.
    fn metadata(&self, _meta: &mut OperatorMeta) {}
}

/// Where a batch is stored.
///
/// Derives [`Enum`] so it can be used as an `enum_map` key.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Enum)]
pub enum BatchLocation {
    /// In RAM.
    Memory,

    /// On disk.
    Storage,
}

// impl BatchLocation {
//     fn as_str(&self) -> &'static str {
//         match self {
//             Self::Memory => "memory",
//             Self::Storage => "storage",
//         }
//     }
// }

/// A set of `(key, value, time, diff)` tuples whose contents may be read in
/// order by key and value.
///
/// A `BatchReader` is a mostly read-only interface.  This is especially useful
/// for views derived from other sources in ways that prevent the construction
/// of batches from the type of data in the view (for example, filtered views,
/// or views with extended time coordinates).
///
/// See [crate documentation](crate::trace) for more information on batches and
/// traces.
///
/// # Object safety
///
/// `BatchReader` is not object safe (it cannot be used as `dyn BatchReader`),
/// but [Cursor] is, which can often be a useful substitute.
pub trait BatchReader: Debug + NumEntries + Rkyv + SizeOf + 'static
where
    Self: Sized,
{
    /// Factories for this batch's key, value, weight, and item types.
    type Factories: BatchFactories<Self::Key, Self::Val, Self::Time, Self::R>;

    /// Key by which updates are indexed.
    type Key: DataTrait + ?Sized;

    /// Values associated with keys.
    type Val: DataTrait + ?Sized;

    /// Timestamps associated with updates
    type Time: Timestamp;

    /// Associated update.
    type R: WeightTrait + ?Sized;

    /// The type used to enumerate the batch's contents.
    type Cursor<'s>: Cursor<Self::Key, Self::Val, Self::Time, Self::R> + Clone + Send
    where
        Self: 's;

    // type Consumer: Consumer<Self::Key, Self::Val, Self::R, Self::Time>;

    /// Returns the factories used by this batch.
    fn factories(&self) -> Self::Factories;

    /// Acquires a cursor to the batch's contents.
    fn cursor(&self) -> Self::Cursor<'_>;

    /// Acquires a [PushCursor] for the batch's contents.
    fn push_cursor(
        &self,
    ) -> Box<dyn PushCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        Box::new(DefaultPushCursor::new(self.cursor()))
    }

    /// Acquires a [MergeCursor] for the batch's contents.
    fn merge_cursor(
        &self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        if key_filter.is_none() && value_filter.is_none() {
            // No filtering at all: use the cheaper unfiltered cursor.
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            // Other forms of GroupFilters cannot be evaluated without a trace snapshot -- don't filter values
            // in such cursors.
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        }
    }

    /// Similar to `merge_cursor`, but invoked in the context of a spine merger.
    /// Takes the current spine snapshot as an extra argument and uses it to evaluate `value_filter` precisely.
    fn merge_cursor_with_snapshot<'a, S>(
        &'a self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
        snapshot: &'a Option<Arc<S>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + 'a>
    where
        S: BatchReader<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R>,
    {
        // Without a snapshot there is nothing extra to do; fall back to the
        // plain merge cursor.
        let Some(snapshot) = snapshot else {
            return self.merge_cursor(key_filter, value_filter);
        };
        if key_filter.is_none() && value_filter.is_none() {
            Box::new(UnfilteredMergeCursor::new(self.cursor()))
        } else if value_filter.is_none() {
            Box::new(FilteredMergeCursor::new(self.cursor(), key_filter, None))
        } else if let Some(GroupFilter::Simple(filter)) = value_filter {
            // Simple value filters don't need the snapshot either.
            Box::new(FilteredMergeCursor::new(
                self.cursor(),
                key_filter,
                Some(filter),
            ))
        } else {
            // Non-simple GroupFilter: evaluate it precisely against the
            // snapshot.  `unwrap` cannot fail: the `value_filter.is_none()`
            // branch above already ruled out `None`.
            Box::new(FilteredMergeCursorWithSnapshot::new(
                self.cursor(),
                key_filter,
                value_filter.unwrap(),
                snapshot,
            ))
        }
    }

    /// Acquires a merge cursor for the batch's contents.
    ///
    /// Takes `&mut self`, which lets implementations hand out a cursor that
    /// consumes the batch's contents; the default just delegates to
    /// [`Self::merge_cursor`].
    fn consuming_cursor(
        &mut self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        self.merge_cursor(key_filter, value_filter)
    }
    //fn consumer(self) -> Self::Consumer;

    /// The number of keys in the batch.
    // TODO: return `(usize, Option<usize>)`, similar to
    // `Iterator::size_hint`, since not all implementations
    // can compute the number of keys precisely.  Same for
    // `len()`.
    fn key_count(&self) -> usize;

    /// The number of updates in the batch.
    fn len(&self) -> usize;

    /// The memory or storage size of the batch in bytes.
    ///
    /// This can be an approximation, such as the size of an on-disk file for a
    /// stored batch.
    ///
    /// Implementations of this function can be expensive because they might
    /// require iterating through all the data in a batch.  Currently this is
    /// only used to decide whether to keep the result of a merge in memory or
    /// on storage.  For this case, the merge will visit and copy all the data
    /// in the batch. The batch will be discarded afterward, which means that
    /// the implementation need not attempt to cache the return value.
    fn approximate_byte_size(&self) -> usize;

    /// Statistics of the secondary membership filter used by
    /// [Cursor::seek_key_exact] after the range filter.
    ///
    /// Today this is usually a Bloom filter. Batches without such a filter
    /// should return `FilterStats::default()`.
    fn membership_filter_stats(&self) -> FilterStats;

    /// Statistics of the in-memory range filter used by
    /// [Cursor::seek_key_exact].
    ///
    /// Batches without a range filter should return `FilterStats::default()`.
    fn range_filter_stats(&self) -> FilterStats {
        FilterStats::default()
    }

    /// Where the batch's data is stored.
    fn location(&self) -> BatchLocation {
        BatchLocation::Memory
    }

    /// Storage cache access statistics for this batch only.
    ///
    /// Most batches are in-memory, so they don't have any statistics.
    fn cache_stats(&self) -> CacheStats {
        CacheStats::default()
    }

    /// True if the batch is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns a uniform random sample of distinct keys from the batch.
    ///
    /// Does not take into account the number values associated with each
    /// key and their weights, i.e., a key that has few values is as likely
    /// to appear in the output sample as a key with many values.
    ///
    /// # Arguments
    ///
    /// * `rng` - random number generator used to generate the sample.
    ///
    /// * `sample_size` - requested sample size.
    ///
    /// * `sample` - output
    ///
    /// # Invariants
    ///
    /// The actual sample computed by the method can be smaller than
    /// `sample_size` even if `self` contains `>sample_size` keys.
    ///
    /// A correct implementation must enforce the following invariants:
    ///
    /// * The output sample size cannot exceed `sample_size`.
    ///
    /// * The output sample can only contain keys present in `self` (with
    ///   non-zero weights).
    ///
    /// * If `sample_size` is greater than or equal to the number of keys
    ///   present in `self` (with non-zero weights), the resulting sample must
    ///   contain all such keys.
    ///
    /// * The output sample contains keys sorted in ascending order.
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
        RG: Rng;

    /// Returns num_partitions-1 keys from the batch that partition the batch into num_partitions
    /// approximately equal size ranges 0..key1, key1..key2, ... , key_num_partitions-1..last_key_in_the_batch.
    ///
    /// The default implementation uses the sample_keys method to sample num_partitions^2 keys and
    /// picks keys num_partitions, 2*num_partitions, ..,num_partitions-1*num_partitions as boundaries.
    ///
    /// # Arguments
    ///
    /// * `num_partitions` - number of partitions to create.
    /// * `bounds` - output vector to store the partition boundaries.
    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
    {
        bounds.clear();
        // One partition (or none) needs no boundaries.
        if num_partitions <= 1 {
            return;
        }

        let sample_size = num_partitions * num_partitions;

        let mut sample = self.factories().keys_factory().default_box();
        self.sample_keys(&mut thread_rng(), sample_size, sample.as_mut());

        // Pick evenly distributed keys as boundaries
        let sample_len = sample.len();
        if sample_len == 0 {
            return;
        }

        if sample_len >= num_partitions {
            // Pick num_bounds evenly distributed indices from the sample
            // These divide the sample into num_bounds + 1 roughly equal parts
            for i in 0..num_partitions - 1 {
                let idx = ((i + 1) * sample_len) / num_partitions;
                let idx = idx.min(sample_len - 1);
                bounds.push_ref(sample.index(idx));
            }
        } else {
            // If we have fewer samples than needed, use what we have
            for i in 0..sample_len {
                bounds.push_ref(sample.index(i));
            }
        }
    }

    /// Creates and returns a new batch that is a subset of this one, containing
    /// only the key-value pairs whose keys are in `keys`. May also return
    /// `None`, the default implementation, if the batch doesn't want to
    /// implement this method.  In particular, a batch for which access through
    /// a cursor is fast should return `None` to avoid the expense of copying
    /// data.
    ///
    /// # Rationale
    ///
    /// This method enables performance optimizations for the case where these
    /// assumptions hold:
    ///
    /// 1. Individual [Batch]es flowing through a circuit are small enough to
    ///    fit comfortably in memory.
    ///
    /// 2. [Trace]s accumulated over time as a circuit executes may become large
    ///    enough that they must be maintained in external storage.
    ///
    /// If an operator needs to fetch all of the data from a `trace` that
    /// corresponds to some set of `keys`, then, given these assumptions, doing
    /// so one key at a time with a cursor will be slow because every key fetch
    /// potentially incurs a round trip to the storage, with total latency O(n)
    /// in the number of keys. This method gives the batch implementation the
    /// opportunity to implement parallel fetch for `trace.fetch(key)`, with
    /// total latency O(1) in the number of keys.
    #[allow(async_fn_in_trait)]
    async fn fetch<B>(
        &self,
        keys: &B,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        B: BatchReader<Key = Self::Key, Time = ()>,
    {
        let _ = keys;
        None
    }

    /// Direct access to the batch's keys as a vector, for implementations
    /// that store them that way; the default returns `None`.
    fn keys(&self) -> Option<&DynVec<Self::Key>> {
        None
    }
}

/// A shared reference to a batch is itself a batch reader: every method
/// delegates to the inner batch.
impl<B> BatchReader for Arc<B>
where
    B: BatchReader,
{
    type Factories = B::Factories;
    type Key = B::Key;
    type Val = B::Val;
    type Time = B::Time;
    type R = B::R;
    type Cursor<'s> = B::Cursor<'s>;
    fn factories(&self) -> Self::Factories {
        (**self).factories()
    }
    fn cursor(&self) -> Self::Cursor<'_> {
        (**self).cursor()
    }
    fn merge_cursor(
        &self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        (**self).merge_cursor(key_filter, value_filter)
    }
    fn key_count(&self) -> usize {
        (**self).key_count()
    }
    fn len(&self) -> usize {
        (**self).len()
    }
    fn approximate_byte_size(&self) -> usize {
        (**self).approximate_byte_size()
    }
    fn membership_filter_stats(&self) -> FilterStats {
        (**self).membership_filter_stats()
    }
    fn range_filter_stats(&self) -> FilterStats {
        (**self).range_filter_stats()
    }
    fn location(&self) -> BatchLocation {
        (**self).location()
    }
    fn cache_stats(&self) -> CacheStats {
        (**self).cache_stats()
    }
    fn is_empty(&self) -> bool {
        (**self).is_empty()
    }
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
        RG: Rng,
    {
        (**self).sample_keys(rng, sample_size, sample)
    }
    fn consuming_cursor(
        &mut self,
        key_filter: Option<Filter<Self::Key>>,
        value_filter: Option<GroupFilter<Self::Val>>,
    ) -> Box<dyn MergeCursor<Self::Key, Self::Val, Self::Time, Self::R> + Send + '_> {
        // The inner batch is shared, so it cannot be consumed here; fall back
        // to the non-consuming merge cursor (same behavior as the trait's
        // default `consuming_cursor`).
        (**self).merge_cursor(key_filter, value_filter)
    }
    async fn fetch<KB>(
        &self,
        keys: &KB,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        KB: BatchReader<Key = Self::Key, Time = ()>,
    {
        (**self).fetch(keys).await
    }
    fn keys(&self) -> Option<&DynVec<Self::Key>> {
        (**self).keys()
    }
}

/// A [`BatchReader`] plus features for constructing new batches.
///
/// [`Batch`] extends [`BatchReader`] with types for constructing new batches
/// from ordered tuples ([`Self::Builder`]) or unordered tuples
/// ([`Self::Batcher`]), or by merging traces of like types, plus some
/// convenient methods for using those types.
///
/// See [crate documentation](crate::trace) for more information on batches and
/// traces.
pub trait Batch: BatchReader + Clone + Send + Sync
where
    Self: Sized,
{
    /// A batch type equivalent to `Self`, but with timestamp type `T` instead of `Self::Time`.
    type Timed<T: Timestamp>: Batch<
            Key = <Self as BatchReader>::Key,
            Val = <Self as BatchReader>::Val,
            Time = T,
            R = <Self as BatchReader>::R,
        >;

    /// A type used to assemble batches from disordered updates.
    type Batcher: Batcher<Self>;

    /// A type used to assemble batches from ordered update sequences.
    type Builder: Builder<Self>;

    /// Assemble an unordered vector of weighted items into a batch.
    #[allow(clippy::type_complexity)]
    fn dyn_from_tuples(
        factories: &Self::Factories,
        time: Self::Time,
        tuples: &mut Box<DynWeightedPairs<DynPair<Self::Key, Self::Val>, Self::R>>,
    ) -> Self {
        let mut batcher = Self::Batcher::new_batcher(factories, time);
        batcher.push_batch(tuples);
        batcher.seal()
    }

    /// Creates a new batch as a copy of `batch`, using `timestamp` for all of
    /// the new batch's timestamps.  This is useful for adding a timestamp to a
    /// batch, or for converting between different batch implementations
    /// (e.g. writing an in-memory batch to disk).
    ///
    /// TODO: for adding a timestamp to a batch, this could be implemented more
    /// efficiently by having a special batch type where all updates have the same
    /// timestamp, as this is the only kind of batch that we ever create directly in
    /// DBSP; batches with multiple timestamps are only created as a result of
    /// merging.  The main complication is that we will need to extend the trace
    /// implementation to work with batches of multiple types.  This shouldn't be
    /// too hard and is on the todo list.
    fn from_batch<BI>(batch: &BI, timestamp: &Self::Time, factories: &Self::Factories) -> Self
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        // Source and destination types are usually the same in the top-level scope.
        // Optimize for this case by simply cloning the source batch. If the batch is
        // implemented as `Arc` internally, this is essentially zero cost.
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the `TypeId` check above proves that `BI` and `Self`
            // are the same concrete type, so reinterpreting `&BI` as `&Self`
            // is sound.
            unsafe { std::mem::transmute::<&BI, &Self>(batch).clone() }
        } else {
            Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            )
        }
    }

    /// Like `from_batch`, but avoids cloning the batch if the output type is identical to the input type.
    fn from_arc_batch<BI>(
        batch: &Arc<BI>,
        timestamp: &Self::Time,
        factories: &Self::Factories,
    ) -> Arc<Self>
    where
        BI: BatchReader<Key = Self::Key, Val = Self::Val, Time = (), R = Self::R>,
    {
        // Source and destination types are usually the same in the top-level scope.
        // Optimize for this case by simply cloning the source batch. If the batch is
        // implemented as `Arc` internally, this is essentially zero cost.
        if TypeId::of::<BI>() == TypeId::of::<Self>() {
            // SAFETY: the `TypeId` check above proves that `BI` and `Self`
            // are the same concrete type, so `Arc<BI>` and `Arc<Self>` have
            // identical layout; cloning just bumps the refcount.
            unsafe { std::mem::transmute::<&Arc<BI>, &Arc<Self>>(batch).clone() }
        } else {
            Arc::new(Self::from_cursor(
                batch.cursor(),
                timestamp,
                factories,
                batch.key_count(),
                batch.len(),
            ))
        }
    }

    /// Creates a new batch as a copy of the tuples accessible via `cursor`,
    /// using `timestamp` for all of the new batch's timestamps.
    fn from_cursor<C>(
        mut cursor: C,
        timestamp: &Self::Time,
        factories: &Self::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self
    where
        C: Cursor<Self::Key, Self::Val, (), Self::R>,
    {
        let mut builder = Self::Builder::with_capacity(factories, key_capacity, value_capacity);
        while cursor.key_valid() {
            let mut any_values = false;
            while cursor.val_valid() {
                let weight = cursor.weight();
                debug_assert!(!weight.is_zero());
                builder.push_time_diff(timestamp, weight);
                builder.push_val(cursor.val());
                any_values = true;
                cursor.step_val();
            }
            // Skip keys that ended up with no values.
            if any_values {
                builder.push_key(cursor.key());
            }
            cursor.step_key();
        }
        builder.done()
    }

    /// Creates an empty batch.
    fn dyn_empty(factories: &Self::Factories) -> Self {
        Self::Builder::new_builder(factories).done()
    }

    /// Returns elements from `self` that satisfy a predicate.
    fn filter(&self, predicate: &dyn Fn(&Self::Key, &Self::Val) -> bool) -> Self
    where
        Self::Time: PartialEq<()> + From<()>,
    {
        let factories = self.factories();
        let mut builder = Self::Builder::new_builder(&factories);
        let mut cursor = self.cursor();

        while cursor.key_valid() {
            let mut any_values = false;
            while cursor.val_valid() {
                if predicate(cursor.key(), cursor.val()) {
                    builder.push_diff(cursor.weight());
                    builder.push_val(cursor.val());
                    any_values = true;
                }
                cursor.step_val();
            }
            // Only emit the key if at least one of its values survived the
            // predicate.
            if any_values {
                builder.push_key(cursor.key());
            }
            cursor.step_key();
        }

        builder.done()
    }

    /// If this batch is not on storage, but supports writing itself to storage,
    /// this method writes it to storage and returns the stored version.
    fn persisted(&self) -> Option<Self> {
        None
    }

    /// This function returns the file that can be used to restore the batch's
    /// contents.
    ///
    /// If the batch cannot be persisted, this function returns None.
    fn file_reader(&self) -> Option<Arc<dyn FileReader>> {
        None
    }

    /// Reads a batch back from `_path` in storage.  The default
    /// implementation reports that this batch type does not support it.
    fn from_path(_factories: &Self::Factories, _path: &StoragePath) -> Result<Self, ReaderError> {
        Err(ReaderError::Unsupported)
    }

    /// The number of tuples with negative weights in the batch.
    ///
    /// This metric is used in merger heuristics. Negative weights are likely to cancel out
    /// with positive weights when merging the batch with other batches in the spine; therefore the
    /// merger will merge such batches more aggressively than it otherwise would based on the batch
    /// size only.
    ///
    /// This heuristic is not useful for all batch types, in particular negative weights in batches
    /// produced by recursive queries do not generally cancel out. For such batches we don't track
    /// negative weights, and this method returns `None`.
    fn negative_weight_count(&self) -> Option<u64>;
}

/// Functionality for collecting and batching updates.
pub trait Batcher<Output>: SizeOf
where
    Output: Batch,
{
    /// Allocates a new empty batcher.  All tuples in the batcher (and its
    /// output batch) will have timestamp `time`.
    fn new_batcher(vtables: &Output::Factories, time: Output::Time) -> Self;

    /// Adds an unordered batch of elements to the batcher.
    fn push_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
    );

    /// Adds a consolidated batch of elements to the batcher.
    ///
    /// A consolidated batch is sorted and contains no duplicates or zero
    /// weights.
    fn push_consolidated_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<Output::Key, Output::Val>, Output::R>>,
    );

    /// Returns the number of tuples in the batcher.
    fn tuples(&self) -> usize;

    /// Consumes the batcher and returns a batch containing all of the tuples
    /// added to it.
    fn seal(self) -> Output;
}

/// Functionality for building batches from ordered update sequences.
///
/// This interface requires the client to push all of the time-diff pairs
/// associated with a value, then the value, then all the time-diff pairs
/// associated with the next value, then that value, and so on. Once all of the
/// values associated with the current key have been pushed, the client pushes
/// the key.
///
/// If this interface is too low-level for the client, consider wrapping it in a
/// [TupleBuilder].
///
/// # Example
///
/// To push the following tuples:
///
/// ```text
/// (k1, v1, t1, r1)
/// (k1, v1, t2, r2)
/// (k1, v2, t1, r1)
/// (k1, v3, t2, r2)
/// (k2, v1, t1, r1)
/// (k2, v1, t2, r2)
/// (k3, v1, t1, r1)
/// (k4, v2, t2, r2)
/// ```
///
/// the client would use:
///
/// ```ignore
/// builder.push_time_diff(t1, r1);
/// builder.push_time_diff(t2, r2);
/// builder.push_val(v1);
/// builder.push_time_diff(t1, r1);
/// builder.push_val(v2);
/// builder.push_time_diff(t2, r2);
/// builder.push_val(v3);
/// builder.push_key(k1);
/// builder.push_time_diff(t1, r1);
/// builder.push_time_diff(t2, r2);
/// builder.push_val(v1);
/// builder.push_key(k2);
/// builder.push_time_diff(t1, r1);
/// builder.push_val(v1);
/// builder.push_key(k3);
/// builder.push_time_diff(t2, r2);
/// builder.push_val(v2);
/// builder.push_key(k4);
/// ```
pub trait Builder<Output>: Send + SizeOf
where
    Self: Sized,
    Output: Batch,
{
    /// Creates a new builder with an initial capacity of 0.
    fn new_builder(factories: &Output::Factories) -> Self {
        Self::with_capacity(factories, 0, 0)
    }

    /// Creates an empty builder with estimated capacities for keys and
    /// key-value pairs.  Only `value_capacity >= key_capacity` makes sense but
    /// implementations must tolerate contradictory capacity requests.
    fn with_capacity(
        factories: &Output::Factories,
        key_capacity: usize,
        value_capacity: usize,
    ) -> Self;

    /// Creates an empty builder to hold the result of merging
    /// `batches`. Optionally, `location` can specify the preferred location for
    /// the result of the merge.
    fn for_merge<'a, B, I>(
        factories: &Output::Factories,
        batches: I,
        location: Option<BatchLocation>,
    ) -> Self
    where
        B: BatchReader,
        I: IntoIterator<Item = &'a B> + Clone,
    {
        // The default implementation ignores the location hint and sizes the
        // builder for the sum of the inputs (an upper bound on the merge
        // output).
        let _ = location;
        let key_capacity = batches.clone().into_iter().map(|b| b.key_count()).sum();
        let value_capacity = batches.into_iter().map(|b| b.len()).sum();
        Self::with_capacity(factories, key_capacity, value_capacity)
    }

    /// Adds time-diff pair `(time, weight)`.
    fn push_time_diff(&mut self, time: &Output::Time, weight: &Output::R);

    /// Adds time-diff pair `(time, weight)`.
    fn push_time_diff_mut(&mut self, time: &mut Output::Time, weight: &mut Output::R) {
        self.push_time_diff(time, weight);
    }

    /// Adds value `val`.
    fn push_val(&mut self, val: &Output::Val);

    /// Adds value `val`.
    fn push_val_mut(&mut self, val: &mut Output::Val) {
        self.push_val(val);
    }

    /// Adds key `key`.
    fn push_key(&mut self, key: &Output::Key);

    /// Adds key `key`.
    fn push_key_mut(&mut self, key: &mut Output::Key) {
        self.push_key(key);
    }

    /// Adds time-diff pair `(), weight`.
    fn push_diff(&mut self, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
    }

    /// Adds time-diff pair `(), weight`.
    fn push_diff_mut(&mut self, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_diff(weight);
    }

    /// Adds time-diff pair `(), weight` and value `val`.
    fn push_val_diff(&mut self, val: &Output::Val, weight: &Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_time_diff(&Output::Time::default(), weight);
        self.push_val(val);
    }

    /// Adds time-diff pair `(), weight` and value `val`.
    fn push_val_diff_mut(&mut self, val: &mut Output::Val, weight: &mut Output::R)
    where
        Output::Time: PartialEq<()>,
    {
        self.push_val_diff(val, weight);
    }

    /// Allocates room for `additional` keys.  The default implementation is a
    /// no-op.
    fn reserve(&mut self, additional: usize) {
        let _ = additional;
    }

    /// Returns the number of keys added so far.
    fn num_keys(&self) -> usize;
    /// Returns the number of tuples added so far.
    fn num_tuples(&self) -> usize;

    /// Completes building and returns the batch.
    fn done(self) -> Output;
}

/// Batch builder that accepts a full tuple at a time.
///
/// This wrapper for [Builder] allows a full tuple to be added at a time.
pub struct TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    /// The underlying builder to which fully-assembled keys and values are
    /// flushed.
    builder: B,
    /// The most recently seen key-value pair, buffered until a different pair
    /// (or `done()`) forces it to be flushed to `builder`.
    kv: Box<DynPair<Output::Key, Output::Val>>,
    /// Whether `kv` currently holds a buffered pair.
    has_kv: bool,
}

impl<B, Output> TupleBuilder<B, Output>
where
    B: Builder<Output>,
    Output: Batch,
{
    /// Creates a `TupleBuilder` wrapping `builder`, using `factories` to
    /// allocate the internal key-value buffer.
    pub fn new(factories: &Output::Factories, builder: B) -> Self {
        Self {
            builder,
            kv: factories.item_factory().default_box(),
            has_kv: false,
        }
    }

    /// Returns the number of keys added to the underlying builder so far.
    pub fn num_keys(&self) -> usize {
        self.builder.num_keys()
    }

    /// Returns the number of tuples added to the underlying builder so far.
    pub fn num_tuples(&self) -> usize {
        self.builder.num_tuples()
    }

    /// Adds `element` to the batch.
    pub fn push(&mut self, element: &mut DynPair<DynPair<Output::Key, Output::Val>, Output::R>)
    where
        Output::Time: PartialEq<()>,
    {
        let (kv, w) = element.split_mut();
        let (k, v) = kv.split_mut();
        // Unit timestamp: `Output::Time::default()` is the only time value.
        self.push_vals(k, v, &mut Output::Time::default(), w);
    }

    /// Adds tuple `(key, val, time, weight)` to the batch.
    pub fn push_refs(
        &mut self,
        key: &Output::Key,
        val: &Output::Val,
        time: &Output::Time,
        weight: &Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                // New key: flush the buffered value and key to the inner
                // builder, then start buffering the new pair.
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_refs(key, val);
            } else if v != val {
                // Same key, new value: flush only the buffered value.
                self.builder.push_val_mut(v);
                val.clone_to(v);
            }
        } else {
            // First tuple: nothing to flush yet, just start buffering.
            self.has_kv = true;
            self.kv.from_refs(key, val);
        }
        self.builder.push_time_diff(time, weight);
    }

    /// Adds tuple `(key, val, time, weight)` to the batch.
    ///
    /// Like [`Self::push_refs`], but may move out of the mutable arguments
    /// instead of cloning them.
    pub fn push_vals(
        &mut self,
        key: &mut Output::Key,
        val: &mut Output::Val,
        time: &mut Output::Time,
        weight: &mut Output::R,
    ) {
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            if k != key {
                // New key: flush the buffered value and key, then buffer the
                // new pair (moving out of `key` and `val`).
                self.builder.push_val_mut(v);
                self.builder.push_key_mut(k);
                self.kv.from_vals(key, val);
            } else if v != val {
                // Same key, new value: flush the buffered value and move the
                // new one into the buffer.
                self.builder.push_val_mut(v);
                val.move_to(v);
            }
        } else {
            // First tuple: nothing to flush yet, just start buffering.
            self.has_kv = true;
            self.kv.from_vals(key, val);
        }
        self.builder.push_time_diff_mut(time, weight);
    }

    /// Allocates room for `additional` keys in the underlying builder.
    pub fn reserve(&mut self, additional: usize) {
        self.builder.reserve(additional)
    }

    /// Adds all of the tuples in `iter` to the batch.
    pub fn extend<'a, I>(&mut self, iter: I)
    where
        Output::Time: PartialEq<()>,
        I: Iterator<Item = &'a mut WeightedItem<Output::Key, Output::Val, Output::R>>,
    {
        // Reserve using the iterator's size hint (upper bound if known).
        let (lower, upper) = iter.size_hint();
        self.reserve(upper.unwrap_or(lower));

        for item in iter {
            let (kv, w) = item.split_mut();
            let (k, v) = kv.split_mut();

            self.push_vals(k, v, &mut Output::Time::default(), w);
        }
    }

    /// Completes building and returns the batch.
    pub fn done(mut self) -> Output {
        // Flush the final buffered pair, if any.
        if self.has_kv {
            let (k, v) = self.kv.split_mut();
            self.builder.push_val_mut(v);
            self.builder.push_key_mut(k);
        }
        self.builder.done()
    }
}

/// Merges all of the batches in `batches`, applying `key_filter` and
/// `value_filter`, and returns the merged result.
///
/// The filters won't be applied to batches that don't get merged at all, that
/// is, if `batches` contains only one non-empty batch, or if it contains two
/// small batches that merge to become an empty batch alongside a third larger
/// batch, etc.
pub fn merge_batches<B, T>(
    factories: &B::Factories,
    batches: T,
    key_filter: &Option<Filter<B::Key>>,
    value_filter: &Option<GroupFilter<B::Val>>,
) -> B
where
    T: IntoIterator<Item = B>,
    B: Batch,
{
    // Empty input batches contribute nothing to the merge; drop them up front.
    let mut pending: Vec<B> = batches.into_iter().filter(|b| !b.is_empty()).collect();

    // Repeatedly merge the trailing group of up to 64 batches into one until
    // at most a single batch remains.  In practice there are <= 64 inputs, so
    // one round produces 1 output batch (or 0 if the inputs cancel out).
    while pending.len() > 1 {
        let split_at = pending.len().saturating_sub(64);
        let mut group = pending.split_off(split_at);
        let merged: B = ListMerger::merge(
            factories,
            // Create the builder first: `for_merge` needs a shared borrow of
            // `group`, which must end before the consuming cursors take their
            // mutable borrows.
            B::Builder::for_merge(factories, &group, Some(BatchLocation::Memory)),
            group
                .iter_mut()
                .map(|b| b.consuming_cursor(key_filter.clone(), value_filter.clone()))
                .collect(),
        );
        // Merging can cancel every update out; keep only non-empty results.
        if !merged.is_empty() {
            pending.push(merged);
        }
    }

    // Return the sole surviving batch, or synthesize an empty one if all of
    // the inputs added up to nothing.
    pending.pop().unwrap_or_else(|| B::dyn_empty(factories))
}

/// Merges all of the batches in `batches`, applying `key_filter` and
/// `value_filter`, and returns the merged result.
///
/// Every tuple will be passed through the filters.
pub fn merge_batches_by_reference<'a, B, T>(
    factories: &B::Factories,
    batches: T,
    key_filter: &Option<Filter<B::Key>>,
    value_filter: &Option<GroupFilter<B::Val>>,
) -> B
where
    T: IntoIterator<Item = &'a B>,
    B: Batch,
{
    // Keep only the non-empty input batches.
    let mut remaining: Vec<&'a B> = batches.into_iter().filter(|b| !b.is_empty()).collect();

    // Merge chunks of up to 64 `&B` inputs, turning each chunk into one owned
    // output batch.  In practice there are <= 64 inputs and at most 1 output
    // (0 if the inputs cancel each other out).
    let mut merged = Vec::with_capacity(remaining.len().div_ceil(64));
    while !remaining.is_empty() {
        let chunk = remaining.split_off(remaining.len().saturating_sub(64));
        let builder = B::Builder::for_merge(
            factories,
            chunk.iter().cloned(),
            Some(BatchLocation::Memory),
        );
        let cursors = chunk
            .into_iter()
            .map(|b| b.merge_cursor(key_filter.clone(), value_filter.clone()))
            .collect();
        let out: B = ListMerger::merge(factories, builder, cursors);
        if !out.is_empty() {
            merged.push(out);
        }
    }

    // Fold the chunk outputs together (usually 0 or 1 of them); this also
    // supplies the empty batch when everything cancelled out.
    merge_batches(factories, merged, key_filter, value_filter)
}

/// Compares two batches for equality.  This works regardless of whether the
/// batches are the same type, as long as their key, value, and weight types can
/// be compared for equality.
///
/// This can't be implemented as `PartialEq` because that is specialized for
/// comparing particular batch types (often in faster ways than this generic
/// function).  This function is mainly useful for testing in any case.
pub fn eq_batch<A, B, KA, VA, RA, KB, VB, RB>(a: &A, b: &B) -> bool
where
    A: BatchReader<Key = KA, Val = VA, Time = (), R = RA>,
    B: BatchReader<Key = KB, Val = VB, Time = (), R = RB>,
    KA: PartialEq<KB> + ?Sized,
    VA: PartialEq<VB> + ?Sized,
    RA: PartialEq<RB> + ?Sized,
    KB: ?Sized,
    VB: ?Sized,
    RB: ?Sized,
{
    let mut ca = a.cursor();
    let mut cb = b.cursor();
    loop {
        // Both cursors must run out of keys at the same time.
        match (ca.key_valid(), cb.key_valid()) {
            (false, false) => return true,
            (true, true) => {}
            _ => return false,
        }
        if ca.key() != cb.key() {
            return false;
        }
        loop {
            // Both keys must have the same number of values.
            match (ca.val_valid(), cb.val_valid()) {
                (false, false) => break,
                (true, true) => {}
                _ => return false,
            }
            if ca.val() != cb.val() || ca.weight() != cb.weight() {
                return false;
            }
            ca.step_val();
            cb.step_val();
        }
        ca.step_key();
        cb.step_key();
    }
}

/// Serializes `batch` — a weighted set, i.e. unit values and timestamps —
/// into a byte vector.
///
/// Each key and its weight are serialized in turn, and their offsets are
/// collected in `offsets`, which is serialized last so that
/// `deserialize_wset` can locate it with `archived_root`.
fn serialize_wset<B, K, R>(batch: &B) -> Vec<u8>
where
    B: BatchReader<Key = K, Val = DynUnit, Time = (), R = R>,
    K: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    SerializerInner::to_fbuf_with_thread_local(|s| {
        // Two offsets (key + weight) per tuple.
        let mut offsets = Vec::with_capacity(2 * batch.len());
        let mut cursor = batch.cursor();
        while cursor.key_valid() {
            offsets.push(cursor.key().serialize(s)?);
            offsets.push(cursor.weight().serialize(s)?);
            cursor.step_key();
        }
        s.serialize_value(&offsets)
    })
    .into_vec()
}

/// Deserializes a weighted set previously produced by [`serialize_wset`].
fn deserialize_wset<B, K, R>(factories: &B::Factories, data: &[u8]) -> B
where
    B: Batch<Key = K, Val = DynUnit, Time = (), R = R>,
    K: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    // The offsets vector was serialized last, so `archived_root` locates it
    // at the end of `data`.  It holds two entries (key offset, weight offset)
    // per tuple.
    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
    assert!(offsets.len() % 2 == 0);
    let n = offsets.len() / 2;
    let mut builder = B::Builder::with_capacity(factories, n, n);
    let mut key = factories.key_factory().default_box();
    let mut diff = factories.weight_factory().default_box();
    for i in 0..n {
        // SAFETY: the offsets were recorded by `serialize_wset` over this
        // same encoding, so they point at validly-archived values in `data`.
        unsafe { key.deserialize_from_bytes(data, offsets[i * 2] as usize) };
        unsafe { diff.deserialize_from_bytes(data, offsets[i * 2 + 1] as usize) };
        builder.push_val_diff(&(), &diff);
        builder.push_key(&key);
    }
    builder.done()
}

/// Separator that identifies the end of values for a key.
///
/// Stored in the offsets array by [`IndexedWSetSerializer::push_key`].  Real
/// entries are offsets into the serialization buffer, which in practice can
/// never reach this value.
const SEPARATOR: u64 = u64::MAX;

/// Debug-only state machine used by [`IndexedWSetSerializer`] to check that
/// diffs, values, and keys are pushed in the required order
/// (`push_diff` → `push_val` → … → `push_key`).
#[cfg(debug_assertions)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum State {
    Key,
    Val,
    Diff,
}

/// Incremental serializer for an indexed weighted set (see
/// [`serialize_indexed_wset`]).
pub struct IndexedWSetSerializer {
    /// Buffer that receives the serialized weights, values, and keys.
    fbuf: FBuf,
    /// Offsets of the serialized items in `fbuf`.  The first two entries are
    /// placeholders for the key and value counts (filled in by `done()`); a
    /// [`SEPARATOR`] entry precedes each key's offset, terminating that key's
    /// run of (weight, value) offset pairs.
    offsets: Vec<usize>,
    /// Number of keys pushed so far.
    n_keys: usize,
    /// Number of values pushed so far.
    n_values: usize,
    /// Debug-only check that diffs, values, and keys arrive in order.
    #[cfg(debug_assertions)]
    state: State,
}

impl IndexedWSetSerializer {
    /// Creates a serializer sized for roughly `estimated_keys` keys and
    /// `estimated_values` values.
    pub fn with_capacity(estimated_keys: usize, estimated_values: usize) -> Self {
        // Two placeholder slots for the key and value counts, plus two
        // entries per key (separator + key offset) and two per value (weight
        // offset + value offset).
        let mut offsets = Vec::with_capacity(2 + 2 * estimated_keys + 2 * estimated_values);
        offsets.push(0);
        offsets.push(0);
        Self {
            fbuf: FBuf::new(),
            offsets,
            n_keys: 0,
            n_values: 0,
            #[cfg(debug_assertions)]
            state: State::Key,
        }
    }

    /// Serializes `weight` into the buffer and records its offset.
    ///
    /// Must be followed by the `push_val` call for the value this weight
    /// belongs to (checked in debug builds).
    pub fn push_diff<R: WeightTrait + ?Sized>(
        &mut self,
        weight: &R,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            // Two diffs in a row would mean a missing `push_val`.
            debug_assert_ne!(self.state, State::Diff);
            self.state = State::Diff;
        }

        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(weight.serialize(s).unwrap())
        });
    }

    /// Serializes `val` into the buffer and records its offset.
    ///
    /// Must directly follow the `push_diff` call for this value's weight
    /// (checked in debug builds).
    pub fn push_val<V: DataTrait + ?Sized>(
        &mut self,
        val: &V,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            debug_assert_eq!(self.state, State::Diff);
            self.state = State::Val;
        }

        self.n_values += 1;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(val.serialize(s).unwrap())
        });
    }

    /// Serializes `key` into the buffer, after first recording a [`SEPARATOR`]
    /// entry that terminates the key's (weight, value) offset pairs.
    ///
    /// Must be called after at least one `push_diff`/`push_val` pair for this
    /// key (checked in debug builds).
    pub fn push_key<K: DataTrait + ?Sized>(
        &mut self,
        key: &K,
        serializer_inner: &mut SerializerInner,
    ) {
        #[cfg(debug_assertions)]
        {
            debug_assert_eq!(self.state, State::Val);
            self.state = State::Key;
        }

        self.offsets.push(SEPARATOR as usize);
        self.n_keys += 1;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            self.offsets.push(key.serialize(s).unwrap())
        });
    }

    /// Completes serialization: patches the key and value counts into the two
    /// placeholder entries, serializes the offsets vector last (so that it can
    /// be located with `archived_root`), and returns the buffer.
    pub fn done(mut self, serializer_inner: &mut SerializerInner) -> FBuf {
        // `State::Key` means the last item pushed was a key (or nothing was
        // pushed at all), i.e. no partially-pushed key remains.
        #[cfg(debug_assertions)]
        debug_assert_eq!(self.state, State::Key);
        self.offsets[0] = self.n_keys;
        self.offsets[1] = self.n_values;
        serializer_inner.with(FBufSerializer::new(&mut self.fbuf), |s| {
            s.serialize_value(&self.offsets).unwrap()
        });
        self.fbuf
    }
}

/// Serializes `batch` — an indexed weighted set with unit timestamps — into
/// an [`FBuf`] using [`IndexedWSetSerializer`].
pub fn serialize_indexed_wset<B, K, V, R>(batch: &B, serializer_inner: &mut SerializerInner) -> FBuf
where
    B: BatchReader<Key = K, Val = V, Time = (), R = R>,
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    let mut out = IndexedWSetSerializer::with_capacity(batch.key_count(), batch.len());
    let mut cur = batch.cursor();

    // The serializer expects, for each key, its (weight, value) pairs
    // followed by the key itself.
    while cur.key_valid() {
        while cur.val_valid() {
            out.push_diff(cur.weight(), serializer_inner);
            out.push_val(cur.val(), serializer_inner);
            cur.step_val();
        }
        out.push_key(cur.key(), serializer_inner);
        cur.step_key();
    }
    out.done(serializer_inner)
}

/// Deserializes an indexed weighted set previously produced by
/// [`serialize_indexed_wset`].
pub fn deserialize_indexed_wset<B, K, V, R>(factories: &B::Factories, data: &[u8]) -> B
where
    B: Batch<Key = K, Val = V, Time = (), R = R>,
    K: DataTrait + ?Sized,
    V: DataTrait + ?Sized,
    R: WeightTrait + ?Sized,
{
    // The offsets vector was serialized last, so `archived_root` locates it
    // at the end of `data`.  Layout (see `IndexedWSetSerializer`):
    // [n_keys, n_values, then per key: (diff, val)* SEPARATOR key].
    let offsets = unsafe { archived_root::<Vec<usize>>(data) };
    let n_keys = offsets[0] as usize;
    let n_values = offsets[1] as usize;

    let mut builder = B::Builder::with_capacity(factories, n_keys, n_values);
    let mut key = factories.key_factory().default_box();
    let mut val = factories.val_factory().default_box();
    let mut diff = factories.weight_factory().default_box();

    // Skip the two count entries.
    let mut current_offset = 2;

    while current_offset < offsets.len() {
        // Consume the (diff, val) offset pairs for the current key, up to the
        // separator.
        while offsets[current_offset] != SEPARATOR {
            // SAFETY: the offsets were recorded by the serializer over this
            // same encoding, so they point at validly-archived values in
            // `data`.
            unsafe { diff.deserialize_from_bytes(data, offsets[current_offset] as usize) };
            current_offset += 1;
            unsafe { val.deserialize_from_bytes(data, offsets[current_offset] as usize) };
            current_offset += 1;

            builder.push_val_diff(&val, &diff);
        }
        // Skip the separator entry.
        current_offset += 1;

        unsafe { key.deserialize_from_bytes(data, offsets[current_offset] as usize) };
        current_offset += 1;

        builder.push_key(&key);
    }
    builder.done()
}

#[cfg(test)]
mod serialize_test {
    // Round-trip tests: serialize an indexed Z-set with
    // `serialize_indexed_wset` and check that `deserialize_indexed_wset`
    // reconstructs an equal batch, including the unit (`()`) key and value
    // cases.
    use crate::{
        DynZWeight, OrdIndexedZSet,
        algebra::OrdIndexedZSet as DynOrdIndexedZSet,
        dynamic::DynData,
        indexed_zset,
        storage::file::SerializerInner,
        trace::{BatchReader, deserialize_indexed_wset, serialize_indexed_wset},
    };

    // Round trip with ordinary keys and values, including multiple values per
    // key and multiple keys.
    #[test]
    fn test_serialize_indexed_wset() {
        let test1: OrdIndexedZSet<u64, u64> = indexed_zset! {};
        let test2 = indexed_zset! { 1 => { 1 => 1 } };
        let test3 =
            indexed_zset! { 1 => { 1 => 1, 2 => 2, 3 => 3 }, 2 => { 1 => 1, 2 => 2, 3 => 3 } };

        for test in [test1, test2, test3] {
            let serialized = serialize_indexed_wset(&*test, &mut SerializerInner::new());
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }

    // Round trip with the unit type `()` as the key.
    #[test]
    fn test_serialize_indexed_wset_tup0_key() {
        let test1: OrdIndexedZSet<(), u64> = indexed_zset! {};
        let test2 = indexed_zset! { () => { 1 => 1 } };

        for test in [test1, test2] {
            let serialized = serialize_indexed_wset(&*test, &mut SerializerInner::new());
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }

    // Round trip with the unit type `()` as the value.
    #[test]
    fn test_serialize_indexed_wset_tup0_val() {
        let test1: OrdIndexedZSet<u64, ()> = indexed_zset! {};
        let test2 = indexed_zset! { 1 => { () => 1 } };
        let test3 = indexed_zset! { 1 => { () => 1 }, 2 => { () => 1 } };

        for test in [test1, test2, test3] {
            let serialized = serialize_indexed_wset(&*test, &mut SerializerInner::new());
            let deserialized = deserialize_indexed_wset::<
                DynOrdIndexedZSet<DynData, DynData>,
                DynData,
                DynData,
                DynZWeight,
            >(&test.factories(), &serialized);

            assert_eq!(&*test, &deserialized);
        }
    }
}