jax_daemon/database/
bucket_log_provider.rs1use async_trait::async_trait;
2use uuid::Uuid;
3
4use common::bucket_log::BucketLogProvider;
5use common::linked_data::Link;
6
7use crate::database::{types::DCid, Database};
8
9#[async_trait]
10impl BucketLogProvider for Database {
11 type Error = sqlx::Error;
12
13 async fn exists(
14 &self,
15 id: Uuid,
16 ) -> Result<bool, common::bucket_log::BucketLogError<Self::Error>> {
17 let id_str = id.to_string();
18 let result = sqlx::query!(
19 r#"
20 SELECT COUNT(*) as "count!: i64"
21 FROM bucket_log
22 WHERE bucket_id = $1
23 "#,
24 id_str
25 )
26 .fetch_one(&**self)
27 .await
28 .map_err(common::bucket_log::BucketLogError::Provider)?;
29
30 Ok(result.count > 0)
31 }
32
33 async fn heads(
34 &self,
35 id: Uuid,
36 height: u64,
37 ) -> Result<Vec<Link>, common::bucket_log::BucketLogError<Self::Error>> {
38 let height_i64 = height as i64;
39 let id_str = id.to_string();
40
41 let rows = sqlx::query!(
42 r#"
43 SELECT current_link as "current_link!: DCid"
44 FROM bucket_log
45 WHERE bucket_id = $1 AND height = $2
46 "#,
47 id_str,
48 height_i64
49 )
50 .fetch_all(&**self)
51 .await
52 .map_err(common::bucket_log::BucketLogError::Provider)?;
53
54 Ok(rows.into_iter().map(|r| r.current_link.into()).collect())
55 }
56
57 async fn append(
58 &self,
59 id: Uuid,
60 name: String,
61 current: Link,
62 previous: Option<Link>,
63 height: u64,
64 published: bool,
65 ) -> Result<(), common::bucket_log::BucketLogError<Self::Error>> {
66 let current_dcid: DCid = current.clone().into();
67 let previous_dcid: Option<DCid> = previous.clone().map(Into::into);
68 let height_i64 = height as i64;
69
70 if previous.is_none() && height != 0 {
72 return Err(common::bucket_log::BucketLogError::InvalidAppend(
73 current,
74 Link::default(),
75 height,
76 ));
77 }
78
79 if let Some(prev_link) = previous.clone() {
81 if height == 0 {
82 return Err(common::bucket_log::BucketLogError::InvalidAppend(
83 current, prev_link, height,
84 ));
85 }
86
87 let prev_dcid: DCid = prev_link.clone().into();
88 let prev_height = (height - 1) as i64;
89 let id_str = id.to_string();
90
91 let exists = sqlx::query!(
92 r#"
93 SELECT COUNT(*) as count
94 FROM bucket_log
95 WHERE bucket_id = $1 AND current_link = $2 AND height = $3
96 "#,
97 id_str,
98 prev_dcid,
99 prev_height
100 )
101 .fetch_one(&**self)
102 .await
103 .map_err(common::bucket_log::BucketLogError::Provider)?;
104
105 if exists.count == 0 {
106 return Err(common::bucket_log::BucketLogError::InvalidAppend(
107 current, prev_link, height,
108 ));
109 }
110 }
111
112 let id_str = id.to_string();
114 sqlx::query!(
115 r#"
116 INSERT INTO bucket_log (bucket_id, name, current_link, previous_link, height, published, created_at)
117 VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP)
118 "#,
119 id_str,
120 name,
121 current_dcid,
122 previous_dcid,
123 height_i64,
124 published
125 )
126 .execute(&**self)
127 .await
128 .map_err(|e| match e {
129 sqlx::Error::Database(ref db_error) => {
130 if db_error.constraint().is_some() {
131 common::bucket_log::BucketLogError::Conflict
132 } else {
133 common::bucket_log::BucketLogError::Provider(e)
134 }
135 }
136 _ => common::bucket_log::BucketLogError::Provider(e),
137 })?;
138
139 Ok(())
140 }
141
142 async fn height(
143 &self,
144 id: Uuid,
145 ) -> Result<u64, common::bucket_log::BucketLogError<Self::Error>> {
146 let id_str = id.to_string();
147 let result = sqlx::query!(
148 r#"
149 SELECT MAX(height) as "max_height: i64"
150 FROM bucket_log
151 WHERE bucket_id = $1
152 "#,
153 id_str
154 )
155 .fetch_one(&**self)
156 .await
157 .map_err(common::bucket_log::BucketLogError::Provider)?;
158
159 match result.max_height {
160 Some(h) => Ok(h as u64),
161 None => Err(common::bucket_log::BucketLogError::HeadNotFound(0)),
162 }
163 }
164
165 async fn has(
166 &self,
167 id: Uuid,
168 link: Link,
169 ) -> Result<Vec<u64>, common::bucket_log::BucketLogError<Self::Error>> {
170 let dcid: DCid = link.into();
171 let id_str = id.to_string();
172
173 let rows = sqlx::query!(
174 r#"
175 SELECT height
176 FROM bucket_log
177 WHERE bucket_id = $1 AND current_link = $2
178 "#,
179 id_str,
180 dcid
181 )
182 .fetch_all(&**self)
183 .await
184 .map_err(common::bucket_log::BucketLogError::Provider)?;
185
186 Ok(rows.into_iter().map(|r| r.height as u64).collect())
187 }
188
189 async fn list_buckets(
190 &self,
191 ) -> Result<Vec<Uuid>, common::bucket_log::BucketLogError<Self::Error>> {
192 let rows = sqlx::query!(
193 r#"
194 SELECT DISTINCT bucket_id
195 FROM bucket_log
196 ORDER BY bucket_id
197 "#
198 )
199 .fetch_all(&**self)
200 .await
201 .map_err(common::bucket_log::BucketLogError::Provider)?;
202
203 Ok(rows
204 .into_iter()
205 .map(|r| Uuid::parse_str(&r.bucket_id).expect("invalid bucket_id UUID in database"))
206 .collect())
207 }
208
209 async fn latest_published(
210 &self,
211 id: Uuid,
212 ) -> Result<Option<(Link, u64)>, common::bucket_log::BucketLogError<Self::Error>> {
213 let id_str = id.to_string();
214
215 let result = sqlx::query!(
216 r#"
217 SELECT current_link as "current_link!: DCid", height
218 FROM bucket_log
219 WHERE bucket_id = $1 AND published = TRUE
220 ORDER BY height DESC
221 LIMIT 1
222 "#,
223 id_str
224 )
225 .fetch_optional(&**self)
226 .await
227 .map_err(common::bucket_log::BucketLogError::Provider)?;
228
229 Ok(result.map(|r| (r.current_link.into(), r.height as u64)))
230 }
231}