lb_rs/io/
mod.rs

1//! Members of this model are concerned with the details of IO, generally
2//! disk and network. This is the module any on-disk migrations will live
3//! and ideas around network, disk, and memory caches will be expressed.
4//! Code here should not be platform dependent, and should strive to be
5//! suitable for a range of devices: iPhones with flaky networks to servers
6//! and workstations with excellent networks.
7
8pub mod docs;
9pub mod network;
10
11use crate::model::account::Account;
12use crate::model::core_config::Config;
13use crate::model::file_like::FileLike;
14use crate::model::file_metadata::Owner;
15use crate::model::signed_file::SignedFile;
16use crate::model::signed_meta::SignedMeta;
17use crate::service::activity::DocEvent;
18use crate::{Lb, LbErrKind, LbResult};
19use db_rs::{Db, List, LookupTable, Single, TxHandle};
20use db_rs_derive::Schema;
21use docs::AsyncDocs;
22use std::fs;
23use std::ops::{Deref, DerefMut};
24use std::sync::Arc;
25use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
26use uuid::Uuid;
27
28pub(crate) type LbDb = Arc<RwLock<CoreDb>>;
29// todo: limit visibility
30pub type CoreDb = CoreV4;
31
32#[derive(Schema, Debug)]
33#[cfg_attr(feature = "no-network", derive(Clone))]
34pub struct CoreV3 {
35    pub account: Single<Account>,
36    pub last_synced: Single<i64>,
37    pub root: Single<Uuid>,
38    pub local_metadata: LookupTable<Uuid, SignedFile>,
39    pub base_metadata: LookupTable<Uuid, SignedFile>,
40
41    /// map from pub key to username
42    pub pub_key_lookup: LookupTable<Owner, String>,
43
44    pub doc_events: List<DocEvent>,
45}
46
47#[derive(Schema, Debug)]
48#[cfg_attr(feature = "no-network", derive(Clone))]
49pub struct CoreV4 {
50    pub account: Single<Account>,
51    pub last_synced: Single<i64>,
52    pub root: Single<Uuid>,
53    pub local_metadata: LookupTable<Uuid, SignedMeta>,
54    pub base_metadata: LookupTable<Uuid, SignedMeta>,
55
56    /// map from pub key to username
57    pub pub_key_lookup: LookupTable<Owner, String>,
58
59    pub doc_events: List<DocEvent>,
60}
61
62pub async fn migrate_and_init(cfg: &Config, docs: &AsyncDocs) -> LbResult<CoreV4> {
63    let cfg = db_rs::Config::in_folder(&cfg.writeable_path);
64
65    let mut db =
66        CoreDb::init(cfg.clone()).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
67    let mut old = CoreV3::init(cfg).map_err(|err| LbErrKind::Unexpected(format!("{err:#?}")))?;
68
69    // --- migration begins ---
70    let tx = db.begin_transaction()?;
71
72    info!("evaluating migration");
73    if old.account.get().is_some() && db.account.get().is_none() {
74        info!("performing migration");
75        if let Some(account) = old.account.get().cloned() {
76            db.account.insert(account)?;
77        }
78
79        if let Some(last_synced) = old.last_synced.get().copied() {
80            db.last_synced.insert(last_synced)?;
81        }
82
83        if let Some(root) = old.root.get().copied() {
84            db.root.insert(root)?;
85        }
86        for (id, file) in old.base_metadata.get() {
87            let mut meta: SignedMeta = file.clone().into();
88            if meta.is_document() {
89                if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
90                    meta.timestamped_value
91                        .value
92                        .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
93                } else {
94                    warn!("local document missing for {id}");
95                }
96            }
97            db.base_metadata.insert(*id, meta)?;
98        }
99
100        for (id, file) in old.local_metadata.get() {
101            let mut meta: SignedMeta = file.clone().into();
102            if meta.is_document() {
103                if let Some(doc) = docs.maybe_get(*id, file.document_hmac().copied()).await? {
104                    meta.timestamped_value
105                        .value
106                        .set_hmac_and_size(file.document_hmac().copied(), Some(doc.value.len()));
107                } else {
108                    warn!("local document missing for {id}");
109                }
110            }
111
112            db.local_metadata.insert(*id, meta)?;
113        }
114
115        for (o, s) in old.pub_key_lookup.get() {
116            db.pub_key_lookup.insert(*o, s.clone())?;
117        }
118
119        for event in old.doc_events.get() {
120            db.doc_events.push(*event)?;
121        }
122    } else {
123        info!("no migration");
124    }
125
126    tx.drop_safely()?;
127    // --- migration ends ---
128
129    info!("cleaning up");
130    old.account.clear()?;
131    let old_db = old.config()?.db_location_v2()?;
132    let _ = fs::remove_file(old_db);
133
134    Ok(db)
135}
136
137pub struct LbRO<'a> {
138    guard: RwLockReadGuard<'a, CoreDb>,
139}
140
141impl LbRO<'_> {
142    pub fn db(&self) -> &CoreDb {
143        self.guard.deref()
144    }
145}
146
147pub struct LbTx<'a> {
148    guard: RwLockWriteGuard<'a, CoreDb>,
149    tx: TxHandle,
150}
151
152impl LbTx<'_> {
153    pub fn db(&mut self) -> &mut CoreDb {
154        self.guard.deref_mut()
155    }
156
157    pub fn end(self) {
158        self.tx.drop_safely().unwrap();
159    }
160}
161
162impl Lb {
163    pub async fn ro_tx(&self) -> LbRO<'_> {
164        let start = std::time::Instant::now();
165
166        // let guard = tokio::time::timeout(std::time::Duration::from_secs(1), self.db.read())
167        //     .await
168        //     .unwrap();
169
170        let guard = self.db.read().await;
171
172        if start.elapsed() > std::time::Duration::from_millis(100) {
173            warn!("readonly transaction lock acquisition took {:?}", start.elapsed());
174        }
175
176        LbRO { guard }
177    }
178
179    pub async fn begin_tx(&self) -> LbTx<'_> {
180        let start = std::time::Instant::now();
181
182        // let mut guard = tokio::time::timeout(std::time::Duration::from_secs(1), self.db.write())
183        //     .await
184        //     .unwrap();
185
186        let mut guard = self.db.write().await;
187
188        if start.elapsed() > std::time::Duration::from_millis(100) {
189            warn!("readwrite transaction lock acquisition took {:?}", start.elapsed());
190        }
191
192        let tx = guard.begin_transaction().unwrap();
193
194        LbTx { guard, tx }
195    }
196}