use crate::callback::Callback;
use crate::scheduler::{scheduler, Runnable, Shared};
use anymap::{self, AnyMap};
use bincode;
use log::warn;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::any::TypeId;
use std::cell::RefCell;
use std::collections::{hash_map, HashMap, HashSet};
use std::fmt;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use stdweb::Value;
#[allow(unused_imports)]
use stdweb::{_js_impl, js};
/// Messages sent from the page to an agent running inside a web worker.
///
/// `T` is the agent's input type. Values are turned into bytes through the
/// `Packed` trait before being posted to the worker.
#[derive(Serialize, Deserialize)]
enum ToWorker<T> {
/// A bridge with the given handler id has been connected.
Connected(HandlerId),
/// An input value sent on behalf of the given handler id.
ProcessInput(HandlerId, T),
/// The bridge with the given handler id has been disconnected.
Disconnected(HandlerId),
/// Asks the worker to destroy its agent and close itself.
Destroy,
}
// Marks `ToWorker` as `Transferable` so it picks up the blanket `Packed` impl.
impl<T> Transferable for ToWorker<T> where T: Serialize + for<'de> Deserialize<'de> {}
/// Messages sent from an agent running inside a web worker back to the page.
///
/// `T` is the agent's output type.
#[derive(Serialize, Deserialize)]
enum FromWorker<T> {
/// Posted once by `Threaded::register` after the worker installed its message handler.
WorkerLoaded,
/// An output value addressed to the handler with the given id.
ProcessOutput(HandlerId, T),
}
// Marks `FromWorker` as `Transferable` so it picks up the blanket `Packed` impl.
impl<T> Transferable for FromWorker<T> where T: Serialize + for<'de> Deserialize<'de> {}
/// Marker trait for values that can be exchanged with an agent.
///
/// Implementors must be serializable and deserializable with `serde`; the
/// trait itself adds no methods.
pub trait Transferable
where
Self: Serialize + for<'de> Deserialize<'de>,
{
}
/// Conversion of a value to and from a byte buffer.
trait Packed {
/// Encodes `self` into a byte vector.
fn pack(&self) -> Vec<u8>;
/// Decodes a value from `data`.
fn unpack(data: &[u8]) -> Self;
}
/// Every `Transferable` type is packed and unpacked with `bincode`.
impl<T: Transferable> Packed for T {
    /// Serializes `self` with `bincode`.
    ///
    /// # Panics
    ///
    /// Panics if `bincode` reports a serialization error.
    fn pack(&self) -> Vec<u8> {
        bincode::serialize(self).expect("can't serialize a transferable object")
    }

