ctrader-rs 0.1.2

Rust SDK for the cTrader Open API
Documentation
/// TLS TCP transport.
///
///
///
/// Wire format:
///   [4-byte big-endian length][protobuf bytes]
///
///
///
/// The connection is full-duplex:
///   - A dedicated reader task streams inbound frames and sends them on
///     the `frame_tx` channel passed to `Transport::connect`.
///   - The public `send()` method writes outbound frames while holding a
///     mutex so concurrent sends don't interleave.
///
///
use std::sync::Arc;

use bytes::{BufMut, BytesMut};
use native_tls::TlsConnector;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, Mutex};
use tokio_native_tls::TlsStream;

use crate::client_helper::read_loop;
use crate::error::Error;

/// Handle to the outbound side of a TLS connection.
///
/// Holds only the write half of the split TLS stream. The read half is moved
/// into the background task spawned by [`Transport::connect`].
///
/// The write half sits behind an `Arc<tokio::sync::Mutex<..>>`;
/// [`Transport::send`] takes the lock for the duration of each write.
pub struct Transport {
    /// Write half of the TLS stream, locked while a frame is being written.
    writer: Arc<Mutex<tokio::io::WriteHalf<TlsStream<TcpStream>>>>,
}

impl Transport {
    /// Connect to `host:port` over TLS and spawn a background reader task that
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    /// pushes raw frame bytes into `frame_tx`.
    pub async fn connect(
        host: &str,
        port: u16,
        frame_tx: mpsc::UnboundedSender<Vec<u8>>,
    ) -> Result<Self, Error> {
        let addr = format!("{host}:{port}");
        tracing::debug!("connecting to {addr}");

        let tcp = TcpStream::connect(&addr).await?;

        let connector = TlsConnector::builder()
            .danger_accept_invalid_certs(false)
            .build()
            .map_err(Error::Tls)?;
        let connector = tokio_native_tls::TlsConnector::from(connector);
        let tls = connector
            .connect(host, tcp)
            .await
            .map_err(|e| Error::Tls(e.into()))?;

        let (reader, writer) = tokio::io::split(tls);
        let writer = Arc::new(Mutex::new(writer));

        // Spawn background reader
        tokio::spawn(async move {
            if let Err(e) = read_loop(reader, frame_tx).await {
                tracing::error!("transport read loop error: {e}");
            }
        });

        Ok(Self { writer })
    }

    /// Send a single framed protobuf message.
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    pub async fn send(&self, payload: &[u8]) -> Result<(), Error> {
        let mut buf = BytesMut::with_capacity(4 + payload.len());
        buf.put_u32(payload.len() as u32);
        buf.put_slice(payload);

        let mut w = self.writer.lock().await;
        w.write_all(&buf).await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {

    // Placeholder test with an empty body: it asserts nothing about
    // `Transport` and exists only as a stub for future tests.
    #[tokio::test]
    async fn test() {}
}