greentic_operator/subscriptions_universal/
service.rs1use anyhow::{Result, anyhow};
2use greentic_types::messaging::universal_dto::{
3 AuthUserRefV1, SubscriptionDeleteInV1, SubscriptionEnsureInV1, SubscriptionRenewInV1,
4};
5use serde_json::to_vec;
6
7use crate::demo::runner_host::{DemoRunnerHost, FlowOutcome, OperatorContext};
8use crate::domains::Domain;
9use crate::subscriptions_universal::store::SubscriptionState;
10
11pub trait ProviderRunner {
12 fn invoke(
13 &self,
14 provider: &str,
15 op: &str,
16 payload: &[u8],
17 context: &OperatorContext,
18 ) -> Result<FlowOutcome>;
19}
20
21impl ProviderRunner for DemoRunnerHost {
22 fn invoke(
23 &self,
24 provider: &str,
25 op: &str,
26 payload: &[u8],
27 context: &OperatorContext,
28 ) -> Result<FlowOutcome> {
29 self.invoke_provider_op(Domain::Messaging, provider, op, payload, context)
30 }
31}
32
33#[derive(Clone, Debug)]
34pub struct SubscriptionEnsureRequest {
35 pub binding_id: String,
36 pub resource: Option<String>,
37 pub change_types: Vec<String>,
38 pub notification_url: Option<String>,
39 pub client_state: Option<String>,
40 pub user: Option<AuthUserRefV1>,
41 pub expiration_target_unix_ms: Option<u64>,
42}
43
44#[derive(Clone, Debug)]
45pub struct SubscriptionRenewRequest {
46 pub binding_id: String,
47 pub subscription_id: Option<String>,
48 pub user: Option<AuthUserRefV1>,
49 pub resource: Option<String>,
50 pub change_types: Vec<String>,
51 pub expiration_target_unix_ms: Option<u64>,
52}
53
54#[derive(Clone, Debug)]
55pub struct SubscriptionDeleteRequest {
56 pub binding_id: String,
57 pub subscription_id: Option<String>,
58 pub user: Option<AuthUserRefV1>,
59}
60
61pub struct SubscriptionService<R: ProviderRunner> {
62 runner_host: R,
63 context: OperatorContext,
64}
65
66impl<R: ProviderRunner> SubscriptionService<R> {
67 pub fn new(runner_host: R, context: OperatorContext) -> Self {
68 Self {
69 runner_host,
70 context,
71 }
72 }
73
74 pub fn ensure_once(
75 &self,
76 provider: &str,
77 request: &SubscriptionEnsureRequest,
78 ) -> Result<SubscriptionState> {
79 let dto = self.build_ensure_payload(provider, request)?;
80 let payload = to_vec(&dto)?;
81 let outcome =
82 self.runner_host
83 .invoke(provider, "subscription_ensure", &payload, &self.context)?;
84 let outcome = Self::ensure_success(outcome)?;
85 let state = SubscriptionState::from_provider_result(
86 provider,
87 &self.context.tenant,
88 self.context.team.clone(),
89 &request.binding_id,
90 request.resource.as_ref(),
91 &request.change_types,
92 request.notification_url.as_ref(),
93 request.client_state.as_ref(),
94 request.user.as_ref(),
95 outcome.output.as_ref(),
96 );
97 Ok(state)
98 }
99
100 pub fn renew_once(
101 &self,
102 provider: &str,
103 request: &SubscriptionRenewRequest,
104 ) -> Result<SubscriptionState> {
105 let dto = self.build_renew_payload(provider, request)?;
106 let payload = to_vec(&dto)?;
107 let outcome =
108 self.runner_host
109 .invoke(provider, "subscription_renew", &payload, &self.context)?;
110 let outcome = Self::ensure_success(outcome)?;
111 let state = SubscriptionState::from_provider_result(
112 provider,
113 &self.context.tenant,
114 self.context.team.clone(),
115 &request.binding_id,
116 request.resource.as_ref(),
117 &request.change_types,
118 None,
119 None,
120 request.user.as_ref(),
121 outcome.output.as_ref(),
122 );
123 Ok(state)
124 }
125
126 pub fn delete_once(&self, provider: &str, request: &SubscriptionDeleteRequest) -> Result<()> {
127 let dto = self.build_delete_payload(provider, request)?;
128 let payload = to_vec(&dto)?;
129 let outcome =
130 self.runner_host
131 .invoke(provider, "subscription_delete", &payload, &self.context)?;
132 let _ = Self::ensure_success(outcome)?;
133 Ok(())
134 }
135
136 fn build_ensure_payload(
137 &self,
138 provider: &str,
139 request: &SubscriptionEnsureRequest,
140 ) -> Result<SubscriptionEnsureInV1> {
141 let resource = request
142 .resource
143 .as_ref()
144 .ok_or_else(|| anyhow!("resource is required for subscription ensure"))?;
145 let notification_url = request
146 .notification_url
147 .as_ref()
148 .ok_or_else(|| anyhow!("notification_url is required for subscription ensure"))?;
149 let change_types = if request.change_types.is_empty() {
150 vec!["created".to_string()]
151 } else {
152 request.change_types.clone()
153 };
154 Ok(SubscriptionEnsureInV1 {
155 v: 1,
156 provider: provider.to_string(),
157 tenant_hint: Some(self.context.tenant.clone()),
158 team_hint: self.context.team.clone(),
159 binding_id: Some(request.binding_id.clone()),
160 resource: resource.clone(),
161 change_types,
162 notification_url: notification_url.clone(),
163 expiration_minutes: None,
164 expiration_target_unix_ms: request.expiration_target_unix_ms,
165 client_state: request.client_state.clone(),
166 metadata: None,
167 user: request
168 .user
169 .clone()
170 .unwrap_or_else(|| self.default_user_ref()),
171 })
172 }
173
174 fn build_renew_payload(
175 &self,
176 provider: &str,
177 request: &SubscriptionRenewRequest,
178 ) -> Result<SubscriptionRenewInV1> {
179 let subscription_id = request
180 .subscription_id
181 .clone()
182 .ok_or_else(|| anyhow!("subscription_id is required to renew a binding"))?;
183 Ok(SubscriptionRenewInV1 {
184 v: 1,
185 provider: provider.to_string(),
186 subscription_id,
187 expiration_minutes: None,
188 expiration_target_unix_ms: request.expiration_target_unix_ms,
189 metadata: None,
190 user: request
191 .user
192 .clone()
193 .unwrap_or_else(|| self.default_user_ref()),
194 })
195 }
196
197 fn build_delete_payload(
198 &self,
199 provider: &str,
200 request: &SubscriptionDeleteRequest,
201 ) -> Result<SubscriptionDeleteInV1> {
202 let subscription_id = request
203 .subscription_id
204 .clone()
205 .ok_or_else(|| anyhow!("subscription_id is required to delete a binding"))?;
206 Ok(SubscriptionDeleteInV1 {
207 v: 1,
208 provider: provider.to_string(),
209 subscription_id,
210 user: request
211 .user
212 .clone()
213 .unwrap_or_else(|| self.default_user_ref()),
214 })
215 }
216
217 fn ensure_success(outcome: FlowOutcome) -> Result<FlowOutcome> {
218 if outcome.success {
219 Ok(outcome)
220 } else {
221 let error = outcome
222 .error
223 .unwrap_or_else(|| "provider returned failure".to_string());
224 Err(anyhow!("{error}"))
225 }
226 }
227
228 fn default_user_ref(&self) -> AuthUserRefV1 {
229 let team_hint = self
230 .context
231 .team
232 .clone()
233 .unwrap_or_else(|| "default".to_string());
234 AuthUserRefV1 {
235 user_id: format!("{}-{}", self.context.tenant, team_hint),
236 token_key: format!("operator-{}", team_hint),
237 tenant_id: Some(self.context.tenant.clone()),
238 email: None,
239 display_name: None,
240 }
241 }
242}