use crate::{Change, MigrationError, Schema, diff::SchemaExt, introspect::SchemaIntrospect};
use dibs_proto::*;
use std::net::SocketAddr;
use tokio::net::TcpStream;
/// Converts a `MigrationError` into the wire-level `DibsError::MigrationFailed`.
///
/// The caller location stored on the error is always forwarded, rendered as
/// `file:line:column`. When the inner error exposes SQL context, its message,
/// statement, position, hint and detail are copied into the result; otherwise
/// only the inner error's display text is reported.
fn to_migration_error(err: MigrationError) -> DibsError {
    let location = &err.caller;
    let caller = Some(format!(
        "{}:{}:{}",
        location.file(),
        location.line(),
        location.column()
    ));

    let sql_error = match err.inner.sql_context() {
        // Rich path: the failure is tied to a specific SQL statement.
        Some(ctx) => SqlError {
            message: ctx.message.clone(),
            sql: Some(ctx.sql.clone()),
            position: ctx.position.map(|p| p as u32),
            hint: ctx.hint.clone(),
            detail: ctx.detail.clone(),
            caller,
        },
        // Plain path: nothing but the error text is available.
        None => SqlError {
            message: err.inner.to_string(),
            sql: None,
            position: None,
            hint: None,
            detail: None,
            caller,
        },
    };

    DibsError::MigrationFailed(sql_error)
}
/// Converts a crate-level error into `DibsError::MigrationFailed`.
///
/// If the error exposes SQL context, every field of that context (including
/// its optional caller) is copied into the resulting `SqlError`. Otherwise the
/// result carries only the error's display text and all optional fields are
/// `None`.
fn error_to_dibs_error(err: crate::Error) -> DibsError {
    let sql_error = match err.sql_context() {
        Some(ctx) => SqlError {
            message: ctx.message.clone(),
            sql: Some(ctx.sql.clone()),
            position: ctx.position.map(|p| p as u32),
            hint: ctx.hint.clone(),
            detail: ctx.detail.clone(),
            caller: ctx.caller.clone(),
        },
        None => SqlError {
            message: err.to_string(),
            sql: None,
            position: None,
            hint: None,
            detail: None,
            caller: None,
        },
    };

    DibsError::MigrationFailed(sql_error)
}
/// Blocking entry point for the service binary.
///
/// Reads the CLI's socket address from the `DIBS_CLI_ADDR` environment
/// variable, then drives `run_service_async` to completion on a freshly
/// created tokio runtime. If the variable is missing or cannot be parsed as a
/// socket address, a message is printed to stderr and the process exits with
/// status 1.
///
/// # Panics
///
/// Panics if the tokio runtime cannot be created.
pub fn run_service() {
    let addr_str = match std::env::var("DIBS_CLI_ADDR") {
        Ok(value) => value,
        Err(_) => {
            eprintln!("DIBS_CLI_ADDR not set - this binary should be spawned by the dibs CLI");
            std::process::exit(1);
        }
    };

    let addr = match addr_str.parse::<SocketAddr>() {
        Ok(parsed) => parsed,
        Err(e) => {
            eprintln!("Invalid DIBS_CLI_ADDR '{}': {}", addr_str, e);
            std::process::exit(1);
        }
    };

    tokio::runtime::Runtime::new()
        .expect("Failed to create tokio runtime")
        .block_on(run_service_async(addr));
}
/// Connects to the dibs CLI at `addr` and serves requests on that connection.
///
/// A TCP stream is opened, wrapped in a vox stream link, and a session is
/// established as the initiator with a `DibsServiceImpl` dispatcher handling
/// incoming calls. On success the function awaits `client.caller.closed()`
/// before returning; on any connect/establish failure it prints the error to
/// stderr and exits the process with status 1.
async fn run_service_async(addr: SocketAddr) {
    let dispatcher = DibsServiceDispatcher::new(DibsServiceImpl::new());

    // Both the TCP connect and the session handshake are funnelled into a
    // single `io::Error` so they share one failure path below.
    let established = async {
        let tcp = TcpStream::connect(addr).await?;
        vox::initiator_on(vox_stream::StreamLink::tcp(tcp))
            .on_connection(dispatcher)
            .establish::<vox::NoopClient>()
            .await
            .map_err(std::io::Error::other)
    }
    .await;

    let client = match established {
        Ok(client) => client,
        Err(e) => {
            eprintln!("Failed to connect to dibs CLI: {}", e);
            std::process::exit(1);
        }
    };

    // The outcome of waiting is irrelevant; we only need to stay alive until then.
    let _ = client.caller.closed().await;
}
/// Handler type that implements the `DibsService` RPC trait.
///
/// It is a unit struct: it holds no state of its own, and each request
/// supplies whatever it needs (such as the database URL).
#[derive(Clone)]
pub struct DibsServiceImpl;

