Skip to main content

gsm_core/
provider_install_store.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs;
3use std::hash::{Hash, Hasher};
4use std::path::Path;
5use std::sync::RwLock;
6
7use anyhow::{Context, Result};
8use greentic_types::{ProviderInstallId, ProviderInstallRecord, TenantCtx};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
12pub const PROVIDER_ID_KEY: &str = "provider_id";
13pub const INSTALL_ID_KEY: &str = "install_id";
14pub const PROVIDER_CONFIG_REFS_KEY: &str = "provider_config_refs";
15pub const PROVIDER_SECRET_REFS_KEY: &str = "provider_secret_refs";
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct ProviderInstallState {
19    pub record: ProviderInstallRecord,
20    pub config: BTreeMap<String, Value>,
21    pub secrets: BTreeMap<String, String>,
22}
23
24impl ProviderInstallState {
25    pub fn new(record: ProviderInstallRecord) -> Self {
26        Self {
27            record,
28            config: BTreeMap::new(),
29            secrets: BTreeMap::new(),
30        }
31    }
32
33    pub fn with_config(mut self, config: BTreeMap<String, Value>) -> Self {
34        self.config = config;
35        self
36    }
37
38    pub fn with_secrets(mut self, secrets: BTreeMap<String, String>) -> Self {
39        self.secrets = secrets;
40        self
41    }
42
43    pub fn routing(&self) -> Option<ProviderInstallRouting> {
44        let routing = self.record.metadata.get("routing")?.clone();
45        serde_json::from_value(routing).ok()
46    }
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, Default)]
50pub struct ProviderInstallRouting {
51    #[serde(default)]
52    pub platform: Option<String>,
53    #[serde(default)]
54    pub channel_id: Option<String>,
55}
56
57impl ProviderInstallRouting {
58    pub fn matches(&self, platform: &str, channel_id: &str) -> bool {
59        let platform_ok = self
60            .platform
61            .as_deref()
62            .map(|p| p.eq_ignore_ascii_case(platform))
63            .unwrap_or(true);
64        let channel_ok = self
65            .channel_id
66            .as_deref()
67            .map(|c| c == channel_id)
68            .unwrap_or(false);
69        platform_ok && channel_ok
70    }
71}
72
73#[derive(Debug, thiserror::Error)]
74pub enum ProviderInstallError {
75    #[error("missing provider install {provider_id}/{install_id}")]
76    MissingInstall {
77        provider_id: String,
78        install_id: String,
79    },
80    #[error("invalid webhook signature (header {header})")]
81    InvalidSignature { header: String },
82    #[error("missing provider install secret {key}")]
83    MissingSecret { key: String },
84    #[error("missing provider install config {key}")]
85    MissingConfig { key: String },
86    #[error("missing provider install route")]
87    MissingRoute,
88}
89
90pub trait ProviderInstallStore: Send + Sync {
91    fn insert(&self, state: ProviderInstallState);
92    fn get(
93        &self,
94        tenant: &TenantCtx,
95        provider_id: &str,
96        install_id: &ProviderInstallId,
97    ) -> Option<ProviderInstallState>;
98    fn find_by_routing(
99        &self,
100        tenant: &TenantCtx,
101        provider_id: &str,
102        platform: &str,
103        channel_id: &str,
104    ) -> Option<ProviderInstallState>;
105    fn all(&self) -> Vec<ProviderInstallState>;
106}
107
108#[derive(Default)]
109pub struct InMemoryProviderInstallStore {
110    records: RwLock<HashMap<InstallKey, ProviderInstallState>>,
111    routing: RwLock<HashMap<RoutingKey, InstallKey>>,
112}
113
114impl InMemoryProviderInstallStore {
115    fn install_key(
116        tenant: &TenantCtx,
117        provider_id: &str,
118        install_id: &ProviderInstallId,
119    ) -> InstallKey {
120        InstallKey {
121            tenant: TenantKey::from_ctx(tenant),
122            provider_id: provider_id.to_string(),
123            install_id: install_id.to_string(),
124        }
125    }
126}
127
128impl ProviderInstallStore for InMemoryProviderInstallStore {
129    fn insert(&self, state: ProviderInstallState) {
130        let record = &state.record;
131        let key = InstallKey {
132            tenant: TenantKey::from_ctx(&record.tenant),
133            provider_id: record.provider_id.clone(),
134            install_id: record.install_id.to_string(),
135        };
136        if let Some(routing) = state.routing()
137            && let (Some(platform), Some(channel_id)) =
138                (routing.platform.clone(), routing.channel_id.clone())
139        {
140            let route_key = RoutingKey {
141                tenant: key.tenant.clone(),
142                provider_id: key.provider_id.clone(),
143                platform,
144                channel_id,
145            };
146            self.routing
147                .write()
148                .expect("routing lock poisoned")
149                .insert(route_key, key.clone());
150        }
151        self.records
152            .write()
153            .expect("records lock poisoned")
154            .insert(key, state);
155    }
156
157    fn get(
158        &self,
159        tenant: &TenantCtx,
160        provider_id: &str,
161        install_id: &ProviderInstallId,
162    ) -> Option<ProviderInstallState> {
163        let key = Self::install_key(tenant, provider_id, install_id);
164        self.records
165            .read()
166            .expect("records lock poisoned")
167            .get(&key)
168            .cloned()
169    }
170
171    fn find_by_routing(
172        &self,
173        tenant: &TenantCtx,
174        provider_id: &str,
175        platform: &str,
176        channel_id: &str,
177    ) -> Option<ProviderInstallState> {
178        let route_key = RoutingKey {
179            tenant: TenantKey::from_ctx(tenant),
180            provider_id: provider_id.to_string(),
181            platform: platform.to_string(),
182            channel_id: channel_id.to_string(),
183        };
184        let records = self.records.read().expect("records lock poisoned");
185        let routing = self.routing.read().expect("routing lock poisoned");
186        routing
187            .get(&route_key)
188            .and_then(|install_key| records.get(install_key).cloned())
189    }
190
191    fn all(&self) -> Vec<ProviderInstallState> {
192        self.records
193            .read()
194            .expect("records lock poisoned")
195            .values()
196            .cloned()
197            .collect()
198    }
199}
200
201#[derive(Clone, Debug, Eq)]
202struct TenantKey {
203    env: String,
204    tenant: String,
205    team: Option<String>,
206}
207
208impl TenantKey {
209    fn from_ctx(tenant: &TenantCtx) -> Self {
210        Self {
211            env: tenant.env.as_str().to_string(),
212            tenant: tenant.tenant.as_str().to_string(),
213            team: tenant.team.as_ref().map(|team| team.as_str().to_string()),
214        }
215    }
216}
217
218impl PartialEq for TenantKey {
219    fn eq(&self, other: &Self) -> bool {
220        self.env == other.env && self.tenant == other.tenant && self.team == other.team
221    }
222}
223
224impl Hash for TenantKey {
225    fn hash<H: Hasher>(&self, state: &mut H) {
226        self.env.hash(state);
227        self.tenant.hash(state);
228        self.team.hash(state);
229    }
230}
231
232#[derive(Clone, Debug, Eq, PartialEq, Hash)]
233struct InstallKey {
234    tenant: TenantKey,
235    provider_id: String,
236    install_id: String,
237}
238
239#[derive(Clone, Debug, Eq, PartialEq, Hash)]
240struct RoutingKey {
241    tenant: TenantKey,
242    provider_id: String,
243    platform: String,
244    channel_id: String,
245}
246
247pub fn apply_install_refs(meta: &mut BTreeMap<String, Value>, record: &ProviderInstallRecord) {
248    meta.insert(
249        PROVIDER_ID_KEY.to_string(),
250        Value::String(record.provider_id.clone()),
251    );
252    meta.insert(
253        INSTALL_ID_KEY.to_string(),
254        Value::String(record.install_id.to_string()),
255    );
256    if !record.config_refs.is_empty() {
257        meta.insert(
258            PROVIDER_CONFIG_REFS_KEY.to_string(),
259            serde_json::to_value(&record.config_refs).unwrap_or(Value::Null),
260        );
261    }
262    if !record.secret_refs.is_empty() {
263        meta.insert(
264            PROVIDER_SECRET_REFS_KEY.to_string(),
265            serde_json::to_value(&record.secret_refs).unwrap_or(Value::Null),
266        );
267    }
268}
269
270#[derive(Debug, Clone, Default, Serialize, Deserialize)]
271pub struct ProviderInstallStoreSnapshot {
272    #[serde(default)]
273    pub records: Vec<ProviderInstallRecord>,
274    #[serde(default)]
275    pub states: Vec<ProviderInstallState>,
276}
277
278pub fn load_install_store_from_path(path: &Path) -> Result<InMemoryProviderInstallStore> {
279    let raw = fs::read_to_string(path)
280        .with_context(|| format!("read install store {}", path.display()))?;
281    let snapshot: ProviderInstallStoreSnapshot = serde_json::from_str(&raw)
282        .with_context(|| format!("parse install store {}", path.display()))?;
283    let store = InMemoryProviderInstallStore::default();
284    let states = if snapshot.states.is_empty() {
285        snapshot
286            .records
287            .into_iter()
288            .map(ProviderInstallState::new)
289            .collect()
290    } else {
291        snapshot.states
292    };
293    for state in states {
294        store.insert(state);
295    }
296    Ok(store)
297}
298
299pub fn extract_provider_route(
300    meta: &BTreeMap<String, Value>,
301) -> Option<(String, ProviderInstallId)> {
302    let provider_id = meta.get(PROVIDER_ID_KEY)?.as_str()?.to_string();
303    let install_id = meta.get(INSTALL_ID_KEY)?.as_str()?.parse().ok()?;
304    Some((provider_id, install_id))
305}