use std::io;
use std::path::Path;
use std::pin::Pin;
use futures_util::sink::SinkExt;
use minarrow::{Field, Table};
use tokio::net::UnixStream;
use tokio::net::unix::OwnedWriteHalf;
use crate::compression::Compression;
use crate::enums::IPCMessageProtocol;
use crate::models::sinks::table_sink::TableSink;
use crate::traits::transport_writer::TransportWriter;
/// Table writer that sends tables over a Unix domain socket.
///
/// Wraps a [`TableSink`] built on the owned write half of a `UnixStream`;
/// every constructor in this file configures it with
/// `IPCMessageProtocol::Stream`.
pub struct UdsTableWriter {
/// Sink that tables are sent through; it owns the socket's write half.
sink: TableSink<OwnedWriteHalf>,
}
impl UdsTableWriter {
    /// Connects to the Unix domain socket at `path` and returns only its
    /// owned write half; the read half is dropped.
    async fn open_write_half(path: impl AsRef<Path>) -> io::Result<OwnedWriteHalf> {
        let (_reader, writer) = UnixStream::connect(path).await?.into_split();
        Ok(writer)
    }

    /// Connects to the Unix domain socket at `path` and builds a writer for
    /// `schema` using the `Stream` protocol.
    ///
    /// # Errors
    ///
    /// Returns any error raised while connecting to the socket or while
    /// constructing the underlying sink.
    pub async fn connect(path: impl AsRef<Path>, schema: Vec<Field>) -> io::Result<Self> {
        let writer = Self::open_write_half(path).await?;
        Self::from_write_half(writer, schema)
    }

    /// Same as [`UdsTableWriter::connect`], but constructs the sink through
    /// `TableSink::with_compression`, passing `compression` along.
    ///
    /// # Errors
    ///
    /// Returns any error raised while connecting to the socket or while
    /// constructing the underlying sink.
    pub async fn connect_with_compression(
        path: impl AsRef<Path>,
        schema: Vec<Field>,
        compression: Compression,
    ) -> io::Result<Self> {
        let writer = Self::open_write_half(path).await?;
        Ok(Self {
            sink: TableSink::with_compression(
                writer,
                schema,
                IPCMessageProtocol::Stream,
                compression,
            )?,
        })
    }

    /// Builds a writer on top of an already-connected socket write half,
    /// using the `Stream` protocol.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `TableSink::new`, if any.
    pub fn from_write_half(write_half: OwnedWriteHalf, schema: Vec<Field>) -> io::Result<Self> {
        Ok(Self {
            sink: TableSink::new(write_half, schema, IPCMessageProtocol::Stream)?,
        })
    }
}
/// `TransportWriter` implementation that forwards every operation to the
/// wrapped sink.
impl TransportWriter for UdsTableWriter {
    /// Returns the fields held by the underlying sink.
    fn schema(&self) -> &[Field] {
        let fields: &[Field] = &self.sink.schema;
        fields
    }

    /// Forwards the dictionary registration to the sink's `inner` component.
    fn register_dictionary(&mut self, dict_id: i64, values: Vec<String>) {
        let inner = &mut self.sink.inner;
        inner.register_dictionary(dict_id, values);
    }

    /// Sends a single table through the sink and then flushes it.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the send or the flush.
    async fn write_table(&mut self, table: Table) -> io::Result<()> {
        SinkExt::send(&mut self.sink, table).await?;
        // NOTE(review): `SinkExt::send` is documented to flush on its own, so
        // this explicit flush looks redundant — confirm before removing it.
        SinkExt::flush(&mut self.sink).await
    }

    /// Sends every table in order and then closes the sink.
    ///
    /// # Errors
    ///
    /// Stops at the first failing send and returns its error; in that case
    /// the remaining tables are not sent and the sink is not closed here.
    /// Otherwise returns the result of closing the sink.
    async fn write_all_tables(&mut self, tables: Vec<Table>) -> io::Result<()> {
        let mut pinned = Pin::new(&mut self.sink);
        for batch in tables {
            SinkExt::send(&mut pinned, batch).await?;
        }
        SinkExt::close(&mut pinned).await
    }

    /// Closes the underlying sink.
    ///
    /// # Errors
    ///
    /// Returns the error reported while closing, if any.
    async fn finish(&mut self) -> io::Result<()> {
        SinkExt::close(&mut self.sink).await?;
        Ok(())
    }
}