spl/stream/
rw.rs

1use std::{
2    collections::HashMap,
3    hint::black_box,
4    io::{Read, Write},
5    net::{TcpStream, UdpSocket},
6    process::{self, Stdio},
7    sync::{Arc, LazyLock},
8};
9
10use crate::*;
11
12use fs::OpenOptions;
13use mutex::Mut;
14use stream::StreamExtraData;
15
/// Global registry mapping a stream type name to its [`StreamType`].
///
/// Starts out as an empty map, built lazily on first access. Entries are added
/// by [`register_stream_type`] and looked up by [`get_stream_type`].
static STREAM_TYPES: LazyLock<Arc<Mut<HashMap<String, StreamType>>>> =
    LazyLock::new(|| Arc::new(Mut::new(HashMap::new())));
18
19/// Registers a custom stream type.
20pub fn register_stream_type(
21    name: &str,
22    supplier: impl Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
23) {
24    STREAM_TYPES
25        .lock()
26        .insert(name.to_owned(), StreamType::from(supplier));
27}
28
29/// Gets a stream type by name.
30pub fn get_stream_type(name: String) -> Option<StreamType> {
31    STREAM_TYPES.lock_ro().get(&name).cloned()
32}
33
/// An SPL stream type: a cloneable handle to a supplier function that builds a
/// [`Stream`] from values on the stack.
#[derive(Clone)]
pub struct StreamType {
    /// Shared supplier; cloning a `StreamType` clones the `Arc`, not the closure.
    func: Arc<Box<dyn Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static>>,
}
39
40impl StreamType {
41    pub fn make_stream(&self, stack: &mut Stack) -> Result<Stream, Error> {
42        (self.func)(stack)
43    }
44}
45
/// An SPL stream, holding a reader and a writer.
///
/// `writer` is not an independent object: the constructors point it at a boxed
/// value owned either by `reader` ([`Stream::new`]) or by `_writer_storage`
/// ([`Stream::new_split`], [`Stream::shutdown_write`]).
pub struct Stream {
    /// Source for every `Read` call made on the stream.
    pub(super) reader: Box<dyn Read + Send + Sync + 'static>,
    /// Owns the writer when it is a separate object from `reader`; `None` when
    /// `writer` aliases the value stored inside `reader`.
    pub(super) _writer_storage: Option<Box<dyn Write + Send + Sync + 'static>>,
    /// Target of every `Write` call. Typed as `'static`, but it actually points
    /// into one of the boxes above and must not be used after they are dropped.
    pub(super) writer: &'static mut (dyn Write + Send + Sync + 'static),
    /// Additional per-stream data (`get_stream_peer` reads its `peer` field).
    pub extra: StreamExtraData,
}
53
54impl Stream {
55    pub fn new<T: Read + Write + Send + Sync + 'static>(main: T) -> Self {
56        let mut rw = Box::new(main);
57        Self {
58            writer: unsafe {
59                (rw.as_mut() as *mut (dyn Write + Send + Sync + 'static))
60                    .as_mut()
61                    .unwrap()
62            },
63            _writer_storage: None,
64            reader: rw,
65            extra: StreamExtraData::default(),
66        }
67    }
68    pub fn new_split(
69        reader: impl Read + Send + Sync + 'static,
70        writer: impl Write + Send + Sync + 'static,
71    ) -> Self {
72        let mut bx = Box::new(writer);
73        Self {
74            reader: Box::new(reader),
75            writer: unsafe {
76                (bx.as_mut() as *mut (dyn Write + Send + Sync + 'static))
77                    .as_mut()
78                    .unwrap()
79            },
80            _writer_storage: Some(bx),
81            extra: StreamExtraData::default(),
82        }
83    }
84
85    pub fn append_extra(mut self, f: impl Fn(&mut StreamExtraData)) -> Stream {
86        f(&mut self.extra);
87        self
88    }
89
90    pub fn shutdown_write(&mut self) {
91        let mut bx = Box::new(IgnoreWrite());
92        self.writer = unsafe {
93            (bx.as_mut() as *mut (dyn Write + Send + Sync + 'static))
94                .as_mut()
95                .unwrap()
96        };
97        self._writer_storage = Some(bx);
98    }
99}
100
101impl Read for Stream {
102    fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
103        self.reader.read_vectored(bufs)
104    }
105
106    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
107        self.reader.read_to_end(buf)
108    }
109
110    fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
111        self.reader.read_to_string(buf)
112    }
113
114    fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
115        self.reader.read_exact(buf)
116    }
117
118    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
119        self.reader.read(buf)
120    }
121}
122
123impl Write for Stream {
124    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
125        self.writer.write_vectored(bufs)
126    }
127
128    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
129        self.writer.write_all(buf)
130    }
131
132    fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> std::io::Result<()> {
133        self.writer.write_fmt(fmt)
134    }
135
136    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
137        self.writer.write(buf)
138    }
139
140    fn flush(&mut self) -> std::io::Result<()> {
141        self.writer.flush()
142    }
143}
144
/// Writer that throws away everything it is given while reporting success.
struct IgnoreWrite();

