use bevy_ecs::{
prelude::{Entity, Resource, World},
system::Commands,
world::CommandQueue,
};
use tokio::sync::{
mpsc::{UnboundedReceiver as TokioReceiver, UnboundedSender as TokioSender, unbounded_channel},
oneshot,
};
use std::sync::Arc;
use crate::{
OperationError, OperationRoster, Outcome, Promise, Provider, Reply, RequestExt, StreamPack,
};
/// Handle used to submit closures that will be run with access to the `World`.
///
/// The state lives behind an `Arc<InnerChannel>`, so cloning a `Channel`
/// clones the `Arc` rather than the inner data.
#[derive(Clone)]
pub struct Channel {
// Shared state built by `Channel::new` from a source entity, a session
// entity and a sender of `ChannelItem`s.
inner: Arc<InnerChannel>,
}
impl Channel {
    /// Issues `request` to `provider` and returns an [`Outcome`] for its response.
    ///
    /// The request is built inside a closure handed to [`Channel::commands`],
    /// so nothing is issued until that closure is run. The [`Reply`] returned
    /// by `commands` is discarded on purpose; the returned outcome is the
    /// caller's handle.
    pub fn request_outcome<P>(&self, request: P::Request, provider: P) -> Outcome<P::Response>
    where
        P: Provider,
        P::Request: 'static + Send + Sync,
        P::Response: 'static + Send + Sync,
        P::Streams: 'static + StreamPack,
        P: 'static + Send + Sync,
    {
        let (pending, capture) = Outcome::new();
        drop(self.commands(move |cmds| cmds.request(request, provider).send_outcome(capture)));
        pending
    }

    /// Deprecated predecessor of [`Channel::request_outcome`].
    ///
    /// Issues the request through the deprecated [`Channel::command`], takes
    /// the `response` field of the taken request, and flattens the resulting
    /// nested promise.
    #[deprecated(since = "0.0.6", note = "Use .request_outcome() instead")]
    pub fn query<P>(&self, request: P::Request, provider: P) -> Promise<P::Response>
    where
        P: Provider,
        P::Request: 'static + Send + Sync,
        P::Response: 'static + Send + Sync,
        P::Streams: 'static + StreamPack,
        P: 'static + Send + Sync,
    {
        // Both `command` and the calls inside the closure are covered by this allow.
        #[allow(deprecated)]
        {
            let nested = self.command(move |cmds| cmds.request(request, provider).take().response);
            nested.flatten()
        }
    }

