1use std::collections::HashSet;
55use std::ptr;
56use std::sync::atomic::{AtomicPtr, AtomicU8, AtomicU64, AtomicUsize, Ordering};
57
58use dashmap::DashMap;
59use parking_lot::Mutex;
60
61use sochdb_core::{Result, SochDBError};
62
/// Number of hazard-pointer slots held by each `HazardRecord`.
const HP_PER_THREAD: usize = 2;

/// Number of hazard records `LockFreeMemTable::new` asks its `HazardDomain` to allocate.
const MAX_THREADS: usize = 128;

/// Retired-list length at which `HazardDomain::retire` runs a reclamation pass.
const RECLAMATION_THRESHOLD: usize = 64;

/// Number of version slots in a single `FatNode`.
const FAT_NODE_SLOTS: usize = 8;
77
/// Largest payload, in bytes, that [`ValueStorage`] keeps inline instead of on the heap.
pub const INLINE_VALUE_SIZE: usize = 56;

// The inline length is stored in a `u8`; refuse to compile if the capacity ever outgrows it,
// because `ValueStorage::new` would then silently truncate the length.
const _: () = assert!(INLINE_VALUE_SIZE <= u8::MAX as usize);

/// Storage for one version's payload.
///
/// Payloads of at most [`INLINE_VALUE_SIZE`] bytes are copied into the value itself, larger
/// ones are boxed, and a deletion is recorded as [`ValueStorage::Tombstone`].
#[repr(C)]
pub enum ValueStorage {
    /// Payload stored in place; only the first `len` bytes of `data` are meaningful.
    Inline {
        /// Number of valid bytes in `data`.
        len: u8,
        /// Fixed-size buffer, zero-padded past `len` when built by [`ValueStorage::new`].
        data: [u8; INLINE_VALUE_SIZE],
    },
    /// Payload stored in its own heap allocation.
    Heap(Box<[u8]>),
    /// Deletion marker; carries no payload.
    Tombstone,
}

impl std::fmt::Debug for ValueStorage {
    /// Prints the variant and payload length only, never the payload bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ValueStorage::Inline { len, .. } => write!(f, "Inline(len={})", len),
            ValueStorage::Heap(data) => write!(f, "Heap(len={})", data.len()),
            ValueStorage::Tombstone => write!(f, "Tombstone"),
        }
    }
}

impl ValueStorage {
    /// Builds storage for `value`: `None` becomes a tombstone, short payloads are copied
    /// inline, and anything longer than [`INLINE_VALUE_SIZE`] is copied to the heap.
    #[inline]
    pub fn new(value: Option<&[u8]>) -> Self {
        match value {
            None => ValueStorage::Tombstone,
            Some(v) if v.len() <= INLINE_VALUE_SIZE => {
                let mut data = [0u8; INLINE_VALUE_SIZE];
                data[..v.len()].copy_from_slice(v);
                ValueStorage::Inline {
                    // Cannot truncate: guarded by the arm condition and the const assert above.
                    len: v.len() as u8,
                    data,
                }
            }
            Some(v) => ValueStorage::Heap(Box::from(v)),
        }
    }

    /// Returns the payload bytes, or `None` for a tombstone.
    ///
    /// The variant fields are public, so an `Inline` value can be built by hand with a `len`
    /// larger than the buffer; the length is clamped to the buffer instead of panicking.
    #[inline]
    pub fn as_bytes(&self) -> Option<&[u8]> {
        match self {
            ValueStorage::Inline { len, data } => {
                let valid = usize::from(*len).min(INLINE_VALUE_SIZE);
                Some(&data[..valid])
            }
            ValueStorage::Heap(data) => Some(data),
            ValueStorage::Tombstone => None,
        }
    }

    /// Returns `true` when this is a deletion marker.
    #[inline]
    pub fn is_tombstone(&self) -> bool {
        matches!(self, ValueStorage::Tombstone)
    }

    /// Returns `true` when the payload is stored in place.
    #[inline]
    pub fn is_inline(&self) -> bool {
        matches!(self, ValueStorage::Inline { .. })
    }

    /// Payload length in bytes; `0` for a tombstone. Always equals `as_bytes()`'s length.
    #[inline]
    pub fn len(&self) -> usize {
        self.as_bytes().map_or(0, |bytes| bytes.len())
    }

