1use anyhow::Context;
2use crdb_core::{
3 normalizer_version, BinPtr, ClientSideDb, ClientStorageInfo, CrdbSyncFn, Db, EventId,
4 Importance, LoginInfo, Object, ObjectId, Query, QueryId, ResultExt, SavedObjectMeta,
5 SavedQuery, TypeId, Updatedness, Upload, UploadId,
6};
7use std::{
8 collections::{HashMap, HashSet},
9 sync::Arc,
10};
11
12#[cfg(test)]
15mod tests;
16
17pub use crdb_core::{Error, Result};
18
19pub struct SqliteDb {
20 db: sqlx::SqlitePool,
21}
22
23impl SqliteDb {
24 pub async fn connect_impl(db: sqlx::SqlitePool) -> anyhow::Result<SqliteDb> {
25 sqlx::migrate!("./migrations")
26 .run(&db)
27 .await
28 .context("running migrations on sqlite database")?;
29 Ok(SqliteDb { db })
30 }
31
32 pub async fn connect(url: &str) -> anyhow::Result<SqliteDb> {
33 Self::connect_impl(sqlx::SqlitePool::connect(url).await?).await
34 }
35}
36
37#[allow(unused_variables)] impl Db for SqliteDb {
39 async fn create<T: Object>(
40 &self,
41 object_id: ObjectId,
42 created_at: EventId,
43 object: Arc<T>,
44 updatedness: Option<Updatedness>,
45 importance: Importance,
46 ) -> crate::Result<Option<Arc<T>>> {
47 let mut t = self
48 .db
49 .begin()
50 .await
51 .wrap_context("acquiring sqlite transaction")?;
52
53 let type_id = *T::type_ulid();
55 let snapshot_version = T::snapshot_version();
56 let object_json = sqlx::types::Json(&object);
57 let affected = sqlx::query(
58 "INSERT INTO snapshots VALUES ($1, $2, $3, TRUE, TRUE, $4, $5, $6, $7, $8, $9)
59 ON CONFLICT DO NOTHING",
60 )
61 .bind(created_at)
62 .bind(type_id)
63 .bind(object_id)
64 .bind(normalizer_version())
65 .bind(snapshot_version)
66 .bind(object_json)
67 .bind(updatedness)
68 .bind(importance.bits())
69 .bind(Importance::NONE.bits())
70 .execute(&mut *t)
71 .await
72 .wrap_with_context(|| format!("inserting snapshot {created_at:?}"))?
73 .rows_affected();
74 if affected != 1 {
75 let affected = sqlx::query(
77 "
78 SELECT snapshot_id FROM snapshots
79 WHERE snapshot_id = $1
80 AND object_id = $2
81 AND is_creation = TRUE
82 AND snapshot_version = $3
83 AND snapshot = $4
84 ",
85 )
86 .bind(created_at)
87 .bind(object_id)
88 .bind(snapshot_version)
89 .bind(object_json)
90 .fetch_all(&mut *t)
91 .await
92 .wrap_with_context(|| {
93 format!("checking pre-existing snapshot for {created_at:?} is the same")
94 })?
95 .len();
96 if affected != 1 {
97 return Err(crate::Error::EventAlreadyExists(created_at));
98 }
99 return Ok(None);
100 }
101
102 let affected = sqlx::query("SELECT event_id FROM events WHERE event_id = $1")
104 .bind(created_at)
105 .fetch_all(&mut *t)
106 .await
107 .wrap_context("checking that no event existed with this id yet")?
108 .len();
109 if affected != 0 {
110 return Err(crate::Error::EventAlreadyExists(created_at));
111 }
112
113 for binary_id in object.required_binaries() {
114 sqlx::query("INSERT INTO snapshots_binaries VALUES ($1, $2)")
115 .bind(created_at)
116 .bind(binary_id)
117 .execute(&mut *t)
118 .await
119 .wrap_with_context(|| format!("marking {created_at:?} as using {binary_id:?}"))?;
120 }
121
122 t.commit()
123 .await
124 .wrap_with_context(|| format!("committing transaction that created {object_id:?}"))?;
125 Ok(Some(object))
126 }
127
128 async fn submit<T: Object>(
129 &self,
130 object: ObjectId,
131 event_id: EventId,
132 event: Arc<T::Event>,
133 updatedness: Option<Updatedness>,
134 additional_importance: Importance,
135 ) -> crate::Result<Option<Arc<T>>> {
136 unimplemented!() }
138
139 async fn get_latest<T: Object>(
140 &self,
141 object_id: ObjectId,
142 importance: Importance,
143 ) -> crate::Result<Arc<T>> {
144 unimplemented!() }
146
147 async fn create_binary(&self, binary_id: BinPtr, data: Arc<[u8]>) -> crate::Result<()> {
148 unimplemented!() }
150
151 async fn get_binary(&self, binary_id: BinPtr) -> crate::Result<Option<Arc<[u8]>>> {
152 unimplemented!() }
154
155 async fn reencode_old_versions<T: Object>(&self) -> usize {
157 unimplemented!() }
159
160 async fn assert_invariants_generic(&self) {
161 unimplemented!()
162 }
163
164 async fn assert_invariants_for<T: Object>(&self) {
165 unimplemented!()
166 }
167}
168
169#[allow(unused_variables)] impl ClientSideDb for SqliteDb {
171 async fn storage_info(&self) -> crate::Result<ClientStorageInfo> {
172 unimplemented!() }
174
175 async fn save_login(&self, _info: LoginInfo) -> crate::Result<()> {
176 unimplemented!() }
178
179 async fn get_saved_login(&self) -> crate::Result<Option<LoginInfo>> {
180 unimplemented!() }
182
183 async fn remove_everything(&self) -> crate::Result<()> {
184 unimplemented!() }
186
187 async fn get_json(
188 &self,
189 object_id: ObjectId,
190 importance: Importance,
191 ) -> crate::Result<serde_json::Value> {
192 unimplemented!() }
194
195 async fn recreate<T: Object>(
196 &self,
197 object_id: ObjectId,
198 new_created_at: EventId,
199 object: Arc<T>,
200 updatedness: Option<Updatedness>,
201 additional_importance: Importance,
202 ) -> crate::Result<Option<Arc<T>>> {
203 unimplemented!() }
205
206 async fn client_query(
207 &self,
208 _type_id: TypeId,
209 _query: Arc<Query>,
210 ) -> crate::Result<Vec<ObjectId>> {
211 unimplemented!() }
213
214 async fn remove(&self, _object_id: ObjectId) -> crate::Result<()> {
215 unimplemented!() }
217
218 async fn remove_event<T: Object>(
219 &self,
220 _object_id: ObjectId,
221 _event_id: EventId,
222 ) -> crate::Result<()> {
223 unimplemented!() }
225
226 async fn set_object_importance(
227 &self,
228 object_id: ObjectId,
229 new_importance: Importance,
230 ) -> crate::Result<()> {
231 unimplemented!() }
233
234 async fn set_importance_from_queries(
235 &self,
236 object_id: ObjectId,
237 new_importance_from_queries: Importance,
238 ) -> crate::Result<()> {
239 unimplemented!() }
241
242 async fn client_vacuum(
243 &self,
244 _notify_removals: impl 'static + CrdbSyncFn<ObjectId>,
245 _notify_query_removals: impl 'static + CrdbSyncFn<QueryId>,
246 ) -> crate::Result<()> {
247 unimplemented!() }
249
250 async fn list_uploads(&self) -> crate::Result<Vec<UploadId>> {
251 unimplemented!() }
253
254 async fn get_upload(&self, _upload_id: UploadId) -> crate::Result<Option<Upload>> {
255 unimplemented!() }
257
258 async fn enqueue_upload(
259 &self,
260 _upload: Upload,
261 _required_binaries: Vec<BinPtr>,
262 ) -> crate::Result<UploadId> {
263 unimplemented!() }
265
266 async fn upload_finished(&self, _upload_id: UploadId) -> crate::Result<()> {
267 unimplemented!() }
269
270 async fn get_saved_objects(&self) -> crate::Result<HashMap<ObjectId, SavedObjectMeta>> {
271 unimplemented!() }
273
274 async fn get_saved_queries(&self) -> crate::Result<HashMap<QueryId, SavedQuery>> {
275 unimplemented!() }
277
278 async fn record_query(
279 &self,
280 _query_id: QueryId,
281 _query: Arc<Query>,
282 _type_id: TypeId,
283 _importance: Importance,
284 ) -> crate::Result<()> {
285 unimplemented!() }
287
288 async fn set_query_importance(
289 &self,
290 query_id: QueryId,
291 importance: Importance,
292 objects_matching_query: Vec<ObjectId>,
293 ) -> crate::Result<()> {
294 unimplemented!() }
296
297 async fn forget_query(
298 &self,
299 _query_id: QueryId,
300 _objects_to_unlock: Vec<ObjectId>,
301 ) -> crate::Result<()> {
302 unimplemented!() }
304
305 async fn update_queries(
306 &self,
307 _queries: &HashSet<QueryId>,
308 _now_have_all_until: Updatedness,
309 ) -> crate::Result<()> {
310 unimplemented!() }
312}