1#![doc(
2 html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3 html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5use anyhow::{bail, Result};
6use arcstr::ArcStr;
7use bytes::Bytes;
8use graphix_compiler::{
9 errf, typ::FnType, typ::Type, ExecCtx, Node, Rt, Scope, TypecheckPhase, UserEvent,
10};
11use graphix_package_core::{
12 extract_cast_type, is_struct, CachedArgs, CachedArgsAsync, CachedVals, EvalCached,
13 EvalCachedAsync,
14};
15use graphix_package_sys::{get_stream, StreamKind};
16use netidx_value::{PBytes, ValArray, Value};
17use poolshark::local::LPooled;
18use std::sync::Arc;
19use tokio::{io::AsyncReadExt, io::AsyncWriteExt, sync::Mutex};
20
21fn json_to_value(json: serde_json::Value) -> Value {
24 match json {
25 serde_json::Value::Null => Value::Null,
26 serde_json::Value::Bool(b) => Value::Bool(b),
27 serde_json::Value::Number(n) => {
28 if let Some(i) = n.as_i64() {
29 Value::I64(i)
30 } else if let Some(u) = n.as_u64() {
31 Value::U64(u)
32 } else {
33 Value::F64(n.as_f64().unwrap_or(f64::NAN))
34 }
35 }
36 serde_json::Value::String(s) => Value::String(ArcStr::from(s.as_str())),
37 serde_json::Value::Array(arr) => {
38 let mut vals: LPooled<Vec<Value>> =
39 arr.into_iter().map(json_to_value).collect();
40 Value::Array(ValArray::from_iter_exact(vals.drain(..)))
41 }
42 serde_json::Value::Object(obj) => {
43 let mut pairs: LPooled<Vec<(String, Value)>> =
44 obj.into_iter().map(|(k, v)| (k, json_to_value(v))).collect();
45 pairs.sort_by(|a, b| a.0.cmp(&b.0));
46 let mut vals: LPooled<Vec<Value>> = pairs
47 .drain(..)
48 .map(|(k, v)| {
49 Value::Array(ValArray::from([
50 Value::String(ArcStr::from(k.as_str())),
51 v,
52 ]))
53 })
54 .collect();
55 Value::Array(ValArray::from_iter_exact(vals.drain(..)))
56 }
57 }
58}
59
60pub fn value_to_json(value: &Value) -> Result<serde_json::Value, String> {
61 match value {
62 Value::Null => Ok(serde_json::Value::Null),
63 Value::Bool(b) => Ok(serde_json::Value::Bool(*b)),
64 Value::I8(n) => Ok(serde_json::Value::from(*n)),
65 Value::I16(n) => Ok(serde_json::Value::from(*n)),
66 Value::I32(n) => Ok(serde_json::Value::from(*n)),
67 Value::I64(n) => Ok(serde_json::Value::from(*n)),
68 Value::U8(n) => Ok(serde_json::Value::from(*n)),
69 Value::U16(n) => Ok(serde_json::Value::from(*n)),
70 Value::U32(n) => Ok(serde_json::Value::from(*n)),
71 Value::U64(n) => Ok(serde_json::Value::from(*n)),
72 Value::V32(n) => Ok(serde_json::Value::from(*n)),
73 Value::V64(n) => Ok(serde_json::Value::from(*n)),
74 Value::Z32(n) => Ok(serde_json::Value::from(*n)),
75 Value::Z64(n) => Ok(serde_json::Value::from(*n)),
76 Value::F32(n) => {
77 let f = *n as f64;
78 if f.is_finite() {
79 Ok(serde_json::Value::from(f))
80 } else {
81 Err(format!("cannot represent {n} as JSON"))
82 }
83 }
84 Value::F64(n) => {
85 if n.is_finite() {
86 Ok(serde_json::Value::from(*n))
87 } else {
88 Err(format!("cannot represent {n} as JSON"))
89 }
90 }
91 Value::Decimal(d) => Ok(serde_json::Value::String(d.to_string())),
92 Value::String(s) => Ok(serde_json::Value::String(s.to_string())),
93 Value::Bytes(b) => {
94 let mut arr: LPooled<Vec<serde_json::Value>> =
95 b.iter().map(|byte| serde_json::Value::from(*byte)).collect();
96 Ok(serde_json::Value::Array(arr.drain(..).collect()))
97 }
98 Value::DateTime(dt) => Ok(serde_json::Value::String(dt.to_rfc3339())),
99 Value::Duration(d) => Ok(serde_json::Value::from(d.as_secs_f64())),
100 Value::Array(arr) => {
101 if is_struct(arr) {
102 let mut map = serde_json::Map::with_capacity(arr.len());
103 for v in arr.iter() {
104 if let Value::Array(pair) = v {
105 if let Value::String(k) = &pair[0] {
106 map.insert(k.to_string(), value_to_json(&pair[1])?);
107 }
108 }
109 }
110 Ok(serde_json::Value::Object(map))
111 } else {
112 let mut vals: LPooled<Vec<serde_json::Value>> =
113 arr.iter().map(value_to_json).collect::<Result<_, _>>()?;
114 Ok(serde_json::Value::Array(vals.drain(..).collect()))
115 }
116 }
117 Value::Map(m) => {
118 let mut map = serde_json::Map::with_capacity(m.len());
119 for (k, v) in m.into_iter() {
120 map.insert(format!("{k}"), value_to_json(v)?);
121 }
122 Ok(serde_json::Value::Object(map))
123 }
124 Value::Error(_) => Err("cannot serialize Error to JSON".into()),
125 Value::Abstract(_) => Err("cannot serialize abstract type to JSON".into()),
126 }
127}
128
129#[derive(Debug)]
132enum ReadInput {
133 Str(ArcStr),
134 Bytes(Bytes),
135 Stream(Arc<Mutex<Option<StreamKind>>>),
136}
137
138#[derive(Debug, Default)]
139struct JsonReadEv {
140 cast_typ: Option<Type>,
141}
142
143impl EvalCachedAsync for JsonReadEv {
144 const NAME: &str = "json_read";
145 const NEEDS_CALLSITE: bool = true;
146 type Args = ReadInput;
147
148 fn init<R: Rt, E: UserEvent>(
149 _ctx: &mut ExecCtx<R, E>,
150 _typ: &FnType,
151 resolved: Option<&FnType>,
152 _scope: &Scope,
153 _from: &[Node<R, E>],
154 _top_id: graphix_compiler::expr::ExprId,
155 ) -> Self {
156 Self { cast_typ: extract_cast_type(resolved) }
157 }
158
159 fn typecheck<R: Rt, E: UserEvent>(
160 &mut self,
161 _ctx: &mut ExecCtx<R, E>,
162 _from: &mut [Node<R, E>],
163 phase: TypecheckPhase<'_>,
164 ) -> Result<()> {
165 match phase {
166 TypecheckPhase::Lambda => Ok(()),
167 TypecheckPhase::CallSite(resolved) => {
168 self.cast_typ = extract_cast_type(Some(resolved));
169 if self.cast_typ.is_none() {
170 bail!("json read requires a concrete return type")
171 }
172 Ok(())
173 }
174 }
175 }
176
177 fn map_value<R: Rt, E: UserEvent>(
178 &mut self,
179 ctx: &mut ExecCtx<R, E>,
180 v: Value,
181 ) -> Option<Value> {
182 match self.cast_typ.as_ref() {
183 Some(typ) => Some(typ.cast_value(&ctx.env, v)),
184 None => Some(errf!("JsonErr", "no concrete return type found")),
185 }
186 }
187
188 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
189 let v = cached.0.first()?.as_ref()?;
190 match v {
191 Value::String(s) => Some(ReadInput::Str(s.clone())),
192 Value::Bytes(b) => Some(ReadInput::Bytes((**b).clone())),
193 Value::Abstract(_) => Some(ReadInput::Stream(get_stream(cached, 0)?)),
194 _ => None,
195 }
196 }
197
198 fn eval(input: Self::Args) -> impl Future<Output = Value> + Send {
199 async move {
200 match input {
201 ReadInput::Str(s) => {
202 match serde_json::from_str::<serde_json::Value>(&s) {
203 Ok(json) => json_to_value(json),
204 Err(e) => errf!("JsonErr", "{e}"),
205 }
206 }
207 ReadInput::Bytes(b) => {
208 match serde_json::from_slice::<serde_json::Value>(&b) {
209 Ok(json) => json_to_value(json),
210 Err(e) => errf!("JsonErr", "{e}"),
211 }
212 }
213 ReadInput::Stream(stream) => {
214 let mut guard = stream.lock().await;
215 let s = match guard.as_mut() {
216 Some(s) => s,
217 None => return errf!("IOErr", "stream unavailable"),
218 };
219 let mut buf: LPooled<Vec<u8>> = LPooled::take();
220 if let Err(e) = s.read_to_end(&mut buf).await {
221 return errf!("IOErr", "read failed: {e}");
222 }
223 match serde_json::from_slice::<serde_json::Value>(&buf) {
224 Ok(json) => json_to_value(json),
225 Err(e) => errf!("JsonErr", "{e}"),
226 }
227 }
228 }
229 }
230 }
231}
232
233type JsonRead = CachedArgsAsync<JsonReadEv>;
234
235#[derive(Debug, Default)]
238struct JsonWriteStrEv;
239
240impl<R: Rt, E: UserEvent> EvalCached<R, E> for JsonWriteStrEv {
241 const NAME: &str = "json_write_str";
242 const NEEDS_CALLSITE: bool = false;
243
244 fn eval(&mut self, _ctx: &mut ExecCtx<R, E>, cached: &CachedVals) -> Option<Value> {
245 let pretty = cached.get::<bool>(0)?;
246 let v = cached.0.get(1)?.as_ref()?;
247 let json = match value_to_json(v) {
248 Ok(j) => j,
249 Err(e) => return Some(errf!("JsonErr", "{e}")),
250 };
251 let mut buf: LPooled<Vec<u8>> = LPooled::take();
252 let res = if pretty {
253 serde_json::to_writer_pretty(&mut *buf, &json)
254 } else {
255 serde_json::to_writer(&mut *buf, &json)
256 };
257 Some(match res {
258 Ok(()) => {
259 let s = unsafe { std::str::from_utf8_unchecked(&buf) };
261 Value::String(ArcStr::from(s))
262 }
263 Err(e) => errf!("JsonErr", "{e}"),
264 })
265 }
266}
267
268type JsonWriteStr = CachedArgs<JsonWriteStrEv>;
269
270#[derive(Debug, Default)]
273struct JsonWriteBytesEv;
274
275impl<R: Rt, E: UserEvent> EvalCached<R, E> for JsonWriteBytesEv {
276 const NAME: &str = "json_write_bytes";
277 const NEEDS_CALLSITE: bool = false;
278
279 fn eval(&mut self, _ctx: &mut ExecCtx<R, E>, cached: &CachedVals) -> Option<Value> {
280 let pretty = cached.get::<bool>(0)?;
281 let v = cached.0.get(1)?.as_ref()?;
282 let json = match value_to_json(v) {
283 Ok(j) => j,
284 Err(e) => return Some(errf!("JsonErr", "{e}")),
285 };
286 let mut buf: LPooled<Vec<u8>> = LPooled::take();
287 let res = if pretty {
288 serde_json::to_writer_pretty(&mut *buf, &json)
289 } else {
290 serde_json::to_writer(&mut *buf, &json)
291 };
292 Some(match res {
293 Ok(()) => Value::Bytes(PBytes::new(Bytes::copy_from_slice(&buf))),
294 Err(e) => errf!("JsonErr", "{e}"),
295 })
296 }
297}
298
299type JsonWriteBytes = CachedArgs<JsonWriteBytesEv>;
300
301#[derive(Debug, Default)]
304struct JsonWriteStreamEv;
305
306impl EvalCachedAsync for JsonWriteStreamEv {
307 const NAME: &str = "json_write_stream";
308 const NEEDS_CALLSITE: bool = false;
309 type Args = (bool, Arc<Mutex<Option<StreamKind>>>, serde_json::Value);
310
311 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
312 let pretty = cached.get::<bool>(0)?;
313 let stream = get_stream(cached, 1)?;
314 let v = cached.0.get(2)?.as_ref()?;
315 let json = value_to_json(v).ok()?;
316 Some((pretty, stream, json))
317 }
318
319 fn eval((pretty, stream, json): Self::Args) -> impl Future<Output = Value> + Send {
320 async move {
321 let buf = if pretty {
322 serde_json::to_vec_pretty(&json)
323 } else {
324 serde_json::to_vec(&json)
325 };
326 let buf = match buf {
327 Ok(b) => b,
328 Err(e) => return errf!("JsonErr", "{e}"),
329 };
330 let mut guard = stream.lock().await;
331 let s = match guard.as_mut() {
332 Some(s) => s,
333 None => return errf!("IOErr", "stream unavailable"),
334 };
335 match s.write_all(&buf).await {
336 Ok(()) => Value::Null,
337 Err(e) => errf!("IOErr", "write failed: {e}"),
338 }
339 }
340 }
341}
342
343type JsonWriteStream = CachedArgsAsync<JsonWriteStreamEv>;
344
// Package declaration: lists the builtin node types defined in this file.
// What is generated from this list is decided by `graphix_derive::defpackage!`.
graphix_derive::defpackage! {
    builtins => [
        JsonRead,
        JsonWriteStr,
        JsonWriteBytes,
        JsonWriteStream,
    ],
}