use async_compat::Compat;
use bevy_app::{App, Plugin};
use bevy_ecs::prelude::*;
use bevy_tasks::{IoTaskPool, Task};
use futures_lite::{future, StreamExt};
use kwai_interactive_live::EventStream;
pub use kwai_interactive_live::{ConnectParams, ConnectResp, DisconnectParams, Event};
#[allow(clippy::restriction)]
pub struct KwaiPlugin;
impl Plugin for KwaiPlugin {
#[inline]
fn build(&self, app: &mut App) {
app.add_event::<ConnectErrEvent>()
.init_resource::<EventReceiver>()
.add_system(frame_loop);
}
}
pub struct ConnectErrEvent(pub anyhow::Error);
#[derive(Default)]
pub struct EventReceiver(Option<crossbeam_channel::Receiver<kwai_interactive_live::Event>>);
impl EventReceiver {
#[must_use]
#[inline]
pub fn recv(&self) -> Option<Event> {
self.0.as_ref().and_then(|rx| rx.try_recv().ok())
}
}
#[derive(Component)]
struct KwaiConnectResp(Task<anyhow::Result<(ConnectResp, EventStream)>>);
#[inline]
pub fn connect(commands: &mut Commands, p: ConnectParams) {
let task = IoTaskPool::get().spawn(Compat::new(async {
kwai_interactive_live::connect(p).await
}));
commands.spawn().insert(KwaiConnectResp(task));
}
#[inline]
pub fn disconnect(p: DisconnectParams) {
IoTaskPool::get()
.spawn(Compat::new(async move {
kwai_interactive_live::disconnect(&p).await
}))
.detach();
}
fn frame_loop(
mut commands: Commands,
mut tasks: Query<(Entity, &mut KwaiConnectResp)>,
mut connect_resp_event: EventWriter<ConnectErrEvent>,
) {
for (e, mut task) in &mut tasks {
if let Some(resp) = future::block_on(future::poll_once(&mut task.0)) {
match resp {
Ok((resp, stream)) => {
let (tx, rx) = crossbeam_channel::unbounded();
IoTaskPool::get()
.spawn(Compat::new(stream.into_stream().for_each(move |s| {
if let Err(e) = tx.send(s) {
log::error!("send event error: {e}");
}
})))
.detach();
commands.insert_resource(EventReceiver(Some(rx)));
commands.insert_resource(resp);
}
Err(e) => connect_resp_event.send(ConnectErrEvent(e)),
}
commands.entity(e).despawn();
}
}
}