dorea 0.4.0

A key-value storage system
Documentation
use std::sync::Arc;

use tokio::net::TcpStream;
use tokio::sync::RwLock;

use crate::command::CommandManager;
use crate::configure::DoreaFileConfig;
use crate::database::{DataBase, DataBaseManager};
use crate::network::{Frame, NetPacket, NetPacketState};
use crate::Result;

// connection process
pub(crate) async fn process(
    socket: &mut TcpStream,
    config: DoreaFileConfig,
    current: String,
    database_manager: Arc<DataBaseManager>,
    startup_time: i64,
    value_ser_style: String,
    connect_id: uuid::Uuid,
) -> Result<()> {
    let mut current = current;
    let mut value_ser_style = value_ser_style;

    let mut auth = false;

    if config.connection.connection_password.is_empty() {
        auth = true;
    }

    let mut frame = Frame::new();

    let mut message: Vec<u8>;

    loop {
        message = match frame.parse_frame(socket).await {
            Ok(message) => message,
            Err(e) => {
                // 流损坏(MAGIC 不匹配/版本不对)无法恢复,直接断开连接
                let err_msg = e.to_string();
                if err_msg.contains("invalid magic") || err_msg.contains("unsupported protocol") {
                    return Err(e);
                }
                if let Some(io_err) = e.downcast_ref::<std::io::Error>() {
                    if io_err.kind() == std::io::ErrorKind::ConnectionReset
                        || io_err.kind() == std::io::ErrorKind::UnexpectedEof
                    {
                        return Err(e);
                    }
                }

                NetPacket::make(e.to_string().as_bytes().to_vec(), NetPacketState::ERR)
                    .send(socket)
                    .await?;
                continue;
            }
        };

        if message.is_empty() {
            return Ok(());
        }

        let res = CommandManager::command_handle(
            String::from_utf8_lossy(&message[..]).to_string(),
            &mut auth,
            &mut current,
            &mut value_ser_style,
            &config,
            &database_manager,
            &connect_id,
        )
        .await;

        if res.0 != NetPacketState::EMPTY {
            // 将预设的数据转换为数据本值
            let body = match String::from_utf8_lossy(&res.1[..]).to_string().as_str() {
                "@[SERVER_STARTUP_TIME]" => startup_time.to_string().as_bytes().to_vec(),
                _ => {
                    let mut tmp = res.1.clone();

                    let val = String::from_utf8_lossy(&res.1[..]).to_string();
                    if val.len() > 14 && &val[0..14] == "@[PRELOAD_DB]:" {
                        let db_name = String::from(&val[14..]);
                        let tmp_db_manager = database_manager.clone();
                        let db_config = config.database.clone();
                        tokio::spawn(async move {
                            crate::database::DB_STATE
                                .lock()
                                .await
                                .insert(db_name.clone(), crate::database::DataBaseState::LOADING);

                            let storage_path =
                                tmp_db_manager.location.clone().join("storage");

                            let ndb = DataBase::init(
                                db_name.to_string(),
                                storage_path,
                                db_config,
                            )
                            .await;

                            match tmp_db_manager.load_from(&db_name, Arc::new(RwLock::new(ndb))).await {
                                Ok(_) => {
                                    crate::database::DB_STATE.lock().await.insert(
                                        db_name.clone(),
                                        crate::database::DataBaseState::NORMAL,
                                    );
                                }
                                Err(e) => {
                                    log::error!("database load error: {}", e.to_string())
                                }
                            };
                        });
                        tmp = vec![];
                    }

                    tmp
                } /* 不是预设数据,不处理 */
            };

            NetPacket::make(body, res.0).send(socket).await?;
        } else {
            // if is empty: connection closed
            return Ok(());
        }
    }
}