Skip to main content

systemprompt_cloud/checkout/
client.rs

1use anyhow::{Result, anyhow};
2use axum::Router;
3use axum::extract::{Path, Query, State};
4use axum::response::{Html, Json};
5use axum::routing::get;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::Duration;
10use systemprompt_logging::CliService;
11use tokio::sync::{Mutex, oneshot};
12
13use crate::CloudApiClient;
14use crate::api_client::{CheckoutEvent, ProvisioningEventType};
15use crate::constants::checkout::{CALLBACK_PORT, CALLBACK_TIMEOUT_SECS};
16
17#[derive(Debug, Deserialize)]
18struct CallbackParams {
19    transaction_id: Option<String>,
20    tenant_id: Option<String>,
21    status: Option<String>,
22    error: Option<String>,
23    checkout_session_id: Option<String>,
24}
25
26#[derive(Debug, Clone, Serialize)]
27struct StatusResponse {
28    status: String,
29    message: Option<String>,
30    app_url: Option<String>,
31}
32
33#[derive(Debug, Clone)]
34pub struct CheckoutCallbackResult {
35    pub transaction_id: String,
36    pub tenant_id: String,
37    pub fly_app_name: Option<String>,
38    pub needs_deploy: bool,
39}
40
41#[derive(Debug, Clone, Copy)]
42#[allow(clippy::struct_field_names)]
43pub struct CheckoutTemplates {
44    pub success_html: &'static str,
45    pub error_html: &'static str,
46    pub waiting_html: &'static str,
47}
48
49struct AppState {
50    tx: Arc<Mutex<Option<oneshot::Sender<Result<CheckoutCallbackResult>>>>>,
51    api_client: Arc<CloudApiClient>,
52    success_template: String,
53    error_template: String,
54    waiting_template: String,
55}
56
57pub async fn run_checkout_callback_flow(
58    api_client: &CloudApiClient,
59    checkout_url: &str,
60    templates: CheckoutTemplates,
61) -> Result<CheckoutCallbackResult> {
62    let (tx, rx) = oneshot::channel::<Result<CheckoutCallbackResult>>();
63    let tx = Arc::new(Mutex::new(Some(tx)));
64
65    let state = AppState {
66        tx: Arc::clone(&tx),
67        api_client: Arc::new(CloudApiClient::new(
68            api_client.api_url(),
69            api_client.token(),
70        )?),
71        success_template: templates.success_html.to_string(),
72        error_template: templates.error_html.to_string(),
73        waiting_template: templates.waiting_html.to_string(),
74    };
75
76    let app = Router::new()
77        .route("/callback", get(callback_handler))
78        .route("/status/{tenant_id}", get(status_handler))
79        .with_state(Arc::new(state));
80
81    let addr = format!("127.0.0.1:{CALLBACK_PORT}");
82    let listener = tokio::net::TcpListener::bind(&addr).await?;
83
84    CliService::info(&format!(
85        "Starting checkout callback server on http://{addr}"
86    ));
87
88    CliService::info("Opening Paddle checkout in your browser...");
89    CliService::info(&format!("URL: {checkout_url}"));
90
91    if let Err(e) = open::that(checkout_url) {
92        CliService::warning(&format!("Could not open browser automatically: {e}"));
93        CliService::info("Please open this URL manually:");
94        CliService::key_value("URL", checkout_url);
95    }
96
97    CliService::info("Waiting for checkout completion...");
98    CliService::info(&format!("(timeout in {CALLBACK_TIMEOUT_SECS} seconds)"));
99
100    let server = axum::serve(listener, app);
101
102    tokio::select! {
103        result = rx => {
104            result.map_err(|_| anyhow!("Checkout cancelled"))?
105        }
106        _ = server => {
107            Err(anyhow!("Server stopped unexpectedly"))
108        }
109        () = tokio::time::sleep(Duration::from_secs(CALLBACK_TIMEOUT_SECS)) => {
110            Err(anyhow!("Checkout timed out after {CALLBACK_TIMEOUT_SECS} seconds"))
111        }
112    }
113}
114
115async fn callback_handler(
116    State(state): State<Arc<AppState>>,
117    Query(params): Query<CallbackParams>,
118) -> Html<String> {
119    if let Some(error) = &params.error {
120        tracing::error!(error = %error, "Checkout error from callback");
121        send_result(&state.tx, Err(anyhow!("Checkout error: {}", error))).await;
122        return Html(state.error_template.clone());
123    }
124
125    if let (Some(transaction_id), Some(tenant_id)) =
126        (params.transaction_id.clone(), params.tenant_id.clone())
127    {
128        match params.status.as_deref() {
129            Some("completed") => {
130                let result = Ok(CheckoutCallbackResult {
131                    transaction_id,
132                    tenant_id: tenant_id.clone(),
133                    fly_app_name: None,
134                    needs_deploy: false,
135                });
136                send_result(&state.tx, result).await;
137                let html = state.success_template.replace("{{TENANT_ID}}", &tenant_id);
138                return Html(html);
139            },
140            Some(status) => {
141                send_result(&state.tx, Err(anyhow!("Checkout status: {}", status))).await;
142                return Html(state.error_template.clone());
143            },
144            None => {
145                send_result(
146                    &state.tx,
147                    Err(anyhow!(
148                        "Checkout callback missing required 'status' parameter"
149                    )),
150                )
151                .await;
152                return Html(state.error_template.clone());
153            },
154        }
155    }
156
157    if params.status.as_deref() == Some("pending") {
158        if let Some(checkout_session_id) = params.checkout_session_id.clone() {
159            CliService::info("Payment confirmed, waiting for provisioning...");
160
161            let api_client = Arc::clone(&state.api_client);
162            let tx = Arc::clone(&state.tx);
163            let transaction_id = params
164                .transaction_id
165                .clone()
166                .unwrap_or_else(|| checkout_session_id.clone());
167
168            tokio::spawn(async move {
169                match wait_for_checkout_provisioning(&api_client, &checkout_session_id).await {
170                    Ok(prov_result) => {
171                        let result = Ok(CheckoutCallbackResult {
172                            transaction_id,
173                            tenant_id: prov_result.event.tenant_id,
174                            fly_app_name: prov_result.event.fly_app_name,
175                            needs_deploy: prov_result.needs_deploy,
176                        });
177                        send_result(&tx, result).await;
178                    },
179                    Err(e) => {
180                        send_result(&tx, Err(e)).await;
181                    },
182                }
183            });
184
185            return Html(state.waiting_template.clone());
186        }
187
188        send_result(
189            &state.tx,
190            Err(anyhow!("Pending status but no checkout_session_id")),
191        )
192        .await;
193        return Html(state.error_template.clone());
194    }
195
196    send_result(
197        &state.tx,
198        Err(anyhow!("Missing transaction_id or tenant_id in callback")),
199    )
200    .await;
201    Html(state.error_template.clone())
202}
203
204async fn send_result(
205    tx: &Arc<Mutex<Option<oneshot::Sender<Result<CheckoutCallbackResult>>>>>,
206    result: Result<CheckoutCallbackResult>,
207) {
208    if let Some(sender) = tx.lock().await.take() {
209        if sender.send(result).is_err() {
210            tracing::warn!("Checkout result receiver dropped");
211        }
212    }
213}
214
215struct CheckoutProvisioningResult {
216    event: CheckoutEvent,
217    needs_deploy: bool,
218}
219
220async fn wait_for_checkout_provisioning(
221    client: &CloudApiClient,
222    checkout_session_id: &str,
223) -> Result<CheckoutProvisioningResult> {
224    let mut stream = client.subscribe_checkout_events(checkout_session_id);
225
226    while let Some(event_result) = stream.next().await {
227        match event_result {
228            Ok(event) => {
229                if let Some(msg) = &event.message {
230                    CliService::info(msg);
231                }
232
233                match event.event_type {
234                    ProvisioningEventType::InfrastructureReady => {
235                        return Ok(CheckoutProvisioningResult {
236                            event,
237                            needs_deploy: true,
238                        });
239                    },
240                    ProvisioningEventType::TenantReady => {
241                        return Ok(CheckoutProvisioningResult {
242                            event,
243                            needs_deploy: false,
244                        });
245                    },
246                    ProvisioningEventType::ProvisioningFailed => {
247                        return Err(anyhow!(
248                            "Provisioning failed: {}",
249                            event.message.as_deref().unwrap_or("Unknown error")
250                        ));
251                    },
252                    _ => {},
253                }
254            },
255            Err(e) => {
256                return Err(anyhow!("SSE stream error: {}", e));
257            },
258        }
259    }
260
261    Err(anyhow!("SSE stream closed unexpectedly"))
262}
263
264async fn status_handler(
265    State(state): State<Arc<AppState>>,
266    Path(tenant_id): Path<String>,
267) -> Json<StatusResponse> {
268    match state.api_client.get_tenant_status(&tenant_id).await {
269        Ok(status) => Json(StatusResponse {
270            status: status.status,
271            message: status.message,
272            app_url: status.app_url,
273        }),
274        Err(e) => Json(StatusResponse {
275            status: "error".to_string(),
276            message: Some(e.to_string()),
277            app_url: None,
278        }),
279    }
280}