1use std::sync::Arc;
2use tokio::sync::{Mutex, RwLock};
3
4use crate::{metrics::Metrics, wit::balius::app::kv as wit, Error};
5
6pub use wit::{Host as CustomKv, KvError, Payload};
7
8#[derive(Clone)]
9pub enum Kv {
10 Mock,
11 Memory(Arc<RwLock<memory::MemoryKv>>),
12 Redb(Arc<RwLock<redb::RedbKv>>),
13 Custom(Arc<Mutex<dyn KvProvider + Send + Sync>>),
14}
15
16impl Kv {
17 pub async fn into_ephemeral(self) -> Result<Self, Error> {
18 match self {
19 Kv::Mock => Ok(self),
20 Kv::Memory(x) => Ok(Kv::Memory(x)),
21 Kv::Redb(x) => Ok(Kv::Redb(Arc::new(RwLock::new(
22 x.write().await.into_ephemeral()?,
23 )))),
24 Kv::Custom(_) => Err(Error::KvError(
25 "Cannot convert custom kv into ephemeral".to_string(),
26 )),
27 }
28 }
29}
30
31pub struct KvHost {
32 worker_id: String,
33 provider: Kv,
34 metrics: Arc<Metrics>,
35}
36impl KvHost {
37 pub fn new(worker_id: &str, provider: &Kv, metrics: &Arc<Metrics>) -> Self {
38 Self {
39 worker_id: worker_id.to_string(),
40 provider: provider.clone(),
41 metrics: metrics.clone(),
42 }
43 }
44}
45
46pub mod memory;
47pub mod redb;
48
49#[async_trait::async_trait]
50pub trait KvProvider {
51 async fn get_value(&mut self, worker_id: &str, key: String) -> Result<Payload, KvError>;
52 async fn set_value(
53 &mut self,
54 worker_id: &str,
55 key: String,
56 value: Payload,
57 ) -> Result<(), KvError>;
58 async fn list_values(
59 &mut self,
60 worker_id: &str,
61 prefix: String,
62 ) -> Result<Vec<String>, KvError>;
63}
64
65impl wit::Host for KvHost {
66 async fn get_value(&mut self, key: String) -> Result<Payload, KvError> {
67 self.metrics.kv_get(&self.worker_id);
68 match &mut self.provider {
69 Kv::Mock => todo!(),
70 Kv::Memory(kv) => {
71 kv.read()
72 .await
73 .clone()
74 .get_value(&self.worker_id, key)
75 .await
76 }
77 Kv::Redb(kv) => {
78 kv.read()
79 .await
80 .clone()
81 .get_value(&self.worker_id, key)
82 .await
83 }
84 Kv::Custom(kv) => {
85 let mut lock = kv.lock().await;
86 lock.get_value(&self.worker_id, key).await
87 }
88 }
89 }
90
91 async fn set_value(&mut self, key: String, value: Payload) -> Result<(), KvError> {
92 self.metrics.kv_set(&self.worker_id);
93 match &mut self.provider {
94 Kv::Mock => todo!(),
95 Kv::Memory(kv) => {
96 kv.write()
97 .await
98 .set_value(&self.worker_id, key, value)
99 .await
100 }
101 Kv::Redb(kv) => {
102 kv.read()
103 .await
104 .clone()
105 .set_value(&self.worker_id, key, value)
106 .await
107 }
108
109 Kv::Custom(kv) => {
110 let mut lock = kv.lock().await;
111 lock.set_value(&self.worker_id, key, value).await
112 }
113 }
114 }
115
116 async fn list_values(&mut self, prefix: String) -> Result<Vec<String>, KvError> {
117 self.metrics.kv_list(&self.worker_id);
118 match &mut self.provider {
119 Kv::Mock => todo!(),
120 Kv::Memory(kv) => {
121 kv.read()
122 .await
123 .clone()
124 .list_values(&self.worker_id, prefix)
125 .await
126 }
127 Kv::Redb(kv) => {
128 kv.read()
129 .await
130 .clone()
131 .list_values(&self.worker_id, prefix)
132 .await
133 }
134
135 Kv::Custom(kv) => {
136 let mut lock = kv.lock().await;
137 lock.list_values(&self.worker_id, prefix).await
138 }
139 }
140 }
141}