time_key_stream_set/
stream_set.rs

use std::{
    cell::RefCell,
    cmp::min,
    collections::{BTreeMap, BTreeSet, HashSet},
    fmt::{Debug, Formatter},
    mem::{replace, size_of, swap, take},
    num::NonZeroUsize,
    path::{Path, PathBuf},
    rc::Rc,
    sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering},
    time::SystemTime,
};

use async_compression::tokio::{bufread::ZstdDecoder, write::ZstdEncoder};
use futures::{stream, StreamExt};
use lru::LruCache;
use tokio::{
    fs::{create_dir_all, remove_file, File, OpenOptions},
    io::{AsyncReadExt, AsyncWriteExt, BufReader},
    sync::Mutex,
    task::spawn_blocking,
};
use tsz_compress::prelude::*;

use crate::{prelude::*, time_key::TimeKeyItem};

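// Process-wide count of currently-hydrated ("hot") segments. It is used for
// debug logging and lets tests wait for background dehydration to complete.
// Dropping a `DeferHotSegment` decrements the counter when a dehydration
// finishes.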
static HOT_SEGMENTS: AtomicIsize = AtomicIsize::new(0);
struct DeferHotSegment(u64);
impl Drop for DeferHotSegment {
    fn drop(&mut self) {
        let hot_segments = HOT_SEGMENTS.fetch_sub(1, Ordering::Relaxed);
        println!("Dehydrating segment ({}): {:?}", hot_segments, self.0);
    }
}

///
/// Implement the stream set interface
///
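// SAFETY: `TkStreamSet` stores segments as `Rc<RefCell<Segment>>`, which are
// not `Send`/`Sync` by themselves. These impls appear to rely on the invariant
// that every segment handle is reached only through the tokio `Mutex`es in
// `TkStreamSetState`, so access is serialized; the safety argument lives here,
// not in the compiler.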
unsafe impl Send for TkStreamSet {}
unsafe impl Sync for TkStreamSet {}
impl TkStreamSet {
    pub(crate) async fn initialize(&mut self) -> TkssResult<()> {
        // Confirm the working directory exists
        if !self.working_dir.exists() {
            create_dir_all(self.working_dir.clone())
                .await
                .map_err(|_| TkssError::InvalidDirectory(self.working_dir.clone()))?;
        }

        println!("Initializing stream set in {}", self.working_dir.display());

        self.state.segment_interval_us = self.segment_time_interval.as_micros() as u64;
        self.state.retention_us = self.segment_retention_period.as_micros() as u64;
        self.state.max_memory_usage = match self.memory_limit {
            MemoryLimit::VeryLow => 1024 * 1024 * 1024,
            MemoryLimit::Low => 8 * 1024 * 1024 * 1024,
            MemoryLimit::Medium => 32 * 1024 * 1024 * 1024,
            MemoryLimit::High => 128 * 1024 * 1024 * 1024,
            MemoryLimit::VeryHigh => 1024 * 1024 * 1024 * 1024,
            MemoryLimit::Unlimited => usize::MAX,
        };

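        // Size the LRU cache so hot segments fill at most ~80% of the memory
        // budget at the ~256 MiB-per-segment target described on `Segment`,
        // capped at 1024 slots.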
        let max_segments = self.state.max_memory_usage as f64 * 0.8 / (256 * 1024 * 1024) as f64;
        let max_segments = min(1024, max_segments as usize);
        println!("Using {} segments in LRU cache", max_segments);
        self.state.segment_cache =
            Mutex::new(LruCache::new(NonZeroUsize::new(max_segments).unwrap()));

        Ok(())
    }

    pub async fn insert<I, T>(&self, keys: I) -> TkssResult<Vec<bool>>
    where
        I: Iterator<Item = T>,
        T: TimeKey,
    {
        // Find the segments that need to be loaded into memory
        let prune_segment = (SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as u64
            - self.state.retention_us)
            / self.state.segment_interval_us
            * self.state.segment_interval_us;
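        // The integer divide-then-multiply (here and below) floors a timestamp
        // to the start of its bucket. For example, with a 300 s (300_000_000 us)
        // interval, keys at t = 1 s and t = 299 s both floor to bucket 0, while
        // a key at t = 301 s floors to bucket 300_000_000.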
        let mut segmented_keys = keys
            .map(|k| {
                // Compute the bucket before moving the key into the tuple so
                // this does not rely on `T` being `Copy`
                let segment = k.timestamp_us() / self.state.segment_interval_us
                    * self.state.segment_interval_us;
                (k, segment)
            })
            .collect::<Vec<_>>();
        segmented_keys.sort_by_key(|(_, s)| *s);
        if segmented_keys.is_empty() {
            return Ok(vec![]);
        }

        // We now hold exclusive access over the segment index for the whole stream set
        let mut segment_idx = self.state.segment_idx.lock().await;

        // Split off the old segments that are no longer needed
        // Put the segments that we actually want back in the index
        let mut prune = segment_idx.split_off(&prune_segment);
        swap(&mut *segment_idx, &mut prune);
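        // `split_off(&k)` returns the entries with keys >= k, so after the swap
        // `segment_idx` holds the still-live suffix and `prune` holds the
        // expired prefix that is about to be deleted.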
        stream::iter(prune.into_iter())
            .for_each_concurrent(num_cpus::get(), |(_, v)| async move {
                // If deletion fails, we just ignore it
                let _ = v
                    .as_ref()
                    .borrow()
                    .delete_segment()
                    .await
                    .map(|bytes| {
                        self.state
                            .estimated_memory_used
                            .fetch_sub(bytes, std::sync::atomic::Ordering::Relaxed);
                    });
            })
            .await;

        // These are the segments that need to be in memory right now
        let first_segment = segmented_keys[0].1;
        let last_segment = segmented_keys.last().unwrap().1;
        let segments = segment_idx
            .range(first_segment..=last_segment)
            .map(|(k, v)| (*k, v.clone()))
            .collect::<Vec<_>>();

        // Create the segments if they do not exist
        let mut new_segments = false;
        let new_segment_set = segmented_keys
            .iter()
            .filter(|(_, s)| {
                *s >= prune_segment
                    && !segments
                        .iter()
                        .any(|seg| seg.1.borrow().spans_key(*s, self.state.segment_interval_us))
            })
            .map(|(_, s)| *s)
            .collect::<HashSet<_>>();

        // Inserting new segments may evict segments from the LRU cache that are still in use ... (thrashing)
        let mut cache = self.state.segment_cache.lock().await;
        let uniq_keys = segmented_keys
            .iter()
            .map(|(_, s)| s)
            .collect::<HashSet<_>>();
        uniq_keys.iter().for_each(|s| cache.promote(s));
        for s in new_segment_set {
            new_segments = true;
            let segment = Rc::new(RefCell::new(Segment::new(false, &self.working_dir, s)));
            segment_idx.insert(s, segment.clone());
            let stale_segment = cache.push(s, segment);

            if let Some((stale_k, stale_segment)) = stale_segment {
                println!("New segment, evicting segment {} from memory", stale_k);
                debug_assert!(stale_segment.borrow().hot.load(Ordering::SeqCst));
                if uniq_keys.contains(&stale_k) {
                    tracing::error!(
                        "Segment cache too small ({}): evicted required segment {} of {:?}",
                        cache.len(),
                        stale_k,
                        uniq_keys
                    );
                    println!(
                        "Segment cache too small ({}): evicted required segment {} of {:?}",
                        cache.len(),
                        stale_k,
                        uniq_keys
                    );
                    let usage = stale_segment.as_ref().borrow().dehydrate_segment().await?;
                    self.state
                        .estimated_memory_used
                        .fetch_sub(usage, std::sync::atomic::Ordering::Relaxed);
                }
            }
        }

        // Find and lock the segments again, if we created new ones
        let segments = if !new_segments {
            segments
        } else {
            segment_idx
                .range(first_segment..=last_segment)
                .map(|(k, v)| (*k, v.clone()))
                .collect::<Vec<_>>()
        };

        // Bring the segments into memory and flush stale segments to disk if we are over the memory limit
        let required_segments = segments.iter().map(|(k, _)| *k).collect::<Vec<_>>();
        let mut cannot_evict_more = false;
        for (_k, s) in segments.iter() {
            // Evict data from memory if we cannot fit the new data in memory
            if cannot_evict_more {
                break;
            }
            while self.state.estimated_memory_used.load(Ordering::Relaxed)
                > self.state.max_memory_usage
                && !cache.is_empty()
            {
                println!(
                    "High memory pressure {} > {}, evicting segment from memory",
                    self.state.estimated_memory_used.load(Ordering::Relaxed),
                    self.state.max_memory_usage
                );
                // Every item in this insertion was already promoted
                let stale_segment = cache.pop_lru();
                if let Some((stale_k, stale_segment)) = stale_segment {
                    if required_segments.contains(&stale_k) {
                        println!("Cannot evict required segment");
                        cache.push(stale_k, stale_segment);
                        cannot_evict_more = true;
                        break;
                    } else {
                        debug_assert!(stale_segment.borrow().hot.load(Ordering::SeqCst));
                        let usage = stale_segment.as_ref().borrow().dehydrate_segment().await?;
                        self.state
                            .estimated_memory_used
                            .fetch_sub(usage, std::sync::atomic::Ordering::Relaxed);
                    }
                }
            }

            // Load the segment into memory if it is not already
            let seg = s.as_ref().borrow();
            if !seg.hot() {
                let usage = seg.hydrate_segment().await?;
                self.state
                    .estimated_memory_used
                    .fetch_add(usage, std::sync::atomic::Ordering::Relaxed);
            }
        }

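        // Locks are acquired in a consistent order across inserts: the segment
        // index, then the segment cache, then each per-segment `idx` mutex
        // below. That consistent order appears to be what keeps two concurrent
        // `insert` calls from deadlocking.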
        // We now need to hold exclusive access over the segments that are required for this insert
        let mut _guards = Vec::with_capacity(segments.len());
        for (k, s) in segments.iter() {
            let b = s.as_ref().borrow();
            _guards.push((*k, b));
        }
        let mut guards = stream::iter(_guards.iter())
            .map(|(k, b)| async move { (*k, b.idx.lock().await) })
            .buffer_unordered(num_cpus::get())
            .collect::<Vec<_>>()
            .await;

        // We no longer need to hold exclusive access over the cache and the segment index
        drop(cache);
        drop(segment_idx);

        // Insert the keys into the segments
        let deduped = segmented_keys
            .into_iter()
            .map(|(k, s)| {
                if s < prune_segment {
                    return false;
                }
                let Some(idx) = guards.iter_mut().find(|seg| seg.0 == s) else {
                    tracing::error!(
                        "Pseudo-panic: Expected segment {} in {:?}",
                        s,
                        guards.iter().map(|(k, _)| k).collect::<Vec<_>>()
                    );
                    println!(
                        "Pseudo-panic: Expected segment {} in {:?}",
                        s,
                        guards.iter().map(|(k, _)| k).collect::<Vec<_>>()
                    );
                    return false;
                };
                idx.1.insert(k.key())
            })
            .collect::<Vec<_>>();

        // Dehydrate any evicted segments not yet dehydrated
        drop(guards);
        drop(_guards);

        // Update memory usage
        let new_keys = deduped.iter().filter(|b| **b).count();
        self.state.estimated_memory_used.fetch_add(
            (new_keys as f64 * size_of::<u128>() as f64 * 2.1) as usize,
            Ordering::Relaxed,
        );

        Ok(deduped)
    }
}

pub(crate) struct TkStreamSetState {
    ///
    /// Cached interval for bucketing segments
    ///
    segment_interval_us: u64,

    ///
    /// Cached interval for deleting segments
    /// Segments older than (now - retention_us) will be deleted
    ///
    retention_us: u64,

    ///
    /// Cached maximum memory usage in bytes
    ///
    max_memory_usage: usize,

    ///
    /// An ongoing estimate of the amount of memory used by the stream set
    /// in bytes.
    ///
    estimated_memory_used: AtomicUsize,

    ///
    /// This keeps an ordered map of the time-keyed segments based on
    /// the first timestamp in the segment.
    ///
    segment_idx: Mutex<BTreeMap<u64, Rc<RefCell<Segment>>>>,

    ///
    /// This keeps an LRU cache for segments in memory
    ///
    segment_cache: Mutex<LruCache<u64, Rc<RefCell<Segment>>>>,
}

impl Default for TkStreamSetState {
    fn default() -> Self {
        TkStreamSetState {
            segment_interval_us: 0,
            retention_us: 0,
            max_memory_usage: 0,
            estimated_memory_used: 0.into(),
            segment_idx: Mutex::new(BTreeMap::new()),
            segment_cache: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())),
        }
    }
}

impl Debug for TkStreamSetState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TkStreamSetState")
            .field("segment_interval_us", &self.segment_interval_us)
            .finish()
    }
}

///
/// A `Segment` represents the smallest unit of data that is read/written to disk.
/// Segments represent a unique interval of time, and are bucketed by the first
/// timestamp in the interval.
///
/// It is recommended for efficiency that the segment interval is sized such that
/// segments are not too large, but also not too small. A good rule of thumb is to
/// target a segment size of 256MiB or less for optimal log2(n) insertion performance
/// and reduced impact of single-threaded tsz_compress compaction.
///
/// The `hot` flag indicates whether the segment is currently in memory. Segments
/// are held in an LRU cache sized from the `max_memory_usage` and
/// `segment_interval_us` parameters so that ~256MiB of segments stay in memory.
///
#[allow(unused)]
pub(crate) struct Segment {
    halt: AtomicBool,
    hot: AtomicBool,
    working_dir: PathBuf,
    timestamp_us: u64,
    idx: Mutex<BTreeSet<TimeKeyItem>>,
}

impl Segment {
    pub fn new(halt: bool, working_dir: &Path, timestamp_us: u64) -> Self {
        Segment {
            halt: halt.into(),
            hot: false.into(),
            working_dir: working_dir.to_path_buf(),
            timestamp_us,
            idx: Mutex::new(BTreeSet::new()),
        }
    }

    pub fn spans_key(&self, timestamp_us: u64, interval_us: u64) -> bool {
        self.timestamp_us <= timestamp_us && timestamp_us < self.timestamp_us + interval_us
    }

    pub fn hot(&self) -> bool {
        self.hot.load(Ordering::Relaxed)
    }

    fn path(&self) -> PathBuf {
        self.working_dir.join(format!("{}.bin", self.timestamp_us))
    }

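    // On-disk format (inside the zstd-compressed stream):
    //   [ b"TKSS" ][ u64 big-endian bit length ][ tsz bit-packed TimeKeyItems ]
    // `hydrate_segment` reads and validates this layout; `dehydrate_segment`
    // writes it.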
    #[tracing::instrument(skip(self), ret)]
    pub async fn hydrate_segment(&self) -> TkssResult<usize> {
        let mut idx = self.idx.lock().await;
        let hot = self.hot.swap(true, Ordering::Relaxed);
        if hot {
            // No change in size, if already hot
            return Ok(0);
        }

        let hot_segments = HOT_SEGMENTS.fetch_add(1, Ordering::Relaxed);

        println!(
            "Hydrating segment ({}): {:?}",
            hot_segments, self.timestamp_us
        );

        // If the path doesn't exist, it wasn't written to disk
        let path = self.path();
        if !path.exists() {
            return Ok(0);
        }

        // Read all the bits from disk into a Vec<u8>
        let file = BufReader::new(File::open(&path).await?);
        let mut zfile = ZstdDecoder::new(file);
        let mut compressed = Vec::new();
        zfile.read_to_end(&mut compressed).await?;
        let bytes = compressed.len();

        // Expect the header to be "TKSS", followed by at least the 8-byte length
        let header = "TKSS".as_bytes();
        if compressed.len() < header.len() + 8 || &compressed[0..header.len()] != header {
            Err(TkssError::InvalidHeader(path.clone()))?;
        }
        let compressed = &compressed[header.len()..];

        // The next 8 bytes are the u64 bit length of the compressed data.
        // The declared bit length must fit within the remaining payload, or the
        // `split_at` below would panic.
        let length = u64::from_be_bytes(compressed[..8].try_into()?);
        if (length as usize + 7) / 8 > compressed.len() - 8 {
            println!("Length: {} ({} bytes)", length, (length as usize + 7) / 8);
            Err(TkssError::InvalidLength(path.clone()))?;
        }
        let compressed = &compressed[8..];
        let bits = BitBufferSlice::from_slice(compressed);
        let bits = bits.split_at(length as usize).0;
        let mut decompressor = Decompressor::new(bits);

        println!(
            "Read segment: {:?} ({} + 8 ({}) + {} = {} bytes)",
            self.timestamp_us,
            header.len(),
            length,
            compressed.len(),
            bytes
        );

        // Decompress the data directly into the index
        idx.extend(
            decompressor
                .decompress()
                .filter_map(|e: Result<TimeKeyItem, _>| match e {
                    Ok(e) => Some(e),
                    Err(e) => {
                        tracing::error!("Error decompressing segment: {:?}", e);
                        None
                    }
                }),
        );

        // Return the estimated size of the segment based on the number of items
        let size_estimate = (idx.len() as f64 * size_of::<u128>() as f64 * 2.1) as usize;
        println!("Estimated size: {}", size_estimate);
        Ok(size_estimate)
    }

    #[tracing::instrument(skip(self), ret)]
    pub async fn dehydrate_segment(&self) -> TkssResult<usize> {
        let mut idx = self.idx.lock().await;
        let hot = self.hot.swap(false, Ordering::Relaxed);
        if !hot {
            // No change in size, if already cold
            return Ok(0);
        }

        let _hot_segment = DeferHotSegment(self.timestamp_us);

        // Compress the data
        let pts = idx.len();
        let size_estimate = (idx.len() as f64 * size_of::<u128>() as f64 * 2.1) as usize;
        let elems = take(&mut *idx);
        let compressed = spawn_blocking(move || {
            let mut compressor = Compressor::new();
            elems.into_iter().for_each(|e| compressor.compress(e));
            let mut compressed = compressor.finish();
            compressed.shrink_to_fit();
            compressed.set_uninitialized(false);
            compressed
        })
        .await?;

        // Write the data to disk, truncating any previous (possibly longer)
        // contents so stale trailing bytes are not left in the file
        let header = "TKSS".as_bytes();
        let length = compressed.len() as u64;
        let compressed = compressed.into_vec();
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&self.path())
            .await?;
        let mut zfile = ZstdEncoder::new(file);

        // Write the header, the bits length as a u64, and then the bits
        zfile.write_all(header).await?;
        zfile.write_u64(length).await?;
        zfile.write_all(&compressed).await?;

        // Flush the file and drop it, print the file size
        zfile.shutdown().await?;
        let bytes = zfile.into_inner().metadata().await?.len();
        println!(
            "Wrote segment: {:?} ({} + 8 ({}) + {} = {} bytes / {} bytes) ({:.2}x compression)",
            self.timestamp_us,
            header.len(),
            length,
            compressed.len(),
            header.len() + 8 + compressed.len(),
            bytes,
            (pts as f64 * size_of::<u128>() as f64) / (bytes as f64)
        );

        // Return the size of the data released
        Ok(size_estimate)
    }

    #[tracing::instrument(skip(self), ret)]
    pub async fn delete_segment(&self) -> TkssResult<usize> {
        let mut idx = self.idx.lock().await;
        let hot = self.hot.swap(false, Ordering::Relaxed);
        self.halt.store(true, Ordering::Relaxed);
        let pts = if !hot {
            0
        } else {
            HOT_SEGMENTS.fetch_sub(1, Ordering::Relaxed);
            idx.len()
        };

        // Count how many samples are getting deleted
        let size_estimate = (pts as f64 * size_of::<u128>() as f64 * 2.1) as usize;
        idx.clear();

        // Delete the file
        let path = self.path();
        if path.exists() {
            println!("Deleting segment: {:?}", self.timestamp_us);
            remove_file(&path).await?;
        }

        // Return the size of the data released
        Ok(size_estimate)
    }
}

///
/// Upon drop, take the index and current state out of this segment
/// and spawn a task to write it to disk, leaving an empty segment in place.
///
impl Drop for Segment {
    fn drop(&mut self) {
        println!("Dropping segment: {:?}", self.timestamp_us);
        if self.halt.swap(true, Ordering::SeqCst) {
            return;
        }
        let this = replace(
            self,
            Segment::new(true, &self.working_dir, self.timestamp_us),
        );
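        // The replacement segment above is constructed with `halt = true`, so
        // when it is dropped in turn the early return fires and no further
        // dehydration task is spawned.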
        tokio::spawn(async move {
            println!("Dehydrating segment on drop: {:?}", this.timestamp_us);
            let _ = this.dehydrate_segment().await;
        });
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::atomic::Ordering,
        time::{Duration, Instant, SystemTime},
    };

    use futures::{stream, StreamExt};
    use rand::{seq::SliceRandom, thread_rng};
    use tempdir::TempDir;
    use tokio::{fs::read_dir, time::sleep};

    use crate::{prelude::*, stream_set::HOT_SEGMENTS};

    #[tokio::test]
    async fn can_insert_keys() {
        let sset = TkStreamSetBuilder::new().build().await.unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;
        let keys = (0..10).map(|i| UserDeviceTimeKey {
            timestamp_us: now_ts + i * 1000,
            user_id: 42,
            device_id: 69,
        });
        let results = sset.insert(keys.clone()).await.unwrap();
        assert_eq!(results, vec![true; 10]);

        let results = sset.insert(keys).await.unwrap();
        assert_eq!(results, vec![false; 10]);
    }

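    // Sanity check: `spans_key` treats a segment as the half-open interval
    // [timestamp_us, timestamp_us + interval_us), inclusive of the start and
    // exclusive of the end. The segment is built with `halt = true` so its
    // `Drop` impl returns early and no dehydration task is spawned.
    #[test]
    fn spans_key_covers_half_open_interval() {
        use super::Segment;
        use std::path::Path;

        let seg = Segment::new(true, Path::new("."), 1_000);
        assert!(seg.spans_key(1_000, 500));
        assert!(seg.spans_key(1_499, 500));
        assert!(!seg.spans_key(1_500, 500));
        assert!(!seg.spans_key(999, 500));
    }
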
    #[tokio::test]
    async fn can_hydrate_drop_rehydrate() {
        // Configure a temporary directory for this test; `into_path` persists it
        // so the directory outlives the `TempDir` guard
        let tmp_dir = TempDir::new("can_hydrate_drop_rehydrate").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;
        let keys = (0..50).map(|i| UserDeviceTimeKey {
            timestamp_us: now_ts + i * 1000,
            user_id: 42,
            device_id: 69,
        });
        let results = sset.insert(keys.clone()).await.unwrap();
        assert_eq!(results, vec![true; 50]);

        let results = sset.insert(keys.clone()).await.unwrap();
        assert_eq!(results, vec![false; 50]);

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        // assert that there is a file in the directory
        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let has_files = files.next_entry().await.unwrap().is_some();
        assert!(has_files);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .build()
            .await
            .unwrap();
        let results = sset.insert(keys).await.unwrap();
        assert_eq!(results, vec![false; 50]);
    }

    #[tokio::test]
    async fn can_hydrate_drop_rehydrate_across_segments() {
        // Configure a temporary directory for this test; `into_path` persists it
        // so the directory outlives the `TempDir` guard
        let tmp_dir = TempDir::new("can_hydrate_drop_rehydrate_across_segments").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;

        const NUM_BATCHES: usize = 3600; // 1 hour
        const NUM_BATCH_ROWS: usize = 200; // 1 second each

        let start = Instant::now();
        let sset_ref = &sset;
        stream::iter(0..NUM_BATCHES)
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        // assert that there is a file in the directory
        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let mut num_files = 0;
        while let Ok(Some(_file)) = files.next_entry().await {
            num_files += 1;
        }
        println!("Found {} files", num_files);
        assert!(num_files > 1);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();

        let sset = &sset;
        stream::iter(0..NUM_BATCHES)
            .for_each_concurrent(num_cpus::get(), |j| async move {
                let keys = (0..NUM_BATCH_ROWS).map(|i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });

                let results = sset.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await
    }

    #[tokio::test]
    async fn can_hydrate_drop_rehydrate_across_segments_random_insertion() {
        // Configure a temporary directory for this test; `into_path` persists it
        // so the directory outlives the `TempDir` guard
        let tmp_dir =
            TempDir::new("can_hydrate_drop_rehydrate_across_segments_random_insertion").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;

        const NUM_BATCHES: usize = 2 * 3600;
        const NUM_BATCH_ROWS: usize = 100;
        let mut batch_order = (0..NUM_BATCHES).collect::<Vec<_>>();
        batch_order.shuffle(&mut thread_rng());

        let start = Instant::now();
        let sset_ref = &sset;
        stream::iter(batch_order.iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        // assert that there is a file in the directory
        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let mut num_files = 0;
        while let Ok(Some(_file)) = files.next_entry().await {
            num_files += 1;
        }
        println!("Found {} files", num_files);
        assert!(num_files > 1);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();

        let sset = &sset;
        stream::iter(batch_order.iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                let keys = (0..NUM_BATCH_ROWS).map(|i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });

                let results = sset.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await
    }

    #[tokio::test]
    async fn can_hydrate_drop_rehydrate_new_segments_then_old_segments() {
        // Configure a temporary directory for this test; `into_path` persists it
        // so the directory outlives the `TempDir` guard
        let tmp_dir =
            TempDir::new("can_hydrate_drop_rehydrate_new_segments_then_old_segments").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .with_memory_limit(MemoryLimit::Low)
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;

        const NUM_BATCHES: usize = 24 * 3600;
        const NUM_BATCH_ROWS: usize = 100;
        let batch_order = (0..NUM_BATCHES).collect::<Vec<_>>();

        let start = Instant::now();
        let sset_ref = &sset;
        stream::iter(batch_order[(NUM_BATCHES / 2)..].iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );
        stream::iter(batch_order[..(NUM_BATCHES / 2)].iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        // assert that there is a file in the directory
        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let mut num_files = 0;
        while let Ok(Some(_file)) = files.next_entry().await {
            num_files += 1;
        }
        println!("Found {} files", num_files);
        assert!(num_files > 1);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(300))
            .build()
            .await
            .unwrap();

        let sset = &sset;
        stream::iter(batch_order.iter())
            .for_each_concurrent(num_cpus::get(), |j| async move {
                let keys = (0..NUM_BATCH_ROWS).map(|i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 5000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });

                let results = sset.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await
    }

    // release build only
    #[cfg(not(debug_assertions))]
    #[tokio::test]
    async fn can_hydrate_drop_rehydrate_across_day_of_segments() {
        // Configure a temporary directory for this test; `into_path` persists it
        // so the directory outlives the `TempDir` guard
        let tmp_dir = TempDir::new("can_hydrate_drop_rehydrate_across_day_of_segments").unwrap();
        let tmp_path = tmp_dir.into_path();

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
            .with_memory_limit(MemoryLimit::Low)
            .build()
            .await
            .unwrap();
        let now_ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_micros() as i64;

        const NUM_BATCHES: usize = 48 * 3600; // ~10 days of keys at 5 seconds per batch
        const NUM_BATCH_ROWS: usize = 5000; // 5 seconds each, at 1 ms per key

        let start = Instant::now();
        let sset_ref = &sset;
        stream::iter(0..NUM_BATCHES)
            .for_each_concurrent(num_cpus::get(), |j| async move {
                if j % 100 == 0 {
                    println!("Inserting batch {}", j);
                }
                let keys = (0..NUM_BATCH_ROWS).map(move |i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 1000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });
                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![true; NUM_BATCH_ROWS]);

                let results = sset_ref.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await;
        println!(
            "Inserting {} keys took {:?}",
            NUM_BATCHES * NUM_BATCH_ROWS,
            start.elapsed()
        );

        drop(sset);

        let ts = Instant::now();
        while HOT_SEGMENTS.load(Ordering::Relaxed) > 0 && ts.elapsed() < Duration::from_secs(15) {
            sleep(Duration::from_millis(10)).await;
        }

        // assert that there is a file in the directory
        let mut files = read_dir(tmp_path.clone()).await.unwrap();
        let has_files = files.next_entry().await.unwrap().is_some();
        assert!(has_files);

        let sset = TkStreamSetBuilder::new()
            .with_working_dir(tmp_path.clone())
            .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
            .with_memory_limit(MemoryLimit::Low)
            .build()
            .await
            .unwrap();

        let sset = &sset;
        stream::iter(0..NUM_BATCHES)
            .for_each_concurrent(num_cpus::get(), |j| async move {
                let keys = (0..NUM_BATCH_ROWS).map(|i| UserDeviceTimeKey {
                    timestamp_us: now_ts + (j * NUM_BATCH_ROWS + i) as i64 * 1000,
                    user_id: 42 + i as i32 % 7,
                    device_id: 69 + i as i32 % 11,
                });

                let results = sset.insert(keys.clone()).await.unwrap();
                assert_eq!(results, vec![false; NUM_BATCH_ROWS]);
            })
            .await
    }
}