    /// Returns `true` for a tombstone or a zero-length payload.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
178
179#[derive(Debug)]
184pub struct LockFreeVersion {
185 pub storage: ValueStorage,
187 pub txn_id: u64,
189 pub commit_ts: AtomicU64,
191 pub next: AtomicPtr<LockFreeVersion>,
193}
194
195impl LockFreeVersion {
196 #[inline]
198 pub fn new_from_slice(value: Option<&[u8]>, txn_id: u64) -> Self {
199 Self {
200 storage: ValueStorage::new(value),
201 txn_id,
202 commit_ts: AtomicU64::new(0),
203 next: AtomicPtr::new(ptr::null_mut()),
204 }
205 }
206
207 pub fn new(value: Option<Vec<u8>>, txn_id: u64) -> Self {
209 Self::new_from_slice(value.as_deref(), txn_id)
210 }
211
212 #[inline]
214 pub fn get_value(&self) -> Option<&[u8]> {
215 self.storage.as_bytes()
216 }
217
218 #[inline]
222 pub fn value_cloned(&self) -> Option<Vec<u8>> {
223 self.storage.as_bytes().map(|v| v.to_vec())
224 }
225
226 #[inline]
228 pub fn is_committed(&self) -> bool {
229 self.commit_ts.load(Ordering::Acquire) > 0
230 }
231
232 #[inline]
234 pub fn get_commit_ts(&self) -> u64 {
235 self.commit_ts.load(Ordering::Acquire)
236 }
237
238 #[inline]
240 pub fn set_commit_ts(&self, ts: u64) {
241 self.commit_ts.store(ts, Ordering::Release);
242 }
243
244 #[inline]
246 pub fn is_inline(&self) -> bool {
247 self.storage.is_inline()
248 }
249}
250
251pub struct FatNode {
262 count: AtomicU8,
264 slots: [AtomicPtr<LockFreeVersion>; FAT_NODE_SLOTS],
266 next: AtomicPtr<FatNode>,
268}
269
270impl FatNode {
271 fn new_with_first(version: *mut LockFreeVersion, older: *mut FatNode) -> Self {
273 let slots = std::array::from_fn(|i| {
274 if i == 0 {
275 AtomicPtr::new(version)
276 } else {
277 AtomicPtr::new(ptr::null_mut())
278 }
279 });
280 Self {
281 count: AtomicU8::new(1),
282 slots,
283 next: AtomicPtr::new(older),
284 }
285 }
286
287 #[inline]
293 fn try_push(
294 &self,
295 version: *mut LockFreeVersion,
296 ) -> std::result::Result<(), *mut LockFreeVersion> {
297 loop {
298 let c = self.count.load(Ordering::Acquire);
299 if c as usize >= FAT_NODE_SLOTS {
300 return Err(version); }
302 match self
304 .count
305 .compare_exchange(c, c + 1, Ordering::AcqRel, Ordering::Acquire)
306 {
307 Ok(_) => {
308 self.slots[c as usize].store(version, Ordering::Release);
310 return Ok(());
311 }
312 Err(_) => continue, }
314 }
315 }
316
317 #[inline]
319 fn slot(&self, idx: u8) -> *mut LockFreeVersion {
320 self.slots[idx as usize].load(Ordering::Acquire)
321 }
322
323 #[inline]
325 fn iter_newest_first(&self) -> impl Iterator<Item = &LockFreeVersion> {
326 let count = self.count.load(Ordering::Acquire);
327 (0..count).rev().filter_map(move |i| {
328 let ptr = self.slots[i as usize].load(Ordering::Acquire);
329 if ptr.is_null() {
330 None
331 } else {
332 Some(unsafe { &*ptr })
333 }
334 })
335 }
336}
337
338pub struct LockFreeVersionChain {
344 head: AtomicPtr<FatNode>,
346}
347
348impl Default for LockFreeVersionChain {
349 fn default() -> Self {
350 Self::new()
351 }
352}
353
354impl LockFreeVersionChain {
355 pub fn new() -> Self {
357 Self {
358 head: AtomicPtr::new(ptr::null_mut()),
359 }
360 }
361
362 pub fn add_uncommitted(&self, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
366 let new_version = Box::into_raw(Box::new(LockFreeVersion::new(value, txn_id)));
367
368 loop {
369 let head = self.head.load(Ordering::Acquire);
370
371 if !head.is_null() {
373 let fat = unsafe { &*head };
374 let count = fat.count.load(Ordering::Acquire);
375 if count > 0 {
376 let newest = fat.slot(count - 1);
377 if !newest.is_null() {
378 let newest_ref = unsafe { &*newest };
379 if !newest_ref.is_committed() && newest_ref.txn_id != txn_id {
380 unsafe {
381 drop(Box::from_raw(new_version));
382 }
383 return Err(SochDBError::Internal("Write-write conflict".into()));
384 }
385 }
386 }
387
388 match fat.try_push(new_version) {
390 Ok(()) => return Ok(()),
391 Err(_) => {
392 let new_fat =
394 Box::into_raw(Box::new(FatNode::new_with_first(new_version, head)));
395 match self.head.compare_exchange(
396 head,
397 new_fat,
398 Ordering::AcqRel,
399 Ordering::Acquire,
400 ) {
401 Ok(_) => return Ok(()),
402 Err(_) => {
403 unsafe {
405 (*new_fat).slots[0].store(ptr::null_mut(), Ordering::Relaxed);
407 (*new_fat).count.store(0, Ordering::Relaxed);
408 drop(Box::from_raw(new_fat));
409 }
410 continue; }
412 }
413 }
414 }
415 } else {
416 let new_fat = Box::into_raw(Box::new(FatNode::new_with_first(
418 new_version,
419 ptr::null_mut(),
420 )));
421 match self
422 .head
423 .compare_exchange(head, new_fat, Ordering::AcqRel, Ordering::Acquire)
424 {
425 Ok(_) => return Ok(()),
426 Err(_) => {
427 unsafe {
428 (*new_fat).slots[0].store(ptr::null_mut(), Ordering::Relaxed);
429 (*new_fat).count.store(0, Ordering::Relaxed);
430 drop(Box::from_raw(new_fat));
431 }
432 continue;
433 }
434 }
435 }
436 }
437 }
438
439 pub fn commit(&self, txn_id: u64, commit_ts: u64) -> bool {
441 let mut fat_ptr = self.head.load(Ordering::Acquire);
442
443 while !fat_ptr.is_null() {
444 let fat = unsafe { &*fat_ptr };
445 for ver in fat.iter_newest_first() {
447 if ver.txn_id == txn_id && !ver.is_committed() {
448 ver.set_commit_ts(commit_ts);
449 return true;
450 }
451 }
452 fat_ptr = fat.next.load(Ordering::Acquire);
453 }
454
455 false
456 }
457
458 pub fn read_at(
463 &self,
464 snapshot_ts: u64,
465 current_txn_id: Option<u64>,
466 ) -> Option<&LockFreeVersion> {
467 let mut fat_ptr = self.head.load(Ordering::Acquire);
468
469 while !fat_ptr.is_null() {
470 let fat = unsafe { &*fat_ptr };
471 for version in fat.iter_newest_first() {
473 if let Some(txn_id) = current_txn_id
475 && version.txn_id == txn_id
476 && !version.is_committed()
477 {
478 return Some(version);
479 }
480
481 let commit_ts = version.get_commit_ts();
483 if commit_ts > 0 && commit_ts < snapshot_ts {
484 return Some(version);
485 }
486 }
487 fat_ptr = fat.next.load(Ordering::Acquire);
488 }
489
490 None
491 }
492
493 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
495 let head = self.head.load(Ordering::Acquire);
496
497 if !head.is_null() {
498 let fat = unsafe { &*head };
499 let count = fat.count.load(Ordering::Acquire);
500 if count > 0 {
501 let newest = fat.slot(count - 1);
502 if !newest.is_null() {
503 let version = unsafe { &*newest };
504 return !version.is_committed() && version.txn_id != my_txn_id;
505 }
506 }
507 }
508
509 false
510 }
511}
512
513#[repr(C, align(64))]
517struct HazardRecord {
518 hazard: [AtomicPtr<LockFreeVersion>; HP_PER_THREAD],
520 active: AtomicU64,
522}
523
524impl HazardRecord {
525 const fn new() -> Self {
526 Self {
527 hazard: [
528 AtomicPtr::new(ptr::null_mut()),
529 AtomicPtr::new(ptr::null_mut()),
530 ],
531 active: AtomicU64::new(0),
532 }
533 }
534
535 fn try_acquire(&self, thread_id: u64) -> bool {
537 self.active
538 .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Acquire)
539 .is_ok()
540 }
541
542 #[allow(dead_code)]
544 fn release(&self) {
545 for hp in &self.hazard {
547 hp.store(ptr::null_mut(), Ordering::Release);
548 }
549 self.active.store(0, Ordering::Release);
550 }
551}
552
553pub struct HazardDomain {
555 records: Vec<HazardRecord>,
557 retired: Mutex<Vec<*mut LockFreeVersion>>,
559}
560
561impl HazardDomain {
562 pub fn new(max_threads: usize) -> Self {
564 let mut records = Vec::with_capacity(max_threads);
565 for _ in 0..max_threads {
566 records.push(HazardRecord::new());
567 }
568
569 Self {
570 records,
571 retired: Mutex::new(Vec::with_capacity(RECLAMATION_THRESHOLD * 2)),
572 }
573 }
574
575 fn get_record(&self) -> Option<&HazardRecord> {
577 let thread_id = thread_id::get() as u64;
578
579 for record in &self.records {
581 if record.active.load(Ordering::Acquire) == thread_id {
582 return Some(record);
583 }
584 }
585
586 self.records
588 .iter()
589 .find(|record| record.try_acquire(thread_id))
590 }
591
592 #[inline]
594 pub fn protect(&self, ptr: *mut LockFreeVersion, slot: usize) -> bool {
595 if let Some(record) = self.get_record()
596 && slot < HP_PER_THREAD
597 {
598 record.hazard[slot].store(ptr, Ordering::Release);
599 std::sync::atomic::fence(Ordering::SeqCst);
600 return true;
601 }
602 false
603 }
604
605 #[inline]
607 pub fn clear(&self, slot: usize) {
608 if let Some(record) = self.get_record()
609 && slot < HP_PER_THREAD
610 {
611 record.hazard[slot].store(ptr::null_mut(), Ordering::Release);
612 }
613 }
614
615 pub fn retire(&self, ptr: *mut LockFreeVersion) {
617 let mut retired = self.retired.lock();
618 retired.push(ptr);
619
620 if retired.len() >= RECLAMATION_THRESHOLD {
622 self.try_reclaim(&mut retired);
623 }
624 }
625
626 fn try_reclaim(&self, retired: &mut Vec<*mut LockFreeVersion>) {
628 let mut protected: HashSet<usize> = HashSet::new();
630
631 for record in &self.records {
632 if record.active.load(Ordering::Acquire) != 0 {
633 for hp in &record.hazard {
634 let ptr = hp.load(Ordering::Acquire);
635 if !ptr.is_null() {
636 protected.insert(ptr as usize);
637 }
638 }
639 }
640 }
641
642 let mut still_retired = Vec::new();
644 for ptr in retired.drain(..) {
645 if protected.contains(&(ptr as usize)) {
646 still_retired.push(ptr);
647 } else {
648 unsafe {
650 drop(Box::from_raw(ptr));
651 }
652 }
653 }
654
655 *retired = still_retired;
656 }
657}
658
659impl Drop for HazardDomain {
660 fn drop(&mut self) {
661 let mut retired = self.retired.lock();
663 for ptr in retired.drain(..) {
664 unsafe {
665 drop(Box::from_raw(ptr));
666 }
667 }
668 }
669}
670
/// Small process-wide thread numbering: each thread gets a distinct non-zero id on first use.
mod thread_id {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Next id to hand out; starts at 1 so that 0 is never a valid id.
    static COUNTER: AtomicUsize = AtomicUsize::new(1);

