Skip to main content

graphix_package_pack/
lib.rs

1#![doc(
2    html_logo_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg",
3    html_favicon_url = "https://graphix-lang.github.io/graphix/graphix-icon.svg"
4)]
5use anyhow::{bail, Result};
6use arcstr::ArcStr;
7use bytes::Bytes;
8use graphix_compiler::{
9    errf, typ::FnType, typ::Type, ExecCtx, Node, Rt, Scope, TypecheckPhase, UserEvent,
10};
11use graphix_package_core::{
12    extract_cast_type, CachedArgs, CachedArgsAsync, CachedVals, EvalCached,
13    EvalCachedAsync,
14};
15use graphix_package_sys::{get_stream, StreamKind};
16use netidx_core::pack::Pack;
17use netidx_value::{PBytes, Value};
18use poolshark::local::LPooled;
19use std::sync::Arc;
20use tokio::{io::AsyncReadExt, io::AsyncWriteExt, sync::Mutex};
21
22// ── ReadInput ────────────────────────────────────────────────
23
24#[derive(Debug)]
25enum ReadInput {
26    Bytes(Bytes),
27    Stream(Arc<Mutex<Option<StreamKind>>>),
28}
29
30// ── PackRead (async) ─────────────────────────────────────────
31
32#[derive(Debug, Default)]
33struct PackReadEv {
34    cast_typ: Option<Type>,
35}
36
37impl EvalCachedAsync for PackReadEv {
38    const NAME: &str = "pack_read";
39    const NEEDS_CALLSITE: bool = true;
40    type Args = ReadInput;
41
42    fn init<R: Rt, E: UserEvent>(
43        _ctx: &mut ExecCtx<R, E>,
44        _typ: &FnType,
45        resolved: Option<&FnType>,
46        _scope: &Scope,
47        _from: &[Node<R, E>],
48        _top_id: graphix_compiler::expr::ExprId,
49    ) -> Self {
50        Self { cast_typ: extract_cast_type(resolved) }
51    }
52
53    fn typecheck<R: Rt, E: UserEvent>(
54        &mut self,
55        _ctx: &mut ExecCtx<R, E>,
56        _from: &mut [Node<R, E>],
57        phase: TypecheckPhase<'_>,
58    ) -> Result<()> {
59        match phase {
60            TypecheckPhase::Lambda => Ok(()),
61            TypecheckPhase::CallSite(resolved) => {
62                self.cast_typ = extract_cast_type(Some(resolved));
63                if self.cast_typ.is_none() {
64                    bail!("pack::read requires a concrete return type")
65                }
66                Ok(())
67            }
68        }
69    }
70
71    fn map_value<R: Rt, E: UserEvent>(
72        &mut self,
73        ctx: &mut ExecCtx<R, E>,
74        v: Value,
75    ) -> Option<Value> {
76        match &self.cast_typ {
77            Some(typ) => Some(typ.cast_value(&ctx.env, v)),
78            None => Some(errf!("PackErr", "no concrete return type found")),
79        }
80    }
81
82    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
83        let v = cached.0.first()?.as_ref()?;
84        match v {
85            Value::Bytes(b) => Some(ReadInput::Bytes((**b).clone())),
86            Value::Abstract(_) => Some(ReadInput::Stream(get_stream(cached, 0)?)),
87            _ => None,
88        }
89    }
90
91    fn eval(input: Self::Args) -> impl Future<Output = Value> + Send {
92        async move {
93            match input {
94                ReadInput::Bytes(b) => match Value::decode(&mut b.as_ref()) {
95                    Ok(v) => v,
96                    Err(e) => errf!("PackErr", "{e}"),
97                },
98                ReadInput::Stream(stream) => {
99                    let mut guard = stream.lock().await;
100                    let s = match guard.as_mut() {
101                        Some(s) => s,
102                        None => return errf!("IOErr", "stream unavailable"),
103                    };
104                    let mut buf: LPooled<Vec<u8>> = LPooled::take();
105                    if let Err(e) = s.read_to_end(&mut buf).await {
106                        return errf!("IOErr", "read failed: {e}");
107                    }
108                    match Value::decode(&mut buf.as_slice()) {
109                        Ok(v) => v,
110                        Err(e) => errf!("PackErr", "{e}"),
111                    }
112                }
113            }
114        }
115    }
116}
117
118type PackRead = CachedArgsAsync<PackReadEv>;
119
120// ── PackWriteBytes (sync) ────────────────────────────────────
121
122#[derive(Debug, Default)]
123struct PackWriteBytesEv;
124
125impl<R: Rt, E: UserEvent> EvalCached<R, E> for PackWriteBytesEv {
126    const NAME: &str = "pack_write_bytes";
127    const NEEDS_CALLSITE: bool = false;
128
129    fn eval(&mut self, _ctx: &mut ExecCtx<R, E>, cached: &CachedVals) -> Option<Value> {
130        let v = cached.0.first()?.as_ref()?;
131        let len = v.encoded_len();
132        let mut buf = Vec::with_capacity(len);
133        Some(match v.encode(&mut buf) {
134            Ok(()) => Value::Bytes(PBytes::new(Bytes::from(buf))),
135            Err(e) => errf!("PackErr", "{e}"),
136        })
137    }
138}
139
140type PackWriteBytes = CachedArgs<PackWriteBytesEv>;
141
142// ── PackWriteStream (async) ──────────────────────────────────
143
144#[derive(Debug, Default)]
145struct PackWriteStreamEv;
146
147impl EvalCachedAsync for PackWriteStreamEv {
148    const NAME: &str = "pack_write_stream";
149    const NEEDS_CALLSITE: bool = false;
150    type Args = (Arc<Mutex<Option<StreamKind>>>, Vec<u8>);
151
152    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
153        let stream = get_stream(cached, 0)?;
154        let v = cached.0.get(1)?.as_ref()?;
155        let len = v.encoded_len();
156        let mut buf = Vec::with_capacity(len);
157        v.encode(&mut buf).ok()?;
158        Some((stream, buf))
159    }
160
161    fn eval((stream, buf): Self::Args) -> impl Future<Output = Value> + Send {
162        async move {
163            let mut guard = stream.lock().await;
164            let s = match guard.as_mut() {
165                Some(s) => s,
166                None => return errf!("IOErr", "stream unavailable"),
167            };
168            match s.write_all(&buf).await {
169                Ok(()) => Value::Null,
170                Err(e) => errf!("IOErr", "write failed: {e}"),
171            }
172        }
173    }
174}
175
176type PackWriteStream = CachedArgsAsync<PackWriteStreamEv>;
177
178// ── Package registration ─────────────────────────────────────
179
180graphix_derive::defpackage! {
181    builtins => [
182        PackRead,
183        PackWriteBytes,
184        PackWriteStream,
185    ],
186}