use std::{
collections::HashMap,
net::TcpListener,
sync::{Arc, LazyLock},
};
use crate::{mutex::Mut, *};
use super::Stream;
static SERVER_STREAM_TYPES: LazyLock<Arc<Mut<HashMap<String, ServerStreamType>>>> =
LazyLock::new(|| Arc::new(Mut::new(HashMap::new())));
pub fn register_server_stream_type(
name: &str,
supplier: impl Fn(&mut Stack) -> Result<ServerStream, Error> + Sync + Send + 'static,
) {
SERVER_STREAM_TYPES
.lock()
.insert(name.to_owned(), ServerStreamType::from(supplier));
}
pub fn get_server_stream_type(name: String) -> Option<ServerStreamType> {
SERVER_STREAM_TYPES.lock_ro().get(&name).cloned()
}
#[derive(Clone)]
pub struct ServerStreamType {
func: Arc<Box<dyn Fn(&mut Stack) -> Result<ServerStream, Error> + Sync + Send + 'static>>,
}
impl ServerStreamType {
pub fn make_stream(&self, stack: &mut Stack) -> Result<ServerStream, Error> {
(self.func)(stack)
}
}
impl<T> From<T> for ServerStreamType
where
T: Fn(&mut Stack) -> Result<ServerStream, Error> + Sync + Send + 'static,
{
fn from(value: T) -> Self {
Self {
func: Arc::new(Box::new(value)),
}
}
}
pub struct ServerStream {
pub(super) acceptor: Box<dyn Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static>,
}
impl ServerStream {
pub fn new(
acceptor: impl Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
) -> Self {
Self {
acceptor: Box::new(acceptor),
}
}
}
pub fn new_server_stream(stack: &mut Stack) -> OError {
require_on_stack!(s, Str, stack, "new-stream");
let stream = get_server_stream_type(s.clone())
.ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-type-{s}"))))?
.make_stream(stack)?;
let stream = runtime_mut(move |mut rt| Ok(rt.register_server_stream(stream)))?;
stack.push(Value::Mega(stream.0 as i128).spl());
Ok(())
}
pub fn accept_server_stream(stack: &mut Stack) -> OError {
require_on_stack!(id, Mega, stack, "accept-server-stream");
let stream = runtime(|rt| {
rt.get_server_stream(id as u128)
.ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
})?;
let stream = (stream.lock_ro().acceptor)(stack)?;
stack.push((runtime_mut(move |mut rt| rt.register_stream(stream)).0 as i128).spl());
Ok(())
}
pub fn close_server_stream(stack: &mut Stack) -> OError {
require_on_stack!(id, Mega, stack, "close-server-stream");
runtime_mut(|mut rt| rt.destroy_server_stream(id as u128));
Ok(())
}
pub(crate) fn server_stream_tcp(stack: &mut Stack) -> Result<ServerStream, Error> {
require_int_on_stack!(port, stack, "TCP server-stream");
require_on_stack!(addr, Str, stack, "TCP server-stream");
let tcp = TcpListener::bind((addr, port as u16))
.map_err(|e| stack.error(ErrorKind::IO(format!("{e:?}"))))?;
let stream = ServerStream::new(move |stack| {
let socket = tcp
.accept()
.map_err(|e| stack.error(ErrorKind::IO(format!("{e:?}"))))?;
Ok(Stream::new(socket.0)
.append_extra(move |d| d.peer = Some((socket.1.ip().to_string(), socket.1.port()))))
});
Ok(stream)
}