    /// Deserializes a value from `data` with `bincode`.
    ///
    /// # Panics
    ///
    /// Panics if `data` cannot be decoded into `Self`.
    fn unpack(data: &[u8]) -> Self {
        bincode::deserialize(data).expect("can't deserialize a transferable object")
    }
}
// Shared slab of optional output callbacks, indexed by the raw handler id.
type SharedOutputSlab<AGN> = Shared<Slab<Option<Callback<<AGN as Agent>::Output>>>>;
/// Identifier of a bridge connected to an agent.
///
/// The first field is the raw slab index; the second records whether the
/// bridge was created with a callback (i.e. can receive responses).
#[derive(Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)]
pub struct HandlerId(usize, bool);
impl HandlerId {
fn new(id: usize, respondable: bool) -> Self {
HandlerId(id, respondable)
}
fn raw_id(self) -> usize {
self.0
}
pub fn is_respondable(&self) -> bool {
self.1
}
}
/// Agents that can be reached through a bridge which delivers outputs to a callback.
pub trait Bridged: Agent + Sized + 'static {
/// Creates a bridge to the agent; outputs are passed to `callback`.
fn bridge(callback: Callback<Self::Output>) -> Box<dyn Bridge<Self>>;
}
/// Agents that can be reached through a send-only `Dispatcher`.
pub trait Dispatched: Agent + Sized + 'static {
/// Creates a dispatcher to the agent; it has no callback for outputs.
fn dispatcher() -> Dispatcher<Self>;
}
/// Wrapper around a boxed bridge that was created without an output callback.
pub struct Dispatcher<T>(Box<dyn Bridge<T>>);
/// Opaque `Debug` output: the wrapped bridge is not inspected.
impl<T> fmt::Debug for Dispatcher<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Dispatcher<_>")
    }
}
/// Gives shared access to the wrapped bridge.
impl<T> Deref for Dispatcher<T> {
    type Target = dyn Bridge<T>;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
/// Gives mutable access to the wrapped bridge, which `Bridge::send` requires.
impl<T> DerefMut for Dispatcher<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}
/// Marker for reaches whose agents can be used through a `Dispatcher`
/// (see the blanket `Dispatched` impl).
pub trait Dispatchable: Discoverer {}
/// Implemented for agents that can run inside a web worker.
pub trait Threaded {
/// Sets up the agent and its message handling on the worker side.
fn register();
}
/// Worker-side bootstrap for every agent whose reach is `Public`.
impl<T> Threaded for T
where
    T: Agent<Reach = Public>,
{
    /// Creates the agent, installs `self.onmessage` in the worker, and posts
    /// `FromWorker::WorkerLoaded` to the page.
    fn register() {
        let scope = AgentScope::<T>::new();
        let link = AgentLink::connect(&scope, WorkerResponder {});
        scope.send(AgentUpdate::Create(link));

        // Decodes each incoming buffer and forwards it to the agent as an update.
        let handler = move |bytes: Vec<u8>| match ToWorker::<T::Input>::unpack(&bytes) {
            ToWorker::Connected(id) => scope.send(AgentUpdate::Connected(id)),
            ToWorker::ProcessInput(id, input) => scope.send(AgentUpdate::Input(input, id)),
            ToWorker::Disconnected(id) => scope.send(AgentUpdate::Disconnected(id)),
            ToWorker::Destroy => {
                scope.send(AgentUpdate::Destroy);
                // Close the worker once the destroy update has been scheduled.
                js! {
                    self.close();
                };
            }
        };

        let loaded = FromWorker::<T::Output>::WorkerLoaded.pack();
        js! {
            var handler = @{handler};
            self.onmessage = function(event) {
                handler(event.data);
            };
            self.postMessage(@{loaded});
        };
    }
}
impl<T> Bridged for T
where
T: Agent,
{
fn bridge(callback: Callback<Self::Output>) -> Box<dyn Bridge<Self>> {
Self::Reach::spawn_or_join(Some(callback))
}
}
impl<T> Dispatched for T
where
T: Agent,
<T as Agent>::Reach: Dispatchable,
{
fn dispatcher() -> Dispatcher<T> {
Dispatcher(Self::Reach::spawn_or_join::<T>(None))
}
}
/// Strategy that decides how an agent instance is created or reused.
#[doc(hidden)]
pub trait Discoverer {
/// Returns a bridge to an agent of type `AGN`.
///
/// The default implementation panics via `unimplemented!()`.
fn spawn_or_join<AGN: Agent>(_callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
unimplemented!();
}
}
/// Connection to an agent of type `AGN`.
pub trait Bridge<AGN: Agent> {
/// Sends an input message to the agent.
fn send(&mut self, msg: AGN::Input);
}
// Pool record for an agent running in the current thread: its scope plus the
// slab of callbacks of the bridges connected to it.
struct LocalAgent<AGN: Agent> {
scope: AgentScope<AGN>,
slab: SharedOutputSlab<AGN>,
}
// Returned by `remove_bridge`: `true` when the slab is empty after the removal.
type Last = bool;
impl<AGN: Agent> LocalAgent<AGN> {
    /// Creates a record for `scope` with an empty callback slab.
    pub fn new(scope: &AgentScope<AGN>) -> Self {
        LocalAgent {
            scope: scope.clone(),
            slab: Rc::new(RefCell::new(Slab::new())),
        }
    }

    /// Returns another handle to the shared callback slab.
    fn slab(&self) -> SharedOutputSlab<AGN> {
        Rc::clone(&self.slab)
    }

    /// Stores `callback` in the slab and returns a bridge whose id points at
    /// that slot.
    fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> ContextBridge<AGN> {
        let respondable = callback.is_some();
        let key = self.slab.borrow_mut().insert(callback);
        ContextBridge {
            scope: self.scope.clone(),
            id: HandlerId::new(key, respondable),
        }
    }

