d3_core/foundation/
machine.rs

1use super::*;
2use crossbeam::utils::Backoff;
3
4// Machines cooperate in a collective. There are several primary traits
5// that are presented here. Combined, they form the contract between
6// the machines, allowing them to interact with each other.
7//
// The MachineImpl designates an instruction set, which machines implement.
// It carries with it two adapters and an instruction set. The ReceiverAdapter
// is the primary means of expressing the machine. The SenderAdapter is
// ancillary, used for parking a sender, which would otherwise block the
// executor.
13//
14// To aid in construction, there's a derive macro, MachineImpl, which should
15// be used to designate the instruction sets which machines implement.
16// A machine may implement one or more of these.
17//
18// Examples
19//
20//     #[derive(Copy, Clone, Debug, Eq, MachineImpl, PartialEq, SmartDefault)]
21//     #[allow(dead_code)]
22//     pub enum ActivationCommands {
23//         #[default]
24//         Start,
25//         Stop,
26//     }
27//
28
29// Each enum, that is an instuction set, derives MachineImpl.
30#[doc(hidden)]
31pub trait MachineImpl: 'static + Send + Sync {
32    type Adapter;
33    type SenderAdapter;
34    type InstructionSet: Send + Sync;
35
36    // Park a sender. If the sender can't be parked, the instruction
37    // is returned.
38    fn park_sender(
39        channel_id: usize, receiver_machine: Weak<self::tls::collective::MachineAdapter>,
40        sender: crossbeam::channel::Sender<Self::InstructionSet>, instruction: Self::InstructionSet,
41    ) -> Result<(), Self::InstructionSet>;
42}
43
44/// The machine is the common trait all machines must implement
45/// and describes how instuctions are delivered to a machine, via
46/// the receive method.
47pub trait Machine<T>: Send + Sync
48where
49    T: 'static + Send + Sync,
50{
51    /// The receive method receives instructions sent to it by itself or other machines.
52    fn receive(&self, cmd: T);
53    /// The disconnected method is called to notify the machine that its receiver has become
54    /// disconnect; it will no longer receive instructions.
55    /// This could be a result of server shutdown, or all senders dropping their senders.
56    fn disconnected(&self) {}
57    /// The connected method is called once, before receive messages. It provides a notification that the
58    /// machine has become connected and may receive instructions. It includes a Uuid for the machine,
59    /// which may be used in logging. A machine implementing several instruction sets will receive a differnt
60    /// Uuid for each instruction set implemented.
61    fn connected(&self, _uuid: Uuid) {}
62}
63
64// Adding the machine implementation to Mutex
65// makes it possible to hide the underlying wrapping.
66impl<T, P> Machine<P> for Mutex<T>
67where
68    T: Machine<P>,
69    P: MachineImpl,
70{
71    // fn receive(&self, cmd: P) { self.lock().unwrap().receive(cmd); }
72    // fn disconnected(&self) { self.lock().unwrap().disconnected(); }
73    // fn connected(&self, uuid: Uuid) { self.lock().unwrap().connected(uuid); }
74
75    // In order to prevent lockup of an executor, a try_lock is attempted, while
76    // it should always obtain the lock, there may be data-races where it can't.
77    // In those rare cases, a warning is logged and backoff is used. Eventually,
78    // it will give up and re-schedule. This same approach is used for the disconnected()
79    // and connected() methods.
80    fn receive(&self, cmd: P) {
81        if let Some(ref mut mutex) = self.try_lock() {
82            (*mutex).receive(cmd);
83        } else {
84            log::warn!("try_lock failed for receive, retrying");
85            let backoff = Backoff::new();
86            loop {
87                if let Some(ref mut mutex) = self.try_lock() {
88                    (*mutex).receive(cmd);
89                    return;
90                } else if backoff.is_completed() {
91                    log::error!("try_lock failed for receive, giving up after multiple retries");
92                    return;
93                } else {
94                    backoff.snooze();
95                }
96            }
97        }
98    }
99
100    fn disconnected(&self) {
101        if let Some(ref mut mutex) = self.try_lock() {
102            (*mutex).disconnected();
103        } else {
104            log::warn!("try_lock failed for disconnected, retrying");
105            let backoff = Backoff::new();
106            loop {
107                if let Some(ref mut mutex) = self.try_lock() {
108                    (*mutex).disconnected();
109                    return;
110                } else if backoff.is_completed() {
111                    log::error!("try_lock failed for disconnected, giving up after multiple retries");
112                    return;
113                } else {
114                    backoff.snooze();
115                }
116            }
117        }
118    }
119
120    fn connected(&self, uuid: Uuid) {
121        if let Some(ref mut mutex) = self.try_lock() {
122            (*mutex).connected(uuid);
123        } else {
124            log::warn!("try_lock failed for connected, retrying");
125            let backoff = Backoff::new();
126            loop {
127                if let Some(ref mut mutex) = self.try_lock() {
128                    (*mutex).connected(uuid);
129                    return;
130                } else if backoff.is_completed() {
131                    log::error!("try_lock failed for connected, giving up after multiple retries");
132                    return;
133                } else {
134                    backoff.snooze();
135                }
136            }
137        }
138    }
139}