#![doc(
html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
)]
use anyhow::{anyhow, bail, Result};
use arcstr::ArcStr;
use derive_builder::Builder;
use enumflags2::BitFlags;
use graphix_compiler::{
env::Env,
expr::{ExprId, ModPath, ModuleResolver, Source},
typ::{FnType, Type},
BindId, CFlag, Event, ExecCtx, NoUserEvent, Scope, UserEvent,
};
use log::error;
use netidx::{
protocol::valarray::ValArray,
publisher::{Value, WriteRequest},
subscriber::{self, SubId},
};
use netidx_core::atomic_id;
use netidx_value::FromValue;
use nohash::IntSet;
use poolshark::global::GPooled;
use serde_derive::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::{fmt, future, sync::Arc, time::Duration};
use tokio::{
sync::{
mpsc::{self as tmpsc},
oneshot,
},
task::{self, JoinHandle},
};
mod gx;
mod rt;
use gx::GX;
pub use rt::GXRt;
pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
type UserEvent: UserEvent + Send + Sync + 'static;
fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
fn is_ready(&self) -> bool;
fn clear(&mut self);
fn empty_event(&mut self) -> Self::UserEvent;
}
#[derive(Debug, Default)]
pub struct NoExt;
impl GXExt for NoExt {
type UserEvent = NoUserEvent;
async fn update_sources(&mut self) -> Result<()> {
future::pending().await
}
fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
Ok(())
}
fn is_ready(&self) -> bool {
false
}
fn clear(&mut self) {}
fn empty_event(&mut self) -> Self::UserEvent {
NoUserEvent
}
}
type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
type WriteBatch = GPooled<Vec<WriteRequest>>;
#[derive(Debug)]
pub struct CompExp<X: GXExt> {
pub id: ExprId,
pub typ: Type,
pub output: bool,
rt: GXHandle<X>,
}
impl<X: GXExt> Drop for CompExp<X> {
fn drop(&mut self) {
let _ = self.rt.0.tx.send(ToGX::Delete { id: self.id });
}
}
#[derive(Debug)]
pub struct CompRes<X: GXExt> {
pub exprs: SmallVec<[CompExp<X>; 1]>,
pub env: Env,
}
#[derive(Debug)]
pub struct CheckResult {
pub env: Env,
pub references: GPooled<Vec<graphix_compiler::ReferenceSite>>,
pub module_references: GPooled<Vec<graphix_compiler::ModuleRefSite>>,
pub scope_map: GPooled<Vec<graphix_compiler::ScopeMapEntry>>,
pub lsp: graphix_compiler::env::Lsp,
}
pub struct Ref<X: GXExt> {
pub id: ExprId,
pub last: Option<Value>,
pub bid: BindId,
pub target_bid: Option<BindId>,
pub typ: Type,
rt: GXHandle<X>,
}
impl<X: GXExt> Drop for Ref<X> {
fn drop(&mut self) {
let _ = self.rt.0.tx.send(ToGX::Delete { id: self.id });
}
}
impl<X: GXExt> Ref<X> {
pub fn set<T: Into<Value>>(&mut self, v: T) -> Result<()> {
let v = v.into();
self.last = Some(v.clone());
self.rt.set(self.bid, v)
}
pub fn set_deref<T: Into<Value>>(&mut self, v: T) -> Result<()> {
if let Some(id) = self.target_bid {
self.rt.set(id, v)?
}
Ok(())
}
pub fn update(&mut self, id: ExprId, v: &Value) -> Option<&mut Value> {
if self.id == id {
self.last = Some(v.clone());
self.last.as_mut()
} else {
None
}
}
}
pub struct TRef<X: GXExt, T: FromValue> {
pub r: Ref<X>,
pub t: Option<T>,
}
impl<X: GXExt, T: FromValue> TRef<X, T> {
pub fn new(mut r: Ref<X>) -> Result<Self> {
let t = r.last.take().map(|v| v.cast_to()).transpose()?;
Ok(TRef { r, t })
}
pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
if self.r.id == id {
let v = v.clone().cast_to()?;
self.t = Some(v);
Ok(self.t.as_mut())
} else {
Ok(None)
}
}
}
impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
pub fn set(&mut self, t: T) -> Result<()> {
self.t = Some(t.clone());
self.r.set(t)
}
pub fn set_deref(&mut self, t: T) -> Result<()> {
self.t = Some(t.clone());
self.r.set_deref(t.into())
}
}
atomic_id!(CallableId);
pub struct Callable<X: GXExt> {
rt: GXHandle<X>,
id: CallableId,
env: Env,
pub typ: FnType,
pub expr: ExprId,
}
impl<X: GXExt> Drop for Callable<X> {
fn drop(&mut self) {
let _ = self.rt.0.tx.send(ToGX::DeleteCallable { id: self.id });
}
}
impl<X: GXExt> Callable<X> {
pub fn id(&self) -> CallableId {
self.id
}
pub async fn call(&self, args: ValArray) -> Result<()> {
if self.typ.args.len() != args.len() {
bail!("expected {} args", self.typ.args.len())
}
for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
if !a.typ.is_a(&self.env, v) {
bail!("type mismatch arg {i} expected {}", a.typ)
}
}
self.call_unchecked(args).await
}
pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
self.rt
.0
.tx
.send(ToGX::Call { id: self.id, args })
.map_err(|_| anyhow!("runtime is dead"))
}
pub fn update<'a>(&self, id: ExprId, v: &'a Value) -> Option<&'a Value> {
if self.expr == id {
Some(v)
} else {
None
}
}
}
enum DeferredCall {
Call(ValArray, oneshot::Sender<Result<()>>),
CallUnchecked(ValArray, oneshot::Sender<Result<()>>),
}
pub struct NamedCallable<X: GXExt> {
fname: Ref<X>,
current: Option<Callable<X>>,
ids: IntSet<ExprId>,
deferred: Vec<DeferredCall>,
h: GXHandle<X>,
}
impl<X: GXExt> NamedCallable<X> {
pub async fn update<'a>(
&mut self,
id: ExprId,
v: &'a Value,
) -> Result<Option<&'a Value>> {
match self.fname.update(id, v) {
Some(v) => {
let callable = self.h.compile_callable(v.clone()).await?;
self.ids.insert(callable.expr);
for dc in self.deferred.drain(..) {
match dc {
DeferredCall::Call(args, reply) => {
let _ = reply.send(callable.call(args).await);
}
DeferredCall::CallUnchecked(args, reply) => {
let _ = reply.send(callable.call_unchecked(args).await);
}
}
}
self.current = Some(callable);
Ok(None)
}
None if self.ids.contains(&id) => Ok(Some(v)),
None => Ok(None),
}
}
pub async fn call(&mut self, args: ValArray) -> Result<()> {
match &self.current {
Some(c) => c.call(args).await,
None => {
let (tx, rx) = oneshot::channel();
self.deferred.push(DeferredCall::Call(args, tx));
rx.await?
}
}
}
pub async fn call_unchecked(&mut self, args: ValArray) -> Result<()> {
match &self.current {
Some(c) => c.call(args).await,
None => {
let (tx, rx) = oneshot::channel();
self.deferred.push(DeferredCall::CallUnchecked(args, tx));
rx.await?
}
}
}
}
enum ToGX<X: GXExt> {
GetEnv {
res: oneshot::Sender<Env>,
},
Delete {
id: ExprId,
},
Load {
path: Source,
rt: GXHandle<X>,
res: oneshot::Sender<Result<CompRes<X>>>,
},
Check {
path: Source,
resolvers: Option<Vec<ModuleResolver>>,
initial_scope: Option<ArcStr>,
res: oneshot::Sender<Result<CheckResult>>,
},
Compile {
text: ArcStr,
rt: GXHandle<X>,
res: oneshot::Sender<Result<CompRes<X>>>,
},
CompileCallable {
id: Value,
rt: GXHandle<X>,
res: oneshot::Sender<Result<Callable<X>>>,
},
CompileRef {
id: BindId,
rt: GXHandle<X>,
res: oneshot::Sender<Result<Ref<X>>>,
},
Set {
id: BindId,
v: Value,
},
Call {
id: CallableId,
args: ValArray,
},
DeleteCallable {
id: CallableId,
},
}
#[derive(Debug, Clone)]
pub enum GXEvent {
Updated(ExprId, Value),
Env(Env),
}
struct GXHandleInner<X: GXExt> {
tx: tmpsc::UnboundedSender<ToGX<X>>,
task: JoinHandle<()>,
subscriber: netidx::subscriber::Subscriber,
}
impl<X: GXExt> Drop for GXHandleInner<X> {
fn drop(&mut self) {
self.task.abort()
}
}
pub struct GXHandle<X: GXExt>(Arc<GXHandleInner<X>>);
impl<X: GXExt> fmt::Debug for GXHandle<X> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "GXHandle")
}
}
impl<X: GXExt> Clone for GXHandle<X> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<X: GXExt> GXHandle<X> {
pub fn subscriber(&self) -> netidx::subscriber::Subscriber {
self.0.subscriber.clone()
}
async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
let (tx, rx) = oneshot::channel();
self.0.tx.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
}
pub async fn get_env(&self) -> Result<Env> {
self.exec(|res| ToGX::GetEnv { res }).await
}
pub async fn check(
&self,
path: Source,
initial_scope: Option<ArcStr>,
) -> Result<CheckResult> {
Ok(self
.exec(|tx| ToGX::Check { path, resolvers: None, initial_scope, res: tx })
.await??)
}
pub async fn check_with_resolvers(
&self,
path: Source,
resolvers: Vec<ModuleResolver>,
initial_scope: Option<ArcStr>,
) -> Result<CheckResult> {
Ok(self
.exec(|tx| ToGX::Check {
path,
resolvers: Some(resolvers),
initial_scope,
res: tx,
})
.await??)
}
pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
}
pub async fn load(&self, path: Source) -> Result<CompRes<X>> {
Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
}
pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
Ok(self
.exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
.await??)
}
pub async fn compile_callable_by_name(
&self,
env: &Env,
scope: &Scope,
name: &ModPath,
) -> Result<NamedCallable<X>> {
let r = self.compile_ref_by_name(env, scope, name).await?;
match &r.typ {
Type::Fn(_) => (),
t => bail!(
"{name} in scope {} has type {t}. expected a function",
scope.lexical
),
}
Ok(NamedCallable {
fname: r,
current: None,
ids: IntSet::default(),
deferred: vec![],
h: self.clone(),
})
}
pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
Ok(self
.exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
.await??)
}
pub async fn compile_ref_by_name(
&self,
env: &Env,
scope: &Scope,
name: &ModPath,
) -> Result<Ref<X>> {
let id = env
.lookup_bind(&scope.lexical, name)
.ok_or_else(|| anyhow!("no such value {name} in scope {}", scope.lexical))?
.1
.id;
self.compile_ref(id).await
}
pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
let v = v.into();
self.0.tx.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
}
pub fn call(&self, id: CallableId, args: ValArray) -> Result<()> {
self.0.tx.send(ToGX::Call { id, args }).map_err(|_| anyhow!("runtime is dead"))
}
}
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig<X: GXExt> {
#[builder(setter(strip_option), default)]
resolve_timeout: Option<Duration>,
#[builder(setter(strip_option), default)]
publish_timeout: Option<Duration>,
ctx: ExecCtx<GXRt<X>, X::UserEvent>,
#[builder(setter(strip_option), default)]
root: Option<ArcStr>,
#[builder(default)]
resolvers: Vec<ModuleResolver>,
sub: tmpsc::Sender<GPooled<Vec<GXEvent>>>,
#[builder(default)]
flags: BitFlags<CFlag>,
#[builder(default)]
lsp_mode: bool,
}
impl<X: GXExt> GXConfig<X> {
pub fn builder(
ctx: ExecCtx<GXRt<X>, X::UserEvent>,
sub: tmpsc::Sender<GPooled<Vec<GXEvent>>>,
) -> GXConfigBuilder<X> {
GXConfigBuilder::default().ctx(ctx).sub(sub)
}
pub async fn start(self) -> Result<GXHandle<X>> {
let subscriber = self.ctx.rt.subscriber.clone();
let (init_tx, init_rx) = oneshot::channel();
let (tx, rx) = tmpsc::unbounded_channel();
let task = task::spawn(async move {
match GX::new(self).await {
Ok(bs) => {
let _ = init_tx.send(Ok(()));
if let Err(e) = bs.run(rx).await {
error!("run loop exited with error {e:?}")
}
}
Err(e) => {
let _ = init_tx.send(Err(e));
}
};
});
init_rx.await??;
Ok(GXHandle(Arc::new(GXHandleInner { tx, task, subscriber })))
}
}