use actix::prelude::{Actor, Handler, Message, Syn};
use actix::sync::{SyncArbiter, SyncContext};
use actix::Addr;
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::{self, ExpressionMethods, QueryDsl, RunQueryDsl};
#[cfg(feature = "sqlite")]
use diesel::sqlite::SqliteConnection;
#[cfg(feature = "mysql")]
use diesel::mysql::MysqlConnection;
#[cfg(feature = "postgres")]
use diesel::pg::PgConnection;
use failure::Error;
use super::SqlIdentityError;
table! {
identities (token) {
token -> Text,
userid -> Text,
}
}
#[derive(Debug, Queryable)]
pub struct SqlIdentityModel {
pub token: String,
pub userid: String,
}
enum SqlPool {
#[cfg(feature = "sqlite")]
SqlitePool(Pool<ConnectionManager<SqliteConnection>>),
#[cfg(feature = "mysql")]
MySqlPool(Pool<ConnectionManager<MysqlConnection>>),
#[cfg(feature = "postgres")]
PgPool(Pool<ConnectionManager<PgConnection>>),
}
pub struct SqlActor(SqlPool);
impl SqlActor {
pub fn sqlite(n: usize, s: &str) -> Result<Addr<Syn, SqlActor>, Error> {
#[cfg(feature = "sqlite")]
{
let manager = ConnectionManager::<SqliteConnection>::new(s);
let pool = Pool::builder()
.build(manager)?;
let addr = SyncArbiter::start(n, move || SqlActor(SqlPool::SqlitePool(pool.clone())));
Ok(addr)
}
#[cfg(not(feature = "sqlite"))]
{
let _ = n;
let _ = s;
Err(SqlIdentityError::SqlVariantNotSupported.into())
}
}
pub fn mysql(n: usize, s: &str) -> Result<Addr<Syn, SqlActor>, Error> {
#[cfg(feature = "mysql")]
{
let manager = ConnectionManager::<MysqlConnection>::new(s);
let pool = Pool::builder()
.build(manager)?;
let addr = SyncArbiter::start(n, move || SqlActor(SqlPool::MySqlPool(pool.clone())));
Ok(addr)
}
#[cfg(not(feature = "mysql"))]
{
let _ = n;
let _ = s;
Err(SqlIdentityError::SqlVariantNotSupported.into())
}
}
pub fn pg(n: usize, s: &str) -> Result<Addr<Syn, SqlActor>, Error> {
#[cfg(feature = "postgres")]
{
let manager = ConnectionManager::<PgConnection>::new(s);
let pool = Pool::builder()
.build(manager)?;
let addr = SyncArbiter::start(n, move || SqlActor(SqlPool::PgPool(pool.clone())));
Ok(addr)
}
#[cfg(not(feature = "postgres"))]
{
let _ = n;
let _ = s;
Err(SqlIdentityError::SqlVariantNotSupported.into())
}
}
}
impl Actor for SqlActor {
type Context = SyncContext<Self>;
}
pub struct FindIdentity {
pub token: String,
}
impl Message for FindIdentity {
type Result = Result<SqlIdentityModel, Error>;
}
impl Handler<FindIdentity> for SqlActor {
type Result = Result<SqlIdentityModel, Error>;
fn handle(&mut self, msg: FindIdentity, _: &mut Self::Context) -> Self::Result {
use self::identities::dsl::*;
let mut results = match self.0 {
#[cfg(feature = "sqlite")]
SqlPool::SqlitePool(ref p) => {
let conn: &SqliteConnection = &(*(p.get()?));
identities
.filter(token.eq(msg.token))
.limit(1)
.load::<SqlIdentityModel>(conn)?
}
#[cfg(feature = "mysql")]
SqlPool::MySqlPool(ref p) => {
let conn: &MysqlConnection = &(*(p.get()?));
identities
.filter(token.eq(msg.token))
.limit(1)
.load::<SqlIdentityModel>(conn)?
}
#[cfg(feature = "postgres")]
SqlPool::PgPool(ref p) => {
let conn: &PgConnection = &(*(p.get()?));
identities
.filter(token.eq(msg.token))
.limit(1)
.load::<SqlIdentityModel>(conn)?
}
};
match results.len() {
1 => Ok(results.remove(0)),
_ => Err(SqlIdentityError::SqlTokenNotFound.into()),
}
}
}
#[derive(Debug, Insertable)]
#[table_name = "identities"]
pub struct UpdateIdentity {
pub token: String,
pub userid: String,
}
impl Message for UpdateIdentity {
type Result = Result<usize, Error>;
}
impl Handler<UpdateIdentity> for SqlActor {
type Result = Result<usize, Error>;
fn handle(&mut self, msg: UpdateIdentity, _: &mut Self::Context) -> Self::Result {
use self::identities::dsl::*;
match self.0 {
#[cfg(feature = "sqlite")]
SqlPool::SqlitePool(ref p) => {
let conn: &SqliteConnection = &(*(p.get()?));
let n = diesel::replace_into(identities).values(&msg).execute(conn)?;
Ok(n)
}
#[cfg(feature = "mysql")]
SqlPool::MySqlPool(ref p) => {
let conn: &MysqlConnection = &(*(p.get()?));
let n = diesel::replace_into(identities).values(&msg).execute(conn)?;
Ok(n)
}
#[cfg(feature = "postgres")]
SqlPool::PgPool(ref p) => {
let conn: &PgConnection = &(*(p.get()?));
let n = diesel::insert_into(identities)
.values(&msg)
.on_conflict(token)
.do_update()
.set(userid.eq(msg.userid.clone()))
.execute(conn)?;
Ok(n)
}
}
}
}
pub struct DeleteIdentity {
pub token: String,
}
impl Message for DeleteIdentity {
type Result = Result<usize, Error>;
}
impl Handler<DeleteIdentity> for SqlActor {
type Result = Result<usize, Error>;
fn handle(&mut self, msg: DeleteIdentity, _: &mut Self::Context) -> Self::Result {
use self::identities::dsl::*;
match self.0 {
#[cfg(feature = "sqlite")]
SqlPool::SqlitePool(ref p) => {
let conn: &SqliteConnection = &(*(p.get()?));
let n = diesel::delete(identities.filter(token.eq(msg.token))).execute(conn)?;
Ok(n)
}
#[cfg(feature = "mysql")]
SqlPool::MySqlPool(ref p) => {
let conn: &MysqlConnection = &(*(p.get()?));
let n = diesel::delete(identities.filter(token.eq(msg.token))).execute(conn)?;
Ok(n)
}
#[cfg(feature = "postgres")]
SqlPool::PgPool(ref p) => {
let conn: &PgConnection = &(*(p.get()?));
let n = diesel::delete(identities.filter(token.eq(msg.token))).execute(conn)?;
Ok(n)
}
}
}
}