msg_store_plugin_leveldb/
lib.rs

1use bincode::{serialize, deserialize};
2use msg_store::{
3    errors::{ Error, DbError },
4    Keeper,
5    store::{
6        Package,
7        PacketMetaData,
8        Store
9    },
10    uuid::Uuid
11};
12use db_key::Key;
13use leveldb::{
14    database::Database,
15    iterator::Iterable,
16    kv::KV,
17    options::{
18        Options,
19        ReadOptions,
20        WriteOptions
21    }
22};
23use serde::{Serialize, Deserialize};
24use std::{
25    fs::create_dir_all,
26    path::Path
27};
28
/// Convenience alias for a `Store` parameterised with the `Leveldb` plugin defined in this file.
29pub type LevelStore = Store<Leveldb>;
30
31#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)]
32pub struct Id {
33    pub timestamp: u128,
34    pub sequence: u32
35}
36impl Id {
37    pub fn to_string(&self) -> String {
38        format!("{}-{}", self.timestamp, self.sequence)
39    }
40    pub fn from_string(id: &str) -> Uuid {
41        let split_str = id.split("-").collect::<Vec<&str>>();
42        Uuid { 
43            timestamp: split_str[0].parse().expect("Could not parse timestamp"), 
44            sequence: split_str[0].parse().expect("Could not parse sequence")
45        }
46    }
47    pub fn from_uuid(uuid: Uuid) -> Self {
48        Self {
49            timestamp: uuid.timestamp,
50            sequence: uuid.sequence
51        }
52    }
53    pub fn to_uuid(self) -> Uuid {
54        Uuid { 
55            timestamp: self.timestamp, 
56            sequence: self.sequence
57        }
58    }
59}
60
61impl Key for Id {
62    fn from_u8(key: &[u8]) -> Self {
63        deserialize(key).expect("Could not deserialize key")
64    }
65    fn as_slice<T, F: Fn(&[u8]) -> T>(&self, f: F) -> T {
66        f(&serialize(&self).expect("Could not serialize uuid"))
67    }
68}
69
70pub fn open(location: &Path) -> Result<LevelStore, Error> {
71    let plugin = match Leveldb::new(location) {
72        Ok(plugin) => Ok(plugin),
73        Err(db_error) => Err(Error::DbError(db_error))
74    }?;
75    Store::open(plugin)
76}
77
78pub struct Leveldb {
79    pub msgs: Database<Id>,
80    pub data: Database<Id>
81}
82
83impl Leveldb {
84    pub fn new(dir: &Path) -> Result<Leveldb, DbError> {
85        create_dir_all(&dir).expect("Could not create db location dir.");
86
87        let mut msgs_path = dir.to_path_buf();
88        msgs_path.push("msgs");
89        let msgs_path = msgs_path.as_path();
90
91        let mut msg_data_path = dir.to_path_buf();
92        msg_data_path.push("msg_data");
93        let msg_data_path = msg_data_path.as_path();
94
95        let mut msgs_options = Options::new();
96        msgs_options.create_if_missing = true;
97
98        let mut msg_data_options = Options::new();
99        msg_data_options.create_if_missing = true;
100
101        let msgs = match Database::open(msgs_path, msgs_options) {
102            Ok(db) => Ok(db),
103            Err(error) => Err(DbError(error.to_string()))
104        }?;
105        let data = match Database::open(Path::new(msg_data_path), msg_data_options) {
106            Ok(db) => Ok(db),
107            Err(error) => Err(DbError(error.to_string()))
108        }?;
109        
110        Ok(Leveldb {
111            msgs,
112            data
113        })
114    }
115}
116
117#[derive(Debug, Deserialize, Serialize)]
118struct DbMetadata {
119    priority: u32,
120    byte_size: u32
121}
122
123impl Keeper for Leveldb {
124    fn add(&mut self, package: &Package) -> Result<(), DbError> {
125        let data = DbMetadata {
126            priority: package.priority,
127            byte_size: package.byte_size
128        };
129        let serialized_data = match serialize(&data) {
130            Ok(data) => Ok(data),
131            Err(error) => Err(DbError(error.to_string()))
132        }?;
133        let msg = match serialize(&package.msg) {
134            Ok(data) => Ok(data),
135            Err(error) => Err(DbError(error.to_string()))
136        }?;
137        match self.data.put(WriteOptions::new(), Id::from_uuid(package.uuid), &serialized_data) {
138            Ok(_) => Ok(()),
139            Err(error) => Err(DbError(error.to_string()))
140        }?;
141        match self.msgs.put(WriteOptions::new(), Id::from_uuid(package.uuid), &msg) {
142            Ok(_) => Ok(()),
143            Err(error) => Err(DbError(error.to_string()))
144        }?;
145        Ok(())  
146    }
147    fn get(&mut self, uuid: &Uuid) -> Result<Option<String>, DbError> {
148        let data = match self.msgs.get(ReadOptions::new(), Id::from_uuid(*uuid)) {
149            Ok(data) => Ok(data),
150            Err(error) => Err(DbError(error.to_string()))
151        }?;
152        if let Some(data) = data {
153            match deserialize(&data) {
154                Ok(data) => Ok(data),
155                Err(error) => Err(DbError(error.to_string()))
156            }
157        } else {
158            Ok(None)
159        }   
160    }
161    fn del(&mut self, uuid: &Uuid) -> Result<(), DbError> {
162        match self.msgs.delete(WriteOptions::new(), Id::from_uuid(*uuid)) {
163            Ok(_) => Ok(()),
164            Err(error) => Err(DbError(error.to_string()))
165        }
166    }
167    fn fetch(&mut self) -> Result<Vec<PacketMetaData>, DbError> {
168        self.data.iter(ReadOptions::new()).map(|(id, data)| -> Result<PacketMetaData, DbError> {
169            let db_metadata: DbMetadata = match deserialize(&data) {
170                Ok(data) => Ok(data),
171                Err(error) => Err(DbError(error.to_string()))
172            }?;
173            Ok(PacketMetaData { 
174                uuid: id.to_uuid(),
175                priority: db_metadata.priority, 
176                byte_size: db_metadata.byte_size
177            })
178        }).collect::<Result<Vec<PacketMetaData>, DbError>>()
179    }
180}
181
#[cfg(test)]
mod tests {
    /// Placeholder check; it exercises no code from this crate.
    #[test]
    fn it_works() {
        let sum = 2 + 2;
        assert_eq!(sum, 4);
    }
}