rio_rs/cluster/storage/
postgres.rs1use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use futures::TryFutureExt;
8use sqlx::postgres::{PgPoolOptions, PgRow};
9use sqlx::{self, PgPool, Row};
10
11use crate::sql_migration::SqlMigrations;
12
13use super::{Member, MembershipResult, MembershipStorage, MembershipUnitResult};
14
15pub struct PgMembershipStorageMigrations {}
16
17impl SqlMigrations for PgMembershipStorageMigrations {
18 fn queries() -> Vec<String> {
19 let migrations: Vec<_> = include_str!("./migrations/0001-postgres-init.sql")
20 .split(";")
21 .map(|x| x.to_string())
22 .collect();
23 migrations
24 }
25}
26
27#[derive(Clone, Debug)]
29pub struct PostgresMembershipStorage {
30 pool: PgPool,
31}
32
33impl PostgresMembershipStorage {
34 pub fn new(pool: PgPool) -> PostgresMembershipStorage {
36 PostgresMembershipStorage { pool }
37 }
38
39 pub fn pool() -> PgPoolOptions {
54 PgPoolOptions::new()
55 }
56}
57
58#[async_trait]
59impl MembershipStorage for PostgresMembershipStorage {
60 async fn prepare(&self) {
62 let mut transaction = self.pool.begin().await.unwrap();
63 let queries = PgMembershipStorageMigrations::queries();
64 for query in queries {
65 sqlx::query(&query)
66 .execute(&mut *transaction)
67 .await
68 .unwrap();
69 }
70 transaction.commit().await.unwrap();
71 }
72
73 async fn push(&self, member: Member) -> MembershipUnitResult {
74 let last_seen = Utc::now();
75 sqlx::query(
76 r#"
77 INSERT INTO
78 cluster_provider_members (ip, port, last_seen, active)
79 VALUES ($1, $2, $3, $4)
80 ON CONFLICT(ip, port) DO UPDATE SET last_seen=$3, active=$4
81 "#,
82 )
83 .bind(member.ip)
84 .bind(member.port)
85 .bind(last_seen)
86 .bind(member.active)
87 .execute(&self.pool)
88 .err_into()
89 .await
90 .map(|_| ())
91 }
92
93 async fn remove(&self, ip: &str, port: &str) -> MembershipUnitResult {
94 sqlx::query("DELETE FROM cluster_provider_members WHERE ip = $1 AND port = $2")
95 .bind(ip)
96 .bind(port)
97 .execute(&self.pool)
98 .err_into()
99 .await
100 .map(|_| ())
101 }
102
103 async fn set_is_active(&self, ip: &str, port: &str, is_active: bool) -> MembershipUnitResult {
104 let last_seen = Utc::now();
105 sqlx::query("UPDATE cluster_provider_members SET active = $3, last_seen = $4 WHERE ip = $1 and port = $2")
106 .bind(ip)
107 .bind(port)
108 .bind(is_active)
109 .bind(last_seen)
110 .execute(&self.pool)
111 .err_into()
112 .await
113 .map(|_| ())
114 }
115
116 async fn members(&self) -> MembershipResult<Vec<Member>> {
117 let items = sqlx::query(
118 "SELECT ip, port, active, last_seen FROM cluster_provider_members ORDER BY last_seen DESC",
119 )
120 .fetch_all(&self.pool)
121 .await?;
122
123 let items = items
124 .iter()
125 .map(|x| {
126 let mut new_item = Member::new(x.get("ip"), x.get("port"));
127 new_item.last_seen = x.get("last_seen");
128 new_item.set_active(x.get("active"));
129 new_item
130 })
131 .collect();
132 Ok(items)
133 }
134
135 async fn active_members(&self) -> MembershipResult<Vec<Member>> {
136 let items = sqlx::query("SELECT ip, port, active, last_seen FROM cluster_provider_members WHERE active ORDER BY last_seen DESC")
137 .map(|x: PgRow| {
138 let mut new_item = Member::new(x.get("ip"), x.get("port"));
139 new_item.last_seen = x.get("last_seen");
140 new_item.set_active(x.get("active"));
141 new_item
142 })
143 .fetch_all(&self.pool)
144 .await?;
145 Ok(items)
146 }
147
148 async fn notify_failure(&self, ip: &str, port: &str) -> MembershipUnitResult {
149 let query = r#"
150 INSERT INTO
151 cluster_provider_member_failures (ip, port)
152 VALUES ($1, $2)
153 "#;
154 sqlx::query(query)
155 .bind(ip)
156 .bind(port)
157 .execute(&self.pool)
158 .err_into()
159 .await
160 .map(|_| ())
161 }
162
163 async fn member_failures(&self, ip: &str, port: &str) -> MembershipResult<Vec<DateTime<Utc>>> {
165 let query = r#"
166 SELECT time FROM
167 cluster_provider_member_failures
168 WHERE ip = $1 AND port = $2
169 ORDER BY time DESC LIMIT 100
170 "#;
171 sqlx::query(query)
172 .bind(ip)
173 .bind(port)
174 .map(|x: PgRow| x.get("time"))
175 .fetch_all(&self.pool)
176 .err_into()
177 .await
178 }
179}
180
#[cfg(test)]
mod test {
    // No unit tests are defined in this module.
}