extern crate proc_macro;
use proc_macro::TokenStream;
use quote::{format_ident, quote};
use syn::parse_macro_input;
use syn::DeriveInput;
#[proc_macro_derive(MachineImpl)]
pub fn derive_machine_impl_fn(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
let name = &input.ident;
let adapter_ident = format_ident!("MachineBuilder{}", name.to_string());
let sender_adapter_ident = format_ident!("SenderAdapter{}", name.to_string());
let expanded = quote! {
impl MachineImpl for #name {
type Adapter = #adapter_ident;
type SenderAdapter = #sender_adapter_ident;
type InstructionSet = #name;
fn park_sender(
channel_id: usize,
receiver_machine: std::sync::Weak<MachineAdapter>,
sender: crossbeam::channel::Sender<Self::InstructionSet>,
instruction: Self::InstructionSet) -> Result<(),Self::InstructionSet> {
tls_executor_data.with(|t|{
let mut tls = t.borrow_mut();
if tls.id == 0 { Err(instruction) }
else {
if let ExecutorDataField::Machine(machine) = &tls.machine {
let adapter = #sender_adapter_ident {
receiver_machine,
sender: sender,
instruction: Some(instruction),
};
let shared_adapter = MachineSenderAdapter::new(machine, Box::new(adapter));
tls.sender_blocked(channel_id, shared_adapter);
}
Ok(())
}
})
}
}
#[doc(hidden)]
pub struct #adapter_ident {
machine: std::sync::Arc<dyn Machine<#name>>,
receiver: Receiver<#name>,
}
impl std::fmt::Debug for #adapter_ident {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "#adapter_ident {{ .. }}")
}
}
impl MachineDependentAdapter for #adapter_ident {
fn receive_cmd(&self, machine: &ShareableMachine, once: bool, drop: bool, time_slice: std::time::Duration, stats: &mut ExecutorStats) {
stats.tasks_executed += 1;
if machine.is_disconnected() {
if let Err(state) = machine.compare_and_exchange_state(MachineState::Ready, MachineState::Running) {
log::error!("exec: disconnected: expected state = Ready, machine {} found {:#?}", machine.get_key(), state);
machine.set_state(MachineState::Running);
}
if once {
self.machine.connected(machine.get_id());
stats.instructs_sent += 1;
}
self.machine.disconnected();
stats.instructs_sent += 1;
if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
log::error!("exec: disconnected: expected state = Running, machine {} found {:#?}", machine.get_key(), state);
machine.set_state(MachineState::Dead);
}
return
}
if let Err(state) = machine.compare_and_exchange_state(MachineState::Ready, MachineState::Running) {
log::error!("exec: expected state = Ready, machine {} found {:#?}", machine.get_key(), state);
machine.set_state(MachineState::Running);
}
let start = std::time::Instant::now();
if once {
self.machine.connected(machine.get_id());
stats.instructs_sent += 1;
}
let mut count = 0;
loop {
if start.elapsed() > time_slice {
stats.exhausted_slice += 1;
break;
}
let state = machine.get_state();
if state != MachineState::Running {
log::debug!("exec: no longer running, machine {} state {:#?}", machine.get_key(), state);
break;
}
match self.receiver.receiver.try_recv() {
Ok(cmd) => {
self.machine.receive(cmd);
stats.instructs_sent += 1;
count += 1;
},
Err(crossbeam::channel::TryRecvError::Empty) => {
if drop || machine.is_disconnected() {
log::trace!("exec: machine {} disconnected, cleaning up", machine.get_key());
self.machine.disconnected();
stats.instructs_sent += 1;
if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
log::error!("exec: (drop) expected state = Running, machine {} found {:#?}", machine.get_key(), state);
machine.set_state(MachineState::Dead);
}
}
break;
},
Err(crossbeam::channel::TryRecvError::Disconnected) => {
log::trace!("exec: machine {} disconnected, cleaning up", machine.get_key());
self.machine.disconnected();
stats.instructs_sent += 1;
if let Err(state) = machine.compare_and_exchange_state(MachineState::Running, MachineState::Dead) {
log::error!("exec: (disconnected) expected state = Running, machine {} found {:#?}", machine.get_key(), state);
machine.set_state(MachineState::Dead);
}
break;
},
}
}
}
fn is_channel_empty(&self) -> bool {
self.receiver.receiver.is_empty()
}
fn channel_len(&self) -> usize {
self.receiver.receiver.len()
}
}
#[doc(hidden)]
pub struct #sender_adapter_ident {
receiver_machine: std::sync::Weak<MachineAdapter>,
sender: crossbeam::channel::Sender<#name>,
instruction: Option<#name>,
}
impl #sender_adapter_ident {
fn try_send(&mut self) -> Result<usize, TrySendError> {
let instruction = self.instruction.take().unwrap();
match self.sender.try_send(instruction) {
Ok(()) => {
if let Some(machine) = self.receiver_machine.upgrade() {
Ok(machine.get_key())
} else {
Err(TrySendError::Disconnected)
}
},
Err(crossbeam::channel::TrySendError::Disconnected(inst)) => {
self.instruction = Some(inst);
Err(TrySendError::Disconnected)
},
Err(crossbeam::channel::TrySendError::Full(inst)) => {
self.instruction = Some(inst);
Err(TrySendError::Full)
},
}
}
}
impl MachineDependentSenderAdapter for #sender_adapter_ident {
fn try_send(&mut self) -> Result<usize, TrySendError> {
match self.try_send() {
Ok(receiver_key) => {
Ok(receiver_key)
},
Err(e) => Err(e),
}
}
}
impl MachineBuilder for #adapter_ident {
type InstructionSet = #name;
fn build_raw<T>(raw: T, channel_capacity: usize) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter)
where T: 'static + Machine<Self::InstructionSet>
{
let (sender, receiver) = channel_with_capacity::<Self::InstructionSet>(channel_capacity);
Self::build_common(raw, sender, receiver)
}
fn build_addition<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>, channel_capacity: usize) -> (Sender<Self::InstructionSet>, MachineAdapter)
where T: 'static + Machine<Self::InstructionSet>
{
let (sender, receiver) = channel_with_capacity::<Self::InstructionSet>(channel_capacity);
Self::build_addition_common(machine, sender, receiver)
}
fn build_unbounded<T>(raw: T) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter)
where T: 'static + Machine<Self::InstructionSet>
{
let (sender, receiver) = channel::<Self::InstructionSet>();
Self::build_common(raw, sender, receiver)
}
fn build_addition_unbounded<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>) -> (Sender<Self::InstructionSet>, MachineAdapter)
where T: 'static + Machine<Self::InstructionSet>
{
let (sender, receiver) = channel::<Self::InstructionSet>();
Self::build_addition_common(machine, sender, receiver)
}
fn build_common<T>(raw: T, sender: Sender<Self::InstructionSet>, receiver: Receiver<Self::InstructionSet>) -> (std::sync::Arc<parking_lot::Mutex<T>>, Sender<Self::InstructionSet>, MachineAdapter )
where T: 'static + Machine<Self::InstructionSet>
{
let instance: std::sync::Arc<parking_lot::Mutex<T>> = std::sync::Arc::new(parking_lot::Mutex::new(raw));
let machine = std::sync::Arc::clone(&instance) as std::sync::Arc<dyn Machine<Self::InstructionSet>>;
let adapter = Self {
machine, receiver,
};
let machine_adapter = MachineAdapter::new(Box::new(adapter));
(instance, sender, machine_adapter)
}
fn build_addition_common<T>(machine: &std::sync::Arc<parking_lot::Mutex<T>>, sender: Sender<Self::InstructionSet>, receiver: Receiver<Self::InstructionSet>) -> (Sender<Self::InstructionSet>, MachineAdapter )
where T: 'static + Machine<Self::InstructionSet>
{
let machine = std::sync::Arc::clone(machine) as std::sync::Arc<dyn Machine<Self::InstructionSet>>;
let adapter = Self {
machine, receiver,
};
let machine_adapter = MachineAdapter::new(Box::new(adapter));
(sender, machine_adapter)
}
}
};
TokenStream::from(expanded)
}