graphix-package-core 0.8.0

A dataflow language for UIs and network programming, core package with shared builtin-authoring traits
Documentation
use anyhow::{bail, Result};
use graphix_compiler::expr::ModuleResolver;
use graphix_rt::{GXConfig, GXEvent, GXHandle, GXRt, NoExt};
use netidx::publisher::Value;
use poolshark::global::GPooled;
use tokio::sync::mpsc;

pub struct TestCtx {
    pub internal_only: netidx::InternalOnly,
    pub rt: GXHandle<NoExt>,
}

impl TestCtx {
    pub async fn shutdown(self) {
        drop(self.rt);
        self.internal_only.shutdown().await
    }
}

pub type RegisterFn = fn(
    &mut graphix_compiler::ExecCtx<GXRt<NoExt>, <NoExt as graphix_rt::GXExt>::UserEvent>,
    &mut fxhash::FxHashMap<netidx_core::path::Path, arcstr::ArcStr>,
    &mut graphix_package::IndexSet<arcstr::ArcStr>,
) -> Result<()>;

pub async fn init_with_resolvers(
    sub: mpsc::Sender<GPooled<Vec<GXEvent>>>,
    register: &[RegisterFn],
    resolvers: Vec<ModuleResolver>,
) -> Result<TestCtx> {
    init_with_setup(sub, register, resolvers, |_| {}).await
}

pub async fn init(
    sub: mpsc::Sender<GPooled<Vec<GXEvent>>>,
    register: &[RegisterFn],
) -> Result<TestCtx> {
    init_with_setup(sub, register, vec![], |_| {}).await
}

pub async fn init_with_setup<F>(
    sub: mpsc::Sender<GPooled<Vec<GXEvent>>>,
    register: &[RegisterFn],
    resolvers: Vec<ModuleResolver>,
    setup: F,
) -> Result<TestCtx>
where
    F: FnOnce(
        &mut graphix_compiler::ExecCtx<
            GXRt<NoExt>,
            <NoExt as graphix_rt::GXExt>::UserEvent,
        >,
    ),
{
    let _ = env_logger::try_init();
    let env = netidx::InternalOnly::new().await?;
    let mut ctx = graphix_compiler::ExecCtx::new(GXRt::<NoExt>::new(
        env.publisher().clone(),
        env.subscriber().clone(),
    ))?;
    let mut modules = fxhash::FxHashMap::default();
    let mut root_mods = graphix_package::IndexSet::new();
    for f in register {
        f(&mut ctx, &mut modules, &mut root_mods)?;
    }
    setup(&mut ctx);
    let mut parts = Vec::new();
    for name in &root_mods {
        if name == "core" {
            parts.push(format!("mod core;\nuse core"));
        } else {
            parts.push(format!("mod {name}"));
        }
    }
    let root = arcstr::ArcStr::from(parts.join(";\n"));
    let mut all_resolvers = vec![ModuleResolver::VFS(modules)];
    all_resolvers.extend(resolvers);
    Ok(TestCtx {
        internal_only: env,
        rt: GXConfig::builder(ctx, sub)
            .root(root)
            .resolvers(all_resolvers)
            .build()?
            .start()
            .await?,
    })
}

/// Evaluate a graphix expression and return its Value.
///
/// Compiles `code` as `let result = {code}` in a throwaway module,
/// waits for the first update, and returns the resulting value along
/// with the test context (caller must shut it down).
pub async fn eval(code: &str, register: &[RegisterFn]) -> Result<(Value, TestCtx)> {
    eval_with_setup(code, register, |_| {}).await
}

pub async fn eval_with_setup<F>(
    code: &str,
    register: &[RegisterFn],
    setup: F,
) -> Result<(Value, TestCtx)>
where
    F: FnOnce(
        &mut graphix_compiler::ExecCtx<
            GXRt<NoExt>,
            <NoExt as graphix_rt::GXExt>::UserEvent,
        >,
    ),
{
    let (tx, mut rx) = mpsc::channel(10);
    let gx_code = format!("let result = {code}");
    let tbl = fxhash::FxHashMap::from_iter([(
        netidx_core::path::Path::from("/test.gx"),
        arcstr::ArcStr::from(gx_code),
    )]);
    let resolver = ModuleResolver::VFS(tbl);
    let ctx = init_with_setup(tx, register, vec![resolver], setup).await?;
    let compiled = ctx.rt.compile(arcstr::literal!("{ mod test; test::result }")).await?;
    let eid = compiled.exprs[0].id;
    let timeout = tokio::time::sleep(std::time::Duration::from_secs(5));
    tokio::pin!(timeout);
    loop {
        tokio::select! {
            _ = &mut timeout => bail!("timeout waiting for graphix result"),
            batch = rx.recv() => match batch {
                None => bail!("graphix runtime died"),
                Some(mut batch) => {
                    for e in batch.drain(..) {
                        if let GXEvent::Updated(id, v) = e {
                            if id == eid {
                                return Ok((v, ctx));
                            }
                        }
                    }
                }
            }
        }
    }
}

pub use graphix_compiler::expr::parser::GRAPHIX_ESC;
pub use poolshark::local::LPooled;

pub fn escape_path(path: std::path::Display) -> LPooled<String> {
    use std::fmt::Write;
    let mut buf: LPooled<String> = LPooled::take();
    let mut res: LPooled<String> = LPooled::take();
    write!(buf, "{path}").unwrap();
    GRAPHIX_ESC.escape_to(&*buf, &mut res);
    res
}

