netidx_value/
pbuf.rs

1/// Why? Because Bytes is 5 words. This module exports PBytes, which
2/// reduces Bytes from 5 words to 1 words by wrapping it in a pooled
3/// arc. This allows us to reduce the size of Value from 5 words to 3
4/// words, while only paying the cost of a double indirection when
5/// accessing a Bytes.
6use bytes::{Buf, BufMut, Bytes};
7use netidx_core::pack::{decode_varint, encode_varint, varint_len, Pack, PackError};
8use poolshark::{
9    global::{arc::TArc as PArc, RawPool},
10    Poolable,
11};
12use serde::{de::Visitor, Deserialize, Serialize};
13use std::{borrow::Borrow, mem, ops::Deref, sync::LazyLock};
14
15#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
16struct PooledBytes(Bytes);
17
18impl Poolable for PooledBytes {
19    fn empty() -> Self {
20        Self(Bytes::new())
21    }
22
23    fn capacity(&self) -> usize {
24        1
25    }
26
27    fn reset(&mut self) {
28        *self = Self(Bytes::new())
29    }
30}
31
32impl Deref for PooledBytes {
33    type Target = Bytes;
34
35    fn deref(&self) -> &Self::Target {
36        &self.0
37    }
38}
39
40static POOL: LazyLock<RawPool<PArc<PooledBytes>>> =
41    LazyLock::new(|| RawPool::new(8124, 64));
42
43#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
44pub struct PBytes(PArc<PooledBytes>);
45
46impl Deref for PBytes {
47    type Target = Bytes;
48
49    fn deref(&self) -> &Self::Target {
50        &**self.0
51    }
52}
53
54impl Borrow<Bytes> for PBytes {
55    fn borrow(&self) -> &Bytes {
56        &*self
57    }
58}
59
60impl Borrow<[u8]> for PBytes {
61    fn borrow(&self) -> &[u8] {
62        &**self
63    }
64}
65
66impl AsRef<[u8]> for PBytes {
67    fn as_ref(&self) -> &[u8] {
68        &**self
69    }
70}
71
72impl PBytes {
73    pub fn new(b: Bytes) -> Self {
74        Self(PArc::new(&POOL, PooledBytes(b)))
75    }
76}
77
78impl From<Bytes> for PBytes {
79    fn from(value: Bytes) -> Self {
80        Self::new(value)
81    }
82}
83
84impl Into<Bytes> for PBytes {
85    fn into(mut self) -> Bytes {
86        match PArc::get_mut(&mut self.0) {
87            Some(b) => mem::take(b).0,
88            None => ((*self.0).0).clone(),
89        }
90    }
91}
92
93impl Serialize for PBytes {
94    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
95    where
96        S: serde::Serializer,
97    {
98        let bytes: &Bytes = &*self.0;
99        bytes.serialize(serializer)
100    }
101}
102
103struct PBytesVisitor;
104
105impl<'de> Visitor<'de> for PBytesVisitor {
106    type Value = PBytes;
107
108    fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
109        write!(f, "expecting a bytes")
110    }
111
112    fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
113    where
114        E: serde::de::Error,
115    {
116        Ok(PBytes::new(Bytes::copy_from_slice(v)))
117    }
118}
119
120impl<'de> Deserialize<'de> for PBytes {
121    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
122    where
123        D: serde::Deserializer<'de>,
124    {
125        deserializer.deserialize_bytes(PBytesVisitor)
126    }
127}
128
129impl Pack for PBytes {
130    fn encoded_len(&self) -> usize {
131        let len = self.len();
132        varint_len(len as u64) + len
133    }
134
135    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
136        encode_varint(self.len() as u64, buf);
137        Ok(buf.put_slice(&*self))
138    }
139
140    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
141        let len = decode_varint(buf)?;
142        if len as usize > buf.remaining() {
143            Err(PackError::TooBig)
144        } else {
145            Ok(Self::from(buf.copy_to_bytes(len as usize)))
146        }
147    }
148}