use std::{
    cell::RefCell,
    cmp::min,
    collections::{BTreeMap, BTreeSet, HashSet},
    fmt::{Debug, Formatter},
    mem::{replace, size_of, swap, take},
    num::NonZeroUsize,
    path::{Path, PathBuf},
    rc::Rc,
    sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering},
    time::SystemTime,
};

use async_compression::tokio::{bufread::ZstdDecoder, write::ZstdEncoder};
use futures::{stream, StreamExt};
use lru::LruCache;
use tokio::{
    fs::{create_dir_all, remove_file, File, OpenOptions},
    io::{AsyncReadExt, AsyncWriteExt, BufReader},
    sync::Mutex,
    task::spawn_blocking,
};
use tsz_compress::prelude::*;

use crate::{prelude::*, time_key::TimeKeyItem};

// Global count of segments currently held in memory; tests use it to wait for
// background dehydration to settle.
static HOT_SEGMENTS: AtomicIsize = AtomicIsize::new(0);

// Guard that decrements `HOT_SEGMENTS` when dropped, keeping the counter
// correct across early returns and errors during dehydration.
struct DeferHotSegment(u64);
impl Drop for DeferHotSegment {
    fn drop(&mut self) {
        let hot_segments = HOT_SEGMENTS.fetch_sub(1, Ordering::Relaxed);
        println!("Dehydrating segment ({}): {:?}", hot_segments, self.0);
    }
}

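// SAFETY: `TkStreamSet` holds `Rc<RefCell<Segment>>` internally, which is
// neither `Send` nor `Sync`. These impls encode the assumption that the set is
// only ever driven from a single thread at a time (e.g. a current-thread tokio
// runtime, as in the tests below); violating that assumption is undefined
// behavior.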
unsafe impl Send for TkStreamSet {}
unsafe impl Sync for TkStreamSet {}
impl TkStreamSet {
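    /// Creates the working directory if needed and sizes the segment LRU
    /// cache from the configured `MemoryLimit`. Must run before any inserts.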
    pub(crate) async fn initialize(&mut self) -> TkssResult<()> {
        if !self.working_dir.exists() {
            create_dir_all(self.working_dir.clone())
                .await
                .map_err(|_| TkssError::InvalidDirectory(self.working_dir.clone()))?;
        }

        println!("Initializing stream set in {}", self.working_dir.display());

        self.state.segment_interval_us = self.segment_time_interval.as_micros() as u64;
        self.state.retention_us = self.segment_retention_period.as_micros() as u64;
        self.state.max_memory_usage = match self.memory_limit {
            MemoryLimit::VeryLow => 1024 * 1024 * 1024,
            MemoryLimit::Low => 8 * 1024 * 1024 * 1024,
            MemoryLimit::Medium => 32 * 1024 * 1024 * 1024,
            MemoryLimit::High => 128 * 1024 * 1024 * 1024,
            MemoryLimit::VeryHigh => 1024 * 1024 * 1024 * 1024,
            MemoryLimit::Unlimited => usize::MAX,
        };

        // Budget 80% of the memory limit for hydrated segments at a rough
        // 256 MiB per segment, capped at 1024 cache entries; the `max(1)`
        // keeps `NonZeroUsize::new` from panicking on tiny budgets.
        let max_segments = self.state.max_memory_usage as f64 * 0.8 / (256 * 1024 * 1024) as f64;
        let max_segments = min(1024, max_segments as usize).max(1);
        println!("Using {} segments in LRU cache", max_segments);
        self.state.segment_cache =
            Mutex::new(LruCache::new(NonZeroUsize::new(max_segments).unwrap()));

        Ok(())
    }

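    /// Inserts a batch of time keys and returns one `bool` per key, in input
    /// order: `true` if the key was newly recorded, `false` if it was a
    /// duplicate or fell outside the retention window. As a side effect,
    /// expired segments are pruned and cold segments are hydrated on demand.
    ///
    /// A minimal usage sketch, mirroring the `can_insert_keys` test below
    /// (with `UserDeviceTimeKey` as the key type):
    ///
    /// ```ignore
    /// let sset = TkStreamSetBuilder::new().build().await?;
    /// let keys = (0..10).map(|i| UserDeviceTimeKey {
    ///     timestamp_us: now_us + i * 1000,
    ///     user_id: 42,
    ///     device_id: 69,
    /// });
    /// assert_eq!(sset.insert(keys.clone()).await?, vec![true; 10]);
    /// assert_eq!(sset.insert(keys).await?, vec![false; 10]); // duplicates
    /// ```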
    pub async fn insert<I, T>(&self, keys: I) -> TkssResult<Vec<bool>>
    where
        I: Iterator<Item = T>,
        T: TimeKey,
    {
        // Oldest segment start still inside the retention window; anything
        // before it is pruned below.
        let prune_segment = (SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as u64
            - self.state.retention_us)
            / self.state.segment_interval_us
            * self.state.segment_interval_us;
        // Pair each key with the start timestamp of the segment it lands in,
        // computed before the key is moved into the tuple.
        let mut segmented_keys = keys
            .map(|k| {
                let segment = k.timestamp_us() / self.state.segment_interval_us
                    * self.state.segment_interval_us;
                (k, segment)
            })
            .collect::<Vec<_>>();
        segmented_keys.sort_by_key(|(_, s)| *s);
        if segmented_keys.is_empty() {
            return Ok(vec![]);
        }

        let mut segment_idx = self.state.segment_idx.lock().await;

        // `split_off` keeps everything at or after `prune_segment`; after the
        // swap, `segment_idx` holds the live segments and `prune` the expired
        // ones, which are deleted concurrently.
        let mut prune = segment_idx.split_off(&prune_segment);
        swap(&mut *segment_idx, &mut prune);
        stream::iter(prune.into_iter())
            .for_each_concurrent(num_cpus::get(), |(_, v)| async move {
                let _ = v.as_ref().borrow().delete_segment().await.map(|bytes| {
                    self.state
                        .estimated_memory_used
                        .fetch_sub(bytes, std::sync::atomic::Ordering::Relaxed);
                });
            })
            .await;

        // All existing segments the incoming keys may touch.
        let first_segment = segmented_keys[0].1;
        let last_segment = segmented_keys.last().unwrap().1;
        let segments = segment_idx
            .range(first_segment..=last_segment)
            .map(|(k, v)| (*k, v.clone()))
            .collect::<Vec<_>>();

        // Segment start timestamps in range that no existing segment spans;
        // these need fresh segments.
        let mut new_segments = false;
        let new_segment_set = segmented_keys
            .iter()
            .filter(|(_, s)| {
                *s >= prune_segment
                    && !segments
                        .iter()
                        .any(|seg| seg.1.borrow().spans_key(*s, self.state.segment_interval_us))
            })
            .map(|(_, s)| *s)
            .collect::<HashSet<_>>();

        // Touch every segment this batch references so the LRU does not evict
        // a segment we are about to use, then create any missing segments.
        let mut cache = self.state.segment_cache.lock().await;
        let uniq_keys = segmented_keys
            .iter()
            .map(|(_, s)| *s)
            .collect::<HashSet<_>>();
        uniq_keys.iter().for_each(|s| cache.promote(s));
        for s in new_segment_set {
            new_segments = true;
            let segment = Rc::new(RefCell::new(Segment::new(false, &self.working_dir, s)));
            segment_idx.insert(s, segment.clone());
            let stale_segment = cache.push(s, segment);

            if let Some((stale_k, stale_segment)) = stale_segment {
                println!("New segment, evicting segment {} from memory", stale_k);
                debug_assert!(stale_segment.borrow().hot.load(Ordering::SeqCst));
                if uniq_keys.contains(&stale_k) {
                    tracing::error!(
                        "Warning: segment cache too small ({}), cannot evict required segment {} of {:?}",
                        cache.len(),
                        stale_k,
                        uniq_keys
                    );
                    println!(
                        "Warning: segment cache too small ({}), cannot evict required segment {} of {:?}",
                        cache.len(),
                        stale_k,
                        uniq_keys
                    );
                }
                // Persist the evicted segment and release its memory; if this
                // batch still needs it, it is rehydrated below.
                let usage = stale_segment.as_ref().borrow().dehydrate_segment().await?;
                self.state
                    .estimated_memory_used
                    .fetch_sub(usage, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Re-query the range if segments were created above.
        let segments = if !new_segments {
            segments
        } else {
            segment_idx
                .range(first_segment..=last_segment)
                .map(|(k, v)| (*k, v.clone()))
                .collect::<Vec<_>>()
        };

        let required_segments = segments.iter().map(|(k, _)| *k).collect::<Vec<_>>();
        // Evict least-recently-used segments until we are back under the
        // memory budget, then make sure every segment this batch touches is
        // hydrated.
        let mut cannot_evict_more = false;
        for (_k, s) in segments.iter() {
            if cannot_evict_more {
                break;
            }
            while self.state.estimated_memory_used.load(Ordering::Relaxed)
                > self.state.max_memory_usage
                && !cache.is_empty()
            {
                println!(
                    "High memory pressure {} > {}, evicting segment from memory",
                    self.state.estimated_memory_used.load(Ordering::Relaxed),
                    self.state.max_memory_usage
                );
                let stale_segment = cache.pop_lru();
                if let Some((stale_k, stale_segment)) = stale_segment {
                    if required_segments.contains(&stale_k) {
                        // The LRU tail is a segment this batch needs; put it
                        // back and stop evicting.
                        println!("Cannot evict required segment");
                        cache.push(stale_k, stale_segment);
                        cannot_evict_more = true;
                        break;
                    } else {
                        debug_assert!(stale_segment.borrow().hot.load(Ordering::SeqCst));
                        let usage = stale_segment.as_ref().borrow().dehydrate_segment().await?;
                        self.state
                            .estimated_memory_used
                            .fetch_sub(usage, std::sync::atomic::Ordering::Relaxed);
                    }
                }
            }

            let seg = s.as_ref().borrow();
            if !seg.hot() {
                let usage = seg.hydrate_segment().await?;
                self.state
                    .estimated_memory_used
                    .fetch_add(usage, std::sync::atomic::Ordering::Relaxed);
            }
        }

        // Hold a `RefCell` borrow for every touched segment, then take each
        // segment's index lock so the per-segment key sets can be mutated.
        let mut _guards = Vec::with_capacity(segments.len());
        for (k, s) in segments.iter() {
            let b = s.as_ref().borrow();
            _guards.push((*k, b));
        }
        let mut guards = stream::iter(_guards.iter())
            .map(|(k, b)| async move { (*k, b.idx.lock().await) })
            .buffer_unordered(num_cpus::get())
            .collect::<Vec<_>>()
            .await;

        drop(cache);
        drop(segment_idx);

        // Insert every key into its segment's set; the returned `bool` marks
        // whether the key was new. Keys older than the retention window are
        // reported as duplicates.
        let deduped = segmented_keys
            .into_iter()
            .map(|(k, s)| {
                if s < prune_segment {
                    return false;
                }
                let Some(idx) = guards.iter_mut().find(|seg| seg.0 == s) else {
                    tracing::error!(
                        "Pseudo-panic: Expected segment {} in {:?}",
                        s,
                        guards.iter().map(|(k, _)| k).collect::<Vec<_>>()
                    );
                    println!(
                        "Pseudo-panic: Expected segment {} in {:?}",
                        s,
                        guards.iter().map(|(k, _)| k).collect::<Vec<_>>()
                    );
                    return false;
                };
                idx.1.insert(k.key())
            })
            .collect::<Vec<_>>();

        drop(guards);
        drop(_guards);

        // Rough accounting: each newly inserted key costs about one `u128`
        // plus ~2.1x `BTreeSet` overhead, the same estimate used when
        // hydrating and dehydrating segments.
        let new_keys = deduped.iter().filter(|b| **b).count();
        self.state.estimated_memory_used.fetch_add(
            (new_keys as f64 * size_of::<u128>() as f64 * 2.1) as usize,
            Ordering::Relaxed,
        );

        Ok(deduped)
    }
}

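/// Shared runtime state for a stream set: the configured intervals and
/// limits, a running memory estimate, and the segment index with its LRU
/// cache of hydrated segments.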
pub(crate) struct TkStreamSetState {
    /// Width of each time segment, in microseconds.
    segment_interval_us: u64,

    /// Retention window, in microseconds; older segments are deleted.
    retention_us: u64,

    /// Soft memory budget, in bytes, derived from the `MemoryLimit`.
    max_memory_usage: usize,

    /// Running estimate of the bytes held by hydrated segments.
    estimated_memory_used: AtomicUsize,

    /// All live segments, ordered by segment start timestamp.
    segment_idx: Mutex<BTreeMap<u64, Rc<RefCell<Segment>>>>,

    /// Recently used hydrated segments; eviction triggers dehydration.
    segment_cache: Mutex<LruCache<u64, Rc<RefCell<Segment>>>>,
}

impl Default for TkStreamSetState {
    fn default() -> Self {
        TkStreamSetState {
            segment_interval_us: 0,
            retention_us: 0,
            max_memory_usage: 0,
            estimated_memory_used: 0.into(),
            segment_idx: Mutex::new(BTreeMap::new()),
            segment_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())),
        }
    }
}

impl Debug for TkStreamSetState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TkStreamSetState")
            .field("segment_interval_us", &self.segment_interval_us)
            .finish()
    }
}

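/// One time slice of the stream set: an in-memory set of keys that can be
/// dehydrated to (and rehydrated from) a zstd-compressed file on disk.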
#[allow(unused)]
pub(crate) struct Segment {
    /// Set while the segment is being deleted or replaced; suppresses the
    /// dehydrate-on-drop behavior.
    halt: AtomicBool,
    /// Whether the key set is currently resident in memory.
    hot: AtomicBool,
    working_dir: PathBuf,
    /// Start of the time span this segment covers, in microseconds.
    timestamp_us: u64,
    idx: Mutex<BTreeSet<TimeKeyItem>>,
}

impl Segment {
    pub fn new(halt: bool, working_dir: &Path, timestamp_us: u64) -> Self {
        Segment {
            halt: halt.into(),
            hot: false.into(),
            working_dir: working_dir.to_path_buf(),
            timestamp_us,
            idx: Mutex::new(BTreeSet::new()),
        }
    }

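    /// Returns true if the given timestamp falls inside this segment's
    /// half-open span `[self.timestamp_us, self.timestamp_us + interval_us)`.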
    pub fn spans_key(&self, timestamp_us: u64, interval_us: u64) -> bool {
        self.timestamp_us <= timestamp_us && timestamp_us < self.timestamp_us + interval_us
    }

    pub fn hot(&self) -> bool {
        self.hot.load(Ordering::Relaxed)
    }

    fn path(&self) -> PathBuf {
        self.working_dir.join(format!("{}.bin", self.timestamp_us))
    }

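    /// Loads this segment's key set from disk and marks the segment hot.
    /// Returns an estimate of the bytes now resident (0 if already hot or if
    /// no file exists yet).
    ///
    /// On-disk layout, all wrapped in a zstd stream: a `b"TKSS"` header, a
    /// big-endian `u64` payload length in *bits*, then the tsz-compressed
    /// key set.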
    #[tracing::instrument(skip(self), ret)]
    pub async fn hydrate_segment(&self) -> TkssResult<usize> {
        let mut idx = self.idx.lock().await;
        let hot = self.hot.swap(true, Ordering::Relaxed);
        if hot {
            return Ok(0);
        }

        let hot_segments = HOT_SEGMENTS.fetch_add(1, Ordering::Relaxed);

        println!(
            "Hydrating segment ({}): {:?}",
            hot_segments, self.timestamp_us
        );

        let path = self.path();
        if !path.exists() {
            return Ok(0);
        }

        let file = BufReader::new(File::open(&path).await?);
        let mut zfile = ZstdDecoder::new(file);
        let mut compressed = Vec::new();
        zfile.read_to_end(&mut compressed).await?;
        let bytes = compressed.len();

        let header = "TKSS".as_bytes();
        if &compressed[0..header.len()] != header {
            Err(TkssError::InvalidHeader(path.clone()))?;
        }
        let compressed = &compressed[header.len()..];

        // The advertised bit length must fit in the payload that follows the
        // 8-byte length field; reject corrupt or truncated files here, before
        // the `split_at` below can panic.
        let length = u64::from_be_bytes(compressed[..8].try_into()?);
        if (length as usize + 7) / 8 > compressed.len() - 8 {
            println!("Length: {} ({} bytes)", length, (length as usize + 7) / 8);
            Err(TkssError::InvalidLength(path.clone()))?;
        }
        let compressed = &compressed[8..];
        let bits = BitBufferSlice::from_slice(compressed);
        let bits = bits.split_at(length as usize).0;
        let mut decompressor = Decompressor::new(bits);

        println!(
            "Read segment: {:?} ({} + 8 ({}) + {} = {} bytes)",
            self.timestamp_us,
            header.len(),
            length,
            compressed.len(),
            bytes
        );

        idx.extend(
            decompressor
                .decompress()
                .filter_map(|e: Result<TimeKeyItem, _>| match e {
                    Ok(e) => Some(e),
                    Err(e) => {
                        tracing::error!("Error decompressing segment: {:?}", e);
                        None
                    }
                }),
        );

        // Same per-key estimate as insertion: one `u128` plus ~2.1x overhead.
        let size_estimate = (idx.len() as f64 * size_of::<u128>() as f64 * 2.1) as usize;
        println!("Estimated size: {}", size_estimate);
        Ok(size_estimate)
    }

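    /// Compresses this segment's key set to disk and releases the in-memory
    /// set, marking the segment cold. Returns an estimate of the bytes freed
    /// (0 if the segment was already cold).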
    #[tracing::instrument(skip(self), ret)]
    pub async fn dehydrate_segment(&self) -> TkssResult<usize> {
        let mut idx = self.idx.lock().await;
        let hot = self.hot.swap(false, Ordering::Relaxed);
        if !hot {
            return Ok(0);
        }

        // Decrements HOT_SEGMENTS even if compression or I/O below fails.
        let _hot_segment = DeferHotSegment(self.timestamp_us);

        let pts = idx.len();
        let size_estimate = (idx.len() as f64 * size_of::<u128>() as f64 * 2.1) as usize;
        let elems = take(&mut *idx);
        // tsz compression is CPU-bound; run it off the async executor.
        let compressed = spawn_blocking(move || {
            let mut compressor = Compressor::new();
            elems.into_iter().for_each(|e| compressor.compress(e));
            let mut compressed = compressor.finish();
            compressed.shrink_to_fit();
            compressed.set_uninitialized(false);
            compressed
        })
        .await?;

        let header = "TKSS".as_bytes();
        let length = compressed.len() as u64;
        let compressed = compressed.into_vec();
        // `truncate(true)` matters here: rewriting a segment with less data
        // than before must not leave stale bytes at the end of the file.
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&self.path())
            .await?;
        let mut zfile = ZstdEncoder::new(file);

        zfile.write_all(header).await?;
        zfile.write_u64(length).await?;
        zfile.write_all(&compressed).await?;

        zfile.shutdown().await?;
        let bytes = zfile.into_inner().metadata().await?.len();
        println!(
            "Wrote segment: {:?} ({} + 8 ({}) + {} = {} bytes / {} bytes) ({:.2}x compression)",
            self.timestamp_us,
            header.len(),
            length,
            compressed.len(),
            header.len() + 8 + compressed.len(),
            bytes,
            (pts as f64 * size_of::<u128>() as f64) / (bytes as f64)
        );

        Ok(size_estimate)
    }

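    /// Clears this segment from memory and deletes its backing file, if any.
    /// Returns an estimate of the bytes freed.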
    #[tracing::instrument(skip(self), ret)]
    pub async fn delete_segment(&self) -> TkssResult<usize> {
        let mut idx = self.idx.lock().await;
        let hot = self.hot.swap(false, Ordering::Relaxed);
        // Halt so a concurrent drop does not try to dehydrate a segment that
        // is being deleted.
        self.halt.store(true, Ordering::Relaxed);
        let pts = if !hot {
            0
        } else {
            HOT_SEGMENTS.fetch_sub(1, Ordering::Relaxed);
            idx.len()
        };

        let size_estimate = (pts as f64 * size_of::<u128>() as f64 * 2.1) as usize;
        idx.clear();

        let path = self.path();
        if path.exists() {
            println!("Deleting segment: {:?}", self.timestamp_us);
            remove_file(&path).await?;
        }

        Ok(size_estimate)
    }
}

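// Dehydrate on drop so an in-memory segment is persisted even when the owning
// stream set goes away. Note that this spawns onto the ambient tokio runtime,
// so segments must be dropped from within a runtime context.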
impl Drop for Segment {
    fn drop(&mut self) {
        println!("Dropping segment: {:?}", self.timestamp_us);
        if self.halt.swap(true, Ordering::SeqCst) {
            return;
        }
        // Swap a halted placeholder into `self` (built first, so the shared
        // borrows end before `self` is mutably borrowed) and move the real
        // segment into a background task that writes it out.
        let replacement = Segment::new(true, &self.working_dir, self.timestamp_us);
        let this = replace(self, replacement);
        tokio::spawn(async move {
            println!("Dehydrating segment on drop: {:?}", this.timestamp_us);
            let _ = this.dehydrate_segment().await;
        });
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::atomic::Ordering,
        time::{Duration, Instant, SystemTime},
    };

    use futures::{stream, StreamExt};
    use rand::{seq::SliceRandom, thread_rng};
    use tempdir::TempDir;
    use tokio::{fs::read_dir, time::sleep};

    use crate::{prelude::*, stream_set::HOT_SEGMENTS};

    #[tokio::test]
    async fn can_insert_keys() {
        let sset = TkStreamSetBuilder::new().build().await.unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;
        let keys = (0..10).map(|i| UserDeviceTimeKey {
            timestamp_us: now_ts + i * 1000,
            user_id: 42,
            device_id: 69,
        });
        let results = sset.insert(keys.clone()).await.unwrap();
        assert_eq!(results, vec![true; 10]);

        let results = sset.insert(keys).await.unwrap();
        assert_eq!(results, vec![false; 10]);
    }

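    // Dropping a set spawns background dehydration tasks; the tests below wait
    // for HOT_SEGMENTS to drain (with a timeout) before inspecting the working
    // directory.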
    #[tokio::test]
    async fn can_hydrate_drop_rehydrate() {
        let tmp_dir = TempDir::new("can_hydrate_drop_rehydrate").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;
        let keys = (0..50).map(|i| UserDeviceTimeKey {
            timestamp_us: now_ts + i * 1000,
            user_id: 42,
            device_id: 69,
        });
        let results = sset.insert(keys.clone()).await.unwrap();
        assert_eq!(results, vec![true; 50]);

        let results = sset.insert(keys.clone()).await.unwrap();
        assert_eq!(results, vec![false; 50]);

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let has_files = files.next_entry().await.unwrap().is_some();
        assert!(has_files);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .build()
            .await
            .unwrap();
        let results = sset.insert(keys).await.unwrap();
        assert_eq!(results, vec![false; 50]);
    }

    #[tokio::test]
    async fn can_hydrate_drop_rehydrate_across_segments() {
        let tmp_dir = TempDir::new("can_hydrate_drop_rehydrate_across_segments").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;

        const NUM_BATCHES: usize = 3600;
        const NUM_BATCH_ROWS: usize = 200;
        let start = Instant::now();
        let sset_ref = &sset;
        stream::iter(0..NUM_BATCHES)
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let mut num_files = 0;
        while let Ok(Some(_file)) = files.next_entry().await {
            num_files += 1;
        }
        println!("Found {} files", num_files);
        assert!(num_files > 1);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();

        let sset = &sset;
        stream::iter(0..NUM_BATCHES)
            .for_each_concurrent(num_cpus::get(), |j| async move {
                let keys = (0..NUM_BATCH_ROWS).map(|i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });

                let results = sset.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await
    }

    #[tokio::test]
    async fn can_hydrate_drop_rehydrate_across_segments_random_insertion() {
        let tmp_dir =
            TempDir::new("can_hydrate_drop_rehydrate_across_segments_random_insertion").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;

        const NUM_BATCHES: usize = 2 * 3600;
        const NUM_BATCH_ROWS: usize = 100;
        let mut batch_order = (0..NUM_BATCHES).collect::<Vec<_>>();
        batch_order.shuffle(&mut thread_rng());

        let start = Instant::now();
        let sset_ref = &sset;
        stream::iter(batch_order.iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let mut num_files = 0;
        while let Ok(Some(_file)) = files.next_entry().await {
            num_files += 1;
        }
        println!("Found {} files", num_files);
        assert!(num_files > 1);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();

        let sset = &sset;
        stream::iter(batch_order.iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                let keys = (0..NUM_BATCH_ROWS).map(|i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });

                let results = sset.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await
    }

    #[tokio::test]
    async fn can_hydrate_drop_rehydrate_new_segments_then_old_segments() {
        let tmp_dir =
            TempDir::new("can_hydrate_drop_rehydrate_new_segments_then_old_segments").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .with_memory_limit(MemoryLimit::Low)
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;

        const NUM_BATCHES: usize = 24 * 3600;
        const NUM_BATCH_ROWS: usize = 100;
        let batch_order = (0..NUM_BATCHES).collect::<Vec<_>>();

        let start = Instant::now();
        let sset_ref = &sset;
        stream::iter(batch_order[(NUM_BATCHES / 2)..].iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );
        stream::iter(batch_order[..(NUM_BATCHES / 2)].iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let mut num_files = 0;
        while let Ok(Some(_file)) = files.next_entry().await {
            num_files += 1;
        }
        println!("Found {} files", num_files);
        assert!(num_files > 1);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();

        let sset = &sset;
        stream::iter(batch_order.iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                let keys = (0..NUM_BATCH_ROWS).map(|i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });

                let results = sset.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await
    }

    // Heavy test; only built in release mode.
    #[cfg(not(debug_assertions))]
    #[tokio::test]
    async fn can_hydrate_drop_rehydrate_across_day_of_segments() {
        let tmp_dir = TempDir::new("can_hydrate_drop_rehydrate_across_day_of_segments").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
            .with_memory_limit(MemoryLimit::Low)
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;

        const NUM_BATCHES: usize = 48 * 3600;
        const NUM_BATCH_ROWS: usize = 5000;
        let start = Instant::now();
        let sset_ref = &sset;
        stream::iter(0..NUM_BATCHES)
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 1000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let has_files = files.next_entry().await.unwrap().is_some();
        assert!(has_files);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
            .with_memory_limit(MemoryLimit::Low)
            .build()
            .await
            .unwrap();

        let sset = &sset;
        stream::iter(0..NUM_BATCHES)
            .for_each_concurrent(num_cpus::get(), |j| async move {
                let keys = (0..NUM_BATCH_ROWS).map(|i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 1000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });

                let results = sset.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await
    }
}