async_backplane/
device.rs

1use concurrent_queue::PopError;
2use futures_lite::{Future, FutureExt, Stream, StreamExt};
3use std::any::Any;
4use std::cell::RefCell;
5use std::fmt::Debug;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::{Context, Poll};
9use crate::*;
10use crate::Watched::{Completed, Messaged};
11use crate::linemap::LineMap;
12use crate::plugboard::Plugboard;
13use crate::panic::dont_panic;
14
15/// A Device connects a Future to the backplane.
16#[derive(Debug)]
17pub struct Device {
18    plugboard: Arc<Plugboard>,
19    // This is here so we don't have to mark everything
20    // mut. Accordingly, we also can't let the user have direct
21    // access, in case they e.g. hold it across an await boundary.
22    inner: RefCell<Inner>,
23}
24
25#[derive(Debug)]
26pub(crate) struct Inner {
27    out: LineMap,
28    done: bool,
29}
30
31impl Inner {
32    // Actually send all the messages.
33    fn send(&mut self, message: Message) {
34        let mut last: Option<Message> = None; // avoid copying
35        for (_, maybe) in self.out.drain() {
36            if let Some(line) = maybe {
37                let m = last.take().unwrap_or(message);
38                if let Err(e) = line.send(m) { last = Some(e); }
39            }
40        }
41    }
42}
43
44impl Default for Device {
45    fn default() -> Self { Device::new() }
46}
47
48impl Device {
49
50    /// Creates a new Device.
51    pub fn new() -> Self {
52        Device {
53            plugboard: Arc::new(Plugboard::new()),
54            inner: RefCell::new(Inner { out: LineMap::new(), done: false }),
55        }
56    }
57
58    /// Get the ID of this Device.
59    pub fn device_id(&self) -> DeviceID {
60        DeviceID::new(&*self.plugboard as *const _ as usize)
61    }
62
63    /// Opens a line to the Device.
64    pub fn line(&self) -> Line {
65        Line { plugboard: self.plugboard.clone() }
66    }
67
68    /// Notify our peers we're disconnecting.
69    pub fn disconnect(self, fault: Option<Fault>) {
70        self.do_disconnect(fault);
71    }
72
73    fn do_disconnect(&self, fault: Option<Fault>) {
74        self.plugboard.close(); // no more requests
75        let mut inner = self.inner.borrow_mut();
76        while let Ok(op) = self.plugboard.line_ops.pop() {
77            inner.out.apply(op);
78        } // sync
79        inner.send(Disconnected(self.device_id(), fault));
80    }
81
82    /// Link with another Device with the provided LinkMode. LinkModes
83    /// are additive, so you can 'upgrade' a link this way.
84    ///
85    /// This method is intended for static-style linking, where the
86    /// topology is not expected to change. You should not link to a
87    /// Device this way after linking to it through a Line.
88    pub fn link(&self, other: &Device, mode: LinkMode) {
89        if self.device_id() == other.device_id() {
90            panic!("Do not link to yourself!");
91        }
92        if mode.monitor() {
93            other.inner.borrow_mut().out
94                .attach(Line { plugboard: self.plugboard.clone() });
95        }
96        if mode.notify() {
97            self.inner.borrow_mut().out
98                .attach(Line { plugboard: other.plugboard.clone() });
99        }
100    }
101
102    /// Unlink from another Device with the provided LinkMode. LinkModes
103    /// are subtractive, so you can 'downgrade' a link this way.
104    ///
105    /// This method is intended for static-style linking, where the
106    /// topology is not expected to change. You should not link to a
107    /// Device this way after linking to it through a Line.
108    pub fn unlink(&self, other: &Device, mode: LinkMode) {
109        if self.device_id() == other.device_id() {
110            panic!("Do not link to yourself!");
111        }
112        if mode.monitor() {
113            other.inner.borrow_mut().out.detach(self.device_id());
114        }
115        if mode.notify() {
116            self.inner.borrow_mut().out.detach(other.device_id());
117        }
118    }
119   
120    /// Link with a line. This is safer than linking directly to a
121    /// Device, but a little slower.
122    pub fn link_line(&self, other: Line, mode: LinkMode) -> Result<(), LinkError>{
123        if self.device_id() == other.device_id() {
124            panic!("Do not link to yourself!");
125        }
126        if mode.monitor() {
127            other.plugboard.plug(self.line(), LinkError::LinkDown)?;
128        }
129        if mode.notify() {
130            self.inner.borrow_mut().out.attach(other);
131        }
132        Ok(())
133    }
134
135    /// Unlink with a line. This is safer than linking directly to a
136    /// Device, but a little slower.
137    pub fn unlink_line(&self, other: &Line, mode: LinkMode) {
138        if self.device_id() == other.device_id() {
139            panic!("Do not link to yourself!");
140        }
141        #[allow(unused_must_use)]
142        if mode.monitor() {
143            other.plugboard.unplug(self.device_id(), LinkError::LinkDown);
144        }
145        if mode.notify() {
146            self.inner.borrow_mut().out.detach(other.device_id());
147        }
148    }
149
150    /// Attempts to get the next message. Does not wait for one to arrive.
151    pub fn receive(&self) -> Option<Message> {
152        self.plugboard.messages.try_pop().ok()
153    }
154
155    /// Returns the first of (with a bias towards the former):
156    /// * The next message to be received.
157    /// * The result of the completed future.
158    /// * The crash of the Device.
159    pub async fn watch<F, C>(&mut self, f: F)
160                             -> Result<Watched<<F as Future>::Output>, Crash<C>>
161    where F: Future + Unpin,
162          F::Output: Debug,
163          C: 'static + Any + Debug + Send {
164        let fut = dont_panic(f);
165        async {
166            let message = self.next().await.expect("The Device to still be usable.");
167            Ok(Messaged(message))
168        }.or(async {
169            match fut.await {
170                Ok(val) => Ok(Completed(val)),
171                Err(unwind) => Err(Crash::Panic(unwind)),
172            }
173        }).await
174    }
175
176    /// Runs an async closure while monitoring for messages. Messages
177    /// are handled as follows:
178    ///
179    /// * Disconnects without fault are ignored.
180    /// * Disconnects with fault cause the Device to fault.
181    /// * Requests to disconnect cause the Device to crash but
182    /// announce a successful completion.
183    ///
184    /// If the provided closure returns successfully, the result is
185    /// returned along with the Device for re-use. Monitors will *not*
186    /// be notified.
187    ///
188    /// If the Device faults, either because the provided closure
189    /// returned an Err variant or because a fault was propagated,
190    /// announces our fault to our monitors.
191    pub async fn part_manage<'a, F, T, C>(mut self, mut f: F)
192                                          -> Result<(Device, T), Crash<C>>
193    where F: Future<Output = Result<T, C>> + Unpin,
194          C: 'static + Debug + Send,
195          T: Debug {
196        loop {
197            match self.watch(&mut f).await {
198                Ok(Completed(Ok(val))) => { return Ok((self, val)); }
199                Ok(Completed(Err(val))) => {
200                    self.disconnect(Some(Fault::Error));
201                    return Err(Crash::Error(val));
202                }
203                Ok(Messaged(Disconnected(sender, Some(fault)))) => {
204                    self.disconnect(Some(Fault::Cascade(sender)));
205                    return Err(Crash::Cascade(sender, fault));
206                }
207                Ok(Messaged(Disconnected(sender, None))) => {
208                    #[allow(unused_must_use)]
209                    if !self.inner.borrow_mut().out.detach(sender) {
210                        self.plugboard.unplug(sender, LinkError::LinkDown);
211                    }
212                    continue;
213                }
214                Ok(Messaged(Shutdown(id))) => {
215                    self.disconnect(None);
216                    return Err(Crash::PowerOff(id));
217                }
218                Err(crash) => {
219                    self.disconnect(Some(Fault::Error));
220                    return Err(crash);
221                }
222            }
223        }
224    }
225
226    /// Like `part_manage()`, but in the case of successful completion
227    /// of the provided future, notifies our monitors and consumes self
228    pub async fn manage<F, C, T>(self, f: F) -> Result<T, Crash<C>>
229    where F: Future<Output=Result<T,C>> + Unpin,
230          C: 'static + Debug + Send,
231          T: Debug {
232        match self.part_manage(f).await {
233            Ok((device, val)) => {
234                device.disconnect(None);
235                Ok(val)
236            }
237            Err(e) => Err(e),
238        }
239    }
240
241}
242
243impl Drop for Device {
244    fn drop(&mut self) {
245        let mut inner = self.inner.borrow_mut();
246        if !inner.done {
247            self.plugboard.close(); // no more requests
248            while let Ok(op) = self.plugboard.line_ops.pop() { inner.out.apply(op); } // sync
249            inner.send(Disconnected(self.device_id(), Some(Fault::Drop)));
250        }
251    }
252 }
253
254impl Unpin for Device {}
255
256impl Stream for Device {
257    type Item = Message;
258    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
259        let this = self.get_mut();
260        let mut inner = this.inner.borrow_mut();
261        if !inner.done {
262            match this.plugboard.messages.try_pop() {
263                Ok(val) => Poll::Ready(Some(val)),
264                Err(PopError::Empty) => {
265                    this.plugboard.messages.register(ctx.waker());
266                    // Make sure we don't lose out in a race
267                    match this.plugboard.messages.try_pop() {
268                        Ok(val) => Poll::Ready(Some(val)), // Sorry for leaving a waker
269                        Err(PopError::Empty) => Poll::Pending,
270                        Err(PopError::Closed) => {
271                            inner.done = true;
272                            Poll::Ready(None)
273                        }
274                    }
275                }
276                Err(PopError::Closed) => {
277                    inner.done = true;
278                    Poll::Ready(None)
279                }
280            }
281        } else {
282            Poll::Ready(None)
283        }
284    }
285}