    /// Removes the slot of `bridge` and reports whether the slab is now empty.
    fn remove_bridge(&mut self, bridge: &ContextBridge<AGN>) -> Last {
        let mut callbacks = self.slab.borrow_mut();
        drop(callbacks.remove(bridge.id.raw_id()));
        callbacks.is_empty()
    }
}
thread_local! {
// Per-thread map holding one `LocalAgent<AGN>` entry per agent type.
static LOCAL_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
}
/// Reach that shares one agent instance per thread among all its bridges.
pub struct Context;
impl Discoverer for Context {
    /// Joins the agent already registered in `LOCAL_AGENTS_POOL`, or creates
    /// and registers a new one, then announces the new bridge to the agent.
    fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
        // Filled only when a new agent was registered; its `Create` update is
        // sent after the pool borrow has been released.
        let mut pending_init = None;
        let bridge = LOCAL_AGENTS_POOL.with(|pool| {
            match pool.borrow_mut().entry::<LocalAgent<AGN>>() {
                anymap::Entry::Occupied(mut running) => running.get_mut().create_bridge(callback),
                anymap::Entry::Vacant(slot) => {
                    let scope = AgentScope::<AGN>::new();
                    let agent = LocalAgent::new(&scope);
                    let responder = SlabResponder { slab: agent.slab() };
                    pending_init = Some((scope, responder));
                    slot.insert(agent).create_bridge(callback)
                }
            }
        });
        if let Some((scope, responder)) = pending_init {
            let link = AgentLink::connect(&scope, responder);
            scope.send(AgentUpdate::Create(link));
        }
        bridge.scope.send(AgentUpdate::Connected(bridge.id));
        Box::new(bridge)
    }
}
// `Context` agents may be used through a `Dispatcher`.
impl Dispatchable for Context {}
// Responder that looks up the output callback in a shared slab.
struct SlabResponder<AGN: Agent> {
slab: Shared<Slab<Option<Callback<AGN::Output>>>>,
}
impl<AGN: Agent> Responder<AGN> for SlabResponder<AGN> {
// Routes `output` to the callback stored under `id` in the slab.
fn response(&self, id: HandlerId, output: AGN::Output) {
locate_callback_and_respond::<AGN>(&self.slab, id, output);
}
}
/// Looks up the callback registered for `id` in `slab` and emits `output` to it.
///
/// A warning is logged (and `output` is dropped) when the id is not present
/// in the slab or when the slot holds no callback.
fn locate_callback_and_respond<AGN: Agent>(
    slab: &SharedOutputSlab<AGN>,
    id: HandlerId,
    output: AGN::Output,
) {
    // Clone the slot in its own statement so the `Ref` guard is released
    // before the callback runs. Temporaries in a `match` scrutinee live until
    // the end of the whole `match`, which would keep the slab borrowed during
    // `emit` and make any bridge creation/removal triggered by the callback
    // panic on `borrow_mut`.
    let slot = slab.borrow().get(id.raw_id()).cloned();
    match slot {
        Some(Some(callback)) => callback.emit(output),
        Some(None) => warn!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id()),
        None => warn!("Id of handler does not exist in the slab: {}.", id.raw_id()),
    }
}
// Bridge to an agent that lives in the current thread's `LOCAL_AGENTS_POOL`.
struct ContextBridge<AGN: Agent> {
scope: AgentScope<AGN>,
id: HandlerId,
}
impl<AGN: Agent> Bridge<AGN> for ContextBridge<AGN> {
    /// Schedules `msg` as an input update tagged with this bridge's id.
    fn send(&mut self, msg: AGN::Input) {
        self.scope.send(AgentUpdate::Input(msg, self.id));
    }
}
impl<AGN: Agent> Drop for ContextBridge<AGN> {
    /// Unregisters the bridge, notifies the agent, and destroys the agent when
    /// this was its last bridge.
    fn drop(&mut self) {
        LOCAL_AGENTS_POOL.with(|pool| {
            // The pool borrow ends with this statement, before any update is sent.
            let was_last = match pool.borrow_mut().get_mut::<LocalAgent<AGN>>() {
                Some(agent) => agent.remove_bridge(self),
                None => false,
            };
            self.scope.send(AgentUpdate::Disconnected(self.id));
            if was_last {
                self.scope.send(AgentUpdate::Destroy);
                pool.borrow_mut().remove::<LocalAgent<AGN>>();
            }
        });
    }
}
/// Reach that creates a fresh agent instance in the current thread for every bridge.
pub struct Job;
impl Discoverer for Job {
    /// Creates a new agent that reports to `callback` and returns its bridge.
    ///
    /// # Panics
    ///
    /// Panics if `callback` is `None`.
    fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
        let callback = callback.expect("Callback required for Job");
        let scope = AgentScope::<AGN>::new();
        let responder = CallbackResponder { callback };
        let link = AgentLink::connect(&scope, responder);
        scope.send(AgentUpdate::Create(link));
        // A job has exactly one bridge, so the fixed singleton id is used.
        scope.send(AgentUpdate::Connected(SINGLETON_ID));
        Box::new(JobBridge { scope })
    }
}
// Fixed handler id used by reaches with a single bridge per agent (`Job`, `Private`).
const SINGLETON_ID: HandlerId = HandlerId(0, true);
// Responder that forwards every output to one fixed callback.
struct CallbackResponder<AGN: Agent> {
callback: Callback<AGN::Output>,
}
impl<AGN: Agent> Responder<AGN> for CallbackResponder<AGN> {
// Emits `output` to the stored callback; panics if `id` is not the singleton id.
fn response(&self, id: HandlerId, output: AGN::Output) {
assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
self.callback.emit(output);
}
}
// Bridge to an agent created by the `Job` reach.
struct JobBridge<AGN: Agent> {
scope: AgentScope<AGN>,
}
impl<AGN: Agent> Bridge<AGN> for JobBridge<AGN> {
    /// Schedules `msg` as an input update tagged with the singleton id.
    fn send(&mut self, msg: AGN::Input) {
        self.scope.send(AgentUpdate::Input(msg, SINGLETON_ID));
    }
}
impl<AGN: Agent> Drop for JobBridge<AGN> {
    /// Tells the agent its only bridge is gone, then destroys the agent.
    fn drop(&mut self) {
        self.scope.send(AgentUpdate::Disconnected(SINGLETON_ID));
        self.scope.send(AgentUpdate::Destroy);
    }
}
/// Reach that starts a dedicated web worker for every bridge.
pub struct Private;
impl Discoverer for Private {
    /// Starts a new worker from `AGN::name_of_resource()` and returns a bridge
    /// to it; outputs from the worker are passed to `callback`.
    ///
    /// # Panics
    ///
    /// Panics if `callback` is `None`.
    fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
        let callback = callback.expect("Callback required for Private agents");
        // Decodes worker messages; only outputs are forwarded to the callback.
        let handler = move |bytes: Vec<u8>| match FromWorker::<AGN::Output>::unpack(&bytes) {
            FromWorker::WorkerLoaded => {}
            FromWorker::ProcessOutput(id, output) => {
                assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
                callback.emit(output);
            }
        };
        let name_of_resource = AGN::name_of_resource();
        let worker = js! {
            var worker = new Worker(@{name_of_resource});
            var handler = @{handler};
            worker.onmessage = function(event) {
                handler(event.data);
            };
            return worker;
        };
        Box::new(PrivateBridge::<AGN> {
            worker,
            _agent: PhantomData,
        })
    }
}
/// Bridge to an agent running in its own dedicated web worker.
pub struct PrivateBridge<T: Agent> {
worker: Value,
_agent: PhantomData<T>,
}
impl<AGN: Agent> Bridge<AGN> for PrivateBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let msg = ToWorker::ProcessInput(SINGLETON_ID, msg).pack();
let worker = &self.worker;
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
};
}
}
impl<AGN: Agent> Drop for PrivateBridge<AGN> {
fn drop(&mut self) {
}
}
// Pool record for an agent running in a shared web worker: the worker handle
// plus the slab of callbacks of the bridges connected to it.
struct RemoteAgent<AGN: Agent> {
worker: Value,
slab: SharedOutputSlab<AGN>,
}
impl<AGN: Agent> RemoteAgent<AGN> {
    /// Creates a record from a worker handle and its callback slab.
    pub fn new(worker: Value, slab: SharedOutputSlab<AGN>) -> Self {
        RemoteAgent { worker, slab }
    }