#[macro_export]
macro_rules! run {
    ($name:ident, $code:expr, $pred:expr) => {
        $crate::run!(@impl $name, $pred, 30, "/test.gx" => format!("let result = {}", $code));
    };
    ($name:ident, $code:expr, $pred:expr, timeout: $timeout:expr) => {
        $crate::run!(@impl $name, $pred, $timeout, "/test.gx" => format!("let result = {}", $code));
    };
    ($name:ident, $pred:expr, $($path:literal => $code:expr),+) => {
        $crate::run!(@impl $name, $pred, 30, $($path => $code),+);
    };
    (@impl $name:ident, $pred:expr, $timeout:expr, $($path:literal => $code:expr),+) => {
        #[tokio::test(flavor = "current_thread")]
        async fn $name() -> ::anyhow::Result<()> {
            let (tx, mut rx) = ::tokio::sync::mpsc::channel(10);
            let tbl = ::fxhash::FxHashMap::from_iter([
                $((::netidx_core::path::Path::from($path), ::arcstr::ArcStr::from($code))),+
            ]);
            let resolver = ::graphix_compiler::expr::ModuleResolver::VFS(tbl);
            let ctx = $crate::testing::init_with_resolvers(
                tx, &crate::TEST_REGISTER, vec![resolver],
            ).await?;
            let bs = &ctx.rt;
            match bs.compile(::arcstr::literal!("{ mod test; test::result }")).await {
                Err(e) => assert!($pred(dbg!(Err(e)))),
                Ok(e) => {
                    dbg!("compilation succeeded");
                    let eid = e.exprs[0].id;
                    let timeout = ::tokio::time::sleep(
                        ::std::time::Duration::from_secs($timeout),
                    );
                    ::tokio::pin!(timeout);
                    loop {
                        ::tokio::select! {
                            _ = &mut timeout => ::anyhow::bail!(
                                "timeout after {}s waiting for result", $timeout,
                            ),
                            batch = rx.recv() => match batch {
                                None => ::anyhow::bail!("runtime died"),
                                Some(mut batch) => {
                                    for e in batch.drain(..) {
                                        match e {
                                            ::graphix_rt::GXEvent::Env(_) => (),
                                            ::graphix_rt::GXEvent::Updated(id, v) => {
                                                eprintln!("{v}");
                                                assert_eq!(id, eid);
                                                assert!($pred(Ok(&v)));
                                                return Ok(());
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
            ctx.shutdown().await;
            Ok(())
        }
    };
}

#[macro_export]
macro_rules! run_with_tempdir {
    (
        name: $test_name:ident,
        code: $code:literal,
        setup: |$temp_dir:ident| $setup:block,
        expect_error
    ) => {
        $crate::run_with_tempdir! {
            name: $test_name,
            code: $code,
            setup: |$temp_dir| $setup,
            expect: |v: ::netidx::subscriber::Value| -> ::anyhow::Result<()> {
                if matches!(v, ::netidx::subscriber::Value::Error(_)) {
                    Ok(())
                } else {
                    panic!("expected Error value, got: {v:?}")
                }
            }
        }
    };
    (
        name: $test_name:ident,
        code: $code:literal,
        setup: |$temp_dir:ident| $setup:block,
        verify: |$verify_dir:ident| $verify:block
    ) => {
        $crate::run_with_tempdir! {
            name: $test_name,
            code: $code,
            setup: |$temp_dir| $setup,
            expect: |v: ::netidx::subscriber::Value| -> ::anyhow::Result<()> {
                if !matches!(v, ::netidx::subscriber::Value::Null) {
                    panic!("expected Null (success), got: {v:?}");
                }
                Ok(())
            },
            verify: |$verify_dir| $verify
        }
    };
    (
        name: $test_name:ident,
        code: $code:literal,
        setup: |$temp_dir:ident| $setup:block,
        expect: $expect_payload:expr
        $(, verify: |$verify_dir:ident| $verify:block)?
    ) => {
        #[tokio::test(flavor = "current_thread")]
        async fn $test_name() -> ::anyhow::Result<()> {
            let (tx, mut rx) = ::tokio::sync::mpsc::channel::<
                ::poolshark::global::GPooled<Vec<::graphix_rt::GXEvent>>
            >(10);
            let ctx = $crate::testing::init(tx, &crate::TEST_REGISTER).await?;
            let $temp_dir = ::tempfile::tempdir()?;

            let test_file = { $setup };

            let code = format!(
                $code,
                $crate::testing::escape_path(test_file.display())
            );
            let compiled = ctx.rt.compile(::arcstr::ArcStr::from(code)).await?;
            let eid = compiled.exprs[0].id;

            let timeout = ::tokio::time::sleep(::std::time::Duration::from_secs(2));
            ::tokio::pin!(timeout);

            loop {
                ::tokio::select! {
                    _ = &mut timeout => panic!("timeout waiting for result"),
                    Some(mut batch) = rx.recv() => {
                        for event in batch.drain(..) {
                            if let ::graphix_rt::GXEvent::Updated(id, v) = event {
                                if id == eid {
                                    $expect_payload(v)?;
                                    $(
                                        let $verify_dir = &$temp_dir;
                                        $verify
                                    )?
                                    return Ok(());
                                }
                            }
                        }
                    }
                }
            }
        }
    };
}