Skip to main content

lb_rs/io/
mod.rs

1//! Members of this model are concerned with the details of IO, generally
2//! disk and network. This is the module any on-disk migrations will live
3//! and ideas around network, disk, and memory caches will be expressed.
4//! Code here should not be platform dependent, and should strive to be
5//! suitable for a range of devices: iPhones with flaky networks to servers
6//! and workstations with excellent networks.
7
8pub mod docs;
9pub mod network;
10
11use crate::model::account::Account;
12use crate::model::core_config::Config;
13use crate::model::file_like::FileLike;
14use crate::model::file_metadata::Owner;
15use crate::model::signed_file::SignedFile;
16use crate::model::signed_meta::SignedMeta;
17use crate::service::activity::DocEvent;
18use crate::service::lb_id::LbID;
19use crate::{Lb, LbErrKind, LbResult};
20use db_rs::{Db, List, LookupTable, Single, TxHandle};
21use db_rs_derive::Schema;
22use docs::AsyncDocs;
23use std::fs;
24use std::ops::{Deref, DerefMut};
25use std::sync::Arc;
26use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use uuid::Uuid;
28
29pub(crate) type LbDb = Arc<RwLock<CoreDb>>;
30// todo: limit visibility
31pub type CoreDb = CoreV4;
32
33#[derive(Schema, Debug)]
34#[cfg_attr(feature = "no-network", derive(Clone))]
35pub struct CoreV3 {
36    pub account: Single<Account>,
37    pub last_synced: Single<i64>,
38    pub root: Single<Uuid>,
39    pub local_metadata: LookupTable<Uuid, SignedFile>,
40    pub base_metadata: LookupTable<Uuid, SignedFile>,
41
42    /// map from pub key to username
43    pub pub_key_lookup: LookupTable<Owner, String>,
44
45    pub doc_events: List<DocEvent>,
46}
47
48#[derive(Schema, Debug)]
49#[cfg_attr(feature = "no-network", derive(Clone))]
50pub struct CoreV4 {
51    pub account: Single<Account>,
52    pub last_synced: Single<i64>,
53    pub root: Single<Uuid>,
54    pub local_metadata: LookupTable<Uuid, SignedMeta>,
55    pub base_metadata: LookupTable<Uuid, SignedMeta>,
56
57    /// map from pub key to username
58    pub pub_key_lookup: LookupTable<Owner, String>,
59
60    pub doc_events: List<DocEvent>,
61    pub id: Single<LbID>,
62}
63
64pub async fn migrate_and_init(cfg: &Config, docs: &AsyncDocs) -> LbResult<CoreV4> {
65    let cfg = db_rs::Config::in_folder(&cfg.writeable_path);
66
67    let mut db =
68        CoreDb::init(cfg.clone()).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
69    let mut old = CoreV3::init(cfg).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
70
71    // --- migration begins ---
72    let tx = db.begin_transaction()?;
73
74    info!("evaluating migration");
75    if old.account.get().is_some() && db.account.get().is_none() {
76        info!("performing migration");
77        if let Some(account) = old.account.get().cloned() {
78            db.account.insert(account)?;
79        }
80
81        if let Some(last_synced) = old.last_synced.get().copied() {
82            db.last_synced.insert(last_synced)?;
83        }
84
85        if let Some(root) = old.root.get().copied() {
86            db.root.insert(root)?;
87        }
88        for (id, file) in old.base_metadata.get() {
89            let mut meta: SignedMeta = file.clone().into();
90            if meta.is_document() {
91                if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
92                    meta.timestamped_value
93                        .value
94                        .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
95                } else {
96                    warn!("local document missing for {id}");
97                }
98            }
99            db.base_metadata.insert(*id, meta)?;
100        }
101
102        for (id, file) in old.local_metadata.get() {
103            let mut meta: SignedMeta = file.clone().into();
104            if meta.is_document() {
105                if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
106                    meta.timestamped_value
107                        .value
108                        .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
109                } else {
110                    warn!("local document missing for {id}");
111                }
112            }
113
114            db.local_metadata.insert(*id, meta)?;
115        }
116
117        for (o, s) in old.pub_key_lookup.get() {
118            db.pub_key_lookup.insert(*o, s.clone())?;
119        }
120
121        for event in old.doc_events.get() {
122            db.doc_events.push(*event)?;
123        }
124    } else {
125        info!("no migration");
126    }
127
128    tx.drop_safely()?;
129    // --- migration ends ---
130
131    info!("cleaning up");
132    old.account.clear()?;
133    let old_db = old.config()?.db_location_v2()?;
134    let _ = fs::remove_file(old_db);
135
136    Ok(db)
137}
138
139pub struct LbRO<'a> {
140    guard: RwLockReadGuard<'a, CoreDb>,
141}
142
143impl LbRO<'_> {
144    pub fn db(&self) -> &CoreDb {
145        self.guard.deref()
146    }
147}
148
149pub struct LbTx<'a> {
150    guard: RwLockWriteGuard<'a, CoreDb>,
151    tx: TxHandle,
152}
153
154impl LbTx<'_> {
155    pub fn db(&mut self) -> &mut CoreDb {
156        self.guard.deref_mut()
157    }
158
159    pub fn end(self) {
160        self.tx.drop_safely().unwrap();
161    }
162}
163
164impl Lb {
165    pub async fn ro_tx(&self) -> LbRO<'_> {
166        let start = std::time::Instant::now();
167
168        // let guard = tokio::time::timeout(std::time::Duration::from_secs(1), self.db.read())
169        //     .await
170        //     .unwrap();
171
172        let guard = self.db.read().await;
173
174        if start.elapsed() > std::time::Duration::from_millis(100) {
175            warn!("readonly transaction lock acquisition took {:?}", start.elapsed());
176        }
177
178        LbRO { guard }
179    }
180
181    pub async fn begin_tx(&self) -> LbTx<'_> {
182        let start = std::time::Instant::now();
183
184        // let mut guard = tokio::time::timeout(std::time::Duration::from_secs(1), self.db.write())
185        //     .await
186        //     .unwrap();
187
188        let mut guard = self.db.write().await;
189
190        if start.elapsed() > std::time::Duration::from_millis(100) {
191            warn!("readwrite transaction lock acquisition took {:?}", start.elapsed());
192        }
193
194        let tx = guard.begin_transaction().unwrap();
195
196        LbTx { guard, tx }
197    }
198}