1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
use failure::Error; use crate::api; use path_abs::FileRead; use postgres::{self, transaction::Transaction, TlsMode}; use std::ops::Deref; use warden_core::migration::snapshot; use warden_core::{dbms, migration::meta::Meta}; #[derive(Debug)] pub struct Connection { connection: postgres::Connection, catalog: String, initialised: bool, } impl dbms::Connection for Connection { fn get_catalog(&self) -> &str { &self.catalog } fn get_last_deployed_migration(&self) -> Result<Option<u128>, Error> { if !self.is_initialised()? { return Ok(None); } let result: i64 = self .connection .query("select warden.get_latest_deployed_migration()", &[])? .get(0) .get_opt(0) .ok_or(failure::err_msg("Could not fetch warden metadata"))??; Ok(Some(result as u128)) } fn deploy(&self, meta: Meta) -> Result<(), Error> { if !self.is_initialised()? { let id = meta.get_identity().get_id().unwrap_or(1); if id == 0 { return self.deploy_initial(meta); } else { return Err(failure::err_msg( format!( r#"The database is not initialised with Warden. Error trying to deploy migration "{}". The initial migration must be deployed first"#, meta.get_identity() ) )); } } let transaction = self.connection.transaction()?; self.register_migration(&transaction, &meta)?; self.deploy_migration(&transaction, &meta)?; Ok(transaction.commit()?) } } impl Connection { fn is_initialised(&self) -> Result<bool, Error> { Ok(self.initialised || is_initialised(&self.connection, &self.catalog)?) } fn deploy_migration(&self, transaction: &Transaction, meta: &Meta) -> Result<(), Error> { api::do_deploy_migration( transaction, meta.get_identity() .get_id() .ok_or(failure::err_msg("could not decode migration id"))?, ) } fn register_migration(&self, transaction: &Transaction, meta: &Meta) -> Result<(), Error> { let snapshot = snapshot::Snapshot::take(snapshot::Format::TarGz, &meta)?; let seal = meta.get_seal_meta().read_the_seal()?; api::do_register_migration( transaction, meta.get_identity() .get_id() .ok_or(failure::err_msg("could not decode migration id"))?, meta.get_identity().get_name(), &FileRead::read(meta.get_target())?.read_string()?, snapshot.format.as_str(), &snapshot.data, &seal.timestamp, seal.algo.stringify(), &seal.sign, ) } fn deploy_initial(&self, meta: Meta) -> Result<(), Error> { log::trace!("Deploying initial migration"); let sql = &FileRead::read(meta.get_target())?.read_string()?; let transaction = self.connection.transaction()?; transaction.batch_execute(sql)?; self.register_migration(&transaction, &meta)?; transaction.execute( "update warden.migration set deploy_ts = now() where id = 0", &[], )?; Ok(transaction.commit()?) } } impl Deref for Connection { type Target = postgres::Connection; fn deref(&self) -> &Self::Target { &self.connection } } pub fn open(url: &str) -> Result<Connection, Error> { let connection = postgres::Connection::connect(url.clone(), TlsMode::None)?; connection.execute( "select set_config('application_name', 'warden', false)", &[], )?; let catalog: String = connection .query("select current_database()", &[])? .get(0) .get_opt(0) .ok_or(failure::err_msg("Could not fetch current catalog"))??; let initialised: bool = connection .query( "select count(*) = 1 from information_schema.tables where table_catalog = $1 and table_schema = 'warden' and table_name = 'migration' ", &[&catalog], )? .get(0) .get_opt(0) .ok_or(failure::err_msg("Could not read information schema"))??; Ok(Connection { connection: connection, catalog: catalog, initialised: initialised, }) } fn is_initialised(connection: &postgres::Connection, catalog: &str) -> Result<bool, Error> { let result: bool = connection .query( "select count(*) = 1 from information_schema.tables where table_catalog = $1 and table_schema = 'warden' and table_name = 'migration' ", &[&catalog], )? .get(0) .get_opt(0) .ok_or(failure::err_msg("Could not read information schema"))??; Ok(result) }