timely_communication/
message.rs

1//! Types wrapping typed data.
2
3use std::sync::Arc;
4use bytes::arc::Bytes;
5use abomonation;
6use crate::Data;
7
8/// Either an immutable or mutable reference.
9pub enum RefOrMut<'a, T> where T: 'a {
10    /// An immutable reference.
11    Ref(&'a T),
12    /// A mutable reference.
13    Mut(&'a mut T),
14}
15
16impl<'a, T: 'a> ::std::ops::Deref for RefOrMut<'a, T> {
17    type Target = T;
18    fn deref(&self) -> &Self::Target {
19        match self {
20            RefOrMut::Ref(reference) => reference,
21            RefOrMut::Mut(reference) => reference,
22        }
23    }
24}
25
26impl<'a, T: 'a> ::std::borrow::Borrow<T> for RefOrMut<'a, T> {
27    fn borrow(&self) -> &T {
28        match self {
29            RefOrMut::Ref(reference) => reference,
30            RefOrMut::Mut(reference) => reference,
31        }
32    }
33}
34
35impl<'a, T: Clone+'a> RefOrMut<'a, T> {
36    /// Extracts the contents of `self`, either by cloning or swapping.
37    ///
38    /// This consumes `self` because its contents are now in an unknown state.
39    pub fn swap<'b>(self, element: &'b mut T) {
40        match self {
41            RefOrMut::Ref(reference) => element.clone_from(reference),
42            RefOrMut::Mut(reference) => ::std::mem::swap(reference, element),
43        };
44    }
45    /// Extracts the contents of `self`, either by cloning or swapping.
46    ///
47    /// This consumes `self` because its contents are now in an unknown state.
48    pub fn replace(self, mut element: T) -> T {
49        self.swap(&mut element);
50        element
51    }
52
53    /// Extracts the contents of `self`, either by cloning, or swapping and leaving a default
54    /// element in place.
55    ///
56    /// This consumes `self` because its contents are now in an unknown state.
57    pub fn take(self) -> T where T: Default {
58        let mut element = Default::default();
59        self.swap(&mut element);
60        element
61    }
62}
63
64/// A wrapped message which may be either typed or binary data.
65pub struct Message<T> {
66    payload: MessageContents<T>,
67}
68
69/// Possible returned representations from a channel.
70enum MessageContents<T> {
71    /// Binary representation. Only available as a reference.
72    Binary(abomonation::abomonated::Abomonated<T, Bytes>),
73    /// Rust typed instance. Available for ownership.
74    Owned(T),
75    /// Atomic reference counted. Only available as a reference.
76    Arc(Arc<T>),
77}
78
79impl<T> Message<T> {
80    /// Wrap a typed item as a message.
81    pub fn from_typed(typed: T) -> Self {
82        Message { payload: MessageContents::Owned(typed) }
83    }
84    /// Wrap a shared typed item as a message.
85    pub fn from_arc(typed: Arc<T>) -> Self {
86        Message { payload: MessageContents::Arc(typed) }
87    }
88    /// Destructures and returns any typed data.
89    pub fn if_typed(self) -> Option<T> {
90        match self.payload {
91            MessageContents::Binary(_) => None,
92            MessageContents::Owned(typed) => Some(typed),
93            MessageContents::Arc(_) => None,
94        }
95    }
96    /// Returns a mutable reference, if typed.
97    pub fn if_mut(&mut self) -> Option<&mut T> {
98        match &mut self.payload {
99            MessageContents::Binary(_) => None,
100            MessageContents::Owned(typed) => Some(typed),
101            MessageContents::Arc(_) => None,
102        }
103    }
104    /// Returns an immutable or mutable typed reference.
105    ///
106    /// This method returns a mutable reference if the underlying data are typed Rust
107    /// instances, which admit mutation, and it returns an immutable reference if the
108    /// data are serialized binary data.
109    pub fn as_ref_or_mut(&mut self) -> RefOrMut<T> {
110        match &mut self.payload {
111            MessageContents::Binary(bytes) => { RefOrMut::Ref(bytes) },
112            MessageContents::Owned(typed) => { RefOrMut::Mut(typed) },
113            MessageContents::Arc(typed) => { RefOrMut::Ref(typed) },
114        }
115    }
116}
117
118// These methods require `T` to implement `Abomonation`, for serialization functionality.
119#[cfg(not(feature = "bincode"))]
120impl<T: Data> Message<T> {
121    /// Wrap bytes as a message.
122    ///
123    /// # Safety
124    ///
125    /// This method is unsafe, in that `Abomonated::new()` is unsafe: it presumes that
126    /// the binary data can be safely decoded, which is unsafe for e.g. UTF8 data and
127    /// enumerations (perhaps among many other types).
128    pub unsafe fn from_bytes(bytes: Bytes) -> Self {
129        let abomonated = abomonation::abomonated::Abomonated::new(bytes).expect("Abomonated::new() failed.");
130        Message { payload: MessageContents::Binary(abomonated) }
131    }
132
133    /// The number of bytes required to serialize the data.
134    pub fn length_in_bytes(&self) -> usize {
135        match &self.payload {
136            MessageContents::Binary(bytes) => { bytes.as_bytes().len() },
137            MessageContents::Owned(typed) => { abomonation::measure(typed) },
138            MessageContents::Arc(typed) =>{ abomonation::measure::<T>(&**typed) } ,
139        }
140    }
141
142    /// Writes the binary representation into `writer`.
143    pub fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
144        match &self.payload {
145            MessageContents::Binary(bytes) => {
146                writer.write_all(bytes.as_bytes()).expect("Message::into_bytes(): write_all failed.");
147            },
148            MessageContents::Owned(typed) => {
149                unsafe { abomonation::encode(typed, writer).expect("Message::into_bytes(): Abomonation::encode failed"); }
150            },
151            MessageContents::Arc(typed) => {
152                unsafe { abomonation::encode(&**typed, writer).expect("Message::into_bytes(): Abomonation::encode failed"); }
153            },
154        }
155    }
156}
157
158#[cfg(feature = "bincode")]
159impl<T: Data> Message<T> {
160    /// Wrap bytes as a message.
161    pub fn from_bytes(bytes: Bytes) -> Self {
162        let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed");
163        Message { payload: MessageContents::Owned(typed) }
164    }
165
166    /// The number of bytes required to serialize the data.
167    pub fn length_in_bytes(&self) -> usize {
168        match &self.payload {
169            MessageContents::Binary(bytes) => { bytes.as_bytes().len() },
170            MessageContents::Owned(typed) => {
171                ::bincode::serialized_size(&typed).expect("bincode::serialized_size() failed") as usize
172            },
173            MessageContents::Arc(typed) => {
174                ::bincode::serialized_size(&**typed).expect("bincode::serialized_size() failed") as usize
175            },
176        }
177    }
178
179    /// Writes the binary representation into `writer`.
180    pub fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
181        match &self.payload {
182            MessageContents::Binary(bytes) => {
183                writer.write_all(bytes.as_bytes()).expect("Message::into_bytes(): write_all failed.");
184            },
185            MessageContents::Owned(typed) => {
186                ::bincode::serialize_into(writer, &typed).expect("bincode::serialize_into() failed");
187            },
188            MessageContents::Arc(typed) => {
189                ::bincode::serialize_into(writer, &**typed).expect("bincode::serialize_into() failed");
190            },
191        }
192    }
193}
194
195impl<T> ::std::ops::Deref for Message<T> {
196    type Target = T;
197    fn deref(&self) -> &Self::Target {
198        // TODO: In principle we have aready decoded, but let's go again
199        match &self.payload {
200            MessageContents::Binary(bytes) => { bytes },
201            MessageContents::Owned(typed) => { typed },
202            MessageContents::Arc(typed) => { typed },
203        }
204    }
205}
206
207impl<T: Clone> Message<T> {
208    /// Produces a typed instance of the wrapped element.
209    pub fn into_typed(self) -> T {
210        match self.payload {
211            MessageContents::Binary(bytes) => bytes.clone(),
212            MessageContents::Owned(instance) => instance,
213            // TODO: Could attempt `Arc::try_unwrap()` here.
214            MessageContents::Arc(instance) => (*instance).clone(),
215        }
216    }
217    /// Ensures the message is typed data and returns a mutable reference to it.
218    pub fn as_mut(&mut self) -> &mut T {
219
220        let cloned: Option<T> = match &self.payload {
221            MessageContents::Binary(bytes) => Some((*bytes).clone()),
222            MessageContents::Owned(_) => None,
223            // TODO: Could attempt `Arc::try_unwrap()` here.
224            MessageContents::Arc(typed) => Some((**typed).clone()),
225        };
226
227        if let Some(cloned) = cloned {
228            self.payload = MessageContents::Owned(cloned);
229        }
230
231        if let MessageContents::Owned(typed) = &mut self.payload {
232            typed
233        }
234        else {
235            unreachable!()
236        }
237    }
238}