Skip to main content

rio_rs/cluster/storage/
postgres.rs

//! [`MembershipStorage`] implementation backed by a PostgreSQL database
//!
//! This uses [sqlx] under the hood
4
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use futures::TryFutureExt;
8use sqlx::postgres::{PgPoolOptions, PgRow};
9use sqlx::{self, PgPool, Row};
10
11use crate::sql_migration::SqlMigrations;
12
13use super::{Member, MembershipResult, MembershipStorage, MembershipUnitResult};
14
/// Marker type that exposes the Postgres membership-storage migrations
/// through its `SqlMigrations` implementation.
///
/// It carries no data; it only exists so the migrations can be addressed
/// by type.
#[derive(Debug)]
pub struct PgMembershipStorageMigrations {}
16
17impl SqlMigrations for PgMembershipStorageMigrations {
18    fn queries() -> Vec<String> {
19        let migrations: Vec<_> = include_str!("./migrations/0001-postgres-init.sql")
20            .split(";")
21            .map(|x| x.to_string())
22            .collect();
23        migrations
24    }
25}
26
27/// MembershipStorage implementation to work with relational databases
28#[derive(Clone, Debug)]
29pub struct PostgresMembershipStorage {
30    pool: PgPool,
31}
32
33impl PostgresMembershipStorage {
34    /// Builds a [PostgresMembershipStorage ] from a [sqlx]'s [PgPool]
35    pub fn new(pool: PgPool) -> PostgresMembershipStorage {
36        PostgresMembershipStorage { pool }
37    }
38
39    /// Pool builder, so one doesn't need to include sqlx as a dependency
40    ///
41    /// # Example
42    ///
43    /// ```
44    /// # use rio_rs::cluster::storage::postgres::PostgresMembershipStorage;
45    /// # async fn test_fn() {
46    /// let pool = PostgresMembershipStorage::pool()
47    ///     .connect("sqlite::memory:")
48    ///     .await
49    ///     .expect("Connection failure");
50    /// let members_storage = PostgresMembershipStorage::new(pool);
51    /// # }
52    /// ```
53    pub fn pool() -> PgPoolOptions {
54        PgPoolOptions::new()
55    }
56}
57
58#[async_trait]
59impl MembershipStorage for PostgresMembershipStorage {
60    /// Run the schema/data migrations for this membership storage.
61    async fn prepare(&self) {
62        let mut transaction = self.pool.begin().await.unwrap();
63        let queries = PgMembershipStorageMigrations::queries();
64        for query in queries {
65            sqlx::query(&query)
66                .execute(&mut *transaction)
67                .await
68                .unwrap();
69        }
70        transaction.commit().await.unwrap();
71    }
72
73    async fn push(&self, member: Member) -> MembershipUnitResult {
74        let last_seen = Utc::now();
75        sqlx::query(
76            r#"
77            INSERT INTO
78                cluster_provider_members (ip, port, last_seen, active)
79            VALUES ($1, $2, $3, $4)
80            ON CONFLICT(ip, port) DO UPDATE SET last_seen=$3, active=$4
81            "#,
82        )
83        .bind(member.ip)
84        .bind(member.port)
85        .bind(last_seen)
86        .bind(member.active)
87        .execute(&self.pool)
88        .err_into()
89        .await
90        .map(|_| ())
91    }
92
93    async fn remove(&self, ip: &str, port: &str) -> MembershipUnitResult {
94        sqlx::query("DELETE FROM cluster_provider_members WHERE ip = $1 AND port = $2")
95            .bind(ip)
96            .bind(port)
97            .execute(&self.pool)
98            .err_into()
99            .await
100            .map(|_| ())
101    }
102
103    async fn set_is_active(&self, ip: &str, port: &str, is_active: bool) -> MembershipUnitResult {
104        let last_seen = Utc::now();
105        sqlx::query("UPDATE cluster_provider_members SET active = $3, last_seen = $4 WHERE ip = $1 and port = $2")
106            .bind(ip)
107            .bind(port)
108            .bind(is_active)
109            .bind(last_seen)
110            .execute(&self.pool)
111            .err_into()
112            .await
113            .map(|_| ())
114    }
115
116    async fn members(&self) -> MembershipResult<Vec<Member>> {
117        let items = sqlx::query(
118            "SELECT ip, port, active, last_seen FROM cluster_provider_members ORDER BY last_seen DESC",
119        )
120        .fetch_all(&self.pool)
121        .await?;
122
123        let items = items
124            .iter()
125            .map(|x| {
126                let mut new_item = Member::new(x.get("ip"), x.get("port"));
127                new_item.last_seen = x.get("last_seen");
128                new_item.set_active(x.get("active"));
129                new_item
130            })
131            .collect();
132        Ok(items)
133    }
134
135    async fn active_members(&self) -> MembershipResult<Vec<Member>> {
136        let items = sqlx::query("SELECT ip, port, active, last_seen FROM cluster_provider_members WHERE active ORDER BY last_seen DESC")
137            .map(|x: PgRow| {
138                let mut new_item = Member::new(x.get("ip"), x.get("port"));
139                new_item.last_seen = x.get("last_seen");
140                new_item.set_active(x.get("active"));
141                new_item
142            })
143            .fetch_all(&self.pool)
144            .await?;
145        Ok(items)
146    }
147
148    async fn notify_failure(&self, ip: &str, port: &str) -> MembershipUnitResult {
149        let query = r#"
150            INSERT INTO
151                cluster_provider_member_failures (ip, port)
152            VALUES ($1, $2)
153        "#;
154        sqlx::query(query)
155            .bind(ip)
156            .bind(port)
157            .execute(&self.pool)
158            .err_into()
159            .await
160            .map(|_| ())
161    }
162
163    /// TODO configure LIMIT
164    async fn member_failures(&self, ip: &str, port: &str) -> MembershipResult<Vec<DateTime<Utc>>> {
165        let query = r#"
166            SELECT time FROM
167                cluster_provider_member_failures
168            WHERE ip = $1 AND port = $2
169            ORDER BY time DESC LIMIT 100
170        "#;
171        sqlx::query(query)
172            .bind(ip)
173            .bind(port)
174            .map(|x: PgRow| x.get("time"))
175            .fetch_all(&self.pool)
176            .err_into()
177            .await
178    }
179}
180
// TODO: write the tests for this backend, modelled on the ones of the sqlite
// implementation
#[cfg(test)]
mod test {}