Skip to main content

async_kvdb/
lib.rs

1#[cfg(feature = "json")]
2mod json;
3#[cfg(feature = "proto")]
4mod proto;
5
6pub use async_trait::async_trait;
7use bytes::Bytes;
8use smol_str::SmolStr;
9use std::collections::HashMap;
10
11#[cfg(feature = "json")]
12pub use json::KvdbJsonExt;
13#[cfg(feature = "proto")]
14pub use proto::KvdbProtoExt;
15
/// Key under which a value is stored; an alias for [`SmolStr`].
pub type Key = SmolStr;
/// Stored value; an alias for [`Bytes`].
pub type Value = Bytes;
/// A key paired with its value.
pub type KeyValue = (Key, Value);
/// Predicate over keys that is `Send + Sync`; [`Kvdb::scan_keys`] takes it by reference.
pub type Filter = dyn Fn(&Key) -> bool + Send + Sync;
20
/// A single database operation.
///
/// Read variants (`Get`, `GetMany`, `GetAll`) carry a `oneshot::Sender` in `ch`
/// typed for the result of the read; write variants carry only their data.
// NOTE(review): `ch` is presumably how the result is handed back to the
// requester — the code that sends on it is not in this file.
pub enum DbOp {
    /// Read one key; `ch` carries `Option<Value>`.
    Get {
        key: Key,
        ch: oneshot::Sender<Option<Value>>,
    },
    /// Read several keys; `ch` carries a key-to-value map.
    GetMany {
        keys: Vec<Key>,
        ch: oneshot::Sender<HashMap<Key, Value>>,
    },
    /// Read everything; `ch` carries a key-to-value map.
    GetAll {
        ch: oneshot::Sender<HashMap<Key, Value>>,
    },
    /// Write one key/value pair.
    Insert {
        key: Key,
        value: Value,
    },
    /// Write several key/value pairs.
    InsertMany {
        data: HashMap<Key, Value>,
    },
    /// Remove one key.
    Delete {
        key: Key,
    },
    /// Remove several keys.
    DeleteMany {
        keys: Vec<Key>,
    },
    /// Remove every key.
    DeleteAll,
}
48
/// A batch of operations grouped by kind, as produced by
/// [`DbOpMerger::into_ops`].
///
/// The field notes below describe the contents when the batch comes from
/// `DbOpMerger`; all fields are public, so other code may fill them differently.
// NOTE(review): consumers presumably apply `clear` before `insert`/`delete` —
// the merger drops writes that preceded a `DeleteAll`; confirm against the backend.
#[derive(Default)]
pub struct DbOps {
    /// `true` when a `DbOp::DeleteAll` was merged.
    pub clear: bool,
    /// Keys whose most recently merged write was an insert, with that value.
    pub insert: HashMap<Key, Value>,
    /// Keys whose most recently merged write was a delete.
    pub delete: Vec<Key>,
    /// Result senders of `DbOp::Get` requests, grouped by requested key.
    pub get_one: HashMap<Key, Vec<oneshot::Sender<Option<Value>>>>,
    /// `DbOp::GetMany` requests, kept as whole operations.
    pub get_many: Vec<DbOp>,
    /// Result senders of `DbOp::GetAll` requests.
    pub get_all: Vec<oneshot::Sender<HashMap<Key, Value>>>,
}
58
/// Accumulates [`DbOp`]s and collapses them into a single [`DbOps`] batch.
///
/// Writes are tracked per key, so a later write to a key replaces an earlier one.
#[derive(Default)]
pub struct DbOpMerger {
    /// Set once a `DbOp::DeleteAll` has been merged.
    need_clear: bool,
    /// Latest pending write per key: `Some(value)` for an insert, `None` for a delete.
    ops: HashMap<Key, Option<Value>>,
    /// Result senders of pending single-key reads, grouped by key.
    get_one: HashMap<Key, Vec<oneshot::Sender<Option<Value>>>>,
    /// Pending `DbOp::GetMany` requests, stored as whole operations.
    get_many: Vec<DbOp>,
    /// Result senders of pending `DbOp::GetAll` requests.
    get_all: Vec<oneshot::Sender<HashMap<Key, Value>>>,
}
67impl DbOpMerger {
68    pub fn new() -> Self {
69        Self {
70            need_clear: false,
71            ops: HashMap::new(),
72            get_one: HashMap::new(),
73            get_many: Vec::new(),
74            get_all: Vec::new(),
75        }
76    }
77    pub fn is_empty(&self) -> bool {
78        self.get_one.is_empty() && self.get_many.is_empty() && self.get_all.is_empty() && !self.need_clear && self.ops.is_empty()
79    }
80    pub fn merge(&mut self, op: DbOp) {
81        match op {
82            DbOp::Get { key, ch } => {
83                let chs = self.get_one.entry(key).or_default();
84                chs.push(ch);
85            }
86            DbOp::GetMany { .. } => {
87                self.get_many.push(op);
88            }
89            DbOp::GetAll { ch } => {
90                self.get_all.push(ch);
91            }
92            DbOp::Insert { key, value } => {
93                self.ops.insert(key, Some(value));
94            }
95            DbOp::InsertMany { data } => {
96                for (key, value) in data {
97                    self.ops.insert(key, Some(value));
98                }
99            }
100            DbOp::Delete { key } => {
101                self.ops.insert(key, None);
102            }
103            DbOp::DeleteMany { keys } => {
104                for key in keys {
105                    self.ops.insert(key, None);
106                }
107            }
108            DbOp::DeleteAll => {
109                self.need_clear = true;
110                self.ops.clear();
111            }
112        }
113    }
114    pub fn into_ops(self) -> DbOps {
115        let mut ops = DbOps::default();
116        for (k, v) in self.ops {
117            if let Some(v) = v {
118                ops.insert.insert(k, v);
119            } else {
120                ops.delete.push(k);
121            }
122        }
123        ops.clear = self.need_clear;
124        ops.get_one = self.get_one;
125        ops.get_many = self.get_many;
126        ops.get_all = self.get_all;
127        ops
128    }
129}
130
131#[async_trait]
132pub trait Kvdb {
133    async fn scan_keys(&self, filter: &Filter) -> Vec<Key>;
134    async fn get(&self, key: Key) -> Option<Value> {
135        let keys = Vec::from([key]);
136        self.get_many(keys).await.into_values().next()
137    }
138    async fn get_many(&self, keys: Vec<Key>) -> HashMap<Key, Value>;
139    async fn set(&self, key: Key, value: Value) {
140        let data = HashMap::from([(key, value)]);
141        self.set_many(data).await
142    }
143    async fn set_many(&self, data: HashMap<Key, Value>);
144    async fn delete(&self, key: Key) {
145        let keys = Vec::from([key]);
146        self.delete_many(keys).await
147    }
148    async fn delete_many(&self, keys: Vec<Key>);
149    async fn delete_all(&self);
150}
151
/// A default-constructed merger reports that it holds nothing.
#[test]
fn empty() {
    assert!(DbOpMerger::default().is_empty());
}