use mech_core::*;
use mech_syntax::compiler::Compiler;
use mech_utilities::*;
use std::thread::{self, JoinHandle};
use std::sync::Arc;
use std::cell::RefCell;
use hashbrown::{HashSet, HashMap};
use hashbrown::hash_map::Entry;
use crossbeam_channel::Sender;
use crossbeam_channel::Receiver;
use colored::*;
use super::program::Program;
use super::persister::Persister;
use std::net::{SocketAddr, UdpSocket};
extern crate websocket;
use websocket::OwnedMessage;
use std::io;
use std::time::Instant;
use std::sync::Mutex;
extern crate miniz_oxide;
extern crate bincode;
use std::io::{Write, BufReader, BufWriter, stdout};
use std::fs::{OpenOptions, File, canonicalize, create_dir};
use miniz_oxide::inflate::decompress_to_vec;
use miniz_oxide::deflate::compress_to_vec;
/// Messages sent from the run loop thread back to the client that owns the
/// `RunLoop` (they arrive on `RunLoop::incoming`).
#[derive(Debug, Clone)]
pub enum ClientMessage {
/// The run loop received `RunLoopMessage::Stop` and is shutting down.
Stop,
/// The run loop has entered the paused state.
Pause,
/// The run loop has left the paused state.
Resume,
/// A core was replaced with a fresh one in response to `RunLoopMessage::Reset`.
Reset,
/// Forwarded exit request carrying the exit code from `RunLoopMessage::Exit`.
Exit(i32),
// NOTE(review): not sent by the code visible in this file — meaning to be confirmed.
Time(usize),
// NOTE(review): not sent by the code visible in this file — meaning to be confirmed.
NewBlocks(usize),
/// A single table value, sent in reply to `RunLoopMessage::GetValue`.
Value(Value),
// NOTE(review): not sent by the code visible in this file — meaning to be confirmed.
Transaction(Transaction),
/// Human-readable text output (status lines, debug dumps of tables and cores).
String(String),
/// An error raised while compiling, loading or running a program.
Error(MechError),
/// Processing rate of the last transaction, computed as 1 / elapsed seconds.
Timing(f64),
/// Sent after a transaction or a code load has been handled.
StepDone,
/// Sent at the end of each run loop iteration, and after a core dump or core creation.
Done,
/// Sent once after the initial dependency download, just before the run loop starts.
Ready,
}
/// Client-side handle to a program running on its own thread, as returned by
/// `ProgramRunner::run`.
pub struct RunLoop {
/// Name of the runner; also used as the name of the run loop thread.
pub name: String,
/// Local address of the runner's UDP socket as a string, or `None` when the
/// runner has no socket.
pub socket_address: Option<String>,
// Join handle of the run loop thread; consumed by `wait`.
thread: JoinHandle<()>,
/// Channel for sending `RunLoopMessage`s to the running program.
pub outgoing: Sender<RunLoopMessage>,
/// Channel on which the running program reports `ClientMessage`s.
pub incoming: Receiver<ClientMessage>,
}
impl RunLoop {

  /// Blocks until the run loop thread has finished.
  ///
  /// # Panics
  ///
  /// Panics if the run loop thread itself panicked.
  pub fn wait(self) {
    self.thread.join().unwrap();
  }

  /// Asks the run loop to stop.
  ///
  /// This is best effort: a send failure means the run loop has already gone
  /// away, so there is nothing left to stop and the error is ignored.
  pub fn close(&self) {
    let _ = self.outgoing.send(RunLoopMessage::Stop);
  }

  /// Sends `msg` to the run loop.
  ///
  /// # Errors
  ///
  /// Returns an error string when the channel to the run loop is disconnected.
  pub fn send(&self, msg: RunLoopMessage) -> Result<(),&str> {
    self.outgoing.send(msg).map_err(|_| "Failed to send message")
  }

  /// Blocks until the run loop produces a message and returns it.
  ///
  /// # Errors
  ///
  /// Returns an error string when the channel from the run loop is disconnected.
  pub fn receive(&self) -> Result<ClientMessage,&str> {
    // This used to report "Failed to send message", which was misleading for
    // a failed receive.
    self.incoming.recv().map_err(|_| "Failed to receive message")
  }

