anansi-core 0.14.2

Anansi's core.
Documentation
use std::str;
use std::fmt;
use std::sync::Arc;
use std::slice::Iter;
use toml::Value::Table;
use async_trait::async_trait;
use tokio_postgres::types::ToSql;
use moka::future::Cache;

use crate::try_sql;
use crate::server::Settings;
use crate::web::{Result, WebErrorKind, BaseRequest};
use crate::records::Record;
use crate::db::{DbRow, DbRowVec, DbPool, DbType, Builder, sql_stmt, AsDb};

pub struct PgQuery<'q> {
    query: &'q str,
    params: &'q[&'q(dyn ToSql + Sync)],
}

impl<'q> PgQuery<'q> {
    pub fn new(query: &'q str, params: &'q[&'q(dyn ToSql + Sync)]) -> Self {
        Self {query, params}
    }
    pub async fn fetch_one<B: BaseRequest<SqlPool = D>, D: AsDb<SqlDb = PgDbPool>>(self, req: &B) -> Result<PgDbRow> {
        let statement = req.raw().pool().0.prepare(self.query).await?;
        Ok(PgDbRow {row: req.raw().pool().0.query_one(&statement, self.params).await?})
    }
    pub async fn fetch_all<B: BaseRequest<SqlPool = D>, D: AsDb<SqlDb = PgDbPool>>(self, req: &B) -> Result<PgDbRowVec> {
        let statement = req.raw().pool().0.prepare(self.query).await?;
        Ok(PgDbRowVec {rows: req.raw().pool().0.query(&statement, self.params).await?})
    }
    pub async fn execute<B: BaseRequest<SqlPool = D>, D: AsDb<SqlDb = PgDbPool>>(self, req: &B) -> Result<()> {
        let statement = req.raw().pool().0.prepare(self.query).await?;
        match req.raw().pool().0.execute(&statement, self.params).await {
            Ok(_) => Ok(()),
            Err(e) => Err(Box::new(e)),
        }
    }
}

pub struct PgDbRow {
    row: tokio_postgres::row::Row,
}

impl PgDbRow {
    pub fn get_i32(&self, index: usize) -> i32 {
        self.row.get(index)
    }
    pub fn get_i64(&self, index: usize) -> i64 {
        self.row.get(index)
    }
    pub fn get_string(&self, index: usize) -> String {
        self.row.get(index)
    }
}

impl DbRow for PgDbRow {
    type RawRow = tokio_postgres::row::Row;
    fn new(row: Self::RawRow) -> Self {
        Self {row}
    }
    fn try_bool(&self, index: &str) -> Result<bool> {
        try_sql!(self, index)
    }
    fn try_count(&self) -> Result<i64> {
        match self.row.try_get(0) {
            Ok(c) => Ok(c),
            Err(e) => {
                Err(Box::new(e))
            }
        }
    }
    fn try_i32(&self, index: &str) -> Result<i32> {
        let index: &str = &(index.to_lowercase());
        try_sql!(self, index)
    }
    fn try_i64(&self, index: &str) -> Result<i64> {
        try_sql!(self, index)
    }
    fn try_option_string(&self, index: &str) -> Result<Option<String>> {
        try_sql!(self, index)
    }
    fn try_string(&self, index: &str) -> Result<String> {
        try_sql!(self, index)
    }
    fn try_date_time(&self, index: &str) -> Result<String> {
        try_sql!(self, index)
    }
}

pub struct PgDbRowVec {
    rows: Vec<tokio_postgres::Row>,
}

impl DbRowVec for PgDbRowVec {
    type Row = PgDbRow;
    type Rows = Vec<tokio_postgres::Row>;
    fn from(rows: Self::Rows) -> Self {
        Self {rows}
    }
}

impl PgDbRowVec {
    pub fn len(&self) -> usize {
        self.rows.len()
    }
    pub fn iter(&self) -> Iter<'_, tokio_postgres::Row> {
        self.rows.iter()
    }
}

impl IntoIterator for PgDbRowVec {
    type Item = PgDbRow;
    type IntoIter = PgDbRowIntoIter;
    
    fn into_iter(self) -> Self::IntoIter {
        PgDbRowIntoIter {
            row_into_iter: self.rows.into_iter(),
        }
    }
}

pub struct PgDbRowIntoIter {
    row_into_iter: std::vec::IntoIter<tokio_postgres::Row>,
}

impl<'a> Iterator for PgDbRowIntoIter {
    type Item = PgDbRow;
    fn next(&mut self) -> Option<PgDbRow> {
        match self.row_into_iter.next() {
            Some(row) => Some(PgDbRow {row}),
            None => None,
        }
    }
}

#[derive(Clone)]
pub struct PgStatement(tokio_postgres::Statement);

