Skip to main content

systemprompt_cloud/checkout/
client.rs

1use anyhow::{Result, anyhow};
2use axum::Router;
3use axum::extract::{Path, Query, State};
4use axum::response::{Html, Json};
5use axum::routing::get;
6use futures::StreamExt;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::Duration;
10use systemprompt_logging::CliService;
11use tokio::sync::{Mutex, oneshot};
12
13use crate::CloudApiClient;
14use crate::api_client::{CheckoutEvent, ProvisioningEventType};
15use crate::constants::checkout::{CALLBACK_PORT, CALLBACK_TIMEOUT_SECS};
16
17#[derive(Debug, Deserialize)]
18struct CallbackParams {
19    transaction_id: Option<String>,
20    tenant_id: Option<String>,
21    status: Option<String>,
22    error: Option<String>,
23    checkout_session_id: Option<String>,
24}
25
26#[derive(Debug, Clone, Serialize)]
27struct StatusResponse {
28    status: String,
29    message: Option<String>,
30    app_url: Option<String>,
31}
32
33#[derive(Debug, Clone)]
34pub struct CheckoutCallbackResult {
35    pub transaction_id: String,
36    // JSON: external vendor identifier
37    pub tenant_id: String,
38    pub fly_app_name: Option<String>,
39    pub needs_deploy: bool,
40}
41
42#[derive(Debug, Clone, Copy)]
43#[allow(clippy::struct_field_names)]
44pub struct CheckoutTemplates {
45    pub success_html: &'static str,
46    pub error_html: &'static str,
47    pub waiting_html: &'static str,
48}
49
50struct AppState {
51    tx: Arc<Mutex<Option<oneshot::Sender<Result<CheckoutCallbackResult>>>>>,
52    api_client: Arc<CloudApiClient>,
53    success_template: String,
54    error_template: String,
55    waiting_template: String,
56}
57
58pub async fn run_checkout_callback_flow(
59    api_client: &CloudApiClient,
60    checkout_url: &str,
61    templates: CheckoutTemplates,
62) -> Result<CheckoutCallbackResult> {
63    let (tx, rx) = oneshot::channel::<Result<CheckoutCallbackResult>>();
64    let tx = Arc::new(Mutex::new(Some(tx)));
65
66    let state = AppState {
67        tx: Arc::clone(&tx),
68        api_client: Arc::new(CloudApiClient::new(
69            api_client.api_url(),
70            api_client.token(),
71        )?),
72        success_template: templates.success_html.to_string(),
73        error_template: templates.error_html.to_string(),
74        waiting_template: templates.waiting_html.to_string(),
75    };
76
77    let app = Router::new()
78        .route("/callback", get(callback_handler))
79        .route("/status/{tenant_id}", get(status_handler))
80        .with_state(Arc::new(state));
81
82    let addr = format!("127.0.0.1:{CALLBACK_PORT}");
83    let listener = tokio::net::TcpListener::bind(&addr).await?;
84
85    CliService::info(&format!(
86        "Starting checkout callback server on http://{addr}"
87    ));
88
89    CliService::info("Opening Paddle checkout in your browser...");
90    CliService::info(&format!("URL: {checkout_url}"));
91
92    if let Err(e) = open::that(checkout_url) {
93        CliService::warning(&format!("Could not open browser automatically: {e}"));
94        CliService::info("Please open this URL manually:");
95        CliService::key_value("URL", checkout_url);
96    }
97
98    CliService::info("Waiting for checkout completion...");
99    CliService::info(&format!("(timeout in {CALLBACK_TIMEOUT_SECS} seconds)"));
100
101    let server = axum::serve(listener, app);
102
103    tokio::select! {
104        result = rx => {
105            result.map_err(|_| anyhow!("Checkout cancelled"))?
106        }
107        _ = server => {
108            Err(anyhow!("Server stopped unexpectedly"))
109        }
110        () = tokio::time::sleep(Duration::from_secs(CALLBACK_TIMEOUT_SECS)) => {
111            Err(anyhow!("Checkout timed out after {CALLBACK_TIMEOUT_SECS} seconds"))
112        }
113    }
114}
115
116async fn callback_handler(
117    State(state): State<Arc<AppState>>,
118    Query(params): Query<CallbackParams>,
119) -> Html<String> {
120    if let Some(error) = &params.error {
121        tracing::error!(error = %error, "Checkout error from callback");
122        send_result(&state.tx, Err(anyhow!("Checkout error: {}", error))).await;
123        return Html(state.error_template.clone());
124    }
125
126    if let (Some(transaction_id), Some(tenant_id)) =
127        (params.transaction_id.clone(), params.tenant_id.clone())
128    {
129        match params.status.as_deref() {
130            Some("completed") => {
131                let result = Ok(CheckoutCallbackResult {
132                    transaction_id,
133                    tenant_id: tenant_id.clone(),
134                    fly_app_name: None,
135                    needs_deploy: false,
136                });
137                send_result(&state.tx, result).await;
138                let html = state.success_template.replace("{{TENANT_ID}}", &tenant_id);
139                return Html(html);
140            },
141            Some(status) => {
142                send_result(&state.tx, Err(anyhow!("Checkout status: {}", status))).await;
143                return Html(state.error_template.clone());
144            },
145            None => {
146                send_result(
147                    &state.tx,
148                    Err(anyhow!(
149                        "Checkout callback missing required 'status' parameter"
150                    )),
151                )
152                .await;
153                return Html(state.error_template.clone());
154            },
155        }
156    }
157
158    if params.status.as_deref() == Some("pending") {
159        if let Some(checkout_session_id) = params.checkout_session_id.clone() {
160            CliService::info("Payment confirmed, waiting for provisioning...");
161
162            let api_client = Arc::clone(&state.api_client);
163            let tx = Arc::clone(&state.tx);
164            let transaction_id = params
165                .transaction_id
166                .clone()
167                .unwrap_or_else(|| checkout_session_id.clone());
168
169            tokio::spawn(async move {
170                match wait_for_checkout_provisioning(&api_client, &checkout_session_id).await {
171                    Ok(prov_result) => {
172                        let result = Ok(CheckoutCallbackResult {
173                            transaction_id,
174                            tenant_id: prov_result.event.tenant_id,
175                            fly_app_name: prov_result.event.fly_app_name,
176                            needs_deploy: prov_result.needs_deploy,
177                        });
178                        send_result(&tx, result).await;
179                    },
180                    Err(e) => {
181                        send_result(&tx, Err(e)).await;
182                    },
183                }
184            });
185
186            return Html(state.waiting_template.clone());
187        }
188
189        send_result(
190            &state.tx,
191            Err(anyhow!("Pending status but no checkout_session_id")),
192        )
193        .await;
194        return Html(state.error_template.clone());
195    }
196
197    send_result(
198        &state.tx,
199        Err(anyhow!("Missing transaction_id or tenant_id in callback")),
200    )
201    .await;
202    Html(state.error_template.clone())
203}
204
205async fn send_result(
206    tx: &Arc<Mutex<Option<oneshot::Sender<Result<CheckoutCallbackResult>>>>>,
207    result: Result<CheckoutCallbackResult>,
208) {
209    let sender = tx.lock().await.take();
210    if let Some(sender) = sender {
211        if sender.send(result).is_err() {
212            tracing::warn!("Checkout result receiver dropped");
213        }
214    }
215}
216
217struct CheckoutProvisioningResult {
218    event: CheckoutEvent,
219    needs_deploy: bool,
220}
221
222async fn wait_for_checkout_provisioning(
223    client: &CloudApiClient,
224    checkout_session_id: &str,
225) -> Result<CheckoutProvisioningResult> {
226    let mut stream = client.subscribe_checkout_events(checkout_session_id);
227
228    while let Some(event_result) = stream.next().await {
229        match event_result {
230            Ok(event) => {
231                if let Some(msg) = &event.message {
232                    CliService::info(msg);
233                }
234
235                match event.event_type {
236                    ProvisioningEventType::InfrastructureReady => {
237                        return Ok(CheckoutProvisioningResult {
238                            event,
239                            needs_deploy: true,
240                        });
241                    },
242                    ProvisioningEventType::TenantReady => {
243                        return Ok(CheckoutProvisioningResult {
244                            event,
245                            needs_deploy: false,
246                        });
247                    },
248                    ProvisioningEventType::ProvisioningFailed => {
249                        return Err(anyhow!(
250                            "Provisioning failed: {}",
251                            event.message.as_deref().unwrap_or("Unknown error")
252                        ));
253                    },
254                    _ => {},
255                }
256            },
257            Err(e) => {
258                return Err(anyhow!("SSE stream error: {}", e));
259            },
260        }
261    }
262
263    Err(anyhow!("SSE stream closed unexpectedly"))
264}
265
266async fn status_handler(
267    State(state): State<Arc<AppState>>,
268    Path(tenant_id): Path<String>,
269) -> Json<StatusResponse> {
270    match state.api_client.get_tenant_status(&tenant_id).await {
271        Ok(status) => Json(StatusResponse {
272            status: status.status,
273            message: status.message,
274            app_url: status.app_url,
275        }),
276        Err(e) => Json(StatusResponse {
277            status: "error".to_string(),
278            message: Some(e.to_string()),
279            app_url: None,
280        }),
281    }
282}