d3_core/foundation/machine.rs
1use super::*;
2use crossbeam::utils::Backoff;
3
4// Machines cooperate in a collective. There are several primary traits
5// that are presented here. Combined, they form the contract between
6// the machines, allowing them to interact with each other.
7//
8// The MachineImpl designates an instruction set, which machines implement.
9// It carries with it, two adapters and an instruction set. The ReceiverAdapter
10// is the primary means of expressing the machine. The SenderAdapter is
11// ancilary, used for parking a sender, which would otherwise block the
12// executor.
13//
14// To aid in construction, there's a derive macro, MachineImpl, which should
15// be used to designate the instruction sets which machines implement.
16// A machine may implement one or more of these.
17//
18// Examples
19//
20// #[derive(Copy, Clone, Debug, Eq, MachineImpl, PartialEq, SmartDefault)]
21// #[allow(dead_code)]
22// pub enum ActivationCommands {
23// #[default]
24// Start,
25// Stop,
26// }
27//
28
29// Each enum, that is an instuction set, derives MachineImpl.
30#[doc(hidden)]
31pub trait MachineImpl: 'static + Send + Sync {
32 type Adapter;
33 type SenderAdapter;
34 type InstructionSet: Send + Sync;
35
36 // Park a sender. If the sender can't be parked, the instruction
37 // is returned.
38 fn park_sender(
39 channel_id: usize, receiver_machine: Weak<self::tls::collective::MachineAdapter>,
40 sender: crossbeam::channel::Sender<Self::InstructionSet>, instruction: Self::InstructionSet,
41 ) -> Result<(), Self::InstructionSet>;
42}
43
44/// The machine is the common trait all machines must implement
45/// and describes how instuctions are delivered to a machine, via
46/// the receive method.
47pub trait Machine<T>: Send + Sync
48where
49 T: 'static + Send + Sync,
50{
51 /// The receive method receives instructions sent to it by itself or other machines.
52 fn receive(&self, cmd: T);
53 /// The disconnected method is called to notify the machine that its receiver has become
54 /// disconnect; it will no longer receive instructions.
55 /// This could be a result of server shutdown, or all senders dropping their senders.
56 fn disconnected(&self) {}
57 /// The connected method is called once, before receive messages. It provides a notification that the
58 /// machine has become connected and may receive instructions. It includes a Uuid for the machine,
59 /// which may be used in logging. A machine implementing several instruction sets will receive a differnt
60 /// Uuid for each instruction set implemented.
61 fn connected(&self, _uuid: Uuid) {}
62}
63
64// Adding the machine implementation to Mutex
65// makes it possible to hide the underlying wrapping.
66impl<T, P> Machine<P> for Mutex<T>
67where
68 T: Machine<P>,
69 P: MachineImpl,
70{
71 // fn receive(&self, cmd: P) { self.lock().unwrap().receive(cmd); }
72 // fn disconnected(&self) { self.lock().unwrap().disconnected(); }
73 // fn connected(&self, uuid: Uuid) { self.lock().unwrap().connected(uuid); }
74
75 // In order to prevent lockup of an executor, a try_lock is attempted, while
76 // it should always obtain the lock, there may be data-races where it can't.
77 // In those rare cases, a warning is logged and backoff is used. Eventually,
78 // it will give up and re-schedule. This same approach is used for the disconnected()
79 // and connected() methods.
80 fn receive(&self, cmd: P) {
81 if let Some(ref mut mutex) = self.try_lock() {
82 (*mutex).receive(cmd);
83 } else {
84 log::warn!("try_lock failed for receive, retrying");
85 let backoff = Backoff::new();
86 loop {
87 if let Some(ref mut mutex) = self.try_lock() {
88 (*mutex).receive(cmd);
89 return;
90 } else if backoff.is_completed() {
91 log::error!("try_lock failed for receive, giving up after multiple retries");
92 return;
93 } else {
94 backoff.snooze();
95 }
96 }
97 }
98 }
99
100 fn disconnected(&self) {
101 if let Some(ref mut mutex) = self.try_lock() {
102 (*mutex).disconnected();
103 } else {
104 log::warn!("try_lock failed for disconnected, retrying");
105 let backoff = Backoff::new();
106 loop {
107 if let Some(ref mut mutex) = self.try_lock() {
108 (*mutex).disconnected();
109 return;
110 } else if backoff.is_completed() {
111 log::error!("try_lock failed for disconnected, giving up after multiple retries");
112 return;
113 } else {
114 backoff.snooze();
115 }
116 }
117 }
118 }
119
120 fn connected(&self, uuid: Uuid) {
121 if let Some(ref mut mutex) = self.try_lock() {
122 (*mutex).connected(uuid);
123 } else {
124 log::warn!("try_lock failed for connected, retrying");
125 let backoff = Backoff::new();
126 loop {
127 if let Some(ref mut mutex) = self.try_lock() {
128 (*mutex).connected(uuid);
129 return;
130 } else if backoff.is_completed() {
131 log::error!("try_lock failed for connected, giving up after multiple retries");
132 return;
133 } else {
134 backoff.snooze();
135 }
136 }
137 }
138 }
139}