msg_store_plugin_leveldb/
lib.rs1use bincode::{serialize, deserialize};
2use msg_store::{
3 errors::{ Error, DbError },
4 Keeper,
5 store::{
6 Package,
7 PacketMetaData,
8 Store
9 },
10 uuid::Uuid
11};
12use db_key::Key;
13use leveldb::{
14 database::Database,
15 iterator::Iterable,
16 kv::KV,
17 options::{
18 Options,
19 ReadOptions,
20 WriteOptions
21 }
22};
23use serde::{Serialize, Deserialize};
24use std::{
25 fs::create_dir_all,
26 path::Path
27};
28
29pub type LevelStore = Store<Leveldb>;
30
31#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)]
32pub struct Id {
33 pub timestamp: u128,
34 pub sequence: u32
35}
36impl Id {
37 pub fn to_string(&self) -> String {
38 format!("{}-{}", self.timestamp, self.sequence)
39 }
40 pub fn from_string(id: &str) -> Uuid {
41 let split_str = id.split("-").collect::<Vec<&str>>();
42 Uuid {
43 timestamp: split_str[0].parse().expect("Could not parse timestamp"),
44 sequence: split_str[0].parse().expect("Could not parse sequence")
45 }
46 }
47 pub fn from_uuid(uuid: Uuid) -> Self {
48 Self {
49 timestamp: uuid.timestamp,
50 sequence: uuid.sequence
51 }
52 }
53 pub fn to_uuid(self) -> Uuid {
54 Uuid {
55 timestamp: self.timestamp,
56 sequence: self.sequence
57 }
58 }
59}
60
61impl Key for Id {
62 fn from_u8(key: &[u8]) -> Self {
63 deserialize(key).expect("Could not deserialize key")
64 }
65 fn as_slice<T, F: Fn(&[u8]) -> T>(&self, f: F) -> T {
66 f(&serialize(&self).expect("Could not serialize uuid"))
67 }
68}
69
70pub fn open(location: &Path) -> Result<LevelStore, Error> {
71 let plugin = match Leveldb::new(location) {
72 Ok(plugin) => Ok(plugin),
73 Err(db_error) => Err(Error::DbError(db_error))
74 }?;
75 Store::open(plugin)
76}
77
78pub struct Leveldb {
79 pub msgs: Database<Id>,
80 pub data: Database<Id>
81}
82
83impl Leveldb {
84 pub fn new(dir: &Path) -> Result<Leveldb, DbError> {
85 create_dir_all(&dir).expect("Could not create db location dir.");
86
87 let mut msgs_path = dir.to_path_buf();
88 msgs_path.push("msgs");
89 let msgs_path = msgs_path.as_path();
90
91 let mut msg_data_path = dir.to_path_buf();
92 msg_data_path.push("msg_data");
93 let msg_data_path = msg_data_path.as_path();
94
95 let mut msgs_options = Options::new();
96 msgs_options.create_if_missing = true;
97
98 let mut msg_data_options = Options::new();
99 msg_data_options.create_if_missing = true;
100
101 let msgs = match Database::open(msgs_path, msgs_options) {
102 Ok(db) => Ok(db),
103 Err(error) => Err(DbError(error.to_string()))
104 }?;
105 let data = match Database::open(Path::new(msg_data_path), msg_data_options) {
106 Ok(db) => Ok(db),
107 Err(error) => Err(DbError(error.to_string()))
108 }?;
109
110 Ok(Leveldb {
111 msgs,
112 data
113 })
114 }
115}
116
117#[derive(Debug, Deserialize, Serialize)]
118struct DbMetadata {
119 priority: u32,
120 byte_size: u32
121}
122
123impl Keeper for Leveldb {
124 fn add(&mut self, package: &Package) -> Result<(), DbError> {
125 let data = DbMetadata {
126 priority: package.priority,
127 byte_size: package.byte_size
128 };
129 let serialized_data = match serialize(&data) {
130 Ok(data) => Ok(data),
131 Err(error) => Err(DbError(error.to_string()))
132 }?;
133 let msg = match serialize(&package.msg) {
134 Ok(data) => Ok(data),
135 Err(error) => Err(DbError(error.to_string()))
136 }?;
137 match self.data.put(WriteOptions::new(), Id::from_uuid(package.uuid), &serialized_data) {
138 Ok(_) => Ok(()),
139 Err(error) => Err(DbError(error.to_string()))
140 }?;
141 match self.msgs.put(WriteOptions::new(), Id::from_uuid(package.uuid), &msg) {
142 Ok(_) => Ok(()),
143 Err(error) => Err(DbError(error.to_string()))
144 }?;
145 Ok(())
146 }
147 fn get(&mut self, uuid: &Uuid) -> Result<Option<String>, DbError> {
148 let data = match self.msgs.get(ReadOptions::new(), Id::from_uuid(*uuid)) {
149 Ok(data) => Ok(data),
150 Err(error) => Err(DbError(error.to_string()))
151 }?;
152 if let Some(data) = data {
153 match deserialize(&data) {
154 Ok(data) => Ok(data),
155 Err(error) => Err(DbError(error.to_string()))
156 }
157 } else {
158 Ok(None)
159 }
160 }
161 fn del(&mut self, uuid: &Uuid) -> Result<(), DbError> {
162 match self.msgs.delete(WriteOptions::new(), Id::from_uuid(*uuid)) {
163 Ok(_) => Ok(()),
164 Err(error) => Err(DbError(error.to_string()))
165 }
166 }
167 fn fetch(&mut self) -> Result<Vec<PacketMetaData>, DbError> {
168 self.data.iter(ReadOptions::new()).map(|(id, data)| -> Result<PacketMetaData, DbError> {
169 let db_metadata: DbMetadata = match deserialize(&data) {
170 Ok(data) => Ok(data),
171 Err(error) => Err(DbError(error.to_string()))
172 }?;
173 Ok(PacketMetaData {
174 uuid: id.to_uuid(),
175 priority: db_metadata.priority,
176 byte_size: db_metadata.byte_size
177 })
178 }).collect::<Result<Vec<PacketMetaData>, DbError>>()
179 }
180}
181
#[cfg(test)]
mod tests {
    /// Placeholder smoke test; it does not exercise any code in this crate.
    #[test]
    fn it_works() {
        let sum = 2 + 2;
        assert_eq!(sum, 4);
    }
}