1use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use derive_builder::Builder;
11use enumflags2::BitFlags;
12use graphix_compiler::{
13 env::Env,
14 expr::{ExprId, ModuleResolver},
15 typ::{FnType, Type},
16 BindId, CFlag, Event, ExecCtx, NoUserEvent, UserEvent,
17};
18use log::error;
19use netidx::{
20 protocol::valarray::ValArray,
21 publisher::{Value, WriteRequest},
22 subscriber::{self, SubId},
23};
24use netidx_core::atomic_id;
25use netidx_value::FromValue;
26use poolshark::global::GPooled;
27use serde_derive::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use std::{fmt, future, path::PathBuf, time::Duration};
30use tokio::{
31 sync::{
32 mpsc::{self as tmpsc},
33 oneshot,
34 },
35 task,
36};
37
38mod gx;
39mod rt;
40use gx::GX;
41pub use rt::GXRt;
42
/// Extension point for the runtime.
///
/// Every handle, config and event type in this file is generic over an
/// implementation of this trait. Implementors must be
/// `Default + Debug + Send + Sync + 'static`. [`NoExt`] is the implementation
/// that adds nothing.
pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
    /// The user event type that parameterizes [`Event`] for this extension.
    type UserEvent: UserEvent + Send + Sync + 'static;

    /// Asynchronously update the extension's sources.
    ///
    /// NOTE(review): presumably this should resolve once at least one source
    /// has something to report; the caller is not visible in this file, so
    /// confirm the exact contract (including cancel safety) against the run loop.
    fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;

    /// Contribute to the given `event` for the current cycle.
    ///
    /// NOTE(review): presumably this moves whatever `update_sources` gathered
    /// into `event` — confirm against the run loop.
    fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;

    /// Report whether the extension is ready.
    ///
    /// NOTE(review): presumably "ready" means there are pending events to
    /// process — confirm.
    fn is_ready(&self) -> bool;

    /// Clear the extension's state.
    fn clear(&mut self);

    /// Produce an empty value of the extension's user event type.
    fn empty_event(&mut self) -> Self::UserEvent;
}
89
/// A [`GXExt`] implementation that adds nothing to the runtime: its sources
/// never become ready and its user event type is `NoUserEvent`.
#[derive(Debug, Default)]
pub struct NoExt;
92
93impl GXExt for NoExt {
94 type UserEvent = NoUserEvent;
95
96 async fn update_sources(&mut self) -> Result<()> {
97 future::pending().await
98 }
99
100 fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
101 Ok(())
102 }
103
104 fn is_ready(&self) -> bool {
105 false
106 }
107
108 fn clear(&mut self) {}
109
110 fn empty_event(&mut self) -> Self::UserEvent {
111 NoUserEvent
112 }
113}
114
/// A `GPooled` vector of subscriber events, each paired with the `SubId` it
/// belongs to.
type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
/// A `GPooled` vector of publisher write requests.
type WriteBatch = GPooled<Vec<WriteRequest>>;
117
118#[derive(Debug)]
119pub struct CompExp<X: GXExt> {
120 pub id: ExprId,
121 pub typ: Type,
122 pub output: bool,
123 rt: GXHandle<X>,
124}
125
126impl<X: GXExt> Drop for CompExp<X> {
127 fn drop(&mut self) {
128 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
129 }
130}
131
132#[derive(Debug)]
133pub struct CompRes<X: GXExt> {
134 pub exprs: SmallVec<[CompExp<X>; 1]>,
135 pub env: Env<GXRt<X>, X::UserEvent>,
136}
137
138pub struct Ref<X: GXExt> {
139 pub id: ExprId,
140 pub last: Option<Value>,
141 pub bid: BindId,
142 pub target_bid: Option<BindId>,
143 rt: GXHandle<X>,
144}
145
146impl<X: GXExt> Drop for Ref<X> {
147 fn drop(&mut self) {
148 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
149 }
150}
151
152impl<X: GXExt> Ref<X> {
153 pub fn set(&self, v: Value) -> Result<()> {
154 self.rt.set(self.bid, v)
155 }
156
157 pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
158 if let Some(id) = self.target_bid {
159 self.rt.set(id, v)?
160 }
161 Ok(())
162 }
163}
164
165pub struct TRef<X: GXExt, T: FromValue> {
166 pub r: Ref<X>,
167 pub t: Option<T>,
168}
169
170impl<X: GXExt, T: FromValue> TRef<X, T> {
171 pub fn new(mut r: Ref<X>) -> Result<Self> {
172 let t = r.last.take().map(|v| v.cast_to()).transpose()?;
173 Ok(TRef { r, t })
174 }
175
176 pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
177 if self.r.id == id {
178 let v = v.clone().cast_to()?;
179 self.t = Some(v);
180 Ok(self.t.as_mut())
181 } else {
182 Ok(None)
183 }
184 }
185}
186
187impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
188 pub fn set(&mut self, t: T) -> Result<()> {
189 self.t = Some(t.clone());
190 self.r.set(t.into())
191 }
192
193 pub fn set_deref(&mut self, t: T) -> Result<()> {
194 self.t = Some(t.clone());
195 self.r.set_deref(t.into())
196 }
197}
198
// Declares `CallableId`, the id type used to address a `Callable` in
// `ToGX::Call` and `ToGX::DeleteCallable`.
// NOTE(review): presumably a unique id drawn from an atomic counter — see
// `netidx_core::atomic_id` to confirm.
atomic_id!(CallableId);
200
201pub struct Callable<X: GXExt> {
202 rt: GXHandle<X>,
203 id: CallableId,
204 env: Env<GXRt<X>, X::UserEvent>,
205 pub typ: FnType,
206 pub expr: ExprId,
207}
208
209impl<X: GXExt> Drop for Callable<X> {
210 fn drop(&mut self) {
211 let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
212 }
213}
214
215impl<X: GXExt> Callable<X> {
216 pub async fn call(&self, args: ValArray) -> Result<()> {
219 if self.typ.args.len() != args.len() {
220 bail!("expected {} args", self.typ.args.len())
221 }
222 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
223 if !a.typ.is_a(&self.env, v) {
224 bail!("type mismatch arg {i} expected {}", a.typ)
225 }
226 }
227 self.call_unchecked(args).await
228 }
229
230 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
234 self.rt
235 .0
236 .send(ToGX::Call { id: self.id, args })
237 .map_err(|_| anyhow!("runtime is dead"))
238 }
239}
240
/// Requests sent from handles in this file to the runtime task over the
/// `GXHandle` channel. Variants that carry a `res` sender expect a reply on it.
enum ToGX<X: GXExt> {
    /// Sent by `GXHandle::get_env`; the reply is an environment.
    GetEnv {
        res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
    },
    /// Sent when a `CompExp` or `Ref` is dropped, carrying its expression id.
    Delete {
        id: ExprId,
    },
    /// Sent by `GXHandle::load` with the path to load; `rt` is a clone of the
    /// requesting handle.
    Load {
        path: PathBuf,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<CompRes<X>>>,
    },
    /// Sent by `GXHandle::check` with the path to check; the reply carries no
    /// value beyond success or failure.
    Check {
        path: PathBuf,
        res: oneshot::Sender<Result<()>>,
    },
    /// Sent by `GXHandle::compile` with the source text; `rt` is a clone of
    /// the requesting handle.
    Compile {
        text: ArcStr,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<CompRes<X>>>,
    },
    /// Sent by `GXHandle::compile_callable`; the reply is a `Callable`.
    CompileCallable {
        id: Value,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<Callable<X>>>,
    },
    /// Sent by `GXHandle::compile_ref`; the reply is a `Ref`.
    CompileRef {
        id: BindId,
        rt: GXHandle<X>,
        res: oneshot::Sender<Result<Ref<X>>>,
    },
    /// Sent by `GXHandle::set`: assign `v` to binding `id`. No reply.
    Set {
        id: BindId,
        v: Value,
    },
    /// Sent by `Callable::call_unchecked`: call `id` with `args`. No reply.
    Call {
        id: CallableId,
        args: ValArray,
    },
    /// Sent when a `Callable` is dropped, carrying its id.
    DeleteCallable {
        id: CallableId,
    },
}
284
285#[derive(Debug, Clone)]
286pub enum GXEvent<X: GXExt> {
287 Updated(ExprId, Value),
288 Env(Env<GXRt<X>, X::UserEvent>),
289}
290
291pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
295
296impl<X: GXExt> fmt::Debug for GXHandle<X> {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 write!(f, "GXHandle")
299 }
300}
301
302impl<X: GXExt> Clone for GXHandle<X> {
303 fn clone(&self) -> Self {
304 Self(self.0.clone())
305 }
306}
307
308impl<X: GXExt> GXHandle<X> {
309 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
310 let (tx, rx) = oneshot::channel();
311 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
312 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
313 }
314
315 pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
317 self.exec(|res| ToGX::GetEnv { res }).await
318 }
319
320 pub async fn check(&self, path: PathBuf) -> Result<()> {
328 Ok(self.exec(|tx| ToGX::Check { path, res: tx }).await??)
329 }
330
331 pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
339 Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
340 }
341
342 pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
352 Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
353 }
354
355 pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
360 Ok(self
361 .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
362 .await??)
363 }
364
365 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
371 Ok(self
372 .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
373 .await??)
374 }
375
376 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
380 let v = v.into();
381 self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
382 }
383}
384
/// Configuration for a runtime instance.
///
/// Create a builder with `GXConfig::builder`, and launch the runtime with
/// `GXConfig::start`. The builder type is generated by `derive_builder` using
/// the owned pattern.
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct GXConfig<X: GXExt> {
    /// Optional timeout; defaults to `None`.
    /// NOTE(review): presumably bounds module resolution — the consumer is not
    /// in this file; confirm.
    #[builder(setter(strip_option), default)]
    resolve_timeout: Option<Duration>,
    /// Optional timeout; defaults to `None`.
    /// NOTE(review): presumably bounds publishing — confirm.
    #[builder(setter(strip_option), default)]
    publish_timeout: Option<Duration>,
    /// The execution context the runtime is built from. Required.
    ctx: ExecCtx<GXRt<X>, X::UserEvent>,
    /// Optional root; defaults to `None`.
    /// NOTE(review): presumably the text of the root module — confirm.
    #[builder(setter(strip_option), default)]
    root: Option<ArcStr>,
    /// Module resolvers; defaults to an empty list.
    #[builder(default)]
    resolvers: Vec<ModuleResolver>,
    /// Channel on which batches of `GXEvent` are delivered. Required.
    sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
    /// Compiler flags; defaults to the empty set.
    #[builder(default)]
    flags: BitFlags<CFlag>,
}
410
411impl<X: GXExt> GXConfig<X> {
412 pub fn builder(
414 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
415 sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
416 ) -> GXConfigBuilder<X> {
417 GXConfigBuilder::default().ctx(ctx).sub(sub)
418 }
419
420 pub async fn start(self) -> Result<GXHandle<X>> {
429 let (init_tx, init_rx) = oneshot::channel();
430 let (tx, rx) = tmpsc::unbounded_channel();
431 task::spawn(async move {
432 match GX::new(self).await {
433 Ok(bs) => {
434 let _ = init_tx.send(Ok(()));
435 if let Err(e) = bs.run(rx).await {
436 error!("run loop exited with error {e:?}")
437 }
438 }
439 Err(e) => {
440 let _ = init_tx.send(Err(e));
441 }
442 };
443 });
444 init_rx.await??;
445 Ok(GXHandle(tx))
446 }
447}