use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use futures::channel::{mpsc, oneshot};
use futures::lock::{Mutex, MutexGuard};
use futures::future;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
pub use chunk::ChunkGuard;
pub use chunk_set::{ChunkIter, ChunkSetGuard};
pub(crate) use transaction::Transaction;
pub use transaction::TransactionGuard;
use crate::component::ComponentTypeID;
use crate::snapshot::Snapshot;
use crate::universe::Universe;
use crate::world::transaction::locks_include_archetype;
use futures::future::Either;
mod chunk;
mod chunk_set;
mod transaction;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Lock {
Read(ComponentTypeID),
Write(ComponentTypeID),
Exclude(ComponentTypeID),
}
impl Lock {
pub fn is_read(&self) -> bool {
match self {
Lock::Read(_) => true,
_ => false,
}
}
pub fn is_write(&self) -> bool {
match self {
Lock::Write(_) => true,
_ => false,
}
}
pub fn is_exclude(&self) -> bool {
match self {
Lock::Exclude(_) => true,
_ => false,
}
}
pub fn type_id(&self) -> ComponentTypeID {
match self {
Lock::Read(x) => *x,
Lock::Write(x) => *x,
Lock::Exclude(x) => *x,
}
}
}
type BoxedTransaction = Box<dyn for<'a> FnOnce(TransactionGuard<'a>) + Send + 'static>;
type TransactionCommand = (Vec<Lock>, BoxedTransaction, oneshot::Sender<()>);
pub struct World {
universe: Arc<Universe>,
snapshot: Arc<Mutex<Arc<Snapshot>>>,
exit_rx: oneshot::Receiver<()>,
transaction_tx: mpsc::UnboundedSender<TransactionCommand>,
}
impl World {
pub fn new(universe: Arc<Universe>) -> World {
Self::with_snapshot(Arc::new(Snapshot::empty(universe)))
}
pub fn with_snapshot(snapshot: Arc<Snapshot>) -> World {
let universe = snapshot.universe().clone();
let snapshot = Arc::new(Mutex::new(snapshot));
let (transaction_tx, transaction_rx) = mpsc::unbounded();
let (exit_tx, exit_rx) = oneshot::channel();
let transactions = WorldTransactions::new(snapshot.clone());
tokio::spawn(async move {
let _ = exit_tx;
transactions.handle_commands(transaction_rx).await;
});
World {
universe,
snapshot,
exit_rx,
transaction_tx,
}
}
pub fn universe(&self) -> &Arc<Universe> { &self.universe }
pub async fn into_snapshot(self) -> Arc<Snapshot> {
drop(self.transaction_tx);
self.exit_rx.await.ok();
Arc::try_unwrap(self.snapshot).unwrap().into_inner()
}
pub async fn snapshot(&self) -> Arc<Snapshot> {
self.snapshot.lock().await.clone()
}
pub async fn replace_snapshot(&self, snapshot: Arc<Snapshot>) -> Arc<Snapshot> {
assert!(Arc::ptr_eq(snapshot.universe(), &self.universe),
"snapshot is not of this universe");
std::mem::replace(&mut *self.snapshot.lock().await, snapshot)
}
pub async fn take_snapshot(&self) -> Arc<Snapshot> {
self.replace_snapshot(Arc::new(Snapshot::empty(self.universe.clone()))).await
}
pub async fn set_snapshot(&self, snapshot: Arc<Snapshot>) {
self.replace_snapshot(snapshot).await;
}
pub async fn clear(&self) {
self.take_snapshot().await;
}
pub async fn exclusive_transaction<F>(&self, f: F)
where F: FnOnce(&mut Arc<Snapshot>) + Send
{
let mut snapshot = self.snapshot.lock().await;
f(&mut *snapshot);
}
pub async fn transaction<F>(&self, locks: impl Into<Vec<Lock>>, f: F)
where for<'a> F: FnOnce(TransactionGuard<'a>) + Send + 'static
{
let f = Box::new(f) as Box<dyn for<'a> FnOnce(TransactionGuard<'a>) + Send>;
let (signal_tx, signal_rx) = oneshot::channel();
self.transaction_tx.unbounded_send((locks.into(), f, signal_tx)).unwrap();
signal_rx.await.ok();
}
}
struct WorldTransactions {
lock: Option<MutexGuard<'static, Arc<Snapshot>>>,
snapshot: Arc<Mutex<Arc<Snapshot>>>,
active_transactions: Vec<Arc<Transaction>>,
pending_transactions: VecDeque<TransactionCommand>,
}
impl WorldTransactions {
pub fn new(snapshot: Arc<Mutex<Arc<Snapshot>>>) -> WorldTransactions {
WorldTransactions {
lock: None,
snapshot,
active_transactions: Vec::new(),
pending_transactions: VecDeque::new(),
}
}
fn start_transaction(&mut self, cmd: TransactionCommand) -> impl Future<Output=Arc<Transaction>> {
let (locks, f, tx) = cmd;
let snapshot = self.lock.as_mut().unwrap();
let archetypes = (0..snapshot.chunk_sets().len())
.map(|idx| snapshot.universe().archetype_by_id(idx).unwrap())
.filter(|a| transaction::locks_include_archetype(a, &locks))
.collect();
let transaction = Arc::new(
Transaction::new(snapshot.clone(), archetypes, locks));
self.active_transactions.push(transaction.clone());
let (done_tx, done_rx) = oneshot::channel::<()>();
let transaction_clone = transaction.clone();
rayon::spawn(move || {
let _ = done_tx;
let _ = tx;
let guard = TransactionGuard::new(&transaction_clone);
(f)(guard);
});
async move {
done_rx.await.ok();
transaction
}
}
fn locks_compatible(&self, existing: &[Lock], new: &[Lock]) -> bool {
let snap = self.lock.as_ref().unwrap();
let mut can_lock = true;
for lock in new {
match lock {
Lock::Read(type_id) => {
let ok = existing.iter()
.all(|l| !l.is_write() || l.type_id() != *type_id);
if !ok {
can_lock = false;
break;
}
}
Lock::Write(type_id) => {
let ok = existing.iter()
.all(|l| l.is_exclude() || l.type_id() != *type_id);
if !ok {
can_lock = false;
break;
}
}
Lock::Exclude(_) => {}
}
}
if can_lock {
return true;
}
for (idx, _) in snap.chunk_sets().iter().enumerate() {
let archetype = snap.universe().archetype_by_id(idx).unwrap();
if locks_include_archetype(&archetype, existing)
&& locks_include_archetype(&archetype, new) {
return false;
}
}
true
}
fn can_start(&self, locks: &[Lock]) -> bool {
self.active_transactions.iter()
.all(|t| self.locks_compatible(t.locks(), locks))
}
pub async fn handle_commands(
mut self,
mut commands_rx: mpsc::UnboundedReceiver<TransactionCommand>,
) {
let mut exit = false;
let mut active_futures = FuturesUnordered::new();
loop {
let task_done = if active_futures.is_empty() {
Either::Left(commands_rx.next().await)
} else {
match future::select(commands_rx.next(), active_futures.next()).await {
Either::Left((x, _)) => Either::Left(x),
Either::Right((x, _)) => Either::Right(x),
}
};
match task_done {
Either::Left(None) => {
exit = true;
if active_futures.is_empty() {
return;
}
}
Either::Left(Some((locks, f, tx))) => {
if self.can_start(&locks) {
if self.active_transactions.is_empty() && self.lock.is_none() {
let lock = unsafe { std::mem::transmute(self.snapshot.lock().await) };
self.lock = Some(lock);
}
active_futures.push(self.start_transaction((locks, f, tx)));
} else {
self.pending_transactions.push_back((locks, f, tx));
}
}
Either::Right(Some(done)) => {
self.active_transactions.retain(|t| !Arc::ptr_eq(t, &done));
let mut i = 0;
while i < self.pending_transactions.len() {
let (locks, _, _) = &self.pending_transactions[i];
if self.can_start(locks) {
let cmd = self.pending_transactions.remove(i).unwrap();
let f = self.start_transaction(cmd);
active_futures.push(f);
} else {
i += 1;
}
}
if active_futures.is_empty() {
self.lock.take();
if exit {
return;
}
}
}
_ => panic!("unexpected future wakeup in WorldTransactions")
}
}
}
}