rio_rs/object_placement/
postgres.rs1use async_trait::async_trait;
6use sqlx::postgres::PgPoolOptions;
7use sqlx::{self, PgPool, Row};
8
9use super::{ObjectPlacement, ObjectPlacementItem};
10use crate::sql_migration::SqlMigrations;
11use crate::ObjectId;
12
13pub struct PgObjectPlacementMigrations {}
14
15impl SqlMigrations for PgObjectPlacementMigrations {
16 fn queries() -> Vec<String> {
17 let migrations = include_str!("./migrations/0001-postgres-init.sql")
18 .split(";")
19 .map(|x| x.to_string())
20 .collect();
21 migrations
22 }
23}
24
25#[derive(Clone, Debug)]
26pub struct PostgresObjectPlacement {
27 pool: PgPool,
28}
29
30impl PostgresObjectPlacement {
31 pub fn new(pool: PgPool) -> Self {
32 PostgresObjectPlacement { pool }
33 }
34
35 pub fn pool() -> PgPoolOptions {
50 PgPoolOptions::new()
51 }
52}
53
54#[async_trait]
55impl ObjectPlacement for PostgresObjectPlacement {
56 async fn prepare(&self) {
61 let mut transaction = self.pool.begin().await.unwrap();
62 let queries = PgObjectPlacementMigrations::queries();
63 for query in queries {
64 sqlx::query(&query)
65 .execute(&mut *transaction)
66 .await
67 .unwrap();
68 }
69 transaction.commit().await.unwrap();
70 }
71
72 async fn update(&self, object_placement: ObjectPlacementItem) {
73 sqlx::query(
74 r#"
75 INSERT INTO
76 object_placement(struct_name, object_id, server_address)
77 VALUES ($1, $2, $3)
78 ON CONFLICT(struct_name, object_id) DO UPDATE SET server_address=$3"#,
79 )
80 .bind(&object_placement.object_id.0)
81 .bind(&object_placement.object_id.1)
82 .bind(&object_placement.server_address)
83 .execute(&self.pool)
84 .await
85 .unwrap();
86 }
87 async fn lookup(&self, object_id: &ObjectId) -> Option<String> {
88 let row = sqlx::query(
89 r#"
90 SELECT server_address
91 FROM object_placement
92 WHERE struct_name = $1 and object_id = $2
93 "#,
94 )
95 .bind(&object_id.0)
96 .bind(&object_id.1)
97 .fetch_one(&self.pool)
98 .await
99 .ok();
100 row.map(|row| row.get("server_address"))
101 }
102 async fn clean_server(&self, address: String) {
103 sqlx::query(
104 r#"
105 DELETE FROM object_placement
106 WHERE server_address = $1
107 "#,
108 )
109 .bind(&address)
110 .execute(&self.pool)
111 .await
112 .unwrap();
113 }
114
115 async fn remove(&self, object_id: &ObjectId) {
116 sqlx::query(
117 r#"
118 DELETE FROM object_placement
119 WHERE struct_name = $1 and object_id = $2
120 "#,
121 )
122 .bind(&object_id.0)
123 .bind(&object_id.1)
124 .execute(&self.pool)
125 .await
126 .unwrap();
127 }
128}
129
130#[cfg(test)]
132mod test {}