orientdb-client 0.6.0

A Rust client for OrientDB™
Documentation
use super::network::cluster::AsyncConnection;
use super::network::cluster::{Cluster, Server};
use super::session::{OSession, SessionPool, SessionPoolManager};
use crate::common::protocol::messages::request::{
    Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open,
};
use crate::common::protocol::messages::response;
use crate::common::ConnectionOptions;
use crate::{DatabaseType, OrientResult};
use std::future::Future;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::ops::Deref;
use std::sync::Arc;

#[derive(Clone)]
pub struct OrientDB {
    internal: OrientDBClientInternal,
}

impl OrientDB {
    pub async fn connect<T: Into<ConnectionOptions>>(options: T) -> OrientResult<OrientDB> {
        let opts = options.into();
        let addr: SocketAddr = format!("{}:{}", opts.host, opts.port)
            .to_socket_addrs()?
            .next()
            .expect("Cannot parse socket address");

        let cluster = Cluster::builder().add_server(addr).build().await?;

        let internal = OrientDBClientInternal {
            cluster: Arc::new(cluster),
        };

        Ok(OrientDB { internal })
    }
}

#[derive(Clone, Debug)]
pub struct OrientDBClientInternal {
    cluster: Arc<Cluster>,
}

impl Deref for OrientDB {
    type Target = OrientDBClientInternal;

    fn deref(&self) -> &OrientDBClientInternal {
        &self.internal
    }
}

struct AdminSession {
    session_id: i32,
    token: Option<Vec<u8>>,
}

impl OrientDBClientInternal {
    pub async fn sessions(
        &self,
        db_name: &str,
        user: &str,
        password: &str,
        min_size: Option<u32>,
        max_size: Option<u32>,
    ) -> OrientResult<SessionPool> {
        let server = self.cluster.select();
        SessionPoolManager::new(self.clone(), server, db_name, user, password)
            .managed(min_size, max_size)
    }
    pub async fn session(
        &self,
        db_name: &str,
        user: &str,
        password: &str,
    ) -> OrientResult<OSession> {
        self._session(db_name, user, password, false).await
    }
    pub(crate) async fn _session(
        &self,
        db_name: &str,
        user: &str,
        password: &str,
        pooled: bool,
    ) -> OrientResult<OSession> {
        let server = self.cluster.select();
        self._server_session(server, db_name, user, password, pooled)
            .await
    }
    pub(crate) async fn _server_session(
        &self,
        server: Arc<Server>,
        db_name: &str,
        user: &str,
        password: &str,
        pooled: bool,
    ) -> OrientResult<OSession> {
        let mut conn = server.connection().await?;

        let response: response::Open = conn
            .send(Open::new(db_name, user, password).into())
            .await?
            .payload();

        Ok(OSession::new(
            -1,
            response.session_id,
            response.token,
            self.cluster.clone(),
            server.clone(),
            pooled,
        ))
    }

    async fn run_as_admin<R, W, T>(&self, user: &str, password: &str, work: W) -> OrientResult<R>
    where
        W: FnOnce(AdminSession, AsyncConnection) -> T,
        T: Future<Output = OrientResult<(AsyncConnection, R)>>,
    {
        let pooled = self.cluster.connection().await?;
        let mut conn = pooled.0;
        let response: response::Connect = conn
            .send(Connect::new(user, password).into())
            .await?
            .payload();
        let admin = AdminSession {
            session_id: response.session_id,
            token: response.token.clone(),
        };
        let (mut conn, result) = work(admin, conn).await?;

        conn.send_and_forget(Close::new(response.session_id, response.token).into())
            .await?;

        Ok(result)
    }

    pub async fn create_database(
        &self,
        db_name: &str,
        user: &str,
        password: &str,
        db_mode: DatabaseType,
    ) -> OrientResult<()> {
        self.run_as_admin(user, password, move |session, mut conn| async move {
            let _open: response::CreateDB = conn
                .send(
                    CreateDB::new(
                        MsgHeader::new(session.session_id, session.token),
                        db_name,
                        db_mode,
                    )
                    .into(),
                )
                .await?
                .payload();
            Ok((conn, ()))
        })
        .await
    }

    pub async fn exist_database(
        &self,
        db_name: &str,
        user: &str,
        password: &str,
        db_type: DatabaseType,
    ) -> OrientResult<bool> {
        self.run_as_admin(user, password, move |session, mut conn| async move {
            let exist: response::ExistDB = conn
                .send(
                    ExistDB::new(
                        MsgHeader::new(session.session_id, session.token),
                        db_name,
                        db_type,
                    )
                    .into(),
                )
                .await?
                .payload();
            Ok((conn, exist.exist))
        })
        .await
    }

    pub async fn drop_database(
        &self,
        db_name: &str,
        user: &str,
        password: &str,
        db_type: DatabaseType,
    ) -> OrientResult<()> {
        self.run_as_admin(user, password, move |session, mut conn| async move {
            let _drop: response::DropDB = conn
                .send(
                    DropDB::new(
                        MsgHeader::new(session.session_id, session.token),
                        db_name,
                        db_type,
                    )
                    .into(),
                )
                .await?
                .payload();
            Ok((conn, ()))
        })
        .await
    }
}