Skip to main content

jax_daemon/database/
bucket_log_provider.rs

1use async_trait::async_trait;
2use uuid::Uuid;
3
4use common::bucket_log::BucketLogProvider;
5use common::linked_data::Link;
6
7use crate::database::{types::DCid, Database};
8
9#[async_trait]
10impl BucketLogProvider for Database {
11    type Error = sqlx::Error;
12
13    async fn exists(
14        &self,
15        id: Uuid,
16    ) -> Result<bool, common::bucket_log::BucketLogError<Self::Error>> {
17        let id_str = id.to_string();
18        let result = sqlx::query!(
19            r#"
20            SELECT COUNT(*) as "count!: i64"
21            FROM bucket_log
22            WHERE bucket_id = $1
23            "#,
24            id_str
25        )
26        .fetch_one(&**self)
27        .await
28        .map_err(common::bucket_log::BucketLogError::Provider)?;
29
30        Ok(result.count > 0)
31    }
32
33    async fn heads(
34        &self,
35        id: Uuid,
36        height: u64,
37    ) -> Result<Vec<Link>, common::bucket_log::BucketLogError<Self::Error>> {
38        let height_i64 = height as i64;
39        let id_str = id.to_string();
40
41        let rows = sqlx::query!(
42            r#"
43            SELECT current_link as "current_link!: DCid"
44            FROM bucket_log
45            WHERE bucket_id = $1 AND height = $2
46            "#,
47            id_str,
48            height_i64
49        )
50        .fetch_all(&**self)
51        .await
52        .map_err(common::bucket_log::BucketLogError::Provider)?;
53
54        Ok(rows.into_iter().map(|r| r.current_link.into()).collect())
55    }
56
57    async fn append(
58        &self,
59        id: Uuid,
60        name: String,
61        current: Link,
62        previous: Option<Link>,
63        height: u64,
64        published: bool,
65    ) -> Result<(), common::bucket_log::BucketLogError<Self::Error>> {
66        let current_dcid: DCid = current.clone().into();
67        let previous_dcid: Option<DCid> = previous.clone().map(Into::into);
68        let height_i64 = height as i64;
69
70        // Validate: For genesis (previous_link is None), height should be 0
71        if previous.is_none() && height != 0 {
72            return Err(common::bucket_log::BucketLogError::InvalidAppend(
73                current,
74                Link::default(),
75                height,
76            ));
77        }
78
79        // For non-genesis, validate that previous link exists at height - 1
80        if let Some(prev_link) = previous.clone() {
81            if height == 0 {
82                return Err(common::bucket_log::BucketLogError::InvalidAppend(
83                    current, prev_link, height,
84                ));
85            }
86
87            let prev_dcid: DCid = prev_link.clone().into();
88            let prev_height = (height - 1) as i64;
89            let id_str = id.to_string();
90
91            let exists = sqlx::query!(
92                r#"
93                SELECT COUNT(*) as count
94                FROM bucket_log
95                WHERE bucket_id = $1 AND current_link = $2 AND height = $3
96                "#,
97                id_str,
98                prev_dcid,
99                prev_height
100            )
101            .fetch_one(&**self)
102            .await
103            .map_err(common::bucket_log::BucketLogError::Provider)?;
104
105            if exists.count == 0 {
106                return Err(common::bucket_log::BucketLogError::InvalidAppend(
107                    current, prev_link, height,
108                ));
109            }
110        }
111
112        // Insert the log entry with name
113        let id_str = id.to_string();
114        sqlx::query!(
115            r#"
116            INSERT INTO bucket_log (bucket_id, name, current_link, previous_link, height, published, created_at)
117            VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP)
118            "#,
119            id_str,
120            name,
121            current_dcid,
122            previous_dcid,
123            height_i64,
124            published
125        )
126        .execute(&**self)
127        .await
128        .map_err(|e| match e {
129            sqlx::Error::Database(ref db_error) => {
130                if db_error.constraint().is_some() {
131                    common::bucket_log::BucketLogError::Conflict
132                } else {
133                    common::bucket_log::BucketLogError::Provider(e)
134                }
135            }
136            _ => common::bucket_log::BucketLogError::Provider(e),
137        })?;
138
139        Ok(())
140    }
141
142    async fn height(
143        &self,
144        id: Uuid,
145    ) -> Result<u64, common::bucket_log::BucketLogError<Self::Error>> {
146        let id_str = id.to_string();
147        let result = sqlx::query!(
148            r#"
149            SELECT MAX(height) as "max_height: i64"
150            FROM bucket_log
151            WHERE bucket_id = $1
152            "#,
153            id_str
154        )
155        .fetch_one(&**self)
156        .await
157        .map_err(common::bucket_log::BucketLogError::Provider)?;
158
159        match result.max_height {
160            Some(h) => Ok(h as u64),
161            None => Err(common::bucket_log::BucketLogError::HeadNotFound(0)),
162        }
163    }
164
165    async fn has(
166        &self,
167        id: Uuid,
168        link: Link,
169    ) -> Result<Vec<u64>, common::bucket_log::BucketLogError<Self::Error>> {
170        let dcid: DCid = link.into();
171        let id_str = id.to_string();
172
173        let rows = sqlx::query!(
174            r#"
175            SELECT height
176            FROM bucket_log
177            WHERE bucket_id = $1 AND current_link = $2
178            "#,
179            id_str,
180            dcid
181        )
182        .fetch_all(&**self)
183        .await
184        .map_err(common::bucket_log::BucketLogError::Provider)?;
185
186        Ok(rows.into_iter().map(|r| r.height as u64).collect())
187    }
188
189    async fn list_buckets(
190        &self,
191    ) -> Result<Vec<Uuid>, common::bucket_log::BucketLogError<Self::Error>> {
192        let rows = sqlx::query!(
193            r#"
194            SELECT DISTINCT bucket_id
195            FROM bucket_log
196            ORDER BY bucket_id
197            "#
198        )
199        .fetch_all(&**self)
200        .await
201        .map_err(common::bucket_log::BucketLogError::Provider)?;
202
203        Ok(rows
204            .into_iter()
205            .map(|r| Uuid::parse_str(&r.bucket_id).expect("invalid bucket_id UUID in database"))
206            .collect())
207    }
208
209    async fn latest_published(
210        &self,
211        id: Uuid,
212    ) -> Result<Option<(Link, u64)>, common::bucket_log::BucketLogError<Self::Error>> {
213        let id_str = id.to_string();
214
215        let result = sqlx::query!(
216            r#"
217            SELECT current_link as "current_link!: DCid", height
218            FROM bucket_log
219            WHERE bucket_id = $1 AND published = TRUE
220            ORDER BY height DESC
221            LIMIT 1
222            "#,
223            id_str
224        )
225        .fetch_optional(&**self)
226        .await
227        .map_err(common::bucket_log::BucketLogError::Provider)?;
228
229        Ok(result.map(|r| (r.current_link.into(), r.height as u64)))
230    }
231}