libdd_profiling/profiles/
compressor.rs

1// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use std::io::{self, Read, Write};
5
/// A growable byte buffer with a [`Write`] interface whose stored length is
/// never allowed to grow past a fixed maximum.
///
/// A write that would push the length past the maximum is rejected as a whole
/// with [`io::ErrorKind::StorageFull`]. Failure to reserve memory is reported
/// as an error instead of panicking (unfortunately not compatible with the
/// `no-panic` crate, though).
///
/// The limit applies to the number of bytes stored. Memory is reserved with
/// [`Vec::try_reserve`], which is allowed to reserve more than requested, so
/// the underlying allocation may be larger than the limit.
pub struct SizeRestrictedBuffer {
    /// Bytes accepted so far.
    vec: Vec<u8>,
    /// Upper bound, in bytes, on `vec.len()`.
    max_capacity: usize,
}

impl SizeRestrictedBuffer {
    /// Returns a buffer which can never hold any data.
    pub const fn zero_capacity() -> Self {
        Self {
            vec: Vec::new(),
            max_capacity: 0,
        }
    }

    /// Tries to create an initial buffer with the provided size hint as well
    /// as the provided max capacity. Neither number is required to be a power
    /// of 2.
    ///
    /// # Errors
    ///
    /// - Fails with [`io::ErrorKind::InvalidInput`] if the `size_hint` is
    ///   larger than the `max_capacity`.
    /// - Fails if memory cannot be reserved.
    pub fn try_new(size_hint: usize, max_capacity: usize) -> io::Result<Self> {
        if size_hint > max_capacity {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "size hint shouldn't be larger than max capacity",
            ));
        }
        let mut vec = Vec::new();
        vec.try_reserve(size_hint)?;
        Ok(Self { vec, max_capacity })
    }

    /// Borrows the bytes written so far.
    pub fn as_slice(&self) -> &[u8] {
        &self.vec
    }
}

impl From<SizeRestrictedBuffer> for Vec<u8> {
    /// Unwraps the buffer into the bytes it holds.
    fn from(buf: SizeRestrictedBuffer) -> Self {
        buf.vec
    }
}

impl AsRef<[u8]> for SizeRestrictedBuffer {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl Write for SizeRestrictedBuffer {
    /// Appends all of `buf`, or nothing.
    ///
    /// # Errors
    ///
    /// - [`io::ErrorKind::StorageFull`] if `buf` does not fit in the room
    ///   left below the maximum; the buffer is left unchanged.
    /// - Fails if memory cannot be reserved.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Saturate rather than wrap: should the length ever exceed the limit,
        // the remaining room must read as zero, not as a huge wrapped value
        // that would disable the limit.
        let remaining = self.max_capacity.saturating_sub(self.vec.len());
        if buf.len() > remaining {
            return Err(io::Error::new(
                io::ErrorKind::StorageFull,
                "no storage space: size-restricted buffer reached its maximum size",
            ));
        }

        // Reserve fallibly first so the copy below cannot trigger an
        // allocation that aborts on out-of-memory.
        self.vec.try_reserve(buf.len())?;
        self.vec.extend_from_slice(buf);
        Ok(buf.len())
    }

    /// Nothing is buffered outside the vector, so there is nothing to flush.
    #[inline]
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
82
/// Describes how profile bytes are encoded: how to build the sink that the
/// bytes are written into, and how to turn that sink into the final output.
pub trait ProfileCodec {
    /// Sink that receives the raw bytes.
    type Encoder: Write;

    /// Builds a fresh encoder.
    ///
    /// - `size_hint`: suggested starting size of the output buffer.
    /// - `max_capacity`: upper limit for the output buffer.
    /// - `compression_level`: codec-specific level; implementations are free
    ///   to ignore it.
    fn new_encoder(
        size_hint: usize,
        max_capacity: usize,
        compression_level: i32,
    ) -> io::Result<Self::Encoder>;

