1use std::{path::Path, sync::Arc};
2
3mod action;
4mod file;
5mod file_user_data;
6mod profile;
7mod push;
8mod reference;
9mod schema;
10mod setting;
11mod tag;
12mod task;
13mod tenant;
14mod utils;
15use async_trait::async_trait;
16use sqlx::{
17 sqlite::{self, SqlitePool},
18 Row,
19};
20use tokio::fs;
21
22use cloudillo_types::{meta_adapter::*, prelude::*, worker::WorkerPool};
23
24#[derive(Debug)]
25pub struct MetaAdapterSqlite {
26 db: SqlitePool,
27 dbr: SqlitePool,
28 #[allow(dead_code)]
29 worker: Arc<WorkerPool>,
30}
31
32impl MetaAdapterSqlite {
33 pub async fn new(worker: Arc<WorkerPool>, path: impl AsRef<Path>) -> ClResult<Self> {
34 let db_path = path.as_ref().join("meta.db");
35 fs::create_dir_all(&path).await.expect("Cannot create meta-adapter dir");
36 let opts = sqlite::SqliteConnectOptions::new()
37 .filename(&db_path)
38 .create_if_missing(true)
39 .journal_mode(sqlite::SqliteJournalMode::Wal);
40
41 let db = sqlite::SqlitePoolOptions::new()
42 .max_connections(1)
43 .connect_with(opts.clone())
44 .await
45 .inspect_err(|err| println!("DbError: {:#?}", err))
46 .or(Err(Error::DbError))?;
47 let dbr = sqlite::SqlitePoolOptions::new()
48 .max_connections(5)
49 .connect_with(opts.read_only(true))
50 .await
51 .inspect_err(|err| println!("DbError: {:#?}", err))
52 .or(Err(Error::DbError))?;
53
54 schema::init_db(&db)
55 .await
56 .inspect_err(|err| println!("DbError: {:#?}", err))
57 .or(Err(Error::DbError))?;
58
59 let res = sqlx::query("PRAGMA compile_options")
61 .fetch_all(&db)
62 .await
63 .inspect_err(|err| println!("DbError: {:#?}", err))
64 .or(Err(Error::DbError))?;
65 let max_attached = res
67 .iter()
68 .map(|row| row.get::<&str, _>(0))
69 .rfind(|s| s.starts_with("MAX_ATTACHED="))
70 .unwrap_or("")
71 .split("=")
72 .last();
73 println!("MAX_ATTACHED: {:?}", max_attached);
74 Ok(Self { worker, db, dbr })
77 }
78}
79
80#[async_trait]
81impl MetaAdapter for MetaAdapterSqlite {
82 async fn read_tenant(&self, tn_id: TnId) -> ClResult<Tenant<Box<str>>> {
85 tenant::read(&self.dbr, tn_id).await
86 }
87
88 async fn create_tenant(&self, tn_id: TnId, id_tag: &str) -> ClResult<TnId> {
89 tenant::create(&self.db, tn_id, id_tag).await
90 }
91
92 async fn update_tenant(&self, tn_id: TnId, tenant: &UpdateTenantData) -> ClResult<()> {
93 tenant::update(&self.db, tn_id, tenant).await
94 }
95 async fn delete_tenant(&self, tn_id: TnId) -> ClResult<()> {
96 tenant::delete(&self.db, tn_id).await
97 }
98
99 async fn list_tenants(&self, opts: &ListTenantsMetaOptions) -> ClResult<Vec<TenantListMeta>> {
100 tenant::list(&self.dbr, opts).await
101 }
102
103 async fn list_profiles(
104 &self,
105 tn_id: TnId,
106 opts: &ListProfileOptions,
107 ) -> ClResult<Vec<Profile<Box<str>>>> {
108 profile::list(&self.dbr, tn_id, opts).await
109 }
110
111 async fn get_relationships(
112 &self,
113 tn_id: TnId,
114 target_id_tags: &[&str],
115 ) -> ClResult<std::collections::HashMap<String, (bool, bool)>> {
116 profile::get_relationships(&self.dbr, tn_id, target_id_tags).await
117 }
118
119 async fn read_profile(
120 &self,
121 tn_id: TnId,
122 id_tag: &str,
123 ) -> ClResult<(Box<str>, Profile<Box<str>>)> {
124 profile::read(&self.dbr, tn_id, id_tag).await
125 }
126
127 async fn read_profile_roles(
128 &self,
129 tn_id: TnId,
130 id_tag: &str,
131 ) -> ClResult<Option<Box<[Box<str>]>>> {
132 profile::read_roles(&self.dbr, tn_id, id_tag).await
133 }
134
135 async fn create_profile(
136 &self,
137 tn_id: TnId,
138 profile: &Profile<&str>,
139 etag: &str,
140 ) -> ClResult<()> {
141 profile::create(&self.db, tn_id, profile, etag).await
142 }
143 async fn update_profile(
144 &self,
145 tn_id: TnId,
146 id_tag: &str,
147 profile: &UpdateProfileData,
148 ) -> ClResult<()> {
149 profile::update(&self.db, tn_id, id_tag, profile).await
150 }
151
152 async fn read_profile_public_key(
153 &self,
154 id_tag: &str,
155 key_id: &str,
156 ) -> ClResult<(Box<str>, Timestamp)> {
157 profile::read_public_key(&self.dbr, id_tag, key_id).await
158 }
159
160 async fn add_profile_public_key(
161 &self,
162 id_tag: &str,
163 key_id: &str,
164 public_key: &str,
165 ) -> ClResult<()> {
166 profile::add_public_key(&self.db, id_tag, key_id, public_key).await
167 }
168
169 async fn process_profile_refresh<'a>(
170 &self,
171 callback: Box<dyn Fn(TnId, &'a str, Option<&'a str>) -> ClResult<()> + Send>,
172 ) {
173 profile::process_refresh(&self.dbr, callback).await
174 }
175
176 async fn list_stale_profiles(
177 &self,
178 max_age_secs: i64,
179 limit: u32,
180 ) -> ClResult<Vec<(TnId, Box<str>, Option<Box<str>>)>> {
181 profile::list_stale_profiles(&self.dbr, max_age_secs, limit).await
182 }
183
184 async fn list_actions(
187 &self,
188 tn_id: TnId,
189 opts: &ListActionOptions,
190 ) -> ClResult<Vec<ActionView>> {
191 action::list(&self.dbr, tn_id, opts).await
192 }
193
194 async fn list_action_tokens(
195 &self,
196 tn_id: TnId,
197 opts: &ListActionOptions,
198 ) -> ClResult<Box<[Box<str>]>> {
199 action::list_tokens(&self.dbr, tn_id, opts).await
200 }
201
202 async fn get_action_id(&self, tn_id: TnId, a_id: u64) -> ClResult<Box<str>> {
203 action::get_id(&self.dbr, tn_id, a_id).await
204 }
205
206 async fn create_action(
207 &self,
208 tn_id: TnId,
209 action: &Action<&str>,
210 key: Option<&str>,
211 ) -> ClResult<ActionId<Box<str>>> {
212 action::create(&self.db, tn_id, action, key).await
213 }
214
215 async fn finalize_action(
216 &self,
217 tn_id: TnId,
218 a_id: u64,
219 action_id: &str,
220 options: FinalizeActionOptions<'_>,
221 ) -> ClResult<()> {
222 action::finalize(&self.db, tn_id, a_id, action_id, options).await
223 }
224
225 async fn create_inbound_action(
226 &self,
227 tn_id: TnId,
228 action_id: &str,
229 token: &str,
230 ack_token: Option<&str>,
231 ) -> ClResult<()> {
232 action::create_inbound(&self.db, tn_id, action_id, token, ack_token).await
233 }
234
235 async fn get_action_root_id(&self, tn_id: TnId, action_id: &str) -> ClResult<Box<str>> {
236 action::get_root_id(&self.dbr, tn_id, action_id).await
237 }
238
239 async fn get_action_data(&self, tn_id: TnId, action_id: &str) -> ClResult<Option<ActionData>> {
240 action::get_data(&self.dbr, tn_id, action_id).await
241 }
242
243 async fn get_action_by_key(
244 &self,
245 tn_id: TnId,
246 action_key: &str,
247 ) -> ClResult<Option<Action<Box<str>>>> {
248 action::get_by_key(&self.dbr, tn_id, action_key).await
249 }
250
251 async fn store_action_token(&self, tn_id: TnId, action_id: &str, token: &str) -> ClResult<()> {
252 action::store_token(&self.db, tn_id, action_id, token).await
253 }
254
255 async fn get_action_token(&self, tn_id: TnId, action_id: &str) -> ClResult<Option<Box<str>>> {
256 action::get_token(&self.dbr, tn_id, action_id).await
257 }
258
259 async fn update_action_data(
260 &self,
261 tn_id: TnId,
262 action_id: &str,
263 opts: &UpdateActionDataOptions,
264 ) -> ClResult<()> {
265 action::update_data(&self.db, tn_id, action_id, opts).await
266 }
267
268 async fn update_inbound_action(
269 &self,
270 tn_id: TnId,
271 action_id: &str,
272 status: Option<char>,
273 ) -> ClResult<()> {
274 action::update_inbound(&self.db, tn_id, action_id, status).await
275 }
276
277 async fn get_related_action_tokens(
278 &self,
279 tn_id: TnId,
280 aprv_action_id: &str,
281 ) -> ClResult<Vec<(Box<str>, Box<str>)>> {
282 action::get_related_tokens(&self.db, tn_id, aprv_action_id).await
283 }
284
285 async fn create_outbound_action(
286 &self,
287 tn_id: TnId,
288 action_id: &str,
289 token: &str,
290 opts: &CreateOutboundActionOptions,
291 ) -> ClResult<()> {
292 action::create_outbound(&self.db, tn_id, action_id, token, opts).await
293 }
294
295 async fn get_file_id(&self, tn_id: TnId, f_id: u64) -> ClResult<Box<str>> {
298 file::get_id(&self.dbr, tn_id, f_id).await
299 }
300
301 async fn list_files(&self, tn_id: TnId, opts: &ListFileOptions) -> ClResult<Vec<FileView>> {
302 file::list(&self.dbr, tn_id, opts).await
303 }
304
305 async fn list_file_variants(
306 &self,
307 tn_id: TnId,
308 file_id: FileId<&str>,
309 ) -> ClResult<Vec<FileVariant<Box<str>>>> {
310 file::list_variants(&self.dbr, tn_id, file_id).await
311 }
312
313 async fn list_available_variants(&self, tn_id: TnId, file_id: &str) -> ClResult<Vec<Box<str>>> {
314 file::list_available_variants(&self.dbr, tn_id, file_id).await
315 }
316
317 async fn read_file_variant(
318 &self,
319 tn_id: TnId,
320 variant_id: &str,
321 ) -> ClResult<FileVariant<Box<str>>> {
322 file::read_variant(&self.dbr, tn_id, variant_id).await
323 }
324
325 async fn read_file_id_by_variant(&self, tn_id: TnId, variant_id: &str) -> ClResult<Box<str>> {
326 file::read_file_id_by_variant(&self.dbr, tn_id, variant_id).await
327 }
328
329 async fn read_f_id_by_file_id(&self, tn_id: TnId, file_id: &str) -> ClResult<u64> {
330 file::read_f_id_by_file_id(&self.dbr, tn_id, file_id).await
331 }
332
333 async fn create_file(&self, tn_id: TnId, opts: CreateFile) -> ClResult<FileId<Box<str>>> {
334 file::create(&self.db, tn_id, opts).await
335 }
336
337 async fn create_file_variant<'a>(
338 &'a self,
339 tn_id: TnId,
340 f_id: u64,
341 opts: FileVariant<&'a str>,
342 ) -> ClResult<&'a str> {
343 file::create_variant(&self.db, tn_id, f_id, opts).await
344 }
345
346 async fn update_file_id(&self, tn_id: TnId, f_id: u64, file_id: &str) -> ClResult<()> {
347 file::update_id(&self.db, tn_id, f_id, file_id).await
348 }
349
350 async fn finalize_file(&self, tn_id: TnId, f_id: u64, file_id: &str) -> ClResult<()> {
351 file::finalize_file(&self.db, tn_id, f_id, file_id).await
352 }
353
354 async fn list_tasks(&self, opts: ListTaskOptions) -> ClResult<Vec<Task>> {
357 task::list(&self.dbr, &opts).await
358 }
359
360 async fn list_task_ids(&self, kind: &str, keys: &[Box<str>]) -> ClResult<Vec<u64>> {
361 task::list_ids(&self.dbr, kind, keys).await
362 }
363
364 async fn create_task(
365 &self,
366 kind: &'static str,
367 key: Option<&str>,
368 input: &str,
369 deps: &[u64],
370 ) -> ClResult<u64> {
371 task::create(&self.db, kind, key, input, deps).await
372 }
373
374 async fn update_task_finished(&self, task_id: u64, output: &str) -> ClResult<()> {
375 task::mark_finished(&self.db, task_id, output).await
376 }
377
378 async fn update_task_error(
379 &self,
380 task_id: u64,
381 output: &str,
382 next_at: Option<Timestamp>,
383 ) -> ClResult<()> {
384 task::mark_error(&self.db, task_id, output, next_at).await
385 }
386
387 async fn find_task_by_key(&self, key: &str) -> ClResult<Option<Task>> {
388 task::find_by_key(&self.dbr, key).await
389 }
390
391 async fn update_task(&self, task_id: u64, patch: &TaskPatch) -> ClResult<()> {
392 task::update(&self.db, task_id, patch).await
393 }
394
395 async fn get_profile_info(&self, tn_id: TnId, id_tag: &str) -> ClResult<ProfileData> {
397 profile::get_info(&self.dbr, tn_id, id_tag).await
398 }
399
400 async fn get_action(&self, tn_id: TnId, action_id: &str) -> ClResult<Option<ActionView>> {
404 action::get(&self.dbr, tn_id, action_id).await
405 }
406
407 async fn update_action(
408 &self,
409 tn_id: TnId,
410 action_id: &str,
411 content: Option<&str>,
412 attachments: Option<&[&str]>,
413 ) -> ClResult<()> {
414 action::update(&self.db, tn_id, action_id, content, attachments).await
415 }
416
417 async fn delete_action(&self, tn_id: TnId, action_id: &str) -> ClResult<()> {
418 action::delete(&self.db, tn_id, action_id).await
419 }
420
421 async fn add_reaction(
422 &self,
423 tn_id: TnId,
424 action_id: &str,
425 reactor_id_tag: &str,
426 reaction_type: &str,
427 content: Option<&str>,
428 ) -> ClResult<()> {
429 action::add_reaction(&self.db, tn_id, action_id, reactor_id_tag, reaction_type, content)
430 .await
431 }
432
433 async fn list_reactions(&self, tn_id: TnId, action_id: &str) -> ClResult<Vec<ReactionData>> {
434 action::list_reactions(&self.dbr, tn_id, action_id).await
435 }
436
437 async fn delete_file(&self, tn_id: TnId, file_id: &str) -> ClResult<()> {
441 file::delete(&self.db, tn_id, file_id).await
442 }
443
444 async fn list_settings(
448 &self,
449 tn_id: TnId,
450 prefix: Option<&[String]>,
451 ) -> ClResult<std::collections::HashMap<String, serde_json::Value>> {
452 setting::list(&self.dbr, tn_id, prefix).await
453 }
454
455 async fn read_setting(&self, tn_id: TnId, name: &str) -> ClResult<Option<serde_json::Value>> {
456 setting::read(&self.dbr, tn_id, name).await
457 }
458
459 async fn update_setting(
460 &self,
461 tn_id: TnId,
462 name: &str,
463 value: Option<serde_json::Value>,
464 ) -> ClResult<()> {
465 setting::update(&self.db, tn_id, name, value).await
466 }
467
468 async fn list_refs(&self, tn_id: TnId, opts: &ListRefsOptions) -> ClResult<Vec<RefData>> {
472 reference::list(&self.dbr, tn_id, opts).await
473 }
474
475 async fn get_ref(&self, tn_id: TnId, ref_id: &str) -> ClResult<Option<(Box<str>, Box<str>)>> {
476 reference::get(&self.dbr, tn_id, ref_id).await
477 }
478
479 async fn create_ref(
480 &self,
481 tn_id: TnId,
482 ref_id: &str,
483 opts: &CreateRefOptions,
484 ) -> ClResult<RefData> {
485 reference::create(&self.db, tn_id, ref_id, opts).await
486 }
487
488 async fn delete_ref(&self, tn_id: TnId, ref_id: &str) -> ClResult<()> {
489 reference::delete(&self.db, tn_id, ref_id).await
490 }
491
492 async fn use_ref(
493 &self,
494 ref_id: &str,
495 expected_types: &[&str],
496 ) -> ClResult<(TnId, Box<str>, RefData)> {
497 reference::use_ref(&self.db, ref_id, expected_types).await
498 }
499
500 async fn validate_ref(
501 &self,
502 ref_id: &str,
503 expected_types: &[&str],
504 ) -> ClResult<(TnId, Box<str>, RefData)> {
505 reference::validate_ref(&self.dbr, ref_id, expected_types).await
506 }
507
508 async fn list_tags(
512 &self,
513 tn_id: TnId,
514 prefix: Option<&str>,
515 with_counts: bool,
516 limit: Option<u32>,
517 ) -> ClResult<Vec<TagInfo>> {
518 tag::list(&self.dbr, tn_id, prefix, with_counts, limit).await
519 }
520
521 async fn add_tag(&self, tn_id: TnId, file_id: &str, tag: &str) -> ClResult<Vec<String>> {
522 tag::add(&self.db, tn_id, file_id, tag).await
523 }
524
525 async fn remove_tag(&self, tn_id: TnId, file_id: &str, tag: &str) -> ClResult<Vec<String>> {
526 tag::remove(&self.db, tn_id, file_id, tag).await
527 }
528
529 async fn update_file_data(
533 &self,
534 tn_id: TnId,
535 file_id: &str,
536 opts: &UpdateFileOptions,
537 ) -> ClResult<()> {
538 file::update_data(&self.db, tn_id, file_id, opts).await
539 }
540
541 async fn read_file(&self, tn_id: TnId, file_id: &str) -> ClResult<Option<FileView>> {
542 file::read(&self.dbr, tn_id, file_id).await
543 }
544
545 async fn record_file_access(&self, tn_id: TnId, id_tag: &str, file_id: &str) -> ClResult<()> {
549 file_user_data::record_access(&self.db, tn_id, id_tag, file_id).await
550 }
551
552 async fn record_file_modification(
553 &self,
554 tn_id: TnId,
555 id_tag: &str,
556 file_id: &str,
557 ) -> ClResult<()> {
558 file_user_data::record_modification(&self.db, tn_id, id_tag, file_id).await
559 }
560
561 async fn update_file_user_data(
562 &self,
563 tn_id: TnId,
564 id_tag: &str,
565 file_id: &str,
566 pinned: Option<bool>,
567 starred: Option<bool>,
568 ) -> ClResult<FileUserData> {
569 file_user_data::update(&self.db, tn_id, id_tag, file_id, pinned, starred).await
570 }
571
572 async fn get_file_user_data(
573 &self,
574 tn_id: TnId,
575 id_tag: &str,
576 file_id: &str,
577 ) -> ClResult<Option<FileUserData>> {
578 file_user_data::get(&self.dbr, tn_id, id_tag, file_id).await
579 }
580
581 async fn list_push_subscriptions(&self, tn_id: TnId) -> ClResult<Vec<PushSubscription>> {
585 push::list(&self.dbr, tn_id).await
586 }
587
588 async fn create_push_subscription(
589 &self,
590 tn_id: TnId,
591 subscription: &PushSubscriptionData,
592 ) -> ClResult<u64> {
593 push::create(&self.db, tn_id, subscription).await
594 }
595
596 async fn delete_push_subscription(&self, tn_id: TnId, subscription_id: u64) -> ClResult<()> {
597 push::delete(&self.db, tn_id, subscription_id).await
598 }
599}