Skip to main content

statsig_rust/interned_values/
interned_store.rs

1use std::{
2    borrow::Cow,
3    collections::{hash_map::Entry, HashMap},
4    fs::{File, OpenOptions},
5    io::Write,
6    sync::{Arc, OnceLock},
7    time::{Duration, Instant},
8};
9
10use ahash::AHashMap;
11use lazy_static::lazy_static;
12use memmap2::Mmap;
13use ouroboros::self_referencing;
14use parking_lot::Mutex;
15use rkyv::{
16    collections::swiss_table::ArchivedHashMap, string::ArchivedString, Archive,
17    Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
18};
19use serde_json::value::RawValue;
20
21use crate::{
22    evaluation::{
23        dynamic_returnable::DynamicReturnableValue,
24        evaluator_value::{EvaluatorValue, EvaluatorValueInner, MemoizedEvaluatorValue},
25        rkyv_value::{ArchivedRkyvValue, RkyvValue},
26    },
27    hashing,
28    interned_string::{InternedString, InternedStringValue},
29    log_d, log_e, log_w,
30    networking::ResponseData,
31    observability::ops_stats::OpsStatsForInstance,
32    specs_response::{
33        proto_specs::deserialize_protobuf,
34        spec_types::{Spec, SpecsResponseFull},
35        specs_hash_map::{SpecPointer, SpecsHashMap},
36    },
37    DynamicReturnable, StatsigErr,
38};
39
40const TAG: &str = "InternedStore";
41
42static IMMORTAL_DATA: OnceLock<ImmortalData> = OnceLock::new();
43static MMAP_DATA: OnceLock<LoadedMmapData> = OnceLock::new();
44
45lazy_static! {
46    static ref MUTABLE_DATA: Mutex<MutableData> = Mutex::new(MutableData::default());
47}
48
/// Payload written by `InternedStore::write_mmap_data` and later accessed zero-copy through
/// `InternedStore::preload_mmap`.
///
/// This type is serialized with rkyv, so its field list defines the archive layout. Changing it
/// will most likely make previously written files fail validation; regenerate them after any
/// change here.
#[derive(Default, Archive, RkyvDeserialize, RkyvSerialize)]
struct MmapData {
    // Interned strings keyed by the hash of their bytes.
    strings: std::collections::HashMap<u64, String>,
    // Parsed returnable objects keyed by the hash of their raw JSON text.
    returnables: std::collections::HashMap<u64, HashMap<String, RkyvValue>>,
}
54
/// A memory-mapped rkyv archive bundled with the typed view that borrows from it.
///
/// `#[self_referencing]` (ouroboros) lets `archived` borrow from `mmap`, which is stored in the
/// same struct. `file` is the handle the mapping was created from and is kept alongside it.
#[self_referencing]
struct LoadedMmapData {
    // File handle that `mmap` was created from (see `InternedStore::preload_mmap`).
    file: File,
    // Read-only mapping of the file's bytes.
    mmap: Mmap,

    // Typed view into `mmap`, produced by `rkyv::access` when the struct is built.
    #[borrows(mmap)]
    archived: &'this ArchivedMmapData,
}
63
64/// Immortal vs Mutable Data
65/// ------------------------------------------------------------
66/// -`ImmortalData` is static and never changes. It will only exist if a successful call to `preload` is made. It is intentionally
67///  leaked so that it can be accessed across forks without incrementing the reference count.
68/// -`MutableData` is dynamic and changes over time as values are added and removed.
69/// ------------------------------------------------------------
70/// In all cases, we first check if there is a ImmortalData entry and then fallback to MutableData.
71#[derive(Default)]
72struct ImmortalData {
73    strings: AHashMap<u64, &'static str>,
74    returnables: AHashMap<u64, &'static HashMap<String, RkyvValue>>,
75    evaluator_values: AHashMap<u64, &'static MemoizedEvaluatorValue>,
76    feature_gates: AHashMap<u64, &'static Spec>,
77    dynamic_configs: AHashMap<u64, &'static Spec>,
78    layer_configs: AHashMap<u64, &'static Spec>,
79}
/// Reference-counted interning store for values that were not preloaded.
///
/// An entry is added the first time a value is interned. The `release_*` functions remove it
/// once this store holds the only remaining strong reference. Access goes through the
/// `MUTABLE_DATA` mutex.
#[derive(Default)]
struct MutableData {
    // Interned strings keyed by the hash of their bytes.
    strings: AHashMap<u64, Arc<String>>,
    // Parsed returnable objects keyed by the hash of their raw JSON text.
    returnables: AHashMap<u64, Arc<HashMap<String, RkyvValue>>>,
    // Memoized evaluator values keyed by the hash of their raw JSON text.
    evaluator_values: AHashMap<u64, Arc<MemoizedEvaluatorValue>>,
}
86
/// A type that can be built by interning some borrowed input.
///
/// `Input<'src>` is the borrowed form accepted by [`Internable::intern`]. The lifetime lets
/// implementors accept references (for example `&'src str`) without forcing an owned copy.
pub trait Internable: Sized {
    /// Borrowed input consumed by [`Internable::intern`].
    type Input<'src>;

