timely_communication/
message.rs1use std::sync::Arc;
4use bytes::arc::Bytes;
5use abomonation;
6use crate::Data;
7
/// Either a shared or an exclusive reference to a `T`.
///
/// Both variants can be read through (`Deref`, `Borrow`); the inherent methods
/// below turn either one into an owned `T`, cloning only in the `Ref` case.
pub enum RefOrMut<'a, T: 'a> {
    /// A shared reference; the referent can only be read or cloned.
    Ref(&'a T),
    /// An exclusive reference; the referent can be swapped out in place.
    Mut(&'a mut T),
}

impl<'a, T: 'a> ::std::ops::Deref for RefOrMut<'a, T> {
    type Target = T;

    /// Borrows the referent immutably, whichever variant is held.
    fn deref(&self) -> &Self::Target {
        match *self {
            RefOrMut::Ref(shared) => shared,
            RefOrMut::Mut(ref exclusive) => &**exclusive,
        }
    }
}

impl<'a, T: 'a> ::std::borrow::Borrow<T> for RefOrMut<'a, T> {
    /// Borrows the referent immutably; same result as dereferencing.
    fn borrow(&self) -> &T {
        &**self
    }
}

impl<'a, T: Clone + 'a> RefOrMut<'a, T> {
    /// Moves the referenced contents into `element`.
    ///
    /// For `Mut` the two values are exchanged, so the referent ends up holding
    /// the previous contents of `element`. For `Ref` the referent is left
    /// untouched and `element` is overwritten via `clone_from`.
    pub fn swap<'b>(self, element: &'b mut T) {
        match self {
            RefOrMut::Mut(exclusive) => ::std::mem::swap(exclusive, element),
            RefOrMut::Ref(shared) => element.clone_from(shared),
        }
    }

    /// Returns the referenced contents as an owned value, handing `element`
    /// over in exchange.
    ///
    /// For `Mut` the referent is left holding `element`; for `Ref` the result
    /// is a clone of the referent and `element` is only used as the clone target.
    pub fn replace(self, element: T) -> T {
        let mut slot = element;
        self.swap(&mut slot);
        slot
    }

    /// Returns the referenced contents as an owned value, using
    /// `T::default()` as the value handed over in exchange.
    ///
    /// For `Mut` the referent is left holding the default value; for `Ref`
    /// the referent is unchanged and the result is a clone of it.
    pub fn take(self) -> T where T: Default {
        self.replace(T::default())
    }
}
63
64pub struct Message<T> {
66 payload: MessageContents<T>,
67}
68
69enum MessageContents<T> {
71 Binary(abomonation::abomonated::Abomonated<T, Bytes>),
73 Owned(T),
75 Arc(Arc<T>),
77}
78
79impl<T> Message<T> {
80 pub fn from_typed(typed: T) -> Self {
82 Message { payload: MessageContents::Owned(typed) }
83 }
84 pub fn from_arc(typed: Arc<T>) -> Self {
86 Message { payload: MessageContents::Arc(typed) }
87 }
88 pub fn if_typed(self) -> Option<T> {
90 match self.payload {
91 MessageContents::Binary(_) => None,
92 MessageContents::Owned(typed) => Some(typed),
93 MessageContents::Arc(_) => None,
94 }
95 }
96 pub fn if_mut(&mut self) -> Option<&mut T> {
98 match &mut self.payload {
99 MessageContents::Binary(_) => None,
100 MessageContents::Owned(typed) => Some(typed),
101 MessageContents::Arc(_) => None,
102 }
103 }
104 pub fn as_ref_or_mut(&mut self) -> RefOrMut<T> {
110 match &mut self.payload {
111 MessageContents::Binary(bytes) => { RefOrMut::Ref(bytes) },
112 MessageContents::Owned(typed) => { RefOrMut::Mut(typed) },
113 MessageContents::Arc(typed) => { RefOrMut::Ref(typed) },
114 }
115 }
116}
117
118#[cfg(not(feature = "bincode"))]
120impl<T: Data> Message<T> {
121 pub unsafe fn from_bytes(bytes: Bytes) -> Self {
129 let abomonated = abomonation::abomonated::Abomonated::new(bytes).expect("Abomonated::new() failed.");
130 Message { payload: MessageContents::Binary(abomonated) }
131 }
132
133 pub fn length_in_bytes(&self) -> usize {
135 match &self.payload {
136 MessageContents::Binary(bytes) => { bytes.as_bytes().len() },
137 MessageContents::Owned(typed) => { abomonation::measure(typed) },
138 MessageContents::Arc(typed) =>{ abomonation::measure::<T>(&**typed) } ,
139 }
140 }
141
142 pub fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
144 match &self.payload {
145 MessageContents::Binary(bytes) => {
146 writer.write_all(bytes.as_bytes()).expect("Message::into_bytes(): write_all failed.");
147 },
148 MessageContents::Owned(typed) => {
149 unsafe { abomonation::encode(typed, writer).expect("Message::into_bytes(): Abomonation::encode failed"); }
150 },
151 MessageContents::Arc(typed) => {
152 unsafe { abomonation::encode(&**typed, writer).expect("Message::into_bytes(): Abomonation::encode failed"); }
153 },
154 }
155 }
156}
157
#[cfg(feature = "bincode")]
impl<T: Data> Message<T> {
    /// Decodes `bytes` with bincode and wraps the result as an owned message.
    ///
    /// # Panics
    ///
    /// Panics if `bincode::deserialize` fails.
    pub fn from_bytes(bytes: Bytes) -> Self {
        let decoded = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed");
        Message { payload: MessageContents::Owned(decoded) }
    }

