use crate::callback::Callback;
use crate::scheduler::{scheduler, Runnable, Shared};
use anymap::{self, AnyMap};
use bincode;
use cfg_if::cfg_if;
use cfg_match::cfg_match;
use log::warn;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::any::TypeId;
use std::cell::RefCell;
use std::collections::{hash_map, HashMap, HashSet};
use std::fmt;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
cfg_if! {
if #[cfg(feature = "std_web")] {
use stdweb::Value;
#[allow(unused_imports)]
use stdweb::{_js_impl, js};
} else if #[cfg(feature = "web_sys")] {
use crate::utils;
use js_sys::{Array, Reflect, Uint8Array};
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::{Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions};
}
}
#[derive(Serialize, Deserialize, Debug)]
enum ToWorker<T> {
Connected(HandlerId),
ProcessInput(HandlerId, T),
Disconnected(HandlerId),
Destroy,
}
#[derive(Serialize, Deserialize, Debug)]
enum FromWorker<T> {
WorkerLoaded,
ProcessOutput(HandlerId, T),
}
pub trait Packed {
fn pack(&self) -> Vec<u8>;
fn unpack(data: &[u8]) -> Self;
}
impl<T: Serialize + for<'de> Deserialize<'de>> Packed for T {
fn pack(&self) -> Vec<u8> {
bincode::serialize(&self).expect("can't serialize an agent message")
}
fn unpack(data: &[u8]) -> Self {
bincode::deserialize(&data).expect("can't deserialize an agent message")
}
}
type SharedOutputSlab<AGN> = Shared<Slab<Option<Callback<<AGN as Agent>::Output>>>>;
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)]
pub struct HandlerId(usize, bool);
impl HandlerId {
fn new(id: usize, respondable: bool) -> Self {
HandlerId(id, respondable)
}
fn raw_id(self) -> usize {
self.0
}
pub fn is_respondable(self) -> bool {
self.1
}
}
pub trait Bridged: Agent + Sized + 'static {
fn bridge(callback: Callback<Self::Output>) -> Box<dyn Bridge<Self>>;
}
pub trait Dispatched: Agent + Sized + 'static {
fn dispatcher() -> Dispatcher<Self>;
}
pub struct Dispatcher<T>(Box<dyn Bridge<T>>);
impl<T> fmt::Debug for Dispatcher<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Dispatcher<_>")
}
}
impl<T> Deref for Dispatcher<T> {
type Target = dyn Bridge<T>;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl<T> DerefMut for Dispatcher<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
}
}
pub trait Dispatchable: Discoverer {}
pub trait Threaded {
fn register();
}
impl<T> Threaded for T
where
T: Agent<Reach = Public>,
{
fn register() {
let scope = AgentScope::<T>::new();
let responder = WorkerResponder {};
let link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(link);
scope.send(upd);
let handler = move |data: Vec<u8>| {
let msg = ToWorker::<T::Input>::unpack(&data);
match msg {
ToWorker::Connected(id) => {
let upd = AgentLifecycleEvent::Connected(id);
scope.send(upd);
}
ToWorker::ProcessInput(id, value) => {
let upd = AgentLifecycleEvent::Input(value, id);
scope.send(upd);
}
ToWorker::Disconnected(id) => {
let upd = AgentLifecycleEvent::Disconnected(id);
scope.send(upd);
}
ToWorker::Destroy => {
let upd = AgentLifecycleEvent::Destroy;
scope.send(upd);
cfg_match! {
feature = "std_web" => js! { self.close(); },
feature = "web_sys" => worker_self().close(),
};
}
}
};
let loaded: FromWorker<T::Output> = FromWorker::WorkerLoaded;
let loaded = loaded.pack();
cfg_match! {
feature = "std_web" => js! {
var handler = @{handler};
self.onmessage = function(event) {
handler(event.data);
};
self.postMessage(@{loaded});
},
feature = "web_sys" => ({
let worker = worker_self();
worker.set_onmessage_closure(handler);
worker.post_message_vec(loaded);
}),
};
}
}
impl<T> Bridged for T
where
T: Agent,
{
fn bridge(callback: Callback<Self::Output>) -> Box<dyn Bridge<Self>> {
Self::Reach::spawn_or_join(Some(callback))
}
}
impl<T> Dispatched for T
where
T: Agent,
<T as Agent>::Reach: Dispatchable,
{
fn dispatcher() -> Dispatcher<T> {
Dispatcher(Self::Reach::spawn_or_join::<T>(None))
}
}
#[doc(hidden)]
pub trait Discoverer {
fn spawn_or_join<AGN: Agent>(_callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
unimplemented!(
"The Reach type that you tried to use with this Agent does not have
Discoverer properly implemented for it yet. Please see
https://docs.rs/yew/latest/yew/agent/ for other Reach options."
);
}
}
pub trait Bridge<AGN: Agent> {
fn send(&mut self, msg: AGN::Input);
}
struct LocalAgent<AGN: Agent> {
scope: AgentScope<AGN>,
slab: SharedOutputSlab<AGN>,
}
type Last = bool;
impl<AGN: Agent> LocalAgent<AGN> {
pub fn new(scope: &AgentScope<AGN>) -> Self {
let slab = Rc::new(RefCell::new(Slab::new()));
LocalAgent {
scope: scope.clone(),
slab,
}
}
fn slab(&self) -> SharedOutputSlab<AGN> {
self.slab.clone()
}
fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> ContextBridge<AGN> {
let respondable = callback.is_some();
let mut slab = self.slab.borrow_mut();
let id: usize = slab.insert(callback);
let id = HandlerId::new(id, respondable);
ContextBridge {
scope: self.scope.clone(),
id,
}
}
fn remove_bridge(&mut self, bridge: &ContextBridge<AGN>) -> Last {
let mut slab = self.slab.borrow_mut();
let _ = slab.remove(bridge.id.raw_id());
slab.is_empty()
}
}
thread_local! {
static LOCAL_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
}
#[allow(missing_debug_implementations)]
pub struct Context;
impl Discoverer for Context {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let mut scope_to_init = None;
let bridge = LOCAL_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
match pool.entry::<LocalAgent<AGN>>() {
anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback),
anymap::Entry::Vacant(entry) => {
let scope = AgentScope::<AGN>::new();
let launched = LocalAgent::new(&scope);
let responder = SlabResponder {
slab: launched.slab(),
};
scope_to_init = Some((scope, responder));
entry.insert(launched).create_bridge(callback)
}
}
});
if let Some((scope, responder)) = scope_to_init {
let agent_link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(agent_link);
scope.send(upd);
}
let upd = AgentLifecycleEvent::Connected(bridge.id);
bridge.scope.send(upd);
Box::new(bridge)
}
}
impl Dispatchable for Context {}
struct SlabResponder<AGN: Agent> {
slab: Shared<Slab<Option<Callback<AGN::Output>>>>,
}
impl<AGN: Agent> Responder<AGN> for SlabResponder<AGN> {
fn respond(&self, id: HandlerId, output: AGN::Output) {
locate_callback_and_respond::<AGN>(&self.slab, id, output);
}
}
fn locate_callback_and_respond<AGN: Agent>(
slab: &SharedOutputSlab<AGN>,
id: HandlerId,
output: AGN::Output,
) {
let callback = {
let slab = slab.borrow();
match slab.get(id.raw_id()).cloned() {
Some(callback) => callback,
None => {
warn!("Id of handler does not exist in the slab: {}.", id.raw_id());
return;
}
}
};
match callback {
Some(callback) => callback.emit(output),
None => warn!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id()),
}
}
struct ContextBridge<AGN: Agent> {
scope: AgentScope<AGN>,
id: HandlerId,
}
impl<AGN: Agent> Bridge<AGN> for ContextBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let upd = AgentLifecycleEvent::Input(msg, self.id);
self.scope.send(upd);
}
}
impl<AGN: Agent> Drop for ContextBridge<AGN> {
fn drop(&mut self) {
let terminate_worker = LOCAL_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
let terminate_worker = {
if let Some(launched) = pool.get_mut::<LocalAgent<AGN>>() {
launched.remove_bridge(self)
} else {
false
}
};
if terminate_worker {
pool.remove::<LocalAgent<AGN>>();
}
terminate_worker
});
let upd = AgentLifecycleEvent::Disconnected(self.id);
self.scope.send(upd);
if terminate_worker {
let upd = AgentLifecycleEvent::Destroy;
self.scope.send(upd);
}
}
}
#[allow(missing_debug_implementations)]
pub struct Job;
impl Discoverer for Job {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let callback = callback.expect("Callback required for Job");
let scope = AgentScope::<AGN>::new();
let responder = CallbackResponder { callback };
let agent_link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(agent_link);
scope.send(upd);
let upd = AgentLifecycleEvent::Connected(SINGLETON_ID);
scope.send(upd);
let bridge = JobBridge { scope };
Box::new(bridge)
}
}
const SINGLETON_ID: HandlerId = HandlerId(0, true);
struct CallbackResponder<AGN: Agent> {
callback: Callback<AGN::Output>,
}
impl<AGN: Agent> Responder<AGN> for CallbackResponder<AGN> {
fn respond(&self, id: HandlerId, output: AGN::Output) {
assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
self.callback.emit(output);
}
}
struct JobBridge<AGN: Agent> {
scope: AgentScope<AGN>,
}
impl<AGN: Agent> Bridge<AGN> for JobBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let upd = AgentLifecycleEvent::Input(msg, SINGLETON_ID);
self.scope.send(upd);
}
}
impl<AGN: Agent> Drop for JobBridge<AGN> {
fn drop(&mut self) {
let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID);
self.scope.send(upd);
let upd = AgentLifecycleEvent::Destroy;
self.scope.send(upd);
}
}
#[allow(missing_debug_implementations)]
pub struct Private;
impl Discoverer for Private {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let callback = callback.expect("Callback required for Private agents");
let handler = move |data: Vec<u8>| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
}
FromWorker::ProcessOutput(id, output) => {
assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
callback.emit(output);
}
}
};
let name_of_resource = AGN::name_of_resource();
let worker = cfg_match! {
feature = "std_web" => js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data);
};
return worker;
},
feature = "web_sys" => ({
let worker = worker_new(name_of_resource, AGN::is_module());
worker.set_onmessage_closure(handler);
worker
}),
};
let bridge = PrivateBridge {
worker,
_agent: PhantomData,
};
Box::new(bridge)
}
}
pub struct PrivateBridge<T: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
_agent: PhantomData<T>,
}
impl<AGN: Agent> fmt::Debug for PrivateBridge<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("PrivateBridge<_>")
}
}
impl<AGN: Agent> Bridge<AGN> for PrivateBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let msg = ToWorker::ProcessInput(SINGLETON_ID, msg).pack();
cfg_match! {
feature = "std_web" => ({
let worker = &self.worker;
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
};
}),
feature = "web_sys" => self.worker.post_message_vec(msg),
}
}
}
impl<AGN: Agent> Drop for PrivateBridge<AGN> {
fn drop(&mut self) {
}
}
struct RemoteAgent<AGN: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
slab: SharedOutputSlab<AGN>,
}
impl<AGN: Agent> RemoteAgent<AGN> {
pub fn new(
#[cfg(feature = "std_web")] worker: Value,
#[cfg(feature = "web_sys")] worker: Worker,
slab: SharedOutputSlab<AGN>,
) -> Self {
RemoteAgent { worker, slab }
}
fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> PublicBridge<AGN> {
let respondable = callback.is_some();
let mut slab = self.slab.borrow_mut();
let id: usize = slab.insert(callback);
let id = HandlerId::new(id, respondable);
let bridge = PublicBridge {
worker: self.worker.clone(),
id,
_agent: PhantomData,
};
bridge.send_message(ToWorker::Connected(bridge.id));
bridge
}
fn remove_bridge(&mut self, bridge: &PublicBridge<AGN>) -> Last {
let mut slab = self.slab.borrow_mut();
let _ = slab.remove(bridge.id.raw_id());
slab.is_empty()
}
}
thread_local! {
static REMOTE_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
static REMOTE_AGENTS_LOADED: RefCell<HashSet<TypeId>> = RefCell::new(HashSet::new());
static REMOTE_AGENTS_EARLY_MSGS_QUEUE: RefCell<HashMap<TypeId, Vec<Vec<u8>>>> = RefCell::new(HashMap::new());
}
#[allow(missing_debug_implementations)]
pub struct Public;
impl Discoverer for Public {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let bridge = REMOTE_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
match pool.entry::<RemoteAgent<AGN>>() {
anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback),
anymap::Entry::Vacant(entry) => {
let slab: Shared<Slab<Option<Callback<AGN::Output>>>> =
Rc::new(RefCell::new(Slab::new()));
let handler = {
let slab = slab.clone();
move |data: Vec<u8>,
#[cfg(feature = "std_web")] worker: Value,
#[cfg(feature = "web_sys")] worker: &Worker| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
REMOTE_AGENTS_LOADED.with(|loaded| {
let _ = loaded.borrow_mut().insert(TypeId::of::<AGN>());
});
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| {
let mut queue = queue.borrow_mut();
if let Some(msgs) = queue.get_mut(&TypeId::of::<AGN>()) {
for msg in msgs.drain(..) {
cfg_match! {
feature = "std_web" => ({
let worker = &worker;
js! {@{worker}.postMessage(@{msg});};
}),
feature = "web_sys" => worker.post_message_vec(msg),
}
}
}
});
}
FromWorker::ProcessOutput(id, output) => {
locate_callback_and_respond::<AGN>(&slab, id, output);
}
}
}
};
let name_of_resource = AGN::name_of_resource();
let worker = cfg_match! {
feature = "std_web" => js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data, worker);
};
return worker;
},
feature = "web_sys" => ({
let worker = worker_new(name_of_resource, AGN::is_module());
let worker_clone = worker.clone();
worker.set_onmessage_closure(move |data: Vec<u8>| {
handler(data, &worker_clone);
});
worker
}),
};
let launched = RemoteAgent::new(worker, slab);
entry.insert(launched).create_bridge(callback)
}
}
});
Box::new(bridge)
}
}
impl Dispatchable for Public {}
pub struct PublicBridge<AGN: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
id: HandlerId,
_agent: PhantomData<AGN>,
}
impl<AGN: Agent> fmt::Debug for PublicBridge<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("PublicBridge<_>")
}
}
impl<AGN: Agent> PublicBridge<AGN> {
fn worker_is_loaded(&self) -> bool {
REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&TypeId::of::<AGN>()))
}
fn msg_to_queue(&self, msg: Vec<u8>) {
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| {
let mut queue = queue.borrow_mut();
match queue.entry(TypeId::of::<AGN>()) {
hash_map::Entry::Vacant(record) => {
record.insert(vec![msg]);
}
hash_map::Entry::Occupied(ref mut record) => {
record.get_mut().push(msg);
}
}
});
}
fn send_message(&self, msg: ToWorker<AGN::Input>) {
if self.worker_is_loaded() {
send_to_remote::<AGN>(&self.worker, msg);
} else {
self.msg_to_queue(msg.pack());
}
}
}
fn send_to_remote<AGN: Agent>(
#[cfg(feature = "std_web")] worker: &Value,
#[cfg(feature = "web_sys")] worker: &Worker,
msg: ToWorker<AGN::Input>,
) {
let msg = msg.pack();
cfg_match! {
feature = "std_web" => js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
},
feature = "web_sys" => worker.post_message_vec(msg),
};
}
impl<AGN: Agent> Bridge<AGN> for PublicBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let msg = ToWorker::ProcessInput(self.id, msg);
self.send_message(msg);
}
}
impl<AGN: Agent> Drop for PublicBridge<AGN> {
fn drop(&mut self) {
let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
let terminate_worker = {
if let Some(launched) = pool.get_mut::<RemoteAgent<AGN>>() {
launched.remove_bridge(self)
} else {
false
}
};
if terminate_worker {
pool.remove::<RemoteAgent<AGN>>();
}
terminate_worker
});
let disconnected = ToWorker::Disconnected(self.id);
self.send_message(disconnected);
if terminate_worker {
let destroy = ToWorker::Destroy;
self.send_message(destroy);
REMOTE_AGENTS_LOADED.with(|loaded| {
loaded.borrow_mut().remove(&TypeId::of::<AGN>());
});
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| {
queue.borrow_mut().remove(&TypeId::of::<AGN>());
});
}
}
}
#[allow(missing_debug_implementations)]
pub struct Global;
impl Discoverer for Global {}
pub trait Agent: Sized + 'static {
type Reach: Discoverer;
type Message;
type Input: Serialize + for<'de> Deserialize<'de>;
type Output: Serialize + for<'de> Deserialize<'de>;
fn create(link: AgentLink<Self>) -> Self;
fn update(&mut self, msg: Self::Message);
fn connected(&mut self, _id: HandlerId) {}
fn handle_input(&mut self, msg: Self::Input, id: HandlerId);
fn disconnected(&mut self, _id: HandlerId) {}
fn destroy(&mut self) {}
fn name_of_resource() -> &'static str {
"main.js"
}
fn is_module() -> bool {
false
}
}
pub struct AgentScope<AGN: Agent> {
shared_agent: Shared<AgentRunnable<AGN>>,
}
impl<AGN: Agent> fmt::Debug for AgentScope<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("AgentScope<_>")
}
}
impl<AGN: Agent> Clone for AgentScope<AGN> {
fn clone(&self) -> Self {
AgentScope {
shared_agent: self.shared_agent.clone(),
}
}
}
impl<AGN: Agent> AgentScope<AGN> {
pub fn new() -> Self {
let shared_agent = Rc::new(RefCell::new(AgentRunnable::new()));
AgentScope { shared_agent }
}
pub fn send(&self, update: AgentLifecycleEvent<AGN>) {
let envelope = AgentEnvelope {
shared_agent: self.shared_agent.clone(),
update,
};
let runnable: Box<dyn Runnable> = Box::new(envelope);
scheduler().push(runnable);
}
}
impl<AGN: Agent> Default for AgentScope<AGN> {
fn default() -> Self {
Self::new()
}
}
pub trait Responder<AGN: Agent> {
fn respond(&self, id: HandlerId, output: AGN::Output);
}
struct WorkerResponder {}
impl<AGN: Agent> Responder<AGN> for WorkerResponder {
fn respond(&self, id: HandlerId, output: AGN::Output) {
let msg = FromWorker::ProcessOutput(id, output);
let data = msg.pack();
cfg_match! {
feature = "std_web" => js! {
var data = @{data};
self.postMessage(data);
},
feature = "web_sys" => worker_self().post_message_vec(data),
};
}
}
pub struct AgentLink<AGN: Agent> {
scope: AgentScope<AGN>,
responder: Rc<dyn Responder<AGN>>,
}
impl<AGN: Agent> AgentLink<AGN> {
pub fn connect<T>(scope: &AgentScope<AGN>, responder: T) -> Self
where
T: Responder<AGN> + 'static,
{
AgentLink {
scope: scope.clone(),
responder: Rc::new(responder),
}
}
pub fn respond(&self, id: HandlerId, output: AGN::Output) {
self.responder.respond(id, output);
}
pub fn callback<F, IN>(&self, function: F) -> Callback<IN>
where
F: Fn(IN) -> AGN::Message + 'static,
{
let scope = self.scope.clone();
let closure = move |input| {
let output = function(input);
scope.send(AgentLifecycleEvent::Message(output));
};
closure.into()
}
}
impl<AGN: Agent> fmt::Debug for AgentLink<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("AgentLink<_>")
}
}
impl<AGN: Agent> Clone for AgentLink<AGN> {
fn clone(&self) -> Self {
AgentLink {
scope: self.scope.clone(),
responder: self.responder.clone(),
}
}
}
struct AgentRunnable<AGN> {
agent: Option<AGN>,
destroyed: bool,
}
impl<AGN> AgentRunnable<AGN> {
fn new() -> Self {
AgentRunnable {
agent: None,
destroyed: false,
}
}
}
#[derive(Debug)]
pub enum AgentLifecycleEvent<AGN: Agent> {
Create(AgentLink<AGN>),
Message(AGN::Message),
Connected(HandlerId),
Input(AGN::Input, HandlerId),
Disconnected(HandlerId),
Destroy,
}
struct AgentEnvelope<AGN: Agent> {
shared_agent: Shared<AgentRunnable<AGN>>,
update: AgentLifecycleEvent<AGN>,
}
impl<AGN> Runnable for AgentEnvelope<AGN>
where
AGN: Agent,
{
fn run(self: Box<Self>) {
let mut this = self.shared_agent.borrow_mut();
if this.destroyed {
return;
}
match self.update {
AgentLifecycleEvent::Create(link) => {
this.agent = Some(AGN::create(link));
}
AgentLifecycleEvent::Message(msg) => {
this.agent
.as_mut()
.expect("agent was not created to process messages")
.update(msg);
}
AgentLifecycleEvent::Connected(id) => {
this.agent
.as_mut()
.expect("agent was not created to send a connected message")
.connected(id);
}
AgentLifecycleEvent::Input(inp, id) => {
this.agent
.as_mut()
.expect("agent was not created to process inputs")
.handle_input(inp, id);
}
AgentLifecycleEvent::Disconnected(id) => {
this.agent
.as_mut()
.expect("agent was not created to send a disconnected message")
.disconnected(id);
}
AgentLifecycleEvent::Destroy => {
let mut agent = this
.agent
.take()
.expect("trying to destroy not existent agent");
agent.destroy();
}
}
}
}
#[cfg(feature = "web_sys")]
fn worker_new(name_of_resource: &str, is_module: bool) -> Worker {
let href = utils::document().location().unwrap().href().unwrap();
let script_url = format!("{}{}", href, name_of_resource);
let wasm_url = format!("{}{}", href, name_of_resource.replace(".js", "_bg.wasm"));
let array = Array::new();
array.push(
&format!(
r#"importScripts("{}");wasm_bindgen("{}");"#,
script_url, wasm_url
)
.into(),
);
let blob = Blob::new_with_str_sequence_and_options(
&array,
BlobPropertyBag::new().type_("application/javascript"),
)
.unwrap();
let url = Url::create_object_url_with_blob(&blob).unwrap();
if is_module {
let options = WorkerOptions::new();
Reflect::set(
options.as_ref(),
&JsValue::from_str("type"),
&JsValue::from_str("module"),
)
.unwrap();
Worker::new_with_options(&url, &options).expect("failed to spawn worker")
} else {
Worker::new(&url).expect("failed to spawn worker")
}
}
#[cfg(feature = "web_sys")]
fn worker_self() -> DedicatedWorkerGlobalScope {
JsValue::from(js_sys::global()).into()
}
#[cfg(feature = "web_sys")]
trait WorkerExt {
fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>));
fn post_message_vec(&self, data: Vec<u8>);
}
#[cfg(feature = "web_sys")]
macro_rules! worker_ext_impl {
($($type:ident),+) => {$(
impl WorkerExt for $type {
fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>)) {
let handler = move |message: MessageEvent| {
let data = Uint8Array::from(message.data()).to_vec();
handler(data);
};
let closure = Closure::wrap(Box::new(handler) as Box<dyn Fn(MessageEvent)>);
self.set_onmessage(Some(closure.as_ref().unchecked_ref()));
closure.forget();
}
fn post_message_vec(&self, data: Vec<u8>) {
self.post_message(&Uint8Array::from(data.as_slice()))
.expect("failed to post message");
}
}
)+};
}
#[cfg(feature = "web_sys")]
worker_ext_impl! {
Worker, DedicatedWorkerGlobalScope
}