netidx_value/
abstract_type.rs

1use anyhow::bail;
2use bytes::{Buf, BufMut, Bytes};
3use fxhash::FxHashMap;
4use netidx_core::{
5    pack::{len_wrapped_decode, len_wrapped_encode, len_wrapped_len, Pack, PackError},
6    utils,
7};
8use parking_lot::RwLock;
9use serde::{de, ser, Deserialize, Serialize};
10use std::{
11    any::{Any, TypeId},
12    cmp::Ordering,
13    collections::{hash_map::Entry, HashMap},
14    fmt::{self, Debug, Formatter},
15    hash::{Hash, Hasher},
16    marker::PhantomData,
17    result,
18    sync::{Arc, LazyLock},
19};
20use uuid::Uuid;
21
22use crate::Value;
23
24type EncodedLenFn = Box<dyn Fn(&Abstract) -> usize + Send + Sync + 'static>;
25type EncodeFn = Box<
26    dyn Fn(&Abstract, &mut dyn BufMut) -> result::Result<(), PackError>
27        + Send
28        + Sync
29        + 'static,
30>;
31type DecodeFn = Box<
32    dyn Fn(&mut dyn Buf) -> result::Result<Box<dyn Any + Send + Sync>, PackError>
33        + Send
34        + Sync
35        + 'static,
36>;
37
38type HashFn = Box<dyn Fn(&Abstract, &mut dyn Hasher) + Send + Sync + 'static>;
39
40type EqFn = Box<dyn Fn(&Abstract, &Abstract) -> bool + Send + Sync + 'static>;
41
42type OrdFn = Box<dyn Fn(&Abstract, &Abstract) -> Ordering + Send + Sync + 'static>;
43
44type DebugFn =
45    Box<dyn Fn(&Abstract, &mut Formatter) -> fmt::Result + Send + Sync + 'static>;
46
47// this is necessary because Pack is not object safe
48struct AbstractVtable {
49    tid: TypeId,
50    encoded_len: EncodedLenFn,
51    encode: EncodeFn,
52    decode: DecodeFn,
53    debug: DebugFn,
54    hash: HashFn,
55    eq: EqFn,
56    ord: OrdFn,
57}
58
59impl AbstractVtable {
60    fn new<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync>() -> Self {
61        AbstractVtable {
62            tid: TypeId::of::<T>(),
63            encoded_len: Box::new(|t| {
64                let t = t.downcast_ref::<T>().unwrap();
65                Pack::encoded_len(t)
66            }),
67            encode: Box::new(|t, mut buf| {
68                let t = t.downcast_ref::<T>().unwrap();
69                Pack::encode(t, &mut buf)
70            }),
71            decode: Box::new(|mut buf| {
72                let t = T::decode(&mut buf)?;
73                Ok(Box::new(t))
74            }),
75            debug: Box::new(|t, f| {
76                let t = t.downcast_ref::<T>().unwrap();
77                t.fmt(f)
78            }),
79            hash: Box::new(|t, mut hasher| {
80                let t = t.downcast_ref::<T>().unwrap();
81                t.hash(&mut hasher)
82            }),
83            eq: Box::new(|t0, t1| {
84                let t0 = t0.downcast_ref::<T>();
85                let t1 = t1.downcast_ref::<T>();
86                t0 == t1
87            }),
88            ord: Box::new(|t0, t1| match t0.type_id().cmp(&t1.type_id()) {
89                Ordering::Equal => {
90                    let t0 = t0.downcast_ref::<T>().unwrap();
91                    let t1 = t1.downcast_ref::<T>().unwrap();
92                    t0.cmp(t1)
93                }
94                o => o,
95            }),
96        }
97    }
98}
99
100/// This is the type that will be decoded if we unpack an abstract
101/// type that hasn't been registered. It preserves the original
102/// payload of the type so if we end up forwarding this to another
103/// program that does know this type, it will work.
104#[derive(Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
105pub struct UnknownAbstractType(Bytes);
106
107impl Pack for UnknownAbstractType {
108    fn encoded_len(&self) -> usize {
109        self.0.len()
110    }
111
112    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
113        Ok(buf.put_slice(&self.0[..]))
114    }
115
116    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
117        // len_wrapped_encode passes us a limited view of the buffer,
118        // since we are always the last thing in it we can just take
119        // the whole thing.
120        Ok(UnknownAbstractType(buf.copy_to_bytes(buf.remaining())))
121    }
122}
123
124struct Registry {
125    by_uuid: FxHashMap<Uuid, Arc<AbstractVtable>>,
126    by_tid: FxHashMap<TypeId, Uuid>,
127}
128
129impl Registry {
130    fn insert<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync>(
131        &mut self,
132        id: Uuid,
133    ) -> anyhow::Result<Arc<AbstractVtable>> {
134        match self.by_uuid.entry(id) {
135            Entry::Occupied(e) => {
136                if e.get().tid != TypeId::of::<T>() {
137                    bail!("attempt to register {id:?} with different types")
138                }
139                Ok(e.get().clone())
140            }
141            Entry::Vacant(e) => {
142                match self.by_tid.entry(TypeId::of::<T>()) {
143                    Entry::Vacant(e) => e.insert(id),
144                    Entry::Occupied(_) => {
145                        bail!("T registered multiple times with different ids")
146                    }
147                };
148                let vt = Arc::new(AbstractVtable::new::<T>());
149                e.insert(vt.clone());
150                Ok(vt)
151            }
152        }
153    }
154}
155
156/// This is the ID of UnknownAbstractType
157pub static UNKNOWN_ID: Uuid = Uuid::from_bytes([
158    195, 155, 41, 43, 251, 148, 70, 166, 129, 118, 150, 177, 94, 123, 235, 23,
159]);
160
161static REGISTRY: LazyLock<RwLock<Registry>> = LazyLock::new(|| {
162    let mut reg = Registry { by_uuid: HashMap::default(), by_tid: HashMap::default() };
163    reg.insert::<UnknownAbstractType>(UNKNOWN_ID).unwrap();
164    RwLock::new(reg)
165});
166
167/// Wrap Ts as Abstracts
168pub struct AbstractWrapper<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync> {
169    id: Uuid,
170    vtable: Arc<AbstractVtable>,
171    t: PhantomData<T>,
172}
173
174impl<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync> AbstractWrapper<T> {
175    /// Return the UUID that T is registered as
176    pub fn id(&self) -> Uuid {
177        self.id
178    }
179
180    /// Wrap T as an Abstract Value
181    pub fn wrap(&self, t: T) -> Value {
182        Value::Abstract(Abstract(Arc::new(AbstractInner {
183            id: self.id,
184            vtable: self.vtable.clone(),
185            t: Box::new(t),
186        })))
187    }
188}
189
190struct AbstractInner {
191    id: Uuid,
192    vtable: Arc<AbstractVtable>,
193    t: Box<dyn Any + Send + Sync>,
194}
195
196/// The abstract netidx value type
197///
198/// Any type implementing Any + Pack + Send + Sync can be wrapped in an Abstract
199/// and published to netidx.
200///
201/// When a client reads an abstract value from netidx, it will look
202/// to see if it has the same type registered (by UUID). If it does,
203/// it will be able to decode the abstract type and use it. If it
204/// does not, then a special type called UnknownAbstractType will
205/// be substituted and the decode will not fail.
206///
207/// You may recover the original type by downcasting, which will fail
208/// if the type isn't the one you expected.
209///
210/// Through abstract types the Value type can be extended in whatever way is
211/// suited by the application/user site without every netidx user needing to
212/// agree on a new protocol extension. As long as all the applications using an
213/// abstract type register it with the same UUID, and the Pack implementation
214/// remains compatible, then it can be seamlessly used without interfering with
215/// non participarting applications.
216#[derive(Clone)]
217pub struct Abstract(Arc<AbstractInner>);
218
219impl Abstract {
220    /// Look up the UUID of the concrete type of this Abstract
221    pub fn id(&self) -> Uuid {
222        self.0.id
223    }
224
225    /// Downcast &self to &T. If self isn't a T then return None.
226    pub fn downcast_ref<T: Any + Send + Sync>(&self) -> Option<&T> {
227        (&*self.0.t).downcast_ref::<T>()
228    }
229
230    /// Register a new abstract type, return an object that will wrap instances
231    /// of T as an Abstract.
232    ///
233    /// Register is idempotent.
234    ///
235    /// - it is an error to register T with multiple different ids
236    /// - it is an error to register the same id with multiple Ts
237    pub fn register<T: Any + Debug + Pack + Hash + Eq + Ord + Send + Sync>(
238        id: Uuid,
239    ) -> anyhow::Result<AbstractWrapper<T>> {
240        let vtable = REGISTRY.write().insert::<T>(id)?;
241        Ok(AbstractWrapper { id, vtable, t: PhantomData })
242    }
243}
244
245impl Default for Abstract {
246    fn default() -> Self {
247        let reg = REGISTRY.read();
248        Abstract(Arc::new(AbstractInner {
249            id: UNKNOWN_ID,
250            vtable: reg.by_uuid[&UNKNOWN_ID].clone(),
251            t: Box::new(UnknownAbstractType),
252        }))
253    }
254}
255
256impl fmt::Debug for Abstract {
257    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
258        write!(f, "Abstract(")?;
259        (self.0.vtable.debug)(self, f)?;
260        write!(f, ")")
261    }
262}
263
264impl Pack for Abstract {
265    fn encoded_len(&self) -> usize {
266        let id_len = Pack::encoded_len(&self.0.id);
267        let t_len = (self.0.vtable.encoded_len)(self);
268        len_wrapped_len(id_len + t_len)
269    }
270
271    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
272        len_wrapped_encode(buf, self, |buf| {
273            Pack::encode(&self.0.id, buf)?;
274            (self.0.vtable.encode)(self, buf)
275        })
276    }
277
278    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
279        len_wrapped_decode(buf, |buf| {
280            let id: Uuid = Pack::decode(buf)?;
281            let reg = REGISTRY.read();
282            match reg.by_uuid.get(&id) {
283                Some(vtable) => {
284                    let t = (vtable.decode)(buf)?;
285                    Ok(Abstract(Arc::new(AbstractInner {
286                        id,
287                        vtable: vtable.clone(),
288                        t,
289                    })))
290                }
291                None => Ok(Abstract(Arc::new(AbstractInner {
292                    id,
293                    vtable: reg.by_uuid[&UNKNOWN_ID].clone(),
294                    t: Box::new(UnknownAbstractType::decode(buf)?),
295                }))),
296            }
297        })
298    }
299}
300
301impl Serialize for Abstract {
302    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
303    where
304        S: serde::Serializer,
305    {
306        let buf = utils::pack(self).map_err(|e| ser::Error::custom(e))?;
307        buf.serialize(serializer)
308    }
309}
310
311impl<'de> Deserialize<'de> for Abstract {
312    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
313    where
314        D: serde::Deserializer<'de>,
315    {
316        let mut buf = Bytes::deserialize(deserializer)?;
317        Pack::decode(&mut buf).map_err(|e| de::Error::custom(e))
318    }
319}
320
321impl Hash for Abstract {
322    fn hash<H: Hasher>(&self, state: &mut H) {
323        (self.0.vtable.hash)(self, state)
324    }
325}
326
327impl PartialEq for Abstract {
328    fn eq(&self, other: &Self) -> bool {
329        (self.0.vtable.eq)(self, other)
330    }
331}
332
333impl Eq for Abstract {}
334
335impl PartialOrd for Abstract {
336    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
337        Some(self.cmp(other))
338    }
339}
340
341impl Ord for Abstract {
342    fn cmp(&self, other: &Self) -> Ordering {
343        (self.0.vtable.ord)(self, other)
344    }
345}