    /// Returns the number of bytes `into_bytes` would write for this message.
    ///
    /// For the binary representation this is the length of the stored bytes;
    /// otherwise it is the size reported by `bincode::serialized_size`, cast
    /// to `usize`.
    ///
    /// # Panics
    ///
    /// Panics if `bincode::serialized_size` fails.
    pub fn length_in_bytes(&self) -> usize {
        let typed: &T = match self.payload {
            // Already serialized: the stored bytes are the answer.
            MessageContents::Binary(ref bytes) => return bytes.as_bytes().len(),
            MessageContents::Owned(ref typed) => typed,
            MessageContents::Arc(ref shared) => &**shared,
        };
        ::bincode::serialized_size(typed).expect("bincode::serialized_size() failed") as usize
    }

    /// Writes the message's binary form to `writer`.
    ///
    /// The binary representation is copied out verbatim; typed payloads are
    /// serialized with `bincode::serialize_into`.
    ///
    /// # Panics
    ///
    /// Panics if writing or serialization fails.
    pub fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
        let typed: &T = match self.payload {
            MessageContents::Binary(ref bytes) => {
                writer.write_all(bytes.as_bytes()).expect("Message::into_bytes(): write_all failed.");
                return;
            }
            MessageContents::Owned(ref typed) => typed,
            MessageContents::Arc(ref shared) => &**shared,
        };
        ::bincode::serialize_into(writer, typed).expect("bincode::serialize_into() failed");
    }
}
194
195impl<T> ::std::ops::Deref for Message<T> {
196 type Target = T;
197 fn deref(&self) -> &Self::Target {
198 match &self.payload {
200 MessageContents::Binary(bytes) => { bytes },
201 MessageContents::Owned(typed) => { typed },
202 MessageContents::Arc(typed) => { typed },
203 }
204 }
205}
206
207impl<T: Clone> Message<T> {
208 pub fn into_typed(self) -> T {
210 match self.payload {
211 MessageContents::Binary(bytes) => bytes.clone(),
212 MessageContents::Owned(instance) => instance,
213 MessageContents::Arc(instance) => (*instance).clone(),
215 }
216 }
217 pub fn as_mut(&mut self) -> &mut T {
219
220 let cloned: Option<T> = match &self.payload {
221 MessageContents::Binary(bytes) => Some((*bytes).clone()),
222 MessageContents::Owned(_) => None,
223 MessageContents::Arc(typed) => Some((**typed).clone()),
225 };
226
227 if let Some(cloned) = cloned {
228 self.payload = MessageContents::Owned(cloned);
229 }
230
231 if let MessageContents::Owned(typed) = &mut self.payload {
232 typed
233 }
234 else {
235 unreachable!()
236 }
237 }
238}