Skip to main content

greentic_operator/subscriptions_universal/
store.rs

1use std::{fs, path::PathBuf};
2
3use anyhow::{Context, Result};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::runtime_state;
8
9pub use greentic_types::messaging::universal_dto::AuthUserRefV1;
10
11#[derive(Clone, Debug, Serialize, Deserialize)]
12pub struct SubscriptionState {
13    pub binding_id: String,
14    pub provider: String,
15    pub tenant: String,
16    pub team: Option<String>,
17    #[serde(default)]
18    pub resource: Option<String>,
19    #[serde(default)]
20    pub change_types: Vec<String>,
21    #[serde(default)]
22    pub notification_url: Option<String>,
23    #[serde(default)]
24    pub client_state: Option<String>,
25    #[serde(default)]
26    pub user: Option<AuthUserRefV1>,
27    #[serde(default)]
28    pub subscription_id: Option<String>,
29    #[serde(default)]
30    pub expiration_unix_ms: Option<i64>,
31    #[serde(default)]
32    pub last_error: Option<String>,
33}
34
35#[allow(clippy::too_many_arguments)]
36impl SubscriptionState {
37    pub fn from_provider_result(
38        provider: &str,
39        tenant: &str,
40        team: Option<String>,
41        binding_id: &str,
42        resource: Option<&String>,
43        change_types: &[String],
44        notification_url: Option<&String>,
45        client_state: Option<&String>,
46        user: Option<&AuthUserRefV1>,
47        response: Option<&Value>,
48    ) -> Self {
49        let payload = flatten_subscription_response(response);
50        let subscription_id = payload
51            .and_then(|value| value.get("subscription_id"))
52            .and_then(|value| value.as_str())
53            .map(|value| value.to_string());
54        let expiration_unix_ms = payload
55            .and_then(|value| value.get("expiration_unix_ms"))
56            .and_then(|value| value.as_i64());
57        let last_error = payload
58            .and_then(|value| value.get("last_error"))
59            .and_then(|value| value.as_str())
60            .map(|value| value.to_string());
61        Self {
62            binding_id: binding_id.to_string(),
63            provider: provider.to_string(),
64            tenant: tenant.to_string(),
65            team,
66            resource: resource.cloned(),
67            change_types: change_types.to_vec(),
68            notification_url: notification_url.cloned(),
69            client_state: client_state.cloned(),
70            user: user.cloned(),
71            subscription_id,
72            expiration_unix_ms,
73            last_error,
74        }
75    }
76}
77
78fn flatten_subscription_response(response: Option<&Value>) -> Option<&Value> {
79    let source = response?;
80    source.get("subscription").or(Some(source))
81}
82
83#[derive(Clone)]
84pub struct SubscriptionStore {
85    base: PathBuf,
86}
87
88impl SubscriptionStore {
89    pub fn new(base: impl Into<PathBuf>) -> Self {
90        Self { base: base.into() }
91    }
92
93    pub fn state_path(
94        &self,
95        provider: &str,
96        tenant: &str,
97        team: Option<&str>,
98        binding_id: &str,
99    ) -> PathBuf {
100        let team_dir = team.unwrap_or("default");
101        self.base
102            .join(provider)
103            .join(tenant)
104            .join(team_dir)
105            .join(format!("{binding_id}.json"))
106    }
107
108    pub fn write_state(&self, state: &SubscriptionState) -> Result<()> {
109        let path = self.state_path(
110            &state.provider,
111            &state.tenant,
112            state.team.as_deref(),
113            &state.binding_id,
114        );
115        runtime_state::write_json(&path, state)
116            .with_context(|| format!("failed to write subscription state to {path:?}"))
117    }
118
119    pub fn read_state(
120        &self,
121        provider: &str,
122        tenant: &str,
123        team: Option<&str>,
124        binding_id: &str,
125    ) -> Result<Option<SubscriptionState>> {
126        let path = self.state_path(provider, tenant, team, binding_id);
127        runtime_state::read_json(&path)
128            .with_context(|| format!("failed to read subscription state from {path:?}"))
129    }
130
131    pub fn list_states(&self) -> Result<Vec<SubscriptionState>> {
132        let mut states = Vec::new();
133        if !self.base.exists() {
134            return Ok(states);
135        }
136        self.collect_states(&self.base, &mut states)?;
137        Ok(states)
138    }
139
140    #[allow(clippy::collapsible_if)]
141    #[allow(clippy::only_used_in_recursion)]
142    fn collect_states(&self, dir: &PathBuf, states: &mut Vec<SubscriptionState>) -> Result<()> {
143        for entry in fs::read_dir(dir).with_context(|| format!("reading {}", dir.display()))? {
144            let entry = entry?;
145            let path = entry.path();
146            if path.is_dir() {
147                self.collect_states(&path, states)?;
148            } else if path
149                .extension()
150                .and_then(|value| value.to_str())
151                .map(|value| value.eq_ignore_ascii_case("json"))
152                .unwrap_or(false)
153            {
154                if let Some(state) = runtime_state::read_json(&path)? {
155                    states.push(state);
156                }
157            }
158        }
159        Ok(())
160    }
161
162    pub fn delete_state(&self, state: &SubscriptionState) -> Result<()> {
163        let path = self.state_path(
164            &state.provider,
165            &state.tenant,
166            state.team.as_deref(),
167            &state.binding_id,
168        );
169        if path.exists() {
170            fs::remove_file(&path)
171                .with_context(|| format!("failed to delete {}", path.display()))?;
172        }
173        Ok(())
174    }
175}