1use std::{
2 collections::HashMap,
3 hint::black_box,
4 io::{Read, Write},
5 net::{TcpStream, UdpSocket},
6 process::{self, Stdio},
7 sync::{Arc, LazyLock},
8};
9
10use crate::*;
11
12use fs::OpenOptions;
13use mutex::Mut;
14use stream::StreamExtraData;
15
16static STREAM_TYPES: LazyLock<Arc<Mut<HashMap<String, StreamType>>>> =
17 LazyLock::new(|| Arc::new(Mut::new(HashMap::new())));
18
19pub fn register_stream_type(
21 name: &str,
22 supplier: impl Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
23) {
24 STREAM_TYPES
25 .lock()
26 .insert(name.to_owned(), StreamType::from(supplier));
27}
28
29pub fn get_stream_type(name: String) -> Option<StreamType> {
31 STREAM_TYPES.lock_ro().get(&name).cloned()
32}
33
34#[derive(Clone)]
36pub struct StreamType {
37 func: Arc<Box<dyn Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static>>,
38}
39
40impl StreamType {
41 pub fn make_stream(&self, stack: &mut Stack) -> Result<Stream, Error> {
42 (self.func)(stack)
43 }
44}
45
/// A byte stream with a read half and a write half.
///
/// The write half is reached through `writer`, a reference whose lifetime has been erased to
/// `'static`. It points either into the object owned by `reader` (streams built with
/// `Stream::new`) or into the box held in `_writer_storage` (streams built with
/// `Stream::new_split`, or after `Stream::shutdown_write`).
pub struct Stream {
    /// Source of all reads. For streams built with `Stream::new` this box also owns the object
    /// that `writer` points at.
    pub(super) reader: Box<dyn Read + Send + Sync + 'static>,
    /// Owns the write half when it is a separate object from `reader`; `None` for streams
    /// built with `Stream::new`.
    pub(super) _writer_storage: Option<Box<dyn Write + Send + Sync + 'static>>,
    /// Target of all writes. NOTE(review): this is only valid while the box it was derived
    /// from (`reader` or `_writer_storage`) is kept alive and not replaced; nothing in the
    /// type system enforces that.
    pub(super) writer: &'static mut (dyn Write + Send + Sync + 'static),
    /// Extra metadata attached to the stream.
    pub extra: StreamExtraData,
}
53
54impl Stream {
55 pub fn new<T: Read + Write + Send + Sync + 'static>(main: T) -> Self {
56 let mut rw = Box::new(main);
57 Self {
58 writer: unsafe {
59 (rw.as_mut() as *mut (dyn Write + Send + Sync + 'static))
60 .as_mut()
61 .unwrap()
62 },
63 _writer_storage: None,
64 reader: rw,
65 extra: StreamExtraData::default(),
66 }
67 }
68 pub fn new_split(
69 reader: impl Read + Send + Sync + 'static,
70 writer: impl Write + Send + Sync + 'static,
71 ) -> Self {
72 let mut bx = Box::new(writer);
73 Self {
74 reader: Box::new(reader),
75 writer: unsafe {
76 (bx.as_mut() as *mut (dyn Write + Send + Sync + 'static))
77 .as_mut()
78 .unwrap()
79 },
80 _writer_storage: Some(bx),
81 extra: StreamExtraData::default(),
82 }
83 }
84
85 pub fn append_extra(mut self, f: impl Fn(&mut StreamExtraData)) -> Stream {
86 f(&mut self.extra);
87 self
88 }
89
90 pub fn shutdown_write(&mut self) {
91 let mut bx = Box::new(IgnoreWrite());
92 self.writer = unsafe {
93 (bx.as_mut() as *mut (dyn Write + Send + Sync + 'static))
94 .as_mut()
95 .unwrap()
96 };
97 self._writer_storage = Some(bx);
98 }
99}
100
101impl Read for Stream {
102 fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut<'_>]) -> std::io::Result<usize> {
103 self.reader.read_vectored(bufs)
104 }
105
106 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
107 self.reader.read_to_end(buf)
108 }
109
110 fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
111 self.reader.read_to_string(buf)
112 }
113
114 fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
115 self.reader.read_exact(buf)
116 }
117
118 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
119 self.reader.read(buf)
120 }
121}
122
123impl Write for Stream {
124 fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
125 self.writer.write_vectored(bufs)
126 }
127
128 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
129 self.writer.write_all(buf)
130 }
131
132 fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> std::io::Result<()> {
133 self.writer.write_fmt(fmt)
134 }
135
136 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
137 self.writer.write(buf)
138 }
139
140 fn flush(&mut self) -> std::io::Result<()> {
141 self.writer.flush()
142 }
143}
144
/// A writer that discards everything written to it.
struct IgnoreWrite();

