1use std::{
2 borrow::Cow,
3 collections::{hash_map::Entry, HashMap},
4 fs::{File, OpenOptions},
5 io::Write,
6 sync::{Arc, OnceLock},
7 time::{Duration, Instant},
8};
9
10use ahash::AHashMap;
11use lazy_static::lazy_static;
12use memmap2::Mmap;
13use ouroboros::self_referencing;
14use parking_lot::Mutex;
15use rkyv::{
16 collections::swiss_table::ArchivedHashMap, string::ArchivedString, Archive,
17 Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
18};
19use serde_json::value::RawValue;
20
21use crate::{
22 evaluation::{
23 dynamic_returnable::DynamicReturnableValue,
24 evaluator_value::{EvaluatorValue, EvaluatorValueInner, MemoizedEvaluatorValue},
25 rkyv_value::{ArchivedRkyvValue, RkyvValue},
26 },
27 hashing,
28 interned_string::{InternedString, InternedStringValue},
29 log_d, log_e, log_w,
30 networking::ResponseData,
31 observability::ops_stats::OpsStatsForInstance,
32 specs_response::{
33 proto_specs::deserialize_protobuf,
34 spec_types::{Spec, SpecsResponseFull},
35 specs_hash_map::{SpecPointer, SpecsHashMap},
36 },
37 DynamicReturnable, StatsigErr,
38};
39
/// Tag passed to the `log_d!` / `log_w!` / `log_e!` macros throughout this file.
const TAG: &str = "InternedStore";

/// Leaked, read-only data installed at most once by `InternedStore::preload_multi`.
static IMMORTAL_DATA: OnceLock<ImmortalData> = OnceLock::new();
/// Memory-mapped archive installed at most once by `InternedStore::preload_mmap`.
static MMAP_DATA: OnceLock<LoadedMmapData> = OnceLock::new();

lazy_static! {
    // Reference-counted entries interned at runtime. Normal access goes through
    // `use_mutable_data`, which bounds the wait for this lock.
    static ref MUTABLE_DATA: Mutex<MutableData> = Mutex::new(MutableData::default());
}
48
49#[derive(Default, Archive, RkyvDeserialize, RkyvSerialize)]
50struct MmapData {
51 strings: std::collections::HashMap<u64, String>,
52 returnables: std::collections::HashMap<u64, HashMap<String, RkyvValue>>,
53}
54
/// Self-referencing bundle that keeps a memory-mapped file alive together with the
/// archived view that borrows from it.
///
/// Constructed with `LoadedMmapDataTryBuilder` in `InternedStore::preload_mmap`.
#[self_referencing]
struct LoadedMmapData {
    /// Handle of the file that was mapped.
    file: File,
    /// Read-only mapping of `file`; the backing bytes for `archived`.
    mmap: Mmap,