impl DibsServiceImpl {
    /// Creates the service handler.
    pub fn new() -> Self {
        DibsServiceImpl
    }
}

impl Default for DibsServiceImpl {
    /// Produces the same value as `DibsServiceImpl::new`.
    fn default() -> Self {
        DibsServiceImpl
    }
}
/// A schema diff bundled with the two virtual schemas it was computed from.
///
/// Produced by `compute_diff_with_context`; `generate_migration_sql` passes
/// both schemas to `to_ordered_sql` alongside the diff.
struct DiffWithContext {
/// Result of diffing the Rust-declared schema against the database schema.
diff: crate::SchemaDiff,
/// Virtual schema built from the tables introspected from the database.
current_schema: crate::solver::VirtualSchema,
/// Virtual schema built from the tables declared in Rust code.
desired_schema: crate::solver::VirtualSchema,
}
impl DibsServiceImpl {
    /// Diffs the Rust-declared schema against the live database schema.
    ///
    /// Opens a Postgres connection (without TLS) to `database_url`, introspects
    /// the database, and returns the diff together with virtual schemas built
    /// from both sides.
    ///
    /// # Errors
    ///
    /// Returns `DibsError::ConnectionFailed` if the connection cannot be
    /// opened or if introspecting the database fails.
    async fn compute_diff_with_context(
        &self,
        database_url: &str,
    ) -> Result<DiffWithContext, DibsError> {
        let (pg, driver) =
            match tokio_postgres::connect(database_url, tokio_postgres::NoTls).await {
                Ok(pair) => pair,
                Err(e) => return Err(DibsError::ConnectionFailed(e.to_string())),
            };

        // The connection half is awaited on its own task; a failure there is
        // only logged to stderr.
        tokio::spawn(async move {
            if let Err(e) = driver.await {
                eprintln!("Database connection error: {}", e);
            }
        });

        let declared = crate::schema::collect_schema();
        let live = match Schema::from_database(&pg).await {
            Ok(schema) => schema,
            Err(e) => return Err(DibsError::ConnectionFailed(e.to_string())),
        };

        Ok(DiffWithContext {
            current_schema: crate::solver::VirtualSchema::from_tables(live.tables.values()),
            desired_schema: crate::solver::VirtualSchema::from_tables(declared.tables.values()),
            diff: declared.diff(&live),
        })
    }
}
/// RPC handlers exposed to the dibs CLI.
impl DibsService for DibsServiceImpl {
    /// Returns a description of the schema collected from Rust code.
    ///
    /// This method does not open a database connection.
    async fn schema(&self) -> SchemaInfo {
        let schema = crate::schema::collect_schema();
        schema_to_info(&schema)
    }

    /// Computes the difference between the Rust-declared schema and the
    /// database at `request.database_url`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `compute_diff_with_context`.
    async fn diff(&self, request: DiffRequest) -> Result<DiffResult, DibsError> {
        let ctx = self
            .compute_diff_with_context(&request.database_url)
            .await?;
        Ok(diff_to_result(&ctx.diff))
    }

    /// Generates ordered migration SQL for the current schema difference.
    ///
    /// # Errors
    ///
    /// Propagates any error from `compute_diff_with_context`, and returns
    /// `DibsError::MigrationFailed` (message only, no SQL context) if the
    /// diff cannot be turned into ordered SQL.
    async fn generate_migration_sql(&self, request: DiffRequest) -> Result<String, DibsError> {
        let ctx = self
            .compute_diff_with_context(&request.database_url)
            .await?;
        ctx.diff
            .to_ordered_sql(&ctx.current_schema, &ctx.desired_schema)
            .map_err(|e| {
                DibsError::MigrationFailed(dibs_proto::SqlError {
                    message: e.to_string(),
                    sql: None,
                    position: None,
                    hint: None,
                    detail: None,
                    caller: None,
                })
            })
    }