impl Write for IgnoreWrite {
    /// Claims the whole buffer was written without storing any of it.
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        let accepted = data.len();
        Ok(accepted)
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
155
156impl<T> From<T> for StreamType
157where
158    T: Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
159{
160    fn from(value: T) -> Self {
161        Self {
162            func: Arc::new(Box::new(value)),
163        }
164    }
165}
166
167pub fn new_stream(stack: &mut Stack) -> OError {
168    require_on_stack!(s, Str, stack, "new-stream");
169    let stream = get_stream_type(s.clone())
170        .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-type-{s}"))))?
171        .make_stream(stack)?;
172    let stream = runtime_mut(move |mut rt| Ok(rt.register_stream(stream)))?;
173    stack.push(Value::Mega(stream.0 as i128).spl());
174    Ok(())
175}
176
177pub fn write_stream(stack: &mut Stack) -> OError {
178    require_on_stack!(id, Mega, stack, "write-stream");
179    require_byte_array_on_stack!(a, stack, "write-stream");
180    let stream = runtime(|rt| {
181        rt.get_stream(id as u128)
182            .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
183    })?;
184    stack.push(
185        Value::Mega(
186            stream
187                .lock()
188                .write(&a)
189                .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))? as i128,
190        )
191        .spl(),
192    );
193    black_box(&stream.lock_ro()._writer_storage);
194    Ok(())
195}
196
197pub fn write_all_stream(stack: &mut Stack) -> OError {
198    require_on_stack!(id, Mega, stack, "write-all-stream");
199    require_byte_array_on_stack!(a, stack, "write-all-stream");
200    let stream = runtime(|rt| {
201        rt.get_stream(id as u128)
202            .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
203    })?;
204    stream
205        .lock()
206        .write_all(&a)
207        .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
208    black_box(&stream.lock_ro()._writer_storage);
209    Ok(())
210}
211
212pub fn flush_stream(stack: &mut Stack) -> OError {
213    require_on_stack!(id, Mega, stack, "flush-stream");
214    let stream = runtime(|rt| {
215        rt.get_stream(id as u128)
216            .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
217    })?;
218    stream
219        .lock()
220        .flush()
221        .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
222    black_box(&stream.lock_ro()._writer_storage);
223    Ok(())
224}
225
226pub fn read_stream(stack: &mut Stack) -> OError {
227    require_on_stack!(id, Mega, stack, "read-stream");
228    let array = stack.pop();
229    let kind = array.lock_ro().kind.lock_ro().get_name();
230    let stream = runtime(|rt| {
231        rt.get_stream(id as u128)
232            .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
233    })?;
234    if kind == "array" {
235        require_mut_array!(a, array, stack, "read-stream");
236        let mut vec = vec![0; a.len()];
237        stack.push(
238            Value::Mega(
239                stream
240                    .lock()
241                    .read(&mut vec[..])
242                    .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?
243                    as i128,
244            )
245            .spl(),
246        );
247        a.clone_from_slice(
248            &vec.into_iter()
249                .map(|x| Value::Int(x as i32).spl())
250                .collect::<Vec<_>>(),
251        );
252    }
253    if kind == "bytearray" {
254        require_mut!(a, ByteArray, array, stack, "read-stream");
255        stack.push(
256            Value::Mega(
257                stream
258                    .lock()
259                    .read(a)
260                    .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?
261                    as i128,
262            )
263            .spl(),
264        );
265    }
266    stack.push(array);
267    Ok(())
268}
269
270pub fn read_all_stream(stack: &mut Stack) -> OError {
271    require_on_stack!(id, Mega, stack, "read-all-stream");
272    let array = stack.pop();
273    let kind = array.lock_ro().kind.lock_ro().get_name();
274    let stream = runtime(|rt| {
275        rt.get_stream(id as u128)
276            .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
277    })?;
278    if kind == "array" {
279        require_mut_array!(a, array, stack, "read-all-stream");
280        let mut vec = vec![0; a.len()];
281        stream
282            .lock()
283            .read_exact(&mut vec[..])
284            .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
285        a.clone_from_slice(
286            &vec.into_iter()
287                .map(|x| Value::Int(x as i32).spl())
288                .collect::<Vec<_>>(),
289        );
290    }
291    if kind == "bytearray" {
292        require_mut!(a, ByteArray, array, stack, "read-stream");
293        stream
294            .lock()
295            .read_exact(a)
296            .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
297    }
298    stack.push(array);
299    Ok(())
300}
301
302pub fn close_stream(stack: &mut Stack) -> OError {
303    require_on_stack!(id, Mega, stack, "close-stream");
304    runtime_mut(|mut rt| rt.destroy_stream(id as u128));
305    Ok(())
306}
307
308pub fn shutdown_input_stream(stack: &mut Stack) -> OError {
309    require_on_stack!(id, Mega, stack, "shutdown-input-stream");
310    let stream = runtime(|rt| {
311        rt.get_stream(id as u128)
312            .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
313    })?;
314    stream.lock().shutdown_write();
315    Ok(())
316}
317
318pub(super) fn stream_file(stack: &mut Stack) -> Result<Stream, Error> {
319    let truncate = stack.pop().lock_ro().is_truthy();
320    require_on_stack!(path, Str, stack, "FILE new-stream");
321    Ok(Stream::new(
322        OpenOptions::new()
323            .read(!truncate)
324            .write(true)
325            .create(truncate)
326            .truncate(truncate)
327            .open(path)
328            .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?,
329    ))
330}
331
332pub(super) fn stream_tcp(stack: &mut Stack) -> Result<Stream, Error> {
333    require_int_on_stack!(port, stack, "TCP new-stream");
334    require_on_stack!(ip, Str, stack, "TCP new-stream");
335    Ok(Stream::new(
336        TcpStream::connect((ip, port as u16))
337            .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?,
338    ))
339}
340
341pub(super) fn stream_udp(stack: &mut Stack) -> Result<Stream, Error> {
342    require_int_on_stack!(port, stack, "UDP new-stream");
343    require_on_stack!(ip, Str, stack, "UDP new-stream");
344    require_int_on_stack!(self_port, stack, "UDP new-stream");
345    require_on_stack!(self_ip, Str, stack, "UDP new-stream");
346    let sock = UdpSocket::bind((self_ip, self_port as u16))
347        .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?;
348    sock.connect((ip, port as u16))
349        .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?;
350    struct UdpRW(UdpSocket);
351    impl Write for UdpRW {
352        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
353            self.0.send(buf)
354        }
355
356        fn flush(&mut self) -> std::io::Result<()> {
357            Ok(())
358        }
359    }
360    impl Read for UdpRW {
361        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
362            self.0.recv(buf)
363        }
364    }
365    Ok(Stream::new(UdpRW(sock)))
366}
367
368pub(super) fn stream_cmd(stack: &mut Stack) -> Result<Stream, Error> {
369    require_on_stack!(a, Array, stack, "CMD new-stream");
370    let mut args = Vec::new();
371    for item in a.iter() {
372        if let Value::Str(ref s) = item.lock_ro().native {
373            args.push(s.to_owned());
374        }
375    }
376    if args.is_empty() {
377        return stack.err(ErrorKind::InvalidCall("CMD new-stream".to_owned()));
378    }
379    let mut command = process::Command::new(&args[0])
380        .args(&args[1..])
381        .stdin(Stdio::piped())
382        .stdout(Stdio::piped())
383        .stderr(Stdio::null())
384        .spawn()
385        .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?;
386    let stream = Stream::new_split(
387        command.stdout.take().unwrap(),
388        command.stdin.take().unwrap(),
389    );
390    runtime(|rt| rt.child(command));
391    Ok(stream)
392}
393
394pub(super) fn get_stream_peer(stack: &mut Stack) -> OError {
395    require_on_stack!(id, Mega, stack, "get-stream-peer");
396    let Some((addr, port)) = runtime(|rt| -> Result<_, Error> {
397        Ok(rt
398            .get_stream(id as u128)
399            .ok_or(stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))?
400            .lock_ro()
401            .extra
402            .peer
403            .clone())
404    })?
405    else {
406        stack.push(Value::Null.spl());
407        stack.push(Value::Null.spl());
408        return Ok(());
409    };
410    stack.push(addr.spl());
411    stack.push((port as i32).spl());
412    Ok(())
413}