greentic_runner_host/storage/
state.rs1use std::str::FromStr;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use greentic_state::inmemory::InMemoryStateStore;
6use greentic_state::{StateKey as StoreStateKey, StateStore};
7use greentic_types::{EnvId, TenantCtx, TenantId};
8use serde_json::Value;
9
10use crate::engine::error::{GResult, RunnerError};
11use crate::engine::host::{SessionKey, StateHost};
12
13pub type DynStateStore = Arc<dyn StateStore>;
14
15pub(crate) const STATE_PREFIX: &str = "runner";
16
17pub struct StateStoreHost {
18 store: DynStateStore,
19}
20
21impl StateStoreHost {
22 pub fn new(store: DynStateStore) -> Self {
23 Self { store }
24 }
25}
26
27pub fn new_state_store() -> DynStateStore {
28 Arc::new(InMemoryStateStore::new())
29}
30
31pub fn state_host_from(store: DynStateStore) -> Arc<dyn StateHost> {
32 Arc::new(StateStoreHost::new(store))
33}
34
35#[async_trait]
36impl StateHost for StateStoreHost {
37 async fn get_json(&self, key: &SessionKey) -> GResult<Option<Value>> {
38 let tenant = tenant_ctx_from_key(key)?;
39 let state_key = derive_state_key(key);
40 self.store
41 .get_json(&tenant, STATE_PREFIX, &state_key, None)
42 .map_err(map_state_error)
43 }
44
45 async fn set_json(&self, key: &SessionKey, value: Value) -> GResult<()> {
46 let tenant = tenant_ctx_from_key(key)?;
47 let state_key = derive_state_key(key);
48 self.store
49 .set_json(&tenant, STATE_PREFIX, &state_key, None, &value, None)
50 .map_err(map_state_error)
51 }
52
53 async fn del(&self, key: &SessionKey) -> GResult<()> {
54 let tenant = tenant_ctx_from_key(key)?;
55 let state_key = derive_state_key(key);
56 self.store
57 .del(&tenant, STATE_PREFIX, &state_key)
58 .map_err(map_state_error)?;
59 Ok(())
60 }
61
62 async fn del_prefix(&self, _key_prefix: &str) -> GResult<()> {
63 Ok(())
65 }
66}
67
68fn tenant_ctx_from_key(key: &SessionKey) -> GResult<TenantCtx> {
69 let (env, tenant) = key
70 .tenant_key
71 .split_once("::")
72 .ok_or_else(|| RunnerError::State {
73 reason: format!("invalid tenant descriptor '{}'", key.tenant_key),
74 })?;
75 let env_id = EnvId::from_str(env).map_err(|err| RunnerError::State {
76 reason: format!("invalid env id {env}: {err}"),
77 })?;
78 let tenant_id = TenantId::from_str(tenant).map_err(|err| RunnerError::State {
79 reason: format!("invalid tenant id {tenant}: {err}"),
80 })?;
81 Ok(TenantCtx::new(env_id, tenant_id))
82}
83
84fn derive_state_key(key: &SessionKey) -> StoreStateKey {
85 let hint = key.session_hint.as_deref().unwrap_or("-");
86 StoreStateKey::from(format!("{}::{hint}", key.flow_id))
87}
88
89fn map_state_error(err: greentic_types::GreenticError) -> RunnerError {
90 RunnerError::State {
91 reason: err.to_string(),
92 }
93}