statsig_rust/compression/
zstd_decompression_dict.rs

1use std::sync::Arc;
2
3use crate::observability::ops_stats::OpsStatsForInstance;
4use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
5use serde::Serialize;
6use std::fmt::Debug;
7
8use crate::{log_error_to_statsig_and_console, StatsigErr};
9
/// Tag attached to errors logged from this module.
const TAG: &str = "DictionaryDecoder";
11
12/// Wraps zstd::dict::DecoderDictionary.
13/// No need to wrap this in Arc; its internals are already wrapped in Arc.
14#[derive(Clone)]
15pub struct DictionaryDecoder {
16    inner: Arc<DictionaryDecoderInner>,
17}
18
19impl Serialize for DictionaryDecoder {
20    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
21    where
22        S: serde::Serializer,
23    {
24        self.inner.serialize(serializer)
25    }
26}
27
28impl Debug for DictionaryDecoder {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        write!(
31            f,
32            "DictionaryDecoder {{ dict_id: {:?} }}",
33            self.inner.dict_id
34        )
35    }
36}
37
38#[derive(Serialize)]
39struct DictionaryDecoderInner {
40    #[serde(skip)]
41    ops_stats: Option<Arc<OpsStatsForInstance>>,
42    dict_id: String,
43    #[serde(skip)]
44    ddict: zstd::dict::DecoderDictionary<'static>,
45}
46
47impl DictionaryDecoder {
48    pub fn new(
49        ops_stats: Option<Arc<OpsStatsForInstance>>,
50        dict_id: String,
51        dict_buff: &[u8],
52    ) -> Self {
53        let ddict = zstd::dict::DecoderDictionary::copy(dict_buff);
54        Self {
55            inner: Arc::new(DictionaryDecoderInner {
56                ops_stats,
57                dict_id,
58                ddict,
59            }),
60        }
61    }
62
63    pub fn get_dict_id(&self) -> &str {
64        &self.inner.dict_id
65    }
66
67    pub fn decompress(&self, compressed: &[u8]) -> Result<Vec<u8>, StatsigErr> {
68        let mut decompressed = std::io::Cursor::new(Vec::new());
69        let compressed_reader = std::io::Cursor::new(compressed);
70        let mut decoder =
71            zstd::stream::Decoder::with_prepared_dictionary(compressed_reader, &self.inner.ddict)
72                .map_err(|e| {
73                let err = StatsigErr::ZstdError(format!(
74                    "Unexpected error while constructing decoder with dictionary {}: {}",
75                    self.inner.dict_id, e
76                ));
77                if let Some(ops_stats) = &self.inner.ops_stats {
78                    log_error_to_statsig_and_console!(ops_stats, TAG, err.clone());
79                }
80                err
81            })?;
82        std::io::copy(&mut decoder, &mut decompressed).map_err(|e| {
83            StatsigErr::ZstdError(format!("Unexpected error while decompressing data: {e}"))
84        })?;
85
86        Ok(decompressed.into_inner())
87    }
88}