    /// Consumes the encoder and returns the encoded bytes.
    fn finish(encoder: Self::Encoder) -> io::Result<Vec<u8>>;
}
94
95#[allow(unused)]
96pub struct NoopProfileCodec;
97
98impl ProfileCodec for NoopProfileCodec {
99    type Encoder = SizeRestrictedBuffer;
100
101    fn new_encoder(
102        size_hint: usize,
103        max_capacity: usize,
104        _compression_level: i32,
105    ) -> io::Result<Self::Encoder> {
106        SizeRestrictedBuffer::try_new(size_hint, max_capacity)
107    }
108
109    fn finish(encoder: Self::Encoder) -> io::Result<Vec<u8>> {
110        Ok(encoder.into())
111    }
112}
113
114#[allow(unused)]
115pub struct ZstdProfileCodec;
116
117impl ProfileCodec for ZstdProfileCodec {
118    type Encoder = zstd::Encoder<'static, SizeRestrictedBuffer>;
119
120    fn new_encoder(
121        size_hint: usize,
122        max_capacity: usize,
123        compression_level: i32,
124    ) -> io::Result<Self::Encoder> {
125        let buffer = SizeRestrictedBuffer::try_new(size_hint, max_capacity)?;
126        zstd::Encoder::<'static, SizeRestrictedBuffer>::new(buffer, compression_level)
127    }
128
129    fn finish(encoder: Self::Encoder) -> io::Result<Vec<u8>> {
130        match encoder.try_finish() {
131            Ok(buffer) => Ok(buffer.into()),
132            Err((_enc, error)) => Err(error),
133        }
134    }
135}
136
137#[cfg(not(miri))]
138pub type DefaultProfileCodec = ZstdProfileCodec;
139#[cfg(miri)]
140pub type DefaultProfileCodec = NoopProfileCodec;
141
/// Describes how observation bytes are encoded and later read back: how to
/// build the sink they are written into, and how to turn that sink into a
/// reader.
pub trait ObservationCodec {
    /// Sink that receives the raw bytes.
    type Encoder: Write;
    /// Reader produced from a finished encoder.
    type Decoder: Read;

    /// Builds a fresh encoder; `size_hint` is the suggested starting size of
    /// the output buffer and `max_capacity` its upper limit.
    fn new_encoder(size_hint: usize, max_capacity: usize) -> io::Result<Self::Encoder>;

    /// Consumes the encoder and returns a decoder over what was written.
    fn encoder_into_decoder(encoder: Self::Encoder) -> io::Result<Self::Decoder>;
}
149
150#[allow(unused)]
151pub struct NoopObservationCodec;
152
153impl ObservationCodec for NoopObservationCodec {
154    type Encoder = SizeRestrictedBuffer;
155    type Decoder = io::Cursor<SizeRestrictedBuffer>;
156
157    fn new_encoder(size_hint: usize, max_capacity: usize) -> io::Result<Self::Encoder> {
158        SizeRestrictedBuffer::try_new(size_hint, max_capacity)
159    }
160
161    fn encoder_into_decoder(encoder: Self::Encoder) -> io::Result<Self::Decoder> {
162        Ok(io::Cursor::new(encoder))
163    }
164}
165
166#[allow(unused)]
167pub struct ZstdObservationCodec;
168
169impl ObservationCodec for ZstdObservationCodec {
170    type Encoder = zstd::Encoder<'static, SizeRestrictedBuffer>;
171    type Decoder = zstd::Decoder<'static, io::Cursor<SizeRestrictedBuffer>>;
172
173    fn new_encoder(size_hint: usize, max_capacity: usize) -> io::Result<Self::Encoder> {
174        let buffer = SizeRestrictedBuffer::try_new(size_hint, max_capacity)?;
175        zstd::Encoder::new(buffer, 1)
176    }
177
178    fn encoder_into_decoder(encoder: Self::Encoder) -> io::Result<Self::Decoder> {
179        match encoder.try_finish() {
180            Ok(buffer) => zstd::Decoder::with_buffer(io::Cursor::new(buffer)),
181            Err((_enc, error)) => Err(error),
182        }
183    }
184}
185
186#[cfg(not(miri))]
187pub type DefaultObservationCodec = ZstdObservationCodec;
188#[cfg(miri)]
189pub type DefaultObservationCodec = NoopObservationCodec;
190
191/// Used to compress profile data.
192pub struct Compressor<C: ProfileCodec = DefaultProfileCodec> {
193    encoder: C::Encoder,
194}
195
196impl<C: ProfileCodec> Compressor<C> {
197    /// Creates a new compressor with the provided configuration.
198    ///
199    /// - `size_hint`: beginning capacity for the output buffer. This is a hint for the starting
200    ///   size, and the implementation may use something different.
201    /// - `max_capacity`: the maximum size for the output buffer (hard limit).
202    /// - `compression_level`: see [`zstd::Encoder::new`] for the valid range.
203    pub fn try_new(
204        size_hint: usize,
205        max_capacity: usize,
206        compression_level: i32,
207    ) -> io::Result<Compressor<C>> {
208        Ok(Compressor {
209            encoder: C::new_encoder(size_hint, max_capacity, compression_level)?,
210        })
211    }
212
213    /// Finish the compression, and return the compressed data.
214    pub fn finish(self) -> io::Result<Vec<u8>> {
215        C::finish(self.encoder)
216    }
217}
218
219impl<C: ProfileCodec> Write for Compressor<C> {
220    #[inline]
221    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
222        self.encoder.write(buf)
223    }
224
225    #[inline]
226    fn flush(&mut self) -> io::Result<()> {
227        self.encoder.flush()
228    }
229}