bridge_common/
database.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use anyhow::Result;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
use tracing::{debug, info};

use crate::repo::{messages, tasks};

/// Pool size used when `DATABASE_POOL_SIZE` is not set or cannot be parsed as a `u32`.
const DEFAULT_POOL_SIZE: u32 = 5;

/// Errors produced by this module's own logic.
///
/// The public functions of this module return `anyhow::Result`, so these values reach callers
/// wrapped in an `anyhow::Error`.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Reading the `DATABASE_URL` environment variable failed.
    #[error("`DATABASE_URL` is not set")]
    DatabaseUrlNotSet,
    /// Running the database migrations failed.
    ///
    /// `transparent` forwards the message and source of the wrapped migration error, and `#[from]`
    /// provides the `From<sqlx::migrate::MigrateError>` conversion.
    #[error(transparent)]
    Migrate(#[from] sqlx::migrate::MigrateError),
}

/// Build a Postgres connection pool from the process environment.
///
/// The pool size comes from `get_pool_size` and the connection string from `get_database_url`;
/// the chosen pool size is logged at `info` level before connecting.
///
/// # Errors
///
/// Will return an error if the database URL can't be read, or if the connection to the database
/// can't be established.
pub async fn new_pool() -> Result<Pool<Postgres>> {
    let max_connections = get_pool_size();
    let url = get_database_url()?;

    info!("Connecting to a database with a pool size of {}", max_connections);

    let pool = PgPoolOptions::new()
        .max_connections(max_connections)
        .connect(&url)
        .await?;

    Ok(pool)
}

/// Prepare the database by running migrations and cleaning up after possible previous termination.
///
/// # Errors
///
/// Will return an error if the migrations can't be run or if there was a problem while cleaning up
/// after possible previous termination.
pub async fn prepare(pool: &Pool<Postgres>) -> Result<()> {
    debug!("Running migrations");
    sqlx::migrate!("db/migrations")
        .run(pool)
        .await
        .map_err(Error::Migrate)?;

    debug!("Cleaning up after possible previous termination");

    // TODO: continue writing the messages that were writing before the termination
    messages::transition_all(
        pool,
        crate::types::messages::Status::Writing,
        crate::types::messages::Status::Failed,
    )
    .await?;
    tasks::transition_all(
        pool,
        crate::types::tasks::Status::InProgress,
        crate::types::tasks::Status::ToDo,
    )
    .await?;

    Ok(())
}

fn get_pool_size() -> u32 {
    if let Ok(pool_size) = std::env::var("DATABASE_POOL_SIZE") {
        pool_size.parse().unwrap_or(DEFAULT_POOL_SIZE)
    } else {
        DEFAULT_POOL_SIZE
    }
}

fn get_database_url() -> Result<String> {
    Ok(std::env::var("DATABASE_URL").map_err(|_| Error::DatabaseUrlNotSet)?)
}