use crate::AsyncOperation;
use async_channel::{Receiver, Sender};
use bevy_ecs::prelude::*;
use bevy_ecs::system::Command;
use bevy_reflect::prelude::*;
use bevy_reflect::TypeRegistry;
use std::any::TypeId;
use std::marker::PhantomData;
#[derive(Debug)]
#[non_exhaustive]
pub enum ResourceOperation {
	Insert(Box<dyn Reflect>),
	Remove(TypeId),
	WaitFor(TypeId, Sender<Box<dyn Reflect>>),
}
impl Command for ResourceOperation {
	fn apply(self, world: &mut World) {
		world.resource_scope(|world, registry: Mut<AppTypeRegistry>| {
			let registry = registry.read();
			match self {
				ResourceOperation::Insert(boxed) => {
					let reflect_resource = get_reflect_resource(®istry, boxed.type_id());
					reflect_resource.apply_or_insert(world, boxed.as_reflect());
				}
				ResourceOperation::Remove(type_id) => {
					let reflect_resource = get_reflect_resource(®istry, type_id);
					reflect_resource.remove(world);
				}
				ResourceOperation::WaitFor(type_id, sender) => {
					let reflect_resource = get_reflect_resource(®istry, type_id);
					if let Some(reflect) = reflect_resource.reflect(world) {
						sender
							.try_send(reflect.clone_value())
							.expect("invariant broken");
					} else {
						world.spawn(WaitingFor(type_id, sender));
					}
				}
			}
		});
	}
}
impl From<ResourceOperation> for AsyncOperation {
	fn from(resource_op: ResourceOperation) -> Self {
		Self::Resource(resource_op)
	}
}
fn get_reflect_resource(registry: &TypeRegistry, type_id: TypeId) -> &ReflectResource {
	let type_registration = registry.get(type_id).expect("reflect type not registered");
	type_registration
		.data::<ReflectResource>()
		.expect("reflect type is not a resource")
}
#[derive(Component)]
pub(crate) struct WaitingFor(TypeId, Sender<Box<dyn Reflect>>);
pub(crate) fn wait_for_reflect_resources(
	mut commands: Commands,
	query: Query<(Entity, &WaitingFor)>,
	registry: Res<AppTypeRegistry>,
	world: &World,
) {
	let registry = registry.read();
	for (id, WaitingFor(type_id, sender)) in query.iter() {
		let reflect_resource = get_reflect_resource(®istry, *type_id);
		if let Some(reflect) = reflect_resource.reflect(world) {
			sender
				.try_send(reflect.clone_value())
				.expect("invariant broken");
			commands.entity(id).despawn();
		}
	}
}
#[derive(Debug)]
pub struct AsyncResource<R>(Receiver<Box<dyn Reflect>>, PhantomData<R>);
impl<R: Resource + FromReflect> AsyncResource<R> {
	pub(crate) fn new(receiver: Receiver<Box<dyn Reflect>>) -> Self {
		Self(receiver, PhantomData)
	}
	pub async fn wait(self) -> R {
		let boxed_dynamic = self.0.recv().await.expect("invariant broken");
		R::take_from_reflect(boxed_dynamic).expect("invariant broken")
	}
}
#[cfg(test)]
mod tests {
	use crate::{AsyncEcsPlugin, AsyncWorld};
	use bevy::prelude::*;
	use futures_lite::future;
	#[derive(Default, Resource, Reflect)]
	#[reflect(Resource)]
	struct Counter(u8);
	#[test]
	fn insert() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
		app.register_type::<Counter>();
		let async_world = AsyncWorld::from_world(&mut app.world);
		let (sender, receiver) = async_channel::bounded(1);
		std::thread::spawn(move || {
			future::block_on(async move {
				async_world.insert_resource(Counter(4)).await;
				sender.send(()).await.unwrap();
			});
		});
		loop {
			match receiver.try_recv() {
				Ok(_) => break,
				Err(_) => app.update(),
			}
		}
		app.update();
		assert_eq!(4, app.world.resource::<Counter>().0);
	}
	#[test]
	fn remove() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
		app.register_type::<Counter>();
		let async_world = AsyncWorld::from_world(&mut app.world);
		let (sender, receiver) = async_channel::bounded(1);
		app.insert_resource(Counter(7));
		std::thread::spawn(move || {
			future::block_on(async move {
				async_world.remove_resource::<Counter>().await;
				sender.send(()).await.unwrap();
			});
		});
		loop {
			match receiver.try_recv() {
				Ok(_) => break,
				Err(_) => app.update(),
			}
		}
		app.update();
		assert!(app.world.get_resource::<Counter>().is_none());
	}
	#[test]
	fn wait_for() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
		app.register_type::<Counter>();
		let async_world_1 = AsyncWorld::from_world(&mut app.world);
		let async_world_2 = async_world_1.clone();
		let (barrier_tx, barrier_rx) = async_channel::bounded(1);
		let (value_tx, value_rx) = async_channel::bounded(1);
		std::thread::spawn(move || {
			future::block_on(async move {
				let resource = async_world_1.start_waiting_for_resource::<Counter>().await;
				barrier_tx.send(()).await.unwrap();
				let counter = resource.wait().await;
				value_tx.send(counter.0).await.unwrap();
			});
		});
		std::thread::spawn(move || {
			future::block_on(async move {
				barrier_rx.recv().await.unwrap();
				async_world_2.insert_resource(Counter(3)).await;
			});
		});
		let value = loop {
			match value_rx.try_recv() {
				Ok(value) => break value,
				Err(_) => app.update(),
			}
		};
		app.update();
		assert_eq!(3, value);
	}
	#[test]
	fn wait_for_immediate() {
		let mut app = App::new();
		app.add_plugins((MinimalPlugins, AsyncEcsPlugin));
		app.register_type::<Counter>();
		app.insert_resource(Counter(1));
		let async_world = AsyncWorld::from_world(&mut app.world);
		let (value_tx, value_rx) = async_channel::bounded(1);
		std::thread::spawn(move || {
			future::block_on(async move {
				let counter = async_world.wait_for_resource::<Counter>().await;
				value_tx.send(counter.0).await.unwrap();
			});
		});
		let value = loop {
			match value_rx.try_recv() {
				Ok(value) => break value,
				Err(_) => app.update(),
			}
		};
		app.update();
		assert_eq!(1, value);
	}
}