sqltool 0.4.1

功能强大的数据库迁移、同步、运维工具,支持自动分库分表、慢查询检测、数据对比、备份恢复,提供多语言SDK
Documentation
use crate::databases::DatabaseConnection;
use crate::core::sql_executor::{SqlExecutor, ExecutionResult};
use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Debug, Clone)]
pub struct Session {
    pub id: String,
    pub database_name: String,
    pub db_type: String,
    pub user: Option<String>,
    pub created_at: u64,
    pub last_activity: u64,
    pub is_active: bool,
    pub session_variables: HashMap<String, String>,
    pub connection_info: ConnectionInfo,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
    pub host: String,
    pub port: u16,
    pub database: String,
    pub charset: String,
    pub autocommit: bool,
}

impl Session {
    pub fn new(id: &str, db_type: &str, database: &str) -> Self {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        Self {
            id: id.to_string(),
            database_name: database.to_string(),
            db_type: db_type.to_string(),
            user: None,
            created_at: now,
            last_activity: now,
            is_active: true,
            session_variables: HashMap::new(),
            connection_info: ConnectionInfo {
                host: String::new(),
                port: 0,
                database: database.to_string(),
                charset: "utf8mb4".to_string(),
                autocommit: true,
            },
        }
    }

    pub fn touch(&mut self) {
        self.last_activity = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
    }

    pub fn set_variable(&mut self, key: &str, value: &str) {
        self.session_variables.insert(key.to_string(), value.to_string());
    }

    pub fn get_variable(&self, key: &str) -> Option<String> {
        self.session_variables.get(key).cloned()
    }

    pub fn is_expired(&self, timeout_secs: u64) -> bool {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        now - self.last_activity > timeout_secs
    }
}

pub struct SessionManager {
    sessions: RwLock<HashMap<String, Arc<RwLock<Session>>>>,
    max_sessions: usize,
    session_timeout_secs: u64,
}

impl SessionManager {
    pub fn new(max_sessions: usize, session_timeout_secs: u64) -> Self {
        Self {
            sessions: RwLock::new(HashMap::new()),
            max_sessions,
            session_timeout_secs,
        }
    }

    pub fn create_session(&self, id: &str, db_type: &str, database: &str) -> Result<Session> {
        {
            let sessions = self.sessions.write().unwrap();
            if sessions.len() >= self.max_sessions {
                drop(sessions);
                self.cleanup_expired_sessions()?;
            }
        }

        let mut sessions = self.sessions.write().unwrap();
        let session = Session::new(id, db_type, database);
        let session_arc = Arc::new(RwLock::new(session.clone()));
        sessions.insert(id.to_string(), session_arc);

        Ok(session)
    }

    pub fn get_session(&self, id: &str) -> Result<Session> {
        let sessions = self.sessions.read().unwrap();
        sessions
            .get(id)
            .ok_or_else(|| anyhow!("Session not found: {}", id))
            .map(|s| s.read().unwrap().clone())
    }

    pub fn update_session(&self, session: &Session) -> Result<()> {
        let sessions = self.sessions.write().unwrap();
        if let Some(s) = sessions.get(&session.id) {
            let mut s = s.write().unwrap();
            *s = session.clone();
            Ok(())
        } else {
            Err(anyhow!("Session not found: {}", session.id))
        }
    }

    pub fn close_session(&self, id: &str) -> Result<()> {
        let mut sessions = self.sessions.write().unwrap();
        sessions
            .remove(id)
            .ok_or_else(|| anyhow!("Session not found: {}", id))?;
        Ok(())
    }

    pub fn list_sessions(&self) -> Vec<Session> {
        let sessions = self.sessions.read().unwrap();
        sessions
            .values()
            .map(|s| s.read().unwrap().clone())
            .collect()
    }

    pub fn cleanup_expired_sessions(&self) -> Result<usize> {
        let mut sessions = self.sessions.write().unwrap();
        self.cleanup_expired_sessions_internal(&mut sessions)
    }

    fn cleanup_expired_sessions_internal(
        &self,
        sessions: &mut HashMap<String, Arc<RwLock<Session>>>,
    ) -> Result<usize> {
        let before = sessions.len();
        sessions.retain(|_, session| {
            let s = session.read().unwrap();
            !s.is_expired(self.session_timeout_secs)
        });
        let after = sessions.len();
        Ok(before - after)
    }

    pub fn session_count(&self) -> usize {
        let sessions = self.sessions.read().unwrap();
        sessions.len()
    }

    pub fn get_active_sessions(&self) -> Vec<Session> {
        let sessions = self.sessions.read().unwrap();
        sessions
            .values()
            .filter_map(|s| {
                let s = s.read().unwrap();
                if s.is_active && !s.is_expired(self.session_timeout_secs) {
                    Some(s.clone())
                } else {
                    None
                }
            })
            .collect()
    }
}

pub struct DatabaseSession {
    pub session: Session,
    pub executor: SqlExecutor,
}

impl DatabaseSession {
    pub async fn connect(
        id: &str,
        connection: Box<dyn DatabaseConnection>,
        _db_type: &str,
        database: &str,
    ) -> Result<Self> {
        let session = Session::new(id, "unknown", database);
        let executor = SqlExecutor::new(connection, "unknown");

        Ok(Self {
            session,
            executor,
        })
    }

