1use anyhow::{anyhow, bail, Result};
9use arcstr::ArcStr;
10use core::fmt;
11use derive_builder::Builder;
12use graphix_compiler::{
13 env::Env,
14 expr::{ExprId, ModuleResolver},
15 typ::{FnType, Type},
16 BindId, Event, ExecCtx, NoUserEvent, UserEvent,
17};
18use log::error;
19use netidx::{
20 pool::Pooled,
21 protocol::valarray::ValArray,
22 publisher::{Value, WriteRequest},
23 subscriber::{self, SubId},
24};
25use netidx_core::atomic_id;
26use serde_derive::{Deserialize, Serialize};
27use smallvec::SmallVec;
28use std::{future, path::PathBuf, time::Duration};
29use tokio::{
30 sync::{
31 mpsc::{self as tmpsc},
32 oneshot,
33 },
34 task,
35};
36
37mod gx;
38mod rt;
39use gx::GX;
40pub use rt::GXRt;
41
42pub trait GXExt: Default + fmt::Debug + Send + Sync + 'static {
62 type UserEvent: UserEvent + Send + Sync + 'static;
63
64 fn update_sources(&mut self) -> impl Future<Output = Result<()>> + Send;
68
69 fn do_cycle(&mut self, event: &mut Event<Self::UserEvent>) -> Result<()>;
78
79 fn is_ready(&self) -> bool;
81
82 fn clear(&mut self);
84
85 fn empty_event(&mut self) -> Self::UserEvent;
87}
88
89#[derive(Debug, Default)]
90pub struct NoExt;
91
92impl GXExt for NoExt {
93 type UserEvent = NoUserEvent;
94
95 async fn update_sources(&mut self) -> Result<()> {
96 future::pending().await
97 }
98
99 fn do_cycle(&mut self, _event: &mut Event<Self::UserEvent>) -> Result<()> {
100 Ok(())
101 }
102
103 fn is_ready(&self) -> bool {
104 false
105 }
106
107 fn clear(&mut self) {}
108
109 fn empty_event(&mut self) -> Self::UserEvent {
110 NoUserEvent
111 }
112}
113
114type UpdateBatch = Pooled<Vec<(SubId, subscriber::Event)>>;
115type WriteBatch = Pooled<Vec<WriteRequest>>;
116
117#[derive(Debug)]
118pub struct CouldNotResolve;
119
120impl fmt::Display for CouldNotResolve {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 write!(f, "could not resolve module")
123 }
124}
125
126pub struct CompExp<X: GXExt> {
127 pub id: ExprId,
128 pub typ: Type,
129 pub output: bool,
130 rt: GXHandle<X>,
131}
132
133impl<X: GXExt> Drop for CompExp<X> {
134 fn drop(&mut self) {
135 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
136 }
137}
138
139pub struct CompRes<X: GXExt> {
140 pub exprs: SmallVec<[CompExp<X>; 1]>,
141 pub env: Env<GXRt<X>, X::UserEvent>,
142}
143
144pub struct Ref<X: GXExt> {
145 pub id: ExprId,
146 pub last: Option<Value>,
147 pub bid: BindId,
148 pub target_bid: Option<BindId>,
149 rt: GXHandle<X>,
150}
151
152impl<X: GXExt> Drop for Ref<X> {
153 fn drop(&mut self) {
154 let _ = self.rt.0.send(ToGX::Delete { id: self.id });
155 }
156}
157
158impl<X: GXExt> Ref<X> {
159 pub fn set(&self, v: Value) -> Result<()> {
160 self.rt.set(self.bid, v)
161 }
162
163 pub fn set_deref<T: Into<Value>>(&self, v: T) -> Result<()> {
164 if let Some(id) = self.target_bid {
165 self.rt.set(id, v)?
166 }
167 Ok(())
168 }
169}
170
171atomic_id!(CallableId);
172
173pub struct Callable<X: GXExt> {
174 rt: GXHandle<X>,
175 id: CallableId,
176 env: Env<GXRt<X>, X::UserEvent>,
177 pub typ: FnType,
178 pub expr: ExprId,
179}
180
181impl<X: GXExt> Drop for Callable<X> {
182 fn drop(&mut self) {
183 let _ = self.rt.0.send(ToGX::DeleteCallable { id: self.id });
184 }
185}
186
187impl<X: GXExt> Callable<X> {
188 pub async fn call(&self, args: ValArray) -> Result<()> {
191 if self.typ.args.len() != args.len() {
192 bail!("expected {} args", self.typ.args.len())
193 }
194 for (i, (a, v)) in self.typ.args.iter().zip(args.iter()).enumerate() {
195 if !a.typ.is_a(&self.env, v) {
196 bail!("type mismatch arg {i} expected {}", a.typ)
197 }
198 }
199 self.call_unchecked(args).await
200 }
201
202 pub async fn call_unchecked(&self, args: ValArray) -> Result<()> {
206 self.rt
207 .0
208 .send(ToGX::Call { id: self.id, args })
209 .map_err(|_| anyhow!("runtime is dead"))
210 }
211}
212
213enum ToGX<X: GXExt> {
214 GetEnv {
215 res: oneshot::Sender<Env<GXRt<X>, X::UserEvent>>,
216 },
217 Delete {
218 id: ExprId,
219 },
220 Load {
221 path: PathBuf,
222 rt: GXHandle<X>,
223 res: oneshot::Sender<Result<CompRes<X>>>,
224 },
225 Compile {
226 text: ArcStr,
227 rt: GXHandle<X>,
228 res: oneshot::Sender<Result<CompRes<X>>>,
229 },
230 CompileCallable {
231 id: Value,
232 rt: GXHandle<X>,
233 res: oneshot::Sender<Result<Callable<X>>>,
234 },
235 CompileRef {
236 id: BindId,
237 rt: GXHandle<X>,
238 res: oneshot::Sender<Result<Ref<X>>>,
239 },
240 Set {
241 id: BindId,
242 v: Value,
243 },
244 Call {
245 id: CallableId,
246 args: ValArray,
247 },
248 DeleteCallable {
249 id: CallableId,
250 },
251}
252
253#[derive(Clone)]
254pub enum GXEvent<X: GXExt> {
255 Updated(ExprId, Value),
256 Env(Env<GXRt<X>, X::UserEvent>),
257}
258
259pub struct GXHandle<X: GXExt>(tmpsc::UnboundedSender<ToGX<X>>);
263
264impl<X: GXExt> Clone for GXHandle<X> {
265 fn clone(&self) -> Self {
266 Self(self.0.clone())
267 }
268}
269
270impl<X: GXExt> GXHandle<X> {
271 async fn exec<R, F: FnOnce(oneshot::Sender<R>) -> ToGX<X>>(&self, f: F) -> Result<R> {
272 let (tx, rx) = oneshot::channel();
273 self.0.send(f(tx)).map_err(|_| anyhow!("runtime is dead"))?;
274 Ok(rx.await.map_err(|_| anyhow!("runtime did not respond"))?)
275 }
276
277 pub async fn get_env(&self) -> Result<Env<GXRt<X>, X::UserEvent>> {
279 self.exec(|res| ToGX::GetEnv { res }).await
280 }
281
282 pub async fn compile(&self, text: ArcStr) -> Result<CompRes<X>> {
290 Ok(self.exec(|tx| ToGX::Compile { text, res: tx, rt: self.clone() }).await??)
291 }
292
293 pub async fn load(&self, path: PathBuf) -> Result<CompRes<X>> {
303 Ok(self.exec(|tx| ToGX::Load { path, res: tx, rt: self.clone() }).await??)
304 }
305
306 pub async fn compile_callable(&self, id: Value) -> Result<Callable<X>> {
311 Ok(self
312 .exec(|tx| ToGX::CompileCallable { id, rt: self.clone(), res: tx })
313 .await??)
314 }
315
316 pub async fn compile_ref(&self, id: impl Into<BindId>) -> Result<Ref<X>> {
322 Ok(self
323 .exec(|tx| ToGX::CompileRef { id: id.into(), res: tx, rt: self.clone() })
324 .await??)
325 }
326
327 pub fn set<T: Into<Value>>(&self, id: BindId, v: T) -> Result<()> {
331 let v = v.into();
332 self.0.send(ToGX::Set { id, v }).map_err(|_| anyhow!("runtime is dead"))
333 }
334}
335
336#[derive(Builder)]
337#[builder(pattern = "owned")]
338pub struct GXConfig<X: GXExt> {
339 #[builder(setter(strip_option), default)]
343 resolve_timeout: Option<Duration>,
344 #[builder(setter(strip_option), default)]
346 publish_timeout: Option<Duration>,
347 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
349 #[builder(setter(strip_option), default)]
351 root: Option<ArcStr>,
352 #[builder(default)]
354 resolvers: Vec<ModuleResolver>,
355 sub: tmpsc::Sender<Pooled<Vec<GXEvent<X>>>>,
357}
358
359impl<X: GXExt> GXConfig<X> {
360 pub fn builder(
362 ctx: ExecCtx<GXRt<X>, X::UserEvent>,
363 sub: tmpsc::Sender<Pooled<Vec<GXEvent<X>>>>,
364 ) -> GXConfigBuilder<X> {
365 GXConfigBuilder::default().ctx(ctx).sub(sub)
366 }
367
368 pub async fn start(self) -> Result<GXHandle<X>> {
377 let (init_tx, init_rx) = oneshot::channel();
378 let (tx, rx) = tmpsc::unbounded_channel();
379 task::spawn(async move {
380 match GX::new(self).await {
381 Ok(bs) => {
382 let _ = init_tx.send(Ok(()));
383 if let Err(e) = bs.run(rx).await {
384 error!("run loop exited with error {e:?}")
385 }
386 }
387 Err(e) => {
388 let _ = init_tx.send(Err(e));
389 }
390 };
391 });
392 init_rx.await??;
393 Ok(GXHandle(tx))
394 }
395}