Skip to main content

matrix_bridge_telegram/db/
manager.rs

1use std::sync::Arc;
2
3#[cfg(any(feature = "postgres", feature = "mysql", feature = "sqlite"))]
4use diesel::RunQueryDsl;
5#[cfg(feature = "mysql")]
6use diesel::mysql::MysqlConnection;
7#[cfg(feature = "postgres")]
8use diesel::pg::PgConnection;
9#[cfg(any(feature = "postgres", feature = "mysql"))]
10use diesel::r2d2::{self, ConnectionManager};
11
12use crate::config::{DatabaseConfig as ConfigDatabaseConfig, DbType as ConfigDbType};
13use crate::db::stores::{
14    InMemoryMessageStore, InMemoryPortalStore, InMemoryReactionStore, InMemoryTelegramFileStore,
15    InMemoryUserStore,
16};
17use crate::db::{
18    DatabaseError, MessageStore, PortalStore, ReactionStore, TelegramFileStore, UserStore,
19};
20
21#[cfg(feature = "postgres")]
22pub type Pool = r2d2::Pool<ConnectionManager<PgConnection>>;
23#[cfg(feature = "mysql")]
24pub type MysqlPool = r2d2::Pool<ConnectionManager<MysqlConnection>>;
25
26#[cfg(feature = "sqlite")]
27use diesel::Connection;
28#[cfg(feature = "sqlite")]
29use diesel::sqlite::SqliteConnection;
30
31#[derive(Clone)]
32pub struct DatabaseManager {
33    #[cfg(feature = "postgres")]
34    postgres_pool: Option<Pool>,
35    #[cfg(feature = "mysql")]
36    mysql_pool: Option<MysqlPool>,
37    #[cfg(feature = "sqlite")]
38    sqlite_path: Option<String>,
39    user_store: Arc<dyn UserStore>,
40    portal_store: Arc<dyn PortalStore>,
41    message_store: Arc<dyn MessageStore>,
42    reaction_store: Arc<dyn ReactionStore>,
43    telegram_file_store: Arc<dyn TelegramFileStore>,
44    db_type: DbType,
45}
46
47#[derive(Clone, Copy, Debug, PartialEq, Eq)]
48pub enum DbType {
49    Postgres,
50    Sqlite,
51    Mysql,
52}
53
54impl From<ConfigDbType> for DbType {
55    fn from(value: ConfigDbType) -> Self {
56        match value {
57            ConfigDbType::Postgres => DbType::Postgres,
58            ConfigDbType::Sqlite => DbType::Sqlite,
59            ConfigDbType::Mysql => DbType::Mysql,
60        }
61    }
62}
63
64impl DatabaseManager {
65    pub async fn new(config: &ConfigDatabaseConfig) -> Result<Self, DatabaseError> {
66        let db_type = DbType::from(config.db_type());
67
68        match db_type {
69            #[cfg(feature = "postgres")]
70            DbType::Postgres => {
71                let connection_string = config.connection_string();
72                let max_connections = config.max_connections();
73                let min_connections = config.min_connections();
74
75                let manager = ConnectionManager::<PgConnection>::new(connection_string);
76
77                let builder = r2d2::Pool::builder()
78                    .max_size(max_connections.unwrap_or(10))
79                    .min_idle(Some(min_connections.unwrap_or(1)));
80
81                let pool = builder
82                    .build(manager)
83                    .map_err(|e| DatabaseError::Connection(e.to_string()))?;
84
85                let user_store = Arc::new(InMemoryUserStore::new());
86                let portal_store = Arc::new(InMemoryPortalStore::new());
87                let message_store = Arc::new(InMemoryMessageStore::new());
88                let reaction_store = Arc::new(InMemoryReactionStore::new());
89                let telegram_file_store = Arc::new(InMemoryTelegramFileStore::new());
90
91                Ok(Self {
92                    postgres_pool: Some(pool),
93                    #[cfg(feature = "mysql")]
94                    mysql_pool: None,
95                    #[cfg(feature = "sqlite")]
96                    sqlite_path: None,
97                    user_store,
98                    portal_store,
99                    message_store,
100                    reaction_store,
101                    telegram_file_store,
102                    db_type,
103                })
104            }
105            #[cfg(feature = "sqlite")]
106            DbType::Sqlite => {
107                let path = config.sqlite_path().unwrap();
108                let path_arc = Arc::new(path.clone());
109
110                let user_store = Arc::new(InMemoryUserStore::new());
111                let portal_store = Arc::new(InMemoryPortalStore::new());
112                let message_store = Arc::new(InMemoryMessageStore::new());
113                let reaction_store = Arc::new(InMemoryReactionStore::new());
114                let telegram_file_store = Arc::new(InMemoryTelegramFileStore::new());
115
116                Ok(Self {
117                    #[cfg(feature = "postgres")]
118                    postgres_pool: None,
119                    #[cfg(feature = "mysql")]
120                    mysql_pool: None,
121                    sqlite_path: Some(path),
122                    user_store,
123                    portal_store,
124                    message_store,
125                    reaction_store,
126                    telegram_file_store,
127                    db_type,
128                })
129            }
130            #[cfg(feature = "mysql")]
131            DbType::Mysql => {
132                let connection_string = config.connection_string();
133                let max_connections = config.max_connections();
134                let min_connections = config.min_connections();
135
136                let manager = ConnectionManager::<MysqlConnection>::new(connection_string);
137
138                let builder = r2d2::Pool::builder()
139                    .max_size(max_connections.unwrap_or(10))
140                    .min_idle(Some(min_connections.unwrap_or(1)));
141
142                let pool = builder
143                    .build(manager)
144                    .map_err(|e| DatabaseError::Connection(e.to_string()))?;
145
146                let user_store = Arc::new(InMemoryUserStore::new());
147                let portal_store = Arc::new(InMemoryPortalStore::new());
148                let message_store = Arc::new(InMemoryMessageStore::new());
149                let reaction_store = Arc::new(InMemoryReactionStore::new());
150                let telegram_file_store = Arc::new(InMemoryTelegramFileStore::new());
151
152                Ok(Self {
153                    #[cfg(feature = "postgres")]
154                    postgres_pool: None,
155                    mysql_pool: Some(pool),
156                    #[cfg(feature = "sqlite")]
157                    sqlite_path: None,
158                    user_store,
159                    portal_store,
160                    message_store,
161                    reaction_store,
162                    telegram_file_store,
163                    db_type,
164                })
165            }
166            #[cfg(not(feature = "postgres"))]
167            DbType::Postgres => {
168                return Err(DatabaseError::Connection(
169                    "PostgreSQL feature not enabled".to_string(),
170                ));
171            }
172            #[cfg(not(feature = "sqlite"))]
173            DbType::Sqlite => {
174                return Err(DatabaseError::Connection(
175                    "SQLite feature not enabled".to_string(),
176                ));
177            }
178            #[cfg(not(feature = "mysql"))]
179            DbType::Mysql => {
180                return Err(DatabaseError::Connection(
181                    "MySQL feature not enabled".to_string(),
182                ));
183            }
184        }
185    }
186
187    #[cfg(feature = "sqlite")]
188    pub fn new_in_memory() -> Result<Self, DatabaseError> {
189        use std::sync::Arc;
190
191        let user_store = Arc::new(InMemoryUserStore::new());
192        let portal_store = Arc::new(InMemoryPortalStore::new());
193        let message_store = Arc::new(InMemoryMessageStore::new());
194        let reaction_store = Arc::new(InMemoryReactionStore::new());
195        let telegram_file_store = Arc::new(InMemoryTelegramFileStore::new());
196
197        Ok(Self {
198            #[cfg(feature = "postgres")]
199            postgres_pool: None,
200            #[cfg(feature = "mysql")]
201            mysql_pool: None,
202            sqlite_path: Some(":memory:".to_string()),
203            user_store,
204            portal_store,
205            message_store,
206            reaction_store,
207            telegram_file_store,
208            db_type: DbType::Sqlite,
209        })
210    }
211
212    pub async fn migrate(&self) -> Result<(), DatabaseError> {
213        match self.db_type {
214            #[cfg(feature = "postgres")]
215            DbType::Postgres => {
216                let pool = self.postgres_pool.as_ref().unwrap();
217                return Self::migrate_postgres(pool).await;
218            }
219            #[cfg(feature = "sqlite")]
220            DbType::Sqlite => {
221                let path = self.sqlite_path.as_ref().unwrap();
222                return Self::migrate_sqlite(path).await;
223            }
224            #[cfg(feature = "mysql")]
225            DbType::Mysql => {
226                let pool = self.mysql_pool.as_ref().unwrap();
227                return Self::migrate_mysql(pool).await;
228            }
229            #[cfg(not(feature = "postgres"))]
230            DbType::Postgres => {
231                return Err(DatabaseError::Migration(
232                    "PostgreSQL feature not enabled".to_string(),
233                ));
234            }
235            #[cfg(not(feature = "sqlite"))]
236            DbType::Sqlite => {
237                return Err(DatabaseError::Migration(
238                    "SQLite feature not enabled".to_string(),
239                ));
240            }
241            #[cfg(not(feature = "mysql"))]
242            DbType::Mysql => {
243                return Err(DatabaseError::Migration(
244                    "MySQL feature not enabled".to_string(),
245                ));
246            }
247        }
248    }
249
250    #[cfg(feature = "postgres")]
251    async fn migrate_postgres(pool: &Pool) -> Result<(), DatabaseError> {
252        let pool = pool.clone();
253        tokio::task::spawn_blocking(move || {
254            let mut conn = pool
255                .get()
256                .map_err(|e| DatabaseError::Connection(e.to_string()))?;
257
258            let statements = [
259                r#"
260                CREATE TABLE IF NOT EXISTS user_mappings (
261                    id BIGSERIAL PRIMARY KEY,
262                    matrix_user_id TEXT NOT NULL UNIQUE,
263                    telegram_user_id BIGINT NOT NULL UNIQUE,
264                    telegram_username TEXT,
265                    telegram_first_name TEXT,
266                    telegram_last_name TEXT,
267                    telegram_phone TEXT,
268                    telegram_avatar TEXT,
269                    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
270                    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
271                )
272                "#,
273                r#"
274                CREATE TABLE IF NOT EXISTS portal (
275                    id BIGSERIAL PRIMARY KEY,
276                    matrix_room_id TEXT NOT NULL UNIQUE,
277                    telegram_chat_id BIGINT NOT NULL UNIQUE,
278                    telegram_chat_type TEXT NOT NULL,
279                    telegram_chat_title TEXT,
280                    telegram_chat_username TEXT,
281                    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
282                    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
283                )
284                "#,
285                r#"
286                CREATE TABLE IF NOT EXISTS message_mappings (
287                    id BIGSERIAL PRIMARY KEY,
288                    telegram_message_id BIGINT NOT NULL,
289                    telegram_chat_id BIGINT NOT NULL,
290                    matrix_room_id TEXT NOT NULL,
291                    matrix_event_id TEXT NOT NULL,
292                    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
293                    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
294                    UNIQUE(telegram_chat_id, telegram_message_id)
295                )
296                "#,
297                r#"
298                CREATE TABLE IF NOT EXISTS reaction_mappings (
299                    id BIGSERIAL PRIMARY KEY,
300                    telegram_message_id BIGINT NOT NULL,
301                    telegram_chat_id BIGINT NOT NULL,
302                    telegram_user_id BIGINT NOT NULL,
303                    reaction_emoji TEXT NOT NULL,
304                    matrix_event_id TEXT NOT NULL,
305                    matrix_room_id TEXT NOT NULL,
306                    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
307                    UNIQUE(telegram_chat_id, telegram_message_id, telegram_user_id, reaction_emoji)
308                )
309                "#,
310                r#"
311                CREATE TABLE IF NOT EXISTS telegram_files (
312                    id BIGSERIAL PRIMARY KEY,
313                    telegram_file_id TEXT NOT NULL,
314                    telegram_file_unique_id TEXT NOT NULL UNIQUE,
315                    mxc_url TEXT NOT NULL,
316                    mime_type TEXT,
317                    file_name TEXT,
318                    file_size BIGINT,
319                    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
320                )
321                "#,
322                r#"
323                CREATE TABLE IF NOT EXISTS processed_events (
324                    id BIGSERIAL PRIMARY KEY,
325                    event_id TEXT NOT NULL UNIQUE,
326                    event_type TEXT NOT NULL,
327                    source TEXT NOT NULL,
328                    processed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
329                )
330                "#,
331                "CREATE INDEX IF NOT EXISTS idx_user_mappings_matrix_id ON user_mappings(matrix_user_id)",
332                "CREATE INDEX IF NOT EXISTS idx_user_mappings_telegram_id ON user_mappings(telegram_user_id)",
333                "CREATE INDEX IF NOT EXISTS idx_portal_matrix_room ON portal(matrix_room_id)",
334                "CREATE INDEX IF NOT EXISTS idx_portal_telegram_chat ON portal(telegram_chat_id)",
335                "CREATE INDEX IF NOT EXISTS idx_message_mappings_telegram ON message_mappings(telegram_chat_id, telegram_message_id)",
336                "CREATE INDEX IF NOT EXISTS idx_message_mappings_matrix ON message_mappings(matrix_room_id, matrix_event_id)",
337                "CREATE INDEX IF NOT EXISTS idx_reaction_mappings_telegram ON reaction_mappings(telegram_chat_id, telegram_message_id)",
338                "CREATE INDEX IF NOT EXISTS idx_telegram_files_unique ON telegram_files(telegram_file_unique_id)",
339                "CREATE INDEX IF NOT EXISTS idx_processed_events_event_id ON processed_events(event_id)",
340            ];
341
342            for statement in statements {
343                diesel::sql_query(statement)
344                    .execute(&mut conn)
345                    .map_err(|e| DatabaseError::Migration(e.to_string()))?;
346            }
347
348            Ok(())
349        })
350        .await
351        .map_err(|e| DatabaseError::Migration(format!("migration task failed: {e}")))?
352    }
353
354    #[cfg(feature = "mysql")]
355    async fn migrate_mysql(pool: &MysqlPool) -> Result<(), DatabaseError> {
356        let pool = pool.clone();
357        tokio::task::spawn_blocking(move || {
358            let mut conn = pool
359                .get()
360                .map_err(|e| DatabaseError::Connection(e.to_string()))?;
361
362            let statements = [
363                r#"
364                CREATE TABLE IF NOT EXISTS user_mappings (
365                    id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
366                    matrix_user_id VARCHAR(255) NOT NULL UNIQUE,
367                    telegram_user_id BIGINT NOT NULL UNIQUE,
368                    telegram_username VARCHAR(255) NULL,
369                    telegram_first_name VARCHAR(255) NULL,
370                    telegram_last_name VARCHAR(255) NULL,
371                    telegram_phone VARCHAR(32) NULL,
372                    telegram_avatar TEXT NULL,
373                    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
374                    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
375                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
376                "#,
377                r#"
378                CREATE TABLE IF NOT EXISTS portal (
379                    id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
380                    matrix_room_id VARCHAR(255) NOT NULL UNIQUE,
381                    telegram_chat_id BIGINT NOT NULL UNIQUE,
382                    telegram_chat_type VARCHAR(32) NOT NULL,
383                    telegram_chat_title VARCHAR(255) NULL,
384                    telegram_chat_username VARCHAR(255) NULL,
385                    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
386                    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
387                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
388                "#,
389                r#"
390                CREATE TABLE IF NOT EXISTS message_mappings (
391                    id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
392                    telegram_message_id BIGINT NOT NULL,
393                    telegram_chat_id BIGINT NOT NULL,
394                    matrix_room_id VARCHAR(255) NOT NULL,
395                    matrix_event_id VARCHAR(255) NOT NULL,
396                    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
397                    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
398                    UNIQUE KEY uk_telegram_message (telegram_chat_id, telegram_message_id)
399                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
400                "#,
401                r#"
402                CREATE TABLE IF NOT EXISTS reaction_mappings (
403                    id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
404                    telegram_message_id BIGINT NOT NULL,
405                    telegram_chat_id BIGINT NOT NULL,
406                    telegram_user_id BIGINT NOT NULL,
407                    reaction_emoji VARCHAR(64) NOT NULL,
408                    matrix_event_id VARCHAR(255) NOT NULL,
409                    matrix_room_id VARCHAR(255) NOT NULL,
410                    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
411                    UNIQUE KEY uk_telegram_reaction (telegram_chat_id, telegram_message_id, telegram_user_id, reaction_emoji)
412                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
413                "#,
414                r#"
415                CREATE TABLE IF NOT EXISTS telegram_files (
416                    id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
417                    telegram_file_id VARCHAR(255) NOT NULL,
418                    telegram_file_unique_id VARCHAR(255) NOT NULL UNIQUE,
419                    mxc_url VARCHAR(1024) NOT NULL,
420                    mime_type VARCHAR(128) NULL,
421                    file_name VARCHAR(255) NULL,
422                    file_size BIGINT NULL,
423                    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
424                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
425                "#,
426            ];
427
428            for statement in statements {
429                diesel::sql_query(statement)
430                    .execute(&mut conn)
431                    .map_err(|e| DatabaseError::Migration(e.to_string()))?;
432            }
433
434            Ok(())
435        })
436        .await
437        .map_err(|e| DatabaseError::Migration(format!("migration task failed: {e}")))?
438    }
439
440    #[cfg(feature = "sqlite")]
441    async fn migrate_sqlite(path: &str) -> Result<(), DatabaseError> {
442        let path = path.to_string();
443        tokio::task::spawn_blocking(move || {
444            let conn_string = format!("sqlite://{}", path);
445            let mut conn = SqliteConnection::establish(&conn_string)
446                .map_err(|e| DatabaseError::Connection(e.to_string()))?;
447
448            let statements = [
449                r#"
450                CREATE TABLE IF NOT EXISTS user_mappings (
451                    id INTEGER PRIMARY KEY AUTOINCREMENT,
452                    matrix_user_id TEXT NOT NULL UNIQUE,
453                    telegram_user_id INTEGER NOT NULL UNIQUE,
454                    telegram_username TEXT,
455                    telegram_first_name TEXT,
456                    telegram_last_name TEXT,
457                    telegram_phone TEXT,
458                    telegram_avatar TEXT,
459                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
460                    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
461                )
462                "#,
463                r#"
464                CREATE TABLE IF NOT EXISTS portal (
465                    id INTEGER PRIMARY KEY AUTOINCREMENT,
466                    matrix_room_id TEXT NOT NULL UNIQUE,
467                    telegram_chat_id INTEGER NOT NULL UNIQUE,
468                    telegram_chat_type TEXT NOT NULL,
469                    telegram_chat_title TEXT,
470                    telegram_chat_username TEXT,
471                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
472                    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
473                )
474                "#,
475                r#"
476                CREATE TABLE IF NOT EXISTS message_mappings (
477                    id INTEGER PRIMARY KEY AUTOINCREMENT,
478                    telegram_message_id INTEGER NOT NULL,
479                    telegram_chat_id INTEGER NOT NULL,
480                    matrix_room_id TEXT NOT NULL,
481                    matrix_event_id TEXT NOT NULL,
482                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
483                    updated_at TEXT NOT NULL DEFAULT (datetime('now')),
484                    UNIQUE(telegram_chat_id, telegram_message_id)
485                )
486                "#,
487                r#"
488                CREATE TABLE IF NOT EXISTS reaction_mappings (
489                    id INTEGER PRIMARY KEY AUTOINCREMENT,
490                    telegram_message_id INTEGER NOT NULL,
491                    telegram_chat_id INTEGER NOT NULL,
492                    telegram_user_id INTEGER NOT NULL,
493                    reaction_emoji TEXT NOT NULL,
494                    matrix_event_id TEXT NOT NULL,
495                    matrix_room_id TEXT NOT NULL,
496                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
497                    UNIQUE(telegram_chat_id, telegram_message_id, telegram_user_id, reaction_emoji)
498                )
499                "#,
500                r#"
501                CREATE TABLE IF NOT EXISTS telegram_files (
502                    id INTEGER PRIMARY KEY AUTOINCREMENT,
503                    telegram_file_id TEXT NOT NULL,
504                    telegram_file_unique_id TEXT NOT NULL UNIQUE,
505                    mxc_url TEXT NOT NULL,
506                    mime_type TEXT,
507                    file_name TEXT,
508                    file_size INTEGER,
509                    created_at TEXT NOT NULL DEFAULT (datetime('now'))
510                )
511                "#,
512                r#"
513                CREATE TABLE IF NOT EXISTS processed_events (
514                    id INTEGER PRIMARY KEY AUTOINCREMENT,
515                    event_id TEXT NOT NULL UNIQUE,
516                    event_type TEXT NOT NULL,
517                    source TEXT NOT NULL,
518                    processed_at TEXT NOT NULL DEFAULT (datetime('now'))
519                )
520                "#,
521                "CREATE INDEX IF NOT EXISTS idx_user_mappings_matrix_id ON user_mappings(matrix_user_id)",
522                "CREATE INDEX IF NOT EXISTS idx_user_mappings_telegram_id ON user_mappings(telegram_user_id)",
523                "CREATE INDEX IF NOT EXISTS idx_portal_matrix_room ON portal(matrix_room_id)",
524                "CREATE INDEX IF NOT EXISTS idx_portal_telegram_chat ON portal(telegram_chat_id)",
525                "CREATE INDEX IF NOT EXISTS idx_message_mappings_telegram ON message_mappings(telegram_chat_id, telegram_message_id)",
526                "CREATE INDEX IF NOT EXISTS idx_message_mappings_matrix ON message_mappings(matrix_room_id, matrix_event_id)",
527                "CREATE INDEX IF NOT EXISTS idx_reaction_mappings_telegram ON reaction_mappings(telegram_chat_id, telegram_message_id)",
528                "CREATE INDEX IF NOT EXISTS idx_telegram_files_unique ON telegram_files(telegram_file_unique_id)",
529                "CREATE INDEX IF NOT EXISTS idx_processed_events_event_id ON processed_events(event_id)",
530            ];
531
532            for statement in statements {
533                diesel::sql_query(statement)
534                    .execute(&mut conn)
535                    .map_err(|e| DatabaseError::Migration(e.to_string()))?;
536            }
537
538            Ok(())
539        })
540        .await
541        .map_err(|e| DatabaseError::Migration(format!("migration task failed: {e}")))?
542    }
543
544    pub fn user_store(&self) -> Arc<dyn UserStore> {
545        self.user_store.clone()
546    }
547
548    pub fn portal_store(&self) -> Arc<dyn PortalStore> {
549        self.portal_store.clone()
550    }
551
552    pub fn message_store(&self) -> Arc<dyn MessageStore> {
553        self.message_store.clone()
554    }
555
556    pub fn reaction_store(&self) -> Arc<dyn ReactionStore> {
557        self.reaction_store.clone()
558    }
559
560    pub fn telegram_file_store(&self) -> Arc<dyn TelegramFileStore> {
561        self.telegram_file_store.clone()
562    }
563
564    #[cfg(feature = "postgres")]
565    pub fn pool(&self) -> Option<&Pool> {
566        self.postgres_pool.as_ref()
567    }
568
569    pub fn db_type(&self) -> DbType {
570        self.db_type
571    }
572}