1pub mod docs;
9pub mod network;
10
11use crate::model::account::Account;
12use crate::model::core_config::Config;
13use crate::model::file_like::FileLike;
14use crate::model::file_metadata::Owner;
15use crate::model::signed_file::SignedFile;
16use crate::model::signed_meta::SignedMeta;
17use crate::service::activity::DocEvent;
18use crate::service::lb_id::LbID;
19use crate::{Lb, LbErrKind, LbResult};
20use db_rs::{Db, List, LookupTable, Single, TxHandle};
21use db_rs_derive::Schema;
22use docs::AsyncDocs;
23use std::fs;
24use std::ops::{Deref, DerefMut};
25use std::sync::Arc;
26use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use uuid::Uuid;
28
29pub(crate) type LbDb = Arc<RwLock<CoreDb>>;
30pub type CoreDb = CoreV4;
32
33#[derive(Schema, Debug)]
34#[cfg_attr(feature = "no-network", derive(Clone))]
35pub struct CoreV3 {
36 pub account: Single<Account>,
37 pub last_synced: Single<i64>,
38 pub root: Single<Uuid>,
39 pub local_metadata: LookupTable<Uuid, SignedFile>,
40 pub base_metadata: LookupTable<Uuid, SignedFile>,
41
42 pub pub_key_lookup: LookupTable<Owner, String>,
44
45 pub doc_events: List<DocEvent>,
46}
47
48#[derive(Schema, Debug)]
49#[cfg_attr(feature = "no-network", derive(Clone))]
50pub struct CoreV4 {
51 pub account: Single<Account>,
52 pub last_synced: Single<i64>,
53 pub root: Single<Uuid>,
54 pub local_metadata: LookupTable<Uuid, SignedMeta>,
55 pub base_metadata: LookupTable<Uuid, SignedMeta>,
56
57 pub pub_key_lookup: LookupTable<Owner, String>,
59
60 pub doc_events: List<DocEvent>,
61 pub id: Single<LbID>,
62}
63
64pub async fn migrate_and_init(cfg: &Config, docs: &AsyncDocs) -> LbResult<CoreV4> {
65 let cfg = db_rs::Config::in_folder(&cfg.writeable_path);
66
67 let mut db =
68 CoreDb::init(cfg.clone()).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
69 let mut old = CoreV3::init(cfg).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
70
71 let tx = db.begin_transaction()?;
73
74 info!("evaluating migration");
75 if old.account.get().is_some() && db.account.get().is_none() {
76 info!("performing migration");
77 if let Some(account) = old.account.get().cloned() {
78 db.account.insert(account)?;
79 }
80
81 if let Some(last_synced) = old.last_synced.get().copied() {
82 db.last_synced.insert(last_synced)?;
83 }
84
85 if let Some(root) = old.root.get().copied() {
86 db.root.insert(root)?;
87 }
88 for (id, file) in old.base_metadata.get() {
89 let mut meta: SignedMeta = file.clone().into();
90 if meta.is_document() {
91 if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
92 meta.timestamped_value
93 .value
94 .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
95 } else {
96 warn!("local document missing for {id}");
97 }
98 }
99 db.base_metadata.insert(*id, meta)?;
100 }
101
102 for (id, file) in old.local_metadata.get() {
103 let mut meta: SignedMeta = file.clone().into();
104 if meta.is_document() {
105 if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
106 meta.timestamped_value
107 .value
108 .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
109 } else {
110 warn!("local document missing for {id}");
111 }
112 }
113
114 db.local_metadata.insert(*id, meta)?;
115 }
116
117 for (o, s) in old.pub_key_lookup.get() {
118 db.pub_key_lookup.insert(*o, s.clone())?;
119 }
120
121 for event in old.doc_events.get() {
122 db.doc_events.push(*event)?;
123 }
124 } else {
125 info!("no migration");
126 }
127
128 tx.drop_safely()?;
129 info!("cleaning up");
132 old.account.clear()?;
133 let old_db = old.config()?.db_location_v2()?;
134 let _ = fs::remove_file(old_db);
135
136 Ok(db)
137}
138
139pub struct LbRO<'a> {
140 guard: RwLockReadGuard<'a, CoreDb>,
141}
142
143impl LbRO<'_> {
144 pub fn db(&self) -> &CoreDb {
145 self.guard.deref()
146 }
147}
148
149pub struct LbTx<'a> {
150 guard: RwLockWriteGuard<'a, CoreDb>,
151 tx: TxHandle,
152}
153
154impl LbTx<'_> {
155 pub fn db(&mut self) -> &mut CoreDb {
156 self.guard.deref_mut()
157 }
158
159 pub fn end(self) {
160 self.tx.drop_safely().unwrap();
161 }
162}
163
164impl Lb {
165 pub async fn ro_tx(&self) -> LbRO<'_> {
166 let start = std::time::Instant::now();
167
168 let guard = self.db.read().await;
173
174 if start.elapsed() > std::time::Duration::from_millis(100) {
175 warn!("readonly transaction lock acquisition took {:?}", start.elapsed());
176 }
177
178 LbRO { guard }
179 }
180
181 pub async fn begin_tx(&self) -> LbTx<'_> {
182 let start = std::time::Instant::now();
183
184 let mut guard = self.db.write().await;
189
190 if start.elapsed() > std::time::Duration::from_millis(100) {
191 warn!("readwrite transaction lock acquisition took {:?}", start.elapsed());
192 }
193
194 let tx = guard.begin_transaction().unwrap();
195
196 LbTx { guard, tx }
197 }
198}