greentic_operator/subscriptions_universal/
store.rs1use std::{fs, path::PathBuf};
2
3use anyhow::{Context, Result};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7use crate::runtime_state;
8
9pub use greentic_types::messaging::universal_dto::AuthUserRefV1;
10
11#[derive(Clone, Debug, Serialize, Deserialize)]
12pub struct SubscriptionState {
13 pub binding_id: String,
14 pub provider: String,
15 pub tenant: String,
16 pub team: Option<String>,
17 #[serde(default)]
18 pub resource: Option<String>,
19 #[serde(default)]
20 pub change_types: Vec<String>,
21 #[serde(default)]
22 pub notification_url: Option<String>,
23 #[serde(default)]
24 pub client_state: Option<String>,
25 #[serde(default)]
26 pub user: Option<AuthUserRefV1>,
27 #[serde(default)]
28 pub subscription_id: Option<String>,
29 #[serde(default)]
30 pub expiration_unix_ms: Option<i64>,
31 #[serde(default)]
32 pub last_error: Option<String>,
33}
34
35#[allow(clippy::too_many_arguments)]
36impl SubscriptionState {
37 pub fn from_provider_result(
38 provider: &str,
39 tenant: &str,
40 team: Option<String>,
41 binding_id: &str,
42 resource: Option<&String>,
43 change_types: &[String],
44 notification_url: Option<&String>,
45 client_state: Option<&String>,
46 user: Option<&AuthUserRefV1>,
47 response: Option<&Value>,
48 ) -> Self {
49 let payload = flatten_subscription_response(response);
50 let subscription_id = payload
51 .and_then(|value| value.get("subscription_id"))
52 .and_then(|value| value.as_str())
53 .map(|value| value.to_string());
54 let expiration_unix_ms = payload
55 .and_then(|value| value.get("expiration_unix_ms"))
56 .and_then(|value| value.as_i64());
57 let last_error = payload
58 .and_then(|value| value.get("last_error"))
59 .and_then(|value| value.as_str())
60 .map(|value| value.to_string());
61 Self {
62 binding_id: binding_id.to_string(),
63 provider: provider.to_string(),
64 tenant: tenant.to_string(),
65 team,
66 resource: resource.cloned(),
67 change_types: change_types.to_vec(),
68 notification_url: notification_url.cloned(),
69 client_state: client_state.cloned(),
70 user: user.cloned(),
71 subscription_id,
72 expiration_unix_ms,
73 last_error,
74 }
75 }
76}
77
78fn flatten_subscription_response(response: Option<&Value>) -> Option<&Value> {
79 let source = response?;
80 source.get("subscription").or(Some(source))
81}
82
83#[derive(Clone)]
84pub struct SubscriptionStore {
85 base: PathBuf,
86}
87
88impl SubscriptionStore {
89 pub fn new(base: impl Into<PathBuf>) -> Self {
90 Self { base: base.into() }
91 }
92
93 pub fn state_path(
94 &self,
95 provider: &str,
96 tenant: &str,
97 team: Option<&str>,
98 binding_id: &str,
99 ) -> PathBuf {
100 let team_dir = team.unwrap_or("default");
101 self.base
102 .join(provider)
103 .join(tenant)
104 .join(team_dir)
105 .join(format!("{binding_id}.json"))
106 }
107
108 pub fn write_state(&self, state: &SubscriptionState) -> Result<()> {
109 let path = self.state_path(
110 &state.provider,
111 &state.tenant,
112 state.team.as_deref(),
113 &state.binding_id,
114 );
115 runtime_state::write_json(&path, state)
116 .with_context(|| format!("failed to write subscription state to {path:?}"))
117 }
118
119 pub fn read_state(
120 &self,
121 provider: &str,
122 tenant: &str,
123 team: Option<&str>,
124 binding_id: &str,
125 ) -> Result<Option<SubscriptionState>> {
126 let path = self.state_path(provider, tenant, team, binding_id);
127 runtime_state::read_json(&path)
128 .with_context(|| format!("failed to read subscription state from {path:?}"))
129 }
130
131 pub fn list_states(&self) -> Result<Vec<SubscriptionState>> {
132 let mut states = Vec::new();
133 if !self.base.exists() {
134 return Ok(states);
135 }
136 self.collect_states(&self.base, &mut states)?;
137 Ok(states)
138 }
139
140 #[allow(clippy::collapsible_if)]
141 #[allow(clippy::only_used_in_recursion)]
142 fn collect_states(&self, dir: &PathBuf, states: &mut Vec<SubscriptionState>) -> Result<()> {
143 for entry in fs::read_dir(dir).with_context(|| format!("reading {}", dir.display()))? {
144 let entry = entry?;
145 let path = entry.path();
146 if path.is_dir() {
147 self.collect_states(&path, states)?;
148 } else if path
149 .extension()
150 .and_then(|value| value.to_str())
151 .map(|value| value.eq_ignore_ascii_case("json"))
152 .unwrap_or(false)
153 {
154 if let Some(state) = runtime_state::read_json(&path)? {
155 states.push(state);
156 }
157 }
158 }
159 Ok(())
160 }
161
162 pub fn delete_state(&self, state: &SubscriptionState) -> Result<()> {
163 let path = self.state_path(
164 &state.provider,
165 &state.tenant,
166 state.team.as_deref(),
167 &state.binding_id,
168 );
169 if path.exists() {
170 fs::remove_file(&path)
171 .with_context(|| format!("failed to delete {}", path.display()))?;
172 }
173 Ok(())
174 }
175}