use bytes::Bytes;
use lazy_static::lazy_static;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize, Serializer};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc::*;
use tokio::task::LocalSet;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Event {
Spawn {
id: u64,
t: String,
world: u64,
},
Despawn {
id: u64,
world: u64,
},
Change {
id: u64,
t: String,
v: Bytes,
world: u64,
},
Invoke {
t: String,
v: Bytes,
world: u64,
},
Update(f32),
}
#[derive(Clone, Debug)]
pub enum Message {
Event(Event),
Chan(u64, UnboundedSender<Message>),
Close(u64),
Exit,
}
pub struct Chan {
pub id: u64,
pub tx: UnboundedSender<Message>,
pub rx: UnboundedReceiver<Message>,
}
impl Chan {
pub fn update(&mut self, dt: f32, raw: &[u8], out: &mut Vec<u8>) {
let mut de = serde_cbor::de::Deserializer::from_slice(raw);
while let Ok(event) = Event::deserialize(&mut de) {
let _ = self.tx.send(Message::Event(event));
}
let _ = self.tx.send(Message::Event(Event::Update(dt)));
let mut ser = serde_cbor::Serializer::new(out);
let mut seq = ser.serialize_seq(None).unwrap();
while let Ok(v) = self.rx.try_recv() {
match v {
Message::Event(event) => {
seq.serialize_element(&event).unwrap();
}
_ => {}
}
}
seq.end().unwrap();
}
pub fn close(&self) {
let _ = self.tx.send(Message::Close(self.id));
}
}
pub struct Runtime {
tx: UnboundedSender<Message>,
next_id: AtomicU64,
}
#[derive(Default)]
struct Entity {
t: String,
v: HashMap<String, Bytes>,
}
impl Runtime {
pub fn new() -> Self {
let (tx, mut rx) = unbounded_channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let local_set = LocalSet::new();
local_set.block_on(&rt, async move {
let mut txs = HashMap::<u64, UnboundedSender<Message>>::new();
let mut entitys = HashMap::<u64, HashMap<u64, Entity>>::new();
while let Some(v) = rx.recv().await {
match &v {
Message::Event(event) => {
match event {
Event::Spawn { id, t, world } => {
entitys.entry(*world).or_default().insert(
*id,
Entity {
t: t.clone(),
v: HashMap::new(),
},
);
}
Event::Despawn { id, world } => {
entitys.entry(*world).or_default().remove(&*id);
}
Event::Change { id, t, v, world } => {
if let Some(e) =
entitys.entry(*world).or_default().get_mut(&*id)
{
e.v.insert(t.clone(), v.clone());
}
}
_ => {}
}
for i in txs.values() {
let _ = i.send(v.clone());
}
}
Message::Chan(id, tx) => {
txs.insert(*id, tx.clone());
for (world, entities) in entitys.iter() {
for (id, entity) in entities.iter() {
let _ = tx.send(Message::Event(Event::Spawn {
id: *id,
t: entity.t.clone(),
world: *world,
}));
for (k, v) in &entity.v {
let _ = tx.send(Message::Event(Event::Change {
id: *id,
t: k.clone(),
v: v.clone(),
world: *world,
}));
}
}
}
}
Message::Close(id) => {
txs.remove(&id);
}
Message::Exit => {
break;
}
}
}
});
});
Self {
tx: tx,
next_id: AtomicU64::new(0),
}
}
pub fn chan(&self) -> Chan {
let id = self.next_id.fetch_add(1, Ordering::SeqCst) + 1;
let (tx, rx) = unbounded_channel();
let _ = self.tx.send(Message::Chan(id, tx));
Chan {
id,
tx: self.tx.clone(),
rx: rx,
}
}
pub fn exit(&self) {
let _ = self.tx.send(Message::Exit);
}
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
self.exit()
}
}
lazy_static! {
pub static ref RUNTIME: Runtime = Runtime::new();
}