kumo 0.2.8

An async web crawling framework for Rust — Scrapy for Rust
Documentation
use super::ItemStore;
use crate::error::KumoError;
use async_trait::async_trait;
use sqlx::MySqlPool;

/// Item store that writes each scraped item as one row of a MySQL table.
///
/// Built through [`MySqlStore::connect`] or [`MySqlStore::builder`]; the
/// `ItemStore` implementation below performs the actual `INSERT`.
pub struct MySqlStore {
    /// Connection pool every `INSERT` is executed against.
    pool: MySqlPool,
    /// Name of the target table (checked by `validate_table_name` before the
    /// store is constructed by the builder).
    table: String,
    /// Names of the additional columns. On insert, each one is filled from the
    /// item's top-level key of the same name.
    extra_columns: Vec<String>,
}

/// Configuration collected before a [`MySqlStore`] is connected.
///
/// Obtained from [`MySqlStore::builder`] and consumed by
/// [`MySqlStoreBuilder::connect`].
pub struct MySqlStoreBuilder {
    /// Connection URL handed to `MySqlPool::connect`.
    database_url: String,
    /// Target table name; `MySqlStore::builder` initialises it to `kumo_items`.
    table: String,
    /// When `true` (the builder's initial value), `connect` issues
    /// `CREATE TABLE IF NOT EXISTS` before returning the store.
    create_table: bool,
    /// Extra columns as `(column name, SQL type)` pairs, in the order added.
    extra_columns: Vec<(String, String)>,
}

impl MySqlStore {
    /// Open a store on a MySQL/MariaDB database using every builder default.
    ///
    /// This is shorthand for `MySqlStore::builder(database_url).connect()`: the
    /// table is `kumo_items` and it is created on connect when it is missing.
    ///
    /// `database_url` has the form `mysql://user:pass@host/db`.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`MySqlStoreBuilder::connect`] reports.
    pub async fn connect(database_url: &str) -> Result<Self, KumoError> {
        let defaults = Self::builder(database_url);
        defaults.connect().await
    }

    /// Start a builder for choosing a custom table name, adding extra
    /// columns, or turning off automatic table creation.
    ///
    /// The returned builder starts with table `kumo_items`, table creation
    /// enabled, and no extra columns.
    pub fn builder(database_url: impl Into<String>) -> MySqlStoreBuilder {
        let database_url = database_url.into();
        let table = String::from("kumo_items");
        MySqlStoreBuilder {
            database_url,
            table,
            create_table: true,
            extra_columns: vec![],
        }
    }
}

impl MySqlStoreBuilder {
    /// Use `name` as the target table instead of the default `kumo_items`.
    ///
    /// The name is not checked here; validation happens in [`Self::connect`].
    pub fn table(self, name: impl Into<String>) -> Self {
        Self {
            table: name.into(),
            ..self
        }
    }

    /// Choose whether [`Self::connect`] runs `CREATE TABLE IF NOT EXISTS`
    /// (enabled by default).
    pub fn create_table(self, yes: bool) -> Self {
        Self {
            create_table: yes,
            ..self
        }
    }

    /// Register an additional column that is filled from the scraped JSON
    /// field with the same key as `name`.
    ///
    /// `sql_type` is any valid MySQL type (`VARCHAR(255)`, `INT`, `JSON`, etc.).
    /// It is placed verbatim into the `CREATE TABLE` statement, so it must come
    /// from trusted configuration. An item lacking the key is stored with NULL
    /// in this column.
    ///
    /// # Errors
    ///
    /// Fails when `name` is rejected by the same identifier check that is
    /// applied to table names.
    pub fn add_column(
        mut self,
        name: impl Into<String>,
        sql_type: impl Into<String>,
    ) -> Result<Self, KumoError> {
        let column = name.into();
        super::validate_table_name(&column)?;
        let definition = (column, sql_type.into());
        self.extra_columns.push(definition);
        Ok(self)
    }

    /// Check the table name, open the connection pool, create the table when
    /// requested, and hand back the ready-to-use store.
    ///
    /// # Errors
    ///
    /// Fails when the table name is rejected, when the pool cannot be opened,
    /// or when the `CREATE TABLE` statement fails.
    pub async fn connect(self) -> Result<MySqlStore, KumoError> {
        let Self {
            database_url,
            table,
            create_table,
            extra_columns,
        } = self;

        // Reject a bad identifier before any connection attempt is made.
        super::validate_table_name(&table)?;

        let pool = MySqlPool::connect(&database_url)
            .await
            .map_err(|err| KumoError::store("mysql store", err))?;

        if create_table {
            // One ",\n `name` type" fragment per extra column, appended after
            // the fixed `created_at` column definition.
            let mut column_defs = String::new();
            for (column, sql_type) in &extra_columns {
                column_defs.push_str(&format!(",\n                    `{}` {}", column, sql_type));
            }
            let ddl = format!(
                r#"CREATE TABLE IF NOT EXISTS `{}` (
                    id         BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                    data       JSON NOT NULL,
                    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP{}
                )"#,
                table, column_defs
            );
            sqlx::query(&ddl)
                .execute(&pool)
                .await
                .map_err(|err| KumoError::store("mysql store", err))?;
        }

        // The store only needs the column names; the SQL types were used
        // solely for the DDL above.
        let (extra_columns, _sql_types): (Vec<String>, Vec<String>) =
            extra_columns.into_iter().unzip();

        Ok(MySqlStore {
            pool,
            table,
            extra_columns,
        })
    }
}

#[async_trait]
impl ItemStore for MySqlStore {
    /// Insert `item` as a single row of the configured table.
    ///
    /// The `data` column receives the item's JSON text. Every extra column is
    /// bound to the item's value under the key of the same name, converted by
    /// `json_val_to_sql_string`.
    ///
    /// # Errors
    ///
    /// Fails when executing the `INSERT` against the pool fails.
    async fn store(&self, item: &serde_json::Value) -> Result<(), KumoError> {
        let extras = &self.extra_columns;

        // ", `col`" for every extra column, following the fixed `data` column.
        let mut extra_cols = String::new();
        for name in extras {
            extra_cols.push_str(", `");
            extra_cols.push_str(name);
            extra_cols.push('`');
        }
        // One additional placeholder per extra column.
        let extra_params = ", ?".repeat(extras.len());

        let statement = format!(
            r#"INSERT INTO `{}` (data{}) VALUES (?{})"#,
            self.table, extra_cols, extra_params
        );

        // Bind the JSON payload first, then the extra values in column order.
        let insert = extras.iter().fold(
            sqlx::query(&statement).bind(item.to_string()),
            |query, name| query.bind(super::json_val_to_sql_string(item.get(name))),
        );

        insert
            .execute(&self.pool)
            .await
            .map_err(|err| KumoError::store("mysql store", err))?;
        Ok(())
    }
}