Skip to main content

rio_rs/object_placement/
postgres.rs

1//! SQL implementation of the trait [ObjectPlacement] to work with relational databases
2//!
3//! This uses [sqlx] under the hood
4
5use async_trait::async_trait;
6use sqlx::postgres::PgPoolOptions;
7use sqlx::{self, PgPool, Row};
8
9use super::{ObjectPlacement, ObjectPlacementItem};
10use crate::sql_migration::SqlMigrations;
11use crate::ObjectId;
12
13pub struct PgObjectPlacementMigrations {}
14
15impl SqlMigrations for PgObjectPlacementMigrations {
16    fn queries() -> Vec<String> {
17        let migrations = include_str!("./migrations/0001-postgres-init.sql")
18            .split(";")
19            .map(|x| x.to_string())
20            .collect();
21        migrations
22    }
23}
24
25#[derive(Clone, Debug)]
26pub struct PostgresObjectPlacement {
27    pool: PgPool,
28}
29
30impl PostgresObjectPlacement {
31    pub fn new(pool: PgPool) -> Self {
32        PostgresObjectPlacement { pool }
33    }
34
35    /// Pool builder, so one doesn't need to include sqlx as a dependency
36    ///
37    /// # Example
38    ///
39    /// ```
40    /// # use rio_rs::object_placement::postgres::PostgresObjectPlacement;
41    /// # async fn test_fn() {
42    /// let pool = PostgresObjectPlacement::pool()
43    ///     .connect("sqlite::memory:")
44    ///     .await
45    ///     .expect("Connection failure");
46    /// let object_placement = PostgresObjectPlacement::new(pool);
47    /// # }
48    /// ```
49    pub fn pool() -> PgPoolOptions {
50        PgPoolOptions::new()
51    }
52}
53
54#[async_trait]
55impl ObjectPlacement for PostgresObjectPlacement {
56    /// Run the schema/data migrations for this membership storage.
57    ///
58    /// For now, the Rio server doesn't run this at start-up and it needs
59    /// to be invoked on manually in the server's setup.
60    async fn prepare(&self) {
61        let mut transaction = self.pool.begin().await.unwrap();
62        let queries = PgObjectPlacementMigrations::queries();
63        for query in queries {
64            sqlx::query(&query)
65                .execute(&mut *transaction)
66                .await
67                .unwrap();
68        }
69        transaction.commit().await.unwrap();
70    }
71
72    async fn update(&self, object_placement: ObjectPlacementItem) {
73        sqlx::query(
74            r#"
75            INSERT INTO
76            object_placement(struct_name, object_id, server_address)
77            VALUES ($1, $2, $3)
78            ON CONFLICT(struct_name, object_id) DO UPDATE SET server_address=$3"#,
79        )
80        .bind(&object_placement.object_id.0)
81        .bind(&object_placement.object_id.1)
82        .bind(&object_placement.server_address)
83        .execute(&self.pool)
84        .await
85        .unwrap();
86    }
87    async fn lookup(&self, object_id: &ObjectId) -> Option<String> {
88        let row = sqlx::query(
89            r#"
90            SELECT server_address
91            FROM object_placement
92            WHERE struct_name = $1 and object_id = $2
93            "#,
94        )
95        .bind(&object_id.0)
96        .bind(&object_id.1)
97        .fetch_one(&self.pool)
98        .await
99        .ok();
100        row.map(|row| row.get("server_address"))
101    }
102    async fn clean_server(&self, address: String) {
103        sqlx::query(
104            r#"
105            DELETE FROM object_placement
106            WHERE server_address = $1
107            "#,
108        )
109        .bind(&address)
110        .execute(&self.pool)
111        .await
112        .unwrap();
113    }
114
115    async fn remove(&self, object_id: &ObjectId) {
116        sqlx::query(
117            r#"
118            DELETE FROM object_placement
119            WHERE struct_name = $1 and object_id = $2
120            "#,
121        )
122        .bind(&object_id.0)
123        .bind(&object_id.1)
124        .execute(&self.pool)
125        .await
126        .unwrap();
127    }
128}
129
130/// TODO - Add tests, using sqlite as reference
131#[cfg(test)]
132mod test {}