1#![doc(
2 html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3 html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5use arcstr::ArcStr;
6use compact_str::CompactString;
7use graphix_compiler::{
8 errf, expr::ExprId, typ::FnType, Apply, BuiltIn, Event, ExecCtx, Node, Rt, Scope,
9 UserEvent,
10};
11use graphix_package_core::{
12 CachedArgs, CachedArgsAsync, CachedVals, EvalCached, EvalCachedAsync, ProgramArgs,
13};
14use graphix_rt::GXRt;
15use netidx_value::{abstract_type::AbstractWrapper, Abstract, ValArray, Value};
16use poolshark::local::LPooled;
17use std::{
18 cell::RefCell,
19 cmp::Ordering,
20 hash::{Hash, Hasher},
21 path::{Path, PathBuf},
22 pin::Pin,
23 sync::{Arc, LazyLock},
24 task::{Context, Poll},
25};
26use tempfile::TempDir;
27use tokio::{
28 io::{AsyncRead, AsyncWrite, ReadBuf},
29 sync::Mutex,
30};
31
32pub(crate) mod dir;
33pub(crate) mod dirs_mod;
34pub(crate) mod fs;
35pub(crate) mod io;
36pub(crate) mod metadata;
37pub(crate) mod net;
38pub(crate) mod tcp;
39pub(crate) mod time;
40pub(crate) mod tls;
41pub(crate) mod watch;
42
43pub enum StreamKind {
46 File(tokio::fs::File),
47 Tcp(tokio::net::TcpStream),
48 Tls(tokio_rustls::TlsStream<tokio::net::TcpStream>),
49 Stdin(tokio::io::Stdin),
50 Stdout(tokio::io::Stdout),
51 Stderr(tokio::io::Stderr),
52}
53
54impl std::fmt::Debug for StreamKind {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 match self {
57 StreamKind::File(_) => f.debug_tuple("File").finish(),
58 StreamKind::Tcp(s) => f.debug_tuple("Tcp").field(s).finish(),
59 StreamKind::Tls(_) => f.debug_tuple("Tls").finish(),
60 StreamKind::Stdin(_) => f.debug_tuple("Stdin").finish(),
61 StreamKind::Stdout(_) => f.debug_tuple("Stdout").finish(),
62 StreamKind::Stderr(_) => f.debug_tuple("Stderr").finish(),
63 }
64 }
65}
66
67impl StreamKind {
68 pub(crate) fn tcp_ref(&self) -> Option<&tokio::net::TcpStream> {
69 match self {
70 StreamKind::Tcp(s) => Some(s),
71 StreamKind::Tls(s) => {
72 let (tcp, _) = s.get_ref();
73 Some(tcp)
74 }
75 _ => None,
76 }
77 }
78}
79
80impl AsyncRead for StreamKind {
81 fn poll_read(
82 self: Pin<&mut Self>,
83 cx: &mut Context<'_>,
84 buf: &mut ReadBuf<'_>,
85 ) -> Poll<std::io::Result<()>> {
86 match self.get_mut() {
87 StreamKind::File(s) => Pin::new(s).poll_read(cx, buf),
88 StreamKind::Tcp(s) => Pin::new(s).poll_read(cx, buf),
89 StreamKind::Tls(s) => Pin::new(s).poll_read(cx, buf),
90 StreamKind::Stdin(s) => Pin::new(s).poll_read(cx, buf),
91 StreamKind::Stdout(_) | StreamKind::Stderr(_) => Poll::Ready(Err(
92 std::io::Error::new(std::io::ErrorKind::Unsupported, "cannot read from stdout/stderr"),
93 )),
94 }
95 }
96}
97
98impl AsyncWrite for StreamKind {
99 fn poll_write(
100 self: Pin<&mut Self>,
101 cx: &mut Context<'_>,
102 buf: &[u8],
103 ) -> Poll<std::io::Result<usize>> {
104 match self.get_mut() {
105 StreamKind::File(s) => Pin::new(s).poll_write(cx, buf),
106 StreamKind::Tcp(s) => Pin::new(s).poll_write(cx, buf),
107 StreamKind::Tls(s) => Pin::new(s).poll_write(cx, buf),
108 StreamKind::Stdout(s) => Pin::new(s).poll_write(cx, buf),
109 StreamKind::Stderr(s) => Pin::new(s).poll_write(cx, buf),
110 StreamKind::Stdin(_) => Poll::Ready(Err(
111 std::io::Error::new(std::io::ErrorKind::Unsupported, "cannot write to stdin"),
112 )),
113 }
114 }
115
116 fn poll_flush(
117 self: Pin<&mut Self>,
118 cx: &mut Context<'_>,
119 ) -> Poll<std::io::Result<()>> {
120 match self.get_mut() {
121 StreamKind::File(s) => Pin::new(s).poll_flush(cx),
122 StreamKind::Tcp(s) => Pin::new(s).poll_flush(cx),
123 StreamKind::Tls(s) => Pin::new(s).poll_flush(cx),
124 StreamKind::Stdout(s) => Pin::new(s).poll_flush(cx),
125 StreamKind::Stderr(s) => Pin::new(s).poll_flush(cx),
126 StreamKind::Stdin(_) => Poll::Ready(Ok(())),
127 }
128 }
129
130 fn poll_shutdown(
131 self: Pin<&mut Self>,
132 cx: &mut Context<'_>,
133 ) -> Poll<std::io::Result<()>> {
134 match self.get_mut() {
135 StreamKind::File(s) => Pin::new(s).poll_shutdown(cx),
136 StreamKind::Tcp(s) => Pin::new(s).poll_shutdown(cx),
137 StreamKind::Tls(s) => Pin::new(s).poll_shutdown(cx),
138 StreamKind::Stdout(s) => Pin::new(s).poll_shutdown(cx),
139 StreamKind::Stderr(s) => Pin::new(s).poll_shutdown(cx),
140 StreamKind::Stdin(_) => Poll::Ready(Ok(())),
141 }
142 }
143}
144
145#[derive(Debug, Clone)]
148pub struct StreamValue {
149 pub inner: Arc<Mutex<Option<StreamKind>>>,
150}
151
152impl PartialEq for StreamValue {
153 fn eq(&self, other: &Self) -> bool {
154 Arc::ptr_eq(&self.inner, &other.inner)
155 }
156}
157
158impl Eq for StreamValue {}
159
160impl PartialOrd for StreamValue {
161 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
162 Some(self.cmp(other))
163 }
164}
165
166impl Ord for StreamValue {
167 fn cmp(&self, other: &Self) -> Ordering {
168 Arc::as_ptr(&self.inner).cmp(&Arc::as_ptr(&other.inner))
169 }
170}
171
172impl Hash for StreamValue {
173 fn hash<H: Hasher>(&self, state: &mut H) {
174 Arc::as_ptr(&self.inner).hash(state)
175 }
176}
177
178graphix_package_core::impl_no_pack!(StreamValue);
179
180pub static STREAM_WRAPPER: LazyLock<AbstractWrapper<StreamValue>> = LazyLock::new(|| {
181 let id = uuid::Uuid::from_bytes([
182 0xb7, 0xc8, 0xd9, 0xea, 0xfb, 0x0c, 0x4d, 0x1e, 0x2f, 0x30, 0x41, 0x52, 0x63,
183 0x74, 0x85, 0x96,
184 ]);
185 Abstract::register::<StreamValue>(id).expect("failed to register StreamValue")
186});
187
188pub(crate) fn wrap_file(file: tokio::fs::File) -> Value {
189 STREAM_WRAPPER
190 .wrap(StreamValue { inner: Arc::new(Mutex::new(Some(StreamKind::File(file)))) })
191}
192
193pub(crate) fn wrap_tcp(stream: tokio::net::TcpStream) -> Value {
194 STREAM_WRAPPER
195 .wrap(StreamValue { inner: Arc::new(Mutex::new(Some(StreamKind::Tcp(stream)))) })
196}
197
198pub fn get_stream(
199 cached: &CachedVals,
200 idx: usize,
201) -> Option<Arc<Mutex<Option<StreamKind>>>> {
202 match cached.0.get(idx)?.as_ref()? {
203 Value::Abstract(a) => {
204 let sv = a.downcast_ref::<StreamValue>()?;
205 Some(sv.inner.clone())
206 }
207 _ => None,
208 }
209}
210
211pub fn get_stream_value(cached: &CachedVals, idx: usize) -> Option<StreamValue> {
212 match cached.0.get(idx)?.as_ref()? {
213 Value::Abstract(a) => a.downcast_ref::<StreamValue>().cloned(),
214 _ => None,
215 }
216}
217
218#[derive(Debug)]
221struct TempDirValue {
222 path: ArcStr,
223 _dir: TempDir,
224}
225
226impl PartialEq for TempDirValue {
227 fn eq(&self, other: &Self) -> bool {
228 self.path == other.path
229 }
230}
231
232impl Eq for TempDirValue {}
233
234impl PartialOrd for TempDirValue {
235 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
236 Some(self.cmp(other))
237 }
238}
239
240impl Ord for TempDirValue {
241 fn cmp(&self, other: &Self) -> Ordering {
242 self.path.cmp(&other.path)
243 }
244}
245
246impl Hash for TempDirValue {
247 fn hash<H: Hasher>(&self, state: &mut H) {
248 self.path.hash(state)
249 }
250}
251
252graphix_package_core::impl_no_pack!(TempDirValue);
253
254static TEMPDIR_WRAPPER: LazyLock<AbstractWrapper<TempDirValue>> = LazyLock::new(|| {
255 let id = uuid::Uuid::from_bytes([
256 0xa1, 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x47, 0x89, 0x9a, 0xbc, 0xde, 0xf0, 0x12,
257 0x34, 0x56, 0x78,
258 ]);
259 Abstract::register::<TempDirValue>(id).expect("failed to register TempDirValue")
260});
261
262#[derive(Debug)]
263enum Name {
264 Prefix(ArcStr),
265 Suffix(ArcStr),
266}
267
268#[derive(Debug)]
269pub(crate) struct TempDirArgs {
270 dir: Option<ArcStr>,
271 name: Option<Name>,
272}
273
274#[derive(Debug, Default)]
275pub(crate) struct GxTempDirEv;
276
277impl EvalCachedAsync for GxTempDirEv {
278 const NAME: &str = "sys_tempdir";
279 const NEEDS_CALLSITE: bool = false;
280 type Args = TempDirArgs;
281
282 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
283 if cached.0.iter().any(|v| v.is_none()) {
284 None
285 } else {
286 let dir = cached.get::<Option<ArcStr>>(0).flatten();
287 let name = cached
288 .get::<Option<(ArcStr, ArcStr)>>(1)
289 .and_then(|v| v)
290 .and_then(|(tag, v)| match &*tag {
291 "Prefix" => Some(Name::Prefix(v)),
292 "Suffix" => Some(Name::Suffix(v)),
293 _ => None,
294 });
295 let _ = cached.get::<Value>(2)?;
296 Some(TempDirArgs { dir, name })
297 }
298 }
299
300 fn eval(args: Self::Args) -> impl Future<Output = Value> + Send {
301 async move {
302 let td = tokio::task::spawn_blocking(|| match (args.dir, args.name) {
303 (None, None) => TempDir::new(),
304 (None, Some(Name::Prefix(pfx))) => TempDir::with_prefix(&*pfx),
305 (None, Some(Name::Suffix(sfx))) => TempDir::with_suffix(&*sfx),
306 (Some(dir), None) => TempDir::new_in(&*dir),
307 (Some(dir), Some(Name::Prefix(pfx))) => {
308 TempDir::with_prefix_in(&*pfx, &*dir)
309 }
310 (Some(dir), Some(Name::Suffix(sfx))) => {
311 TempDir::with_suffix_in(&*sfx, &*dir)
312 }
313 })
314 .await;
315 match td {
316 Err(e) => errf!("IOError", "failed to spawn create temp dir {e:?}"),
317 Ok(Err(e)) => errf!("IOError", "failed to create temp dir {e:?}"),
318 Ok(Ok(td)) => {
319 use std::fmt::Write;
320 let mut buf = CompactString::new("");
321 write!(buf, "{}", td.path().display()).unwrap();
322 let path = ArcStr::from(buf.as_str());
323 TEMPDIR_WRAPPER.wrap(TempDirValue { path, _dir: td })
324 }
325 }
326 }
327 }
328}
329
330pub(crate) type GxTempDir = CachedArgsAsync<GxTempDirEv>;
331
332#[derive(Debug, Default)]
333pub(crate) struct TempDirPathEv;
334
335impl<R: Rt, E: UserEvent> EvalCached<R, E> for TempDirPathEv {
336 const NAME: &str = "sys_tempdir_path";
337 const NEEDS_CALLSITE: bool = false;
338
339 fn eval(&mut self, _ctx: &mut ExecCtx<R, E>, from: &CachedVals) -> Option<Value> {
340 let v = from.0.first()?.as_ref()?;
341 match v {
342 Value::Abstract(a) => {
343 let td = a.downcast_ref::<TempDirValue>()?;
344 Some(Value::String(td.path.clone()))
345 }
346 _ => None,
347 }
348 }
349}
350
351pub(crate) type TempDirPath = CachedArgs<TempDirPathEv>;
352
353pub(crate) fn convert_path(path: &Path) -> ArcStr {
354 thread_local! {
355 static BUF: RefCell<String> = RefCell::new(String::new());
356 }
357 BUF.with_borrow_mut(|buf| {
358 buf.clear();
359 use std::fmt::Write;
360 write!(buf, "{}", path.display()).unwrap();
361 ArcStr::from(buf.as_str())
362 })
363}
364
365#[derive(Debug, Default)]
366pub(crate) struct JoinPathEv;
367
368impl<R: Rt, E: UserEvent> EvalCached<R, E> for JoinPathEv {
369 const NAME: &str = "sys_join_path";
370 const NEEDS_CALLSITE: bool = false;
371
372 fn eval(&mut self, _ctx: &mut ExecCtx<R, E>, from: &CachedVals) -> Option<Value> {
373 let mut parts: LPooled<Vec<ArcStr>> = LPooled::take();
374 for part in from.0.iter() {
375 match part {
376 None => return None,
377 Some(Value::String(s)) => parts.push(s.clone()),
378 Some(Value::Array(a)) => {
379 for part in a.iter() {
380 match part {
381 Value::String(s) => parts.push(s.clone()),
382 _ => return None,
383 }
384 }
385 }
386 _ => return None,
387 }
388 }
389 thread_local! {
390 static BUF: RefCell<PathBuf> = RefCell::new(PathBuf::new());
391 }
392 BUF.with_borrow_mut(|path| {
393 path.clear();
394 for part in parts.drain(..) {
395 path.push(&*part)
396 }
397 Some(Value::String(convert_path(&path)))
398 })
399 }
400}
401
402pub(crate) type JoinPath = CachedArgs<JoinPathEv>;
403
404#[derive(Debug)]
407pub(crate) struct Args {
408 fired: bool,
409}
410
411impl<R: Rt, E: UserEvent> BuiltIn<R, E> for Args {
412 const NAME: &str = "sys_args";
413 const NEEDS_CALLSITE: bool = false;
414
415 fn init<'a, 'b, 'c, 'd>(
416 _ctx: &'a mut ExecCtx<R, E>,
417 _typ: &'a FnType,
418 _resolved: Option<&'d FnType>,
419 _scope: &'b Scope,
420 _from: &'c [Node<R, E>],
421 _top_id: ExprId,
422 ) -> anyhow::Result<Box<dyn Apply<R, E>>> {
423 Ok(Box::new(Self { fired: false }))
424 }
425}
426
427impl<R: Rt, E: UserEvent> Apply<R, E> for Args {
428 fn update(
429 &mut self,
430 ctx: &mut ExecCtx<R, E>,
431 _from: &mut [Node<R, E>],
432 event: &mut Event<E>,
433 ) -> Option<Value> {
434 if event.init && !self.fired {
435 self.fired = true;
436 let pargs = ctx.libstate.get_or_default::<ProgramArgs>();
437 let arr: ValArray =
438 pargs.0.iter().map(|s| Value::String(s.clone())).collect();
439 Some(Value::Array(arr))
440 } else {
441 None
442 }
443 }
444
445 fn delete(&mut self, _ctx: &mut ExecCtx<R, E>) {}
446
447 fn sleep(&mut self, _ctx: &mut ExecCtx<R, E>) {
448 self.fired = false;
449 }
450}
451
452graphix_derive::defpackage! {
453 builtins => [
454 Args,
455 GxTempDir,
456 TempDirPath,
457 JoinPath,
458 metadata::IsFile,
459 metadata::IsDir,
460 metadata::Metadata,
461 watch::CreateWatcher,
462 watch::WatchApply,
463 watch::WatchPath,
464 watch::WatchEvents,
465 fs::ReadAll,
466 fs::ReadAllBin,
467 fs::WriteAll,
468 fs::WriteAllBin,
469 fs::RemoveFile,
470 fs::FileOpen,
471 fs::FileSeek,
472 fs::FileFstat,
473 fs::FileTruncate,
474 dir::ReadDir,
475 dir::CreateDir,
476 dir::RemoveDir,
477 io::IoRead,
478 io::IoReadExact,
479 io::IoWrite,
480 io::IoWriteExact,
481 io::IoFlush,
482 io::IoStdin,
483 io::IoStdout,
484 io::IoStderr,
485 tcp::TcpConnect,
486 tcp::TcpListen,
487 tcp::TcpAccept,
488 tcp::TcpShutdown,
489 tcp::TcpPeerAddr,
490 tcp::TcpLocalAddr,
491 tcp::TcpListenerAddr,
492 tls::TlsConnect,
493 tls::TlsAccept,
494 net::Write,
495 net::Subscribe,
496 net::RpcCall,
497 net::List,
498 net::ListTable,
499 net::Publish as net::Publish<GXRt<X>, X::UserEvent>,
500 net::PublishRpc as net::PublishRpc<GXRt<X>, X::UserEvent>,
501 time::AfterIdle,
502 time::Timer,
503 time::Now,
504 dirs_mod::HomeDir,
505 dirs_mod::CacheDir,
506 dirs_mod::ConfigDir,
507 dirs_mod::ConfigLocalDir,
508 dirs_mod::DataDir,
509 dirs_mod::DataLocalDir,
510 dirs_mod::ExecutableDir,
511 dirs_mod::PreferenceDir,
512 dirs_mod::RuntimeDir,
513 dirs_mod::StateDir,
514 dirs_mod::AudioDir,
515 dirs_mod::DesktopDir,
516 dirs_mod::DocumentDir,
517 dirs_mod::DownloadDir,
518 dirs_mod::FontDir,
519 dirs_mod::PictureDir,
520 dirs_mod::PublicDir,
521 dirs_mod::TemplateDir,
522 dirs_mod::VideoDir,
523 ],
524}