dolphindb 3.3.1

A Rust native API for DolphinDB
Documentation
use super::request_info::{ConnectInfo, RequestInfo};
use super::Client;
use crate::{
    error::Result,
    request::{BehaviorOptions, Request},
    response::Response,
    Deserialize, Serialize,
};

use bytes::BytesMut;
use std::time::Duration;
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::net::{TcpStream, ToSocketAddrs};

pub struct ClientBuilder<'a, A: ToSocketAddrs> {
    addr: A,
    ssl: bool,
    auth: Option<(&'a str, &'a str)>,
    option: BehaviorOptions,
}

impl<'a, A: ToSocketAddrs> ClientBuilder<'a, A> {
    pub fn new(addr: A) -> Self {
        Self {
            addr,
            ssl: false,
            auth: None,
            option: BehaviorOptions::default(),
        }
    }

    pub fn with_auth(&mut self, auth: (impl Into<&'a str>, impl Into<&'a str>)) -> &mut Self {
        self.auth = Some((auth.0.into(), auth.1.into()));
        self
    }

    #[allow(dead_code)]
    fn with_ssl(&mut self, ssl: bool) -> &mut Self {
        self.ssl = ssl;
        self
    }

    pub fn with_option(&mut self, option: BehaviorOptions) -> &mut Self {
        self.option = option;
        self
    }

    pub async fn connect(mut self) -> Result<Client> {
        let conn = TcpStream::connect(&self.addr).await?;

        {
            let socket_ref = socket2::SockRef::from(&conn);

            let keepalive = socket2::TcpKeepalive::new()
                .with_time(Duration::from_secs(5))
                .with_interval(Duration::from_secs(1));

            let _ = socket_ref.set_tcp_keepalive(&keepalive);
        }

        let (rx, mut tx) = conn.into_split();

        let info = ConnectInfo::new(self.ssl, self.auth.take());
        let request = Request::new(vec![b'0'], RequestInfo::Connect(info), &self.option);

        let mut buf = BytesMut::new();
        request.serialize(&mut buf)?;

        tx.write_all(&buf).await?;

        buf.clear();

        let mut rx = BufReader::new(rx);

        let mut resp = Response::default();
        resp.deserialize(&mut rx).await?;

        Ok(Client {
            session_id: resp.header.session_id,
            tx,
            rx,
            endian: resp.header.endian,
            option: self.option,
        })
    }
}