  /// Returns `true` when no messages from the run loop are waiting to be received.
  pub fn is_empty(&self) -> bool {
    self.incoming.is_empty()
  }

  /// Returns a new sender connected to the run loop, e.g. to hand to another thread.
  pub fn channel(&self) -> Sender<RunLoopMessage> {
    self.outgoing.clone()
  }

}
/// Configuration for a program that `run` will start on its own thread.
pub struct ProgramRunner {
/// Name of the runner; `run` uses it as the name of the run loop thread.
pub name: String,
/// UDP socket used to talk to remote cores; `None` when `new` could not bind one.
pub socket: Option<Arc<UdpSocket>>,
/// URL of the machine registry; handed to `Program::new` by `run`.
pub registry: String,
/// Optional capability set; handed to `Program::new` by `run`.
pub capabilities: Option<HashSet<Capability>>,
}
impl ProgramRunner {
pub fn new(name:&str, capabilities: Option<HashSet<Capability>>) -> ProgramRunner {
let socket = match UdpSocket::bind("127.0.0.1:0") {
Ok(socket) => Some(Arc::new(socket)),
_ => None,
};
ProgramRunner {
name: name.to_owned(),
socket,
registry: "https://gitlab.com/mech-lang/machines/mech/-/raw/v0.1-beta/src/registry.mec".to_string(),
capabilities,
}
}
pub fn add_persist_channel(&mut self, persister:&mut Persister) {
}
pub fn run(self) -> Result<RunLoop,MechError> {
let (outgoing, program_incoming) = crossbeam_channel::unbounded();
let runloop_outgoing = outgoing.clone();
let (client_outgoing, incoming) = crossbeam_channel::unbounded();
let name = format!("{}", &self.name.clone());
let socket_address = match self.socket {
Some(ref socket) => Some(socket.local_addr().unwrap().to_string()),
None => None,
};
let thread = thread::Builder::new().name(name.clone()).spawn(move || {
let mut program = Program::new("new program", 100, 1000, outgoing.clone(), program_incoming, self.registry, self.capabilities);
let program_channel_udpsocket = program.outgoing.clone();
let program_channel_udpsocket = program.outgoing.clone();
match &self.socket {
Some(ref socket) => {
let socket_receiver = socket.clone();
let thread = thread::Builder::new().name("remote core listener".to_string()).spawn(move || {
let mut compressed_message = [0; 16_383];
loop {
match socket_receiver.recv_from(&mut compressed_message) {
Ok((amt, src)) => {
let serialized_message = decompress_to_vec(&compressed_message).expect("Failed to decompress!");
let message: Result<SocketMessage, bincode::Error> = bincode::deserialize(&serialized_message);
match message {
Ok(SocketMessage::RemoteCoreConnect(remote_core_address)) => {
program_channel_udpsocket.send(RunLoopMessage::RemoteCoreConnect(MechSocket::UdpSocket(remote_core_address)));
}
Ok(SocketMessage::RemoteCoreDisconnect(remote_core_address)) => {
program_channel_udpsocket.send(RunLoopMessage::RemoteCoreDisconnect(remote_core_address));
}
Ok(SocketMessage::Listening(register)) => {
program_channel_udpsocket.send(RunLoopMessage::Listening((hash_str(&src.to_string()), register)));
}
Ok(SocketMessage::Ping) => {
println!("Got a ping from: {:?}", src);
let message = bincode::serialize(&SocketMessage::Pong).unwrap();
let compressed_message = compress_to_vec(&message,6);
socket_receiver.send_to(&compressed_message, src);
}
Ok(SocketMessage::Pong) => {
println!("Got a pong from: {:?}", src);
}
Ok(SocketMessage::Transaction(txn)) => {
program_channel_udpsocket.send(RunLoopMessage::String((format!("Received Txn: {:?}", txn),None)));
program_channel_udpsocket.send(RunLoopMessage::Transaction(txn));
}
Ok(x) => println!("Unhandled Message {:?}", x),
Err(error) => println!("{:?}", error),
}
}
Err(error) => {
}
}
}
}).unwrap();
}
None => (),
}
match program.download_dependencies(Some(client_outgoing.clone())) {
Ok(resolved_errors) => {
let (_,_,nbo) = program.mech.resolve_errors(&resolved_errors);
program.mech.schedule_blocks();
for output in nbo {
program.mech.step(&output);
}
}
Err(err) => {
client_outgoing.send(ClientMessage::Error(err.clone()));
}
}
client_outgoing.send(ClientMessage::Ready);
let mut paused = false;
let mut iteration: u64 = 0;
'runloop: loop {
match (program.incoming.recv(), paused) {
(Ok(RunLoopMessage::Transaction(txn)), false) => {
let now = Instant::now();
match program.mech.process_transaction(&txn) {
Ok((new_block_ids,changed_registers)) => {
for trigger_register in &changed_registers {
let mut machine_triggers = vec![];
match &program.mech.schedule.trigger_to_output.get(trigger_register) {
Some(ref output) => {
for register in output.iter() {
machine_triggers.push(register.clone());
}
}
None => ()
}
for register in machine_triggers {
program.trigger_machine(®ister);
}
match program.trigger_to_listener.entry(trigger_register.clone()) {
Entry::Occupied(mut o) => {
match program.mech.schedule.trigger_to_output.get(trigger_register) {
Some(output) => {
for (register,remote_cores) in &program.listeners {
if output.contains(register) {
o.insert((register.clone(),remote_cores.clone()));
break;
}
}
}
None => ()
}
let ((output_table_id,row_ix,col_ix),listeners) = o.get();
let trigger = o.key();
match program.mech.get_table_by_id(*output_table_id.unwrap()) {
Ok(table) =>{
let table_brrw = table.borrow();
let changes = table_brrw.data_to_changes();
let message = bincode::serialize(&SocketMessage::Transaction(changes)).unwrap();
let compressed_message = compress_to_vec(&message,6);
for remote_core_id in listeners {
match (&self.socket,program.remote_cores.get_mut(remote_core_id)) {
(Some(ref socket),Some(MechSocket::UdpSocket(remote_core_address))) => {
let len = socket.send_to(&compressed_message, remote_core_address.clone()).unwrap();
}
(Some(ref socket),Some(MechSocket::WebSocketSender(websocket))) => {
match websocket.send_message(&OwnedMessage::Binary(compressed_message.clone())) {
Ok(()) => (),
Err(x) => {
client_outgoing.send(ClientMessage::String(format!("Remote core disconnected: {}", humanize(&remote_core_id))));
program.remote_cores.remove(remote_core_id);
for (core_id, core_address) in &program.remote_cores {
match core_address {
MechSocket::UdpSocket(core_address) => {
let message = bincode::serialize(&SocketMessage::RemoteCoreDisconnect(*remote_core_id)).unwrap();
let compressed_message = compress_to_vec(&message,6);
let len = socket.send_to(&compressed_message, core_address.clone()).unwrap();
}
MechSocket::WebSocket(_) => {
}
_ => (),
}
}
},
};
}
_ => (),
}
}
}
Err(err) => {
client_outgoing.send(ClientMessage::Error(err));
}
}
}
Entry::Vacant(mut v) => {
match program.mech.schedule.trigger_to_output.get(trigger_register) {
Some(output) => {
for (register,remote_cores) in &program.listeners {
if output.contains(register) {
v.insert((register.clone(),remote_cores.clone()));
break;
}
}
}
None => ()
}
}
}
}
}
Err(err) => {
client_outgoing.send(ClientMessage::Error(err));
}
};
let elapsed_time = now.elapsed();
let cycle_duration = elapsed_time.as_nanos() as f64;
client_outgoing.send(ClientMessage::Timing(1.0 / (cycle_duration / 1_000_000_000.0)));
client_outgoing.send(ClientMessage::StepDone);
},
(Ok(RunLoopMessage::Listening((core_id, register))), _) => {
let (table_id,row,col) = ®ister;
let name = program.mech.get_name(*table_id.unwrap()).unwrap();
match program.mech.output.contains(®ister.clone()) {
true => {
client_outgoing.send(ClientMessage::String(format!("Sending #{} to {}", name, humanize(&core_id))));
let mut listeners = program.listeners.entry(register.clone()).or_insert(HashSet::new());
listeners.insert(core_id);
match program.mech.get_table_by_id(*table_id.unwrap()) {
Ok(table) => {
let table_brrw = table.borrow();
let changes = table_brrw.to_changes();
let message = bincode::serialize(&SocketMessage::Transaction(changes)).unwrap();
let compressed_message = compress_to_vec(&message,6);
match (&self.socket,program.remote_cores.get_mut(&core_id)) {
(Some(ref socket),Some(MechSocket::UdpSocket(remote_core_address))) => {
let len = socket.send_to(&compressed_message, remote_core_address.clone()).unwrap();
}
(_,Some(MechSocket::WebSocketSender(websocket))) => {
websocket.send_message(&OwnedMessage::Binary(compressed_message)).unwrap();
}
_ => (),
}
}
Err(_) => (),
}
},
false => (),
}
},
(Ok(RunLoopMessage::RemoteCoreDisconnect(remote_core_id)), _) => {
match &self.socket {
Some(ref socket) => {
let socket_address = hash_str(&socket.local_addr().unwrap().to_string());
if remote_core_id != socket_address {
match program.remote_cores.get(&remote_core_id) {
None => {
}
Some(_) => {
client_outgoing.send(ClientMessage::String(format!("Remote core disconnected: {}", humanize(&remote_core_id))));
program.remote_cores.remove(&remote_core_id);
for (core_id, core_address) in &program.remote_cores {
match core_address {
MechSocket::UdpSocket(core_address) => {
let message = bincode::serialize(&SocketMessage::RemoteCoreDisconnect(remote_core_id)).unwrap();
let compressed_message = compress_to_vec(&message,6);
let len = socket.send_to(&compressed_message, core_address.clone()).unwrap();
}
MechSocket::WebSocket(_) => {
}
_ => (),
}
}
}
}
}
}
None => (),
}
}
(Ok(RunLoopMessage::RemoteCoreConnect(MechSocket::UdpSocket(remote_core_address))), _) => {
match &self.socket {
Some(ref socket) => {
let socket_address = socket.local_addr().unwrap().to_string();
if remote_core_address != socket_address {
match program.remote_cores.get(&hash_str(&remote_core_address)) {
None => {
program.remote_cores.insert(hash_str(&remote_core_address),MechSocket::UdpSocket(remote_core_address.clone()));
client_outgoing.send(ClientMessage::String(format!("Remote core connected: {}", humanize(&hash_str(&remote_core_address)))));
let message = bincode::serialize(&SocketMessage::RemoteCoreConnect(socket_address.clone())).unwrap();
let compressed_message = compress_to_vec(&message,6);
let len = socket.send_to(&compressed_message, remote_core_address.clone()).unwrap();
for (core_id, core_address) in &program.remote_cores {
match core_address {
MechSocket::UdpSocket(core_address) => {
let message = bincode::serialize(&SocketMessage::RemoteCoreConnect(core_address.to_string())).unwrap();
let compressed_message = compress_to_vec(&message,6);
let len = socket.send_to(&compressed_message, remote_core_address.clone()).unwrap();
}
MechSocket::WebSocket(_) => {
}
_ => (),
}
}
}
Some(_) => {
for register in &program.mech.needed_registers() {
let message = bincode::serialize(&SocketMessage::Listening(register.clone())).unwrap();
let compressed_message = compress_to_vec(&message,6);
let len = socket.send_to(&compressed_message, remote_core_address.clone()).unwrap();
}
}
}
}
}
None => (),
}
}
(Ok(RunLoopMessage::RemoteCoreConnect(MechSocket::WebSocket(ws_stream))), _) => {
let remote_core_address = ws_stream.peer_addr().unwrap();
let remote_core_id = hash_str(&remote_core_address.to_string());
let (mut ws_incoming, mut ws_outgoing) = ws_stream.split().unwrap();
for needed_register in &program.mech.needed_registers() {
let message = bincode::serialize(&SocketMessage::Listening(needed_register.clone())).unwrap();
let compressed_message = compress_to_vec(&message,6);
ws_outgoing.send_message(&OwnedMessage::Binary(compressed_message)).unwrap();
}
program.remote_cores.insert(remote_core_id, MechSocket::WebSocketSender(ws_outgoing));
let program_channel_websocket = program.outgoing.clone();
client_outgoing.send(ClientMessage::String(format!("Remote core connected: {}", humanize(&hash_str(&remote_core_address.to_string())))));
thread::spawn(move || {
for message in ws_incoming.incoming_messages() {
let message = message.unwrap();
match message {
OwnedMessage::Close(_) => {
return;
}
OwnedMessage::Binary(msg) => {
let message: Result<SocketMessage, bincode::Error> = bincode::deserialize(&msg);
match message {
Ok(SocketMessage::Listening(register)) => {
program_channel_websocket.send(RunLoopMessage::Listening((remote_core_id, register)));
}
Ok(SocketMessage::Transaction(txn)) => {
program_channel_websocket.send(RunLoopMessage::Transaction(txn));
},
x => {println!("Unhandled Message: {:?}", x);},
}
}
_ => (),
}
}
});
}
(Ok(RunLoopMessage::String((string,color))), _) => {
let out_string = match color {
Some(color) => {
let r: u8 = (color >> 16) as u8;
let g: u8 = (color >> 8) as u8;
let b: u8 = color as u8;
format!("{}", string.truecolor(r,g,b))
},
None => string,
};
client_outgoing.send(ClientMessage::String(out_string));
}
(Ok(RunLoopMessage::Exit(exit_code)), _) => {
client_outgoing.send(ClientMessage::Exit(exit_code));
}
(Ok(RunLoopMessage::DumpCore(core_ix)), _) => {
let result = match core_ix {
0 => {
let core = &program.mech;
let mut minicores = vec![];
for (_,core) in &program.cores {
let minicore = MiniCore::minify_core(&core);
minicores.push(minicore);
}
bincode::serialize(&minicores).unwrap()
}
1 => {
let core = &program.mech;
let minicore = MiniCore::minify_core(&core);
let minicores = MechCode::MiniCores(vec![minicore]);
bincode::serialize(&minicores).unwrap()
}
_ => {
let core = program.cores.get_mut(&core_ix).unwrap();
let minicore = MiniCore::minify_core(&core);
let minicores = MechCode::MiniCores(vec![minicore]);
bincode::serialize(&minicores).unwrap()
}
};
let output_name = format!("core-{}.blx",core_ix);
let file = OpenOptions::new().write(true).create(true).open(&output_name).unwrap();
let mut writer = BufWriter::new(file);
if let Err(e) = writer.write_all(&result) {
panic!("{} Failed to write core(s)! {:?}", "[Error]".truecolor(170,51,85), e);
std::process::exit(1);
}
writer.flush().unwrap();
client_outgoing.send(ClientMessage::String(format!("Wrote {:?}", output_name)));
client_outgoing.send(ClientMessage::Done);
}
(Ok(RunLoopMessage::NewCore), _) => {
let new_core = Core::new();
let new_core_ix = program.cores.len() as u64 + 2;
program.cores.insert(new_core_ix, new_core);
client_outgoing.send(ClientMessage::Done);
}
(Ok(RunLoopMessage::Code((core_ix,code))), _) => {
let sections: Vec<Vec<SectionElement>> = match code {
MechCode::MiniCores(cores) => {
for mc in cores {
let core = MiniCore::maximize_core(&mc);
let ix = program.cores.len() + 2;
program.cores.insert(ix as u64,core);
}
continue 'runloop;
}
MechCode::String(code) => {
let mut compiler = Compiler::new();
match compiler.compile_str(&code) {
Ok(sections) => sections,
Err(err) => {
client_outgoing.send(ClientMessage::Error(err));
client_outgoing.send(ClientMessage::StepDone);
continue 'runloop;
}
}
},
MechCode::MiniBlocks(mb_sections) => {
let mut sections: Vec<Vec<SectionElement>> = vec![];
for section in mb_sections {
let section: Vec<SectionElement> = section.iter().map(|mb| SectionElement::Block(MiniBlock::maximize_block(mb))).collect();
sections.push(section);
}
sections
}
};
{
let result = {
let mut core: &mut Core = match core_ix {
1 => &mut program.mech,
_ => program.cores.get_mut(&core_ix).unwrap(),
};
core.load_sections(sections)
};
for (new_block_ids,_,new_block_errors) in result {
if new_block_errors.len() > 0 {
match program.download_dependencies(Some(client_outgoing.clone())) {
Ok(resolved_errors) => {
let mut core: &mut Core = match core_ix {
1 => &mut program.mech,
_ => program.cores.get_mut(&core_ix).unwrap(),
};
let (_,_,nbo) = core.resolve_errors(&resolved_errors);
core.schedule_blocks();
for output in nbo {
core.step(&output);
}
}
Err(err) => {
client_outgoing.send(ClientMessage::Error(err.clone()));
}
}
}
if let Some(last_block_id) = new_block_ids.last() {
let mut core: &mut Core = match core_ix {
1 => &mut program.mech,
_ => program.cores.get_mut(&core_ix).unwrap(),
};
let block = core.blocks.get(last_block_id).unwrap().borrow();
let out_id = match block.transformations.last() {
Some(Transformation::Function{name,arguments,out}) => {
let (out_id,_,_) = out;
*out_id
}
Some(Transformation::TableDefine{table_id,indices,out}) => {
*out
}
Some(Transformation::Set{src_id, src_row, src_col, dest_id, dest_row, dest_col}) => {
*dest_id
}
Some(Transformation::TableAlias{table_id, alias}) => {
*table_id
}
Some(Transformation::Whenever{table_id, ..}) => {
*table_id
}
_ => {
TableId::Local(0)
}
};
if let Ok(out_table) = block.get_table(&out_id) {
client_outgoing.send(ClientMessage::String(format!("{:?}", out_table.borrow())));
}
}
}
let mut core: &mut Core = match core_ix {
1 => &mut program.mech,
_ => program.cores.get_mut(&core_ix).unwrap(),
};
for (error,_) in core.full_errors.iter() {
client_outgoing.send(ClientMessage::Error(error.clone()));
}
}
client_outgoing.send(ClientMessage::StepDone);
}
(Ok(RunLoopMessage::Reset(core_ix)), _) => {
let new_core = Core::new();
match core_ix {
1 => {program.mech = new_core;}
_ => {program.cores.insert(core_ix,new_core);},
};
client_outgoing.send(ClientMessage::Reset);
},
(Ok(RunLoopMessage::PrintCore(core_id)), _) => {
match core_id {
None => {client_outgoing.send(ClientMessage::String(format!("There are {:?} cores running.", program.cores.len() + 1)));}
Some(0) => {client_outgoing.send(ClientMessage::String("Core indices start a 1.".to_string()));}
Some(1) => {client_outgoing.send(ClientMessage::String(format!("{:?}", program.mech)));}
Some(core_id) => {
if core_id < program.cores.len() as u64 + 1 {
client_outgoing.send(ClientMessage::String(format!("{:?}", program.cores.get(&core_id).unwrap())));
}
}
}
},
(Ok(RunLoopMessage::PrintInfo), _) => {
client_outgoing.send(ClientMessage::String(format!("{:?}", program)));
},
(Ok(RunLoopMessage::PrintTable(table_id)), _) => {
let result = match program.mech.get_table_by_id(table_id) {
Ok(table_ref) => format!("{:?}", table_ref.borrow()),
Err(x) => format!("{:?}", x),
};
client_outgoing.send(ClientMessage::String(result));
},
(Ok(RunLoopMessage::Blocks(miniblocks)), _) => {
client_outgoing.send(ClientMessage::StepDone);
}
(Ok(RunLoopMessage::Stop), _) => {
client_outgoing.send(ClientMessage::Stop);
break 'runloop;
},
(Ok(RunLoopMessage::GetValue((table_id,row,column))),_) => {
let msg = match program.mech.get_table_by_id(table_id) {
Ok(table) => {
match table.borrow().get(&row,&column) {
Ok(v) => ClientMessage::Value(v.clone()),
Err(error) => ClientMessage::Error(error.clone()),
}
}
Err(error) => ClientMessage::Error(error.clone()),
};
client_outgoing.send(msg);
},
(Ok(RunLoopMessage::Pause), false) => {
paused = true;
client_outgoing.send(ClientMessage::Pause);
},
(Ok(RunLoopMessage::Resume), true) => {
paused = false;
client_outgoing.send(ClientMessage::Resume);
},
(Ok(RunLoopMessage::StepBack), _) => {
if !paused {
paused = true;
}
}
(Ok(RunLoopMessage::StepForward), true) => {
}
(Err(_), _) => {
break 'runloop
},
x => println!("qq {:?}", x),
}
client_outgoing.send(ClientMessage::Done);
}
}).unwrap();
Ok(RunLoop { name, socket_address, thread, outgoing: runloop_outgoing, incoming })
}
}