use anyhow::{bail, Result};
use graphix_compiler::expr::ModuleResolver;
use graphix_rt::{GXConfig, GXEvent, GXHandle, GXRt, NoExt};
use netidx::publisher::Value;
use poolshark::global::GPooled;
use tokio::sync::mpsc;
pub struct TestCtx {
pub internal_only: netidx::InternalOnly,
pub rt: GXHandle<NoExt>,
}
impl TestCtx {
pub async fn shutdown(self) {
drop(self.rt);
self.internal_only.shutdown().await
}
}
pub type RegisterFn = fn(
&mut graphix_compiler::ExecCtx<GXRt<NoExt>, <NoExt as graphix_rt::GXExt>::UserEvent>,
&mut fxhash::FxHashMap<netidx_core::path::Path, arcstr::ArcStr>,
&mut graphix_package::IndexSet<arcstr::ArcStr>,
) -> Result<()>;
pub async fn init_with_resolvers(
sub: mpsc::Sender<GPooled<Vec<GXEvent>>>,
register: &[RegisterFn],
resolvers: Vec<ModuleResolver>,
) -> Result<TestCtx> {
init_with_setup(sub, register, resolvers, |_| {}).await
}
pub async fn init(
sub: mpsc::Sender<GPooled<Vec<GXEvent>>>,
register: &[RegisterFn],
) -> Result<TestCtx> {
init_with_setup(sub, register, vec![], |_| {}).await
}
pub async fn init_with_setup<F>(
sub: mpsc::Sender<GPooled<Vec<GXEvent>>>,
register: &[RegisterFn],
resolvers: Vec<ModuleResolver>,
setup: F,
) -> Result<TestCtx>
where
F: FnOnce(
&mut graphix_compiler::ExecCtx<
GXRt<NoExt>,
<NoExt as graphix_rt::GXExt>::UserEvent,
>,
),
{
let _ = env_logger::try_init();
let env = netidx::InternalOnly::new().await?;
let mut ctx = graphix_compiler::ExecCtx::new(GXRt::<NoExt>::new(
env.publisher().clone(),
env.subscriber().clone(),
))?;
let mut modules = fxhash::FxHashMap::default();
let mut root_mods = graphix_package::IndexSet::new();
for f in register {
f(&mut ctx, &mut modules, &mut root_mods)?;
}
setup(&mut ctx);
let mut parts = Vec::new();
for name in &root_mods {
if name == "core" {
parts.push(format!("mod core;\nuse core"));
} else {
parts.push(format!("mod {name}"));
}
}
let root = arcstr::ArcStr::from(parts.join(";\n"));
let mut all_resolvers = vec![ModuleResolver::VFS(modules)];
all_resolvers.extend(resolvers);
Ok(TestCtx {
internal_only: env,
rt: GXConfig::builder(ctx, sub)
.root(root)
.resolvers(all_resolvers)
.build()?
.start()
.await?,
})
}
pub async fn eval(code: &str, register: &[RegisterFn]) -> Result<(Value, TestCtx)> {
eval_with_setup(code, register, |_| {}).await
}
pub async fn eval_with_setup<F>(
code: &str,
register: &[RegisterFn],
setup: F,
) -> Result<(Value, TestCtx)>
where
F: FnOnce(
&mut graphix_compiler::ExecCtx<
GXRt<NoExt>,
<NoExt as graphix_rt::GXExt>::UserEvent,
>,
),
{
let (tx, mut rx) = mpsc::channel(10);
let gx_code = format!("let result = {code}");
let tbl = fxhash::FxHashMap::from_iter([(
netidx_core::path::Path::from("/test.gx"),
arcstr::ArcStr::from(gx_code),
)]);
let resolver = ModuleResolver::VFS(tbl);
let ctx = init_with_setup(tx, register, vec![resolver], setup).await?;
let compiled = ctx.rt.compile(arcstr::literal!("{ mod test; test::result }")).await?;
let eid = compiled.exprs[0].id;
let timeout = tokio::time::sleep(std::time::Duration::from_secs(5));
tokio::pin!(timeout);
loop {
tokio::select! {
_ = &mut timeout => bail!("timeout waiting for graphix result"),
batch = rx.recv() => match batch {
None => bail!("graphix runtime died"),
Some(mut batch) => {
for e in batch.drain(..) {
if let GXEvent::Updated(id, v) = e {
if id == eid {
return Ok((v, ctx));
}
}
}
}
}
}
}
}
pub use graphix_compiler::expr::parser::GRAPHIX_ESC;
pub use poolshark::local::LPooled;
pub fn escape_path(path: std::path::Display) -> LPooled<String> {
use std::fmt::Write;
let mut buf: LPooled<String> = LPooled::take();
let mut res: LPooled<String> = LPooled::take();
write!(buf, "{path}").unwrap();
GRAPHIX_ESC.escape_to(&*buf, &mut res);
res
}
#[macro_export]
macro_rules! run {
($name:ident, $code:expr, $pred:expr) => {
$crate::run!(@impl $name, $pred, 30, "/test.gx" => format!("let result = {}", $code));
};
($name:ident, $code:expr, $pred:expr, timeout: $timeout:expr) => {
$crate::run!(@impl $name, $pred, $timeout, "/test.gx" => format!("let result = {}", $code));
};
($name:ident, $pred:expr, $($path:literal => $code:expr),+) => {
$crate::run!(@impl $name, $pred, 30, $($path => $code),+);
};
(@impl $name:ident, $pred:expr, $timeout:expr, $($path:literal => $code:expr),+) => {
#[tokio::test(flavor = "current_thread")]
async fn $name() -> ::anyhow::Result<()> {
let (tx, mut rx) = ::tokio::sync::mpsc::channel(10);
let tbl = ::fxhash::FxHashMap::from_iter([
$((::netidx_core::path::Path::from($path), ::arcstr::ArcStr::from($code))),+
]);
let resolver = ::graphix_compiler::expr::ModuleResolver::VFS(tbl);
let ctx = $crate::testing::init_with_resolvers(
tx, &crate::TEST_REGISTER, vec![resolver],
).await?;
let bs = &ctx.rt;
match bs.compile(::arcstr::literal!("{ mod test; test::result }")).await {
Err(e) => assert!($pred(dbg!(Err(e)))),
Ok(e) => {
dbg!("compilation succeeded");
let eid = e.exprs[0].id;
let timeout = ::tokio::time::sleep(
::std::time::Duration::from_secs($timeout),
);
::tokio::pin!(timeout);
loop {
::tokio::select! {
_ = &mut timeout => ::anyhow::bail!(
"timeout after {}s waiting for result", $timeout,
),
batch = rx.recv() => match batch {
None => ::anyhow::bail!("runtime died"),
Some(mut batch) => {
for e in batch.drain(..) {
match e {
::graphix_rt::GXEvent::Env(_) => (),
::graphix_rt::GXEvent::Updated(id, v) => {
eprintln!("{v}");
assert_eq!(id, eid);
assert!($pred(Ok(&v)));
return Ok(());
}
}
}
}
}
}
}
}
}
ctx.shutdown().await;
Ok(())
}
};
}
#[macro_export]
macro_rules! run_with_tempdir {
(
name: $test_name:ident,
code: $code:literal,
setup: |$temp_dir:ident| $setup:block,
expect_error
) => {
$crate::run_with_tempdir! {
name: $test_name,
code: $code,
setup: |$temp_dir| $setup,
expect: |v: ::netidx::subscriber::Value| -> ::anyhow::Result<()> {
if matches!(v, ::netidx::subscriber::Value::Error(_)) {
Ok(())
} else {
panic!("expected Error value, got: {v:?}")
}
}
}
};
(
name: $test_name:ident,
code: $code:literal,
setup: |$temp_dir:ident| $setup:block,
verify: |$verify_dir:ident| $verify:block
) => {
$crate::run_with_tempdir! {
name: $test_name,
code: $code,
setup: |$temp_dir| $setup,
expect: |v: ::netidx::subscriber::Value| -> ::anyhow::Result<()> {
if !matches!(v, ::netidx::subscriber::Value::Null) {
panic!("expected Null (success), got: {v:?}");
}
Ok(())
},
verify: |$verify_dir| $verify
}
};
(
name: $test_name:ident,
code: $code:literal,
setup: |$temp_dir:ident| $setup:block,
expect: $expect_payload:expr
$(, verify: |$verify_dir:ident| $verify:block)?
) => {
#[tokio::test(flavor = "current_thread")]
async fn $test_name() -> ::anyhow::Result<()> {
let (tx, mut rx) = ::tokio::sync::mpsc::channel::<
::poolshark::global::GPooled<Vec<::graphix_rt::GXEvent>>
>(10);
let ctx = $crate::testing::init(tx, &crate::TEST_REGISTER).await?;
let $temp_dir = ::tempfile::tempdir()?;
let test_file = { $setup };
let code = format!(
$code,
$crate::testing::escape_path(test_file.display())
);
let compiled = ctx.rt.compile(::arcstr::ArcStr::from(code)).await?;
let eid = compiled.exprs[0].id;
let timeout = ::tokio::time::sleep(::std::time::Duration::from_secs(2));
::tokio::pin!(timeout);
loop {
::tokio::select! {
_ = &mut timeout => panic!("timeout waiting for result"),
Some(mut batch) = rx.recv() => {
for event in batch.drain(..) {
if let ::graphix_rt::GXEvent::Updated(id, v) = event {
if id == eid {
$expect_payload(v)?;
$(
let $verify_dir = &$temp_dir;
$verify
)?
return Ok(());
}
}
}
}
}
}
}
};
}