    /// Stores `callback` in the slab and returns a bridge whose id points at
    /// that slot.
    fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> PublicBridge<AGN> {
        let respondable = callback.is_some();
        let key = self.slab.borrow_mut().insert(callback);
        PublicBridge {
            worker: self.worker.clone(),
            id: HandlerId::new(key, respondable),
            _agent: PhantomData,
        }
    }

    /// Removes the slot of `bridge` and reports whether the slab is now empty.
    fn remove_bridge(&mut self, bridge: &PublicBridge<AGN>) -> Last {
        let mut callbacks = self.slab.borrow_mut();
        drop(callbacks.remove(bridge.id.raw_id()));
        callbacks.is_empty()
    }
}
thread_local! {
// Per-thread map holding one `RemoteAgent<AGN>` entry per agent type.
static REMOTE_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
// Agent types whose worker has reported `FromWorker::WorkerLoaded`.
static REMOTE_AGENTS_LOADED: RefCell<HashSet<TypeId>> = RefCell::new(HashSet::new());
// Packed messages queued per agent type until its worker reports it is loaded.
static REMOTE_AGENTS_EARLY_MSGS_QUEUE: RefCell<HashMap<TypeId, Vec<Vec<u8>>>> = RefCell::new(HashMap::new());
}
/// Reach that shares one web worker per agent type among all bridges of a thread.
pub struct Public;
impl Discoverer for Public {
// Joins the worker already registered in `REMOTE_AGENTS_POOL` for `AGN`, or
// starts a new worker from `AGN::name_of_resource()` and registers it.
//
// NOTE(review): unlike `Context::spawn_or_join`, no `Connected` message is
// sent for the new bridge here, although `PublicBridge::drop` sends
// `Disconnected` — confirm whether this asymmetry is intentional.
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let bridge = REMOTE_AGENTS_POOL.with(|pool| {
match pool.borrow_mut().entry::<RemoteAgent<AGN>>() {
anymap::Entry::Occupied(mut entry) => {
// A worker for this agent type already exists: just add a bridge.
entry.get_mut().create_bridge(callback)
}
anymap::Entry::Vacant(entry) => {
// The slab is shared between the pool record and the message handler.
let slab: Shared<Slab<Option<Callback<AGN::Output>>>> =
Rc::new(RefCell::new(Slab::new()));
let handler = {
let slab = slab.clone();
move |data: Vec<u8>, worker: Value| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
// Remember that the worker is ready ...
let _ = REMOTE_AGENTS_LOADED.with(|local| {
local.borrow_mut().insert(TypeId::of::<AGN>())
});
// ... and flush the messages queued while it was loading.
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|local| {
if let Some(msgs) =
local.borrow_mut().get_mut(&TypeId::of::<AGN>())
{
for msg in msgs.drain(..) {
let worker = &worker;
js! {@{worker}.postMessage(@{msg});};
}
}
});
}
FromWorker::ProcessOutput(id, output) => {
// Deliver the output to the callback registered under `id`.
locate_callback_and_respond::<AGN>(&slab, id, output);
}
}
}
};
let name_of_resource = AGN::name_of_resource();
// The JS handler passes the worker object itself as second argument.
let worker = js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data, worker);
};
return worker;
};
let launched = RemoteAgent::new(worker, slab);
entry.insert(launched).create_bridge(callback)
}
}
});
Box::new(bridge)
}
}
// `Public` agents may be used through a `Dispatcher`.
impl Dispatchable for Public {}
/// Bridge to an agent running in a web worker shared through `REMOTE_AGENTS_POOL`.
pub struct PublicBridge<T: Agent> {
worker: Value,
id: HandlerId,
_agent: PhantomData<T>,
}
impl<AGN: Agent> PublicBridge<AGN> {
    /// Returns `true` once the worker for `AGN` has been recorded in
    /// `REMOTE_AGENTS_LOADED`.
    fn worker_is_loaded(&self) -> bool {
        let agent_type = TypeId::of::<AGN>();
        REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&agent_type))
    }

    /// Appends an already packed message to the early-message queue of `AGN`,
    /// creating the queue on first use.
    fn msg_to_queue(&self, msg: Vec<u8>) {
        REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queues| {
            queues
                .borrow_mut()
                .entry(TypeId::of::<AGN>())
                .or_insert_with(Vec::new)
                .push(msg);
        });
    }
}
// Packs `msg` and posts the resulting bytes to `worker` via `postMessage`.
fn send_to_remote<AGN: Agent>(worker: &Value, msg: ToWorker<AGN::Input>) {
let msg = msg.pack();
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
};
}
impl<AGN: Agent> Bridge<AGN> for PublicBridge<AGN> {
    /// Sends `msg` to the worker, or queues it when the worker has not yet
    /// reported that it is loaded.
    fn send(&mut self, msg: AGN::Input) {
        let envelope = ToWorker::ProcessInput(self.id, msg);
        if !self.worker_is_loaded() {
            // Queued messages are flushed by the `WorkerLoaded` handler.
            self.msg_to_queue(envelope.pack());
            return;
        }
        send_to_remote::<AGN>(&self.worker, envelope);
    }
}
impl<AGN: Agent> Drop for PublicBridge<AGN> {
    /// Unregisters the bridge, notifies the worker, and tears the worker down
    /// (including its loaded flag and early-message queue) when this was the
    /// last bridge.
    fn drop(&mut self) {
        REMOTE_AGENTS_POOL.with(|pool| {
            // The pool borrow ends with this statement.
            let was_last = match pool.borrow_mut().get_mut::<RemoteAgent<AGN>>() {
                Some(remote) => remote.remove_bridge(self),
                None => false,
            };
            send_to_remote::<AGN>(&self.worker, ToWorker::Disconnected(self.id));
            if was_last {
                send_to_remote::<AGN>(&self.worker, ToWorker::Destroy);
                pool.borrow_mut().remove::<RemoteAgent<AGN>>();
                REMOTE_AGENTS_LOADED.with(|loaded| {
                    loaded.borrow_mut().remove(&TypeId::of::<AGN>());
                });
                REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queues| {
                    queues.borrow_mut().remove(&TypeId::of::<AGN>());
                });
            }
        });
    }
}
/// Reach marker with no `spawn_or_join` of its own (see its `Discoverer` impl).
pub struct Global;
// Uses the trait's default `spawn_or_join`, which panics via `unimplemented!()`.
impl Discoverer for Global {}
/// Declares the behavior of an agent.
pub trait Agent: Sized + 'static {
/// Strategy used to create or find the agent instance.
type Reach: Discoverer;
/// Type of the messages the agent sends to itself (see `AgentLink::send_back`).
type Message;
/// Type of the inputs received through bridges.
type Input: Transferable;
/// Type of the outputs delivered to bridge callbacks.
type Output: Transferable;
/// Creates the agent; `link` lets it respond to handlers and build callbacks.
fn create(link: AgentLink<Self>) -> Self;
/// Handles a message of the agent's own `Message` type.
fn update(&mut self, msg: Self::Message);
/// Called when a `Connected` update for `_id` is processed. Does nothing by default.
fn connected(&mut self, _id: HandlerId) {}
/// Handles an input sent on behalf of the handler `id`.
fn handle(&mut self, msg: Self::Input, id: HandlerId);
/// Called when a `Disconnected` update for `_id` is processed. Does nothing by default.
fn disconnected(&mut self, _id: HandlerId) {}
/// Called when the agent is destroyed. Does nothing by default.
fn destroy(&mut self) {}
/// Script passed to `new Worker(...)` by the `Private` and `Public` reaches.
fn name_of_resource() -> &'static str {
"main.js"
}
}
/// Cloneable handle used to schedule updates for one agent instance.
pub struct AgentScope<AGN: Agent> {
shared_agent: Shared<AgentRunnable<AGN>>,
}
impl<AGN: Agent> Clone for AgentScope<AGN> {
fn clone(&self) -> Self {
AgentScope {
shared_agent: self.shared_agent.clone(),
}
}
}
impl<AGN: Agent> AgentScope<AGN> {
    /// Creates a scope whose agent has not been created yet.
    fn new() -> Self {
        AgentScope {
            shared_agent: Rc::new(RefCell::new(AgentRunnable::new())),
        }
    }

