async_backplane/
device.rs1use concurrent_queue::PopError;
2use futures_lite::{Future, FutureExt, Stream, StreamExt};
3use std::any::Any;
4use std::cell::RefCell;
5use std::fmt::Debug;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::{Context, Poll};
9use crate::*;
10use crate::Watched::{Completed, Messaged};
11use crate::linemap::LineMap;
12use crate::plugboard::Plugboard;
13use crate::panic::dont_panic;
14
/// Pairs a shared [`Plugboard`] with this device's private link bookkeeping.
///
/// Incoming messages are popped from `plugboard.messages`; the lines to
/// notify when the device disconnects live in `inner.out`.
#[derive(Debug)]
pub struct Device {
    /// Reference-counted so that `line()` can hand a clone of the same
    /// plugboard to every `Line` it creates.
    plugboard: Arc<Plugboard>,
    /// Kept behind a `RefCell` so that `&self` methods (e.g. `link`) can
    /// still mutate the outgoing line map.
    inner: RefCell<Inner>,
}
24
/// Mutable state owned by a single `Device`.
#[derive(Debug)]
pub(crate) struct Inner {
    /// Outgoing lines; `send` drains this map and delivers to each entry.
    out: LineMap,
    /// Set when the message queue reports `PopError::Closed`. Once set, the
    /// `Stream` impl yields `None` and `Drop` skips its disconnect notice.
    done: bool,
}
30
31impl Inner {
32 fn send(&mut self, message: Message) {
34 let mut last: Option<Message> = None; for (_, maybe) in self.out.drain() {
36 if let Some(line) = maybe {
37 let m = last.take().unwrap_or(message);
38 if let Err(e) = line.send(m) { last = Some(e); }
39 }
40 }
41 }
42}
43
44impl Default for Device {
45 fn default() -> Self { Device::new() }
46}
47
48impl Device {
49
50 pub fn new() -> Self {
52 Device {
53 plugboard: Arc::new(Plugboard::new()),
54 inner: RefCell::new(Inner { out: LineMap::new(), done: false }),
55 }
56 }
57
58 pub fn device_id(&self) -> DeviceID {
60 DeviceID::new(&*self.plugboard as *const _ as usize)
61 }
62
63 pub fn line(&self) -> Line {
65 Line { plugboard: self.plugboard.clone() }
66 }
67
68 pub fn disconnect(self, fault: Option<Fault>) {
70 self.do_disconnect(fault);
71 }
72
73 fn do_disconnect(&self, fault: Option<Fault>) {
74 self.plugboard.close(); let mut inner = self.inner.borrow_mut();
76 while let Ok(op) = self.plugboard.line_ops.pop() {
77 inner.out.apply(op);
78 } inner.send(Disconnected(self.device_id(), fault));
80 }
81
82 pub fn link(&self, other: &Device, mode: LinkMode) {
89 if self.device_id() == other.device_id() {
90 panic!("Do not link to yourself!");
91 }
92 if mode.monitor() {
93 other.inner.borrow_mut().out
94 .attach(Line { plugboard: self.plugboard.clone() });
95 }
96 if mode.notify() {
97 self.inner.borrow_mut().out
98 .attach(Line { plugboard: other.plugboard.clone() });
99 }
100 }
101
102 pub fn unlink(&self, other: &Device, mode: LinkMode) {
109 if self.device_id() == other.device_id() {
110 panic!("Do not link to yourself!");
111 }
112 if mode.monitor() {
113 other.inner.borrow_mut().out.detach(self.device_id());
114 }
115 if mode.notify() {
116 self.inner.borrow_mut().out.detach(other.device_id());
117 }
118 }
119
120 pub fn link_line(&self, other: Line, mode: LinkMode) -> Result<(), LinkError>{
123 if self.device_id() == other.device_id() {
124 panic!("Do not link to yourself!");
125 }
126 if mode.monitor() {
127 other.plugboard.plug(self.line(), LinkError::LinkDown)?;
128 }
129 if mode.notify() {
130 self.inner.borrow_mut().out.attach(other);
131 }
132 Ok(())
133 }
134
135 pub fn unlink_line(&self, other: &Line, mode: LinkMode) {
138 if self.device_id() == other.device_id() {
139 panic!("Do not link to yourself!");
140 }
141 #[allow(unused_must_use)]
142 if mode.monitor() {
143 other.plugboard.unplug(self.device_id(), LinkError::LinkDown);
144 }
145 if mode.notify() {
146 self.inner.borrow_mut().out.detach(other.device_id());
147 }
148 }
149
150 pub fn receive(&self) -> Option<Message> {
152 self.plugboard.messages.try_pop().ok()
153 }
154
155 pub async fn watch<F, C>(&mut self, f: F)
160 -> Result<Watched<<F as Future>::Output>, Crash<C>>
161 where F: Future + Unpin,
162 F::Output: Debug,
163 C: 'static + Any + Debug + Send {
164 let fut = dont_panic(f);
165 async {
166 let message = self.next().await.expect("The Device to still be usable.");
167 Ok(Messaged(message))
168 }.or(async {
169 match fut.await {
170 Ok(val) => Ok(Completed(val)),
171 Err(unwind) => Err(Crash::Panic(unwind)),
172 }
173 }).await
174 }
175
176 pub async fn part_manage<'a, F, T, C>(mut self, mut f: F)
192 -> Result<(Device, T), Crash<C>>
193 where F: Future<Output = Result<T, C>> + Unpin,
194 C: 'static + Debug + Send,
195 T: Debug {
196 loop {
197 match self.watch(&mut f).await {
198 Ok(Completed(Ok(val))) => { return Ok((self, val)); }
199 Ok(Completed(Err(val))) => {
200 self.disconnect(Some(Fault::Error));
201 return Err(Crash::Error(val));
202 }
203 Ok(Messaged(Disconnected(sender, Some(fault)))) => {
204 self.disconnect(Some(Fault::Cascade(sender)));
205 return Err(Crash::Cascade(sender, fault));
206 }
207 Ok(Messaged(Disconnected(sender, None))) => {
208 #[allow(unused_must_use)]
209 if !self.inner.borrow_mut().out.detach(sender) {
210 self.plugboard.unplug(sender, LinkError::LinkDown);
211 }
212 continue;
213 }
214 Ok(Messaged(Shutdown(id))) => {
215 self.disconnect(None);
216 return Err(Crash::PowerOff(id));
217 }
218 Err(crash) => {
219 self.disconnect(Some(Fault::Error));
220 return Err(crash);
221 }
222 }
223 }
224 }
225
226 pub async fn manage<F, C, T>(self, f: F) -> Result<T, Crash<C>>
229 where F: Future<Output=Result<T,C>> + Unpin,
230 C: 'static + Debug + Send,
231 T: Debug {
232 match self.part_manage(f).await {
233 Ok((device, val)) => {
234 device.disconnect(None);
235 Ok(val)
236 }
237 Err(e) => Err(e),
238 }
239 }
240
241}
242
243impl Drop for Device {
244 fn drop(&mut self) {
245 let mut inner = self.inner.borrow_mut();
246 if !inner.done {
247 self.plugboard.close(); while let Ok(op) = self.plugboard.line_ops.pop() { inner.out.apply(op); } inner.send(Disconnected(self.device_id(), Some(Fault::Drop)));
250 }
251 }
252 }
253
// Explicit `Unpin` impl: `Stream::poll_next` below calls `Pin::get_mut`,
// which is only available for `Unpin` targets.
impl Unpin for Device {}
255
256impl Stream for Device {
257 type Item = Message;
258 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
259 let this = self.get_mut();
260 let mut inner = this.inner.borrow_mut();
261 if !inner.done {
262 match this.plugboard.messages.try_pop() {
263 Ok(val) => Poll::Ready(Some(val)),
264 Err(PopError::Empty) => {
265 this.plugboard.messages.register(ctx.waker());
266 match this.plugboard.messages.try_pop() {
268 Ok(val) => Poll::Ready(Some(val)), Err(PopError::Empty) => Poll::Pending,
270 Err(PopError::Closed) => {
271 inner.done = true;
272 Poll::Ready(None)
273 }
274 }
275 }
276 Err(PopError::Closed) => {
277 inner.done = true;
278 Poll::Ready(None)
279 }
280 }
281 } else {
282 Poll::Ready(None)
283 }
284 }
285}