1use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use derive_builder::Builder;
11use enumflags2::BitFlags;
12use graphix_compiler::{
13 env::Env,
14 expr::{ExprId, ModuleResolver},
15 typ::{FnType, Type},
16 BindId, CFlag, Event, ExecCtx, NoUserEvent, UserEvent,
17};
18use log::error;
19use netidx::{
20 protocol::valarray::ValArray,
21 publisher::{Value, WriteRequest},
22 subscriber::{self, SubId},
23};
24use netidx_core::atomic_id;
25use netidx_value::FromValue;
26use poolshark::global::GPooled;
27use serde_derive::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use std::{fmt, future, path::PathBuf, time::Duration};
30use tokio::{
31 sync::{
32 mpsc::{self as tmpsc},
33 oneshot,
34 },
35 task,
36};
37
38mod gx;
39mod rt;
40use gx::GX;
41pub use rt::GXRt;
42
43pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
63 type UserEvent: UserEvent + Send + Sync + 'static;
64
65 fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
69
70 fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
79
80 fn is_ready(&self) -> bool;
82
83 fn clear(&mut self);
85
86 fn empty_event(&mut self) -> Self::UserEvent;
88}
89
90#[derive(Debug, Default)]
91pub struct NoExt;
92
93impl GXExt for NoExt {
94 type UserEvent = NoUserEvent;
95
96 async fn update_sources(&mut self) -> Result<()> {
97 future::pending().await
98 }
99
100 fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
101 Ok(())
102 }
103
104 fn is_ready(&self) -> bool {
105 false
106 }
107
108 fn clear(&mut self) {}
109
110 fn empty_event(&mut self) -> Self::UserEvent {
111 NoUserEvent
112 }
113}
114
115type UpdateBatch = GPooled<Vec<(SubId, subscriber::Event)>>;
116type WriteBatch = GPooled<Vec<WriteRequest>>;
117
/// Marker error meaning that a module could not be resolved.
///
/// It renders as `could not resolve module` and implements
/// [`std::error::Error`], so it can be boxed as `dyn Error`, propagated with
/// `?`, and recovered later by downcasting.
#[derive(Debug)]
pub struct CouldNotResolve;

impl fmt::Display for CouldNotResolve {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("could not resolve module")
    }
}

