1use std::collections::HashSet;
55use std::ptr;
56use std::sync::atomic::{AtomicPtr, AtomicU8, AtomicU64, AtomicUsize, Ordering};
57
58use dashmap::DashMap;
59use parking_lot::Mutex;
60
61use sochdb_core::{Result, SochDBError};
62
63const HP_PER_THREAD: usize = 2;
65
66const MAX_THREADS: usize = 128;
68
69const RECLAMATION_THRESHOLD: usize = 64;
71
72const FAT_NODE_SLOTS: usize = 8;
77
78pub const INLINE_VALUE_SIZE: usize = 56;
84
85#[repr(C)]
100pub enum ValueStorage {
101 Inline {
103 len: u8,
104 data: [u8; INLINE_VALUE_SIZE],
105 },
106 Heap(Box<[u8]>),
108 Tombstone,
110}
111
112impl std::fmt::Debug for ValueStorage {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 match self {
115 ValueStorage::Inline { len, .. } => write!(f, "Inline(len={})", len),
116 ValueStorage::Heap(data) => write!(f, "Heap(len={})", data.len()),
117 ValueStorage::Tombstone => write!(f, "Tombstone"),
118 }
119 }
120}
121
122impl ValueStorage {
123 #[inline]
125 pub fn new(value: Option<&[u8]>) -> Self {
126 match value {
127 None => ValueStorage::Tombstone,
128 Some(v) if v.len() <= INLINE_VALUE_SIZE => {
129 let mut data = [0u8; INLINE_VALUE_SIZE];
130 data[..v.len()].copy_from_slice(v);
131 ValueStorage::Inline {
132 len: v.len() as u8,
133 data,
134 }
135 }
136 Some(v) => ValueStorage::Heap(v.to_vec().into_boxed_slice()),
137 }
138 }
139
140 #[inline]
142 pub fn as_bytes(&self) -> Option<&[u8]> {
143 match self {
144 ValueStorage::Inline { len, data } => Some(&data[..*len as usize]),
145 ValueStorage::Heap(data) => Some(data),
146 ValueStorage::Tombstone => None,
147 }
148 }
149
150 #[inline]
152 pub fn is_tombstone(&self) -> bool {
153 matches!(self, ValueStorage::Tombstone)
154 }
155
156 #[inline]
158 pub fn is_inline(&self) -> bool {
159 matches!(self, ValueStorage::Inline { .. })
160 }
161
162 #[inline]
164 pub fn len(&self) -> usize {
165 match self {
166 ValueStorage::Inline { len, .. } => *len as usize,
167 ValueStorage::Heap(data) => data.len(),
168 ValueStorage::Tombstone => 0,
169 }
170 }
171
172 #[inline]
174 pub fn is_empty(&self) -> bool {
175 self.len() == 0
176 }
177}
178
179#[derive(Debug)]
184pub struct LockFreeVersion {
185 pub storage: ValueStorage,
187 pub txn_id: u64,
189 pub commit_ts: AtomicU64,
191 pub next: AtomicPtr<LockFreeVersion>,
193}
194
195impl LockFreeVersion {
196 #[inline]
198 pub fn new_from_slice(value: Option<&[u8]>, txn_id: u64) -> Self {
199 Self {
200 storage: ValueStorage::new(value),
201 txn_id,
202 commit_ts: AtomicU64::new(0),
203 next: AtomicPtr::new(ptr::null_mut()),
204 }
205 }
206
207 pub fn new(value: Option<Vec<u8>>, txn_id: u64) -> Self {
209 Self::new_from_slice(value.as_deref(), txn_id)
210 }
211
212 #[inline]
214 pub fn get_value(&self) -> Option<&[u8]> {
215 self.storage.as_bytes()
216 }
217
218 #[inline]
222 pub fn value_cloned(&self) -> Option<Vec<u8>> {
223 self.storage.as_bytes().map(|v| v.to_vec())
224 }
225
226 #[inline]
228 pub fn is_committed(&self) -> bool {
229 self.commit_ts.load(Ordering::Acquire) > 0
230 }
231
232 #[inline]
234 pub fn get_commit_ts(&self) -> u64 {
235 self.commit_ts.load(Ordering::Acquire)
236 }
237
238 #[inline]
240 pub fn set_commit_ts(&self, ts: u64) {
241 self.commit_ts.store(ts, Ordering::Release);
242 }
243
244 #[inline]
246 pub fn is_inline(&self) -> bool {
247 self.storage.is_inline()
248 }
249}
250
251pub struct FatNode {
262 count: AtomicU8,
264 slots: [AtomicPtr<LockFreeVersion>; FAT_NODE_SLOTS],
266 next: AtomicPtr<FatNode>,
268}
269
270impl FatNode {
271 fn new_with_first(version: *mut LockFreeVersion, older: *mut FatNode) -> Self {
273 let slots = std::array::from_fn(|i| {
274 if i == 0 {
275 AtomicPtr::new(version)
276 } else {
277 AtomicPtr::new(ptr::null_mut())
278 }
279 });
280 Self {
281 count: AtomicU8::new(1),
282 slots,
283 next: AtomicPtr::new(older),
284 }
285 }
286
287 #[inline]
293 fn try_push(&self, version: *mut LockFreeVersion) -> std::result::Result<(), *mut LockFreeVersion> {
294 loop {
295 let c = self.count.load(Ordering::Acquire);
296 if c as usize >= FAT_NODE_SLOTS {
297 return Err(version); }
299 match self.count.compare_exchange(c, c + 1, Ordering::AcqRel, Ordering::Acquire) {
301 Ok(_) => {
302 self.slots[c as usize].store(version, Ordering::Release);
304 return Ok(());
305 }
306 Err(_) => continue, }
308 }
309 }
310
311 #[inline]
313 fn slot(&self, idx: u8) -> *mut LockFreeVersion {
314 self.slots[idx as usize].load(Ordering::Acquire)
315 }
316
317 #[inline]
319 fn iter_newest_first(&self) -> impl Iterator<Item = &LockFreeVersion> {
320 let count = self.count.load(Ordering::Acquire);
321 (0..count).rev().filter_map(move |i| {
322 let ptr = self.slots[i as usize].load(Ordering::Acquire);
323 if ptr.is_null() { None } else { Some(unsafe { &*ptr }) }
324 })
325 }
326}
327
328pub struct LockFreeVersionChain {
334 head: AtomicPtr<FatNode>,
336}
337
338impl Default for LockFreeVersionChain {
339 fn default() -> Self {
340 Self::new()
341 }
342}
343
344impl LockFreeVersionChain {
345 pub fn new() -> Self {
347 Self {
348 head: AtomicPtr::new(ptr::null_mut()),
349 }
350 }
351
352 pub fn add_uncommitted(&self, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
356 let new_version = Box::into_raw(Box::new(LockFreeVersion::new(value, txn_id)));
357
358 loop {
359 let head = self.head.load(Ordering::Acquire);
360
361 if !head.is_null() {
363 let fat = unsafe { &*head };
364 let count = fat.count.load(Ordering::Acquire);
365 if count > 0 {
366 let newest = fat.slot(count - 1);
367 if !newest.is_null() {
368 let newest_ref = unsafe { &*newest };
369 if !newest_ref.is_committed() && newest_ref.txn_id != txn_id {
370 unsafe { drop(Box::from_raw(new_version)); }
371 return Err(SochDBError::Internal("Write-write conflict".into()));
372 }
373 }
374 }
375
376 match fat.try_push(new_version) {
378 Ok(()) => return Ok(()),
379 Err(_) => {
380 let new_fat = Box::into_raw(Box::new(FatNode::new_with_first(new_version, head)));
382 match self.head.compare_exchange(head, new_fat, Ordering::AcqRel, Ordering::Acquire) {
383 Ok(_) => return Ok(()),
384 Err(_) => {
385 unsafe {
387 (*new_fat).slots[0].store(ptr::null_mut(), Ordering::Relaxed);
389 (*new_fat).count.store(0, Ordering::Relaxed);
390 drop(Box::from_raw(new_fat));
391 }
392 continue; }
394 }
395 }
396 }
397 } else {
398 let new_fat = Box::into_raw(Box::new(FatNode::new_with_first(new_version, ptr::null_mut())));
400 match self.head.compare_exchange(head, new_fat, Ordering::AcqRel, Ordering::Acquire) {
401 Ok(_) => return Ok(()),
402 Err(_) => {
403 unsafe {
404 (*new_fat).slots[0].store(ptr::null_mut(), Ordering::Relaxed);
405 (*new_fat).count.store(0, Ordering::Relaxed);
406 drop(Box::from_raw(new_fat));
407 }
408 continue;
409 }
410 }
411 }
412 }
413 }
414
415 pub fn commit(&self, txn_id: u64, commit_ts: u64) -> bool {
417 let mut fat_ptr = self.head.load(Ordering::Acquire);
418
419 while !fat_ptr.is_null() {
420 let fat = unsafe { &*fat_ptr };
421 for ver in fat.iter_newest_first() {
423 if ver.txn_id == txn_id && !ver.is_committed() {
424 ver.set_commit_ts(commit_ts);
425 return true;
426 }
427 }
428 fat_ptr = fat.next.load(Ordering::Acquire);
429 }
430
431 false
432 }
433
434 pub fn read_at(
439 &self,
440 snapshot_ts: u64,
441 current_txn_id: Option<u64>,
442 ) -> Option<&LockFreeVersion> {
443 let mut fat_ptr = self.head.load(Ordering::Acquire);
444
445 while !fat_ptr.is_null() {
446 let fat = unsafe { &*fat_ptr };
447 for version in fat.iter_newest_first() {
449 if let Some(txn_id) = current_txn_id
451 && version.txn_id == txn_id
452 && !version.is_committed()
453 {
454 return Some(version);
455 }
456
457 let commit_ts = version.get_commit_ts();
459 if commit_ts > 0 && commit_ts < snapshot_ts {
460 return Some(version);
461 }
462 }
463 fat_ptr = fat.next.load(Ordering::Acquire);
464 }
465
466 None
467 }
468
469 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
471 let head = self.head.load(Ordering::Acquire);
472
473 if !head.is_null() {
474 let fat = unsafe { &*head };
475 let count = fat.count.load(Ordering::Acquire);
476 if count > 0 {
477 let newest = fat.slot(count - 1);
478 if !newest.is_null() {
479 let version = unsafe { &*newest };
480 return !version.is_committed() && version.txn_id != my_txn_id;
481 }
482 }
483 }
484
485 false
486 }
487}
488
489#[repr(C, align(64))]
493struct HazardRecord {
494 hazard: [AtomicPtr<LockFreeVersion>; HP_PER_THREAD],
496 active: AtomicU64,
498}
499
500impl HazardRecord {
501 const fn new() -> Self {
502 Self {
503 hazard: [
504 AtomicPtr::new(ptr::null_mut()),
505 AtomicPtr::new(ptr::null_mut()),
506 ],
507 active: AtomicU64::new(0),
508 }
509 }
510
511 fn try_acquire(&self, thread_id: u64) -> bool {
513 self.active
514 .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Acquire)
515 .is_ok()
516 }
517
518 #[allow(dead_code)]
520 fn release(&self) {
521 for hp in &self.hazard {
523 hp.store(ptr::null_mut(), Ordering::Release);
524 }
525 self.active.store(0, Ordering::Release);
526 }
527}
528
529pub struct HazardDomain {
531 records: Vec<HazardRecord>,
533 retired: Mutex<Vec<*mut LockFreeVersion>>,
535}
536
537impl HazardDomain {
538 pub fn new(max_threads: usize) -> Self {
540 let mut records = Vec::with_capacity(max_threads);
541 for _ in 0..max_threads {
542 records.push(HazardRecord::new());
543 }
544
545 Self {
546 records,
547 retired: Mutex::new(Vec::with_capacity(RECLAMATION_THRESHOLD * 2)),
548 }
549 }
550
551 fn get_record(&self) -> Option<&HazardRecord> {
553 let thread_id = thread_id::get() as u64;
554
555 for record in &self.records {
557 if record.active.load(Ordering::Acquire) == thread_id {
558 return Some(record);
559 }
560 }
561
562 self.records
564 .iter()
565 .find(|record| record.try_acquire(thread_id))
566 }
567
568 #[inline]
570 pub fn protect(&self, ptr: *mut LockFreeVersion, slot: usize) -> bool {
571 if let Some(record) = self.get_record()
572 && slot < HP_PER_THREAD
573 {
574 record.hazard[slot].store(ptr, Ordering::Release);
575 std::sync::atomic::fence(Ordering::SeqCst);
576 return true;
577 }
578 false
579 }
580
581 #[inline]
583 pub fn clear(&self, slot: usize) {
584 if let Some(record) = self.get_record()
585 && slot < HP_PER_THREAD
586 {
587 record.hazard[slot].store(ptr::null_mut(), Ordering::Release);
588 }
589 }
590
591 pub fn retire(&self, ptr: *mut LockFreeVersion) {
593 let mut retired = self.retired.lock();
594 retired.push(ptr);
595
596 if retired.len() >= RECLAMATION_THRESHOLD {
598 self.try_reclaim(&mut retired);
599 }
600 }
601
602 fn try_reclaim(&self, retired: &mut Vec<*mut LockFreeVersion>) {
604 let mut protected: HashSet<usize> = HashSet::new();
606
607 for record in &self.records {
608 if record.active.load(Ordering::Acquire) != 0 {
609 for hp in &record.hazard {
610 let ptr = hp.load(Ordering::Acquire);
611 if !ptr.is_null() {
612 protected.insert(ptr as usize);
613 }
614 }
615 }
616 }
617
618 let mut still_retired = Vec::new();
620 for ptr in retired.drain(..) {
621 if protected.contains(&(ptr as usize)) {
622 still_retired.push(ptr);
623 } else {
624 unsafe {
626 drop(Box::from_raw(ptr));
627 }
628 }
629 }
630
631 *retired = still_retired;
632 }
633}
634
635impl Drop for HazardDomain {
636 fn drop(&mut self) {
637 let mut retired = self.retired.lock();
639 for ptr in retired.drain(..) {
640 unsafe {
641 drop(Box::from_raw(ptr));
642 }
643 }
644 }
645}
646
647mod thread_id {
649 use std::sync::atomic::{AtomicUsize, Ordering};
650
651 static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
652
653 thread_local! {
654 static THREAD_ID: usize = NEXT_ID.fetch_add(1, Ordering::Relaxed);
655 }
656
657 pub fn get() -> usize {
658 THREAD_ID.with(|id| *id)
659 }
660}
661
662pub struct LockFreeMemTable {
664 data: DashMap<Vec<u8>, LockFreeVersionChain>,
666 hazard_domain: HazardDomain,
668 size_bytes: AtomicUsize,
670}
671
672impl LockFreeMemTable {
673 pub fn new() -> Self {
675 Self {
676 data: DashMap::new(),
677 hazard_domain: HazardDomain::new(MAX_THREADS),
678 size_bytes: AtomicUsize::new(0),
679 }
680 }
681
682 pub fn read(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
687 let chain = self.data.get(key)?;
688
689 if let Some(version) = chain.read_at(snapshot_ts, txn_id) {
691 let ptr = version as *const LockFreeVersion as *mut LockFreeVersion;
693 self.hazard_domain.protect(ptr, 0);
694
695 let result = version.value_cloned();
698
699 self.hazard_domain.clear(0);
701
702 result
703 } else {
704 None
705 }
706 }
707
708 #[inline]
722 pub fn read_with<F, R>(
723 &self,
724 key: &[u8],
725 snapshot_ts: u64,
726 txn_id: Option<u64>,
727 f: F,
728 ) -> Option<R>
729 where
730 F: FnOnce(&[u8]) -> R,
731 {
732 let chain = self.data.get(key)?;
733
734 if let Some(version) = chain.read_at(snapshot_ts, txn_id) {
735 let ptr = version as *const LockFreeVersion as *mut LockFreeVersion;
737 self.hazard_domain.protect(ptr, 0);
738
739 let result = version.get_value().map(f);
741
742 self.hazard_domain.clear(0);
744
745 result
746 } else {
747 None
748 }
749 }
750
751 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
753 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
754
755 let chain = self.data.entry(key.clone()).or_default();
757
758 chain.add_uncommitted(value, txn_id)?;
760
761 self.size_bytes
763 .fetch_add(key.len() + value_size + 64, Ordering::Relaxed);
764
765 Ok(())
766 }
767
768 pub fn commit(&self, txn_id: u64, commit_ts: u64, keys: &[Vec<u8>]) {
770 for key in keys {
771 if let Some(chain) = self.data.get(key) {
772 chain.commit(txn_id, commit_ts);
773 }
774 }
775 }
776
777 pub fn has_write_conflict(&self, key: &[u8], txn_id: u64) -> bool {
779 if let Some(chain) = self.data.get(key) {
780 chain.has_write_conflict(txn_id)
781 } else {
782 false
783 }
784 }
785
786 pub fn size_bytes(&self) -> usize {
788 self.size_bytes.load(Ordering::Relaxed)
789 }
790
791 pub fn len(&self) -> usize {
793 self.data.len()
794 }
795
796 pub fn is_empty(&self) -> bool {
798 self.data.is_empty()
799 }
800}
801
802unsafe impl Send for LockFreeMemTable {}
806unsafe impl Sync for LockFreeMemTable {}
807
808impl Default for LockFreeMemTable {
809 fn default() -> Self {
810 Self::new()
811 }
812}
813
814#[cfg(test)]
815mod tests {
816 use super::*;
817 use std::sync::Arc;
818 use std::thread;
819
820 #[test]
821 fn test_basic_write_read() {
822 let memtable = LockFreeMemTable::new();
823
824 memtable
826 .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
827 .unwrap();
828
829 let val = memtable.read(b"key1", 100, Some(1));
831 assert_eq!(val, Some(b"value1".to_vec()));
832
833 let val = memtable.read(b"key1", 100, Some(2));
835 assert!(val.is_none());
836
837 memtable.commit(1, 50, &[b"key1".to_vec()]);
839 let val = memtable.read(b"key1", 100, None);
840 assert_eq!(val, Some(b"value1".to_vec()));
841 }
842
843 #[test]
844 fn test_snapshot_isolation() {
845 let memtable = LockFreeMemTable::new();
846
847 memtable
849 .write(b"key".to_vec(), Some(b"v1".to_vec()), 1)
850 .unwrap();
851 memtable.commit(1, 10, &[b"key".to_vec()]);
852
853 memtable
855 .write(b"key".to_vec(), Some(b"v2".to_vec()), 2)
856 .unwrap();
857 memtable.commit(2, 20, &[b"key".to_vec()]);
858
859 assert_eq!(memtable.read(b"key", 15, None), Some(b"v1".to_vec()));
861
862 assert_eq!(memtable.read(b"key", 25, None), Some(b"v2".to_vec()));
864 }
865
866 #[test]
867 fn test_write_conflict() {
868 let memtable = LockFreeMemTable::new();
869
870 memtable
872 .write(b"key".to_vec(), Some(b"v1".to_vec()), 1)
873 .unwrap();
874
875 let result = memtable.write(b"key".to_vec(), Some(b"v2".to_vec()), 2);
877 assert!(result.is_err());
878
879 let result = memtable.write(b"key".to_vec(), Some(b"v1_updated".to_vec()), 1);
881 assert!(result.is_ok());
882 }
883
884 #[test]
885 fn test_concurrent_reads() {
886 let memtable = Arc::new(LockFreeMemTable::new());
887
888 for i in 0..100 {
890 let key = format!("key{}", i).into_bytes();
891 let val = format!("value{}", i).into_bytes();
892 memtable.write(key.clone(), Some(val), 1).unwrap();
893 }
894 memtable.commit(
895 1,
896 10,
897 &(0..100)
898 .map(|i| format!("key{}", i).into_bytes())
899 .collect::<Vec<_>>(),
900 );
901
902 let handles: Vec<_> = (0..8)
904 .map(|t| {
905 let mt = Arc::clone(&memtable);
906 thread::spawn(move || {
907 for i in 0..100 {
908 let key = format!("key{}", i).into_bytes();
909 let expected = format!("value{}", i).into_bytes();
910 let val = mt.read(&key, 100, None);
911 assert_eq!(val, Some(expected), "Thread {} failed at key{}", t, i);
912 }
913 })
914 })
915 .collect();
916
917 for h in handles {
918 h.join().unwrap();
919 }
920 }
921
922 #[test]
923 fn test_inline_storage() {
924 let small_value = b"small".to_vec();
926 let version = LockFreeVersion::new(Some(small_value.clone()), 1);
927 assert!(version.is_inline(), "Small values should be inline");
928 assert_eq!(version.get_value(), Some(small_value.as_slice()));
929
930 let large_value = vec![42u8; 100]; let version = LockFreeVersion::new(Some(large_value.clone()), 2);
933 assert!(!version.is_inline(), "Large values should be on heap");
934 assert_eq!(version.get_value(), Some(large_value.as_slice()));
935
936 let version = LockFreeVersion::new(None, 3);
938 assert!(version.storage.is_tombstone());
939 assert_eq!(version.get_value(), None);
940 }
941
942 #[test]
943 fn test_inline_threshold() {
944 let value = vec![0u8; INLINE_VALUE_SIZE];
946 let version = LockFreeVersion::new(Some(value.clone()), 1);
947 assert!(version.is_inline(), "Values at threshold should be inline");
948
949 let value = vec![0u8; INLINE_VALUE_SIZE + 1];
951 let version = LockFreeVersion::new(Some(value), 2);
952 assert!(
953 !version.is_inline(),
954 "Values over threshold should be on heap"
955 );
956 }
957
958 #[test]
959 fn test_read_with_callback() {
960 let memtable = LockFreeMemTable::new();
961
962 memtable
963 .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
964 .unwrap();
965 memtable.commit(1, 10, &[b"key1".to_vec()]);
966
967 let len = memtable.read_with(b"key1", 100, None, |v| v.len());
969 assert_eq!(len, Some(6)); let matches = memtable.read_with(b"key1", 100, None, |v| v == b"value1");
973 assert_eq!(matches, Some(true));
974 }
975
976 #[test]
977 fn test_fat_node_overflow() {
978 let memtable = LockFreeMemTable::new();
981
982 for i in 0..12u64 {
983 memtable
985 .write(
986 b"key".to_vec(),
987 Some(format!("v{}", i).into_bytes()),
988 i + 1,
989 )
990 .unwrap();
991 memtable.commit(i + 1, (i + 1) * 10, &[b"key".to_vec()]);
992 }
993
994 let val = memtable.read(b"key", 200, None);
996 assert_eq!(val, Some(b"v11".to_vec()));
997
998 let val = memtable.read(b"key", 55, None);
1000 assert_eq!(val, Some(b"v4".to_vec()));
1001
1002 let val = memtable.read(b"key", 5, None);
1004 assert_eq!(val, None);
1005 }
1006
1007 #[test]
1008 fn test_fat_node_concurrent_writes() {
1009 use std::sync::Arc;
1010 use std::thread;
1011
1012 let memtable = Arc::new(LockFreeMemTable::new());
1013
1014 let mut handles = Vec::new();
1016 for t in 0..4u64 {
1017 let mt = Arc::clone(&memtable);
1018 handles.push(thread::spawn(move || {
1019 for i in 0..20u64 {
1020 let key = format!("k{}-{}", t, i).into_bytes();
1021 let val = format!("v{}-{}", t, i).into_bytes();
1022 let txn_id = t * 1000 + i + 1;
1023 mt.write(key.clone(), Some(val), txn_id).unwrap();
1024 mt.commit(txn_id, txn_id * 10, &[key]);
1025 }
1026 }));
1027 }
1028
1029 for h in handles {
1030 h.join().unwrap();
1031 }
1032
1033 for t in 0..4u64 {
1035 for i in 0..20u64 {
1036 let key = format!("k{}-{}", t, i).into_bytes();
1037 let val = memtable.read(&key, u64::MAX, None);
1038 assert_eq!(
1039 val,
1040 Some(format!("v{}-{}", t, i).into_bytes()),
1041 "Missing key k{}-{}",
1042 t,
1043 i
1044 );
1045 }
1046 }
1047 }
1048}