use anyhow::{anyhow, Result};
use aqueue::{inner_wait, Actor};
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::SqlitePool;
use std::env;
use tokio::task::JoinHandle;
/// One row of the `user` table, as produced by `DataBases::select_all_users`.
#[derive(sqlx::FromRow, Debug)]
// The fields are never read individually in this file; they are only shown
// through the derived `Debug` output, hence the lint suppression.
#[allow(dead_code)]
pub struct User {
/// Value of the `id` column.
id: i64,
/// Value of the `name` column.
name: String,
/// Value of the `gold` column.
gold: f64,
}
/// State owned by the database actor: an in-memory id counter plus the SQLite
/// connection pool.
///
/// `Send` and `Sync` are derived automatically because both fields (`u32` and
/// `SqlitePool`) are thread-safe. No manual `unsafe impl` is needed, and leaving
/// it out keeps the compiler check in force if a non-thread-safe field is ever
/// added.
pub struct DataBases {
    /// Last id handed out by `insert_user`; starts at 0 for every new instance.
    auto_id: u32,
    /// Connection pool used by every query.
    pool: SqlitePool,
}
impl DataBases {
pub fn new(sqlite_max_connections: u32) -> Result<Actor<DataBases>> {
let pool = SqlitePoolOptions::new()
.max_connections(sqlite_max_connections)
.connect_lazy(&env::var("DATABASE_URL")?)?;
Ok(Actor::new(DataBases { auto_id: 0, pool }))
}
async fn create_table(&self) -> Result<()> {
sqlx::query(include_str!("table.sql")).execute(&self.pool).await?;
Ok(())
}
async fn insert_user(&mut self, name: &str, gold: f64) -> Result<bool> {
self.auto_id += 1;
let row = sqlx::query(
r#"
insert into `user`(`id`,`name`,`gold`)
values(?,?,?)
"#,
)
.bind(&self.auto_id)
.bind(name)
.bind(gold)
.execute(&self.pool)
.await?
.rows_affected();
Ok(row == 1)
}
async fn select_all_users(&self) -> Result<Vec<User>> {
Ok(sqlx::query_as::<_, User>("select * from `user`").fetch_all(&self.pool).await?)
}
}
/// Async database operations exposed on the actor handle
/// (implemented in this file for `Actor<DataBases>`).
pub(crate) trait IDatabase {
/// Runs the table-creation script.
async fn create_table(&self) -> Result<()>;
/// Inserts a user, taking ownership of `name`; yields `true` when exactly one row was affected.
async fn insert_user(&self, name: String, gold: f64) -> Result<bool>;
/// Same as `insert_user`, but borrows `name` instead of taking ownership.
async fn insert_user_ref_name(&self, name: &str, gold: f64) -> Result<bool>;
/// Returns every row of the `user` table.
async fn select_all_users(&self) -> Result<Vec<User>>;
/// Inserts a user via a nested actor call wrapped in `inner_wait!`.
/// NOTE(review): judging by the name this is a deliberate blocking/timeout demonstration — confirm.
async fn test_unsafe_blocking(&self, name: String, gold: f64) -> Result<bool>;
}
/// Actor-side implementation: each operation is forwarded to the wrapped
/// [`DataBases`] through the actor's `inner_call`.
impl IDatabase for Actor<DataBases> {
    /// Runs the table-creation script on the wrapped [`DataBases`].
    async fn create_table(&self) -> Result<()> {
        self.inner_call(|inner| async move { inner.get().create_table().await })
            .await
    }

    /// Inserts a user; `name` is moved into the queued closure.
    async fn insert_user(&self, name: String, gold: f64) -> Result<bool> {
        self.inner_call(|inner| async move { inner.get_mut().insert_user(&name, gold).await })
            .await
    }

    /// Inserts a user; `name` is borrowed for the duration of the call.
    async fn insert_user_ref_name(&self, name: &str, gold: f64) -> Result<bool> {
        self.inner_call(|inner| async move { inner.get_mut().insert_user(name, gold).await })
            .await
    }

    /// Returns every row of the `user` table.
    async fn select_all_users(&self) -> Result<Vec<User>> {
        // Use `inner_call` like the other accessors rather than an unchecked
        // `deref_inner()`: that way no shared `&DataBases` is created while
        // `insert_user` holds the `&mut DataBases` obtained through `get_mut()`.
        self.inner_call(|inner| async move { inner.get().select_all_users().await })
            .await
    }

    /// Inserts a user by calling the global `DB` actor from inside a closure
    /// submitted on `self`, wrapped in `inner_wait!` with the value `3000`
    /// (presumably a timeout in milliseconds — confirm against aqueue).
    ///
    /// NOTE(review): when `self` is `DB`, the nested `DB.insert_user` call is
    /// issued while this closure is still running on the same actor. It looks
    /// like that can only finish through the `inner_wait!` bound, which would
    /// then surface as an error via `?`. This appears to be an intentional
    /// demonstration — confirm before reusing the pattern.
    async fn test_unsafe_blocking(&self, name: String, gold: f64) -> Result<bool> {
        inner_wait!(self, 3000, |_| async move { DB.insert_user(name, gold).await }).await?
    }
}
// Process-wide database actor, initialised on first access with a pool capped
// at 50 connections. If `DataBases::new` fails, the initialiser panics with
// "install db error".
lazy_static::lazy_static! {
static ref DB:Actor<DataBases>={
DataBases::new(50).expect("install db error")
};
}
/// Demo entry point: loads `.env`, creates the table, inserts rows from many
/// concurrent tasks through the shared `DB` actor, prints every stored user,
/// and finally runs `test_unsafe_blocking`.
///
/// # Errors
/// Returns the first error from loading `.env`, from any database call, or
/// from a spawned task that failed or panicked.
#[tokio::main]
async fn main() -> Result<()> {
    /// Number of concurrent insert tasks to spawn.
    const TASK_COUNT: usize = 100;
    /// Number of rows each task inserts.
    const INSERTS_PER_TASK: u32 = 1000;

    // Keep the underlying cause: a malformed or unreadable `.env` is a
    // different problem from a missing one and should not be reported as such.
    dotenv::dotenv().map_err(|e| anyhow!("failed to load .env file: {}", e))?;
    DB.create_table().await?;

    let mut join_vec = Vec::with_capacity(TASK_COUNT);
    for i in 0..TASK_COUNT {
        let join: JoinHandle<Result<()>> = tokio::spawn(async move {
            for j in 0..INSERTS_PER_TASK {
                // The task index is the user name and the loop counter the gold amount.
                DB.insert_user(i.to_string(), f64::from(j)).await?;
            }
            Ok(())
        });
        join_vec.push(join);
    }

    // The first `?` surfaces a panicked or cancelled task, the second the
    // task's own database error.
    for join in join_vec {
        join.await??;
    }

    for user in DB.select_all_users().await? {
        println!("{:?}", user);
    }

    DB.test_unsafe_blocking("123123".to_string(), 1111111f64).await?;
    Ok(())
}