Skip to main content

greentic_operator/subscriptions_universal/
service.rs

1use anyhow::{Result, anyhow};
2use greentic_types::messaging::universal_dto::{
3    AuthUserRefV1, SubscriptionDeleteInV1, SubscriptionEnsureInV1, SubscriptionRenewInV1,
4};
5use serde_json::to_vec;
6
7use crate::demo::runner_host::{DemoRunnerHost, FlowOutcome, OperatorContext};
8use crate::domains::Domain;
9use crate::subscriptions_universal::store::SubscriptionState;
10
11pub trait ProviderRunner {
12    fn invoke(
13        &self,
14        provider: &str,
15        op: &str,
16        payload: &[u8],
17        context: &OperatorContext,
18    ) -> Result<FlowOutcome>;
19}
20
21impl ProviderRunner for DemoRunnerHost {
22    fn invoke(
23        &self,
24        provider: &str,
25        op: &str,
26        payload: &[u8],
27        context: &OperatorContext,
28    ) -> Result<FlowOutcome> {
29        self.invoke_provider_op(Domain::Messaging, provider, op, payload, context)
30    }
31}
32
33#[derive(Clone, Debug)]
34pub struct SubscriptionEnsureRequest {
35    pub binding_id: String,
36    pub resource: Option<String>,
37    pub change_types: Vec<String>,
38    pub notification_url: Option<String>,
39    pub client_state: Option<String>,
40    pub user: Option<AuthUserRefV1>,
41    pub expiration_target_unix_ms: Option<u64>,
42}
43
44#[derive(Clone, Debug)]
45pub struct SubscriptionRenewRequest {
46    pub binding_id: String,
47    pub subscription_id: Option<String>,
48    pub user: Option<AuthUserRefV1>,
49    pub resource: Option<String>,
50    pub change_types: Vec<String>,
51    pub expiration_target_unix_ms: Option<u64>,
52}
53
54#[derive(Clone, Debug)]
55pub struct SubscriptionDeleteRequest {
56    pub binding_id: String,
57    pub subscription_id: Option<String>,
58    pub user: Option<AuthUserRefV1>,
59}
60
61pub struct SubscriptionService<R: ProviderRunner> {
62    runner_host: R,
63    context: OperatorContext,
64}
65
66impl<R: ProviderRunner> SubscriptionService<R> {
67    pub fn new(runner_host: R, context: OperatorContext) -> Self {
68        Self {
69            runner_host,
70            context,
71        }
72    }
73
74    pub fn ensure_once(
75        &self,
76        provider: &str,
77        request: &SubscriptionEnsureRequest,
78    ) -> Result<SubscriptionState> {
79        let dto = self.build_ensure_payload(provider, request)?;
80        let payload = to_vec(&dto)?;
81        let outcome =
82            self.runner_host
83                .invoke(provider, "subscription_ensure", &payload, &self.context)?;
84        let outcome = Self::ensure_success(outcome)?;
85        let state = SubscriptionState::from_provider_result(
86            provider,
87            &self.context.tenant,
88            self.context.team.clone(),
89            &request.binding_id,
90            request.resource.as_ref(),
91            &request.change_types,
92            request.notification_url.as_ref(),
93            request.client_state.as_ref(),
94            request.user.as_ref(),
95            outcome.output.as_ref(),
96        );
97        Ok(state)
98    }
99
100    pub fn renew_once(
101        &self,
102        provider: &str,
103        request: &SubscriptionRenewRequest,
104    ) -> Result<SubscriptionState> {
105        let dto = self.build_renew_payload(provider, request)?;
106        let payload = to_vec(&dto)?;
107        let outcome =
108            self.runner_host
109                .invoke(provider, "subscription_renew", &payload, &self.context)?;
110        let outcome = Self::ensure_success(outcome)?;
111        let state = SubscriptionState::from_provider_result(
112            provider,
113            &self.context.tenant,
114            self.context.team.clone(),
115            &request.binding_id,
116            request.resource.as_ref(),
117            &request.change_types,
118            None,
119            None,
120            request.user.as_ref(),
121            outcome.output.as_ref(),
122        );
123        Ok(state)
124    }
125
126    pub fn delete_once(&self, provider: &str, request: &SubscriptionDeleteRequest) -> Result<()> {
127        let dto = self.build_delete_payload(provider, request)?;
128        let payload = to_vec(&dto)?;
129        let outcome =
130            self.runner_host
131                .invoke(provider, "subscription_delete", &payload, &self.context)?;
132        let _ = Self::ensure_success(outcome)?;
133        Ok(())
134    }
135
136    fn build_ensure_payload(
137        &self,
138        provider: &str,
139        request: &SubscriptionEnsureRequest,
140    ) -> Result<SubscriptionEnsureInV1> {
141        let resource = request
142            .resource
143            .as_ref()
144            .ok_or_else(|| anyhow!("resource is required for subscription ensure"))?;
145        let notification_url = request
146            .notification_url
147            .as_ref()
148            .ok_or_else(|| anyhow!("notification_url is required for subscription ensure"))?;
149        let change_types = if request.change_types.is_empty() {
150            vec!["created".to_string()]
151        } else {
152            request.change_types.clone()
153        };
154        Ok(SubscriptionEnsureInV1 {
155            v: 1,
156            provider: provider.to_string(),
157            tenant_hint: Some(self.context.tenant.clone()),
158            team_hint: self.context.team.clone(),
159            binding_id: Some(request.binding_id.clone()),
160            resource: resource.clone(),
161            change_types,
162            notification_url: notification_url.clone(),
163            expiration_minutes: None,
164            expiration_target_unix_ms: request.expiration_target_unix_ms,
165            client_state: request.client_state.clone(),
166            metadata: None,
167            user: request
168                .user
169                .clone()
170                .unwrap_or_else(|| self.default_user_ref()),
171        })
172    }
173
174    fn build_renew_payload(
175        &self,
176        provider: &str,
177        request: &SubscriptionRenewRequest,
178    ) -> Result<SubscriptionRenewInV1> {
179        let subscription_id = request
180            .subscription_id
181            .clone()
182            .ok_or_else(|| anyhow!("subscription_id is required to renew a binding"))?;
183        Ok(SubscriptionRenewInV1 {
184            v: 1,
185            provider: provider.to_string(),
186            subscription_id,
187            expiration_minutes: None,
188            expiration_target_unix_ms: request.expiration_target_unix_ms,
189            metadata: None,
190            user: request
191                .user
192                .clone()
193                .unwrap_or_else(|| self.default_user_ref()),
194        })
195    }
196
197    fn build_delete_payload(
198        &self,
199        provider: &str,
200        request: &SubscriptionDeleteRequest,
201    ) -> Result<SubscriptionDeleteInV1> {
202        let subscription_id = request
203            .subscription_id
204            .clone()
205            .ok_or_else(|| anyhow!("subscription_id is required to delete a binding"))?;
206        Ok(SubscriptionDeleteInV1 {
207            v: 1,
208            provider: provider.to_string(),
209            subscription_id,
210            user: request
211                .user
212                .clone()
213                .unwrap_or_else(|| self.default_user_ref()),
214        })
215    }
216
217    fn ensure_success(outcome: FlowOutcome) -> Result<FlowOutcome> {
218        if outcome.success {
219            Ok(outcome)
220        } else {
221            let error = outcome
222                .error
223                .unwrap_or_else(|| "provider returned failure".to_string());
224            Err(anyhow!("{error}"))
225        }
226    }
227
228    fn default_user_ref(&self) -> AuthUserRefV1 {
229        let team_hint = self
230            .context
231            .team
232            .clone()
233            .unwrap_or_else(|| "default".to_string());
234        AuthUserRefV1 {
235            user_id: format!("{}-{}", self.context.tenant, team_hint),
236            token_key: format!("operator-{}", team_hint),
237            tenant_id: Some(self.context.tenant.clone()),
238            email: None,
239            display_name: None,
240        }
241    }
242}