Skip to main content

netidx_value/
abstract_type.rs

1use crate::Value;
2use ahash::AHashMap;
3use anyhow::bail;
4use bytes::{Buf, BufMut, Bytes};
5use netidx_core::{
6    pack::{len_wrapped_decode, len_wrapped_encode, len_wrapped_len, Pack, PackError},
7    utils,
8};
9use parking_lot::RwLock;
10use serde::{de, ser, Deserialize, Serialize};
11use std::{
12    any::{Any, TypeId},
13    cmp::Ordering,
14    collections::hash_map::Entry,
15    fmt::{self, Debug, Formatter},
16    hash::{Hash, Hasher},
17    marker::PhantomData,
18    result,
19    sync::{Arc, LazyLock},
20};
21use uuid::Uuid;
22
23type EncodedLenFn = Box<dyn Fn(&Abstract) -> usize + Send + Sync + 'static>;
24type EncodeFn = Box<
25    dyn Fn(&Abstract, &mut dyn BufMut) -> result::Result<(), PackError>
26        + Send
27        + Sync
28        + 'static,
29>;
30type DecodeFn = Box<
31    dyn Fn(&mut dyn Buf) -> result::Result<Box<dyn Any + Send + Sync>, PackError>
32        + Send
33        + Sync
34        + 'static,
35>;
36
37type HashFn = Box<dyn Fn(&Abstract, &mut dyn Hasher) + Send + Sync + 'static>;
38
39type EqFn = Box<dyn Fn(&Abstract, &Abstract) -> bool + Send + Sync + 'static>;
40
41type OrdFn = Box<dyn Fn(&Abstract, &Abstract) -> Ordering + Send + Sync + 'static>;
42
43type DebugFn =
44    Box<dyn Fn(&Abstract, &mut Formatter) -> fmt::Result + Send + Sync + 'static>;
45
46// this is necessary because Pack is not object safe
47struct AbstractVtable {
48    tid: TypeId,
49    encoded_len: EncodedLenFn,
50    encode: EncodeFn,
51    decode: DecodeFn,
52    debug: DebugFn,
53    hash: HashFn,
54    eq: EqFn,
55    ord: OrdFn,
56}
57
58impl AbstractVtable {
59    fn new<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync>() -> Self {
60        AbstractVtable {
61            tid: TypeId::of::<T>(),
62            encoded_len: Box::new(|t| {
63                let t = t.downcast_ref::<T>().unwrap();
64                Pack::encoded_len(t)
65            }),
66            encode: Box::new(|t, mut buf| {
67                let t = t.downcast_ref::<T>().unwrap();
68                Pack::encode(t, &mut buf)
69            }),
70            decode: Box::new(|mut buf| {
71                let t = T::decode(&mut buf)?;
72                Ok(Box::new(t))
73            }),
74            debug: Box::new(|t, f| {
75                let t = t.downcast_ref::<T>().unwrap();
76                t.fmt(f)
77            }),
78            hash: Box::new(|t, mut hasher| {
79                let t = t.downcast_ref::<T>().unwrap();
80                t.hash(&mut hasher)
81            }),
82            eq: Box::new(|t0, t1| {
83                let t0 = t0.downcast_ref::<T>();
84                let t1 = t1.downcast_ref::<T>();
85                t0 == t1
86            }),
87            ord: Box::new(|t0, t1| match t0.type_id().cmp(&t1.type_id()) {
88                Ordering::Equal => {
89                    let t0 = t0.downcast_ref::<T>().unwrap();
90                    let t1 = t1.downcast_ref::<T>().unwrap();
91                    t0.cmp(t1)
92                }
93                o => o,
94            }),
95        }
96    }
97}
98
99/// This is the type that will be decoded if we unpack an abstract
100/// type that hasn't been registered. It preserves the original
101/// payload of the type so if we end up forwarding this to another
102/// program that does know this type, it will work.
103#[derive(Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
104pub struct UnknownAbstractType(Bytes);
105
106impl Pack for UnknownAbstractType {
107    fn encoded_len(&self) -> usize {
108        self.0.len()
109    }
110
111    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
112        Ok(buf.put_slice(&self.0[..]))
113    }
114
115    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
116        // len_wrapped_encode passes us a limited view of the buffer,
117        // since we are always the last thing in it we can just take
118        // the whole thing.
119        Ok(UnknownAbstractType(buf.copy_to_bytes(buf.remaining())))
120    }
121}
122
123struct Registry {
124    by_uuid: AHashMap<Uuid, Arc<AbstractVtable>>,
125    by_tid: AHashMap<TypeId, Uuid>,
126}
127
128impl Registry {
129    fn insert<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync>(
130        &mut self,
131        id: Uuid,
132    ) -> anyhow::Result<Arc<AbstractVtable>> {
133        match self.by_uuid.entry(id) {
134            Entry::Occupied(e) => {
135                if e.get().tid != TypeId::of::<T>() {
136                    bail!("attempt to register {id:?} with different types")
137                }
138                Ok(e.get().clone())
139            }
140            Entry::Vacant(e) => {
141                match self.by_tid.entry(TypeId::of::<T>()) {
142                    Entry::Vacant(e) => e.insert(id),
143                    Entry::Occupied(_) => {
144                        bail!("T registered multiple times with different ids")
145                    }
146                };
147                let vt = Arc::new(AbstractVtable::new::<T>());
148                e.insert(vt.clone());
149                Ok(vt)
150            }
151        }
152    }
153}
154
155/// This is the ID of UnknownAbstractType
156pub static UNKNOWN_ID: Uuid = Uuid::from_bytes([
157    195, 155, 41, 43, 251, 148, 70, 166, 129, 118, 150, 177, 94, 123, 235, 23,
158]);
159
160static REGISTRY: LazyLock<RwLock<Registry>> = LazyLock::new(|| {
161    let mut reg = Registry { by_uuid: AHashMap::default(), by_tid: AHashMap::default() };
162    reg.insert::<UnknownAbstractType>(UNKNOWN_ID).unwrap();
163    RwLock::new(reg)
164});
165
166/// Wrap Ts as Abstracts
167pub struct AbstractWrapper<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync> {
168    id: Uuid,
169    vtable: Arc<AbstractVtable>,
170    t: PhantomData<T>,
171}
172
173impl<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync> AbstractWrapper<T> {
174    /// Return the UUID that T is registered as
175    pub fn id(&self) -> Uuid {
176        self.id
177    }
178
179    /// Wrap T as an Abstract Value
180    pub fn wrap(&self, t: T) -> Value {
181        Value::Abstract(Abstract(Arc::new(AbstractInner {
182            id: self.id,
183            vtable: self.vtable.clone(),
184            t: Box::new(t),
185        })))
186    }
187}
188
189struct AbstractInner {
190    id: Uuid,
191    vtable: Arc<AbstractVtable>,
192    t: Box<dyn Any + Send + Sync>,
193}
194
195/// The abstract netidx value type
196///
197/// Any type implementing Any + Pack + Send + Sync can be wrapped in an Abstract
198/// and published to netidx.
199///
200/// When a client reads an abstract value from netidx, it will look
201/// to see if it has the same type registered (by UUID). If it does,
202/// it will be able to decode the abstract type and use it. If it
203/// does not, then a special type called UnknownAbstractType will
204/// be substituted and the decode will not fail.
205///
206/// You may recover the original type by downcasting, which will fail
207/// if the type isn't the one you expected.
208///
209/// Through abstract types the Value type can be extended in whatever way is
210/// suited by the application/user site without every netidx user needing to
211/// agree on a new protocol extension. As long as all the applications using an
212/// abstract type register it with the same UUID, and the Pack implementation
213/// remains compatible, then it can be seamlessly used without interfering with
214/// non participarting applications.
215#[derive(Clone)]
216pub struct Abstract(Arc<AbstractInner>);
217
218impl Abstract {
219    /// Look up the UUID of the concrete type of this Abstract
220    pub fn id(&self) -> Uuid {
221        self.0.id
222    }
223
224    /// Downcast &self to &T. If self isn't a T then return None.
225    pub fn downcast_ref<T: Any + Send + Sync>(&self) -> Option<&T> {
226        (&*self.0.t).downcast_ref::<T>()
227    }
228
229    /// Register a new abstract type, return an object that will wrap instances
230    /// of T as an Abstract.
231    ///
232    /// Register is idempotent.
233    ///
234    /// - it is an error to register T with multiple different ids
235    /// - it is an error to register the same id with multiple Ts
236    pub fn register<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync>(
237        id: Uuid,
238    ) -> anyhow::Result<AbstractWrapper<T>> {
239        let vtable = REGISTRY.write().insert::<T>(id)?;
240        Ok(AbstractWrapper { id, vtable, t: PhantomData })
241    }
242}
243
244impl Default for Abstract {
245    fn default() -> Self {
246        let reg = REGISTRY.read();
247        Abstract(Arc::new(AbstractInner {
248            id: UNKNOWN_ID,
249            vtable: reg.by_uuid[&UNKNOWN_ID].clone(),
250            t: Box::new(UnknownAbstractType),
251        }))
252    }
253}
254
255impl fmt::Debug for Abstract {
256    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
257        write!(f, "Abstract(")?;
258        (self.0.vtable.debug)(self, f)?;
259        write!(f, ")")
260    }
261}
262
263impl Pack for Abstract {
264    fn encoded_len(&self) -> usize {
265        let id_len = Pack::encoded_len(&self.0.id);
266        let t_len = (self.0.vtable.encoded_len)(self);
267        len_wrapped_len(id_len + t_len)
268    }
269
270    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
271        len_wrapped_encode(buf, self, |buf| {
272            Pack::encode(&self.0.id, buf)?;
273            (self.0.vtable.encode)(self, buf)
274        })
275    }
276
277    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
278        len_wrapped_decode(buf, |buf| {
279            let id: Uuid = Pack::decode(buf)?;
280            let reg = REGISTRY.read();
281            match reg.by_uuid.get(&id) {
282                Some(vtable) => {
283                    let t = (vtable.decode)(buf)?;
284                    Ok(Abstract(Arc::new(AbstractInner {
285                        id,
286                        vtable: vtable.clone(),
287                        t,
288                    })))
289                }
290                None => Ok(Abstract(Arc::new(AbstractInner {
291                    id,
292                    vtable: reg.by_uuid[&UNKNOWN_ID].clone(),
293                    t: Box::new(UnknownAbstractType::decode(buf)?),
294                }))),
295            }
296        })
297    }
298}
299
300impl Serialize for Abstract {
301    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
302    where
303        S: serde::Serializer,
304    {
305        let buf = utils::pack(self).map_err(|e| ser::Error::custom(e))?;
306        buf.serialize(serializer)
307    }
308}
309
310impl<'de> Deserialize<'de> for Abstract {
311    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
312    where
313        D: serde::Deserializer<'de>,
314    {
315        let mut buf = Bytes::deserialize(deserializer)?;
316        Pack::decode(&mut buf).map_err(|e| de::Error::custom(e))
317    }
318}
319
320impl Hash for Abstract {
321    fn hash<H: Hasher>(&self, state: &mut H) {
322        (self.0.vtable.hash)(self, state)
323    }
324}
325
326impl PartialEq for Abstract {
327    fn eq(&self, other: &Self) -> bool {
328        (self.0.vtable.eq)(self, other)
329    }
330}
331
332impl Eq for Abstract {}
333
334impl PartialOrd for Abstract {
335    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
336        Some(self.cmp(other))
337    }
338}
339
340impl Ord for Abstract {
341    fn cmp(&self, other: &Self) -> Ordering {
342        (self.0.vtable.ord)(self, other)
343    }
344}