1use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use core::fmt;
11use derive_builder::Builder;
12use graphix_compiler::{
13 env::Env,
14 expr::{ExprId, ModuleResolver},
15 typ::{FnType, Type},
16 BindId, Event, ExecCtx, NoUserEvent, UserEvent,
17};
18use log::error;
19use netidx::{
20 pool::Pooled,
21 protocol::valarray::ValArray,
22 publisher::{Value, WriteRequest},
23 subscriber::{self, SubId},
24};
25use netidx_core::atomic_id;
26use netidx_value::FromValue;
27use serde_derive::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use std::{future, path::PathBuf, time::Duration};
30use tokio::{
31 sync::{
32 mpsc::{self as tmpsc},
33 oneshot,
34 },
35 task,
36};
37
38mod gx;
39mod rt;
40use gx::GX;
41pub use rt::GXRt;
42
43pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
63 type UserEvent: UserEvent + Send + Sync + 'static;
64
65 fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
69
70 fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
79
80 fn is_ready(&self) -> bool;
82
83 fn clear(&mut self);
85
86 fn empty_event(&mut self) -> Self::UserEvent;
88}
89
90#[derive(Debug, Default)]
91pub struct NoExt;
92
93impl GXExt for NoExt {
94 type UserEvent = NoUserEvent;
95
96 async fn update_sources(&mut self) -> Result<()> {
97 future::pending().await
98 }
99
100 fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
101 Ok(())
102 }
103
104 fn is_ready(&self) -> bool {
105 false
106 }
107
108 fn clear(&mut self) {}
109
110 fn empty_event(&mut self) -> Self::UserEvent {
111 NoUserEvent
112 }
113}
114
115type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
116type WriteBatch = Pooled<Vec<WriteRequest>>;
117
118#[derive(Debug)]
119pub struct CouldNotResolve;
120
121impl fmt::Display for CouldNotResolve {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 write!(f, "could not resolve module")
124 }
125}
126
127pub struct CompExp<X: GXExt> {
128 pub id: ExprId,
129 pub typ: Type,
130 pub output: bool,
131 rt: GXHandle<X>,
132}
133
134impl<X: GXExt> Drop for CompExp<X> {
135 fn drop(&mut self) {
136 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
137 }
138}
139
140pub struct CompRes<X: GXExt> {
141 pub exprs: SmallVec<[CompExp<X>; 1]>,
142 pub env: Env<GXRt<X>, X::UserEvent>,
143}
144
145pub struct Ref<X: GXExt> {
146 pub id: ExprId,
147 pub last: Option<Value>,
148 pub bid: BindId,
149 pub target_bid: Option<BindId>,
150 rt: GXHandle<X>,
151}
152
153impl<X: GXExt> Drop for Ref<X> {
154 fn drop(&mut self) {
155 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
156 }
157}
158
159impl<X: GXExt> Ref<X> {
160 pub fn set(&self, v: Value) -> Result<()> {
161 self.rt.set(self.bid, v)
162 }
163
164 pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
165 if let Some(id) = self.target_bid {
166 self.rt.set(id, v)?
167 }
168 Ok(())
169 }
170}
171
172pub struct TRef<X: GXExt, T: FromValue> {
173 pub r: Ref<X>,
174 pub t: Option<T>,
175}
176
177impl<X: GXExt, T: FromValue> TRef<X, T> {
178 pub fn new(mut r: Ref<X>) -> Result<Self> {
179 let t = r.last.take().map(|v| v.cast_to()).transpose()?;
180 Ok(TRef { r, t })
181 }
182
183 pub fn update(&mut self, id: ExprId, v: &Value) -> Result<Option<&mut T>> {
184 if self.r.id == id {
185 let v = v.clone().cast_to()?;
186 self.t = Some(v);
187 Ok(self.t.as_mut())
188 } else {
189 Ok(None)
190 }
191 }
192}
193
194impl<X: GXExt, T: Into<Value> + FromValue + Clone> TRef<X, T> {
195 pub fn set(&mut self, t: T) -> Result<()> {
196 self.t = Some(t.clone());
197 self.r.set(t.into())
198 }
199
200 pub fn set_deref(&mut self, t: T) -> Result<()> {
201 self.t = Some(t.clone());
202 self.r.set_deref(t.into())
203 }
204}
205
206atomic_id!(CallableId);
207
208pub struct Callable<X: GXExt> {
209 rt: GXHandle<X>,
210 id: CallableId,
211 env: Env<GXRt<X>, X::UserEvent>,
212 pub typ: FnType,
213 pub expr: ExprId,
214}
215
216impl<X: GXExt> Drop for Callable<X> {
217 fn drop(&mut self) {
218 let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
219 }
220}
221
222impl<X: GXExt> Callable<X> {
223 pub async fn call(&self, args: ValArray) -> Result<()> {
226 if self.typ.args.len() != args.len() {
227 bail!("expected {} args", self.typ.args.len())
228 }
229 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
230 if !a.typ.is_a(&self.env, v) {
231 bail!("type mismatch arg {i} expected {}", a.typ)
232 }
233 }
234 self.call_unchecked(args).await
235 }
236
237 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
241 self.rt
242 .0
243 .send(ToGX::Call { id: self.id, args })
244 .map_err(|_| anyhow!("runtime is dead"))
245 }
246}
247
248enum ToGX<X: GXExt> {
249 GetEnv {
250 res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
251 },
252 Delete {
253 id: ExprId,
254 },
255 Load {
256 path: PathBuf,
257 rt: GXHandle<X>,
258 res: oneshot::Sender<Result<CompRes<X>>>,
259 },
260 Compile {
261 text: ArcStr,
262 rt: GXHandle<X>,
263 res: oneshot::Sender<Result<CompRes<X>>>,
264 },
265 CompileCallable {
266 id: Value,
267 rt: GXHandle<X>,
268 res: oneshot::Sender<Result<Callable<X>>>,
269 },
270 CompileRef {
271 id: BindId,
272 rt: GXHandle<X>,
273 res: oneshot::Sender<Result<Ref<X>>>,
274 },
275 Set {
276 id: BindId,
277 v: Value,
278 },
279 Call {
280 id: CallableId,
281 args: ValArray,
282 },
283 DeleteCallable {
284 id: CallableId,
285 },
286}
287
288#[derive(Clone)]
289pub enum GXEvent<X: GXExt> {
290 Updated(ExprId, Value),
291 Env(Env<GXRt<X>, X::UserEvent>),
292}
293
294pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
298
299impl<X: GXExt> Clone for GXHandle<X> {
300 fn clone(&self) -> Self {
301 Self(self.0.clone())
302 }
303}
304
305impl<X: GXExt> GXHandle<X> {
306 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
307 let (tx, rx) = oneshot::channel();
308 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
309 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
310 }
311
312 pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
314 self.exec(|res| ToGX::GetEnv { res }).await
315 }
316
317 pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
325 Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
326 }
327
328 pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
338 Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
339 }
340
341 pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
346 Ok(self
347 .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
348 .await??)
349 }
350
351 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
357 Ok(self
358 .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
359 .await??)
360 }
361
362 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
366 let v = v.into();
367 self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
368 }
369}
370
371#[derive(Builder)]
372#[builder(pattern = "owned")]
373pub struct GXConfig<X: GXExt> {
374 #[builder(setter(strip_option), default)]
378 resolve_timeout: Option<Duration>,
379 #[builder(setter(strip_option), default)]
381 publish_timeout: Option<Duration>,
382 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
384 #[builder(setter(strip_option), default)]
386 root: Option<ArcStr>,
387 #[builder(default)]
389 resolvers: Vec<ModuleResolver>,
390 sub: tmpsc::Sender<Pooled<Vec<GXEvent<X>>>>,
392}
393
394impl<X: GXExt> GXConfig<X> {
395 pub fn builder(
397 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
398 sub: tmpsc::Sender<Pooled<Vec<GXEvent<X>>>>,
399 ) -> GXConfigBuilder<X> {
400 GXConfigBuilder::default().ctx(ctx).sub(sub)
401 }
402
403 pub async fn start(self) -> Result<GXHandle<X>> {
412 let (init_tx, init_rx) = oneshot::channel();
413 let (tx, rx) = tmpsc::unbounded_channel();
414 task::spawn(async move {
415 match GX::new(self).await {
416 Ok(bs) => {
417 let _ = init_tx.send(Ok(()));
418 if let Err(e) = bs.run(rx).await {
419 error!("run loop exited with error {e:?}")
420 }
421 }
422 Err(e) => {
423 let _ = init_tx.send(Err(e));
424 }
425 };
426 });
427 init_rx.await??;
428 Ok(GXHandle(tx))
429 }
430}