netidx_value/
pbuf.rs

1/// Why? Because Bytes is 5 words. This module export PBytes, which
2/// reduces Bytes from 5 words to 1 words by wrapping it in a pooled
3/// arc. This allows us to reduce the size of Value from 5 words to 3
4/// words, while only paying the cost of a double indirection when
5/// accessing a zero copy Bytes.
6use bytes::{Buf, BufMut, Bytes};
7use netidx_core::{
8    pack::{decode_varint, encode_varint, varint_len, Pack, PackError},
9    pool::{pooled::PArc, RawPool},
10};
11use serde::{de::Visitor, Deserialize, Serialize};
12use std::{borrow::Borrow, mem, ops::Deref, sync::LazyLock};
13
14static POOL: LazyLock<RawPool<PArc<Bytes>>> = LazyLock::new(|| RawPool::new(8124, 64));
15
16#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
17pub struct PBytes(PArc<Bytes>);
18
19impl Deref for PBytes {
20    type Target = Bytes;
21
22    fn deref(&self) -> &Self::Target {
23        &*self.0
24    }
25}
26
27impl Borrow<Bytes> for PBytes {
28    fn borrow(&self) -> &Bytes {
29        &*self
30    }
31}
32
33impl Borrow<[u8]> for PBytes {
34    fn borrow(&self) -> &[u8] {
35        &**self
36    }
37}
38
39impl AsRef<[u8]> for PBytes {
40    fn as_ref(&self) -> &[u8] {
41        &**self
42    }
43}
44
45impl PBytes {
46    pub fn new(b: Bytes) -> Self {
47        Self(PArc::new(&POOL, b))
48    }
49}
50
51impl From<Bytes> for PBytes {
52    fn from(value: Bytes) -> Self {
53        Self::new(value)
54    }
55}
56
57impl Into<Bytes> for PBytes {
58    fn into(mut self) -> Bytes {
59        match PArc::get_mut(&mut self.0) {
60            Some(b) => mem::take(b),
61            None => (*self.0).clone(),
62        }
63    }
64}
65
66impl Serialize for PBytes {
67    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
68    where
69        S: serde::Serializer,
70    {
71        let bytes: &Bytes = &*self.0;
72        bytes.serialize(serializer)
73    }
74}
75
76struct PBytesVisitor;
77
78impl<'de> Visitor<'de> for PBytesVisitor {
79    type Value = PBytes;
80
81    fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
82        write!(f, "expecting a bytes")
83    }
84
85    fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
86    where
87        E: serde::de::Error,
88    {
89        Ok(PBytes::new(Bytes::copy_from_slice(v)))
90    }
91}
92
93impl<'de> Deserialize<'de> for PBytes {
94    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
95    where
96        D: serde::Deserializer<'de>,
97    {
98        deserializer.deserialize_bytes(PBytesVisitor)
99    }
100}
101
102impl Pack for PBytes {
103    fn encoded_len(&self) -> usize {
104        let len = self.len();
105        varint_len(len as u64) + len
106    }
107
108    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
109        encode_varint(self.len() as u64, buf);
110        Ok(buf.put_slice(&*self))
111    }
112
113    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
114        let len = decode_varint(buf)?;
115        if len as usize > buf.remaining() {
116            Err(PackError::TooBig)
117        } else {
118            Ok(Self::from(buf.copy_to_bytes(len as usize)))
119        }
120    }
121}