Skip to main content

graphix_package_json/
lib.rs

1#![doc(
2    html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3    html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5use anyhow::{bail, Result};
6use arcstr::ArcStr;
7use bytes::Bytes;
8use graphix_compiler::{
9    errf, typ::FnType, typ::Type, ExecCtx, Node, Rt, Scope, TypecheckPhase, UserEvent,
10};
11use graphix_package_core::{
12    extract_cast_type, is_struct, CachedArgs, CachedArgsAsync, CachedVals, EvalCached,
13    EvalCachedAsync,
14};
15use graphix_package_sys::{get_stream, StreamKind};
16use netidx_value::{PBytes, ValArray, Value};
17use poolshark::local::LPooled;
18use std::sync::Arc;
19use tokio::{io::AsyncReadExt, io::AsyncWriteExt, sync::Mutex};
20
21// ── JSON ↔ Value conversion ──────────────────────────────────────
22
23fn json_to_value(json: serde_json::Value) -> Value {
24    match json {
25        serde_json::Value::Null => Value::Null,
26        serde_json::Value::Bool(b) => Value::Bool(b),
27        serde_json::Value::Number(n) => {
28            if let Some(i) = n.as_i64() {
29                Value::I64(i)
30            } else if let Some(u) = n.as_u64() {
31                Value::U64(u)
32            } else {
33                Value::F64(n.as_f64().unwrap_or(f64::NAN))
34            }
35        }
36        serde_json::Value::String(s) => Value::String(ArcStr::from(s.as_str())),
37        serde_json::Value::Array(arr) => {
38            let mut vals: LPooled<Vec<Value>> =
39                arr.into_iter().map(json_to_value).collect();
40            Value::Array(ValArray::from_iter_exact(vals.drain(..)))
41        }
42        serde_json::Value::Object(obj) => {
43            let mut pairs: LPooled<Vec<(String, Value)>> =
44                obj.into_iter().map(|(k, v)| (k, json_to_value(v))).collect();
45            pairs.sort_by(|a, b| a.0.cmp(&b.0));
46            let mut vals: LPooled<Vec<Value>> = pairs
47                .drain(..)
48                .map(|(k, v)| {
49                    Value::Array(ValArray::from([
50                        Value::String(ArcStr::from(k.as_str())),
51                        v,
52                    ]))
53                })
54                .collect();
55            Value::Array(ValArray::from_iter_exact(vals.drain(..)))
56        }
57    }
58}
59
60pub fn value_to_json(value: &Value) -> Result<serde_json::Value, String> {
61    match value {
62        Value::Null => Ok(serde_json::Value::Null),
63        Value::Bool(b) => Ok(serde_json::Value::Bool(*b)),
64        Value::I8(n) => Ok(serde_json::Value::from(*n)),
65        Value::I16(n) => Ok(serde_json::Value::from(*n)),
66        Value::I32(n) => Ok(serde_json::Value::from(*n)),
67        Value::I64(n) => Ok(serde_json::Value::from(*n)),
68        Value::U8(n) => Ok(serde_json::Value::from(*n)),
69        Value::U16(n) => Ok(serde_json::Value::from(*n)),
70        Value::U32(n) => Ok(serde_json::Value::from(*n)),
71        Value::U64(n) => Ok(serde_json::Value::from(*n)),
72        Value::V32(n) => Ok(serde_json::Value::from(*n)),
73        Value::V64(n) => Ok(serde_json::Value::from(*n)),
74        Value::Z32(n) => Ok(serde_json::Value::from(*n)),
75        Value::Z64(n) => Ok(serde_json::Value::from(*n)),
76        Value::F32(n) => {
77            let f = *n as f64;
78            if f.is_finite() {
79                Ok(serde_json::Value::from(f))
80            } else {
81                Err(format!("cannot represent {n} as JSON"))
82            }
83        }
84        Value::F64(n) => {
85            if n.is_finite() {
86                Ok(serde_json::Value::from(*n))
87            } else {
88                Err(format!("cannot represent {n} as JSON"))
89            }
90        }
91        Value::Decimal(d) => Ok(serde_json::Value::String(d.to_string())),
92        Value::String(s) => Ok(serde_json::Value::String(s.to_string())),
93        Value::Bytes(b) => {
94            let mut arr: LPooled<Vec<serde_json::Value>> =
95                b.iter().map(|byte| serde_json::Value::from(*byte)).collect();
96            Ok(serde_json::Value::Array(arr.drain(..).collect()))
97        }
98        Value::DateTime(dt) => Ok(serde_json::Value::String(dt.to_rfc3339())),
99        Value::Duration(d) => Ok(serde_json::Value::from(d.as_secs_f64())),
100        Value::Array(arr) => {
101            if is_struct(arr) {
102                let mut map = serde_json::Map::with_capacity(arr.len());
103                for v in arr.iter() {
104                    if let Value::Array(pair) = v {
105                        if let Value::String(k) = &pair[0] {
106                            map.insert(k.to_string(), value_to_json(&pair[1])?);
107                        }
108                    }
109                }
110                Ok(serde_json::Value::Object(map))
111            } else {
112                let mut vals: LPooled<Vec<serde_json::Value>> =
113                    arr.iter().map(value_to_json).collect::<Result<_, _>>()?;
114                Ok(serde_json::Value::Array(vals.drain(..).collect()))
115            }
116        }
117        Value::Map(m) => {
118            let mut map = serde_json::Map::with_capacity(m.len());
119            for (k, v) in m.into_iter() {
120                map.insert(format!("{k}"), value_to_json(v)?);
121            }
122            Ok(serde_json::Value::Object(map))
123        }
124        Value::Error(_) => Err("cannot serialize Error to JSON".into()),
125        Value::Abstract(_) => Err("cannot serialize abstract type to JSON".into()),
126    }
127}
128
129// ── JsonRead (async — handles string, bytes, and stream) ────────
130
131#[derive(Debug)]
132enum ReadInput {
133    Str(ArcStr),
134    Bytes(Bytes),
135    Stream(Arc<Mutex<Option<StreamKind>>>),
136}
137
138#[derive(Debug, Default)]
139struct JsonReadEv {
140    cast_typ: Option<Type>,
141}
142
143impl EvalCachedAsync for JsonReadEv {
144    const NAME: &str = "json_read";
145    const NEEDS_CALLSITE: bool = true;
146    type Args = ReadInput;
147
148    fn init<R: Rt, E: UserEvent>(
149        _ctx: &mut ExecCtx<R, E>,
150        _typ: &FnType,
151        resolved: Option<&FnType>,
152        _scope: &Scope,
153        _from: &[Node<R, E>],
154        _top_id: graphix_compiler::expr::ExprId,
155    ) -> Self {
156        Self { cast_typ: extract_cast_type(resolved) }
157    }
158
159    fn typecheck<R: Rt, E: UserEvent>(
160        &mut self,
161        _ctx: &mut ExecCtx<R, E>,
162        _from: &mut [Node<R, E>],
163        phase: TypecheckPhase<'_>,
164    ) -> Result<()> {
165        match phase {
166            TypecheckPhase::Lambda => Ok(()),
167            TypecheckPhase::CallSite(resolved) => {
168                self.cast_typ = extract_cast_type(Some(resolved));
169                if self.cast_typ.is_none() {
170                    bail!("json read requires a concrete return type")
171                }
172                Ok(())
173            }
174        }
175    }
176
177    fn map_value<R: Rt, E: UserEvent>(
178        &mut self,
179        ctx: &mut ExecCtx<R, E>,
180        v: Value,
181    ) -> Option<Value> {
182        match self.cast_typ.as_ref() {
183            Some(typ) => Some(typ.cast_value(&ctx.env, v)),
184            None => Some(errf!("JsonErr", "no concrete return type found")),
185        }
186    }
187
188    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
189        let v = cached.0.first()?.as_ref()?;
190        match v {
191            Value::String(s) => Some(ReadInput::Str(s.clone())),
192            Value::Bytes(b) => Some(ReadInput::Bytes((**b).clone())),
193            Value::Abstract(_) => Some(ReadInput::Stream(get_stream(cached, 0)?)),
194            _ => None,
195        }
196    }
197
198    fn eval(input: Self::Args) -> impl Future<Output = Value> + Send {
199        async move {
200            match input {
201                ReadInput::Str(s) => {
202                    match serde_json::from_str::<serde_json::Value>(&s) {
203                        Ok(json) => json_to_value(json),
204                        Err(e) => errf!("JsonErr", "{e}"),
205                    }
206                }
207                ReadInput::Bytes(b) => {
208                    match serde_json::from_slice::<serde_json::Value>(&b) {
209                        Ok(json) => json_to_value(json),
210                        Err(e) => errf!("JsonErr", "{e}"),
211                    }
212                }
213                ReadInput::Stream(stream) => {
214                    let mut guard = stream.lock().await;
215                    let s = match guard.as_mut() {
216                        Some(s) => s,
217                        None => return errf!("IOErr", "stream unavailable"),
218                    };
219                    let mut buf: LPooled<Vec<u8>> = LPooled::take();
220                    if let Err(e) = s.read_to_end(&mut buf).await {
221                        return errf!("IOErr", "read failed: {e}");
222                    }
223                    match serde_json::from_slice::<serde_json::Value>(&buf) {
224                        Ok(json) => json_to_value(json),
225                        Err(e) => errf!("JsonErr", "{e}"),
226                    }
227                }
228            }
229        }
230    }
231}
232
233type JsonRead = CachedArgsAsync<JsonReadEv>;
234
235// ── JsonWriteStr (sync) ──────────────────────────────────────────
236
237#[derive(Debug, Default)]
238struct JsonWriteStrEv;
239
240impl<R: Rt, E: UserEvent> EvalCached<R, E> for JsonWriteStrEv {
241    const NAME: &str = "json_write_str";
242    const NEEDS_CALLSITE: bool = false;
243
244    fn eval(&mut self, _ctx: &mut ExecCtx<R, E>, cached: &CachedVals) -> Option<Value> {
245        let pretty = cached.get::<bool>(0)?;
246        let v = cached.0.get(1)?.as_ref()?;
247        let json = match value_to_json(v) {
248            Ok(j) => j,
249            Err(e) => return Some(errf!("JsonErr", "{e}")),
250        };
251        let mut buf: LPooled<Vec<u8>> = LPooled::take();
252        let res = if pretty {
253            serde_json::to_writer_pretty(&mut *buf, &json)
254        } else {
255            serde_json::to_writer(&mut *buf, &json)
256        };
257        Some(match res {
258            Ok(()) => {
259                // serde_json always produces valid UTF-8
260                let s = unsafe { std::str::from_utf8_unchecked(&buf) };
261                Value::String(ArcStr::from(s))
262            }
263            Err(e) => errf!("JsonErr", "{e}"),
264        })
265    }
266}
267
268type JsonWriteStr = CachedArgs<JsonWriteStrEv>;
269
270// ── JsonWriteBytes (sync) ────────────────────────────────────────
271
272#[derive(Debug, Default)]
273struct JsonWriteBytesEv;
274
275impl<R: Rt, E: UserEvent> EvalCached<R, E> for JsonWriteBytesEv {
276    const NAME: &str = "json_write_bytes";
277    const NEEDS_CALLSITE: bool = false;
278
279    fn eval(&mut self, _ctx: &mut ExecCtx<R, E>, cached: &CachedVals) -> Option<Value> {
280        let pretty = cached.get::<bool>(0)?;
281        let v = cached.0.get(1)?.as_ref()?;
282        let json = match value_to_json(v) {
283            Ok(j) => j,
284            Err(e) => return Some(errf!("JsonErr", "{e}")),
285        };
286        let mut buf: LPooled<Vec<u8>> = LPooled::take();
287        let res = if pretty {
288            serde_json::to_writer_pretty(&mut *buf, &json)
289        } else {
290            serde_json::to_writer(&mut *buf, &json)
291        };
292        Some(match res {
293            Ok(()) => Value::Bytes(PBytes::new(Bytes::copy_from_slice(&buf))),
294            Err(e) => errf!("JsonErr", "{e}"),
295        })
296    }
297}
298
299type JsonWriteBytes = CachedArgs<JsonWriteBytesEv>;
300
301// ── JsonWriteStream (async) ──────────────────────────────────────
302
303#[derive(Debug, Default)]
304struct JsonWriteStreamEv;
305
306impl EvalCachedAsync for JsonWriteStreamEv {
307    const NAME: &str = "json_write_stream";
308    const NEEDS_CALLSITE: bool = false;
309    type Args = (bool, Arc<Mutex<Option<StreamKind>>>, serde_json::Value);
310
311    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
312        let pretty = cached.get::<bool>(0)?;
313        let stream = get_stream(cached, 1)?;
314        let v = cached.0.get(2)?.as_ref()?;
315        let json = value_to_json(v).ok()?;
316        Some((pretty, stream, json))
317    }
318
319    fn eval((pretty, stream, json): Self::Args) -> impl Future<Output = Value> + Send {
320        async move {
321            let buf = if pretty {
322                serde_json::to_vec_pretty(&json)
323            } else {
324                serde_json::to_vec(&json)
325            };
326            let buf = match buf {
327                Ok(b) => b,
328                Err(e) => return errf!("JsonErr", "{e}"),
329            };
330            let mut guard = stream.lock().await;
331            let s = match guard.as_mut() {
332                Some(s) => s,
333                None => return errf!("IOErr", "stream unavailable"),
334            };
335            match s.write_all(&buf).await {
336                Ok(()) => Value::Null,
337                Err(e) => errf!("IOErr", "write failed: {e}"),
338            }
339        }
340    }
341}
342
343type JsonWriteStream = CachedArgsAsync<JsonWriteStreamEv>;
344
345// ── Package registration ─────────────────────────────────────────
346
// Declares this crate's package through `graphix_derive::defpackage!`,
// listing the four builtin types defined in this file.
// NOTE(review): what the macro generates is not visible here — confirm
// against the `graphix_derive` documentation.
graphix_derive::defpackage! {
    builtins => [
        JsonRead,
        JsonWriteStr,
        JsonWriteBytes,
        JsonWriteStream,
    ],
}