use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use minarrow::{Field, SuperTable, Table};
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
use crate::enums::{BufferChunkSize, IPCMessageProtocol};
use crate::models::readers::ipc::table_reader::TableReader;
use crate::models::streams::websocket::WebSocketByteStream;
use crate::traits::transport_reader::TransportReader;
type WsSplitStream = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
pub struct WebSocketTableReader {
inner: TableReader<WebSocketByteStream<WsSplitStream>, Vec<u8>>,
}
impl WebSocketTableReader {
pub async fn connect(url: &str) -> io::Result<Self> {
let (ws_stream, _response) = connect_async(url)
.await
.map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?;
let (_, read_half) = futures_util::StreamExt::split(ws_stream);
let byte_stream = WebSocketByteStream::new(read_half);
let inner = TableReader::new(byte_stream, 64 * 1024, IPCMessageProtocol::Stream);
Ok(Self { inner })
}
pub async fn connect_with(
url: &str,
chunk_size: BufferChunkSize,
protocol: IPCMessageProtocol,
) -> io::Result<Self> {
let (ws_stream, _response) = connect_async(url)
.await
.map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?;
let (_, read_half) = futures_util::StreamExt::split(ws_stream);
let byte_stream = WebSocketByteStream::new(read_half);
let inner = TableReader::new(byte_stream, chunk_size.chunk_size(), protocol);
Ok(Self { inner })
}
pub fn from_stream(
stream: WebSocketByteStream<WsSplitStream>,
protocol: IPCMessageProtocol,
) -> Self {
let inner = TableReader::new(stream, 64 * 1024, protocol);
Self { inner }
}
}
impl TransportReader for WebSocketTableReader {
async fn read_all_tables(self) -> io::Result<Vec<Table>> {
self.inner.read_all_tables().await
}
async fn read_tables(self, n: Option<usize>) -> io::Result<Vec<Table>> {
self.inner.read_tables(n).await
}
async fn read_to_super_table(
self,
name: Option<String>,
n: Option<usize>,
) -> io::Result<SuperTable> {
self.inner.read_to_super_table(name, n).await
}
async fn combine_to_table(self, name: Option<String>) -> io::Result<Table> {
self.inner.combine_to_table(name).await
}
fn schema(&self) -> Option<&[Field]> {
self.inner.schema()
}
async fn read_next(&mut self) -> io::Result<Option<Table>> {
self.inner.read_next().await
}
}
impl Stream for WebSocketTableReader {
type Item = io::Result<Table>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.get_mut();
Pin::new(&mut me.inner).poll_next(cx)
}
}