d3_derive/
lib.rs

1extern crate proc_macro;
2use proc_macro::TokenStream;
3use quote::{format_ident, quote};
4use syn::parse_macro_input;
5use syn::DeriveInput;
6
7/// MachineImpl is a derive macro that tranforms an enum into an instruction set that can be implemented
8/// by machines.
9///
10/// # Example
11///
12/// ```text
13/// #[macro_use] extern crate d3_derive;
14/// use d3_core::MachineImpl::*;
15///
16/// // instructions can be unit-like
17/// #[derive(Debug, MachineImpl)]
18/// pub enum StateTable {
19///     Init,
20///     Start,
21///     Stop,
22/// }
23///
24/// // instructions can also be tupple, and struct
25/// #[derive(Debug, MachineImpl)]
26/// pub enum TrafficLight {
27///     Red(TrafficLightModality),
28///     Green(TrafficLightModality),
29///     Yellow(TrafficLightModality),
30/// }
31///
32/// #[derive(Debug)]
33/// pub enum TrafficLightModality {
34///     Solid,
35///     Blinking,
36/// }
37///
38/// Instructions can be mixed
39/// #[derive(Debug, MachineImpl)]
40/// pub enum Calc {
41///     Clear,
42///     Add(u32),
43///     Sub(u32),
44///     Div(u32),
45///     Mul(u32),
46/// }
47/// ```
48#[proc_macro_derive(MachineImpl)]
49pub fn derive_machine_impl_fn(input: TokenStream) -> TokenStream {
50    let input = parse_macro_input!(input as DeriveInput);
51    let name = &input.ident;
52    let adapter_ident = format_ident!("MachineBuilder{}", name.to_string());
53    let sender_adapter_ident = format_ident!("SenderAdapter{}", name.to_string());
54    // let recv_wait_ident = format_ident!("RecvWait{}", name.to_string());
55    let expanded = quote! {
56        impl MachineImpl for #name {
57            type Adapter = #adapter_ident;
58            type SenderAdapter = #sender_adapter_ident;
59            type InstructionSet = #name;
60            fn park_sender(
61                channel_id: usize,
62                receiver_machine: std::sync::Weak<MachineAdapter>,
63                sender: crossbeam::channel::Sender<Self::InstructionSet>,
64                instruction: Self::InstructionSet) -> Result<(),Self::InstructionSet> {
65                //Err(instruction)
66                tls_executor_data.with(|t|{
67                    let mut tls = t.borrow_mut();
68                    // if its the main thread, let it block.
69                    if tls.id == 0 { Err(instruction) }
70                    else {
71                        if let ExecutorDataField::Machine(machine) = &tls.machine {
72                           let adapter = #sender_adapter_ident {
73                                receiver_machine,
74                                sender: sender,
75                                instruction: Some(instruction),
76                            };
77                            let shared_adapter = MachineSenderAdapter::new(machine, Box::new(adapter));
78                            tls.sender_blocked(channel_id, shared_adapter);
79                        }
80                        Ok(())
81                    }
82                })
83            }
84        }
85
86        // This is the instruction set dependent machine, we've forgone generic <T>
87        // as it becomes unwieldy when it comes to scheduling and execution. For the
88        // most part it is immutable, with the only exception being the instruction,
89        // which unfortunately has to travel between threads. We don't want to recreate
90        // this for each instruction sent, so we're going to wrap the instruction
91        // with Arc<Mutex> to allow inner access, kinda sucks cuz the lifecycle is
92        // such that there is only one owner at a time. Let's see if Arc<> is good enough
93        #[doc(hidden)]
94        pub struct #adapter_ident {
95            machine: std::sync::Arc<dyn Machine<#name>>,
96            receiver: Receiver<#name>,
97        }
98        impl std::fmt::Debug for #adapter_ident {
99            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100                write!(f, "#adapter_ident {{ .. }}")
101            }
102        }
103        // This is the generic adapter implementation for the adapter, much of this is
104        // already generic, so maybe there's an alternative where the dyn stuff can
105        // be used less often.
106        //
107        impl MachineDependentAdapter for #adapter_ident {
108            fn receive_cmd(&self, machine: &ShareableMachine, once: bool, drop: bool, time_slice: std::time::Duration, stats: &mut ExecutorStats) {
109                stats.tasks_executed += 1;
110                if machine.is_disconnected() {
111                    if let Err(state) = machine.compare_and_exchange_state(MachineState::Ready, MachineState::Running) {
112                        log::error!("exec: disconnected: expected state = Ready, machine {} found {:#?}", machine.get_key(), state);
113                        machine.set_state(MachineState::Running);
114                    }
115                    if once {
116                        self.machine.connected(machine.get_id());
117                        stats.instructs_sent += 1;
118                    }
119                    self.machine.disconnected();
120                    stats.instructs_sent += 1;
121                    if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
122                        log::error!("exec: disconnected: expected state = Running, machine {} found {:#?}", machine.get_key(), state);
123                        machine.set_state(MachineState::Dead);
124                    }
125                    return
126                }
127                if let Err(state) = machine.compare_and_exchange_state(MachineState::Ready, MachineState::Running) {
128                    log::error!("exec: expected state = Ready, machine {} found {:#?}", machine.get_key(), state);
129                    machine.set_state(MachineState::Running);
130                }
131                // while we're running, might as well try to drain the queue, but keep it bounded
132                let start = std::time::Instant::now();
133                if once {
134                    self.machine.connected(machine.get_id());
135                    stats.instructs_sent += 1;
136                }
137                //let mut cmd = self.take_instruction().unwrap();
138                let mut count = 0;
139                //log::trace!("enter chan {}, machine {} q_len {}", self.receiver.get_id(), machine.get_key(), self.receiver.receiver.len());
140                loop {
141                    if start.elapsed() > time_slice {
142                        stats.exhausted_slice += 1;
143                        break;
144                    }
145                    let state = machine.get_state();
146                    if state != MachineState::Running {
147                        log::debug!("exec: no longer running, machine {} state {:#?}", machine.get_key(), state);
148                        break;
149                    }
150                    match self.receiver.receiver.try_recv() {
151                        Ok(cmd) => {
152                            self.machine.receive(cmd);
153                            stats.instructs_sent += 1;
154                            count += 1;
155                        },
156                        Err(crossbeam::channel::TryRecvError::Empty) => {
157                            if drop || machine.is_disconnected() {
158                                // treat as disconnected
159                                log::trace!("exec: machine {} disconnected, cleaning up", machine.get_key());
160                                self.machine.disconnected();
161                                stats.instructs_sent += 1;
162                                if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
163                                    log::error!("exec: (drop) expected state = Running, machine {} found {:#?}", machine.get_key(), state);
164                                    machine.set_state(MachineState::Dead);
165                                }
166                            }
167                            break;
168                        },
169                        Err(crossbeam::channel::TryRecvError::Disconnected) => {
170                            log::trace!("exec: machine {} disconnected, cleaning up", machine.get_key());
171                            self.machine.disconnected();
172                            stats.instructs_sent += 1;
173                            if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
174                                log::error!("exec: (disconnected) expected state = Running, machine {} found {:#?}", machine.get_key(), state);
175                                machine.set_state(MachineState::Dead);
176                            }
177                            break;
178                        },
179                    }
180                }
181                //log::trace!("exit chan {}, machine {} q_len {}, count {}", self.receiver.get_id(), machine.get_key(), self.receiver.receiver.len(), count);
182            }
183            // determine if channel is empty
184            fn is_channel_empty(&self) -> bool {
185                self.receiver.receiver.is_empty()
186            }
187            // get number of instructions in queue
188            fn channel_len(&self) -> usize {
189                self.receiver.receiver.len()
190            }
191        }
192        #[doc(hidden)]
193        pub struct #sender_adapter_ident {
194            receiver_machine: std::sync::Weak<MachineAdapter>,
195            sender: crossbeam::channel::Sender<#name>,
196            instruction: Option<#name>,
197        }
198        impl #sender_adapter_ident {
199            fn try_send(&mut self) -> Result<usize, TrySendError> {
200                let instruction = self.instruction.take().unwrap();
201                match self.sender.try_send(instruction) {
202                    Ok(()) => {
203                        if let Some(machine) = self.receiver_machine.upgrade() {
204                            Ok(machine.get_key())
205                        } else {
206                            Err(TrySendError::Disconnected)
207                        }
208                    },
209                    Err(crossbeam::channel::TrySendError::Disconnected(inst)) => {
210                        self.instruction = Some(inst);
211                        Err(TrySendError::Disconnected)
212                    },
213                    Err(crossbeam::channel::TrySendError::Full(inst)) => {
214                        self.instruction = Some(inst);
215                        Err(TrySendError::Full)
216                    },
217                }
218            }
219        }
220
221        impl MachineDependentSenderAdapter for #sender_adapter_ident {
222            fn try_send(&mut self) -> Result<usize, TrySendError> {
223                match self.try_send() {
224                    Ok(receiver_key) => {
225                        Ok(receiver_key)
226                    },
227                    Err(e) => Err(e),
228                }
229            }
230        }
231
232        impl MachineBuilder for #adapter_ident {
233            type InstructionSet = #name;
234            /// Consume a raw machine, using it to create a machine that is usable by
235            /// the framework.
236            fn build_raw<T>(raw: T, channel_capacity: usize) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter)
237            where T: 'static + Machine<Self::InstructionSet>
238            {
239                // need to review allocation strategy for bounded
240                let (sender, receiver) = channel_with_capacity::<Self::InstructionSet>(channel_capacity);
241                Self::build_common(raw, sender, receiver)
242            }
243
244            fn build_addition<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>, channel_capacity: usize) -> (Sender<Self::InstructionSet>, MachineAdapter)
245            where T: 'static + Machine<Self::InstructionSet>
246            {
247                // need to review allocation strategy for bounded
248                let (sender, receiver) = channel_with_capacity::<Self::InstructionSet>(channel_capacity);
249                Self::build_addition_common(machine, sender, receiver)
250            }
251
252            fn build_unbounded<T>(raw: T) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter)
253            where T: 'static + Machine<Self::InstructionSet>
254            {
255                // need to review allocation strategy for bounded
256                let (sender, receiver) = channel::<Self::InstructionSet>();
257                Self::build_common(raw, sender, receiver)
258            }
259
260            fn build_addition_unbounded<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>) -> (Sender<Self::InstructionSet>, MachineAdapter)
261            where T: 'static + Machine<Self::InstructionSet>
262            {
263                // need to review allocation strategy for bounded
264                let (sender, receiver) = channel::<Self::InstructionSet>();
265                Self::build_addition_common(machine, sender, receiver)
266            }
267
268            fn build_common<T>(raw: T, sender: Sender<Self::InstructionSet>, receiver: Receiver<Self::InstructionSet>) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter )
269                where T: 'static + Machine<Self::InstructionSet>
270            {
271                 // wrap it
272                 let instance: std::sync::Arc<parking_lot::Mutex<T>> = std::sync::Arc::new(parking_lot::Mutex::new(raw));
273                 // clone it, making it look like a machine, Machine for Mutex<T> facilitates this
274                 let machine = std::sync::Arc::clone(&instance) as std::sync::Arc<dyn Machine<Self::InstructionSet>>;
275                 // wrap the machine dependent bits
276                 let adapter = Self {
277                     machine, receiver,
278                 };
279                 // wrap the independent and normalize the dependent with a trait object
280                 let machine_adapter = MachineAdapter::new(Box::new(adapter));
281                 (instance, sender, machine_adapter)
282            }
283
284            fn build_addition_common<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>, sender: Sender<Self::InstructionSet>, receiver: Receiver<Self::InstructionSet>) -> (Sender<Self::InstructionSet>, MachineAdapter )
285                where T: 'static + Machine<Self::InstructionSet>
286            {
287                 // clone it, making it look like a machine, Machine for Mutex<T> facilitates this
288                 let machine = std::sync::Arc::clone(machine) as std::sync::Arc<dyn Machine<Self::InstructionSet>>;
289                 // wrap the machine dependent bits
290                 let adapter = Self {
291                     machine, receiver,
292                 };
293                 // wrap the independent and normalize the dependent with a trait object
294                 let machine_adapter = MachineAdapter::new(Box::new(adapter));
295                 (sender, machine_adapter)
296            }
297        }
298    };
299    TokenStream::from(expanded)
300}