statsig_rust/
hashset_with_ttl.rs

1use crate::log_d;
2use crate::StatsigRuntime;
3use std::collections::HashSet;
4use std::sync::{Arc, Mutex, Weak};
5use tokio::sync::Notify;
6use tokio::time::{sleep, Duration};
7
8const TAG: &str = stringify!(HashSetWithTTL);
9
10pub struct HashSetWithTTL {
11    set: Arc<Mutex<HashSet<String>>>,
12    shutdown_notify: Arc<Notify>,
13}
14
15impl HashSetWithTTL {
16    pub fn new(statsig_runtime: &Arc<StatsigRuntime>, interval_duration: Duration) -> Self {
17        let instance = Self {
18            set: Arc::new(Mutex::new(HashSet::new())),
19            shutdown_notify: Arc::new(Notify::new()),
20        };
21
22        let weak_instance = Arc::downgrade(&instance.set);
23        let shutdown_notify = instance.shutdown_notify.clone();
24
25        statsig_runtime.spawn(
26            "hashset_with_ttl_worker",
27            move |rt_shutdown_notify| async move {
28                Self::run_background_reset(
29                    weak_instance,
30                    interval_duration,
31                    rt_shutdown_notify,
32                    shutdown_notify,
33                )
34                .await;
35            },
36        );
37
38        instance
39    }
40
41    pub fn add(&self, key: String) -> Result<(), String> {
42        match self.set.lock() {
43            Ok(mut set) => {
44                set.insert(key);
45                Ok(())
46            }
47            Err(e) => Err(format!("Failed to acquire lock: {e}")),
48        }
49    }
50
51    pub fn contains(&self, key: &str) -> Result<bool, String> {
52        match self.set.lock() {
53            Ok(set) => Ok(set.contains(key)),
54            Err(e) => Err(format!("Failed to acquire lock: {e}")),
55        }
56    }
57
58    pub async fn shutdown(&self) {
59        self.shutdown_notify.notify_one();
60    }
61
62    fn reset(set: &Mutex<HashSet<String>>) -> Result<(), String> {
63        match set.lock() {
64            Ok(mut set) => {
65                set.clear();
66                Ok(())
67            }
68            Err(e) => Err(format!("Failed to acquire lock: {e}")),
69        }
70    }
71
72    async fn run_background_reset(
73        weak_instance: Weak<Mutex<HashSet<String>>>,
74        interval_duration: Duration,
75        rt_shutdown_notify: Arc<Notify>,
76        shutdown_notify: Arc<Notify>,
77    ) {
78        loop {
79            tokio::select! {
80                () = sleep(interval_duration) => {
81                    if let Some(set) = weak_instance.upgrade() {
82                        if let Err(e) = Self::reset(&set) {
83                            log_d!(TAG, "Failed to reset HashSetWithTTL: {}", e);
84                        }
85                    } else {
86                        break;
87                    }
88                }
89                () = rt_shutdown_notify.notified() => {
90                    log_d!(TAG, "Runtime shutdown. Exiting hashset with ttl worker.");
91                    break;
92                }
93                () = shutdown_notify.notified() => {
94                    log_d!(TAG, "Shutdown signal received. Exiting hashset with ttl worker.");
95                    break;
96                }
97            }
98        }
99    }
100}