balius_runtime/kv/
mod.rs

1use std::sync::Arc;
2use tokio::sync::{Mutex, RwLock};
3
4use crate::{metrics::Metrics, wit::balius::app::kv as wit, Error};
5
6pub use wit::{Host as CustomKv, KvError, Payload};
7
8#[derive(Clone)]
9pub enum Kv {
10    Mock,
11    Memory(Arc<RwLock<memory::MemoryKv>>),
12    Redb(Arc<RwLock<redb::RedbKv>>),
13    Custom(Arc<Mutex<dyn KvProvider + Send + Sync>>),
14}
15
16impl Kv {
17    pub async fn into_ephemeral(self) -> Result<Self, Error> {
18        match self {
19            Kv::Mock => Ok(self),
20            Kv::Memory(x) => Ok(Kv::Memory(x)),
21            Kv::Redb(x) => Ok(Kv::Redb(Arc::new(RwLock::new(
22                x.write().await.into_ephemeral()?,
23            )))),
24            Kv::Custom(_) => Err(Error::KvError(
25                "Cannot convert custom kv into ephemeral".to_string(),
26            )),
27        }
28    }
29}
30
31pub struct KvHost {
32    worker_id: String,
33    provider: Kv,
34    metrics: Arc<Metrics>,
35}
36impl KvHost {
37    pub fn new(worker_id: &str, provider: &Kv, metrics: &Arc<Metrics>) -> Self {
38        Self {
39            worker_id: worker_id.to_string(),
40            provider: provider.clone(),
41            metrics: metrics.clone(),
42        }
43    }
44}
45
46pub mod memory;
47pub mod redb;
48
49#[async_trait::async_trait]
50pub trait KvProvider {
51    async fn get_value(&mut self, worker_id: &str, key: String) -> Result<Payload, KvError>;
52    async fn set_value(
53        &mut self,
54        worker_id: &str,
55        key: String,
56        value: Payload,
57    ) -> Result<(), KvError>;
58    async fn list_values(
59        &mut self,
60        worker_id: &str,
61        prefix: String,
62    ) -> Result<Vec<String>, KvError>;
63}
64
65impl wit::Host for KvHost {
66    async fn get_value(&mut self, key: String) -> Result<Payload, KvError> {
67        self.metrics.kv_get(&self.worker_id);
68        match &mut self.provider {
69            Kv::Mock => todo!(),
70            Kv::Memory(kv) => {
71                kv.read()
72                    .await
73                    .clone()
74                    .get_value(&self.worker_id, key)
75                    .await
76            }
77            Kv::Redb(kv) => {
78                kv.read()
79                    .await
80                    .clone()
81                    .get_value(&self.worker_id, key)
82                    .await
83            }
84            Kv::Custom(kv) => {
85                let mut lock = kv.lock().await;
86                lock.get_value(&self.worker_id, key).await
87            }
88        }
89    }
90
91    async fn set_value(&mut self, key: String, value: Payload) -> Result<(), KvError> {
92        self.metrics.kv_set(&self.worker_id);
93        match &mut self.provider {
94            Kv::Mock => todo!(),
95            Kv::Memory(kv) => {
96                kv.write()
97                    .await
98                    .set_value(&self.worker_id, key, value)
99                    .await
100            }
101            Kv::Redb(kv) => {
102                kv.read()
103                    .await
104                    .clone()
105                    .set_value(&self.worker_id, key, value)
106                    .await
107            }
108
109            Kv::Custom(kv) => {
110                let mut lock = kv.lock().await;
111                lock.set_value(&self.worker_id, key, value).await
112            }
113        }
114    }
115
116    async fn list_values(&mut self, prefix: String) -> Result<Vec<String>, KvError> {
117        self.metrics.kv_list(&self.worker_id);
118        match &mut self.provider {
119            Kv::Mock => todo!(),
120            Kv::Memory(kv) => {
121                kv.read()
122                    .await
123                    .clone()
124                    .list_values(&self.worker_id, prefix)
125                    .await
126            }
127            Kv::Redb(kv) => {
128                kv.read()
129                    .await
130                    .clone()
131                    .list_values(&self.worker_id, prefix)
132                    .await
133            }
134
135            Kv::Custom(kv) => {
136                let mut lock = kv.lock().await;
137                lock.list_values(&self.worker_id, prefix).await
138            }
139        }
140    }
141}