ydb_unofficial/sqlx/
migration.rs1use std::time::Duration;
22
23use futures::future::{BoxFuture, ok};
24use sqlx_core::encode::Encode;
25use sqlx_core::migrate::*;
26
27use super::prelude::{query, query_as, YdbError, Ydb, YdbConnection};
28
29impl From<YdbError> for MigrateError {
30 fn from(value: YdbError) -> Self {
31 let sqlx_error = sqlx_core::Error::from(value);
32 sqlx_error.into()
33 }
34}
35
36
37impl Migrate for YdbConnection {
38 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> { Box::pin(async move {
39 query(r#"
40 create table _sqlx_migrations (
41 version Int64,
42 description Utf8,
43 checksum String,
44 installed_on Timestamp,
45 success Bool,
46 PRIMARY KEY (version)
47 );
48 "#).execute(self.scheme_executor()?).await?;
49 Ok(())
50 })}
51
52 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> { Box::pin(async move {
53 let id = query_as::<_, (i64,)>(
54 "select version from (select version, installed_on from _sqlx_migrations where success = false order by installed_on desc limit 1);"
55 )
56 .fetch_optional(self.executor()?).await?;
57 Ok(id.map(|(x,)|x))
58 })}
59
60 fn list_applied_migrations(&mut self) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> { Box::pin(async move {
61 let migrations = query_as::<_,(i64,Vec<u8>)>("select version, checksum from _sqlx_migrations;").fetch_all(self.executor()?).await?;
62 Ok(migrations.into_iter().map(|(version, checksum)|AppliedMigration{ version, checksum: std::borrow::Cow::Owned(checksum) }).collect())
63 })}
64
65 fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
66 Box::pin(ok(()))
67 }
68
69 fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
70 Box::pin(ok(()))
71 }
72
73 fn apply<'e: 'm, 'm>(&'e mut self, migration: &'m Migration) -> BoxFuture<'m, Result<Duration, MigrateError>> { Box::pin(async move {
74 let upsert_yql = r#"
75 declare $version as Int64;
76 declare $description as Utf8;
77 declare $checksum as String;
78 declare $success as Bool;
79 $installed_on = CurrentUtcTimestamp(0);
80 upsert into _sqlx_migrations ( version, description, checksum, installed_on, success)
81 values ($version, $description, $checksum, $installed_on, $success);
82 "#;
83 query(upsert_yql).bind(migration).execute(self.executor()?).await?;
84 let now = std::time::Instant::now();
85 query(&migration.sql).execute(self.scheme_executor()?).await?;
86 let elapsed = now.elapsed();
87 query(upsert_yql).bind(migration)
88 .bind(("$success", true)).execute(self.executor()?).await?;
89 Ok(elapsed)
90 })}
91
92 fn revert<'e: 'm, 'm>(&'e mut self, _migration: &'m Migration) -> BoxFuture<'m, Result<Duration, MigrateError>> {
93 unimplemented!()
94 }
95}
96
97impl<'q> Encode<'q, Ydb> for Migration {
98 fn encode_by_ref(&self, buf: &mut <Ydb as sqlx_core::database::HasArguments<'q>>::ArgumentBuffer) -> sqlx_core::encode::IsNull {
99 let Migration { version, description, checksum, .. } = self;
100 let _ = ("$version", *version).encode(buf);
101 let _ = ("$description", description.to_string()).encode(buf);
102 let _ = ("$checksum", checksum.to_vec()).encode(buf);
103 let _ = ("$success", false).encode(buf);
104 sqlx_core::encode::IsNull::No
105 }
106}
107
108impl sqlx_core::types::Type<Ydb> for Migration {
109 fn type_info() -> <Ydb as sqlx_core::database::Database>::TypeInfo {
110 unimplemented!()
111 }
112}