cyfs_util/util/
db_helper.rs

1use cyfs_base::*;
2
3use rusqlite::{Connection, OpenFlags};
4use std::cell::RefCell;
5use std::path::PathBuf;
6use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
7use thread_local::ThreadLocal;
8
9pub struct SqliteConnectionHolder {
10    data_file: PathBuf,
11
12    /* SQLite does not support multiple writers. */
13    read_conn: ThreadLocal<RefCell<Connection>>,
14    write_conn: ThreadLocal<RefCell<Connection>>,
15    conn_rw_lock: RwLock<u32>,
16}
17
18impl SqliteConnectionHolder {
19    pub fn new(data_file: PathBuf) -> Self {
20        Self {
21            data_file,
22            read_conn: ThreadLocal::new(),
23            write_conn: ThreadLocal::new(),
24            conn_rw_lock: RwLock::new(0),
25        }
26    }
27
28    pub fn get_write_conn(
29        &self,
30    ) -> BuckyResult<(std::cell::RefMut<Connection>, RwLockWriteGuard<u32>)> {
31        let conn = self.write_conn.get_or_try(|| {
32            let ret = self.create_new_conn(false)?;
33            Ok::<RefCell<Connection>, BuckyError>(RefCell::new(ret))
34        })?;
35
36        let lock = self.conn_rw_lock.write().unwrap();
37        Ok((conn.borrow_mut(), lock))
38    }
39
40    pub fn get_read_conn(&self) -> BuckyResult<(std::cell::Ref<Connection>, RwLockReadGuard<u32>)> {
41        let conn = self.read_conn.get_or_try(|| {
42            let ret = self.create_new_conn(true)?;
43            Ok::<RefCell<Connection>, BuckyError>(RefCell::new(ret))
44        })?;
45
46        let lock = self.conn_rw_lock.read().unwrap();
47        Ok((conn.borrow(), lock))
48    }
49
50    pub fn create_new_conn(&self, read_only: bool) -> BuckyResult<Connection> {
51        let flags = if read_only {
52            OpenFlags::SQLITE_OPEN_READ_ONLY
53                | OpenFlags::SQLITE_OPEN_NO_MUTEX
54                | OpenFlags::SQLITE_OPEN_URI
55        } else {
56            OpenFlags::default()
57        };
58
59        let conn = Connection::open_with_flags(&self.data_file, flags).map_err(|e| {
60            let msg = format!("open db failed, db={}, {}", self.data_file.display(), e);
61            error!("{}", msg);
62
63            BuckyError::new(BuckyErrorCode::SqliteError, msg)
64        })?;
65
66        info!(
67            "open db for thread={:?}, file={}, read={}",
68            std::thread::current().id(),
69            self.data_file.display(),
70            read_only,
71        );
72        assert!(conn.is_autocommit());
73
74        // 设置一个30s的锁重试
75        if let Err(e) = conn.busy_timeout(std::time::Duration::from_secs(30)) {
76            error!("init sqlite busy_timeout error! {}", e);
77        }
78
79        Ok(conn)
80    }
81}