Skip to main content

cloudillo_meta_adapter_sqlite/
lib.rs

1use std::{path::Path, sync::Arc};
2
3mod action;
4mod file;
5mod file_user_data;
6mod profile;
7mod push;
8mod reference;
9mod schema;
10mod setting;
11mod tag;
12mod task;
13mod tenant;
14mod utils;
15use async_trait::async_trait;
16use sqlx::{
17	sqlite::{self, SqlitePool},
18	Row,
19};
20use tokio::fs;
21
22use cloudillo_types::{meta_adapter::*, prelude::*, worker::WorkerPool};
23
/// SQLite-backed meta adapter.
///
/// Holds two connection pools onto the same `meta.db` file (both are opened
/// by `MetaAdapterSqlite::new`) plus a handle to the shared worker pool.
#[derive(Debug)]
pub struct MetaAdapterSqlite {
	/// Read-write pool; `new` opens it with a single connection.
	db: SqlitePool,
	/// Read-only pool; `new` opens it with `read_only(true)` and up to five connections.
	dbr: SqlitePool,
	// Stored by `new` but not read anywhere in this file, hence the lint allowance.
	#[allow(dead_code)]
	worker: Arc<WorkerPool>,
}
31
32impl MetaAdapterSqlite {
33	pub async fn new(worker: Arc<WorkerPool>, path: impl AsRef<Path>) -> ClResult<Self> {
34		let db_path = path.as_ref().join("meta.db");
35		fs::create_dir_all(&path).await.expect("Cannot create meta-adapter dir");
36		let opts = sqlite::SqliteConnectOptions::new()
37			.filename(&db_path)
38			.create_if_missing(true)
39			.journal_mode(sqlite::SqliteJournalMode::Wal);
40
41		let db = sqlite::SqlitePoolOptions::new()
42			.max_connections(1)
43			.connect_with(opts.clone())
44			.await
45			.inspect_err(|err| println!("DbError: {:#?}", err))
46			.or(Err(Error::DbError))?;
47		let dbr = sqlite::SqlitePoolOptions::new()
48			.max_connections(5)
49			.connect_with(opts.read_only(true))
50			.await
51			.inspect_err(|err| println!("DbError: {:#?}", err))
52			.or(Err(Error::DbError))?;
53
54		schema::init_db(&db)
55			.await
56			.inspect_err(|err| println!("DbError: {:#?}", err))
57			.or(Err(Error::DbError))?;
58
59		// Debug PRAGMA compiler_options
60		let res = sqlx::query("PRAGMA compile_options")
61			.fetch_all(&db)
62			.await
63			.inspect_err(|err| println!("DbError: {:#?}", err))
64			.or(Err(Error::DbError))?;
65		//let max_attached = res.iter().map(|row| row.get::<&str, _>(0)).filter(|s| s.starts_with("MAX_ATTACHED=")).collect::<Vec<_>>().iter().split("=").last()?;
66		let max_attached = res
67			.iter()
68			.map(|row| row.get::<&str, _>(0))
69			.rfind(|s| s.starts_with("MAX_ATTACHED="))
70			.unwrap_or("")
71			.split("=")
72			.last();
73		println!("MAX_ATTACHED: {:?}", max_attached);
74		//println!("PRAGMA compile_options: {:#?}", res.iter().map(|row| row.get::<&str, _>(0)).filter(|s| s.starts_with("MAX_ATTACHED=")).collect::<Vec<_>>());
75
76		Ok(Self { worker, db, dbr })
77	}
78}
79
/// [`MetaAdapter`] implementation backed by SQLite.
///
/// Every method is a one-line delegation to a function in the sibling module
/// for its domain (`tenant`, `profile`, `action`, `file`, `task`, `setting`,
/// `reference`, `tag`, `file_user_data`, `push`). The only decision made here
/// is which pool is handed over: `self.dbr` (opened read-only by `new`) or
/// `self.db` (the single-connection read-write pool). Going by the method
/// names, lookups and listings use `dbr`, while creates, updates, deletes and
/// "record" operations use `db`; the exceptions are flagged inline.
#[async_trait]
impl MetaAdapter for MetaAdapterSqlite {
	// Tenant management
	//*******************
	async fn read_tenant(&self, tn_id: TnId) -> ClResult<Tenant<Box<str>>> {
		tenant::read(&self.dbr, tn_id).await
	}

