1extern crate proc_macro;
2use proc_macro::TokenStream;
3use quote::{format_ident, quote};
4use syn::parse_macro_input;
5use syn::DeriveInput;
6
7#[proc_macro_derive(MachineImpl)]
49pub fn derive_machine_impl_fn(input: TokenStream) -> TokenStream {
50 let input = parse_macro_input!(input as DeriveInput);
51 let name = &input.ident;
52 let adapter_ident = format_ident!("MachineBuilder{}", name.to_string());
53 let sender_adapter_ident = format_ident!("SenderAdapter{}", name.to_string());
54 let expanded = quote! {
56 impl MachineImpl for #name {
57 type Adapter = #adapter_ident;
58 type SenderAdapter = #sender_adapter_ident;
59 type InstructionSet = #name;
60 fn park_sender(
61 channel_id: usize,
62 receiver_machine: std::sync::Weak<MachineAdapter>,
63 sender: crossbeam::channel::Sender<Self::InstructionSet>,
64 instruction: Self::InstructionSet) -> Result<(),Self::InstructionSet> {
65 tls_executor_data.with(|t|{
67 let mut tls = t.borrow_mut();
68 if tls.id == 0 { Err(instruction) }
70 else {
71 if let ExecutorDataField::Machine(machine) = &tls.machine {
72 let adapter = #sender_adapter_ident {
73 receiver_machine,
74 sender: sender,
75 instruction: Some(instruction),
76 };
77 let shared_adapter = MachineSenderAdapter::new(machine, Box::new(adapter));
78 tls.sender_blocked(channel_id, shared_adapter);
79 }
80 Ok(())
81 }
82 })
83 }
84 }
85
86 #[doc(hidden)]
94 pub struct #adapter_ident {
95 machine: std::sync::Arc<dyn Machine<#name>>,
96 receiver: Receiver<#name>,
97 }
98 impl std::fmt::Debug for #adapter_ident {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "#adapter_ident {{ .. }}")
101 }
102 }
103 impl MachineDependentAdapter for #adapter_ident {
108 fn receive_cmd(&self, machine: &ShareableMachine, once: bool, drop: bool, time_slice: std::time::Duration, stats: &mut ExecutorStats) {
109 stats.tasks_executed += 1;
110 if machine.is_disconnected() {
111 if let Err(state) = machine.compare_and_exchange_state(MachineState::Ready, MachineState::Running) {
112 log::error!("exec: disconnected: expected state = Ready, machine {} found {:#?}", machine.get_key(), state);
113 machine.set_state(MachineState::Running);
114 }
115 if once {
116 self.machine.connected(machine.get_id());
117 stats.instructs_sent += 1;
118 }
119 self.machine.disconnected();
120 stats.instructs_sent += 1;
121 if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
122 log::error!("exec: disconnected: expected state = Running, machine {} found {:#?}", machine.get_key(), state);
123 machine.set_state(MachineState::Dead);
124 }
125 return
126 }
127 if let Err(state) = machine.compare_and_exchange_state(MachineState::Ready, MachineState::Running) {
128 log::error!("exec: expected state = Ready, machine {} found {:#?}", machine.get_key(), state);
129 machine.set_state(MachineState::Running);
130 }
131 let start = std::time::Instant::now();
133 if once {
134 self.machine.connected(machine.get_id());
135 stats.instructs_sent += 1;
136 }
137 let mut count = 0;
139 loop {
141 if start.elapsed() > time_slice {
142 stats.exhausted_slice += 1;
143 break;
144 }
145 let state = machine.get_state();
146 if state != MachineState::Running {
147 log::debug!("exec: no longer running, machine {} state {:#?}", machine.get_key(), state);
148 break;
149 }
150 match self.receiver.receiver.try_recv() {
151 Ok(cmd) => {
152 self.machine.receive(cmd);
153 stats.instructs_sent += 1;
154 count += 1;
155 },
156 Err(crossbeam::channel::TryRecvError::Empty) => {
157 if drop || machine.is_disconnected() {
158 log::trace!("exec: machine {} disconnected, cleaning up", machine.get_key());
160 self.machine.disconnected();
161 stats.instructs_sent += 1;
162 if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
163 log::error!("exec: (drop) expected state = Running, machine {} found {:#?}", machine.get_key(), state);
164 machine.set_state(MachineState::Dead);
165 }
166 }
167 break;
168 },
169 Err(crossbeam::channel::TryRecvError::Disconnected) => {
170 log::trace!("exec: machine {} disconnected, cleaning up", machine.get_key());
171 self.machine.disconnected();
172 stats.instructs_sent += 1;
173 if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
174 log::error!("exec: (disconnected) expected state = Running, machine {} found {:#?}", machine.get_key(), state);
175 machine.set_state(MachineState::Dead);
176 }
177 break;
178 },
179 }
180 }
181 }
183 fn is_channel_empty(&self) -> bool {
185 self.receiver.receiver.is_empty()
186 }
187 fn channel_len(&self) -> usize {
189 self.receiver.receiver.len()
190 }
191 }
192 #[doc(hidden)]
193 pub struct #sender_adapter_ident {
194 receiver_machine: std::sync::Weak<MachineAdapter>,
195 sender: crossbeam::channel::Sender<#name>,
196 instruction: Option<#name>,
197 }
198 impl #sender_adapter_ident {
199 fn try_send(&mut self) -> Result<usize, TrySendError> {
200 let instruction = self.instruction.take().unwrap();
201 match self.sender.try_send(instruction) {
202 Ok(()) => {
203 if let Some(machine) = self.receiver_machine.upgrade() {
204 Ok(machine.get_key())
205 } else {
206 Err(TrySendError::Disconnected)
207 }
208 },
209 Err(crossbeam::channel::TrySendError::Disconnected(inst)) => {
210 self.instruction = Some(inst);
211 Err(TrySendError::Disconnected)
212 },
213 Err(crossbeam::channel::TrySendError::Full(inst)) => {
214 self.instruction = Some(inst);
215 Err(TrySendError::Full)
216 },
217 }
218 }
219 }
220
221 impl MachineDependentSenderAdapter for #sender_adapter_ident {
222 fn try_send(&mut self) -> Result<usize, TrySendError> {
223 match self.try_send() {
224 Ok(receiver_key) => {
225 Ok(receiver_key)
226 },
227 Err(e) => Err(e),
228 }
229 }
230 }
231
232 impl MachineBuilder for #adapter_ident {
233 type InstructionSet = #name;
234 fn build_raw<T>(raw: T, channel_capacity: usize) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter)
237 where T: 'static + Machine<Self::InstructionSet>
238 {
239 let (sender, receiver) = channel_with_capacity::<Self::InstructionSet>(channel_capacity);
241 Self::build_common(raw, sender, receiver)
242 }
243
244 fn build_addition<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>, channel_capacity: usize) -> (Sender<Self::InstructionSet>, MachineAdapter)
245 where T: 'static + Machine<Self::InstructionSet>
246 {
247 let (sender, receiver) = channel_with_capacity::<Self::InstructionSet>(channel_capacity);
249 Self::build_addition_common(machine, sender, receiver)
250 }
251
252 fn build_unbounded<T>(raw: T) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter)
253 where T: 'static + Machine<Self::InstructionSet>
254 {
255 let (sender, receiver) = channel::<Self::InstructionSet>();
257 Self::build_common(raw, sender, receiver)
258 }
259
260 fn build_addition_unbounded<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>) -> (Sender<Self::InstructionSet>, MachineAdapter)
261 where T: 'static + Machine<Self::InstructionSet>
262 {
263 let (sender, receiver) = channel::<Self::InstructionSet>();
265 Self::build_addition_common(machine, sender, receiver)
266 }
267
268 fn build_common<T>(raw: T, sender: Sender<Self::InstructionSet>, receiver: Receiver<Self::InstructionSet>) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter )
269 where T: 'static + Machine<Self::InstructionSet>
270 {
271 let instance: std::sync::Arc<parking_lot::Mutex<T>> = std::sync::Arc::new(parking_lot::Mutex::new(raw));
273 let machine = std::sync::Arc::clone(&instance) as std::sync::Arc<dyn Machine<Self::InstructionSet>>;
275 let adapter = Self {
277 machine, receiver,
278 };
279 let machine_adapter = MachineAdapter::new(Box::new(adapter));
281 (instance, sender, machine_adapter)
282 }
283
284 fn build_addition_common<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>, sender: Sender<Self::InstructionSet>, receiver: Receiver<Self::InstructionSet>) -> (Sender<Self::InstructionSet>, MachineAdapter )
285 where T: 'static + Machine<Self::InstructionSet>
286 {
287 let machine = std::sync::Arc::clone(machine) as std::sync::Arc<dyn Machine<Self::InstructionSet>>;
289 let adapter = Self {
291 machine, receiver,
292 };
293 let machine_adapter = MachineAdapter::new(Box::new(adapter));
295 (sender, machine_adapter)
296 }
297 }
298 };
299 TokenStream::from(expanded)
300}