use super::schema::TableSchema;
use crate::error::{QuickDbError, QuickDbResult};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// One registered version of a table's schema, together with its bookkeeping data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SchemaVersion {
/// Name of the table this version belongs to.
pub table_name: String,
/// Version number; `VersionManager::register_version` assigns the highest
/// existing number for the table plus one (the first version is 1).
pub version: u32,
/// Schema definition captured for this version.
pub schema: TableSchema,
/// Time at which this version was registered.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Migration attached to this version. `VersionManager::create_migration`
/// stores the script on the entry matching the migration's `to_version`.
pub migration_script: Option<MigrationScript>,
/// Optional free-form description supplied at registration time.
pub description: Option<String>,
/// `true` for the most recently registered version of the table;
/// registering a new version clears the flag on all earlier ones.
pub is_current: bool,
}
/// A migration between two schema versions of one table.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MigrationScript {
/// Identifier generated by `VersionManager::create_migration` from the table
/// name, both version numbers, a Unix timestamp and a short UUID prefix.
pub id: String,
/// Version the migration starts from.
pub from_version: u32,
/// Version the migration leads to.
pub to_version: u32,
/// Script text for the forward direction.
pub up_script: String,
/// Script text for undoing the migration; rollback is refused when absent.
pub down_script: Option<String>,
/// Kind of statements the script contains.
pub script_type: MigrationScriptType,
/// Time at which the migration was created.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Completion time recorded by `VersionManager::execute_migration`;
/// `None` until the migration has been executed.
pub executed_at: Option<chrono::DateTime<chrono::Utc>>,
/// Current lifecycle state of the migration.
pub status: MigrationStatus,
}
/// Classification of a migration script's content.
///
/// NOTE(review): nothing in this file branches on the variant; the meanings
/// below are inferred from the names and should be confirmed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MigrationScriptType {
/// Presumably schema-definition statements (DDL).
Ddl,
/// Presumably data-manipulation statements (DML).
Dml,
/// Presumably a combination of DDL and DML statements.
Mixed,
/// Presumably a script in a custom, non-SQL form.
Custom,
}
/// Lifecycle state of a migration script or of a migration history record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MigrationStatus {
/// Initial state of a newly created migration; only pending migrations may be executed.
Pending,
/// Set on the script when execution starts, and the initial state of a history record.
Running,
/// Set once execution has completed; only successful migrations may be rolled back.
/// History records of rollbacks are also marked with this status when they finish.
Success,
/// Failure state. NOTE(review): never assigned by the code in this file.
Failed,
/// Set on a script after it has been rolled back.
Rollback,
}
/// In-memory registry of schema versions per table plus a log of migration runs.
#[derive(Debug)]
pub struct VersionManager {
// Registered schema versions, keyed by table name, in registration order.
versions: HashMap<String, Vec<SchemaVersion>>,
// One record per executed or rolled-back migration, in the order they were started.
migration_history: Vec<MigrationRecord>,
}
/// History entry describing one execution or rollback of a migration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationRecord {
/// Unique identifier of this record (a freshly generated UUID string).
pub id: String,
/// Table the migration was applied to.
pub table_name: String,
/// Identifier of the migration script; rollback records use the script id
/// with a `_rollback` suffix.
pub migration_id: String,
/// Version the run started from (for rollbacks: the script's `to_version`).
pub from_version: u32,
/// Version the run ended at (for rollbacks: the script's `from_version`).
pub to_version: u32,
/// Time at which the run started.
pub started_at: chrono::DateTime<chrono::Utc>,
/// Time at which the run finished; `None` while it is still running.
pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
/// Outcome of the run.
pub status: MigrationStatus,
/// Error description for failed runs. NOTE(review): never populated by the code in this file.
pub error_message: Option<String>,
/// Elapsed time of the run in milliseconds; `None` while it is still running.
pub duration_ms: Option<u64>,
}
impl VersionManager {
/// Creates a manager with no registered versions and an empty migration history.
pub fn new() -> Self {
    let versions = HashMap::new();
    let migration_history = Vec::new();
    Self {
        versions,
        migration_history,
    }
}
pub fn register_version(
&mut self,
table_name: String,
schema: TableSchema,
description: Option<String>,
) -> QuickDbResult<u32> {
let versions = self
.versions
.entry(table_name.clone())
.or_insert_with(Vec::new);
let new_version = versions.iter().map(|v| v.version).max().unwrap_or(0) + 1;
for version in versions.iter_mut() {
version.is_current = false;
}
let schema_version = SchemaVersion {
table_name: table_name.clone(),
version: new_version,
schema,
created_at: chrono::Utc::now(),
migration_script: None,
description,
is_current: true,
};
versions.push(schema_version);
Ok(new_version)
}
/// Returns the first version of `table_name` whose `is_current` flag is set,
/// or `None` when the table is unknown or no version is flagged.
pub fn get_current_version(&self, table_name: &str) -> Option<&SchemaVersion> {
    let registered = self.versions.get(table_name)?;
    registered.iter().find(|candidate| candidate.is_current)
}
/// Looks up version number `version` of `table_name`.
///
/// Returns `None` when the table is unknown or has no such version.
pub fn get_version(&self, table_name: &str, version: u32) -> Option<&SchemaVersion> {
    self.versions.get(table_name).and_then(|registered| {
        registered
            .iter()
            .find(|candidate| candidate.version == version)
    })
}
pub fn get_all_versions(&self, table_name: &str) -> Option<&Vec<SchemaVersion>> {
self.versions.get(table_name)
}
/// Returns the versions of `table_name` sorted by ascending version number.
///
/// An unknown table yields an empty vector.
pub fn get_version_history(&self, table_name: &str) -> Vec<&SchemaVersion> {
    let mut history: Vec<&SchemaVersion> = self
        .versions
        .get(table_name)
        .map(|registered| registered.iter().collect())
        .unwrap_or_default();
    // Stable sort, so entries with equal numbers keep their registration order.
    history.sort_by_key(|entry| entry.version);
    history
}
/// Creates a pending migration between two registered versions of a table.
///
/// The script is stored on the version entry matching `to_version`, replacing
/// any script already stored there, and the generated migration id is returned.
///
/// # Errors
///
/// Returns `QuickDbError::ValidationError` when `from_version` or
/// `to_version` is not registered for `table_name`; the source version is
/// checked first.
pub fn create_migration(
    &mut self,
    table_name: &str,
    from_version: u32,
    to_version: u32,
    up_script: String,
    down_script: Option<String>,
    script_type: MigrationScriptType,
) -> QuickDbResult<String> {
    // Both endpoints must already exist before anything is created.
    self.get_version(table_name, from_version)
        .ok_or_else(|| QuickDbError::ValidationError {
            field: "from_version".to_string(),
            message: format!("源版本 {} 不存在", from_version),
        })?;
    self.get_version(table_name, to_version)
        .ok_or_else(|| QuickDbError::ValidationError {
            field: "to_version".to_string(),
            message: format!("目标版本 {} 不存在", to_version),
        })?;

    // Id layout: <table>_<from>_to_<to>_<unix seconds>_<first 8 chars of a v4 UUID>.
    let issued_at = chrono::Utc::now().timestamp();
    let uuid_text = uuid::Uuid::new_v4().to_string();
    let migration_id = format!(
        "{}_{}_to_{}_{}_{}",
        table_name,
        from_version,
        to_version,
        issued_at,
        &uuid_text[..8]
    );

    let script = MigrationScript {
        id: migration_id.clone(),
        from_version,
        to_version,
        up_script,
        down_script,
        script_type,
        created_at: chrono::Utc::now(),
        executed_at: None,
        status: MigrationStatus::Pending,
    };

    // Attach the script to the target version's entry.
    let target = self
        .versions
        .get_mut(table_name)
        .and_then(|registered| registered.iter_mut().find(|v| v.version == to_version));
    if let Some(entry) = target {
        entry.migration_script = Some(script);
    }

    Ok(migration_id)
}
/// Marks a pending migration as executed and logs the run in the migration history.
///
/// The script's status moves `Pending` -> `Running` -> `Success`, its
/// `executed_at` is set, and a `MigrationRecord` with timing information is
/// appended to the history.
///
/// NOTE(review): this method only performs bookkeeping; the `up_script` is not
/// sent to any database here.
///
/// # Errors
///
/// Returns `QuickDbError::ValidationError` when no migration with
/// `migration_id` exists for `table_name`, or when the migration is not in
/// the `Pending` state.
pub async fn execute_migration(
    &mut self,
    table_name: &str,
    migration_id: &str,
) -> QuickDbResult<()> {
    // Validate and flip the status inside a scope so the mutable borrow of
    // the script ends before the history is touched.
    let (from_version, to_version) = {
        let migration = self
            .find_migration_mut(table_name, migration_id)
            .ok_or_else(|| QuickDbError::ValidationError {
                field: "migration_id".to_string(),
                message: format!("迁移脚本 {} 不存在", migration_id),
            })?;
        if migration.status != MigrationStatus::Pending {
            return Err(QuickDbError::ValidationError {
                field: "migration_status".to_string(),
                message: format!(
                    "迁移脚本 {} 状态不正确: {:?}",
                    migration_id, migration.status
                ),
            });
        }
        migration.status = MigrationStatus::Running;
        (migration.from_version, migration.to_version)
    };

    let start_time = chrono::Utc::now();
    self.migration_history.push(MigrationRecord {
        id: uuid::Uuid::new_v4().to_string(),
        table_name: table_name.to_string(),
        migration_id: migration_id.to_string(),
        from_version,
        to_version,
        started_at: start_time,
        completed_at: None,
        status: MigrationStatus::Running,
        error_message: None,
        duration_ms: None,
    });

    let end_time = chrono::Utc::now();
    let elapsed_ms = end_time
        .signed_duration_since(start_time)
        .num_milliseconds();
    // The wall clock can step backwards; clamp to zero instead of letting a
    // negative value wrap into a huge unsigned duration.
    let duration_ms = u64::try_from(elapsed_ms).unwrap_or(0);

    if let Some(migration) = self.find_migration_mut(table_name, migration_id) {
        migration.status = MigrationStatus::Success;
        migration.executed_at = Some(end_time);
    }
    // The record pushed above is still the last history entry.
    if let Some(record) = self.migration_history.last_mut() {
        record.completed_at = Some(end_time);
        record.status = MigrationStatus::Success;
        record.duration_ms = Some(duration_ms);
    }
    Ok(())
}
/// Marks a successfully executed migration as rolled back and logs the rollback.
///
/// The script's status becomes `Rollback`, and a `MigrationRecord` whose
/// `migration_id` carries a `_rollback` suffix is appended to the history.
/// The record's versions are reversed relative to the script: it runs from
/// the script's `to_version` back to its `from_version`.
///
/// NOTE(review): this method only performs bookkeeping; the `down_script` is
/// checked for presence but is not sent to any database here.
///
/// # Errors
///
/// Returns `QuickDbError::ValidationError` when, in this order, the migration
/// does not exist for `table_name`, it has no `down_script`, or its status is
/// not `Success`.
pub async fn rollback_migration(
    &mut self,
    table_name: &str,
    migration_id: &str,
) -> QuickDbResult<()> {
    // Validate inside a scope so the mutable borrow of the script ends
    // before the history is touched.
    let (from_version, to_version) = {
        let migration = self
            .find_migration_mut(table_name, migration_id)
            .ok_or_else(|| QuickDbError::ValidationError {
                field: "migration_id".to_string(),
                message: format!("迁移脚本 {} 不存在", migration_id),
            })?;
        // Only the presence of the script matters here, so it is not cloned.
        if migration.down_script.is_none() {
            return Err(QuickDbError::ValidationError {
                field: "down_script".to_string(),
                message: format!("迁移脚本 {} 没有回滚脚本", migration_id),
            });
        }
        if migration.status != MigrationStatus::Success {
            return Err(QuickDbError::ValidationError {
                field: "migration_status".to_string(),
                message: format!(
                    "迁移脚本 {} 状态不正确,无法回滚: {:?}",
                    migration_id, migration.status
                ),
            });
        }
        // A rollback travels in the opposite direction of the script.
        (migration.to_version, migration.from_version)
    };

    let start_time = chrono::Utc::now();
    self.migration_history.push(MigrationRecord {
        id: uuid::Uuid::new_v4().to_string(),
        table_name: table_name.to_string(),
        migration_id: format!("{}_rollback", migration_id),
        from_version,
        to_version,
        started_at: start_time,
        completed_at: None,
        status: MigrationStatus::Running,
        error_message: None,
        duration_ms: None,
    });

    let end_time = chrono::Utc::now();
    let elapsed_ms = end_time
        .signed_duration_since(start_time)
        .num_milliseconds();
    // The wall clock can step backwards; clamp to zero instead of letting a
    // negative value wrap into a huge unsigned duration.
    let duration_ms = u64::try_from(elapsed_ms).unwrap_or(0);

    if let Some(migration) = self.find_migration_mut(table_name, migration_id) {
        migration.status = MigrationStatus::Rollback;
    }
    // The record pushed above is still the last history entry.
    if let Some(record) = self.migration_history.last_mut() {
        record.completed_at = Some(end_time);
        record.status = MigrationStatus::Success;
        record.duration_ms = Some(duration_ms);
    }
    Ok(())
}
/// Returns migration history records in the order they were recorded.
///
/// With `Some(table)` only records for that table are returned; with `None`
/// the complete history is returned.
pub fn get_migration_history(&self, table_name: Option<&str>) -> Vec<&MigrationRecord> {
    self.migration_history
        .iter()
        .filter(|record| match table_name {
            Some(table) => record.table_name == table,
            None => true,
        })
        .collect()
}
/// Reports whether the current registered version of `table_name` is newer
/// than `current_db_version`.
///
/// Returns `false` when the table has no current version.
pub fn needs_migration(&self, table_name: &str, current_db_version: u32) -> bool {
    match self.get_current_version(table_name) {
        Some(current) => current.version > current_db_version,
        None => false,
    }
}
pub fn get_migration_path(
&self,
table_name: &str,
from_version: u32,
to_version: u32,
) -> QuickDbResult<Vec<&MigrationScript>> {
if from_version == to_version {
return Ok(Vec::new());
}
let versions =
self.versions
.get(table_name)
.ok_or_else(|| QuickDbError::ValidationError {
field: "table_name".to_string(),
message: format!("表 {} 不存在", table_name),
})?;
let mut path = Vec::new();
if from_version < to_version {
for version in (from_version + 1)..=to_version {
if let Some(schema_version) = versions.iter().find(|v| v.version == version) {
if let Some(migration) = &schema_version.migration_script {
path.push(migration);
} else {
return Err(QuickDbError::ValidationError {
field: "migration_script".to_string(),
message: format!("版本 {} 缺少迁移脚本", version),
});
}
} else {
return Err(QuickDbError::ValidationError {
field: "version".to_string(),
message: format!("版本 {} 不存在", version),
});
}
}
} else {
for version in (to_version + 1..=from_version).rev() {
if let Some(schema_version) = versions.iter().find(|v| v.version == version) {
if let Some(migration) = &schema_version.migration_script {
if migration.down_script.is_some() {
path.push(migration);
} else {
return Err(QuickDbError::ValidationError {
field: "down_script".to_string(),
message: format!("版本 {} 缺少回滚脚本", version),
});
}
} else {
return Err(QuickDbError::ValidationError {
field: "migration_script".to_string(),
message: format!("版本 {} 缺少迁移脚本", version),
});
}
} else {
return Err(QuickDbError::ValidationError {
field: "version".to_string(),
message: format!("版本 {} 不存在", version),
});
}
}
}
Ok(path)
}
/// Finds the migration script with id `migration_id` among the versions of
/// `table_name` and returns a mutable reference to it.
///
/// Returns `None` when the table is unknown or no version carries a script
/// with that id.
fn find_migration_mut(
    &mut self,
    table_name: &str,
    migration_id: &str,
) -> Option<&mut MigrationScript> {
    let registered = self.versions.get_mut(table_name)?;
    registered
        .iter_mut()
        .filter_map(|entry| entry.migration_script.as_mut())
        .find(|script| script.id == migration_id)
}
/// Drops migration history records that started `keep_days` or more days ago.
///
/// Records whose `started_at` is strictly later than `now - keep_days` are
/// kept. When `keep_days` is so large that the cutoff date cannot be
/// represented, every record is newer than the cutoff and nothing is removed.
pub fn cleanup_history(&mut self, keep_days: u32) {
    let retention = chrono::Duration::days(i64::from(keep_days));
    // `Utc::now() - retention` panics when the result is out of range, which a
    // large `keep_days` can trigger; the checked variant returns `None` instead.
    if let Some(cutoff_date) = chrono::Utc::now().checked_sub_signed(retention) {
        self.migration_history
            .retain(|record| record.started_at > cutoff_date);
    }
}
/// Serializes all registered versions of all tables to pretty-printed JSON.
///
/// The migration history is not part of the export.
///
/// # Errors
///
/// Returns `QuickDbError::SerializationError` carrying the serializer's
/// message when serialization fails.
pub fn export_versions(&self) -> QuickDbResult<String> {
    match serde_json::to_string_pretty(&self.versions) {
        Ok(json) => Ok(json),
        Err(err) => Err(QuickDbError::SerializationError {
            message: err.to_string(),
        }),
    }
}
/// Replaces all registered versions with the ones parsed from the JSON in `data`.
///
/// The expected input is a JSON object mapping table names to arrays of
/// versions, as produced by `export_versions`. The migration history is left
/// untouched, and existing versions are only replaced when parsing succeeds.
///
/// # Errors
///
/// Returns `QuickDbError::SerializationError` carrying the parser's message
/// when `data` cannot be deserialized.
pub fn import_versions(&mut self, data: &str) -> QuickDbResult<()> {
    let parsed = serde_json::from_str::<HashMap<String, Vec<SchemaVersion>>>(data);
    match parsed {
        Ok(versions) => {
            self.versions = versions;
            Ok(())
        }
        Err(err) => Err(QuickDbError::SerializationError {
            message: err.to_string(),
        }),
    }
}
}
impl Default for VersionManager {
fn default() -> Self {
Self::new()
}
}