    pub async fn execute(&mut self, sql: &str) -> ExecutionResult {
        self.session.touch();
        self.executor.execute(sql).await
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ViewOperation {
    pub name: String,
    pub definition: String,
    pub or_replace: bool,
}

pub struct ViewManager {
    executor: SqlExecutor,
}

impl ViewManager {
    pub fn new(executor: SqlExecutor) -> Self {
        Self { executor }
    }

    pub async fn create_view(&mut self, operation: ViewOperation) -> ExecutionResult {
        let or_replace = if operation.or_replace { "OR REPLACE" } else { "" };
        let sql = format!(
            "CREATE {} VIEW {} AS {}",
            or_replace,
            operation.name,
            operation.definition
        );
        self.executor.execute(&sql).await
    }

    pub async fn drop_view(&mut self, name: &str, if_exists: bool) -> ExecutionResult {
        let if_exists = if if_exists { "IF EXISTS" } else { "" };
        let sql = format!("DROP VIEW {} {}", if_exists, name);
        self.executor.execute(&sql).await
    }

    pub async fn rename_view(&mut self, old_name: &str, new_name: &str) -> ExecutionResult {
        let sql = format!("RENAME TABLE {} TO {}", old_name, new_name);
        self.executor.execute(&sql).await
    }
}

pub struct FunctionManager {
    executor: SqlExecutor,
}

impl FunctionManager {
    pub fn new(executor: SqlExecutor) -> Self {
        Self { executor }
    }

    pub async fn create_function(
        &mut self,
        name: &str,
        args: &[(&str, &str)],
        return_type: &str,
        body: &str,
        language: &str,
    ) -> ExecutionResult {
        let args_str = args
            .iter()
            .map(|(n, t)| format!("{} {}", n, t))
            .collect::<Vec<_>>()
            .join(", ");

        let sql = format!(
            "CREATE FUNCTION {} ({}) RETURNS {} LANGUAGE {} AS $$\n{}\n$$",
            name, args_str, return_type, language, body
        );

        self.executor.execute(&sql).await
    }

    pub async fn drop_function(&mut self, name: &str, if_exists: bool) -> ExecutionResult {
        let if_exists = if if_exists { "IF EXISTS" } else { "" };
        let sql = format!("DROP FUNCTION {} {}", if_exists, name);
        self.executor.execute(&sql).await
    }
}

pub struct IndexManager {
    executor: SqlExecutor,
}

impl IndexManager {
    pub fn new(executor: SqlExecutor) -> Self {
        Self { executor }
    }

    pub async fn create_index(
        &mut self,
        table: &str,
        name: &str,
        fields: &[&str],
        unique: bool,
    ) -> ExecutionResult {
        let unique_keyword = if unique { "UNIQUE" } else { "" };
        let fields_str = fields.join(", ");
        let sql = format!(
            "CREATE {} INDEX {} ON {} ({})",
            unique_keyword, name, table, fields_str
        );
        self.executor.execute(&sql).await
    }

    pub async fn drop_index(&mut self, name: &str, table: Option<&str>) -> ExecutionResult {
        let sql = if let Some(t) = table {
            format!("DROP INDEX {} ON {}", name, t)
        } else {
            format!("DROP INDEX {}", name)
        };
        self.executor.execute(&sql).await
    }

    pub async fn analyze_table(&mut self, table: &str) -> ExecutionResult {
        let sql = format!("ANALYZE TABLE {}", table);
        self.executor.execute(&sql).await
    }

    pub async fn check_table(&mut self, table: &str) -> ExecutionResult {
        let sql = format!("CHECK TABLE {}", table);
        self.executor.execute(&sql).await
    }

    pub async fn repair_table(&mut self, table: &str) -> ExecutionResult {
        let sql = format!("REPAIR TABLE {}", table);
        self.executor.execute(&sql).await
    }

    pub async fn optimize_table(&mut self, table: &str) -> ExecutionResult {
        let sql = format!("OPTIMIZE TABLE {}", table);
        self.executor.execute(&sql).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_session_creation() {
        let session = Session::new("sess_001", "mysql", "testdb");
        assert_eq!(session.id, "sess_001");
        assert_eq!(session.database_name, "testdb");
        assert!(session.is_active);
    }

    #[test]
    fn test_session_variables() {
        let mut session = Session::new("sess_001", "mysql", "testdb");
        session.set_variable("sql_mode", "STRICT_TRANS_TABLES");
        assert_eq!(session.get_variable("sql_mode"), Some("STRICT_TRANS_TABLES".to_string()));
    }

    #[test]
    fn test_session_manager() {
        let manager = SessionManager::new(10, 3600);
        let session = manager.create_session("sess_001", "mysql", "testdb").unwrap();
        assert_eq!(session.id, "sess_001");

        let retrieved = manager.get_session("sess_001").unwrap();
        assert_eq!(retrieved.id, "sess_001");

        let sessions = manager.list_sessions();
        assert_eq!(sessions.len(), 1);

        manager.close_session("sess_001").unwrap();
        assert_eq!(manager.session_count(), 0);
    }

    #[test]
    fn test_query_builder_integration() {
        use crate::core::sql_executor::QueryBuilder;
        use std::collections::HashMap;

        let query = QueryBuilder::new("users")
            .select(&["id", "name"])
            .where_cond("active = TRUE")
            .build_select();

        assert!(query.contains("SELECT id, name FROM users"));
    }
}