aurora-db 0.5.8

A lightweight, real-time embedded database with built-in PubSub, reactive queries, background workers, and intelligent caching.
Documentation
use crate::error::{AqlError, Result, ErrorCode};
use crate::network::protocol::{Request, Response};
use crate::query::SimpleQueryBuilder;
use crate::types::{Document, FieldType, Value};
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

pub struct Client {
    stream: TcpStream,
    current_transaction: Option<u64>,
}

impl Client {
    /// Connect to an Aurora server at the given address.
    pub async fn connect(addr: &str) -> Result<Self> {
        let stream = TcpStream::connect(addr).await?;
        Ok(Self {
            stream,
            current_transaction: None,
        })
    }

    /// Sends a request to the server and awaits a response.
    async fn send_request(&mut self, request: Request) -> Result<Response> {
        let request_bytes = bincode::serialize(&request).map_err(AqlError::from)?;
        let len_bytes = (request_bytes.len() as u32).to_le_bytes();

        self.stream.write_all(&len_bytes).await?;
        self.stream.write_all(&request_bytes).await?;

        let mut len_bytes = [0u8; 4];
        self.stream.read_exact(&mut len_bytes).await?;
        const MAX_FRAME_SIZE: usize = 8 * 1024 * 1024; // 8 MiB
        let len = u32::from_le_bytes(len_bytes) as usize;
        if len > MAX_FRAME_SIZE {
            return Err(AqlError::new(
                ErrorCode::ProtocolError,
                format!("Response too large: {} bytes", len),
            ));
        }

        let mut buffer = vec![0u8; len];
        self.stream.read_exact(&mut buffer).await?;

        let response: Response = bincode::deserialize(&buffer).map_err(AqlError::from)?;

        Ok(response)
    }

    pub async fn new_collection(
        &mut self,
        name: &str,
        fields: Vec<(String, FieldType, bool)>,
    ) -> Result<()> {
        let req = Request::NewCollection {
            name: name.to_string(),
            fields,
        };
        match self.send_request(req).await? {
            Response::Done => Ok(()),
            Response::Error(e) => Err(AqlError::new(ErrorCode::ProtocolError, e)),
            _ => Err(AqlError::new(ErrorCode::ProtocolError, "Unexpected response".to_string())),
        }
    }

    pub async fn insert(
        &mut self,
        collection: &str,
        data: HashMap<String, Value>,
    ) -> Result<String> {
        let req = Request::Insert {
            collection: collection.to_string(),
            data,
        };
        match self.send_request(req).await? {
            Response::Message(id) => Ok(id),
            Response::Error(e) => Err(AqlError::new(ErrorCode::ProtocolError, e)),
            _ => Err(AqlError::new(ErrorCode::ProtocolError, "Unexpected response".to_string())),
        }
    }

    pub async fn get_document(&mut self, collection: &str, id: &str) -> Result<Option<Document>> {
        let req = Request::GetDocument {
            collection: collection.to_string(),
            id: id.to_string(),
        };
        match self.send_request(req).await? {
            Response::Document(doc) => Ok(doc),
            Response::Error(e) => Err(AqlError::new(ErrorCode::ProtocolError, e)),
            _ => Err(AqlError::new(ErrorCode::ProtocolError, "Unexpected response".to_string())),
        }
    }

    pub async fn query(&mut self, builder: SimpleQueryBuilder) -> Result<Vec<Document>> {
        let req = Request::Query(builder);
        match self.send_request(req).await? {
            Response::Documents(docs) => Ok(docs),
            Response::Error(e) => Err(AqlError::new(ErrorCode::ProtocolError, e)),
            _ => Err(AqlError::new(ErrorCode::ProtocolError, "Unexpected response".to_string())),
        }
    }

    pub async fn begin_transaction(&mut self) -> Result<u64> {
        match self.send_request(Request::BeginTransaction).await? {
            Response::TransactionId(tx_id) => {
                self.current_transaction = Some(tx_id);
                Ok(tx_id)
            }
            Response::Error(e) => Err(AqlError::new(ErrorCode::ProtocolError, e)),
            _ => Err(AqlError::new(ErrorCode::ProtocolError, "Unexpected response".to_string())),
        }
    }

    pub async fn commit_transaction(&mut self) -> Result<()> {
        let tx_id = self
            .current_transaction
            .ok_or_else(|| AqlError::invalid_operation("No active transaction".to_string()))?;

        match self.send_request(Request::CommitTransaction(tx_id)).await? {
            Response::Done => {
                self.current_transaction = None;
                Ok(())
            }
            Response::Error(e) => Err(AqlError::new(ErrorCode::ProtocolError, e)),
            _ => Err(AqlError::new(ErrorCode::ProtocolError, "Unexpected response".to_string())),
        }
    }

    pub async fn rollback_transaction(&mut self) -> Result<()> {
        let tx_id = self
            .current_transaction
            .ok_or_else(|| AqlError::invalid_operation("No active transaction".to_string()))?;

        match self
            .send_request(Request::RollbackTransaction(tx_id))
            .await?
        {
            Response::Done => {
                self.current_transaction = None;
                Ok(())
            }
            Response::Error(e) => Err(AqlError::new(ErrorCode::ProtocolError, e)),
            _ => Err(AqlError::new(ErrorCode::ProtocolError, "Unexpected response".to_string())),
        }
    }

    pub async fn delete(&mut self, key: &str) -> Result<()> {
        match self.send_request(Request::Delete(key.to_string())).await? {
            Response::Done => Ok(()),
            Response::Error(e) => Err(AqlError::new(ErrorCode::ProtocolError, e)),
            _ => Err(AqlError::new(ErrorCode::ProtocolError, "Unexpected response".to_string())),
        }
    }
}