	async fn create_tenant(&self, tn_id: TnId, id_tag: &str) -> ClResult<TnId> {
		tenant::create(&self.db, tn_id, id_tag).await
	}

	async fn update_tenant(&self, tn_id: TnId, tenant: &UpdateTenantData) -> ClResult<()> {
		tenant::update(&self.db, tn_id, tenant).await
	}
	async fn delete_tenant(&self, tn_id: TnId) -> ClResult<()> {
		tenant::delete(&self.db, tn_id).await
	}

	async fn list_tenants(&self, opts: &ListTenantsMetaOptions) -> ClResult<Vec<TenantListMeta>> {
		tenant::list(&self.dbr, opts).await
	}

	// Profile management (all delegated to the `profile` module)
	async fn list_profiles(
		&self,
		tn_id: TnId,
		opts: &ListProfileOptions,
	) -> ClResult<Vec<Profile<Box<str>>>> {
		profile::list(&self.dbr, tn_id, opts).await
	}

	async fn get_relationships(
		&self,
		tn_id: TnId,
		target_id_tags: &[&str],
	) -> ClResult<std::collections::HashMap<String, (bool, bool)>> {
		profile::get_relationships(&self.dbr, tn_id, target_id_tags).await
	}

	async fn read_profile(
		&self,
		tn_id: TnId,
		id_tag: &str,
	) -> ClResult<(Box<str>, Profile<Box<str>>)> {
		profile::read(&self.dbr, tn_id, id_tag).await
	}

	async fn read_profile_roles(
		&self,
		tn_id: TnId,
		id_tag: &str,
	) -> ClResult<Option<Box<[Box<str>]>>> {
		profile::read_roles(&self.dbr, tn_id, id_tag).await
	}

	async fn create_profile(
		&self,
		tn_id: TnId,
		profile: &Profile<&str>,
		etag: &str,
	) -> ClResult<()> {
		profile::create(&self.db, tn_id, profile, etag).await
	}
	async fn update_profile(
		&self,
		tn_id: TnId,
		id_tag: &str,
		profile: &UpdateProfileData,
	) -> ClResult<()> {
		profile::update(&self.db, tn_id, id_tag, profile).await
	}

	// The public-key methods take no `tn_id`, unlike the other profile methods above.
	async fn read_profile_public_key(
		&self,
		id_tag: &str,
		key_id: &str,
	) -> ClResult<(Box<str>, Timestamp)> {
		profile::read_public_key(&self.dbr, id_tag, key_id).await
	}

	async fn add_profile_public_key(
		&self,
		id_tag: &str,
		key_id: &str,
		public_key: &str,
	) -> ClResult<()> {
		profile::add_public_key(&self.db, id_tag, key_id, public_key).await
	}

