use std::io;
use std::pin::Pin;
use futures_util::sink::SinkExt;
use minarrow::{Field, Table};
use crate::compression::Compression;
use crate::enums::IPCMessageProtocol;
use crate::models::sinks::table_sink::TableSink;
use crate::traits::transport_writer::TransportWriter;
pub struct QuicTableWriter {
sink: TableSink<quinn::SendStream>,
}
impl QuicTableWriter {
pub fn new(send: quinn::SendStream, schema: Vec<Field>) -> io::Result<Self> {
let sink = TableSink::new(send, schema, IPCMessageProtocol::Stream)?;
Ok(Self { sink })
}
pub fn with_compression(
send: quinn::SendStream,
schema: Vec<Field>,
compression: Compression,
) -> io::Result<Self> {
let sink =
TableSink::with_compression(send, schema, IPCMessageProtocol::Stream, compression)?;
Ok(Self { sink })
}
}
impl TransportWriter for QuicTableWriter {
fn schema(&self) -> &[Field] {
&self.sink.schema
}
fn register_dictionary(&mut self, dict_id: i64, values: Vec<String>) {
self.sink.inner.register_dictionary(dict_id, values);
}
async fn write_table(&mut self, table: Table) -> io::Result<()> {
SinkExt::send(&mut self.sink, table).await?;
SinkExt::flush(&mut self.sink).await?;
Ok(())
}
async fn write_all_tables(&mut self, tables: Vec<Table>) -> io::Result<()> {
let mut sink = Pin::new(&mut self.sink);
for table in tables {
SinkExt::send(&mut sink, table).await?;
}
SinkExt::close(&mut sink).await?;
Ok(())
}
async fn finish(&mut self) -> io::Result<()> {
SinkExt::close(&mut self.sink).await
}
}