balius_runtime/kv/
mod.rs

1use std::sync::Arc;
2use tokio::sync::{Mutex, RwLock};
3
4use crate::{metrics::Metrics, wit::balius::app::kv as wit};
5
6pub use wit::{Host as CustomKv, KvError, Payload};
7
8#[derive(Clone)]
9pub enum Kv {
10    Mock,
11    Memory(Arc<RwLock<memory::MemoryKv>>),
12    Custom(Arc<Mutex<dyn KvProvider + Send + Sync>>),
13}
14
15pub struct KvHost {
16    worker_id: String,
17    provider: Kv,
18    metrics: Arc<Metrics>,
19}
20impl KvHost {
21    pub fn new(worker_id: &str, provider: &Kv, metrics: &Arc<Metrics>) -> Self {
22        Self {
23            worker_id: worker_id.to_string(),
24            provider: provider.clone(),
25            metrics: metrics.clone(),
26        }
27    }
28}
29
30pub mod memory;
31
32#[async_trait::async_trait]
33pub trait KvProvider {
34    async fn get_value(&mut self, worker_id: &str, key: String) -> Result<Payload, KvError>;
35    async fn set_value(
36        &mut self,
37        worker_id: &str,
38        key: String,
39        value: Payload,
40    ) -> Result<(), KvError>;
41    async fn list_values(
42        &mut self,
43        worker_id: &str,
44        prefix: String,
45    ) -> Result<Vec<String>, KvError>;
46}
47
48#[async_trait::async_trait]
49impl wit::Host for KvHost {
50    async fn get_value(&mut self, key: String) -> Result<Payload, KvError> {
51        self.metrics.kv_get(&self.worker_id);
52        match &mut self.provider {
53            Kv::Mock => todo!(),
54            Kv::Memory(kv) => {
55                kv.read()
56                    .await
57                    .clone()
58                    .get_value(&self.worker_id, key)
59                    .await
60            }
61            Kv::Custom(kv) => {
62                let mut lock = kv.lock().await;
63                lock.get_value(&self.worker_id, key).await
64            }
65        }
66    }
67
68    async fn set_value(&mut self, key: String, value: Payload) -> Result<(), KvError> {
69        self.metrics.kv_set(&self.worker_id);
70        match &mut self.provider {
71            Kv::Mock => todo!(),
72            Kv::Memory(kv) => {
73                kv.write()
74                    .await
75                    .set_value(&self.worker_id, key, value)
76                    .await
77            }
78            Kv::Custom(kv) => {
79                let mut lock = kv.lock().await;
80                lock.set_value(&self.worker_id, key, value).await
81            }
82        }
83    }
84
85    async fn list_values(&mut self, prefix: String) -> Result<Vec<String>, KvError> {
86        self.metrics.kv_list(&self.worker_id);
87        match &mut self.provider {
88            Kv::Mock => todo!(),
89            Kv::Memory(kv) => {
90                kv.read()
91                    .await
92                    .clone()
93                    .list_values(&self.worker_id, prefix)
94                    .await
95            }
96            Kv::Custom(kv) => {
97                let mut lock = kv.lock().await;
98                lock.list_values(&self.worker_id, prefix).await
99            }
100        }
101    }
102}