    /// Archived view obtained from `rkyv::access` over the bytes of `mmap`.
    #[borrows(mmap)]
    archived: &'this ArchivedMmapData,
}
63
/// Read-only lookup tables stored in `IMMORTAL_DATA`.
///
/// Every value is a `'static` reference produced by leaking an `Arc` with
/// `Arc::into_raw` (see `mutable_to_immortal` and `try_insert_specs`), so entries
/// are never freed. Keys are `u64` hashes.
#[derive(Default)]
struct ImmortalData {
    /// Interned strings by hash.
    strings: AHashMap<u64, &'static str>,
    /// Parsed JSON-object returnables by hash.
    returnables: AHashMap<u64, &'static HashMap<String, RkyvValue>>,
    /// Memoized evaluator values by hash.
    evaluator_values: AHashMap<u64, &'static MemoizedEvaluatorValue>,
    /// Feature gate specs keyed by the hash of their interned name.
    feature_gates: AHashMap<u64, &'static Spec>,
    /// Dynamic config specs keyed by the hash of their interned name.
    dynamic_configs: AHashMap<u64, &'static Spec>,
    /// Layer config specs keyed by the hash of their interned name.
    layer_configs: AHashMap<u64, &'static Spec>,
}
/// Reference-counted tables of values interned at runtime, stored in `MUTABLE_DATA`.
///
/// Entries are removed by the `InternedStore::release_*` functions once the map
/// holds the only remaining strong reference (see `try_release_entry`). Keys are
/// `u64` hashes.
#[derive(Default)]
struct MutableData {
    /// Interned strings by hash.
    strings: AHashMap<u64, Arc<String>>,
    /// Parsed JSON-object returnables by hash.
    returnables: AHashMap<u64, Arc<HashMap<String, RkyvValue>>>,
    /// Memoized evaluator values by hash.
    evaluator_values: AHashMap<u64, Arc<MemoizedEvaluatorValue>>,
}
86
/// A type that can be built by interning an input value.
///
/// NOTE(review): no implementation is visible in this part of the file; the
/// expected relationship to `InternedStore` should be confirmed at the impl sites.
pub trait Internable: Sized {
    /// The (possibly borrowed) value an instance is interned from.
    type Input<'a>;
    /// Produces the interned representation of `input`.
    fn intern(input: Self::Input<'_>) -> Self;
}
91
92pub struct InternedStore;
93
94impl InternedStore {
95 pub fn preload(data: &[u8]) -> Result<(), StatsigErr> {
96 Self::preload_multi(&[data])
97 }
98
99 pub fn preload_multi(data: &[&[u8]]) -> Result<(), StatsigErr> {
100 let start_time = Instant::now();
101
102 if IMMORTAL_DATA.get().is_some() {
103 log_e!(TAG, "Already preloaded");
104 return Err(StatsigErr::InvalidOperation(
105 "Already preloaded".to_string(),
106 ));
107 }
108
109 let specs_responses = data
110 .iter()
111 .map(|data| try_parse_as_json(data).or_else(|_| try_parse_as_proto(data)))
112 .collect::<Result<Vec<SpecsResponseFull>, StatsigErr>>()?;
113
114 let immortal = mutable_to_immortal(specs_responses)?;
115
116 if IMMORTAL_DATA.set(immortal).is_err() {
117 return Err(StatsigErr::LockFailure(
118 "Failed to set IMMORTAL_DATA".to_string(),
119 ));
120 }
121
122 let end_time = Instant::now();
123 log_d!(
124 TAG,
125 "Preload took {}ms",
126 end_time.duration_since(start_time).as_millis()
127 );
128
129 Ok(())
130 }
131
132 pub fn write_mmap_data(data: &[&[u8]], path: &str) -> Result<(), StatsigErr> {
133 let mut file = OpenOptions::new()
134 .read(true)
135 .write(true)
136 .create(true)
137 .truncate(true)
138 .open(path)
139 .map_err(|e| StatsigErr::FileError(e.to_string()))?;
140
141 let specs_responses = data
142 .iter()
143 .map(|data| try_parse_as_json(data).or_else(|_| try_parse_as_proto(data)))
144 .collect::<Result<Vec<SpecsResponseFull>, StatsigErr>>()?;
145
146 let mmap_data = mutable_to_mmap_data(specs_responses)?;
147 let archived = rkyv::to_bytes::<rkyv::rancor::Error>(&mmap_data)
148 .map_err(|e| StatsigErr::SerializationError(e.to_string()))?;
149
150 file.write_all(&archived)
151 .map_err(|e| StatsigErr::FileError(e.to_string()))?;
152 file.sync_all()
153 .map_err(|e| StatsigErr::FileError(e.to_string()))?;
154
155 log_d!(TAG, "Wrote {} bytes to mmap file", archived.len());
156
157 Ok(())
158 }
159
160 pub fn preload_mmap(path: &str) -> Result<(), StatsigErr> {
161 let file = File::open(path).map_err(|e| StatsigErr::FileError(e.to_string()))?;
162 let mmap = unsafe { Mmap::map(&file).map_err(|e| StatsigErr::FileError(e.to_string()))? };
163
164 let loaded_result = LoadedMmapDataTryBuilder {
165 file,
166 mmap,
167 archived_builder: |mmap| rkyv::access::<ArchivedMmapData, rkyv::rancor::Error>(mmap),
168 }
169 .try_build();
170
171 let loaded = match loaded_result {
172 Ok(loaded) => loaded,
173 Err(e) => {
174 return Err(StatsigErr::SerializationError(e.to_string()));
175 }
176 };
177
178 MMAP_DATA
179 .set(loaded)
180 .map_err(|_| StatsigErr::LockFailure("Failed to set MMAP_DATA".to_string()))
181 }
182
183 pub fn get_or_intern_string<T: AsRef<str> + ToString>(value: T) -> InternedString {
184 let hash = hashing::hash_one(value.as_ref().as_bytes());
185
186 if let Some(string) = get_string_from_mmap(hash) {
187 return InternedString::from_static(hash, string);
188 }
189
190 if let Some(string) = get_string_from_shared(hash) {
191 return InternedString::from_static(hash, string);
192 }
193
194 let ptr = get_string_from_local(hash, value);
195 InternedString::from_pointer(hash, ptr)
196 }
197
198 pub fn get_or_intern_returnable(value: Cow<'_, RawValue>) -> DynamicReturnable {
199 let raw_string = value.get();
200 match raw_string {
201 "true" => return DynamicReturnable::from_bool(true),
202 "false" => return DynamicReturnable::from_bool(false),
203 "null" => return DynamicReturnable::empty(),
204 _ => {}
205 }
206
207 let hash = hashing::hash_one(raw_string.as_bytes());
208
209 if let Some(returnable) = get_returnable_from_mmap(hash) {
210 return DynamicReturnable::from_archived(hash, returnable);
211 }
212
213 if let Some(returnable) = get_returnable_from_shared(hash) {
214 return DynamicReturnable::from_static(hash, returnable);
215 }
216
217 let ptr = get_returnable_from_local(hash, value);
218 DynamicReturnable::from_pointer(hash, ptr)
219 }
220
221 pub fn get_or_intern_evaluator_value(value: Cow<'_, RawValue>) -> EvaluatorValue {
222 let raw_string = value.get();
223 let hash = hashing::hash_one(raw_string.as_bytes());
224
225 if let Some(evaluator_value) = get_evaluator_value_from_shared(hash) {
226 return EvaluatorValue::from_static(hash, evaluator_value);
227 }
228
229 let ptr = get_evaluator_value_from_local(hash, value);
230 EvaluatorValue::from_pointer(hash, ptr)
231 }
232
233 pub fn replace_evaluator_value(hash: u64, evaluator_value: Arc<MemoizedEvaluatorValue>) {
234 let old = use_mutable_data("replace_evaluator_value", |data| {
235 data.evaluator_values.insert(hash, evaluator_value)
236 });
237 drop(old);
238 }
239
240 pub fn try_get_preloaded_evaluator_value(bytes: &[u8]) -> Option<EvaluatorValue> {
241 let hash = hashing::hash_one(bytes);
242 if let Some(evaluator_value) = get_evaluator_value_from_shared(hash) {
243 return Some(EvaluatorValue::from_static(hash, evaluator_value));
244 }
245
246 None
247 }
248
249 pub fn try_get_preloaded_returnable(bytes: &[u8]) -> Option<DynamicReturnable> {
250 match bytes {
251 b"true" => return Some(DynamicReturnable::from_bool(true)),
252 b"false" => return Some(DynamicReturnable::from_bool(false)),
253 b"null" => return Some(DynamicReturnable::empty()),
254 _ => {}
255 }
256
257 let hash = hashing::hash_one(bytes);
258
259 if let Some(returnable) = get_returnable_from_mmap(hash) {
260 return Some(DynamicReturnable::from_archived(hash, returnable));
261 }
262
263 if let Some(returnable) = get_returnable_from_shared(hash) {
264 return Some(DynamicReturnable::from_static(hash, returnable));
265 }
266
267 None
268 }
269
270 pub fn try_get_preloaded_dynamic_config(name: &InternedString) -> Option<SpecPointer> {
271 match IMMORTAL_DATA.get() {
272 Some(shared) => shared
273 .dynamic_configs
274 .get(&name.hash)
275 .map(|s| SpecPointer::Static(s)),
276 None => None,
277 }
278 }
279
280 pub fn try_get_preloaded_layer_config(name: &InternedString) -> Option<SpecPointer> {
281 match IMMORTAL_DATA.get() {
282 Some(shared) => shared
283 .layer_configs
284 .get(&name.hash)
285 .map(|s| SpecPointer::Static(s)),
286 None => None,
287 }
288 }
289
290 pub fn try_get_preloaded_feature_gate(name: &InternedString) -> Option<SpecPointer> {
291 match IMMORTAL_DATA.get() {
292 Some(shared) => shared
293 .feature_gates
294 .get(&name.hash)
295 .map(|s| SpecPointer::Static(s)),
296 None => None,
297 }
298 }
299
300 pub fn release_returnable(hash: u64) {
301 let ptr = use_mutable_data("release_returnable", |data| {
302 try_release_entry(&mut data.returnables, hash)
303 });
304 drop(ptr);
305 }
306
307 pub fn release_string(hash: u64) {
308 let ptr = use_mutable_data("release_string", |data| {
309 try_release_entry(&mut data.strings, hash)
310 });
311 drop(ptr);
312 }
313
314 pub fn release_evaluator_value(hash: u64) {
315 let ptr = use_mutable_data("release_eval_value", |data| {
316 try_release_entry(&mut data.evaluator_values, hash)
317 });
318 drop(ptr);
319 }
320
321 #[cfg(test)]
322 pub fn get_memoized_len() -> (
323 usize,
324 usize,
325 usize,
326 ) {
327 match MUTABLE_DATA.try_lock() {
328 Some(memo) => (
329 memo.strings.len(),
330 memo.returnables.len(),
331 memo.evaluator_values.len(),
332 ),
333 None => (0, 0, 0),
334 }
335 }
336}
337
338fn try_parse_as_json(data: &[u8]) -> Result<SpecsResponseFull, StatsigErr> {
341 serde_json::from_slice(data)
342 .map_err(|e| StatsigErr::JsonParseError(TAG.to_string(), e.to_string()))
343}
344
345fn try_parse_as_proto(data: &[u8]) -> Result<SpecsResponseFull, StatsigErr> {
346 let current = SpecsResponseFull::default();
347 let mut next = SpecsResponseFull::default();
348
349 let mut response_data = ResponseData::from_bytes_with_headers(
350 data.to_vec(),
351 Some(std::collections::HashMap::from([(
352 "content-encoding".to_string(),
353 "statsig-br".to_string(),
354 )])),
355 );
356
357 let ops_stats = OpsStatsForInstance::new();
358
359 deserialize_protobuf(&ops_stats, ¤t, &mut next, &mut response_data)?;
360
361 Ok(next)
362}
363
364fn get_string_from_mmap(hash: u64) -> Option<&'static str> {
367 let data = MMAP_DATA.get()?;
368 let archived_hash = rkyv::primitive::ArchivedU64::from_native(hash);
369 let found = data.borrow_archived().strings.get(&archived_hash);
370 found.map(|s| s.as_str())
371}
372
373fn get_string_from_shared(hash: u64) -> Option<&'static str> {
374 match IMMORTAL_DATA.get() {
375 Some(shared) => shared.strings.get(&hash).copied(),
376 None => None,
377 }
378}
379
380fn get_string_from_local<T: ToString>(hash: u64, value: T) -> Arc<String> {
381 let result = use_mutable_data("intern_string", |data| {
382 if let Some(string) = data.strings.get(&hash) {
383 return Some(string.clone());
384 }
385
386 let ptr = Arc::new(value.to_string());
387 data.strings.insert(hash, ptr.clone());
388 Some(ptr)
389 });
390
391 result.unwrap_or_else(|| {
392 log_w!(TAG, "Failed to get string from local");
393 Arc::new(value.to_string())
394 })
395}
396
397fn get_returnable_from_mmap(
400 hash: u64,
401) -> Option<&'static ArchivedHashMap<ArchivedString, ArchivedRkyvValue>> {
402 let data = MMAP_DATA.get()?;
403
404 let archived_hash = rkyv::primitive::ArchivedU64::from_native(hash);
405 let found = data.borrow_archived().returnables.get(&archived_hash)?;
406 Some(found)
407}
408
409fn get_returnable_from_shared(hash: u64) -> Option<&'static HashMap<String, RkyvValue>> {
410 match IMMORTAL_DATA.get() {
411 Some(shared) => shared.returnables.get(&hash).copied(),
412 None => None,
413 }
414}
415
416fn get_returnable_from_local(hash: u64, value: Cow<RawValue>) -> Arc<HashMap<String, RkyvValue>> {
417 let result = use_mutable_data("intern_returnable", |data| {
418 if let Some(returnable) = data.returnables.get(&hash) {
419 return Some(returnable.clone());
420 }
421
422 None
423 });
424
425 if let Some(returnable) = result {
426 return returnable;
427 }
428
429 let owned: HashMap<String, RkyvValue> = match serde_json::from_str(value.get()) {
430 Ok(owned) => owned,
431 Err(e) => {
432 log_e!(TAG, "Failed to parse returnable from local: {}", e);
433 return Arc::new(HashMap::new());
434 }
435 };
436
437 let ptr = Arc::new(owned);
438
439 use_mutable_data("intern_returnable", |data| {
440 data.returnables.insert(hash, ptr.clone());
441 Some(())
442 });
443
444 ptr
445}
446
447fn get_evaluator_value_from_shared(hash: u64) -> Option<&'static MemoizedEvaluatorValue> {
450 match IMMORTAL_DATA.get() {
451 Some(shared) => shared.evaluator_values.get(&hash).copied(),
452 None => None,
453 }
454}
455
456fn get_evaluator_value_from_local(
457 hash: u64,
458 value: Cow<'_, RawValue>,
459) -> Arc<MemoizedEvaluatorValue> {
460 let result = use_mutable_data("eval_value_lookup", |data| {
461 if let Some(evaluator_value) = data.evaluator_values.get(&hash) {
462 return Some(evaluator_value.clone());
463 }
464
465 None
466 });
467
468 if let Some(evaluator_value) = result {
469 return evaluator_value;
470 }
471
472 let ptr = Arc::new(MemoizedEvaluatorValue::from_raw_value(value));
474 let _ = use_mutable_data("intern_evaluator_value", |data| {
475 data.evaluator_values.insert(hash, ptr.clone());
476 Some(())
477 });
478
479 ptr
480}
481
482fn try_release_entry<T>(data: &mut AHashMap<u64, Arc<T>>, hash: u64) -> Option<Arc<T>> {
485 let found = match data.entry(hash) {
486 Entry::Occupied(entry) => entry,
487 Entry::Vacant(_) => return None,
488 };
489
490 let strong_count = Arc::strong_count(found.get());
491 if strong_count == 1 {
492 let value = found.remove();
493 return Some(value);
495 }
496
497 None
498}
499
500fn use_mutable_data<T>(reason: &str, f: impl FnOnce(&mut MutableData) -> Option<T>) -> Option<T> {
501 let mut data = match MUTABLE_DATA.try_lock_for(Duration::from_secs(5)) {
502 Some(data) => data,
503 None => {
504 #[cfg(test)]
505 panic!("Failed to acquire lock for mutable data ({reason})");
506
507 #[cfg(not(test))]
508 {
509 log_e!(TAG, "Failed to acquire lock for mutable data ({reason})");
510 return None;
511 }
512 }
513 };
514
515 f(&mut data)
516}
517
518fn mutable_to_immortal(
519 specs_responses: Vec<SpecsResponseFull>,
520) -> Result<ImmortalData, StatsigErr> {
521 let mutable_data: MutableData = {
522 let mut mutable_data_lock = MUTABLE_DATA.lock();
523 std::mem::take(&mut *mutable_data_lock)
524 };
525 let mut immortal = ImmortalData::default();
526
527 for (hash, arc) in mutable_data.strings.into_iter() {
528 let raw = Arc::into_raw(arc);
529 let leaked: &'static str = unsafe { &*raw };
530 immortal.strings.insert(hash, leaked);
531 }
532
533 for (hash, returnable) in mutable_data.returnables.into_iter() {
534 let raw_returnable = Arc::into_raw(returnable);
535 let leaked = unsafe { &*raw_returnable };
536 immortal.returnables.insert(hash, leaked);
537 }
538
539 for (hash, evaluator_value) in mutable_data.evaluator_values.into_iter() {
540 let raw_evaluator_value = Arc::into_raw(evaluator_value);
541 let leaked = unsafe { &*raw_evaluator_value };
542 immortal.evaluator_values.insert(hash, leaked);
543 }
544
545 for response in specs_responses {
546 try_insert_specs(response.feature_gates, &mut immortal.feature_gates);
547 try_insert_specs(response.dynamic_configs, &mut immortal.dynamic_configs);
548 try_insert_specs(response.layer_configs, &mut immortal.layer_configs);
549 }
550
551 Ok(immortal)
552}
553
554fn mutable_to_mmap_data(specs_responses: Vec<SpecsResponseFull>) -> Result<MmapData, StatsigErr> {
555 let mutable_data: MutableData = {
556 let mut mutable_data_lock = MUTABLE_DATA.lock();
557 std::mem::take(&mut *mutable_data_lock)
558 };
559 let mut mmap_data = MmapData::default();
560
561 for (hash, arc) in mutable_data.strings.into_iter() {
562 let taken = arc.to_string();
563 mmap_data.strings.insert(hash, taken);
564 }
565
566 for (hash, returnable) in mutable_data.returnables.into_iter() {
567 let taken: HashMap<String, RkyvValue> = returnable.as_ref().clone();
568 mmap_data.returnables.insert(hash, taken);
569 }
570
571 for response in specs_responses {
580 drop(response);
581 }
582
583 Ok(mmap_data)
584}
585
586fn try_insert_specs(source: SpecsHashMap, destination: &mut AHashMap<u64, &'static Spec>) {
587 for (name, spec_ptr) in source.0.into_iter() {
588 let spec = match spec_ptr {
589 SpecPointer::Pointer(spec) => spec,
590 _ => continue,
591 };
592
593 if spec.checksum.is_none() {
594 continue;
596 }
597
598 let raw_spec = Arc::into_raw(spec);
599 let spec = unsafe { &*raw_spec };
600 destination.insert(name.hash, spec);
601 }
602}
603
604impl EvaluatorValue {
607 fn from_static(hash: u64, evaluator_value: &'static MemoizedEvaluatorValue) -> Self {
608 Self {
609 hash,
610 inner: EvaluatorValueInner::Static(evaluator_value),
611 }
612 }
613
614 fn from_pointer(hash: u64, pointer: Arc<MemoizedEvaluatorValue>) -> Self {
615 Self {
616 hash,
617 inner: EvaluatorValueInner::Pointer(pointer),
618 }
619 }
620}
621
622impl DynamicReturnable {
623 fn from_static(hash: u64, returnable: &'static HashMap<String, RkyvValue>) -> Self {
624 Self {
625 hash,
626 value: DynamicReturnableValue::JsonStatic(returnable),
627 }
628 }
629
630 fn from_archived(
631 hash: u64,
632 returnable: &'static ArchivedHashMap<ArchivedString, ArchivedRkyvValue>,
633 ) -> Self {
634 Self {
635 hash,
636 value: DynamicReturnableValue::JsonArchived(returnable),
637 }
638 }
639
640 fn from_pointer(hash: u64, pointer: Arc<HashMap<String, RkyvValue>>) -> Self {
641 Self {
642 hash,
643 value: DynamicReturnableValue::JsonPointer(pointer),
644 }
645 }
646}
647
648impl InternedString {
649 fn from_static(hash: u64, string: &'static str) -> Self {
650 Self {
651 hash,
652 value: InternedStringValue::Static(string),
653 }
654 }
655
656 fn from_pointer(hash: u64, pointer: Arc<String>) -> Self {
657 Self {
658 hash,
659 value: InternedStringValue::Pointer(pointer),
660 }
661 }
662}