    /// Lists the migrations reported by the runner's `status()` call, with
    /// their applied flag and source location.
    ///
    /// # Errors
    ///
    /// Returns `DibsError::ConnectionFailed` if the database connection
    /// cannot be opened, or `DibsError::MigrationFailed` if querying the
    /// migration status fails.
    async fn migration_status(
        &self,
        request: MigrationStatusRequest,
    ) -> Result<Vec<MigrationInfo>, DibsError> {
        let (mut client, connection) =
            tokio_postgres::connect(&request.database_url, tokio_postgres::NoTls)
                .await
                .map_err(|e| DibsError::ConnectionFailed(e.to_string()))?;
        // The connection half is awaited on its own task; a failure there is
        // only logged to stderr.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("Database connection error: {}", e);
            }
        });
        let runner = crate::MigrationRunner::new(&mut client);
        let status = runner.status().await.map_err(error_to_dibs_error)?;
        Ok(status
            .into_iter()
            .map(|s| {
                // Best effort: an unreadable source file yields `source: None`
                // instead of failing the whole listing.
                let source = std::fs::read_to_string(&s.source_path).ok();
                MigrationInfo {
                    version: s.version.to_string(),
                    name: s.name.to_string(),
                    applied: s.applied,
                    // The application timestamp is not populated by this handler.
                    applied_at: None,
                    source_file: Some(s.source_path.display().to_string()),
                    source,
                }
            })
            .collect())
    }

    /// Runs all pending migrations and reports what was applied.
    ///
    /// One `MigrationLog` entry per migration that ran is sent on `logs`
    /// after the run completes; send failures are ignored. The result
    /// includes the migrations that were already applied beforehand, the ones
    /// applied by this call, and timing for setup and for the whole call.
    ///
    /// # Errors
    ///
    /// * `DibsError::InvalidRequest` if `request.migration` is set; running
    ///   a single named migration is not implemented.
    /// * `DibsError::ConnectionFailed` if the database connection cannot be
    ///   opened.
    /// * `DibsError::MigrationFailed` if runner setup or a migration fails.
    async fn migrate(
        &self,
        request: MigrateRequest,
        logs: vox::Tx<MigrationLog>,
    ) -> Result<MigrateResult, DibsError> {
        use dibs_proto::{AppliedMigration as ProtoApplied, RanMigration as ProtoRan};

        // Reject the unsupported option before connecting or calling
        // `runner.init()`, so a request that cannot be served does not touch
        // the database at all.
        if let Some(migration) = &request.migration {
            return Err(DibsError::InvalidRequest(format!(
                "Running specific migration '{}' not yet implemented",
                migration
            )));
        }

        let total_start = std::time::Instant::now();
        let (mut client, connection) =
            tokio_postgres::connect(&request.database_url, tokio_postgres::NoTls)
                .await
                .map_err(|e| DibsError::ConnectionFailed(e.to_string()))?;
        // The connection half is awaited on its own task; a failure there is
        // only logged to stderr.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("Database connection error: {}", e);
            }
        });

        let total_defined = crate::MigrationRunner::total_defined() as u32;
        let mut runner = crate::MigrationRunner::new(&mut client);

        // Setup time covers runner initialisation plus reading the list of
        // already-applied migrations.
        let setup_start = std::time::Instant::now();
        runner.init().await.map_err(error_to_dibs_error)?;
        let already_applied = runner.applied().await.map_err(error_to_dibs_error)?;
        let setup_ms = setup_start.elapsed().as_millis() as u64;

        let ran = runner.migrate().await.map_err(to_migration_error)?;
        for m in &ran {
            // Logging is best effort: a closed log channel must not turn a
            // successful migration into an error.
            let _ = logs
                .send(MigrationLog {
                    level: LogLevel::Info,
                    message: format!("Applied {} ({}ms)", m.version, m.duration.as_millis()),
                    migration: Some(m.version.to_string()),
                })
                .await;
        }

        let total_time_ms = total_start.elapsed().as_millis() as u64;
        Ok(MigrateResult {
            total_defined,
            already_applied: already_applied
                .into_iter()
                .map(|m| ProtoApplied {
                    version: m.version,
                    applied_at: m.applied_at.to_string(),
                })
                .collect(),
            applied: ran
                .into_iter()
                .map(|m| ProtoRan {
                    version: m.version.to_string(),
                    duration_ms: m.duration.as_millis() as u64,
                })
                .collect(),
            setup_ms,
            total_time_ms,
        })
    }
}
/// Translates an in-crate `Schema` into the protocol-level `SchemaInfo`.
///
/// Every table is mapped to a `TableInfo` carrying its columns, foreign keys
/// and indices along with source location, doc and icon. Index column sort
/// order is rendered as `"asc"`/`"desc"` and nulls ordering as
/// `"default"`/`"first"`/`"last"`.
fn schema_to_info(schema: &Schema) -> SchemaInfo {
    let tables = schema
        .tables
        .values()
        .map(|table| {
            let columns = table
                .columns
                .iter()
                .map(|col| ColumnInfo {
                    name: col.name.clone(),
                    sql_type: col.pg_type.to_string(),
                    rust_type: col.rust_type.clone(),
                    nullable: col.nullable,
                    default: col.default.clone(),
                    primary_key: col.primary_key,
                    unique: col.unique,
                    auto_generated: col.auto_generated,
                    long: col.long,
                    label: col.label,
                    enum_variants: col.enum_variants.clone(),
                    doc: col.doc.clone(),
                    lang: col.lang.clone(),
                    icon: col.icon.clone(),
                    subtype: col.subtype.clone(),
                })
                .collect();

            let foreign_keys = table
                .foreign_keys
                .iter()
                .map(|fk| ForeignKeyInfo {
                    columns: fk.columns.clone(),
                    references_table: fk.references_table.clone(),
                    references_columns: fk.references_columns.clone(),
                })
                .collect();

            let indices = table
                .indices
                .iter()
                .map(|index| {
                    let columns = index
                        .columns
                        .iter()
                        .map(|key| {
                            // The protocol carries these enums as lowercase strings.
                            let order = match key.order {
                                crate::SortOrder::Asc => "asc",
                                crate::SortOrder::Desc => "desc",
                            };
                            let nulls = match key.nulls {
                                crate::NullsOrder::Default => "default",
                                crate::NullsOrder::First => "first",
                                crate::NullsOrder::Last => "last",
                            };
                            IndexColumnInfo {
                                name: key.name.clone(),
                                order: order.to_string(),
                                nulls: nulls.to_string(),
                            }
                        })
                        .collect();

                    IndexInfo {
                        name: index.name.clone(),
                        columns,
                        unique: index.unique,
                        where_clause: index.where_clause.clone(),
                    }
                })
                .collect();

            TableInfo {
                name: table.name.clone(),
                columns,
                foreign_keys,
                indices,
                source_file: table.source.file.clone(),
                source_line: table.source.line,
                doc: table.doc.clone(),
                icon: table.icon.clone(),
            }
        })
        .collect();

    SchemaInfo { tables }
}
fn diff_to_result(diff: &crate::SchemaDiff) -> DiffResult {
DiffResult {
table_diffs: diff
.table_diffs
.iter()
.map(|td| TableDiffInfo {
table: td.table.clone(),
changes: td
.changes
.iter()
.map(|c| {
let kind = match c {
Change::AddTable(_)
| Change::AddColumn(_)
| Change::AddPrimaryKey(_)
| Change::AddForeignKey(_)
| Change::AddIndex(_)
| Change::AddUnique(_)
| Change::AddCheck(_)
| Change::AddTriggerCheckFunction(_)
| Change::AddTriggerCheck(_) => ChangeKind::Add,
Change::DropTable(_)
| Change::DropColumn(_)
| Change::DropPrimaryKey
| Change::DropForeignKey(_)
| Change::DropIndex(_)
| Change::DropUnique(_)
| Change::DropCheck(_)
| Change::DropTriggerCheck(_)
| Change::DropTriggerCheckFunction(_) => ChangeKind::Drop,
Change::RenameTable { .. }
| Change::RenameColumn { .. }
| Change::AlterColumnType { .. }
| Change::AlterColumnNullable { .. }
| Change::AlterColumnDefault { .. }
| Change::AlterColumnAutoGenerated { .. } => ChangeKind::Alter,
};
ChangeInfo {
description: format!("{}", c),
kind,
}
})
.collect(),
})
.collect(),
}
}