web5_rust/common/
sqlite_store.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
use super::Error;
use super::traits::KeyValueStore;

use super::database::MAIN;

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::path::PathBuf;

use rusqlite::Connection;

use serde_json::from_slice as deserialize;
use serde_json::to_vec as serialize;
use serde::{Serialize, Deserialize};

/// Reserved key under which a store persists the serialized list of its
/// partition paths. Entries stored under this key are filtered out of
/// `get_all` (and therefore out of `keys` and `values`).
const PARTITION_KEY: &str = "__PARTITIONS__";

/// Serializable record of the partition paths registered on a store.
///
/// It is encoded with `serde_json` and saved in the store itself under
/// `PARTITION_KEY`, so the partitions can be reopened by `SqliteStore::new`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Partitions {
    // Paths of the direct child partitions; `partition` pushes one path
    // component at a time, each relative to the owning store's location.
    paths: Vec<PathBuf>
}

/// Key-value store backed by a SQLite database file, with optional nested
/// child stores ("partitions").
///
/// Cloning a store clones the `Arc`, so clones share the same connection.
#[derive(Clone)]
pub struct SqliteStore {
    // Mutex-guarded SQLite connection to `<location>/kvs.db`.
    db: Arc<Mutex<Connection>>,
    // Directory this store was opened at.
    location: PathBuf,
    // Child stores currently loaded in memory, keyed by a single path component.
    partitions: HashMap<PathBuf, Self>
}

impl std::fmt::Debug for SqliteStore {
    /// Writes a multi-line summary of the store.
    ///
    /// When a partition named `MAIN` is loaded, the summary is labelled
    /// `SqliteDB` and reports that partition's entry count. Otherwise it is
    /// labelled `SqliteStore` and reports this store's own entry count
    /// followed by its partitions. The count is printed as `Error` when the
    /// values cannot be read.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Number of stored values as text, or the literal "Error" on failure.
        let entry_count = |store: &SqliteStore| match store.values() {
            Ok(values) => values.len().to_string(),
            Err(_) => "Error".to_string(),
        };

        match self.partitions.get(&PathBuf::from(MAIN)) {
            Some(main) => write!(
                f,
                "SqliteDB[\n    location: {:?},\n    entries: {}\n]",
                self.location,
                entry_count(main)
            ),
            None => write!(
                f,
                "SqliteStore[\n    location: {:?},\n    entries: {}\n    partitions: {:#?}\n]",
                self.location,
                entry_count(self),
                self.partitions
            ),
        }
    }
}

impl SqliteStore {
    /// Opens the raw SQLite-backed store located in the directory `location`.
    ///
    /// The directory (and any missing parents) is created, the database file
    /// `kvs.db` inside it is opened, and the `kvs` table is created if it
    /// does not exist yet. The partition record is neither read nor written
    /// here, and the returned store starts with no loaded partitions.
    ///
    /// # Errors
    /// Fails when the directory cannot be created, the database cannot be
    /// opened, or the table cannot be created.
    pub fn new_sql(location: PathBuf) -> Result<Self, Error> {
        std::fs::create_dir_all(&location)?;

        let connection = Connection::open(location.join("kvs.db"))?;
        connection.execute(
            "CREATE TABLE if not exists kvs(key TEXT NOT NULL UNIQUE, value TEXT);",
            [],
        )?;

        Ok(SqliteStore {
            db: Arc::new(Mutex::new(connection)),
            location,
            partitions: HashMap::new(),
        })
    }
}

