1pub mod docs;
9pub mod network;
10
11use crate::model::account::Account;
12use crate::model::core_config::Config;
13use crate::model::file_like::FileLike;
14use crate::model::file_metadata::Owner;
15use crate::model::signed_file::SignedFile;
16use crate::model::signed_meta::SignedMeta;
17use crate::service::activity::DocEvent;
18use crate::service::lb_id::LbID;
19use crate::{Lb, LbErrKind, LbResult};
20use db_rs::{Db, List, LookupTable, Single, TxHandle};
21use db_rs_derive::Schema;
22use docs::AsyncDocs;
23use std::fs;
24use std::ops::{Deref, DerefMut};
25use std::sync::Arc;
26use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use uuid::Uuid;
28use web_time::{Duration, Instant};
29
30pub(crate) type LbDb = Arc<RwLock<CoreDb>>;
31pub type CoreDb = CoreV4;
33
34#[derive(Schema, Debug)]
35#[cfg_attr(feature = "no-network", derive(Clone))]
36pub struct CoreV3 {
37 pub account: Single<Account>,
38 pub last_synced: Single<i64>,
39 pub root: Single<Uuid>,
40 pub local_metadata: LookupTable<Uuid, SignedFile>,
41 pub base_metadata: LookupTable<Uuid, SignedFile>,
42
43 pub pub_key_lookup: LookupTable<Owner, String>,
45
46 pub doc_events: List<DocEvent>,
47}
48
49#[derive(Schema, Debug)]
50#[cfg_attr(feature = "no-network", derive(Clone))]
51pub struct CoreV4 {
52 pub account: Single<Account>,
53 pub last_synced: Single<i64>,
54 pub root: Single<Uuid>,
55 pub local_metadata: LookupTable<Uuid, SignedMeta>,
56 pub base_metadata: LookupTable<Uuid, SignedMeta>,
57
58 pub pub_key_lookup: LookupTable<Owner, String>,
60
61 pub doc_events: List<DocEvent>,
62 pub id: Single<LbID>,
63}
64
65pub async fn migrate_and_init(cfg: &Config, docs: &AsyncDocs) -> LbResult<CoreV4> {
66 let cfg = db_rs::Config::in_folder(&cfg.writeable_path);
67
68 let mut db =
69 CoreDb::init(cfg.clone()).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
70 let mut old = CoreV3::init(cfg).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
71
72 let tx = db.begin_transaction()?;
74
75 info!("evaluating migration");
76 if old.account.get().is_some() && db.account.get().is_none() {
77 info!("performing migration");
78 if let Some(account) = old.account.get().cloned() {
79 db.account.insert(account)?;
80 }
81
82 if let Some(last_synced) = old.last_synced.get().copied() {
83 db.last_synced.insert(last_synced)?;
84 }
85
86 if let Some(root) = old.root.get().copied() {
87 db.root.insert(root)?;
88 }
89 for (id, file) in old.base_metadata.get() {
90 let mut meta: SignedMeta = file.clone().into();
91 if meta.is_document() {
92 if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
93 meta.timestamped_value
94 .value
95 .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
96 } else {
97 warn!("local document missing for {id}");
98 }
99 }
100 db.base_metadata.insert(*id, meta)?;
101 }
102
103 for (id, file) in old.local_metadata.get() {
104 let mut meta: SignedMeta = file.clone().into();
105 if meta.is_document() {
106 if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
107 meta.timestamped_value
108 .value
109 .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
110 } else {
111 warn!("local document missing for {id}");
112 }
113 }
114
115 db.local_metadata.insert(*id, meta)?;
116 }
117
118 for (o, s) in old.pub_key_lookup.get() {
119 db.pub_key_lookup.insert(*o, s.clone())?;
120 }
121
122 for event in old.doc_events.get() {
123 db.doc_events.push(*event)?;
124 }
125 } else {
126 info!("no migration");
127 }
128
129 tx.drop_safely()?;
130 info!("cleaning up");
133 old.account.clear()?;
134 let old_db = old.config()?.db_location_v2()?;
135 let _ = fs::remove_file(old_db);
136
137 Ok(db)
138}
139
140pub struct LbRO<'a> {
141 guard: RwLockReadGuard<'a, CoreDb>,
142}
143
144impl LbRO<'_> {
145 pub fn db(&self) -> &CoreDb {
146 self.guard.deref()
147 }
148}
149
150pub struct LbTx<'a> {
151 guard: RwLockWriteGuard<'a, CoreDb>,
152 tx: TxHandle,
153}
154
155impl LbTx<'_> {
156 pub fn db(&mut self) -> &mut CoreDb {
157 self.guard.deref_mut()
158 }
159
160 pub fn end(self) {
161 self.tx.drop_safely().unwrap();
162 }
163}
164
165impl Lb {
166 pub async fn ro_tx(&self) -> LbRO<'_> {
167 let start = Instant::now();
168
169 let guard = self.db.read().await;
170
171 if start.elapsed() > Duration::from_millis(100) {
172 warn!("readonly transaction lock acquisition took {:?}", start.elapsed());
173 }
174
175 LbRO { guard }
176 }
177
178 pub async fn begin_tx(&self) -> LbTx<'_> {
179 let start = Instant::now();
180
181 let mut guard = self.db.write().await;
182
183 if start.elapsed() > Duration::from_millis(100) {
184 warn!("readwrite transaction lock acquisition took {:?}", start.elapsed());
185 }
186
187 let tx = guard.begin_transaction().unwrap();
188
189 LbTx { guard, tx }
190 }
191}