use std::io;
use futures_core::Stream;
use minarrow::Field;
use tokio::io::AsyncWrite;
use crate::models::frames::protocol_message::LightstreamMessage;
use crate::models::readers::lightstream::LightstreamReader;
use crate::models::writers::lightstream::LightstreamWriter;
use crate::traits::stream_buffer::StreamBuffer;
pub struct LightstreamConnection<S, W, B = Vec<u8>>
where
S: Stream<Item = Result<B, io::Error>> + Unpin + Send,
W: AsyncWrite + Unpin + Send,
B: StreamBuffer + Unpin,
{
pub writer: LightstreamWriter<W, B>,
pub reader: LightstreamReader<S, B>,
}
impl<S, W, B> LightstreamConnection<S, W, B>
where
S: Stream<Item = Result<B, io::Error>> + Unpin + Send,
W: AsyncWrite + Unpin + Send,
B: StreamBuffer + Unpin,
{
pub fn from_parts(reader_stream: S, writer_dest: W) -> Self {
Self {
writer: LightstreamWriter::new(writer_dest),
reader: LightstreamReader::new(reader_stream),
}
}
pub fn register_message(&mut self, name: impl Into<String>) -> u8 {
let name = name.into();
let tag = self.writer.register_message(name.clone());
let _ = self.reader.register_message(name);
tag
}
pub fn register_table(&mut self, name: impl Into<String>, schema: Vec<Field>) -> u8 {
let name = name.into();
let tag = self.writer.register_table(name.clone(), schema.clone());
let _ = self.reader.register_table(name, schema);
tag
}
pub async fn send(&mut self, name: &str, payload: &[u8]) -> io::Result<()> {
self.writer.send(name, payload).await
}
pub async fn send_table(&mut self, name: &str, table: &minarrow::Table) -> io::Result<()> {
self.writer.send_table(name, table).await
}
pub async fn recv(&mut self) -> Option<io::Result<LightstreamMessage>> {
use futures_util::StreamExt;
self.reader.next().await
}
pub async fn flush(&mut self) -> io::Result<()> {
self.writer.flush().await
}
pub async fn shutdown(&mut self) -> io::Result<()> {
self.writer.shutdown().await
}
#[cfg(feature = "protobuf")]
pub async fn send_proto<M: prost::Message>(
&mut self,
name: &str,
msg: &M,
) -> io::Result<()> {
self.writer.send_proto(name, msg).await
}
#[cfg(feature = "msgpack")]
pub async fn send_msgpack<M: serde::Serialize>(
&mut self,
name: &str,
msg: &M,
) -> io::Result<()> {
self.writer.send_msgpack(name, msg).await
}
}
#[cfg(feature = "tcp")]
mod tcp_impl {
use super::*;
use crate::enums::BufferChunkSize;
use crate::models::streams::tcp::TcpByteStream;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
pub type TcpLightstreamConnection =
LightstreamConnection<TcpByteStream, OwnedWriteHalf, Vec<u8>>;
impl TcpLightstreamConnection {
pub fn from_tcp(stream: TcpStream) -> Self {
let (read_half, write_half) = stream.into_split();
let byte_stream = TcpByteStream::from_read_half(read_half, BufferChunkSize::Http);
Self::from_parts(byte_stream, write_half)
}
pub fn from_tcp_halves(read_half: OwnedReadHalf, write_half: OwnedWriteHalf) -> Self {
let byte_stream = TcpByteStream::from_read_half(read_half, BufferChunkSize::Http);
Self::from_parts(byte_stream, write_half)
}
}
}
#[cfg(feature = "tcp")]
pub use tcp_impl::TcpLightstreamConnection;
#[cfg(feature = "uds")]
mod uds_impl {
use super::*;
use crate::enums::BufferChunkSize;
use crate::models::streams::uds::UdsByteStream;
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::UnixStream;
pub type UdsLightstreamConnection =
LightstreamConnection<UdsByteStream, OwnedWriteHalf, Vec<u8>>;
impl UdsLightstreamConnection {
pub fn from_uds(stream: UnixStream) -> Self {
let (read_half, write_half) = stream.into_split();
let byte_stream = UdsByteStream::from_read_half(read_half, BufferChunkSize::Http);
Self::from_parts(byte_stream, write_half)
}
pub fn from_uds_halves(read_half: OwnedReadHalf, write_half: OwnedWriteHalf) -> Self {
let byte_stream = UdsByteStream::from_read_half(read_half, BufferChunkSize::Http);
Self::from_parts(byte_stream, write_half)
}
}
}
#[cfg(feature = "uds")]
pub use uds_impl::UdsLightstreamConnection;
#[cfg(feature = "websocket")]
mod websocket_impl {
use super::*;
use crate::models::streams::websocket::{WebSocketByteStream, WebSocketSinkAdapter};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::StreamExt;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
pub type WebSocketLightstreamConnection<T> = LightstreamConnection<
WebSocketByteStream<SplitStream<WebSocketStream<T>>>,
WebSocketSinkAdapter<SplitSink<WebSocketStream<T>, Message>>,
Vec<u8>,
>;
impl<T> WebSocketLightstreamConnection<T>
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
pub fn from_websocket(ws: WebSocketStream<T>) -> Self {
let (sink, stream) = ws.split();
let byte_stream = WebSocketByteStream::new(stream);
let write_adapter = WebSocketSinkAdapter::new(sink);
Self::from_parts(byte_stream, write_adapter)
}
}
}
#[cfg(feature = "websocket")]
pub use websocket_impl::WebSocketLightstreamConnection;
#[cfg(feature = "quic")]
mod quic_impl {
use super::*;
use crate::enums::BufferChunkSize;
use crate::models::streams::quic::QuicByteStream;
pub type QuicLightstreamConnection =
LightstreamConnection<QuicByteStream, quinn::SendStream, Vec<u8>>;
impl QuicLightstreamConnection {
pub fn from_quic(recv: quinn::RecvStream, send: quinn::SendStream) -> Self {
let byte_stream = QuicByteStream::new(recv, BufferChunkSize::WebTransport);
Self::from_parts(byte_stream, send)
}
}
}
#[cfg(feature = "quic")]
pub use quic_impl::QuicLightstreamConnection;
#[cfg(feature = "webtransport")]
mod webtransport_impl {
use super::*;
use crate::enums::BufferChunkSize;
use crate::models::streams::webtransport::WebTransportByteStream;
pub type WebTransportLightstreamConnection =
LightstreamConnection<WebTransportByteStream, wtransport::SendStream, Vec<u8>>;
impl WebTransportLightstreamConnection {
pub fn from_webtransport(
recv: wtransport::RecvStream,
send: wtransport::SendStream,
) -> Self {
let byte_stream = WebTransportByteStream::new(recv, BufferChunkSize::WebTransport);
Self::from_parts(byte_stream, send)
}
}
}
#[cfg(feature = "webtransport")]
pub use webtransport_impl::WebTransportLightstreamConnection;
#[cfg(feature = "stdio")]
mod stdio_impl {
use super::*;
use crate::models::streams::stdio::{StdinByteStream, from_stdin_default};
pub type StdioLightstreamConnection =
LightstreamConnection<StdinByteStream, tokio::io::Stdout, Vec<u8>>;
impl StdioLightstreamConnection {
pub fn from_stdio() -> Self {
let byte_stream = from_stdin_default();
let stdout = tokio::io::stdout();
Self::from_parts(byte_stream, stdout)
}
}
}
#[cfg(feature = "stdio")]
pub use stdio_impl::StdioLightstreamConnection;