	// Returns `()`: no error is surfaced to the caller from this method.
	async fn process_profile_refresh<'a>(
		&self,
		callback: Box<dyn Fn(TnId, &'a str, Option<&'a str>) -> ClResult<()> + Send>,
	) {
		profile::process_refresh(&self.dbr, callback).await
	}

	async fn list_stale_profiles(
		&self,
		max_age_secs: i64,
		limit: u32,
	) -> ClResult<Vec<(TnId, Box<str>, Option<Box<str>>)>> {
		profile::list_stale_profiles(&self.dbr, max_age_secs, limit).await
	}

	// Action management
	//*******************
	async fn list_actions(
		&self,
		tn_id: TnId,
		opts: &ListActionOptions,
	) -> ClResult<Vec<ActionView>> {
		action::list(&self.dbr, tn_id, opts).await
	}

	async fn list_action_tokens(
		&self,
		tn_id: TnId,
		opts: &ListActionOptions,
	) -> ClResult<Box<[Box<str>]>> {
		action::list_tokens(&self.dbr, tn_id, opts).await
	}

	async fn get_action_id(&self, tn_id: TnId, a_id: u64) -> ClResult<Box<str>> {
		action::get_id(&self.dbr, tn_id, a_id).await
	}

	async fn create_action(
		&self,
		tn_id: TnId,
		action: &Action<&str>,
		key: Option<&str>,
	) -> ClResult<ActionId<Box<str>>> {
		action::create(&self.db, tn_id, action, key).await
	}

	async fn finalize_action(
		&self,
		tn_id: TnId,
		a_id: u64,
		action_id: &str,
		options: FinalizeActionOptions<'_>,
	) -> ClResult<()> {
		action::finalize(&self.db, tn_id, a_id, action_id, options).await
	}

	async fn create_inbound_action(
		&self,
		tn_id: TnId,
		action_id: &str,
		token: &str,
		ack_token: Option<&str>,
	) -> ClResult<()> {
		action::create_inbound(&self.db, tn_id, action_id, token, ack_token).await
	}

	async fn get_action_root_id(&self, tn_id: TnId, action_id: &str) -> ClResult<Box<str>> {
		action::get_root_id(&self.dbr, tn_id, action_id).await
	}

	async fn get_action_data(&self, tn_id: TnId, action_id: &str) -> ClResult<Option<ActionData>> {
		action::get_data(&self.dbr, tn_id, action_id).await
	}

	async fn get_action_by_key(
		&self,
		tn_id: TnId,
		action_key: &str,
	) -> ClResult<Option<Action<Box<str>>>> {
		action::get_by_key(&self.dbr, tn_id, action_key).await
	}

	async fn store_action_token(&self, tn_id: TnId, action_id: &str, token: &str) -> ClResult<()> {
		action::store_token(&self.db, tn_id, action_id, token).await
	}

	async fn get_action_token(&self, tn_id: TnId, action_id: &str) -> ClResult<Option<Box<str>>> {
		action::get_token(&self.dbr, tn_id, action_id).await
	}

	async fn update_action_data(
		&self,
		tn_id: TnId,
		action_id: &str,
		opts: &UpdateActionDataOptions,
	) -> ClResult<()> {
		action::update_data(&self.db, tn_id, action_id, opts).await
	}

	async fn update_inbound_action(
		&self,
		tn_id: TnId,
		action_id: &str,
		status: Option<char>,
	) -> ClResult<()> {
		action::update_inbound(&self.db, tn_id, action_id, status).await
	}

	// NOTE(review): this is the only `get_*` method in this impl that goes
	// through the read-write pool `db` instead of `dbr` — confirm whether
	// `action::get_related_tokens` needs write access or `dbr` was intended.
	async fn get_related_action_tokens(
		&self,
		tn_id: TnId,
		aprv_action_id: &str,
	) -> ClResult<Vec<(Box<str>, Box<str>)>> {
		action::get_related_tokens(&self.db, tn_id, aprv_action_id).await
	}

	async fn create_outbound_action(
		&self,
		tn_id: TnId,
		action_id: &str,
		token: &str,
		opts: &CreateOutboundActionOptions,
	) -> ClResult<()> {
		action::create_outbound(&self.db, tn_id, action_id, token, opts).await
	}

	// File management
	//*****************
	async fn get_file_id(&self, tn_id: TnId, f_id: u64) -> ClResult<Box<str>> {
		file::get_id(&self.dbr, tn_id, f_id).await
	}

	async fn list_files(&self, tn_id: TnId, opts: &ListFileOptions) -> ClResult<Vec<FileView>> {
		file::list(&self.dbr, tn_id, opts).await
	}

	async fn list_file_variants(
		&self,
		tn_id: TnId,
		file_id: FileId<&str>,
	) -> ClResult<Vec<FileVariant<Box<str>>>> {
		file::list_variants(&self.dbr, tn_id, file_id).await
	}

	async fn list_available_variants(&self, tn_id: TnId, file_id: &str) -> ClResult<Vec<Box<str>>> {
		file::list_available_variants(&self.dbr, tn_id, file_id).await
	}

	async fn read_file_variant(
		&self,
		tn_id: TnId,
		variant_id: &str,
	) -> ClResult<FileVariant<Box<str>>> {
		file::read_variant(&self.dbr, tn_id, variant_id).await
	}

	async fn read_file_id_by_variant(&self, tn_id: TnId, variant_id: &str) -> ClResult<Box<str>> {
		file::read_file_id_by_variant(&self.dbr, tn_id, variant_id).await
	}

	async fn read_f_id_by_file_id(&self, tn_id: TnId, file_id: &str) -> ClResult<u64> {
		file::read_f_id_by_file_id(&self.dbr, tn_id, file_id).await
	}

	async fn create_file(&self, tn_id: TnId, opts: CreateFile) -> ClResult<FileId<Box<str>>> {
		file::create(&self.db, tn_id, opts).await
	}

	// The returned `&'a str` shares the lifetime `'a` with `self` and with the
	// strings borrowed inside `opts`.
	async fn create_file_variant<'a>(
		&'a self,
		tn_id: TnId,
		f_id: u64,
		opts: FileVariant<&'a str>,
	) -> ClResult<&'a str> {
		file::create_variant(&self.db, tn_id, f_id, opts).await
	}

	async fn update_file_id(&self, tn_id: TnId, f_id: u64, file_id: &str) -> ClResult<()> {
		file::update_id(&self.db, tn_id, f_id, file_id).await
	}

	async fn finalize_file(&self, tn_id: TnId, f_id: u64, file_id: &str) -> ClResult<()> {
		file::finalize_file(&self.db, tn_id, f_id, file_id).await
	}

	// Task scheduler
	//****************
	// `opts` arrives by value here but is handed to `task::list` by reference.
	async fn list_tasks(&self, opts: ListTaskOptions) -> ClResult<Vec<Task>> {
		task::list(&self.dbr, &opts).await
	}

	async fn list_task_ids(&self, kind: &str, keys: &[Box<str>]) -> ClResult<Vec<u64>> {
		task::list_ids(&self.dbr, kind, keys).await
	}

	async fn create_task(
		&self,
		kind: &'static str,
		key: Option<&str>,
		input: &str,
		deps: &[u64],
	) -> ClResult<u64> {
		task::create(&self.db, kind, key, input, deps).await
	}

	async fn update_task_finished(&self, task_id: u64, output: &str) -> ClResult<()> {
		task::mark_finished(&self.db, task_id, output).await
	}

	async fn update_task_error(
		&self,
		task_id: u64,
		output: &str,
		next_at: Option<Timestamp>,
	) -> ClResult<()> {
		task::mark_error(&self.db, task_id, output, next_at).await
	}

	async fn find_task_by_key(&self, key: &str) -> ClResult<Option<Task>> {
		task::find_by_key(&self.dbr, key).await
	}

	async fn update_task(&self, task_id: u64, patch: &TaskPatch) -> ClResult<()> {
		task::update(&self.db, task_id, patch).await
	}

	// Phase 1: Profile Management
	async fn get_profile_info(&self, tn_id: TnId, id_tag: &str) -> ClResult<ProfileData> {
		profile::get_info(&self.dbr, tn_id, id_tag).await
	}

	// Phase 2: Action Management
	//***************************

	async fn get_action(&self, tn_id: TnId, action_id: &str) -> ClResult<Option<ActionView>> {
		action::get(&self.dbr, tn_id, action_id).await
	}

	async fn update_action(
		&self,
		tn_id: TnId,
		action_id: &str,
		content: Option<&str>,
		attachments: Option<&[&str]>,
	) -> ClResult<()> {
		action::update(&self.db, tn_id, action_id, content, attachments).await
	}

	async fn delete_action(&self, tn_id: TnId, action_id: &str) -> ClResult<()> {
		action::delete(&self.db, tn_id, action_id).await
	}

	async fn add_reaction(
		&self,
		tn_id: TnId,
		action_id: &str,
		reactor_id_tag: &str,
		reaction_type: &str,
		content: Option<&str>,
	) -> ClResult<()> {
		action::add_reaction(&self.db, tn_id, action_id, reactor_id_tag, reaction_type, content)
			.await
	}

	async fn list_reactions(&self, tn_id: TnId, action_id: &str) -> ClResult<Vec<ReactionData>> {
		action::list_reactions(&self.dbr, tn_id, action_id).await
	}

	// Phase 2: File Management Enhancements
	//**************************************

	async fn delete_file(&self, tn_id: TnId, file_id: &str) -> ClResult<()> {
		file::delete(&self.db, tn_id, file_id).await
	}

	// Settings Management
	//*********************

	async fn list_settings(
		&self,
		tn_id: TnId,
		prefix: Option<&[String]>,
	) -> ClResult<std::collections::HashMap<String, serde_json::Value>> {
		setting::list(&self.dbr, tn_id, prefix).await
	}

	async fn read_setting(&self, tn_id: TnId, name: &str) -> ClResult<Option<serde_json::Value>> {
		setting::read(&self.dbr, tn_id, name).await
	}

	async fn update_setting(
		&self,
		tn_id: TnId,
		name: &str,
		value: Option<serde_json::Value>,
	) -> ClResult<()> {
		setting::update(&self.db, tn_id, name, value).await
	}

	// Reference / Bookmark Management
	//********************************

	async fn list_refs(&self, tn_id: TnId, opts: &ListRefsOptions) -> ClResult<Vec<RefData>> {
		reference::list(&self.dbr, tn_id, opts).await
	}

	async fn get_ref(&self, tn_id: TnId, ref_id: &str) -> ClResult<Option<(Box<str>, Box<str>)>> {
		reference::get(&self.dbr, tn_id, ref_id).await
	}

	async fn create_ref(
		&self,
		tn_id: TnId,
		ref_id: &str,
		opts: &CreateRefOptions,
	) -> ClResult<RefData> {
		reference::create(&self.db, tn_id, ref_id, opts).await
	}

	async fn delete_ref(&self, tn_id: TnId, ref_id: &str) -> ClResult<()> {
		reference::delete(&self.db, tn_id, ref_id).await
	}

	// Same signature as `validate_ref`, but routed through the read-write pool.
	// NOTE(review): presumably using a ref also updates its stored state —
	// confirm in `reference::use_ref`.
	async fn use_ref(
		&self,
		ref_id: &str,
		expected_types: &[&str],
	) -> ClResult<(TnId, Box<str>, RefData)> {
		reference::use_ref(&self.db, ref_id, expected_types).await
	}

	// Same signature as `use_ref`, but routed through the read-only pool.
	async fn validate_ref(
		&self,
		ref_id: &str,
		expected_types: &[&str],
	) -> ClResult<(TnId, Box<str>, RefData)> {
		reference::validate_ref(&self.dbr, ref_id, expected_types).await
	}

	// Tag Management
	//***************

	async fn list_tags(
		&self,
		tn_id: TnId,
		prefix: Option<&str>,
		with_counts: bool,
		limit: Option<u32>,
	) -> ClResult<Vec<TagInfo>> {
		tag::list(&self.dbr, tn_id, prefix, with_counts, limit).await
	}

	async fn add_tag(&self, tn_id: TnId, file_id: &str, tag: &str) -> ClResult<Vec<String>> {
		tag::add(&self.db, tn_id, file_id, tag).await
	}

	async fn remove_tag(&self, tn_id: TnId, file_id: &str, tag: &str) -> ClResult<Vec<String>> {
		tag::remove(&self.db, tn_id, file_id, tag).await
	}

	// File Management Enhancements
	//****************************

	async fn update_file_data(
		&self,
		tn_id: TnId,
		file_id: &str,
		opts: &UpdateFileOptions,
	) -> ClResult<()> {
		file::update_data(&self.db, tn_id, file_id, opts).await
	}

	async fn read_file(&self, tn_id: TnId, file_id: &str) -> ClResult<Option<FileView>> {
		file::read(&self.dbr, tn_id, file_id).await
	}

	// File User Data (per-user file activity tracking)
	//**************************************************

	async fn record_file_access(&self, tn_id: TnId, id_tag: &str, file_id: &str) -> ClResult<()> {
		file_user_data::record_access(&self.db, tn_id, id_tag, file_id).await
	}

	async fn record_file_modification(
		&self,
		tn_id: TnId,
		id_tag: &str,
		file_id: &str,
	) -> ClResult<()> {
		file_user_data::record_modification(&self.db, tn_id, id_tag, file_id).await
	}

	async fn update_file_user_data(
		&self,
		tn_id: TnId,
		id_tag: &str,
		file_id: &str,
		pinned: Option<bool>,
		starred: Option<bool>,
	) -> ClResult<FileUserData> {
		file_user_data::update(&self.db, tn_id, id_tag, file_id, pinned, starred).await
	}

	async fn get_file_user_data(
		&self,
		tn_id: TnId,
		id_tag: &str,
		file_id: &str,
	) -> ClResult<Option<FileUserData>> {
		file_user_data::get(&self.dbr, tn_id, id_tag, file_id).await
	}

	// Push Subscription Management
	//*****************************

	async fn list_push_subscriptions(&self, tn_id: TnId) -> ClResult<Vec<PushSubscription>> {
		push::list(&self.dbr, tn_id).await
	}

	async fn create_push_subscription(
		&self,
		tn_id: TnId,
		subscription: &PushSubscriptionData,
	) -> ClResult<u64> {
		push::create(&self.db, tn_id, subscription).await
	}

	async fn delete_push_subscription(&self, tn_id: TnId, subscription_id: u64) -> ClResult<()> {
		push::delete(&self.db, tn_id, subscription_id).await
	}
}