// The default method bodies are sufficient: there is no underlying source
// error to expose.
impl std::error::Error for CouldNotResolve {}
126
127#[derive(Debug)]
128pub struct CompExp<X: GXExt> {
129 pub id: ExprId,
130 pub typ: Type,
131 pub output: bool,
132 rt: GXHandle<X>,
133}
134
135impl<X: GXExt> Drop for CompExp<X> {
136 fn drop(&mut self) {
137 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
138 }
139}
140
141#[derive(Debug)]
142pub struct CompRes<X: GXExt> {
143 pub exprs: SmallVec<[CompExp<X>; 1]>,
144 pub env: Env<GXRt<X>, X::UserEvent>,
145}
146
147pub struct Ref<X: GXExt> {
148 pub id: ExprId,
149 pub last: Option<Value>,
150 pub bid: BindId,
151 pub target_bid: Option<BindId>,
152 rt: GXHandle<X>,
153}
154
155impl<X: GXExt> Drop for Ref<X> {
156 fn drop(&mut self) {
157 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
158 }
159}
160
161impl<X: GXExt> Ref<X> {
162 pub fn set(&self, v: Value) -> Result<()> {
163 self.rt.set(self.bid, v)
164 }
165
166 pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
167 if let Some(id) = self.target_bid {
168 self.rt.set(id, v)?
169 }
170 Ok(())
171 }
172}
173
174pub struct TRef<X: GXExt, T: FromValue> {
175 pub r: Ref<X>,
176 pub t: Option<T>,
177}
178
179impl<X: GXExt, T: FromValue> TRef<X, T> {
180 pub fn new(mut r: Ref<X>) -> Result<Self> {
181 let t = r.last.take().map(|v| v.cast_to()).transpose()?;
182 Ok(TRef { r, t })
183 }
184
185 pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
186 if self.r.id == id {
187 let v = v.clone().cast_to()?;
188 self.t = Some(v);
189 Ok(self.t.as_mut())
190 } else {
191 Ok(None)
192 }
193 }
194}
195
196impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
197 pub fn set(&mut self, t: T) -> Result<()> {
198 self.t = Some(t.clone());
199 self.r.set(t.into())
200 }
201
202 pub fn set_deref(&mut self, t: T) -> Result<()> {
203 self.t = Some(t.clone());
204 self.r.set_deref(t.into())
205 }
206}
207
208atomic_id!(CallableId);
209
210pub struct Callable<X: GXExt> {
211 rt: GXHandle<X>,
212 id: CallableId,
213 env: Env<GXRt<X>, X::UserEvent>,
214 pub typ: FnType,
215 pub expr: ExprId,
216}
217
218impl<X: GXExt> Drop for Callable<X> {
219 fn drop(&mut self) {
220 let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
221 }
222}
223
224impl<X: GXExt> Callable<X> {
225 pub async fn call(&self, args: ValArray) -> Result<()> {
228 if self.typ.args.len() != args.len() {
229 bail!("expected {} args", self.typ.args.len())
230 }
231 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
232 if !a.typ.is_a(&self.env, v) {
233 bail!("type mismatch arg {i} expected {}", a.typ)
234 }
235 }
236 self.call_unchecked(args).await
237 }
238
239 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
243 self.rt
244 .0
245 .send(ToGX::Call { id: self.id, args })
246 .map_err(|_| anyhow!("runtime is dead"))
247 }
248}
249
250enum ToGX<X: GXExt> {
251 GetEnv {
252 res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
253 },
254 Delete {
255 id: ExprId,
256 },
257 Load {
258 path: PathBuf,
259 rt: GXHandle<X>,
260 res: oneshot::Sender<Result<CompRes<X>>>,
261 },
262 Compile {
263 text: ArcStr,
264 rt: GXHandle<X>,
265 res: oneshot::Sender<Result<CompRes<X>>>,
266 },
267 CompileCallable {
268 id: Value,
269 rt: GXHandle<X>,
270 res: oneshot::Sender<Result<Callable<X>>>,
271 },
272 CompileRef {
273 id: BindId,
274 rt: GXHandle<X>,
275 res: oneshot::Sender<Result<Ref<X>>>,
276 },
277 Set {
278 id: BindId,
279 v: Value,
280 },
281 Call {
282 id: CallableId,
283 args: ValArray,
284 },
285 DeleteCallable {
286 id: CallableId,
287 },
288}
289
290#[derive(Debug, Clone)]
291pub enum GXEvent<X: GXExt> {
292 Updated(ExprId, Value),
293 Env(Env<GXRt<X>, X::UserEvent>),
294}
295
296pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
300
301impl<X: GXExt> fmt::Debug for GXHandle<X> {
302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303 write!(f, "GXHandle")
304 }
305}
306
307impl<X: GXExt> Clone for GXHandle<X> {
308 fn clone(&self) -> Self {
309 Self(self.0.clone())
310 }
311}
312
313impl<X: GXExt> GXHandle<X> {
314 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
315 let (tx, rx) = oneshot::channel();
316 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
317 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
318 }
319
320 pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
322 self.exec(|res| ToGX::GetEnv { res }).await
323 }
324
325 pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
333 Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
334 }
335
336 pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
346 Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
347 }
348
349 pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
354 Ok(self
355 .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
356 .await??)
357 }
358
359 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
365 Ok(self
366 .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
367 .await??)
368 }
369
370 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
374 let v = v.into();
375 self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
376 }
377}
378
379#[derive(Builder)]
380#[builder(pattern = "owned")]
381pub struct GXConfig<X: GXExt> {
382 #[builder(setter(strip_option), default)]
386 resolve_timeout: Option<Duration>,
387 #[builder(setter(strip_option), default)]
389 publish_timeout: Option<Duration>,
390 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
392 #[builder(setter(strip_option), default)]
394 root: Option<ArcStr>,
395 #[builder(default)]
397 resolvers: Vec<ModuleResolver>,
398 sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
400 #[builder(default)]
402 flags: BitFlags<CFlag>,
403}
404
405impl<X: GXExt> GXConfig<X> {
406 pub fn builder(
408 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
409 sub: tmpsc::Sender<GPooled<Vec<GXEvent<X>>>>,
410 ) -> GXConfigBuilder<X> {
411 GXConfigBuilder::default().ctx(ctx).sub(sub)
412 }
413
414 pub async fn start(self) -> Result<GXHandle<X>> {
423 let (init_tx, init_rx) = oneshot::channel();
424 let (tx, rx) = tmpsc::unbounded_channel();
425 task::spawn(async move {
426 match GX::new(self).await {
427 Ok(bs) => {
428 let _ = init_tx.send(Ok(()));
429 if let Err(e) = bs.run(rx).await {
430 error!("run loop exited with error {e:?}")
431 }
432 }
433 Err(e) => {
434 let _ = init_tx.send(Err(e));
435 }
436 };
437 });
438 init_rx.await??;
439 Ok(GXHandle(tx))
440 }
441}