    /// Produces the interned value for `input`.
    fn intern<'src>(input: Self::Input<'src>) -> Self;
}
91
/// Namespace for the process-wide interning functions. It holds no state of its own; all data
/// lives in this module's statics.
pub struct InternedStore;
93
94impl InternedStore {
95    pub fn preload(data: &[u8]) -> Result<(), StatsigErr> {
96        Self::preload_multi(&[data])
97    }
98
99    pub fn preload_multi(data: &[&[u8]]) -> Result<(), StatsigErr> {
100        let start_time = Instant::now();
101
102        if IMMORTAL_DATA.get().is_some() {
103            log_e!(TAG, "Already preloaded");
104            return Err(StatsigErr::InvalidOperation(
105                "Already preloaded".to_string(),
106            ));
107        }
108
109        let specs_responses = data
110            .iter()
111            .map(|data| try_parse_as_json(data).or_else(|_| try_parse_as_proto(data)))
112            .collect::<Result<Vec<SpecsResponseFull>, StatsigErr>>()?;
113
114        let immortal = mutable_to_immortal(specs_responses)?;
115
116        if IMMORTAL_DATA.set(immortal).is_err() {
117            return Err(StatsigErr::LockFailure(
118                "Failed to set IMMORTAL_DATA".to_string(),
119            ));
120        }
121
122        let end_time = Instant::now();
123        log_d!(
124            TAG,
125            "Preload took {}ms",
126            end_time.duration_since(start_time).as_millis()
127        );
128
129        Ok(())
130    }
131
132    pub fn write_mmap_data(data: &[&[u8]], path: &str) -> Result<(), StatsigErr> {
133        let mut file = OpenOptions::new()
134            .read(true)
135            .write(true)
136            .create(true)
137            .truncate(true)
138            .open(path)
139            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
140
141        let specs_responses = data
142            .iter()
143            .map(|data| try_parse_as_json(data).or_else(|_| try_parse_as_proto(data)))
144            .collect::<Result<Vec<SpecsResponseFull>, StatsigErr>>()?;
145
146        let mmap_data = mutable_to_mmap_data(specs_responses)?;
147        let archived = rkyv::to_bytes::<rkyv::rancor::Error>(&mmap_data)
148            .map_err(|e| StatsigErr::SerializationError(e.to_string()))?;
149
150        file.write_all(&archived)
151            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
152        file.sync_all()
153            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
154
155        log_d!(TAG, "Wrote {} bytes to mmap file", archived.len());
156
157        Ok(())
158    }
159
160    pub fn preload_mmap(path: &str) -> Result<(), StatsigErr> {
161        let file = File::open(path).map_err(|e| StatsigErr::FileError(e.to_string()))?;
162        let mmap = unsafe { Mmap::map(&file).map_err(|e| StatsigErr::FileError(e.to_string()))? };
163
164        let loaded_result = LoadedMmapDataTryBuilder {
165            file,
166            mmap,
167            archived_builder: |mmap| rkyv::access::<ArchivedMmapData, rkyv::rancor::Error>(mmap),
168        }
169        .try_build();
170
171        let loaded = match loaded_result {
172            Ok(loaded) => loaded,
173            Err(e) => {
174                return Err(StatsigErr::SerializationError(e.to_string()));
175            }
176        };
177
178        MMAP_DATA
179            .set(loaded)
180            .map_err(|_| StatsigErr::LockFailure("Failed to set MMAP_DATA".to_string()))
181    }
182
183    pub fn get_or_intern_string<T: AsRef<str> + ToString>(value: T) -> InternedString {
184        let hash = hashing::hash_one(value.as_ref().as_bytes());
185
186        if let Some(string) = get_string_from_mmap(hash) {
187            return InternedString::from_static(hash, string);
188        }
189
190        if let Some(string) = get_string_from_shared(hash) {
191            return InternedString::from_static(hash, string);
192        }
193
194        let ptr = get_string_from_local(hash, value);
195        InternedString::from_pointer(hash, ptr)
196    }
197
198    pub fn get_or_intern_returnable(value: Cow<'_, RawValue>) -> DynamicReturnable {
199        let raw_string = value.get();
200        match raw_string {
201            "true" => return DynamicReturnable::from_bool(true),
202            "false" => return DynamicReturnable::from_bool(false),
203            "null" => return DynamicReturnable::empty(),
204            _ => {}
205        }
206
207        let hash = hashing::hash_one(raw_string.as_bytes());
208
209        if let Some(returnable) = get_returnable_from_mmap(hash) {
210            return DynamicReturnable::from_archived(hash, returnable);
211        }
212
213        if let Some(returnable) = get_returnable_from_shared(hash) {
214            return DynamicReturnable::from_static(hash, returnable);
215        }
216
217        let ptr = get_returnable_from_local(hash, value);
218        DynamicReturnable::from_pointer(hash, ptr)
219    }
220
221    pub fn get_or_intern_evaluator_value(value: Cow<'_, RawValue>) -> EvaluatorValue {
222        let raw_string = value.get();
223        let hash = hashing::hash_one(raw_string.as_bytes());
224
225        if let Some(evaluator_value) = get_evaluator_value_from_shared(hash) {
226            return EvaluatorValue::from_static(hash, evaluator_value);
227        }
228
229        let ptr = get_evaluator_value_from_local(hash, value);
230        EvaluatorValue::from_pointer(hash, ptr)
231    }
232
233    pub fn replace_evaluator_value(hash: u64, evaluator_value: Arc<MemoizedEvaluatorValue>) {
234        let old = use_mutable_data("replace_evaluator_value", |data| {
235            data.evaluator_values.insert(hash, evaluator_value)
236        });
237        drop(old);
238    }
239
240    pub fn try_get_preloaded_evaluator_value(bytes: &[u8]) -> Option<EvaluatorValue> {
241        let hash = hashing::hash_one(bytes);
242        if let Some(evaluator_value) = get_evaluator_value_from_shared(hash) {
243            return Some(EvaluatorValue::from_static(hash, evaluator_value));
244        }
245
246        None
247    }
248
249    pub fn try_get_preloaded_returnable(bytes: &[u8]) -> Option<DynamicReturnable> {
250        match bytes {
251            b"true" => return Some(DynamicReturnable::from_bool(true)),
252            b"false" => return Some(DynamicReturnable::from_bool(false)),
253            b"null" => return Some(DynamicReturnable::empty()),
254            _ => {}
255        }
256
257        let hash = hashing::hash_one(bytes);
258
259        if let Some(returnable) = get_returnable_from_mmap(hash) {
260            return Some(DynamicReturnable::from_archived(hash, returnable));
261        }
262
263        if let Some(returnable) = get_returnable_from_shared(hash) {
264            return Some(DynamicReturnable::from_static(hash, returnable));
265        }
266
267        None
268    }
269
270    pub fn try_get_preloaded_dynamic_config(name: &InternedString) -> Option<SpecPointer> {
271        match IMMORTAL_DATA.get() {
272            Some(shared) => shared
273                .dynamic_configs
274                .get(&name.hash)
275                .map(|s| SpecPointer::Static(s)),
276            None => None,
277        }
278    }
279
280    pub fn try_get_preloaded_layer_config(name: &InternedString) -> Option<SpecPointer> {
281        match IMMORTAL_DATA.get() {
282            Some(shared) => shared
283                .layer_configs
284                .get(&name.hash)
285                .map(|s| SpecPointer::Static(s)),
286            None => None,
287        }
288    }
289
290    pub fn try_get_preloaded_feature_gate(name: &InternedString) -> Option<SpecPointer> {
291        match IMMORTAL_DATA.get() {
292            Some(shared) => shared
293                .feature_gates
294                .get(&name.hash)
295                .map(|s| SpecPointer::Static(s)),
296            None => None,
297        }
298    }
299
300    pub fn release_returnable(hash: u64) {
301        let ptr = use_mutable_data("release_returnable", |data| {
302            try_release_entry(&mut data.returnables, hash)
303        });
304        drop(ptr);
305    }
306
307    pub fn release_string(hash: u64) {
308        let ptr = use_mutable_data("release_string", |data| {
309            try_release_entry(&mut data.strings, hash)
310        });
311        drop(ptr);
312    }
313
314    pub fn release_evaluator_value(hash: u64) {
315        let ptr = use_mutable_data("release_eval_value", |data| {
316            try_release_entry(&mut data.evaluator_values, hash)
317        });
318        drop(ptr);
319    }
320
321    #[cfg(test)]
322    pub fn get_memoized_len() -> (
323        /* strings */ usize,
324        /* returnables */ usize,
325        /* evaluator values */ usize,
326    ) {
327        match MUTABLE_DATA.try_lock() {
328            Some(memo) => (
329                memo.strings.len(),
330                memo.returnables.len(),
331                memo.evaluator_values.len(),
332            ),
333            None => (0, 0, 0),
334        }
335    }
336}
337
338// ------------------------------------------------------------------------------- [ Preloading ]
339
340fn try_parse_as_json(data: &[u8]) -> Result<SpecsResponseFull, StatsigErr> {
341    serde_json::from_slice(data)
342        .map_err(|e| StatsigErr::JsonParseError(TAG.to_string(), e.to_string()))
343}
344
345fn try_parse_as_proto(data: &[u8]) -> Result<SpecsResponseFull, StatsigErr> {
346    let current = SpecsResponseFull::default();
347    let mut next = SpecsResponseFull::default();
348
349    let mut response_data = ResponseData::from_bytes_with_headers(
350        data.to_vec(),
351        Some(std::collections::HashMap::from([(
352            "content-encoding".to_string(),
353            "statsig-br".to_string(),
354        )])),
355    );
356
357    let ops_stats = OpsStatsForInstance::new();
358
359    deserialize_protobuf(&ops_stats, &current, &mut next, &mut response_data)?;
360
361    Ok(next)
362}
363
364// ------------------------------------------------------------------------------- [ String ]
365
366fn get_string_from_mmap(hash: u64) -> Option<&'static str> {
367    let data = MMAP_DATA.get()?;
368    let archived_hash = rkyv::primitive::ArchivedU64::from_native(hash);
369    let found = data.borrow_archived().strings.get(&archived_hash);
370    found.map(|s| s.as_str())
371}
372
373fn get_string_from_shared(hash: u64) -> Option<&'static str> {
374    match IMMORTAL_DATA.get() {
375        Some(shared) => shared.strings.get(&hash).copied(),
376        None => None,
377    }
378}
379
380fn get_string_from_local<T: ToString>(hash: u64, value: T) -> Arc<String> {
381    let result = use_mutable_data("intern_string", |data| {
382        if let Some(string) = data.strings.get(&hash) {
383            return Some(string.clone());
384        }
385
386        let ptr = Arc::new(value.to_string());
387        data.strings.insert(hash, ptr.clone());
388        Some(ptr)
389    });
390
391    result.unwrap_or_else(|| {
392        log_w!(TAG, "Failed to get string from local");
393        Arc::new(value.to_string())
394    })
395}
396
397// ------------------------------------------------------------------------------- [ Returnable ]
398
399fn get_returnable_from_mmap(
400    hash: u64,
401) -> Option<&'static ArchivedHashMap<ArchivedString, ArchivedRkyvValue>> {
402    let data = MMAP_DATA.get()?;
403
404    let archived_hash = rkyv::primitive::ArchivedU64::from_native(hash);
405    let found = data.borrow_archived().returnables.get(&archived_hash)?;
406    Some(found)
407}
408
409fn get_returnable_from_shared(hash: u64) -> Option<&'static HashMap<String, RkyvValue>> {
410    match IMMORTAL_DATA.get() {
411        Some(shared) => shared.returnables.get(&hash).copied(),
412        None => None,
413    }
414}
415
416fn get_returnable_from_local(hash: u64, value: Cow<RawValue>) -> Arc<HashMap<String, RkyvValue>> {
417    let result = use_mutable_data("intern_returnable", |data| {
418        if let Some(returnable) = data.returnables.get(&hash) {
419            return Some(returnable.clone());
420        }
421
422        None
423    });
424
425    if let Some(returnable) = result {
426        return returnable;
427    }
428
429    let owned: HashMap<String, RkyvValue> = match serde_json::from_str(value.get()) {
430        Ok(owned) => owned,
431        Err(e) => {
432            log_e!(TAG, "Failed to parse returnable from local: {}", e);
433            return Arc::new(HashMap::new());
434        }
435    };
436
437    let ptr = Arc::new(owned);
438
439    use_mutable_data("intern_returnable", |data| {
440        data.returnables.insert(hash, ptr.clone());
441        Some(())
442    });
443
444    ptr
445}
446
447// ------------------------------------------------------------------------------- [ Evaluator Value ]
448
449fn get_evaluator_value_from_shared(hash: u64) -> Option<&'static MemoizedEvaluatorValue> {
450    match IMMORTAL_DATA.get() {
451        Some(shared) => shared.evaluator_values.get(&hash).copied(),
452        None => None,
453    }
454}
455
456fn get_evaluator_value_from_local(
457    hash: u64,
458    value: Cow<'_, RawValue>,
459) -> Arc<MemoizedEvaluatorValue> {
460    let result = use_mutable_data("eval_value_lookup", |data| {
461        if let Some(evaluator_value) = data.evaluator_values.get(&hash) {
462            return Some(evaluator_value.clone());
463        }
464
465        None
466    });
467
468    if let Some(evaluator_value) = result {
469        return evaluator_value;
470    }
471
472    // intentinonally done across two locks to avoid deadlock with InternedString creation
473    let ptr = Arc::new(MemoizedEvaluatorValue::from_raw_value(value));
474    let _ = use_mutable_data("intern_evaluator_value", |data| {
475        data.evaluator_values.insert(hash, ptr.clone());
476        Some(())
477    });
478
479    ptr
480}
481
482// ------------------------------------------------------------------------------- [ Helpers ]
483
484fn try_release_entry<T>(data: &mut AHashMap<u64, Arc<T>>, hash: u64) -> Option<Arc<T>> {
485    let found = match data.entry(hash) {
486        Entry::Occupied(entry) => entry,
487        Entry::Vacant(_) => return None,
488    };
489
490    let strong_count = Arc::strong_count(found.get());
491    if strong_count == 1 {
492        let value = found.remove();
493        // return the value so it isn't dropped while holding the lock
494        return Some(value);
495    }
496
497    None
498}
499
500fn use_mutable_data<T>(reason: &str, f: impl FnOnce(&mut MutableData) -> Option<T>) -> Option<T> {
501    let mut data = match MUTABLE_DATA.try_lock_for(Duration::from_secs(5)) {
502        Some(data) => data,
503        None => {
504            #[cfg(test)]
505            panic!("Failed to acquire lock for mutable data ({reason})");
506
507            #[cfg(not(test))]
508            {
509                log_e!(TAG, "Failed to acquire lock for mutable data ({reason})");
510                return None;
511            }
512        }
513    };
514
515    f(&mut data)
516}
517
518fn mutable_to_immortal(
519    specs_responses: Vec<SpecsResponseFull>,
520) -> Result<ImmortalData, StatsigErr> {
521    let mutable_data: MutableData = {
522        let mut mutable_data_lock = MUTABLE_DATA.lock();
523        std::mem::take(&mut *mutable_data_lock)
524    };
525    let mut immortal = ImmortalData::default();
526
527    for (hash, arc) in mutable_data.strings.into_iter() {
528        let raw = Arc::into_raw(arc);
529        let leaked: &'static str = unsafe { &*raw };
530        immortal.strings.insert(hash, leaked);
531    }
532
533    for (hash, returnable) in mutable_data.returnables.into_iter() {
534        let raw_returnable = Arc::into_raw(returnable);
535        let leaked = unsafe { &*raw_returnable };
536        immortal.returnables.insert(hash, leaked);
537    }
538
539    for (hash, evaluator_value) in mutable_data.evaluator_values.into_iter() {
540        let raw_evaluator_value = Arc::into_raw(evaluator_value);
541        let leaked = unsafe { &*raw_evaluator_value };
542        immortal.evaluator_values.insert(hash, leaked);
543    }
544
545    for response in specs_responses {
546        try_insert_specs(response.feature_gates, &mut immortal.feature_gates);
547        try_insert_specs(response.dynamic_configs, &mut immortal.dynamic_configs);
548        try_insert_specs(response.layer_configs, &mut immortal.layer_configs);
549    }
550
551    Ok(immortal)
552}
553
554fn mutable_to_mmap_data(specs_responses: Vec<SpecsResponseFull>) -> Result<MmapData, StatsigErr> {
555    let mutable_data: MutableData = {
556        let mut mutable_data_lock = MUTABLE_DATA.lock();
557        std::mem::take(&mut *mutable_data_lock)
558    };
559    let mut mmap_data = MmapData::default();
560
561    for (hash, arc) in mutable_data.strings.into_iter() {
562        let taken = arc.to_string();
563        mmap_data.strings.insert(hash, taken);
564    }
565
566    for (hash, returnable) in mutable_data.returnables.into_iter() {
567        let taken: HashMap<String, RkyvValue> = returnable.as_ref().clone();
568        mmap_data.returnables.insert(hash, taken);
569    }
570
571    // TODO: Add evaluator values to mmap data
572    // for (hash, evaluator_value) in mutable_data.evaluator_values.into_iter() {
573    //     let raw_evaluator_value = Arc::into_raw(evaluator_value);
574    //     let leaked = unsafe { &*raw_evaluator_value };
575    //     mmap_data.evaluator_values.insert(hash, leaked);
576    // }
577
578    // held until after the mmap data is written to the file
579    for response in specs_responses {
580        drop(response);
581    }
582
583    Ok(mmap_data)
584}
585
586fn try_insert_specs(source: SpecsHashMap, destination: &mut AHashMap<u64, &'static Spec>) {
587    for (name, spec_ptr) in source.0.into_iter() {
588        let spec = match spec_ptr {
589            SpecPointer::Pointer(spec) => spec,
590            _ => continue,
591        };
592
593        if spec.checksum.is_none() {
594            // no point doint this if there is no checksum field to verify against later
595            continue;
596        }
597
598        let raw_spec = Arc::into_raw(spec);
599        let spec = unsafe { &*raw_spec };
600        destination.insert(name.hash, spec);
601    }
602}
603
604// ------------------------------------------------------------------------------- [ Helper Implementations ]
605
606impl EvaluatorValue {
607    fn from_static(hash: u64, evaluator_value: &'static MemoizedEvaluatorValue) -> Self {
608        Self {
609            hash,
610            inner: EvaluatorValueInner::Static(evaluator_value),
611        }
612    }
613
614    fn from_pointer(hash: u64, pointer: Arc<MemoizedEvaluatorValue>) -> Self {
615        Self {
616            hash,
617            inner: EvaluatorValueInner::Pointer(pointer),
618        }
619    }
620}
621
622impl DynamicReturnable {
623    fn from_static(hash: u64, returnable: &'static HashMap<String, RkyvValue>) -> Self {
624        Self {
625            hash,
626            value: DynamicReturnableValue::JsonStatic(returnable),
627        }
628    }
629
630    fn from_archived(
631        hash: u64,
632        returnable: &'static ArchivedHashMap<ArchivedString, ArchivedRkyvValue>,
633    ) -> Self {
634        Self {
635            hash,
636            value: DynamicReturnableValue::JsonArchived(returnable),
637        }
638    }
639
640    fn from_pointer(hash: u64, pointer: Arc<HashMap<String, RkyvValue>>) -> Self {
641        Self {
642            hash,
643            value: DynamicReturnableValue::JsonPointer(pointer),
644        }
645    }
646}
647
648impl InternedString {
649    fn from_static(hash: u64, string: &'static str) -> Self {
650        Self {
651            hash,
652            value: InternedStringValue::Static(string),
653        }
654    }
655
656    fn from_pointer(hash: u64, pointer: Arc<String>) -> Self {
657        Self {
658            hash,
659            value: InternedStringValue::Pointer(pointer),
660        }
661    }
662}