use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};

/// A lock-free Bloom filter over a packed array of `AtomicU64` words.
pub struct BloomFilter {
    /// Bit array, 64 bits per word.
    bits: Vec<AtomicU64>,
    /// Total number of bits (always a multiple of 64).
    num_bits: usize,
    /// Number of hash probes per item.
    num_hashes: usize,
    /// Number of insertions performed.
    count: AtomicU64,
}

impl BloomFilter {
    /// Sizes the filter for `expected_items` at the target false-positive
    /// rate using the standard formulas m = -n * ln(p) / (ln 2)^2 and
    /// k = (m / n) * ln 2.
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        let num_bits =
            (-(expected_items as f64) * false_positive_rate.ln() / ln2_squared).ceil() as usize;

        // Round up to a whole number of 64-bit words.
        let num_bits = num_bits.div_ceil(64) * 64;

        let num_hashes =
            ((num_bits as f64 / expected_items as f64) * std::f64::consts::LN_2).ceil() as usize;
        let num_hashes = num_hashes.clamp(1, 16);

        let num_words = num_bits / 64;
        let bits = (0..num_words).map(|_| AtomicU64::new(0)).collect();

        Self {
            bits,
            num_bits,
            num_hashes,
            count: AtomicU64::new(0),
        }
    }
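
    // Worked sizing example (illustrative numbers): for expected_items = 1_000
    // and false_positive_rate = 0.01,
    //   num_bits   = ceil(1_000 * ln(100) / ln(2)^2) ≈ 9_586, rounded up to
    //                9_600 bits (150 words, 1_200 bytes);
    //   num_hashes = ceil((9_600 / 1_000) * ln 2) = 7.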

    pub fn with_params(num_bits: usize, num_hashes: usize) -> Self {
        let num_bits = num_bits.div_ceil(64) * 64;
        let num_words = num_bits / 64;
        let bits = (0..num_words).map(|_| AtomicU64::new(0)).collect();

        Self {
            bits,
            num_bits,
            num_hashes: num_hashes.max(1),
            count: AtomicU64::new(0),
        }
    }

    pub fn insert<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            self.set_bit(bit_index);
        }

        self.count.fetch_add(1, Ordering::Relaxed);
    }

    pub fn contains<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            if !self.get_bit(bit_index) {
                return false;
            }
        }

        true
    }

    /// Inserts `item` and returns `true` if every probed bit was already set,
    /// i.e. the item was (probably) present before this call.
    pub fn insert_and_check<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);
        let mut was_present = true;

        for i in 0..self.num_hashes {
            let bit_index = self.combined_hash(h1, h2, i) % self.num_bits;
            if !self.set_bit(bit_index) {
                was_present = false;
            }
        }

        self.count.fetch_add(1, Ordering::Relaxed);
        was_present
    }

    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Approximates the current false-positive probability as
    /// (fraction of bits set)^num_hashes.
    pub fn estimated_fp_rate(&self) -> f64 {
        let bits_set = self.count_bits_set();
        let fill_ratio = bits_set as f64 / self.num_bits as f64;
        fill_ratio.powi(self.num_hashes as i32)
    }

    pub fn fill_ratio(&self) -> f64 {
        self.count_bits_set() as f64 / self.num_bits as f64
    }

    /// Memory used by the bit array, in bytes.
    pub fn memory_usage(&self) -> usize {
        self.bits.len() * 8
    }

    pub fn clear(&self) {
        for word in &self.bits {
            word.store(0, Ordering::Relaxed);
        }
        self.count.store(0, Ordering::Relaxed);
    }

    fn hash_pair<T: Hash>(&self, item: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let hash1 = h1.finish();

        // Derive a second hash by re-hashing the first.
        let mut h2 = DefaultHasher::new();
        hash1.hash(&mut h2);
        let hash2 = h2.finish();

        (hash1, hash2)
    }

    // Double hashing: the i-th probe is h1 + i * h2 (mod num_bits).
    fn combined_hash(&self, h1: u64, h2: u64, i: usize) -> usize {
        h1.wrapping_add(h2.wrapping_mul(i as u64)) as usize
    }

    /// Sets the bit and returns whether it was already set.
    fn set_bit(&self, bit_index: usize) -> bool {
        let word_index = bit_index / 64;
        let bit_offset = bit_index % 64;
        let mask = 1u64 << bit_offset;

        let old = self.bits[word_index].fetch_or(mask, Ordering::AcqRel);
        (old & mask) != 0
    }

    fn get_bit(&self, bit_index: usize) -> bool {
        let word_index = bit_index / 64;
        let bit_offset = bit_index % 64;
        let mask = 1u64 << bit_offset;

        (self.bits[word_index].load(Ordering::Acquire) & mask) != 0
    }

    fn count_bits_set(&self) -> usize {
        self.bits
            .iter()
            .map(|w| w.load(Ordering::Relaxed).count_ones() as usize)
            .sum()
    }
}
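
// Minimal usage sketch for `BloomFilter`. The keys below are arbitrary
// illustrative values; deduplication is one assumed use case, not the only
// one supported by the API.
#[cfg(test)]
#[test]
fn bloom_filter_usage_sketch() {
    let seen = BloomFilter::new(10_000, 0.01);

    // First sighting: at least one probed bit was still clear.
    assert!(!seen.insert_and_check(&"order-1234"));
    // Second sighting: every probed bit was already set.
    assert!(seen.insert_and_check(&"order-1234"));

    // `contains` can report false positives but never false negatives.
    assert!(seen.contains(&"order-1234"));
    assert!(seen.memory_usage() > 0);
}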

/// A Bloom filter variant with 4-bit counters packed two per byte, allowing
/// items to be removed as well as inserted.
pub struct CountingBloomFilter {
    /// Counter array, two 4-bit counters per `AtomicU8`.
    counters: Vec<AtomicU8>,
    /// Total number of 4-bit counters.
    num_counters: usize,
    /// Number of hash probes per item.
    num_hashes: usize,
    /// Net number of insertions (insertions minus removals).
    count: AtomicU64,
}

impl CountingBloomFilter {
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
        let num_counters =
            (-(expected_items as f64) * false_positive_rate.ln() / ln2_squared).ceil() as usize;
        let num_counters = num_counters.max(64);

        let num_hashes = ((num_counters as f64 / expected_items as f64) * std::f64::consts::LN_2)
            .ceil() as usize;
        let num_hashes = num_hashes.clamp(1, 16);

        // Two 4-bit counters fit in each byte.
        let num_bytes = num_counters.div_ceil(2);
        let counters = (0..num_bytes).map(|_| AtomicU8::new(0)).collect();

        Self {
            counters,
            num_counters,
            num_hashes,
            count: AtomicU64::new(0),
        }
    }

    pub fn insert<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            self.increment_counter(counter_index);
        }

        self.count.fetch_add(1, Ordering::Relaxed);
    }

    pub fn remove<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            self.decrement_counter(counter_index);
        }

        self.count.fetch_sub(1, Ordering::Relaxed);
    }

    pub fn contains<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.hash_pair(item);

        for i in 0..self.num_hashes {
            let counter_index = self.combined_hash(h1, h2, i) % self.num_counters;
            if self.get_counter(counter_index) == 0 {
                return false;
            }
        }

        true
    }

    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    fn hash_pair<T: Hash>(&self, item: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let hash1 = h1.finish();

        let mut h2 = DefaultHasher::new();
        hash1.hash(&mut h2);
        let hash2 = h2.finish();

        (hash1, hash2)
    }

    fn combined_hash(&self, h1: u64, h2: u64, i: usize) -> usize {
        h1.wrapping_add(h2.wrapping_mul(i as u64)) as usize
    }

    fn increment_counter(&self, counter_index: usize) {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;

        loop {
            let old_byte = self.counters[byte_index].load(Ordering::Acquire);
            let old_counter = if is_high_nibble {
                (old_byte >> 4) & 0x0F
            } else {
                old_byte & 0x0F
            };

            // Saturate at 15: 4-bit counters never wrap.
            if old_counter >= 15 {
                return;
            }

            let new_byte = if is_high_nibble {
                (old_byte & 0x0F) | ((old_counter + 1) << 4)
            } else {
                (old_byte & 0xF0) | (old_counter + 1)
            };

            if self.counters[byte_index]
                .compare_exchange_weak(old_byte, new_byte, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return;
            }
        }
    }

    fn decrement_counter(&self, counter_index: usize) {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;

        loop {
            let old_byte = self.counters[byte_index].load(Ordering::Acquire);
            let old_counter = if is_high_nibble {
                (old_byte >> 4) & 0x0F
            } else {
                old_byte & 0x0F
            };

            // Never decrement below zero.
            if old_counter == 0 {
                return;
            }

            let new_byte = if is_high_nibble {
                (old_byte & 0x0F) | ((old_counter - 1) << 4)
            } else {
                (old_byte & 0xF0) | (old_counter - 1)
            };

            if self.counters[byte_index]
                .compare_exchange_weak(old_byte, new_byte, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return;
            }
        }
    }

    fn get_counter(&self, counter_index: usize) -> u8 {
        let byte_index = counter_index / 2;
        let is_high_nibble = counter_index % 2 == 1;

        let byte = self.counters[byte_index].load(Ordering::Acquire);
        if is_high_nibble {
            (byte >> 4) & 0x0F
        } else {
            byte & 0x0F
        }
    }
}
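
// Minimal usage sketch for `CountingBloomFilter`: unlike the plain Bloom
// filter, items can be removed again. The key values are arbitrary examples.
#[cfg(test)]
#[test]
fn counting_bloom_filter_usage_sketch() {
    let inflight = CountingBloomFilter::new(1_000, 0.01);

    inflight.insert(&"req-1");
    assert!(inflight.contains(&"req-1"));

    // Removal decrements the underlying 4-bit counters; once they are back
    // to zero the item reads as absent again.
    inflight.remove(&"req-1");
    assert!(!inflight.contains(&"req-1"));
}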

/// A HyperLogLog cardinality estimator with one byte per register.
pub struct HyperLogLog {
    /// One register per bucket, storing the maximum observed rank.
    registers: Vec<AtomicU8>,
    /// Number of registers (2^precision).
    num_registers: usize,
    /// Number of leading hash bits used to select a register (4..=18).
    precision: u8,
    /// Bias-correction constant for the raw estimate.
    alpha: f64,
}

impl HyperLogLog {
    pub fn new(precision: u8) -> Self {
        let precision = precision.clamp(4, 18);
        let num_registers = 1 << precision;

        // Standard bias-correction constants for small register counts.
        let alpha = match precision {
            4 => 0.673,
            5 => 0.697,
            6 => 0.709,
            _ => 0.7213 / (1.0 + 1.079 / num_registers as f64),
        };

        let registers = (0..num_registers).map(|_| AtomicU8::new(0)).collect();

        Self {
            registers,
            num_registers,
            precision,
            alpha,
        }
    }

    pub fn add<T: Hash>(&self, item: &T) {
        let mut hasher = DefaultHasher::new();
        item.hash(&mut hasher);
        let hash = hasher.finish();

        // The top `precision` bits pick the register.
        let register_idx = (hash >> (64 - self.precision)) as usize;

        // The remaining bits determine the rank (leading-zero count + 1);
        // the OR-ed sentinel bit bounds the count for an all-zero remainder.
        let remaining = (hash << self.precision) | (1 << (self.precision - 1));
        let leading_zeros = (remaining.leading_zeros() + 1) as u8;

        // Keep the maximum rank observed for this register.
        loop {
            let current = self.registers[register_idx].load(Ordering::Acquire);
            if leading_zeros <= current {
                break;
            }
            if self.registers[register_idx]
                .compare_exchange_weak(current, leading_zeros, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                break;
            }
        }
    }

    pub fn estimate(&self) -> u64 {
        let mut sum = 0.0;
        let mut zeros = 0;

        for reg in &self.registers {
            let val = reg.load(Ordering::Relaxed);
            sum += 1.0 / (1u64 << val) as f64;
            if val == 0 {
                zeros += 1;
            }
        }

        let m = self.num_registers as f64;
        let raw_estimate = self.alpha * m * m / sum;

        let estimate = if raw_estimate <= 2.5 * m && zeros > 0 {
            // Small-range correction: linear counting over empty registers.
            m * (m / zeros as f64).ln()
        } else if raw_estimate > (1u64 << 32) as f64 / 30.0 {
            // Large-range correction for 32-bit hash-space saturation.
            -((1u64 << 32) as f64) * (1.0 - raw_estimate / (1u64 << 32) as f64).ln()
        } else {
            raw_estimate
        };

        estimate.round() as u64
    }

    /// Merges `other` into `self` by taking the per-register maximum.
    pub fn merge(&self, other: &HyperLogLog) {
        assert_eq!(self.num_registers, other.num_registers);

        for i in 0..self.num_registers {
            loop {
                let current = self.registers[i].load(Ordering::Acquire);
                let other_val = other.registers[i].load(Ordering::Relaxed);
                let new_val = current.max(other_val);

                if new_val == current {
                    break;
                }

                if self.registers[i]
                    .compare_exchange_weak(current, new_val, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    break;
                }
            }
        }
    }

    /// Memory used by the registers, in bytes (one byte per register).
    pub fn memory_usage(&self) -> usize {
        self.num_registers
    }

    /// Typical relative error: 1.04 / sqrt(number of registers).
    pub fn relative_error(&self) -> f64 {
        1.04 / (self.num_registers as f64).sqrt()
    }
}
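
// Minimal usage sketch for `HyperLogLog`: precision 14 gives 16_384 one-byte
// registers and a typical relative error of about 1.04 / sqrt(16_384) ≈ 0.8%.
// The item values and the 5% tolerance are arbitrary choices for the sketch.
#[cfg(test)]
#[test]
fn hyperloglog_usage_sketch() {
    let hll = HyperLogLog::new(14);
    for user_id in 0..20_000u64 {
        hll.add(&user_id);
    }

    let estimate = hll.estimate() as f64;
    let error = (estimate - 20_000.0).abs() / 20_000.0;

    assert!(error < 0.05, "estimate {} off by {:.2}%", estimate, error * 100.0);
    assert_eq!(hll.memory_usage(), 16_384);
    assert!(hll.relative_error() < 0.01);
}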

/// A Bloom filter specialized for log offsets, augmented with the observed
/// min/max offset range for cheap rejection of out-of-range queries.
pub struct OffsetBloomFilter {
    filter: BloomFilter,
    /// Smallest offset inserted so far (u64::MAX while empty).
    min_offset: AtomicU64,
    /// Largest offset inserted so far (0 while empty).
    max_offset: AtomicU64,
    /// Bucket width used when sampling offsets in range queries.
    bucket_size: u64,
}

impl OffsetBloomFilter {
    pub fn new(expected_offsets: usize, bucket_size: u64) -> Self {
        Self {
            filter: BloomFilter::new(expected_offsets, 0.01),
            min_offset: AtomicU64::new(u64::MAX),
            max_offset: AtomicU64::new(0),
            bucket_size,
        }
    }

    pub fn insert(&self, offset: u64) {
        self.filter.insert(&offset);

        self.min_offset.fetch_min(offset, Ordering::AcqRel);
        self.max_offset.fetch_max(offset, Ordering::AcqRel);
    }

    pub fn contains(&self, offset: u64) -> bool {
        // Reject anything outside the observed range without probing bits.
        let min = self.min_offset.load(Ordering::Acquire);
        let max = self.max_offset.load(Ordering::Acquire);

        if offset < min || offset > max {
            return false;
        }

        self.filter.contains(&offset)
    }

    /// Checks whether any offset in `[start, end]` might be present. Small
    /// ranges are probed exhaustively; larger ranges are only sampled at
    /// bucket boundaries and a coarse stride, so a hit strictly between
    /// sample points can be missed.
    pub fn contains_range(&self, start: u64, end: u64) -> bool {
        let min = self.min_offset.load(Ordering::Acquire);
        let max = self.max_offset.load(Ordering::Acquire);

        if end < min || start > max {
            return false;
        }

        // Exhaustive check for small ranges.
        if end - start <= 10 {
            for offset in start..=end {
                if self.filter.contains(&offset) {
                    return true;
                }
            }
            return false;
        }

        // Probe the start, midpoint, and end of each overlapping bucket.
        let start_bucket = start / self.bucket_size;
        let end_bucket = end / self.bucket_size;

        for bucket in start_bucket..=end_bucket {
            let bucket_start = bucket * self.bucket_size;
            let bucket_end = bucket_start + self.bucket_size - 1;

            for &offset in &[
                bucket_start,
                bucket_start + self.bucket_size / 2,
                bucket_end,
            ] {
                if offset >= start && offset <= end && self.filter.contains(&offset) {
                    return true;
                }
            }
        }

        // Fall back to a coarse stride across the whole range.
        let step = ((end - start) / 10).max(1);
        let mut offset = start;
        while offset <= end {
            if self.filter.contains(&offset) {
                return true;
            }
            offset += step;
        }

        false
    }

    pub fn offset_range(&self) -> (u64, u64) {
        (
            self.min_offset.load(Ordering::Acquire),
            self.max_offset.load(Ordering::Acquire),
        )
    }

    pub fn count(&self) -> u64 {
        self.filter.count()
    }
}
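
// Minimal usage sketch for `OffsetBloomFilter`, assuming offsets are log
// positions tracked in buckets of 1_000. The concrete offsets are arbitrary
// examples.
#[cfg(test)]
#[test]
fn offset_bloom_filter_usage_sketch() {
    let acked = OffsetBloomFilter::new(10_000, 1_000);

    acked.insert(42);
    acked.insert(4_200);

    // Offsets outside the observed [min, max] range are rejected without
    // touching the underlying bloom filter.
    assert!(!acked.contains(7));
    assert!(!acked.contains(9_999));
    assert!(acked.contains(42));

    assert_eq!(acked.offset_range(), (42, 4_200));
    assert_eq!(acked.count(), 2);
}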

#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Lower bound for the adaptive batch size.
    pub min_batch_size: usize,
    /// Upper bound for the adaptive batch size (also a hard flush trigger).
    pub max_batch_size: usize,
    /// Maximum time a non-empty batch may linger before a flush is forced.
    pub max_linger_us: u64,
    /// Batch-latency target that adaptive sizing steers toward.
    pub target_latency_us: u64,
    /// Whether to adjust the batch size based on observed latencies.
    pub adaptive: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            min_batch_size: 16,
            max_batch_size: 1024,
            max_linger_us: 5000,
            target_latency_us: 1000,
            adaptive: true,
        }
    }
}
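
// Minimal configuration sketch: start from the defaults and tighten the
// latency and linger bounds. The specific numbers are arbitrary examples.
#[cfg(test)]
#[test]
fn batch_config_override_sketch() {
    let config = BatchConfig {
        target_latency_us: 500,
        max_linger_us: 2_000,
        ..BatchConfig::default()
    };

    assert_eq!(config.min_batch_size, 16);
    assert!(config.adaptive);
}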

/// Groups items into batches whose target size adapts to recent flush
/// latencies.
pub struct AdaptiveBatcher<T> {
    config: BatchConfig,
    /// Items accumulated for the next flush.
    batch: parking_lot::Mutex<Vec<T>>,
    /// Current adaptive batch-size target.
    current_batch_size: AtomicU32,
    /// Timestamp (microseconds) at which the current batch was started.
    batch_start_us: AtomicU64,
    /// Ring buffer of recent batch latencies in microseconds.
    recent_latencies: [AtomicU64; 8],
    /// Write cursor into `recent_latencies`.
    latency_index: AtomicU32,
    batches_flushed: AtomicU64,
    items_batched: AtomicU64,
}

impl<T> AdaptiveBatcher<T> {
    pub fn new(config: BatchConfig) -> Self {
        let initial_size = (config.min_batch_size + config.max_batch_size) / 2;

        Self {
            config,
            batch: parking_lot::Mutex::new(Vec::with_capacity(initial_size)),
            current_batch_size: AtomicU32::new(initial_size as u32),
            batch_start_us: AtomicU64::new(0),
            recent_latencies: std::array::from_fn(|_| AtomicU64::new(0)),
            latency_index: AtomicU32::new(0),
            batches_flushed: AtomicU64::new(0),
            items_batched: AtomicU64::new(0),
        }
    }

    /// Adds an item and returns a full batch if this push triggered a flush.
    pub fn add(&self, item: T) -> Option<Vec<T>> {
        let now = Self::now_us();
        let mut batch = self.batch.lock();

        if batch.is_empty() {
            self.batch_start_us.store(now, Ordering::Release);
        }

        batch.push(item);
        self.items_batched.fetch_add(1, Ordering::Relaxed);

        let batch_size = self.current_batch_size.load(Ordering::Relaxed) as usize;
        let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));

        let should_flush = batch.len() >= batch_size
            || batch_age >= self.config.max_linger_us
            || batch.len() >= self.config.max_batch_size;

        if should_flush {
            let flushed = std::mem::take(&mut *batch);
            batch.reserve(batch_size);
            self.batches_flushed.fetch_add(1, Ordering::Relaxed);

            self.record_latency(batch_age);

            Some(flushed)
        } else {
            None
        }
    }

    pub fn flush(&self) -> Vec<T> {
        let now = Self::now_us();
        let mut batch = self.batch.lock();

        if !batch.is_empty() {
            let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));
            self.record_latency(batch_age);
            self.batches_flushed.fetch_add(1, Ordering::Relaxed);
        }

        let batch_size = self.current_batch_size.load(Ordering::Relaxed) as usize;
        let flushed = std::mem::take(&mut *batch);
        batch.reserve(batch_size);

        flushed
    }

    pub fn needs_flush(&self) -> bool {
        let batch = self.batch.lock();
        if batch.is_empty() {
            return false;
        }

        let now = Self::now_us();
        let batch_age = now.saturating_sub(self.batch_start_us.load(Ordering::Acquire));
        batch_age >= self.config.max_linger_us
    }

    pub fn current_batch_size(&self) -> usize {
        self.current_batch_size.load(Ordering::Relaxed) as usize
    }

    pub fn pending_count(&self) -> usize {
        self.batch.lock().len()
    }

    pub fn stats(&self) -> BatcherStats {
        BatcherStats {
            batches_flushed: self.batches_flushed.load(Ordering::Relaxed),
            items_batched: self.items_batched.load(Ordering::Relaxed),
            current_batch_size: self.current_batch_size.load(Ordering::Relaxed) as usize,
            avg_latency_us: self.average_latency(),
        }
    }

    fn record_latency(&self, latency_us: u64) {
        if !self.config.adaptive {
            return;
        }

        let idx = self.latency_index.fetch_add(1, Ordering::Relaxed) as usize % 8;
        self.recent_latencies[idx].store(latency_us, Ordering::Relaxed);

        let avg_latency = self.average_latency();
        let current_size = self.current_batch_size.load(Ordering::Relaxed);

        // Shrink by 25% when well over the latency target, grow by 25% when
        // well under it, and hold steady otherwise.
        let new_size = if avg_latency > self.config.target_latency_us * 2 {
            (current_size * 3 / 4).max(self.config.min_batch_size as u32)
        } else if avg_latency < self.config.target_latency_us / 2 {
            (current_size * 5 / 4).min(self.config.max_batch_size as u32)
        } else {
            current_size
        };

        self.current_batch_size.store(new_size, Ordering::Relaxed);
    }

    fn average_latency(&self) -> u64 {
        let mut sum = 0u64;
        let mut count = 0u64;

        for lat in &self.recent_latencies {
            let l = lat.load(Ordering::Relaxed);
            if l > 0 {
                sum += l;
                count += 1;
            }
        }

        if count > 0 {
            sum / count
        } else {
            0
        }
    }

    fn now_us() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_micros() as u64
    }
}
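
// Minimal usage sketch for `AdaptiveBatcher`: a producer pushes items and
// ships whatever batches `add` hands back, then flushes stragglers. The item
// type and counts are arbitrary examples.
#[cfg(test)]
#[test]
fn adaptive_batcher_usage_sketch() {
    let batcher = AdaptiveBatcher::new(BatchConfig::default());
    let mut shipped = 0usize;

    for i in 0..1_000u32 {
        if let Some(batch) = batcher.add(i) {
            shipped += batch.len();
        }
    }
    // Anything still lingering below the size threshold is flushed explicitly.
    shipped += batcher.flush().len();

    assert_eq!(shipped, 1_000);
    assert_eq!(batcher.stats().items_batched, 1_000);
}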

/// Point-in-time snapshot of `AdaptiveBatcher` activity.
#[derive(Debug, Clone)]
pub struct BatcherStats {
    pub batches_flushed: u64,
    pub items_batched: u64,
    pub current_batch_size: usize,
    pub avg_latency_us: u64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter() {
        let filter = BloomFilter::new(1000, 0.01);

        for i in 0..1000 {
            filter.insert(&i);
        }

        // No false negatives: everything inserted must be reported present.
        for i in 0..1000 {
            assert!(filter.contains(&i), "Item {} should be present", i);
        }

        // False positives are allowed, but should stay near the 1% target.
        let mut false_positives = 0;
        for i in 1000..2000 {
            if filter.contains(&i) {
                false_positives += 1;
            }
        }

        assert!(
            false_positives < 30,
            "Too many false positives: {}",
            false_positives
        );
    }
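
    // Illustrative sketch of the dedup use of `insert_and_check`; the key is
    // an arbitrary example.
    #[test]
    fn test_bloom_filter_insert_and_check() {
        let filter = BloomFilter::new(100, 0.01);

        assert!(!filter.insert_and_check(&"a"), "first insert should report absent");
        assert!(filter.insert_and_check(&"a"), "second insert should report present");
        assert_eq!(filter.count(), 2);
    }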

    #[test]
    fn test_counting_bloom_filter() {
        let filter = CountingBloomFilter::new(1000, 0.01);

        filter.insert(&42);
        filter.insert(&43);
        assert!(filter.contains(&42));
        assert!(filter.contains(&43));

        filter.remove(&42);
        assert!(!filter.contains(&42));
        assert!(filter.contains(&43));
    }

    #[test]
    fn test_hyperloglog() {
        let hll = HyperLogLog::new(14);

        for i in 0..10000 {
            hll.add(&i);
        }

        let estimate = hll.estimate();

        let error = (estimate as i64 - 10000i64).abs() as f64 / 10000.0;
        assert!(
            error < 0.1,
            "Estimate {} too far from 10000 (error: {}%)",
            estimate,
            error * 100.0
        );
    }

    #[test]
    fn test_hyperloglog_merge() {
        let hll1 = HyperLogLog::new(10);
        let hll2 = HyperLogLog::new(10);

        for i in 0..5000 {
            hll1.add(&i);
        }
        for i in 5000..10000 {
            hll2.add(&i);
        }

        hll1.merge(&hll2);

        let estimate = hll1.estimate();
        let error = (estimate as i64 - 10000i64).abs() as f64 / 10000.0;
        assert!(
            error < 0.15,
            "Merged estimate {} too far from 10000",
            estimate
        );
    }

    #[test]
    fn test_offset_bloom_filter() {
        let filter = OffsetBloomFilter::new(1000, 100);

        for offset in (0..1000).step_by(10) {
            filter.insert(offset);
        }

        assert!(filter.contains(0));
        assert!(filter.contains(100));
        // 5 was never inserted (only multiples of 10 were).
        assert!(!filter.contains(5));

        assert!(filter.contains_range(0, 50));
        assert!(filter.contains_range(90, 110));

        let (min, max) = filter.offset_range();
        assert_eq!(min, 0);
        assert_eq!(max, 990);
    }
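
    // Illustrative sketch of a `contains_range` caveat: ranges wider than 10
    // are only sampled at bucket boundaries and a coarse stride, so a hit
    // lying strictly between sample points can be missed. Small ranges are
    // checked exhaustively, as exercised here.
    #[test]
    fn test_offset_contains_range_small_range_is_exact() {
        let filter = OffsetBloomFilter::new(1000, 100);
        filter.insert(37);

        assert!(filter.contains(37));
        // end - start == 10, so every offset in 30..=40 is probed.
        assert!(filter.contains_range(30, 40));
    }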

    #[test]
    fn test_adaptive_batcher() {
        let config = BatchConfig {
            min_batch_size: 4,
            max_batch_size: 16,
            max_linger_us: 10000,
            target_latency_us: 1000,
            adaptive: true,
        };

        let batcher = AdaptiveBatcher::new(config);

        // The initial batch size is (4 + 16) / 2 = 10, so a flush should
        // trigger within the first 20 adds.
        let mut flushed = None;
        for i in 0..20 {
            if let Some(batch) = batcher.add(i) {
                flushed = Some(batch);
                break;
            }
        }

        assert!(flushed.is_some());
        let batch = flushed.unwrap();
        assert!(!batch.is_empty());
    }
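
    // Illustrative concurrency sketch: the bloom filter is built on atomics,
    // so concurrent inserts from several threads must never lose a membership
    // bit. Thread and item counts are arbitrary.
    #[test]
    fn test_bloom_filter_concurrent_inserts() {
        use std::sync::Arc;

        let filter = Arc::new(BloomFilter::new(10_000, 0.01));
        let mut handles = Vec::new();

        for t in 0..4u64 {
            let filter = Arc::clone(&filter);
            handles.push(std::thread::spawn(move || {
                for i in 0..1_000u64 {
                    filter.insert(&(t * 1_000 + i));
                }
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(filter.count(), 4_000);
        for i in 0..4_000u64 {
            assert!(filter.contains(&i), "offset {} should be present", i);
        }
    }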
}