impl Write for IgnoreWrite {
    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }

    /// Reports the whole input as written without storing any of it.
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        let accepted = data.len();
        Ok(accepted)
    }
}
155
156impl<T> From<T> for StreamType
157where
158 T: Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
159{
160 fn from(value: T) -> Self {
161 Self {
162 func: Arc::new(Box::new(value)),
163 }
164 }
165}
166
167pub fn new_stream(stack: &mut Stack) -> OError {
168 require_on_stack!(s, Str, stack, "new-stream");
169 let stream = get_stream_type(s.clone())
170 .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-type-{s}"))))?
171 .make_stream(stack)?;
172 let stream = runtime_mut(move |mut rt| Ok(rt.register_stream(stream)))?;
173 stack.push(Value::Mega(stream.0 as i128).spl());
174 Ok(())
175}
176
177pub fn write_stream(stack: &mut Stack) -> OError {
178 require_on_stack!(id, Mega, stack, "write-stream");
179 require_byte_array_on_stack!(a, stack, "write-stream");
180 let stream = runtime(|rt| {
181 rt.get_stream(id as u128)
182 .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
183 })?;
184 stack.push(
185 Value::Mega(
186 stream
187 .lock()
188 .write(&a)
189 .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))? as i128,
190 )
191 .spl(),
192 );
193 black_box(&stream.lock_ro()._writer_storage);
194 Ok(())
195}
196
197pub fn write_all_stream(stack: &mut Stack) -> OError {
198 require_on_stack!(id, Mega, stack, "write-all-stream");
199 require_byte_array_on_stack!(a, stack, "write-all-stream");
200 let stream = runtime(|rt| {
201 rt.get_stream(id as u128)
202 .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
203 })?;
204 stream
205 .lock()
206 .write_all(&a)
207 .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
208 black_box(&stream.lock_ro()._writer_storage);
209 Ok(())
210}
211
212pub fn flush_stream(stack: &mut Stack) -> OError {
213 require_on_stack!(id, Mega, stack, "flush-stream");
214 let stream = runtime(|rt| {
215 rt.get_stream(id as u128)
216 .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
217 })?;
218 stream
219 .lock()
220 .flush()
221 .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
222 black_box(&stream.lock_ro()._writer_storage);
223 Ok(())
224}
225
226pub fn read_stream(stack: &mut Stack) -> OError {
227 require_on_stack!(id, Mega, stack, "read-stream");
228 let array = stack.pop();
229 let kind = array.lock_ro().kind.lock_ro().get_name();
230 let stream = runtime(|rt| {
231 rt.get_stream(id as u128)
232 .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
233 })?;
234 if kind == "array" {
235 require_mut_array!(a, array, stack, "read-stream");
236 let mut vec = vec![0; a.len()];
237 stack.push(
238 Value::Mega(
239 stream
240 .lock()
241 .read(&mut vec[..])
242 .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?
243 as i128,
244 )
245 .spl(),
246 );
247 a.clone_from_slice(
248 &vec.into_iter()
249 .map(|x| Value::Int(x as i32).spl())
250 .collect::<Vec<_>>(),
251 );
252 }
253 if kind == "bytearray" {
254 require_mut!(a, ByteArray, array, stack, "read-stream");
255 stack.push(
256 Value::Mega(
257 stream
258 .lock()
259 .read(a)
260 .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?
261 as i128,
262 )
263 .spl(),
264 );
265 }
266 stack.push(array);
267 Ok(())
268}
269
270pub fn read_all_stream(stack: &mut Stack) -> OError {
271 require_on_stack!(id, Mega, stack, "read-all-stream");
272 let array = stack.pop();
273 let kind = array.lock_ro().kind.lock_ro().get_name();
274 let stream = runtime(|rt| {
275 rt.get_stream(id as u128)
276 .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
277 })?;
278 if kind == "array" {
279 require_mut_array!(a, array, stack, "read-all-stream");
280 let mut vec = vec![0; a.len()];
281 stream
282 .lock()
283 .read_exact(&mut vec[..])
284 .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
285 a.clone_from_slice(
286 &vec.into_iter()
287 .map(|x| Value::Int(x as i32).spl())
288 .collect::<Vec<_>>(),
289 );
290 }
291 if kind == "bytearray" {
292 require_mut!(a, ByteArray, array, stack, "read-stream");
293 stream
294 .lock()
295 .read_exact(a)
296 .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
297 }
298 stack.push(array);
299 Ok(())
300}
301
302pub fn close_stream(stack: &mut Stack) -> OError {
303 require_on_stack!(id, Mega, stack, "close-stream");
304 runtime_mut(|mut rt| rt.destroy_stream(id as u128));
305 Ok(())
306}
307
308pub fn shutdown_input_stream(stack: &mut Stack) -> OError {
309 require_on_stack!(id, Mega, stack, "shutdown-input-stream");
310 let stream = runtime(|rt| {
311 rt.get_stream(id as u128)
312 .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
313 })?;
314 stream.lock().shutdown_write();
315 Ok(())
316}
317
318pub(super) fn stream_file(stack: &mut Stack) -> Result<Stream, Error> {
319 let truncate = stack.pop().lock_ro().is_truthy();
320 require_on_stack!(path, Str, stack, "FILE new-stream");
321 Ok(Stream::new(
322 OpenOptions::new()
323 .read(!truncate)
324 .write(true)
325 .create(truncate)
326 .truncate(truncate)
327 .open(path)
328 .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?,
329 ))
330}
331
332pub(super) fn stream_tcp(stack: &mut Stack) -> Result<Stream, Error> {
333 require_int_on_stack!(port, stack, "TCP new-stream");
334 require_on_stack!(ip, Str, stack, "TCP new-stream");
335 Ok(Stream::new(
336 TcpStream::connect((ip, port as u16))
337 .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?,
338 ))
339}
340
341pub(super) fn stream_udp(stack: &mut Stack) -> Result<Stream, Error> {
342 require_int_on_stack!(port, stack, "UDP new-stream");
343 require_on_stack!(ip, Str, stack, "UDP new-stream");
344 require_int_on_stack!(self_port, stack, "UDP new-stream");
345 require_on_stack!(self_ip, Str, stack, "UDP new-stream");
346 let sock = UdpSocket::bind((self_ip, self_port as u16))
347 .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?;
348 sock.connect((ip, port as u16))
349 .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?;
350 struct UdpRW(UdpSocket);
351 impl Write for UdpRW {
352 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
353 self.0.send(buf)
354 }
355
356 fn flush(&mut self) -> std::io::Result<()> {
357 Ok(())
358 }
359 }
360 impl Read for UdpRW {
361 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
362 self.0.recv(buf)
363 }
364 }
365 Ok(Stream::new(UdpRW(sock)))
366}
367
368pub(super) fn stream_cmd(stack: &mut Stack) -> Result<Stream, Error> {
369 require_on_stack!(a, Array, stack, "CMD new-stream");
370 let mut args = Vec::new();
371 for item in a.iter() {
372 if let Value::Str(ref s) = item.lock_ro().native {
373 args.push(s.to_owned());
374 }
375 }
376 if args.is_empty() {
377 return stack.err(ErrorKind::InvalidCall("CMD new-stream".to_owned()));
378 }
379 let mut command = process::Command::new(&args[0])
380 .args(&args[1..])
381 .stdin(Stdio::piped())
382 .stdout(Stdio::piped())
383 .stderr(Stdio::null())
384 .spawn()
385 .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?;
386 let stream = Stream::new_split(
387 command.stdout.take().unwrap(),
388 command.stdin.take().unwrap(),
389 );
390 runtime(|rt| rt.child(command));
391 Ok(stream)
392}
393
394pub(super) fn get_stream_peer(stack: &mut Stack) -> OError {
395 require_on_stack!(id, Mega, stack, "get-stream-peer");
396 let Some((addr, port)) = runtime(|rt| -> Result<_, Error> {
397 Ok(rt
398 .get_stream(id as u128)
399 .ok_or(stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))?
400 .lock_ro()
401 .extra
402 .peer
403 .clone())
404 })?
405 else {
406 stack.push(Value::Null.spl());
407 stack.push(Value::Null.spl());
408 return Ok(());
409 };
410 stack.push(addr.spl());
411 stack.push((port as i32).spl());
412 Ok(())
413}