    /// Wraps `update` in an envelope and hands it to the scheduler.
    fn send(&self, update: AgentUpdate<AGN>) {
        let shared_agent = Rc::clone(&self.shared_agent);
        let runnable: Box<dyn Runnable> = Box::new(AgentEnvelope {
            shared_agent,
            update,
        });
        scheduler().put_and_try_run(runnable);
    }
}
// Delivers an agent's output to the handler identified by `id`.
trait Responder<AGN: Agent> {
fn response(&self, id: HandlerId, output: AGN::Output);
}
// Responder used inside a worker: outputs are posted back with `self.postMessage`.
struct WorkerResponder {}
impl<AGN: Agent> Responder<AGN> for WorkerResponder {
    /// Packs the output as `FromWorker::ProcessOutput` and posts it from the
    /// worker's global scope.
    fn response(&self, id: HandlerId, output: AGN::Output) {
        let data = FromWorker::ProcessOutput(id, output).pack();
        js! {
            var data = @{data};
            self.postMessage(data);
        };
    }
}
/// Link handed to an agent on creation: it holds the agent's scope and the
/// responder used to deliver outputs.
pub struct AgentLink<AGN: Agent> {
scope: AgentScope<AGN>,
responder: Box<dyn Responder<AGN>>,
}
impl<AGN: Agent> AgentLink<AGN> {
fn connect<T>(scope: &AgentScope<AGN>, responder: T) -> Self
where
T: Responder<AGN> + 'static,
{
AgentLink {
scope: scope.clone(),
responder: Box::new(responder),
}
}
pub fn response(&self, id: HandlerId, output: AGN::Output) {
self.responder.response(id, output);
}
pub fn send_back<F, IN>(&self, function: F) -> Callback<IN>
where
F: Fn(IN) -> AGN::Message + 'static,
{
let scope = self.scope.clone();
let closure = move |input| {
let output = function(input);
let msg = AgentUpdate::Message(output);
scope.clone().send(msg);
};
closure.into()
}
}
/// Opaque `Debug` output: neither the scope nor the responder is inspected.
impl<AGN: Agent> fmt::Debug for AgentLink<AGN> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "AgentLink<_>")
    }
}
// Shared state of one agent: the instance (once created) and a flag that makes
// `AgentEnvelope::run` skip updates when set.
struct AgentRunnable<AGN> {
agent: Option<AGN>,
destroyed: bool,
}
impl<AGN> AgentRunnable<AGN> {
fn new() -> Self {
AgentRunnable {
agent: None,
destroyed: false,
}
}
}
// Updates processed by `AgentEnvelope::run`; each maps to one `Agent` method.
enum AgentUpdate<AGN: Agent> {
// Create the agent with the given link (`Agent::create`).
Create(AgentLink<AGN>),
// Message the agent sent to itself (`Agent::update`).
Message(AGN::Message),
// A bridge was connected (`Agent::connected`).
Connected(HandlerId),
// Input from a bridge (`Agent::handle`).
Input(AGN::Input, HandlerId),
// A bridge was disconnected (`Agent::disconnected`).
Disconnected(HandlerId),
// Destroy the agent (`Agent::destroy`).
Destroy,
}
// One scheduled update together with the shared agent state it applies to.
struct AgentEnvelope<AGN: Agent> {
shared_agent: Shared<AgentRunnable<AGN>>,
update: AgentUpdate<AGN>,
}
impl<AGN> Runnable for AgentEnvelope<AGN>
where
    AGN: Agent,
{
    /// Applies the enclosed update to the shared agent.
    ///
    /// Updates that arrive after the agent has been destroyed are ignored.
    ///
    /// # Panics
    ///
    /// Panics if an update other than `Create` is processed before the agent
    /// has been created.
    fn run(self: Box<Self>) {
        let mut this = self.shared_agent.borrow_mut();
        if this.destroyed {
            return;
        }
        match self.update {
            AgentUpdate::Create(env) => {
                this.agent = Some(AGN::create(env));
            }
            AgentUpdate::Message(msg) => {
                this.agent
                    .as_mut()
                    .expect("agent was not created to process messages")
                    .update(msg);
            }
            AgentUpdate::Connected(id) => {
                this.agent
                    .as_mut()
                    .expect("agent was not created to send a connected message")
                    .connected(id);
            }
            AgentUpdate::Input(inp, id) => {
                this.agent
                    .as_mut()
                    .expect("agent was not created to process inputs")
                    .handle(inp, id);
            }
            AgentUpdate::Disconnected(id) => {
                this.agent
                    .as_mut()
                    .expect("agent was not created to send a disconnected message")
                    .disconnected(id);
            }
            AgentUpdate::Destroy => {
                let mut agent = this
                    .agent
                    .take()
                    .expect("trying to destroy not existent agent");
                // Record the destruction so the guard at the top drops any
                // envelope that is still queued (e.g. from a `send_back`
                // callback) instead of panicking on the now-missing agent.
                this.destroyed = true;
                agent.destroy();
            }
        }
    }
}