    /// Queues `f` to be run with a [`Commands`] and returns a [`Reply`] that
    /// wraps the receiving end of a oneshot channel carrying `f`'s result.
    ///
    /// The commands issued by `f` are applied to the world before the result
    /// is sent back. A failure to enqueue the task is ignored, as is a failure
    /// to deliver the result.
    pub fn commands<F, U>(&self, f: F) -> Reply<U>
    where
        F: FnOnce(&mut Commands) -> U + 'static + Send,
        U: 'static + Send,
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        let task: ChannelItem = Box::new(move |world: &mut World, _: &mut OperationRoster| {
            let _ = reply_tx.send(Self::apply_commands(world, f));
        });
        let _ = self.inner.sender.send(task);
        Reply::new(reply_rx)
    }

    /// Deprecated predecessor of [`Channel::commands`] that reports the result
    /// through a [`Promise`] instead of a [`Reply`].
    ///
    /// Enqueue and delivery failures are ignored here as well.
    #[deprecated(since = "0.0.6", note = "Use .commands() instead")]
    pub fn command<F, U>(&self, f: F) -> Promise<U>
    where
        F: FnOnce(&mut Commands) -> U + 'static + Send,
        U: 'static + Send,
    {
        let (fulfil, promise) = Promise::new();
        let task: ChannelItem = Box::new(move |world: &mut World, _: &mut OperationRoster| {
            let _ = fulfil.send(Self::apply_commands(world, f));
        });
        let _ = self.inner.sender.send(task);
        promise
    }

    /// Queues `f` to be run with direct mutable access to the [`World`] and
    /// returns a [`Reply`] wrapping the oneshot receiver for its result.
    ///
    /// Enqueue and delivery failures are ignored.
    pub fn world<F, U>(&self, f: F) -> Reply<U>
    where
        F: FnOnce(&mut World) -> U + 'static + Send,
        U: 'static + Send,
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        let task: ChannelItem = Box::new(move |world: &mut World, _: &mut OperationRoster| {
            let _ = reply_tx.send(f(world));
        });
        let _ = self.inner.sender.send(task);
        Reply::new(reply_rx)
    }

    /// Builds the stream channels for `Streams` from this channel's shared
    /// state and `world`.
    ///
    /// As written this always returns `Ok`.
    pub(crate) fn for_streams<Streams: StreamPack>(
        &self,
        world: &World,
    ) -> Result<Streams::StreamChannels, OperationError> {
        let channels = Streams::make_stream_channels(&self.inner, world);
        Ok(channels)
    }

    /// Creates a channel whose shared state holds `source`, `session` and `sender`.
    pub(crate) fn new(source: Entity, session: Entity, sender: TokioSender<ChannelItem>) -> Self {
        let inner = InnerChannel {
            source,
            session,
            sender,
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Runs `f` with a [`Commands`] backed by a temporary [`CommandQueue`],
    /// applies the queued commands to `world`, then returns `f`'s value.
    fn apply_commands<F, U>(world: &mut World, f: F) -> U
    where
        F: FnOnce(&mut Commands) -> U,
    {
        let mut queue = CommandQueue::default();
        let mut commands = Commands::new(&mut queue, world);
        let output = f(&mut commands);
        queue.apply(world);
        output
    }
}
/// State shared by every clone of a [`Channel`]: two entities and the sender
/// through which [`ChannelItem`] closures are submitted.
#[derive(Clone)]
pub struct InnerChannel {
/// Entity passed as `source` to `Channel::new`.
pub(crate) source: Entity,
/// Entity passed as `session` to `Channel::new`.
pub(crate) session: Entity,
/// Sending half of an unbounded tokio mpsc channel of [`ChannelItem`]s.
pub(crate) sender: TokioSender<ChannelItem>,
}
impl InnerChannel {
    /// Returns a copy of the `source` entity stored in this channel.
    pub fn source(&self) -> Entity {
        let Self { source, .. } = self;
        *source
    }

    /// Borrows the sender used to submit [`ChannelItem`]s.
    pub fn sender(&self) -> &TokioSender<ChannelItem> {
        let Self { sender, .. } = self;
        sender
    }
}
/// A boxed, `Send`, run-once closure that is given mutable access to the
/// `World` and to an `OperationRoster`.
pub(crate) type ChannelItem = Box<dyn FnOnce(&mut World, &mut OperationRoster) + Send>;
/// Sending half of a tokio unbounded mpsc channel carrying [`ChannelItem`]s.
pub(crate) type ChannelSender = TokioSender<ChannelItem>;
/// Receiving half of a tokio unbounded mpsc channel carrying [`ChannelItem`]s.
pub(crate) type ChannelReceiver = TokioReceiver<ChannelItem>;
/// Resource holding both halves of an unbounded channel of [`ChannelItem`]s.
#[derive(Resource)]
pub(crate) struct ChannelQueue {
/// Sending half of the channel.
pub(crate) sender: ChannelSender,
/// Receiving half of the channel.
pub(crate) receiver: ChannelReceiver,
}
impl ChannelQueue {
    /// Creates a queue backed by a fresh unbounded tokio mpsc channel,
    /// keeping both the sending and the receiving half.
    pub(crate) fn new() -> Self {
        let (tx, rx) = unbounded_channel();
        Self {
            sender: tx,
            receiver: rx,
        }
    }
}
impl Default for ChannelQueue {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use crate::{prelude::*, testing::*};
    use bevy_ecs::system::EntityCommands;
    use std::time::Duration;

    /// Resolves five `RepeatRequest`s (each with `count: 5`) against the
    /// repeat service, then checks the `RunCount` stored on each provider:
    /// 25 for the hello service and 5 for the repeat service.
    #[test]
    fn test_channel_request() {
        let mut context = TestingContext::minimal_plugins();

        // Spawn the hello service first, then the repeat service.
        let (hello, repeat) = context.command(|commands| {
            let greeter = say_hello.with(|entity_cmds: &mut EntityCommands| {
                entity_cmds.insert((
                    Salutation("Guten tag, ".into()),
                    Name("tester".into()),
                    RunCount(0),
                ));
            });
            let hello = commands.spawn_service(greeter);

            let repeater = repeat_service.with(|entity_cmds: &mut EntityCommands| {
                entity_cmds.insert(RunCount(0));
            });
            let repeat = commands.spawn_service(repeater);

            (hello, repeat)
        });

        for _ in 0..5 {
            let request = RepeatRequest {
                service: hello,
                count: 5,
            };
            context
                .try_resolve_request(request, repeat, Duration::from_secs(5))
                .unwrap();
        }

        let world = context.app.world();

        let hello_runs = world.get::<RunCount>(hello.provider()).unwrap().0;
        assert_eq!(hello_runs, 25);

        let repeat_runs = world.get::<RunCount>(repeat.provider()).unwrap().0;
        assert_eq!(repeat_runs, 5);
    }
}