crdb_sqlite/
lib.rs

1use anyhow::Context;
2use crdb_core::{
3    normalizer_version, BinPtr, ClientSideDb, ClientStorageInfo, CrdbSyncFn, Db, EventId,
4    Importance, LoginInfo, Object, ObjectId, Query, QueryId, ResultExt, SavedObjectMeta,
5    SavedQuery, TypeId, Updatedness, Upload, UploadId,
6};
7use std::{
8    collections::{HashMap, HashSet},
9    sync::Arc,
10};
11
12// TODO(sqlite-high): store Importance and importance_from_queries
13
14#[cfg(test)]
15mod tests;
16
17pub use crdb_core::{Error, Result};
18
19pub struct SqliteDb {
20    db: sqlx::SqlitePool,
21}
22
23impl SqliteDb {
24    pub async fn connect_impl(db: sqlx::SqlitePool) -> anyhow::Result<SqliteDb> {
25        sqlx::migrate!("./migrations")
26            .run(&db)
27            .await
28            .context("running migrations on sqlite database")?;
29        Ok(SqliteDb { db })
30    }
31
32    pub async fn connect(url: &str) -> anyhow::Result<SqliteDb> {
33        Self::connect_impl(sqlx::SqlitePool::connect(url).await?).await
34    }
35}
36
37#[allow(unused_variables)] // TODO(sqlite-high): remove
38impl Db for SqliteDb {
39    async fn create<T: Object>(
40        &self,
41        object_id: ObjectId,
42        created_at: EventId,
43        object: Arc<T>,
44        updatedness: Option<Updatedness>,
45        importance: Importance,
46    ) -> crate::Result<Option<Arc<T>>> {
47        let mut t = self
48            .db
49            .begin()
50            .await
51            .wrap_context("acquiring sqlite transaction")?;
52
53        // Object ID uniqueness is enforced by the `snapshot_creations` unique index
54        let type_id = *T::type_ulid();
55        let snapshot_version = T::snapshot_version();
56        let object_json = sqlx::types::Json(&object);
57        let affected = sqlx::query(
58            "INSERT INTO snapshots VALUES ($1, $2, $3, TRUE, TRUE, $4, $5, $6, $7, $8, $9)
59                         ON CONFLICT DO NOTHING",
60        )
61        .bind(created_at)
62        .bind(type_id)
63        .bind(object_id)
64        .bind(normalizer_version())
65        .bind(snapshot_version)
66        .bind(object_json)
67        .bind(updatedness)
68        .bind(importance.bits())
69        .bind(Importance::NONE.bits())
70        .execute(&mut *t)
71        .await
72        .wrap_with_context(|| format!("inserting snapshot {created_at:?}"))?
73        .rows_affected();
74        if affected != 1 {
75            // Check for equality with pre-existing
76            let affected = sqlx::query(
77                "
78                    SELECT snapshot_id FROM snapshots
79                    WHERE snapshot_id = $1
80                    AND object_id = $2
81                    AND is_creation = TRUE
82                    AND snapshot_version = $3
83                    AND snapshot = $4
84                ",
85            )
86            .bind(created_at)
87            .bind(object_id)
88            .bind(snapshot_version)
89            .bind(object_json)
90            .fetch_all(&mut *t)
91            .await
92            .wrap_with_context(|| {
93                format!("checking pre-existing snapshot for {created_at:?} is the same")
94            })?
95            .len();
96            if affected != 1 {
97                return Err(crate::Error::EventAlreadyExists(created_at));
98            }
99            return Ok(None);
100        }
101
102        // We just inserted. Check that no event existed at this id
103        let affected = sqlx::query("SELECT event_id FROM events WHERE event_id = $1")
104            .bind(created_at)
105            .fetch_all(&mut *t)
106            .await
107            .wrap_context("checking that no event existed with this id yet")?
108            .len();
109        if affected != 0 {
110            return Err(crate::Error::EventAlreadyExists(created_at));
111        }
112
113        for binary_id in object.required_binaries() {
114            sqlx::query("INSERT INTO snapshots_binaries VALUES ($1, $2)")
115                .bind(created_at)
116                .bind(binary_id)
117                .execute(&mut *t)
118                .await
119                .wrap_with_context(|| format!("marking {created_at:?} as using {binary_id:?}"))?;
120        }
121
122        t.commit()
123            .await
124            .wrap_with_context(|| format!("committing transaction that created {object_id:?}"))?;
125        Ok(Some(object))
126    }
127
128    async fn submit<T: Object>(
129        &self,
130        object: ObjectId,
131        event_id: EventId,
132        event: Arc<T::Event>,
133        updatedness: Option<Updatedness>,
134        additional_importance: Importance,
135    ) -> crate::Result<Option<Arc<T>>> {
136        unimplemented!() // TODO(sqlite-high): implement
137    }
138
139    async fn get_latest<T: Object>(
140        &self,
141        object_id: ObjectId,
142        importance: Importance,
143    ) -> crate::Result<Arc<T>> {
144        unimplemented!() // TODO(sqlite-high): implement
145    }
146
147    async fn create_binary(&self, binary_id: BinPtr, data: Arc<[u8]>) -> crate::Result<()> {
148        unimplemented!() // TODO(sqlite-high): implement
149    }
150
151    async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
152        unimplemented!() // TODO(sqlite-high): implement
153    }
154
155    /// Returns the number of errors that happened while re-encoding
156    async fn reencode_old_versions<T: Object>(&self) -> usize {
157        unimplemented!() // TODO(sqlite-high)
158    }
159
160    async fn assert_invariants_generic(&self) {
161        unimplemented!()
162    }
163
164    async fn assert_invariants_for<T: Object>(&self) {
165        unimplemented!()
166    }
167}
168
169#[allow(unused_variables)] // TODO(sqlite-high)
170impl ClientSideDb for SqliteDb {
171    async fn storage_info(&self) -> crate::Result<ClientStorageInfo> {
172        unimplemented!() // TODO(sqlite-high)
173    }
174
175    async fn save_login(&self, _info: LoginInfo) -> crate::Result<()> {
176        unimplemented!() // TODO(sqlite-high)
177    }
178
179    async fn get_saved_login(&self) -> crate::Result<Option<LoginInfo>> {
180        unimplemented!() // TODO(sqlite-high)
181    }
182
183    async fn remove_everything(&self) -> crate::Result<()> {
184        unimplemented!() // TODO(sqlite-high)
185    }
186
187    async fn get_json(
188        &self,
189        object_id: ObjectId,
190        importance: Importance,
191    ) -> crate::Result<serde_json::Value> {
192        unimplemented!() // TODO(sqlite-high)
193    }
194
195    async fn recreate<T: Object>(
196        &self,
197        object_id: ObjectId,
198        new_created_at: EventId,
199        object: Arc<T>,
200        updatedness: Option<Updatedness>,
201        additional_importance: Importance,
202    ) -> crate::Result<Option<Arc<T>>> {
203        unimplemented!() // TODO(sqlite-high): implement
204    }
205
206    async fn client_query(
207        &self,
208        _type_id: TypeId,
209        _query: Arc<Query>,
210    ) -> crate::Result<Vec<ObjectId>> {
211        unimplemented!() // TODO(sqlite-high): implement
212    }
213
214    async fn remove(&self, _object_id: ObjectId) -> crate::Result<()> {
215        unimplemented!() // TODO(sqlite-high): implement
216    }
217
218    async fn remove_event<T: Object>(
219        &self,
220        _object_id: ObjectId,
221        _event_id: EventId,
222    ) -> crate::Result<()> {
223        unimplemented!() // TODO(sqlite-high)
224    }
225
226    async fn set_object_importance(
227        &self,
228        object_id: ObjectId,
229        new_importance: Importance,
230    ) -> crate::Result<()> {
231        unimplemented!() // TODO(sqlite-high)
232    }
233
234    async fn set_importance_from_queries(
235        &self,
236        object_id: ObjectId,
237        new_importance_from_queries: Importance,
238    ) -> crate::Result<()> {
239        unimplemented!() // TODO(sqlite-high)
240    }
241
242    async fn client_vacuum(
243        &self,
244        _notify_removals: impl 'static + CrdbSyncFn<ObjectId>,
245        _notify_query_removals: impl 'static + CrdbSyncFn<QueryId>,
246    ) -> crate::Result<()> {
247        unimplemented!() // TODO(sqlite-high)
248    }
249
250    async fn list_uploads(&self) -> crate::Result<Vec<UploadId>> {
251        unimplemented!() // TODO(sqlite-high)
252    }
253
254    async fn get_upload(&self, _upload_id: UploadId) -> crate::Result<Option<Upload>> {
255        unimplemented!() // TODO(sqlite-high)
256    }
257
258    async fn enqueue_upload(
259        &self,
260        _upload: Upload,
261        _required_binaries: Vec<BinPtr>,
262    ) -> crate::Result<UploadId> {
263        unimplemented!() // TODO(sqlite-high)
264    }
265
266    async fn upload_finished(&self, _upload_id: UploadId) -> crate::Result<()> {
267        unimplemented!() // TODO(sqlite-high)
268    }
269
270    async fn get_saved_objects(&self) -> crate::Result<HashMap<ObjectId, SavedObjectMeta>> {
271        unimplemented!() // TODO(sqlite-high)
272    }
273
274    async fn get_saved_queries(&self) -> crate::Result<HashMap<QueryId, SavedQuery>> {
275        unimplemented!() // TODO(sqlite-high)
276    }
277
278    async fn record_query(
279        &self,
280        _query_id: QueryId,
281        _query: Arc<Query>,
282        _type_id: TypeId,
283        _importance: Importance,
284    ) -> crate::Result<()> {
285        unimplemented!() // TODO(sqlite-high)
286    }
287
288    async fn set_query_importance(
289        &self,
290        query_id: QueryId,
291        importance: Importance,
292        objects_matching_query: Vec<ObjectId>,
293    ) -> crate::Result<()> {
294        unimplemented!() // TODO(sqlite-high)
295    }
296
297    async fn forget_query(
298        &self,
299        _query_id: QueryId,
300        _objects_to_unlock: Vec<ObjectId>,
301    ) -> crate::Result<()> {
302        unimplemented!() // TODO(sqlite-high)
303    }
304
305    async fn update_queries(
306        &self,
307        _queries: &HashSet<QueryId>,
308        _now_have_all_until: Updatedness,
309    ) -> crate::Result<()> {
310        unimplemented!() // TODO(sqlite-high)
311    }
312}