ydb_unofficial/sqlx/
migration.rs

1//! Migration implementation for Ydb
2//! 
3//! # Examples
4//! 
5//! ``` rust
6//! # #[tokio::main]
7//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
8//!     use ydb_unofficial::sqlx::prelude::*;
9//!     let token = std::env::var("DB_TOKEN").unwrap();
10//!     let db_url = std::env::var("YDB_URL").unwrap();
11//!     let db_name = std::env::var("DB_NAME").unwrap();
12//!     let conn_str = format!("{db_url}{db_name}?token={token}");
13//!     let options = YdbConnectOptions::from_str(&conn_str)?;
14//!     let mut conn = options.connect().await?;
15//!     let path = std::path::Path::new("test/migrations");
16//!     let migrator = Migrator::new(path).await?;
17//!     migrator.run_direct(&mut conn).await?;
18//! #   Ok(())
19//! # }
20//! ```
21use std::time::Duration;
22
23use futures::future::{BoxFuture, ok};
24use sqlx_core::encode::Encode;
25use sqlx_core::migrate::*;
26
27use super::prelude::{query, query_as, YdbError, Ydb, YdbConnection};
28
29impl From<YdbError> for MigrateError {
30    fn from(value: YdbError) -> Self {
31        let sqlx_error = sqlx_core::Error::from(value);
32        sqlx_error.into()
33    }
34}
35
36
37impl Migrate for YdbConnection {
38    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { Box::pin(async move {
39        query(r#"
40            create table _sqlx_migrations (
41                version Int64,
42                description Utf8,
43                checksum String,
44                installed_on Timestamp,
45                success Bool,
46                PRIMARY KEY (version)
47            );
48        "#).execute(self.scheme_executor()?).await?;
49        Ok(())
50    })}
51
52    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> { Box::pin(async move {
53        let id = query_as::<_, (i64,)>(
54            "select version from (select version, installed_on from _sqlx_migrations where success = false order by installed_on desc limit 1);"
55        )
56        .fetch_optional(self.executor()?).await?;
57        Ok(id.map(|(x,)|x))
58    })}
59
60    fn list_applied_migrations(&mut self) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> { Box::pin(async move {
61        let migrations = query_as::<_,(i64,Vec<u8>)>("select version, checksum from _sqlx_migrations;").fetch_all(self.executor()?).await?;
62        Ok(migrations.into_iter().map(|(version, checksum)|AppliedMigration{ version, checksum: std::borrow::Cow::Owned(checksum) }).collect())
63    })}
64
65    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
66        Box::pin(ok(()))
67    }
68
69    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
70        Box::pin(ok(()))
71    }
72
73    fn apply<'e: 'm, 'm>(&'e mut self, migration: &'m Migration) -> BoxFuture<'m, Result<Duration, MigrateError>> { Box::pin(async move {
74        let upsert_yql = r#"
75            declare $version as Int64;
76            declare $description as Utf8;
77            declare $checksum as String;
78            declare $success as Bool;
79            $installed_on = CurrentUtcTimestamp(0);
80            upsert into _sqlx_migrations    ( version,  description,  checksum,  installed_on,  success) 
81                                    values  ($version, $description, $checksum, $installed_on, $success);
82        "#;
83        query(upsert_yql).bind(migration).execute(self.executor()?).await?;
84        let now = std::time::Instant::now();
85        query(&migration.sql).execute(self.scheme_executor()?).await?;
86        let elapsed = now.elapsed();
87        query(upsert_yql).bind(migration)
88            .bind(("$success", true)).execute(self.executor()?).await?;
89        Ok(elapsed)
90    })}
91
92    fn revert<'e: 'm, 'm>(&'e mut self, _migration: &'m Migration) -> BoxFuture<'m, Result<Duration, MigrateError>> {
93        unimplemented!()
94    }
95}
96
97impl<'q> Encode<'q, Ydb> for Migration {
98    fn encode_by_ref(&self, buf: &mut <Ydb as sqlx_core::database::HasArguments<'q>>::ArgumentBuffer) -> sqlx_core::encode::IsNull {
99        let Migration { version, description, checksum, .. } = self;
100        let _ = ("$version", *version).encode(buf);
101        let _ = ("$description", description.to_string()).encode(buf);
102        let _ = ("$checksum", checksum.to_vec()).encode(buf);
103        let _ = ("$success", false).encode(buf);
104        sqlx_core::encode::IsNull::No
105    }
106}
107
108impl sqlx_core::types::Type<Ydb> for Migration {
109    fn type_info() -> <Ydb as sqlx_core::database::Database>::TypeInfo {
110        unimplemented!()
111    }
112}