use std::io;
use std::pin::Pin;
use futures_util::sink::SinkExt;
use minarrow::{Field, Table};
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
use crate::compression::Compression;
use crate::enums::IPCMessageProtocol;
use crate::models::sinks::table_sink::TableSink;
use crate::models::streams::websocket::WebSocketSinkAdapter;
use crate::traits::transport_writer::TransportWriter;
/// Write half obtained by splitting a client [`WebSocketStream`] that runs
/// over a (possibly TLS-wrapped) [`TcpStream`]; it accepts tungstenite
/// [`Message`] values.
type WsSplitSink =
futures_util::stream::SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
/// Writes [`Table`] values to a WebSocket connection.
///
/// The writer is a thin wrapper around a [`TableSink`] whose destination is a
/// [`WebSocketSinkAdapter`] over the write half of the WebSocket.
pub struct WebSocketTableWriter {
// Table sink that owns the WebSocket adapter, the schema and the encoder state.
sink: TableSink<WebSocketSinkAdapter<WsSplitSink>>,
}
impl WebSocketTableWriter {
    /// Connects to the WebSocket endpoint at `url` and builds a writer for
    /// tables matching `schema`, using the `Stream` IPC protocol.
    ///
    /// # Errors
    ///
    /// Any failure of the WebSocket handshake is returned as an
    /// [`io::Error`] of kind [`io::ErrorKind::ConnectionRefused`] wrapping the
    /// underlying tungstenite error. Errors from constructing the
    /// [`TableSink`] are propagated with `?`.
    pub async fn connect(url: &str, schema: Vec<Field>) -> io::Result<Self> {
        let adapter = Self::open_adapter(url).await?;
        Self::from_sink(adapter, schema)
    }

    /// Same as [`connect`](Self::connect), but the underlying [`TableSink`] is
    /// created with the given `compression` setting.
    ///
    /// # Errors
    ///
    /// Any failure of the WebSocket handshake is returned as an
    /// [`io::Error`] of kind [`io::ErrorKind::ConnectionRefused`] wrapping the
    /// underlying tungstenite error. Errors from constructing the
    /// [`TableSink`] are propagated with `?`.
    pub async fn connect_with_compression(
        url: &str,
        schema: Vec<Field>,
        compression: Compression,
    ) -> io::Result<Self> {
        let adapter = Self::open_adapter(url).await?;
        let sink =
            TableSink::with_compression(adapter, schema, IPCMessageProtocol::Stream, compression)?;
        Ok(Self { sink })
    }

    /// Builds a writer on top of an already established WebSocket sink
    /// adapter, using the `Stream` IPC protocol.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`TableSink::new`].
    pub fn from_sink(
        adapter: WebSocketSinkAdapter<WsSplitSink>,
        schema: Vec<Field>,
    ) -> io::Result<Self> {
        let sink = TableSink::new(adapter, schema, IPCMessageProtocol::Stream)?;
        Ok(Self { sink })
    }

    /// Performs the client handshake against `url` and wraps the write half
    /// of the resulting stream in a [`WebSocketSinkAdapter`].
    ///
    /// Shared by both `connect*` constructors so the handshake and its error
    /// mapping live in one place.
    async fn open_adapter(url: &str) -> io::Result<WebSocketSinkAdapter<WsSplitSink>> {
        let (ws_stream, _response) = connect_async(url)
            .await
            .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?;
        // Only the write half is kept; the read half is dropped here and is
        // never polled by this writer.
        let (ws_sink, _read_half) = futures_util::StreamExt::split(ws_stream);
        Ok(WebSocketSinkAdapter::new(ws_sink))
    }
}
impl TransportWriter for WebSocketTableWriter {
    /// Returns the schema held by the underlying table sink.
    fn schema(&self) -> &[Field] {
        &self.sink.schema
    }

    /// Forwards a dictionary registration (`dict_id` and its `values`) to the
    /// sink's inner encoder.
    fn register_dictionary(&mut self, dict_id: i64, values: Vec<String>) {
        self.sink.inner.register_dictionary(dict_id, values);
    }

    /// Sends a single table through the sink and then flushes it.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by either the send or the flush.
    async fn write_table(&mut self, table: Table) -> io::Result<()> {
        SinkExt::send(&mut self.sink, table).await?;
        // NOTE(review): `SinkExt::send` is documented to flush before it
        // completes, so this explicit flush looks redundant. Confirm against
        // `TableSink`'s `poll_flush` before removing it.
        SinkExt::flush(&mut self.sink).await
    }

    /// Sends every table in `tables`, in order, and then closes the sink.
    ///
    /// # Errors
    ///
    /// Stops at the first failing send and returns that error without
    /// closing the sink; otherwise returns the result of the close.
    async fn write_all_tables(&mut self, tables: Vec<Table>) -> io::Result<()> {
        let mut pinned = Pin::new(&mut self.sink);
        for next_table in tables {
            SinkExt::send(&mut pinned, next_table).await?;
        }
        SinkExt::close(&mut pinned).await
    }

    /// Closes the underlying sink.
    ///
    /// # Errors
    ///
    /// Returns any error reported while closing.
    async fn finish(&mut self) -> io::Result<()> {
        SinkExt::close(&mut self.sink).await?;
        Ok(())
    }
}