codemp 0.8.5

codemp -- code multiplexer
Documentation
use mlua::prelude::*;

pub(crate) fn tokio() -> &'static tokio::runtime::Runtime {
	use std::sync::OnceLock;
	static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
	RT.get_or_init(|| {
		tokio::runtime::Builder::new_current_thread()
			.enable_all()
			.build()
			.expect("could not create tokio runtime")
	})
}

macro_rules! a_sync {
	($($clone:ident)* => $x:expr) => {
		{
			$(let $clone = $clone.clone();)*
			Ok(
				crate::ffi::lua::ext::a_sync::Promise(
					Some(
						crate::ffi::lua::ext::a_sync::tokio()
							.spawn(async move {
								let res = $x;
								Ok(crate::ffi::lua::ext::callback::CallbackArg::from(res))
							})
					)
				)
			)
		}
	};
}

pub(crate) use a_sync;

pub(crate) struct Promise(
	pub(crate) Option<tokio::task::JoinHandle<LuaResult<super::callback::CallbackArg>>>,
);

impl LuaUserData for Promise {
	fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
		fields.add_field_method_get("ready", |_, this| {
			Ok(this.0.as_ref().map_or(true, |x| x.is_finished()))
		});
	}

	fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
		// TODO: await MUST NOT be used in callbacks!!
		methods.add_method_mut("await", |_, this, ()| match this.0.take() {
			None => Err(LuaError::runtime("Promise already awaited")),
			Some(x) => Ok(
				tokio()
					.block_on(x)
					.map_err(LuaError::runtime)?
					.map_err(LuaError::runtime)?
			),
		});
		methods.add_method_mut("cancel", |_, this, ()| match this.0.take() {
			None => Err(LuaError::runtime("Promise already awaited")),
			Some(x) => Ok(x.abort()),
		});
		methods.add_method_mut("and_then", |lua, this, (cb,): (LuaFunction,)| {
			let key = uuid::Uuid::new_v4().to_string();
			lua.set_named_registry_value(&key, cb)?;
			match this.0.take() {
				None => Err(LuaError::runtime("Promise already awaited")),
				Some(x) => {
					tokio().spawn(async move {
						match x.await {
							Err(e) => {
								tracing::error!("could not join promise to run callback: {e}")
							}
							Ok(res) => match res {
								Err(e) => super::callback().failure(e),
								Ok(val) => super::callback().invoke(key, val, true),
							},
						};
					});
					Ok(())
				}
			}
		});
	}
}

pub(crate) fn setup_driver(_: &Lua, (block,): (Option<bool>,)) -> LuaResult<Option<Driver>> {
	let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
	let future = async move {
		tracing::info!(" :: driving runtime...");
		tokio::select! {
			() = std::future::pending::<()>() => {},
			_ = rx.recv() => {},
		}
	};
	if block.unwrap_or(false) {
		super::tokio().block_on(future);
		Ok(None)
	} else {
		let handle = std::thread::spawn(move || super::tokio().block_on(future));
		Ok(Some(Driver(tx, Some(handle))))
	}
}

#[derive(Debug)]
pub(crate) struct Driver(
	pub(crate) tokio::sync::mpsc::UnboundedSender<()>,
	Option<std::thread::JoinHandle<()>>,
);
impl LuaUserData for Driver {
	fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
		methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
			Ok(format!("{:?}", this))
		});
		methods.add_method_mut("stop", |_, this, ()| match this.1.take() {
			None => Ok(false),
			Some(handle) => {
				if this.0.send(()).is_err() {
					tracing::warn!("found runtime already stopped while attempting to stop it");
				}
				match handle.join() {
					Err(e) => Err(LuaError::runtime(format!("runtime thread panicked: {e:?}"))),
					Ok(()) => Ok(true),
				}
			}
		});
	}
}