impl PgStatement {
    pub async fn new(query: &str, pool: &PgDbPool) -> Result<Self> {
        let statement = pool.0.prepare(query).await?;
        Ok(Self(statement))
    }
    pub async fn fetch_all(&self, params: &[&(dyn ToSql + Sync)], pool: &PgDbPool) -> Result<PgDbRowVec> {
        Ok(PgDbRowVec {rows: pool.0.query(&self.0, params).await?})
    }
    pub async fn fetch_one(&self, params: &[&(dyn ToSql + Sync)], pool: &PgDbPool) -> Result<PgDbRow> {
        Ok(PgDbRow {row: pool.0.query_one(&self.0, params).await?})
    }
    pub async fn execute(&self, params: &[&(dyn ToSql + Sync)], pool: &PgDbPool) -> Result<()> {
        match pool.0.execute(&self.0, params).await {
            Ok(_) => Ok(()),
            Err(e) => Err(Box::new(e)),
        }
    }
}

#[macro_export]
macro_rules! prep {
    ($req:ident, $key:literal, $s:literal, $params:expr, $q:ident) => {
        {
            let opt = $req.raw().pool().1.get($key);
            let stmt = if let Some(st) = opt {
                st
            } else {
                let statement = std::sync::Arc::new(anansi::db::postgres::PgStatement::new($s, $req.raw().pool()).await?);
                $req.raw().pool().1.insert($key.to_string(), statement.clone()).await;
                statement
            };
            stmt.$q($params, $req.raw().pool()).await
        }
    };
    ($req:ident, $key:expr, $s:expr, $params:expr, $q:ident) => {
        {
            let opt = $req.raw().pool().1.get(&$key).map(|s| s.clone());
            let stmt = if let Some(st) = opt {
                st
            } else {
                let statement = std::sync::Arc::new(anansi::db::postgres::PgStatement::new(&$s, $req.raw().pool()).await?);
                $req.raw().pool().1.insert($key, statement.clone()).await;
                statement
            };
            stmt.$q($params, $req.raw().pool()).await
        }
    };
}

#[derive(Clone)]
pub struct PgDbPool(pub(in crate) Arc<tokio_postgres::Client>, pub Cache<String, Arc<PgStatement>, ahash::RandomState>);

impl fmt::Debug for PgDbPool {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PgDbPool")
         .field("0", &self.0)
         .finish()
    }
}

#[async_trait]
impl DbPool for PgDbPool {
    type SqlRow = PgDbRow;
    type SqlRowVec = PgDbRowVec;
    async fn new(settings: &Settings) -> Result<Self> {
        let databases = match settings.get("databases") {
            Some(v) => match v {
                Table(t) => t,
                _ => return Err(WebErrorKind::BadDb.to_box()),
            }
            None => return Err(WebErrorKind::BadDb.to_box()),
        };
        let database = databases.get("default").expect("Could not get database information");
        let name = database.get("name").expect("Could not get database name").as_str().expect("Could not convert database name to string");
        let user = database.get("user").expect("Could not get database user").as_str().expect("Could not convert database user to string");
        let password = database.get("password").expect("Could not get database password").as_str().expect("Could not convert database password to string");
        let address = database.get("address").expect("Could not get database host").as_str().expect("Could not convert database host to string");
        let arg = format!("postgres://{user}:{password}@{address}/{name}");
        match Self::connect(&arg).await {
            Ok(p) => {
                let row = p.query_one("SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'anansi_records');", &[]).await?;
                if !row.try_get(0)? {
                    p.execute("CREATE TABLE \"anansi_records\"(\n\t\"name\" text NOT NULL,\n\t\"schema\" text NOT NULL\n);", &[]).await?;
                    p.execute("CREATE TABLE \"anansi_migrations\"(\n\t\"id\" SERIAL PRIMARY KEY,\n\t\"app\" TEXT NOT NULL,\n\t\"name\" TEXT NOT NULL,\n\t\"applied\" TIMESTAMP NOT NULL\n);\n", &[]).await?;
                }
                Ok(Self(Arc::new(p), Cache::builder().max_capacity(100).build_with_hasher(ahash::RandomState::default())))
            }
            Err(e) => {
                Err(e)
            }
        }
    }
    async fn query(&self, val: &str) -> Result<PgDbRowVec> {
        Ok(PgDbRowVec {rows: self.0.query(val, &[]).await?})
    }
    async fn raw_fetch_one(&self, val: &str) -> Result<Self::SqlRow> {
        Ok(PgDbRow {row: self.0.query_one(val, &[]).await?})
    }
    async fn raw_fetch_all(&self, val: &str) -> Result<Self::SqlRowVec> {
        Ok(PgDbRowVec {rows: self.0.query(val, &[]).await?})
    }
    async fn raw_execute(&self, val: &str) -> Result<()> {
        match self.0.execute(val, &[]).await {
            Ok(_) => Ok(()),
            Err(e) => Err(Box::new(e)),
        }
    }
    fn now() -> &'static str {
        "NOW()"
    }
    async fn test() -> Result<Self> {
        unimplemented!()
    }
    fn to_stmt<R: Record>(val: Builder<R>) -> String {
        sql_stmt(val)
    }
}

impl DbType for PgDbPool {
    fn db_type(ty: &str) -> String {
        if ty == "datetime" {
            "timestamp".to_string()
        } else {
            ty.to_string()
        }
    }
}

impl PgDbPool {
    async fn connect(arg: &str) -> Result<tokio_postgres::Client> {
        let (client, connection) = tokio_postgres::connect(arg, tokio_postgres::NoTls).await?;

        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {}", e);
            }
        });
        Ok(client)
    }
}