impl KeyValueStore for SqliteStore {
    /// Opens the store at `location` and restores its registered partitions.
    ///
    /// If a partition record is already stored, every path listed in it is
    /// reopened through `partition`. Otherwise an empty record is written so
    /// later calls can rely on it being present.
    ///
    /// # Errors
    /// Propagates failures from opening the database, reading or writing the
    /// partition record, (de)serializing it, or opening a child partition.
    fn new(location: PathBuf) -> Result<Self, Error> {
        let mut store = SqliteStore::new_sql(location)?;
        match store.get(PARTITION_KEY.as_bytes())? {
            Some(raw) => {
                let Partitions { paths } = deserialize::<Partitions>(&raw)?;
                for path in paths {
                    store.partition(path)?;
                }
            }
            None => {
                let empty = Partitions { paths: vec![] };
                store.set(PARTITION_KEY.as_bytes(), &serialize(&empty)?)?;
            }
        }
        Ok(store)
    }
    fn partition(&mut self, paths: PathBuf) -> Result<&mut dyn KeyValueStore, Error> {
        let mut store = self;
        for path in paths.iter() {
            let path = path.into();
            if store.partitions.contains_key(&path) {
                store = store.partitions.get_mut(&path).unwrap();
            } else {
                let mut partitions = deserialize::<Partitions>(
                    &store.get(PARTITION_KEY.as_bytes())?.unwrap()
                )?;
                if !partitions.paths.contains(&path) {
                    partitions.paths.push(path.clone());
                    store.set(PARTITION_KEY.as_bytes(), &serialize(&partitions)?)?;
                }
                store.partitions.insert(
                    path.clone(),
                    SqliteStore::new(store.location().join(path.clone()))?
                );
                store = store.partitions.get_mut(&path).unwrap();
            }
        }
        Ok(*Box::new(store))
    }
    /// Looks up the already-loaded nested store addressed by `paths`.
    ///
    /// Each component of `paths` selects one child level. Returns `None` as
    /// soon as a component has no loaded partition, and this store itself
    /// when `paths` is empty. Nothing is created or read from disk.
    fn get_partition(&self, paths: PathBuf) -> Option<&dyn KeyValueStore> {
        let found = paths.iter().try_fold(self, |store, component| {
            store.partitions.get(&PathBuf::from(component))
        })?;
        Some(found)
    }
    /// Deletes every entry of this store and, recursively, of all loaded
    /// child partitions.
    ///
    /// Child partitions are cleared first. Only keys reported by `keys` are
    /// deleted from this store.
    ///
    /// # Errors
    /// Stops at and returns the first failure from a child `clear`, from
    /// listing the keys, or from deleting a key.
    fn clear(&mut self) -> Result<(), Error> {
        self.partitions
            .values_mut()
            .try_for_each(|child| child.clear())?;

        for key in self.keys()? {
            self.delete(&key)?;
        }
        Ok(())
    }
    fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
        let error = Error::bad_request("SqliteStore.delete", "Mutex poisoned");
        self.db.lock().or(Err(error))?.execute("DELETE FROM kvs WHERE key = ?;", [hex::encode(key)])?;
        Ok(())
    }
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
        let error = Error::bad_request("SqliteStore.get", "Mutex poisoned");
        let db = self.db.lock().or(Err(error))?;
        let mut stmt = db.prepare(&format!("SELECT value FROM kvs where key = \'{}\'", hex::encode(key)))?;
        let result = stmt.query_and_then([], |row| {
            let item: String = row.get(0)?;
            Ok(hex::decode(item)?)
        })?.collect::<Result<Vec<Vec<u8>>, Error>>()?;
        Ok(result.first().cloned())
    }
    fn set(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
        let error = Error::bad_request("SqliteStore.set", "Mutex poisoned");
        self.db.lock().or(Err(error))?.execute("
            INSERT INTO kvs(key, value) VALUES (?1, ?2) ON CONFLICT(key) DO UPDATE SET value=excluded.value;
        ", [hex::encode(key), hex::encode(value)])?;
        Ok(())
    }

    fn get_all(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Error> {
        let error = Error::bad_request("SqliteStore.get_all", "Mutex poisoned");
        let db = self.db.lock().or(Err(error))?;
        let mut stmt = db.prepare("SELECT key, value FROM kvs")?;
        let result = stmt.query_and_then([], |row| {
            let key: String = row.get(0)?;
            let value: String = row.get(1)?;
            Ok((hex::decode(key)?, hex::decode(value)?))
        })?.collect::<Result<Vec<(Vec<u8>, Vec<u8>)>, Error>>()?
        .into_iter().filter(|(k, _)| k != PARTITION_KEY.as_bytes()).collect();
        Ok(result)
    }

    /// Returns the keys of all entries reported by `get_all`.
    ///
    /// # Errors
    /// Propagates any failure from `get_all`.
    fn keys(&self) -> Result<Vec<Vec<u8>>, Error> {
        let entries = self.get_all()?;
        let mut keys = Vec::with_capacity(entries.len());
        for (key, _) in entries {
            keys.push(key);
        }
        Ok(keys)
    }

    /// Returns the values of all entries reported by `get_all`.
    ///
    /// # Errors
    /// Propagates any failure from `get_all`.
    fn values(&self) -> Result<Vec<Vec<u8>>, Error> {
        let entries = self.get_all()?;
        let mut values = Vec::with_capacity(entries.len());
        for (_, value) in entries {
            values.push(value);
        }
        Ok(values)
    }

    /// Returns an owned copy of the directory this store was opened at.
    fn location(&self) -> PathBuf {
        self.location.to_path_buf()
    }
}