    thread_local!(
        /// Id of the current thread, drawn from `COUNTER` the first time it is read.
        static MY_ID: usize = COUNTER.fetch_add(1, Ordering::Relaxed)
    );

    /// Returns the calling thread's id; repeated calls on one thread return the same value.
    pub fn get() -> usize {
        MY_ID.with(|&id| id)
    }
}
685
686pub struct LockFreeMemTable {
688 data: DashMap<Vec<u8>, LockFreeVersionChain>,
690 hazard_domain: HazardDomain,
692 size_bytes: AtomicUsize,
694}
695
696impl LockFreeMemTable {
697 pub fn new() -> Self {
699 Self {
700 data: DashMap::new(),
701 hazard_domain: HazardDomain::new(MAX_THREADS),
702 size_bytes: AtomicUsize::new(0),
703 }
704 }
705
706 pub fn read(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
711 let chain = self.data.get(key)?;
712
713 if let Some(version) = chain.read_at(snapshot_ts, txn_id) {
715 let ptr = version as *const LockFreeVersion as *mut LockFreeVersion;
717 self.hazard_domain.protect(ptr, 0);
718
719 let result = version.value_cloned();
722
723 self.hazard_domain.clear(0);
725
726 result
727 } else {
728 None
729 }
730 }
731
732 #[inline]
746 pub fn read_with<F, R>(
747 &self,
748 key: &[u8],
749 snapshot_ts: u64,
750 txn_id: Option<u64>,
751 f: F,
752 ) -> Option<R>
753 where
754 F: FnOnce(&[u8]) -> R,
755 {
756 let chain = self.data.get(key)?;
757
758 if let Some(version) = chain.read_at(snapshot_ts, txn_id) {
759 let ptr = version as *const LockFreeVersion as *mut LockFreeVersion;
761 self.hazard_domain.protect(ptr, 0);
762
763 let result = version.get_value().map(f);
765
766 self.hazard_domain.clear(0);
768
769 result
770 } else {
771 None
772 }
773 }
774
775 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
777 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
778
779 let chain = self.data.entry(key.clone()).or_default();
781
782 chain.add_uncommitted(value, txn_id)?;
784
785 self.size_bytes
787 .fetch_add(key.len() + value_size + 64, Ordering::Relaxed);
788
789 Ok(())
790 }
791
792 pub fn commit(&self, txn_id: u64, commit_ts: u64, keys: &[Vec<u8>]) {
794 for key in keys {
795 if let Some(chain) = self.data.get(key) {
796 chain.commit(txn_id, commit_ts);
797 }
798 }
799 }
800
801 pub fn has_write_conflict(&self, key: &[u8], txn_id: u64) -> bool {
803 if let Some(chain) = self.data.get(key) {
804 chain.has_write_conflict(txn_id)
805 } else {
806 false
807 }
808 }
809
810 pub fn size_bytes(&self) -> usize {
812 self.size_bytes.load(Ordering::Relaxed)
813 }
814
815 pub fn len(&self) -> usize {
817 self.data.len()
818 }
819
820 pub fn is_empty(&self) -> bool {
822 self.data.is_empty()
823 }
824}
825
826unsafe impl Send for LockFreeMemTable {}
830unsafe impl Sync for LockFreeMemTable {}
831
832impl Default for LockFreeMemTable {
833 fn default() -> Self {
834 Self::new()
835 }
836}
837
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// A pending write is visible only to its own transaction until it is committed.
    #[test]
    fn test_basic_write_read() {
        let table = LockFreeMemTable::new();
        let key = b"key1".to_vec();
        let stored = Some(b"value1".to_vec());

        table
            .write(key.clone(), Some(b"value1".to_vec()), 1)
            .unwrap();

        // The writing transaction sees its own uncommitted version.
        let own_view = table.read(&key, 100, Some(1));
        assert_eq!(own_view, stored);

        // A different transaction does not.
        let foreign_view = table.read(&key, 100, Some(2));
        assert!(foreign_view.is_none());

        // After commit the value is visible to plain snapshot reads.
        table.commit(1, 50, &[key.clone()]);
        let committed_view = table.read(&key, 100, None);
        assert_eq!(committed_view, stored);
    }

    /// Readers see the newest version committed before their snapshot timestamp.
    #[test]
    fn test_snapshot_isolation() {
        let table = LockFreeMemTable::new();
        let key = b"key".to_vec();

        for (txn, commit_ts, payload) in [(1u64, 10u64, &b"v1"[..]), (2, 20, &b"v2"[..])] {
            table
                .write(key.clone(), Some(payload.to_vec()), txn)
                .unwrap();
            table.commit(txn, commit_ts, &[key.clone()]);
        }

        assert_eq!(table.read(&key, 15, None), Some(b"v1".to_vec()));

        assert_eq!(table.read(&key, 25, None), Some(b"v2".to_vec()));
    }

    /// A second transaction cannot write over another transaction's pending version,
    /// but the owner may write again.
    #[test]
    fn test_write_conflict() {
        let table = LockFreeMemTable::new();
        let attempt = |payload: &[u8], txn: u64| {
            table.write(b"key".to_vec(), Some(payload.to_vec()), txn)
        };

        attempt(b"v1", 1).unwrap();

        let other_txn = attempt(b"v2", 2);
        assert!(other_txn.is_err());

        let same_txn = attempt(b"v1_updated", 1);
        assert!(same_txn.is_ok());
    }

    /// Eight threads read one hundred committed keys concurrently.
    #[test]
    fn test_concurrent_reads() {
        let table = Arc::new(LockFreeMemTable::new());

        let keys: Vec<Vec<u8>> = (0..100)
            .map(|i| format!("key{}", i).into_bytes())
            .collect();
        for (i, key) in keys.iter().enumerate() {
            let value = format!("value{}", i).into_bytes();
            table.write(key.clone(), Some(value), 1).unwrap();
        }
        table.commit(1, 10, &keys);

        let readers: Vec<_> = (0..8)
            .map(|t| {
                let shared = Arc::clone(&table);
                thread::spawn(move || {
                    for i in 0..100 {
                        let key = format!("key{}", i).into_bytes();
                        let expected = format!("value{}", i).into_bytes();
                        let val = shared.read(&key, 100, None);
                        assert_eq!(val, Some(expected), "Thread {} failed at key{}", t, i);
                    }
                })
            })
            .collect();

        for reader in readers {
            reader.join().unwrap();
        }
    }

    /// Small payloads are stored inline, large ones on the heap, `None` as a tombstone.
    #[test]
    fn test_inline_storage() {
        let small = b"small".to_vec();
        let inline_version = LockFreeVersion::new(Some(small.clone()), 1);
        assert!(inline_version.is_inline(), "Small values should be inline");
        assert_eq!(inline_version.get_value(), Some(small.as_slice()));

        let large = vec![42u8; 100];
        let heap_version = LockFreeVersion::new(Some(large.clone()), 2);
        assert!(!heap_version.is_inline(), "Large values should be on heap");
        assert_eq!(heap_version.get_value(), Some(large.as_slice()));

        let deleted = LockFreeVersion::new(None, 3);
        assert!(deleted.storage.is_tombstone());
        assert_eq!(deleted.get_value(), None);
    }

    /// The inline/heap boundary sits exactly at `INLINE_VALUE_SIZE`.
    #[test]
    fn test_inline_threshold() {
        let at_limit = LockFreeVersion::new(Some(vec![0u8; INLINE_VALUE_SIZE]), 1);
        assert!(at_limit.is_inline(), "Values at threshold should be inline");

        let over_limit = LockFreeVersion::new(Some(vec![0u8; INLINE_VALUE_SIZE + 1]), 2);
        assert!(
            !over_limit.is_inline(),
            "Values over threshold should be on heap"
        );
    }

    /// `read_with` passes the visible value to the callback and returns its result.
    #[test]
    fn test_read_with_callback() {
        let table = LockFreeMemTable::new();
        let key = b"key1".to_vec();

        table
            .write(key.clone(), Some(b"value1".to_vec()), 1)
            .unwrap();
        table.commit(1, 10, &[key.clone()]);

        let len = table.read_with(&key, 100, None, |v| v.len());
        assert_eq!(len, Some(6));

        let matches = table.read_with(&key, 100, None, |v| v == b"value1");
        assert_eq!(matches, Some(true));
    }

    /// Twelve versions of one key span more than one fat node; every snapshot still resolves.
    #[test]
    fn test_fat_node_overflow() {
        let table = LockFreeMemTable::new();
        let key = b"key".to_vec();

        for i in 0..12u64 {
            let txn = i + 1;
            table
                .write(key.clone(), Some(format!("v{}", i).into_bytes()), txn)
                .unwrap();
            table.commit(txn, txn * 10, &[key.clone()]);
        }

        // Newest version overall.
        assert_eq!(table.read(&key, 200, None), Some(b"v11".to_vec()));

        // Snapshot in the middle of the history.
        assert_eq!(table.read(&key, 55, None), Some(b"v4".to_vec()));

        // Snapshot older than every commit.
        assert_eq!(table.read(&key, 5, None), None);
    }

    /// Four threads write and commit disjoint keys; every write must be readable afterwards.
    #[test]
    fn test_fat_node_concurrent_writes() {
        let table = Arc::new(LockFreeMemTable::new());

        let writers: Vec<_> = (0..4u64)
            .map(|t| {
                let shared = Arc::clone(&table);
                thread::spawn(move || {
                    for i in 0..20u64 {
                        let key = format!("k{}-{}", t, i).into_bytes();
                        let val = format!("v{}-{}", t, i).into_bytes();
                        let txn_id = t * 1000 + i + 1;
                        shared.write(key.clone(), Some(val), txn_id).unwrap();
                        shared.commit(txn_id, txn_id * 10, &[key]);
                    }
                })
            })
            .collect();

        for writer in writers {
            writer.join().unwrap();
        }

        for t in 0..4u64 {
            for i in 0..20u64 {
                let key = format!("k{}-{}", t, i).into_bytes();
                let found = table.read(&key, u64::MAX, None);
                assert_eq!(
                    found,
                    Some(format!("v{}-{}", t, i).into_bytes()),
                    "Missing key k{}-{}",
                    t,
                    i
                );
            }
        }
    }
}
1068}