lightstream 0.4.3

Composable, zero-copy Arrow IPC and native data streaming for Rust with SIMD-aligned I/O, async support, and memory-mapping.
Documentation
//! # WebSocket table writer
//!
//! High-level async writer that connects to a WebSocket endpoint and sends
//! Arrow IPC encoded tables as binary messages.
//!
//! Wraps a [`TableSink`] over a [`WebSocketSinkAdapter`], hiding the wiring
//! so callers get a one-liner API.
//!
//! Uses `Vec<u8>` (8-byte aligned) encoding, matching the alignment
//! expected by the standard Arrow IPC frame decoder on the read side.

use std::io;
use std::pin::Pin;

use futures_util::sink::SinkExt;
use minarrow::{Field, Table};
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};

use crate::compression::Compression;
use crate::enums::IPCMessageProtocol;
use crate::models::sinks::table_sink::TableSink;
use crate::models::streams::websocket::WebSocketSinkAdapter;
use crate::traits::transport_writer::TransportWriter;

/// The concrete sink type produced by splitting a client WebSocket connection.
///
/// This is the write half (`SplitSink`) of a
/// `WebSocketStream<MaybeTlsStream<TcpStream>>` carrying tungstenite
/// [`Message`] items. It is named here so the writer's field and
/// constructor signatures stay readable.
type WsSplitSink =
    futures_util::stream::SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;

/// Async Arrow IPC writer over a WebSocket connection.
///
/// Connects to a remote WebSocket endpoint and writes Arrow IPC stream
/// protocol data as binary WebSocket messages.
///
/// Uses 8-byte aligned buffers to match the frame decoder on the
/// read side, which always uses 8-byte alignment for frame boundary
/// calculation.
pub struct WebSocketTableWriter {
    // Table-level sink layered over the WebSocket write half; every
    // `TransportWriter` method on this type forwards to it.
    sink: TableSink<WebSocketSinkAdapter<WsSplitSink>>,
}

impl WebSocketTableWriter {
    /// Open a client WebSocket connection to `url` and wrap its write half
    /// in a [`WebSocketSinkAdapter`].
    ///
    /// Shared by both `connect*` constructors so the handshake, the error
    /// mapping and the stream split live in exactly one place.
    ///
    /// The read half of the split stream is discarded when this function
    /// returns.
    ///
    /// # Errors
    ///
    /// Any failure reported by `connect_async` is wrapped in an
    /// [`io::Error`] of kind [`io::ErrorKind::ConnectionRefused`], carrying
    /// the original error as its payload.
    async fn open_adapter(url: &str) -> io::Result<WebSocketSinkAdapter<WsSplitSink>> {
        // The handshake response is not needed once the stream is established.
        let (ws_stream, _response) = connect_async(url)
            .await
            .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?;
        let (ws_sink, _read_half) = futures_util::StreamExt::split(ws_stream);
        Ok(WebSocketSinkAdapter::new(ws_sink))
    }

    /// Connect to a WebSocket server and prepare to write Arrow IPC tables.
    ///
    /// Uses `IPCMessageProtocol::Stream` — the unbounded protocol suited
    /// for network transport where the total number of batches is not
    /// known up front.
    ///
    /// The read half of the WebSocket is dropped — use `from_sink` if you
    /// need bidirectional communication.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] of kind
    /// [`io::ErrorKind::ConnectionRefused`] if the WebSocket connection
    /// cannot be established, or whatever error `TableSink::new` reports.
    pub async fn connect(url: &str, schema: Vec<Field>) -> io::Result<Self> {
        let adapter = Self::open_adapter(url).await?;
        let sink = TableSink::new(adapter, schema, IPCMessageProtocol::Stream)?;
        Ok(Self { sink })
    }

    /// Connect like [`connect`](Self::connect), passing `compression`
    /// through to `TableSink::with_compression`.
    ///
    /// The read half of the WebSocket is dropped — use `from_sink` if you
    /// need bidirectional communication.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] of kind
    /// [`io::ErrorKind::ConnectionRefused`] if the WebSocket connection
    /// cannot be established, or whatever error
    /// `TableSink::with_compression` reports.
    pub async fn connect_with_compression(
        url: &str,
        schema: Vec<Field>,
        compression: Compression,
    ) -> io::Result<Self> {
        let adapter = Self::open_adapter(url).await?;
        let sink =
            TableSink::with_compression(adapter, schema, IPCMessageProtocol::Stream, compression)?;
        Ok(Self { sink })
    }

    /// Wrap an existing WebSocket sink adapter as a table writer.
    ///
    /// No connection is made here: the caller supplies an adapter built
    /// from a sink it already owns, and so keeps control of the matching
    /// read half. The writer uses `IPCMessageProtocol::Stream`, the same
    /// as [`connect`](Self::connect).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `TableSink::new`.
    pub fn from_sink(
        adapter: WebSocketSinkAdapter<WsSplitSink>,
        schema: Vec<Field>,
    ) -> io::Result<Self> {
        let sink = TableSink::new(adapter, schema, IPCMessageProtocol::Stream)?;
        Ok(Self { sink })
    }
}

impl TransportWriter for WebSocketTableWriter {
    /// Borrow the schema held by the underlying table sink.
    fn schema(&self) -> &[Field] {
        &self.sink.schema
    }

    /// Forward a dictionary registration (`dict_id` and its `values`) to
    /// the sink's inner encoder.
    fn register_dictionary(&mut self, dict_id: i64, values: Vec<String>) {
        self.sink.inner.register_dictionary(dict_id, values);
    }

    /// Send one table through the sink, then flush it.
    ///
    /// Returns the first error raised by either step.
    async fn write_table(&mut self, table: Table) -> io::Result<()> {
        SinkExt::send(&mut self.sink, table).await?;
        SinkExt::flush(&mut self.sink).await
    }

    /// Send every table in order, then close the sink.
    ///
    /// Stops at the first error; in that case the remaining tables are not
    /// sent and the sink is not closed by this call.
    ///
    /// NOTE(review): this already closes the sink, so a later `finish`
    /// call closes it a second time — confirm `TableSink` tolerates that.
    async fn write_all_tables(&mut self, tables: Vec<Table>) -> io::Result<()> {
        let mut pinned = Pin::new(&mut self.sink);
        for batch in tables {
            SinkExt::send(&mut pinned, batch).await?;
        }
        SinkExt::close(&mut pinned).await
    }

    /// Finalise the stream by closing the underlying sink.
    ///
    /// Call this once the last table has been written with `write_table`.
    async fn finish(&mut self) -> io::Result<()> {
        SinkExt::close(&mut self.sink).await?;
        Ok(())
    }
}