1pub mod docs;
9pub mod network;
10
11use crate::model::account::Account;
12use crate::model::core_config::Config;
13use crate::model::file_like::FileLike;
14use crate::model::file_metadata::Owner;
15use crate::model::signed_file::SignedFile;
16use crate::model::signed_meta::SignedMeta;
17use crate::service::activity::DocEvent;
18use crate::{Lb, LbErrKind, LbResult};
19use db_rs::{Db, List, LookupTable, Single, TxHandle};
20use db_rs_derive::Schema;
21use docs::AsyncDocs;
22use std::fs;
23use std::ops::{Deref, DerefMut};
24use std::sync::Arc;
25use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
26use uuid::Uuid;
27
28pub(crate) type LbDb = Arc<RwLock<CoreDb>>;
29pub type CoreDb = CoreV4;
31
32#[derive(Schema, Debug)]
33#[cfg_attr(feature = "no-network", derive(Clone))]
34pub struct CoreV3 {
35 pub account: Single<Account>,
36 pub last_synced: Single<i64>,
37 pub root: Single<Uuid>,
38 pub local_metadata: LookupTable<Uuid, SignedFile>,
39 pub base_metadata: LookupTable<Uuid, SignedFile>,
40
41 pub pub_key_lookup: LookupTable<Owner, String>,
43
44 pub doc_events: List<DocEvent>,
45}
46
47#[derive(Schema, Debug)]
48#[cfg_attr(feature = "no-network", derive(Clone))]
49pub struct CoreV4 {
50 pub account: Single<Account>,
51 pub last_synced: Single<i64>,
52 pub root: Single<Uuid>,
53 pub local_metadata: LookupTable<Uuid, SignedMeta>,
54 pub base_metadata: LookupTable<Uuid, SignedMeta>,
55
56 pub pub_key_lookup: LookupTable<Owner, String>,
58
59 pub doc_events: List<DocEvent>,
60}
61
62pub async fn migrate_and_init(cfg: &Config, docs: &AsyncDocs) -> LbResult<CoreV4> {
63 let cfg = db_rs::Config::in_folder(&cfg.writeable_path);
64
65 let mut db =
66 CoreDb::init(cfg.clone()).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
67 let mut old = CoreV3::init(cfg).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
68
69 let tx = db.begin_transaction()?;
71
72 info!("evaluating migration");
73 if old.account.get().is_some() && db.account.get().is_none() {
74 info!("performing migration");
75 if let Some(account) = old.account.get().cloned() {
76 db.account.insert(account)?;
77 }
78
79 if let Some(last_synced) = old.last_synced.get().copied() {
80 db.last_synced.insert(last_synced)?;
81 }
82
83 if let Some(root) = old.root.get().copied() {
84 db.root.insert(root)?;
85 }
86 for (id, file) in old.base_metadata.get() {
87 let mut meta: SignedMeta = file.clone().into();
88 if meta.is_document() {
89 if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
90 meta.timestamped_value
91 .value
92 .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
93 } else {
94 warn!("local document missing for {id}");
95 }
96 }
97 db.base_metadata.insert(*id, meta)?;
98 }
99
100 for (id, file) in old.local_metadata.get() {
101 let mut meta: SignedMeta = file.clone().into();
102 if meta.is_document() {
103 if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
104 meta.timestamped_value
105 .value
106 .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
107 } else {
108 warn!("local document missing for {id}");
109 }
110 }
111
112 db.local_metadata.insert(*id, meta)?;
113 }
114
115 for (o, s) in old.pub_key_lookup.get() {
116 db.pub_key_lookup.insert(*o, s.clone())?;
117 }
118
119 for event in old.doc_events.get() {
120 db.doc_events.push(*event)?;
121 }
122 } else {
123 info!("no migration");
124 }
125
126 tx.drop_safely()?;
127 info!("cleaning up");
130 old.account.clear()?;
131 let old_db = old.config()?.db_location_v2()?;
132 let _ = fs::remove_file(old_db);
133
134 Ok(db)
135}
136
137pub struct LbRO<'a> {
138 guard: RwLockReadGuard<'a, CoreDb>,
139}
140
141impl LbRO<'_> {
142 pub fn db(&self) -> &CoreDb {
143 self.guard.deref()
144 }
145}
146
147pub struct LbTx<'a> {
148 guard: RwLockWriteGuard<'a, CoreDb>,
149 tx: TxHandle,
150}
151
152impl LbTx<'_> {
153 pub fn db(&mut self) -> &mut CoreDb {
154 self.guard.deref_mut()
155 }
156
157 pub fn end(self) {
158 self.tx.drop_safely().unwrap();
159 }
160}
161
162impl Lb {
163 pub async fn ro_tx(&self) -> LbRO<'_> {
164 let start = std::time::Instant::now();
165
166 let guard = self.db.read().await;
171
172 if start.elapsed() > std::time::Duration::from_millis(100) {
173 warn!("readonly transaction lock acquisition took {:?}", start.elapsed());
174 }
175
176 LbRO { guard }
177 }
178
179 pub async fn begin_tx(&self) -> LbTx<'_> {
180 let start = std::time::Instant::now();
181
182 let mut guard = self.db.write().await;
187
188 if start.elapsed() > std::time::Duration::from_millis(100) {
189 warn!("readwrite transaction lock acquisition took {:?}", start.elapsed());
190 }
191
192 let tx = guard.begin_transaction().unwrap();
193
194 LbTx { guard, tx }
195 }
196}