1#![doc(
2 html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3 html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5use anyhow::{bail, Result};
6use arcstr::ArcStr;
7use bytes::Bytes;
8use graphix_compiler::{
9 errf, typ::FnType, typ::Type, ExecCtx, Node, Rt, Scope, TypecheckPhase, UserEvent,
10};
11use graphix_package_core::{
12 extract_cast_type, CachedArgs, CachedArgsAsync, CachedVals, EvalCached,
13 EvalCachedAsync,
14};
15use graphix_package_sys::{get_stream, StreamKind};
16use netidx_core::pack::Pack;
17use netidx_value::{PBytes, Value};
18use poolshark::local::LPooled;
19use std::sync::Arc;
20use tokio::{io::AsyncReadExt, io::AsyncWriteExt, sync::Mutex};
21
22#[derive(Debug)]
25enum ReadInput {
26 Bytes(Bytes),
27 Stream(Arc<Mutex<Option<StreamKind>>>),
28}
29
30#[derive(Debug, Default)]
33struct PackReadEv {
34 cast_typ: Option<Type>,
35}
36
37impl EvalCachedAsync for PackReadEv {
38 const NAME: &str = "pack_read";
39 const NEEDS_CALLSITE: bool = true;
40 type Args = ReadInput;
41
42 fn init<R: Rt, E: UserEvent>(
43 _ctx: &mut ExecCtx<R, E>,
44 _typ: &FnType,
45 resolved: Option<&FnType>,
46 _scope: &Scope,
47 _from: &[Node<R, E>],
48 _top_id: graphix_compiler::expr::ExprId,
49 ) -> Self {
50 Self { cast_typ: extract_cast_type(resolved) }
51 }
52
53 fn typecheck<R: Rt, E: UserEvent>(
54 &mut self,
55 _ctx: &mut ExecCtx<R, E>,
56 _from: &mut [Node<R, E>],
57 phase: TypecheckPhase<'_>,
58 ) -> Result<()> {
59 match phase {
60 TypecheckPhase::Lambda => Ok(()),
61 TypecheckPhase::CallSite(resolved) => {
62 self.cast_typ = extract_cast_type(Some(resolved));
63 if self.cast_typ.is_none() {
64 bail!("pack::read requires a concrete return type")
65 }
66 Ok(())
67 }
68 }
69 }
70
71 fn map_value<R: Rt, E: UserEvent>(
72 &mut self,
73 ctx: &mut ExecCtx<R, E>,
74 v: Value,
75 ) -> Option<Value> {
76 match &self.cast_typ {
77 Some(typ) => Some(typ.cast_value(&ctx.env, v)),
78 None => Some(errf!("PackErr", "no concrete return type found")),
79 }
80 }
81
82 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
83 let v = cached.0.first()?.as_ref()?;
84 match v {
85 Value::Bytes(b) => Some(ReadInput::Bytes((**b).clone())),
86 Value::Abstract(_) => Some(ReadInput::Stream(get_stream(cached, 0)?)),
87 _ => None,
88 }
89 }
90
91 fn eval(input: Self::Args) -> impl Future<Output = Value> + Send {
92 async move {
93 match input {
94 ReadInput::Bytes(b) => match Value::decode(&mut b.as_ref()) {
95 Ok(v) => v,
96 Err(e) => errf!("PackErr", "{e}"),
97 },
98 ReadInput::Stream(stream) => {
99 let mut guard = stream.lock().await;
100 let s = match guard.as_mut() {
101 Some(s) => s,
102 None => return errf!("IOErr", "stream unavailable"),
103 };
104 let mut buf: LPooled<Vec<u8>> = LPooled::take();
105 if let Err(e) = s.read_to_end(&mut buf).await {
106 return errf!("IOErr", "read failed: {e}");
107 }
108 match Value::decode(&mut buf.as_slice()) {
109 Ok(v) => v,
110 Err(e) => errf!("PackErr", "{e}"),
111 }
112 }
113 }
114 }
115 }
116}
117
118type PackRead = CachedArgsAsync<PackReadEv>;
119
/// Stateless evaluator behind the `pack_write_bytes` builtin.
#[derive(Default, Debug)]
struct PackWriteBytesEv;
124
125impl<R: Rt, E: UserEvent> EvalCached<R, E> for PackWriteBytesEv {
126 const NAME: &str = "pack_write_bytes";
127 const NEEDS_CALLSITE: bool = false;
128
129 fn eval(&mut self, _ctx: &mut ExecCtx<R, E>, cached: &CachedVals) -> Option<Value> {
130 let v = cached.0.first()?.as_ref()?;
131 let len = v.encoded_len();
132 let mut buf = Vec::with_capacity(len);
133 Some(match v.encode(&mut buf) {
134 Ok(()) => Value::Bytes(PBytes::new(Bytes::from(buf))),
135 Err(e) => errf!("PackErr", "{e}"),
136 })
137 }
138}
139
140type PackWriteBytes = CachedArgs<PackWriteBytesEv>;
141
/// Stateless evaluator behind the `pack_write_stream` builtin.
#[derive(Default, Debug)]
struct PackWriteStreamEv;
146
147impl EvalCachedAsync for PackWriteStreamEv {
148 const NAME: &str = "pack_write_stream";
149 const NEEDS_CALLSITE: bool = false;
150 type Args = (Arc<Mutex<Option<StreamKind>>>, Vec<u8>);
151
152 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
153 let stream = get_stream(cached, 0)?;
154 let v = cached.0.get(1)?.as_ref()?;
155 let len = v.encoded_len();
156 let mut buf = Vec::with_capacity(len);
157 v.encode(&mut buf).ok()?;
158 Some((stream, buf))
159 }
160
161 fn eval((stream, buf): Self::Args) -> impl Future<Output = Value> + Send {
162 async move {
163 let mut guard = stream.lock().await;
164 let s = match guard.as_mut() {
165 Some(s) => s,
166 None => return errf!("IOErr", "stream unavailable"),
167 };
168 match s.write_all(&buf).await {
169 Ok(()) => Value::Null,
170 Err(e) => errf!("IOErr", "write failed: {e}"),
171 }
172 }
173 }
174}
175
176type PackWriteStream = CachedArgsAsync<PackWriteStreamEv>;
177
178graphix_derive::defpackage! {
181 builtins => [
182 PackRead,
183 PackWriteBytes,
184 PackWriteStream,
185 ],
186}