lancedb 0.27.1

LanceDB: A serverless, low-latency vector database for AI applications
Documentation
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors

use arrow_ipc::CompressionType;
use futures::{Stream, StreamExt};
use reqwest::Response;

use crate::{Result, arrow::SendableRecordBatchStream};

use super::db::ServerVersion;

pub fn stream_as_ipc(
    data: SendableRecordBatchStream,
) -> Result<impl Stream<Item = Result<bytes::Bytes>>> {
    let options = arrow_ipc::writer::IpcWriteOptions::default()
        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
    const WRITE_BUF_SIZE: usize = 4096;
    let buf = Vec::with_capacity(WRITE_BUF_SIZE);
    let writer =
        arrow_ipc::writer::StreamWriter::try_new_with_options(buf, &data.schema(), options)?;
    let stream = futures::stream::try_unfold(
        (data, writer, false),
        move |(mut data, mut writer, finished)| async move {
            if finished {
                return Ok(None);
            }
            match data.next().await {
                Some(Ok(batch)) => {
                    writer.write(&batch)?;
                    let buffer = std::mem::take(writer.get_mut());
                    Ok(Some((bytes::Bytes::from(buffer), (data, writer, false))))
                }
                Some(Err(e)) => Err(e),
                None => {
                    writer.finish()?;
                    let buffer = std::mem::take(writer.get_mut());
                    Ok(Some((bytes::Bytes::from(buffer), (data, writer, true))))
                }
            }
        },
    );
    Ok(stream)
}

pub fn stream_as_body(data: SendableRecordBatchStream) -> Result<reqwest::Body> {
    let stream = stream_as_ipc(data)?;
    Ok(reqwest::Body::wrap_stream(stream))
}

pub fn parse_server_version(req_id: &str, rsp: &Response) -> Result<ServerVersion> {
    let version = rsp
        .headers()
        .get("phalanx-version")
        .map(|v| {
            let v = v.to_str().map_err(|e| crate::Error::Http {
                source: e.into(),
                request_id: req_id.to_string(),
                status_code: Some(rsp.status()),
            })?;
            ServerVersion::parse(v).map_err(|e| crate::Error::Http {
                source: e.into(),
                request_id: req_id.to_string(),
                status_code: Some(rsp.status()),
            })
        })
        .transpose()?
        .unwrap_or_default();
    Ok(version)
}