use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::RwLock;
use crate::command::CommandManager;
use crate::configure::DoreaFileConfig;
use crate::database::{DataBase, DataBaseManager};
use crate::network::{Frame, NetPacket, NetPacketState};
use crate::Result;
/// Serves a single client connection until it ends.
///
/// Repeatedly reads one frame from `socket`, hands its content (lossily
/// decoded as UTF-8) to [`CommandManager::command_handle`], and writes the
/// reply back as a [`NetPacket`].
///
/// Two reply payloads are treated as server-side directives instead of being
/// forwarded verbatim:
///
/// * `@[SERVER_STARTUP_TIME]` is replaced by `startup_time` rendered as text.
/// * `@[PRELOAD_DB]:<name>` (with a non-empty `<name>`) starts a background
///   task that loads the named database, and an empty body is sent back.
///
/// `current`, `value_ser_style` and the authentication flag are per-connection
/// state that the command handler may mutate between commands. The connection
/// starts authenticated when the configured connection password is empty.
///
/// # Errors
///
/// Returns the frame-parsing error when its message contains
/// `"invalid magic"` or `"unsupported protocol"`, or when it is an
/// [`std::io::Error`] of kind `ConnectionReset` or `UnexpectedEof`. Any other
/// parsing error is reported to the client as an `ERR` packet and the loop
/// continues. Errors from sending a packet are propagated.
///
/// Returns `Ok(())` when an empty frame is received or when the command
/// handler answers with `NetPacketState::EMPTY`.
pub(crate) async fn process(
    socket: &mut TcpStream,
    config: DoreaFileConfig,
    current: String,
    database_manager: Arc<DataBaseManager>,
    startup_time: i64,
    value_ser_style: String,
    connect_id: uuid::Uuid,
) -> Result<()> {
    /// Reply payload that is substituted with the server startup time.
    const STARTUP_TIME_TOKEN: &str = "@[SERVER_STARTUP_TIME]";
    /// Reply payload prefix that requests a background database preload.
    const PRELOAD_DB_PREFIX: &str = "@[PRELOAD_DB]:";

    let mut current = current;
    let mut value_ser_style = value_ser_style;
    // No password configured means every connection is authenticated up front.
    let mut auth = config.connection.connection_password.is_empty();

    let mut frame = Frame::new();

    loop {
        let message = match frame.parse_frame(socket).await {
            Ok(message) => message,
            Err(e) => {
                let err_msg = e.to_string();

                // Protocol-level failures and a vanished peer end the
                // connection; there is nothing useful to send back.
                let protocol_failure = err_msg.contains("invalid magic")
                    || err_msg.contains("unsupported protocol");
                let peer_gone = e
                    .downcast_ref::<std::io::Error>()
                    .map_or(false, |io_err| {
                        matches!(
                            io_err.kind(),
                            std::io::ErrorKind::ConnectionReset
                                | std::io::ErrorKind::UnexpectedEof
                        )
                    });
                if protocol_failure || peer_gone {
                    return Err(e);
                }

                // Anything else is reported to the client and the loop goes on.
                NetPacket::make(err_msg.into_bytes(), NetPacketState::ERR)
                    .send(socket)
                    .await?;
                continue;
            }
        };

        if message.is_empty() {
            return Ok(());
        }

        let res = CommandManager::command_handle(
            String::from_utf8_lossy(&message).into_owned(),
            &mut auth,
            &mut current,
            &mut value_ser_style,
            &config,
            &database_manager,
            &connect_id,
        )
        .await;

        if res.0 == NetPacketState::EMPTY {
            return Ok(());
        }

        let reply_text = String::from_utf8_lossy(&res.1).into_owned();
        let body = if reply_text == STARTUP_TIME_TOKEN {
            startup_time.to_string().into_bytes()
        } else if let Some(db_name) = reply_text
            // `strip_prefix` never panics, unlike slicing by a fixed byte
            // range, which panics when byte 14 is inside a multi-byte char.
            .strip_prefix(PRELOAD_DB_PREFIX)
            .filter(|name| !name.is_empty())
        {
            spawn_preload(db_name.to_owned(), Arc::clone(&database_manager), &config);
            Vec::new()
        } else {
            res.1
        };

        NetPacket::make(body, res.0).send(socket).await?;
    }
}

/// Spawns a detached task that loads database `db_name` into `database_manager`.
///
/// The task records `LOADING` for `db_name` in `DB_STATE`, initialises the
/// database from `<manager location>/storage`, registers it through
/// `load_from`, and records `NORMAL` on success. A failure is only logged.
fn spawn_preload(
    db_name: String,
    database_manager: Arc<DataBaseManager>,
    config: &DoreaFileConfig,
) {
    let db_config = config.database.clone();

    tokio::spawn(async move {
        crate::database::DB_STATE
            .lock()
            .await
            .insert(db_name.clone(), crate::database::DataBaseState::LOADING);

        let storage_path = database_manager.location.clone().join("storage");
        let ndb = DataBase::init(db_name.clone(), storage_path, db_config).await;

        match database_manager
            .load_from(&db_name, Arc::new(RwLock::new(ndb)))
            .await
        {
            Ok(_) => {
                crate::database::DB_STATE
                    .lock()
                    .await
                    .insert(db_name.clone(), crate::database::DataBaseState::NORMAL);
            }
            Err(e) => {
                // NOTE(review): the DB_STATE entry is left as LOADING here;
                // confirm whether a failed load should reset or remove it.
                log::error!("database load error: {}", e.to_string())
            }
        }
    });
}