edc_dataplane_proxy/db/sqlite/
edr.rs1use sqlx::SqlitePool;
2
3use crate::{db::edr::EdrRepo, model::edr::EdrEntry};
4
5#[derive(Clone)]
6pub struct SqliteEdrRepo {
7 pool: SqlitePool,
8}
9
10impl SqliteEdrRepo {
11 pub async fn connect(url: &str) -> anyhow::Result<Self> {
12 let pool = SqlitePool::connect(url).await?;
13 Ok(Self { pool })
14 }
15}
16
17#[async_trait::async_trait]
18impl EdrRepo for SqliteEdrRepo {
19 async fn save(&self, edr: EdrEntry) -> anyhow::Result<()> {
20 if self.fetch_by_id(&edr.transfer_id).await?.is_none() {
21 self.internal_save(edr).await?;
22 } else {
23 self.internal_update(edr).await?;
24 }
25 Ok(())
26 }
27
28 async fn fetch_by_id(&self, transfer_id: &str) -> anyhow::Result<Option<EdrEntry>> {
29 sqlx::query_as::<_, EdrEntry>(
30 r#"
31 SELECT * FROM tokens where transfer_id = $1
32 "#,
33 )
34 .bind(transfer_id)
35 .fetch_optional(&self.pool)
36 .await
37 .map(Ok)?
38 }
39
40 async fn delete(&self, transfer_id: &str) -> anyhow::Result<()> {
41 sqlx::query(
42 r#"
43 DELETE FROM tokens where transfer_id = $1
44 "#,
45 )
46 .bind(transfer_id)
47 .execute(&self.pool)
48 .await?;
49
50 Ok(())
51 }
52}
53
54impl SqliteEdrRepo {
55 async fn internal_save(&self, edr: EdrEntry) -> anyhow::Result<()> {
56 sqlx::query(
57 r#"
58 INSERT INTO tokens (transfer_id, token_id, refresh_token_id)
59 VALUES ($1, $2, $3)
60 "#,
61 )
62 .bind(edr.transfer_id)
63 .bind(edr.token_id)
64 .bind(edr.refresh_token_id)
65 .execute(&self.pool)
66 .await?;
67 Ok(())
68 }
69
70 async fn internal_update(&self, edr: EdrEntry) -> anyhow::Result<()> {
71 sqlx::query(
72 r#"
73 UPDATE tokens SET token_id=$1, refresh_token_id=$2
74 WHERE transfer_id = $3
75 "#,
76 )
77 .bind(edr.token_id)
78 .bind(edr.refresh_token_id)
79 .bind(edr.transfer_id)
80 .execute(&self.pool)
81 .await?;
82
83 Ok(())
84 }
85
86 pub async fn migrate(&self) -> anyhow::Result<()> {
87 sqlx::migrate!("./migrations/sqlite")
88 .run(&self.pool)
89 .await?;
90 Ok(())
91 }
92}